Learn how to apply BERT to sentiment analysis using TFX and Vertex AI pipelines with this free course from DLabs.AI. The course gives you:
An easy-to-understand explanation of all the key concepts and tools used to create a sentiment analysis model.
Step-by-step instructions on how to apply TFX to sentiment analysis using ready-made components.
The entire pipeline and helper library codebase, ready for use as a template in your own project.
At DLabs.AI1 we wanted to start using the Google-backed TFX framework for Machine Learning pipelines, but were faced with a lack of comprehensive tutorials about it. So we started our own practical research into how TFX can be applied to Machine Learning in production. We decided to pick:
Sentiment analysis is a classic and easy-to-understand Machine Learning problem. It is also a task that has wide applications, especially in the retail industry.
BERT is a widely recognized NLP model that is used often throughout the industry.
On the other hand, Vertex AI (a Google Cloud Platform service) was introduced only recently and is still in the adoption phase.
Finally, TFX (Google’s MLOps framework) is a tool that doesn’t yet have as wide an adoption as it deserves.
This course presents the outcome of our research – a very comprehensive, step-by-step description of how to apply TFX to our problem.
In this course, we will show how to:
We will also argue:
As you can see, there is a lot of ground to cover, so strap in: this ride is not going to be short, but we promise it's going to be comprehensive.
You can view the full, open-source code of this pipeline at https://github.com/dlabsai/tfx-sentiment-analysis and the code of the helper library at https://github.com/dlabsai/tfx-helper (see also section 16).
In order to follow this course, you need the following:
From the technical standpoint, since we present a fully containerized solution (see section 11.8), you will only need an editor and a configured Docker2 on your machine.
To deploy the pipeline to the cloud you might also want to install the Google Cloud CLI to perform parts of the setup (see section 12.2).
Sentiment analysis is the use of Natural Language Processing to systematically extract affective states from text. In simple cases, it is used to extract a polarity of opinions (deciding whether an opinion is positive or negative). It is also commonly used to automatically classify customer reviews or emails as favourable or unfavourable. Companies typically wish to address critical opinions with haste and a Machine Learning model can help to identify such opinions as soon as they are posted, without human intervention. The company can then take swift action on encountering negative comments and improve overall consumer experience.
From a machine learning point of view, sentiment analysis is a binary classification problem. Having text as input we want to classify it as one of two exclusive classes (positive or negative). In practice we want the model to predict a single output – the polarity of opinion on a scale of 0 (negative) to 1 (positive).
Historically there were multiple approaches to solve the sentiment analysis problem.
As one can imagine, the simplest possible solution would be classifying a document based on the presence of hardcoded words associated with either a positive (amazing, great, like) or negative (hate, bad, disappointing) opinion. Such solutions were implemented in the past using classic NLP techniques such as hand-crafted sentiment lexicons and bag-of-words classifiers.
However, such approaches had a major drawback: human language is deeply contextual and the tools were not equipped to deal with that because they were operating at token (word) level. The mere presence of a word doesn’t always determine the overall sentiment of a message. In the simplest case adding a negation (the product is not bad) can change the meaning of a phrase. In more complex examples (as is often the case with news articles or more formal correspondence) the sentiment is expressed less explicitly and relying on word presence is insufficient. Thus the contemporary approaches to natural language processing involve paying attention to the context in which words appear.
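To make the limitation concrete, here is a deliberately naive, lexicon-based scorer (an illustration only, not part of the course code):

# A deliberately naive lexicon-based sentiment scorer (illustration only).
POSITIVE = {"amazing", "great", "like", "loved", "best"}
NEGATIVE = {"hate", "bad", "disappointing", "worst"}

def naive_sentiment(text: str) -> float:
    tokens = text.lower().split()
    score = sum(t in POSITIVE for t in tokens) - sum(t in NEGATIVE for t in tokens)
    return 1.0 if score > 0 else 0.0

print(naive_sentiment("best movie ever, loved it"))  # 1.0 - correct
print(naive_sentiment("the product is not bad"))     # 0.0 - wrong, negation is ignored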
One of the famous Machine Learning models that can deal with phrase context is BERT3 (Bidirectional Encoder Representations from Transformers), developed by Google. This NLP model is based on the Transformer architecture, which uses a mechanism called bidirectional self-attention4 to learn the relationships between the words in a sentence. Together with the power of deep learning (neural networks), this enables BERT to understand the intricacies of natural language much better than previous methods.
BERT became an industry standard for Natural Language Processing. A big “selling point” of BERT is its ability to easily adapt the pre-trained model to many downstream tasks solving different NLP problems. As such it is an ideal candidate for adaptation to the sentiment analysis task.
BERT is an open-source model that can be easily integrated into popular Machine Learning frameworks. In this course, we will use both the ready-made BERT preprocessor5 as well as BERT encoder6 from Tensorflow Hub.
The biggest disadvantage of BERT is the size of the model (110 million parameters). This means that both training and inference require powerful hardware and in practice need GPUs (Graphics Processing Units) or TPUs (Tensor Processing Units) to function efficiently. Regardless of whether you opt for a robust multi-core CPU machine or a machine with a GPU, the costs of training and inference will be higher than for your average tabular deep learning model.
There are a few ways to decrease the costs, for example choosing a smaller BERT variant or keeping the encoder frozen during training (as we do later in this course).
In real life it doesn’t matter at all how good your model is, unless you can deliver it to a production environment. MLOps is a set of practices for reliably deploying and maintaining machine learning models in production. MLOps is to Machine Learning what DevOps is to software development.
Many Machine Learning Engineers and Data Scientists concentrate most of their efforts on creating the best possible model, tuning and tweaking it, and trying out new things they read about in recent research papers. Since a lot of this work is experimentation, and development often happens in Jupyter Notebooks, one often ends up with a solution that cannot be reliably reproduced. Often the models produced by the ML team are difficult to deploy since they require additional dependencies and integration of parts of the ML code for preprocessing. Another factor that adds to this complexity is that development teams (responsible for embedding the ML models into wider applications) are not familiar with ML libraries and ML concepts (for example array operations) and find it hard to work with ML models.
MLOps is a methodology of dealing with those problems and TFX is Google’s own MLOps solution.
TFX9 (Tensorflow Extended) brings to the table a very well structured set of Google’s best MLOps practices. The library can be used to create Machine Learning pipelines in a clear and declarative way. The main advantage of employing TFX is the fact that for the most part there is no code to write (except of course the actual feature engineering and modelling code) to get a pipeline going.
The library comes with a set of standard components that implement various tasks in an ML pipeline (and require no user code unless stated):
ExampleGen – components can import training data from various formats and split them into training/validation/test sets.
StatisticsGen – component can generate statistics describing training data.
SchemaGen – component can automatically generate a schema of training data that the ML team can tweak later on.
ExampleValidator – component can detect anomalies in data by comparing a set of data with previously calculated statistics and schema.
Transform – component can perform data preprocessing and feature engineering (preprocessing callback function required to implement your logic).
Trainer – component can train an ML model (model training callback function required).
Tuner – component can perform best model hyperparameters search (tuner configuration callback function required).
Evaluator – component can validate a trained model and verify if it passes a given set of checks.
Pusher – component can push a trained model to an external destination (for example cloud endpoint).
BulkInferrer – component can perform batch predictions using a trained model.
Resolver – nodes can find artifacts from previous pipeline runs (for example previous trained model or last best set of hyperparameters).

In addition one can define custom components that can be included into the pipeline.
The pipeline defined using TFX can be run on both local machines as well as in the cloud. Cloud deployments usually use Kubeflow Pipelines10 under the hood for workload orchestration. For the purpose of pipeline development one can also use InteractiveContext
from within a Jupyter Notebook environment to experiment with the pipeline and run steps one at a time (great tool for learning TFX).
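For example, a notebook session that runs a single component interactively could look roughly like this (a minimal sketch; the input path is only an example):

from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# Create an interactive context backed by a temporary local metadata store.
context = InteractiveContext()

# Define one component and run just this step, then inspect its artifacts.
example_gen = tfx.components.CsvExampleGen(input_base="input_data/")
context.run(example_gen)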
TFX comes with a set of Google Cloud Platform-targeted extra components, which are almost drop-in replacements for standard TFX components that enable the user to easily leverage the distributed training and serving capabilities of Google Cloud Platform.
We try to alleviate some of the disadvantages of TFX using our home-grown TFX helper library (see section 11.3).
Google Cloud Platform introduced Vertex AI not that long ago as their new Machine Learning platform that unites all AI/ML-related services under a common brand.
However, from the perspective of creating Machine Learning pipelines, the biggest advantage of Vertex AI over AI Platform is the introduction of managed pipelines. The Vertex AI Pipelines service offers the possibility of running Kubeflow Pipelines and TFX workloads in a serverless manner – without having to keep a Kubeflow cluster running.
What that means is that your Machine Learning team doesn't have to provision, maintain, or pay for an always-running Kubeflow cluster.
This allows your team to focus on the pipeline and modelling itself and not have to worry about providing an execution environment for the pipeline. It also means that you don’t need DevOps expertise to run a TFX pipeline in the cloud.
TFX supports running on Vertex AI out of the box (if you install it with Kubeflow Pipelines extension tfx[kfp]).
In this section of the course, we are going to explain on a conceptual level how we can use BERT for sentiment analysis. For technical details jump down to the pipeline implementation section.
As mentioned earlier, BERT is a pre-trained NLP model that can be easily adapted to new downstream tasks. We will use BERT in the following way: the raw review text is cleaned and run through the BERT preprocessor, the preprocessed inputs are fed into a frozen BERT encoder, and the encoder's pooled output is passed to a small classification head that we train from scratch.
Please consult figure 1 for a conceptual visualization of the whole training process.
Figure 1: Sentiment analysis training process diagram
The IMDB reviews dataset13 is designed for binary sentiment analysis. Each sample contains only two fields:
review – text of the user review,
sentiment – sentiment label; enumeration; either "positive" or "negative".

The text of the review contains HTML tags, mainly newlines (<br />) that we will want to remove. We will use Tensorflow’s built-in regular expression support to execute the substitution.
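For example, the same substitution that the Transform step will apply later can be tried out on its own (a small standalone sketch):

import tensorflow as tf

# Strip anything that looks like an HTML tag and replace it with a space
# (the same pattern is used later in the preprocessing callback).
reviews = tf.constant(["Great start.<br /><br />Terrible ending."])
cleaned = tf.strings.regex_replace(reviews, "<[^>]+>", " ")
print(cleaned.numpy())  # [b'Great start.  Terrible ending.']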
There are 50k reviews in total. The labels are perfectly balanced – there are 25k positive and 25k negative reviews.
Our binary classification problem statement requires us to encode labels as a single numeric variable (0 for negative sentiment, 1 for positive sentiment).
BERT preprocessor is a component that transforms input raw text into the inputs required by BERT encoder. The component takes care of:
The preprocessor’s outputs are:
input_word_ids – tensor with token IDs of input sequence,
input_mask – has value 1 for input tokens, 0 for padding tokens,
input_type_ids – used to distinguish between multiple input segments – in our case we have only one segment, so all values are zeroes.

We will use an uncased English language BERT preprocessor14 fetched from Tensorflow Hub. Please note that the selection of the preprocessor variant depends on the selection of the BERT encoder that you are planning to use.
BERT encoder is a transformer-based NLP model. It needs to be fed carefully prepared inputs (see above). The encoder emits two main outputs:
pooled_output – a high-dimensional text summarization vector,
sequence_output – one high-dimensional vector for each token in the input sequence.

For the purpose of sentiment analysis, we can use the pooled_output vector, since we need to generate a single classification for each input text we process. The encoder will effectively provide us with context-aware high-dimensional summarization of each of the input texts that we can use as input for subsequent modeling steps.
BERT models come in various sizes and variants. In this course, we will use the base size and uncased variant15 for the English language fetched from Tensorflow Hub.
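For orientation, here is a minimal standalone sketch of running a piece of text through these two Tensorflow Hub modules (the URLs are the same ones used as constants later in the pipeline code):

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text  # noqa: F401 - registers the ops needed by the preprocessor

preprocessor = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3"
)
encoder = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4"
)

encoder_inputs = preprocessor(tf.constant(["I loved this movie!"]))
outputs = encoder(encoder_inputs)
print(outputs["pooled_output"].shape)    # (1, 768) - one summary vector per text
print(outputs["sequence_output"].shape)  # (1, 128, 768) - one vector per token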
The BERT encoder is going to provide a high-dimensional summarization of each input text. We are going to run this summarization through a very simple neural network consisting of a single hidden dense layer with ReLU activation (its size selected by hyperparameter tuning) and a single-unit output layer with sigmoid activation.
We will use binary crossentropy as the loss function and the Adam optimizer with a learning rate selected by hyperparameter tuning. Since our training set is perfectly balanced, we can choose simple binary accuracy as the evaluation metric.
Our sentiment analysis model achieves about 80% accuracy on the test set.
From this point onwards the course goes into technical details and shows code snippets of both the pipeline creation as well as modeling code. The amount of code we want to show is huge, so we will need to take some shortcuts for the purpose of being concise. Please refer to the full code repository (see section 16) for code organization, full modules, and bits that were skipped in the course.
In this section we will define the TFX pipeline, implement the custom callbacks (preprocessing, training, and tuning), containerize the solution, run the pipeline locally, and test the resulting model with Tensorflow Serving.
We want our ML pipeline to consist of the following steps: importing and splitting the training data, computing statistics and a schema, preprocessing the text, tuning hyperparameters (or reusing previously found ones), training the model, evaluating it against the previously blessed model, and finally pushing the blessed model to its destination.
Please refer to figure 2 for a rendered graph of the pipeline components.
Figure 2: Pipeline components
DLabs.AI18 created a small open-source helper library19 (installable from PyPI20), called tfx-helper, to help us deal with some aspects of TFX and add some missing functionalities.

The library:
lets the same pipeline definition run both locally and in the cloud without if statements and parameters in pipeline definition code,
passes custom parameters down to the component callbacks (through custom_config),
allows overriding resource requirements per component (for example a bigger machine for the Evaluator component and smaller machines for GCP extension components).

In the following code samples we will use the helper library to write the pipeline definition code in a more streamlined fashion than pure TFX.
A TFX pipeline consists of a number of components that can be connected together in such a way that one component consumes the output of another component.
Using our helper library the signature of the pipeline creation function can look like this:
def create_pipeline(
# the helper utility instance from DLabs library
pipeline_helper: PipelineHelperInterface,
*,
# any number of any types of custom parameters you wish
data_path: str,
... # more parameters
) -> Iterable[BaseComponent]: # pipeline returns a collection of TFX components
Listing 1: Pipeline creation function signature
The pipeline_helper will be used to avoid manual selection between components like:
tfx.components.Trainer
tfx.extensions.google_cloud_ai_platform.Trainer.
Since we can return any iterable of components, we don’t have to construct a materialized list of components. Instead, we can use generator functions21. This also reduces boilerplate in cases where components are conditionally included in the pipeline.
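As a toy illustration of this pattern (plain Python, independent of TFX), steps can be yielded one by one and optional ones can sit behind an ordinary if:

from typing import Iterable

def build_steps(include_tuning: bool) -> Iterable[str]:
    # Yield the steps one by one instead of assembling a list up front.
    yield "example_gen"
    yield "transform"
    if include_tuning:  # conditional inclusion without any list bookkeeping
        yield "tuner"
    yield "trainer"

print(list(build_steps(include_tuning=False)))  # ['example_gen', 'transform', 'trainer']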
Without further ado, here’s a pretty robust ML pipeline for sentiment analysis model training:
def create_pipeline(
pipeline_helper: PipelineHelperInterface,
*,
data_path: str, # the directory with training CSV
number_of_trials: int, # number of hyperparam tuning trials
eval_accuracy_threshold: float = 0.7, # minimal accuracy required to bless the model
# the proportions of train/validation/test split
train_hash_buckets: int = 4,
validation_hash_buckets: int = 1,
evaluation_hash_buckets: int = 1,
train_patience: int, # early stopping patience (in epochs) in trainer
tune_patience: int, # early stopping patience (in epochs) in tuner
train_epochs: int, # maximum number of training epochs in trainer
tune_epochs: int, # maximum number of training epochs in tuner
# set to `True` to skip tuning in this run and use hyperparams from previous run
use_previous_hparams: bool,
) -> Iterable[BaseComponent]:
"""Pipeline definition."""
# Import and split training data from CSV into TFRecord files
splits = [
tfx.proto.SplitConfig.Split(name="train", hash_buckets=train_hash_buckets),
tfx.proto.SplitConfig.Split(name="valid", hash_buckets=validation_hash_buckets),
tfx.proto.SplitConfig.Split(name="eval", hash_buckets=evaluation_hash_buckets),
]
output_config = tfx.proto.Output(
split_config=tfx.proto.SplitConfig(splits=splits),
)
example_gen = tfx.components.CsvExampleGen( # type:ignore[attr-defined]
input_base=data_path, output_config=output_config
)
yield example_gen
# Computes statistics over data for visualization and example validation.
statistics_gen = tfx.components.StatisticsGen( # type:ignore[attr-defined]
examples=example_gen.outputs["examples"]
)
yield statistics_gen
# Generates schema based on statistics files.
# Since we have a very straightforward text dataset, we can depend
# on the auto-generated schema.
# Otherwise one can use `ImportSchemaGen` to import customized schema
# and `ExampleValidator` to check if examples are conforming to the schema.
schema_gen = tfx.components.SchemaGen( # type:ignore[attr-defined]
statistics=statistics_gen.outputs["statistics"], infer_feature_shape=True
)
yield schema_gen
# Performs data preprocessing and feature engineering
transform = tfx.components.Transform( # type:ignore[attr-defined]
examples=example_gen.outputs["examples"],
schema=schema_gen.outputs["schema"],
# the preprocessing callback function can be provided as importable name
# ..
preprocessing_fn="models.preprocessing.preprocessing_fn",
splits_config=tfx.proto.SplitsConfig(
analyze=["train", "valid"], # fit transformations on part of training data
transform=["train", "valid", "eval"], # transform all splits
),
)
yield transform
if use_previous_hparams:
# Find latest best hyperparameters computed in a previous run
hparams_resolver = tfx.dsl.Resolver( # type:ignore[attr-defined]
strategy_class=tfx.dsl.experimental.LatestArtifactStrategy, # type:ignore[attr-defined]
hyperparameters=tfx.dsl.Channel( # type:ignore[attr-defined]
type=tfx.types.standard_artifacts.HyperParameters
),
).with_id("latest_hyperparams_resolver")
yield hparams_resolver
hparams = hparams_resolver.outputs["hyperparameters"]
else:
# Launch hyperparameter tuning to find the best set of hyperparameters.
tuner = pipeline_helper.construct_tuner(
tuner_fn="models.model.tuner_fn",
examples=transform.outputs["transformed_examples"],
transform_graph=transform.outputs["transform_graph"],
train_args=tfx.proto.TrainArgs(splits=["train"]),
eval_args=tfx.proto.EvalArgs(splits=["valid"]),
custom_config={
"number_of_trials": number_of_trials,
"epochs": tune_epochs,
"patience": tune_patience,
},
)
yield tuner
hparams = tuner.outputs["best_hyperparameters"]
# Train a Tensorflow model
trainer = pipeline_helper.construct_trainer(
# the training callback function provided as importable name
run_fn="models.model.run_fn",
# training will operate on examples already preprocessed
examples=transform.outputs["transformed_examples"],
# a Tensorflow graph of preprocessing function is exposed so that it
# can be embedded into the trained model and used when serving.
transform_graph=transform.outputs["transform_graph"],
schema=schema_gen.outputs["schema"],
# use hyperparameters from tuning
hyperparameters=hparams,
train_args=tfx.proto.TrainArgs(splits=["train"]), # split to use for training
eval_args=tfx.proto.EvalArgs(splits=["valid"]), # split to use for validation
# custom parameters to the training callback
custom_config={"epochs": train_epochs, "patience": train_patience},
)
yield trainer
# Get the latest blessed model for model validation comparison.
model_resolver = tfx.dsl.Resolver( # type:ignore[attr-defined]
strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy, # type:ignore[attr-defined]
model=tfx.dsl.Channel( # type:ignore[attr-defined]
type=tfx.types.standard_artifacts.Model
),
model_blessing=tfx.dsl.Channel( # type:ignore[attr-defined]
type=tfx.types.standard_artifacts.ModelBlessing
),
).with_id("latest_blessed_model_resolver")
yield model_resolver
# Uses TFMA to compute evaluation statistics over features of a model and
# perform quality validation of a candidate model (compared to a baseline).
eval_config = tfma.EvalConfig(
model_specs=[
tfma.ModelSpec(
signature_name="from_examples",
label_key="sentiment",
preprocessing_function_names=["transform_features"],
)
],
# Can be used for fairness calculation. Our dataset is pure text,
# so we run the evaluation only on full dataset.
slicing_specs=[tfma.SlicingSpec()],
metrics_specs=[
tfma.MetricsSpec(
metrics=[
tfma.MetricConfig(
# Metric to use
class_name="BinaryAccuracy",
threshold=tfma.MetricThreshold(
# Require an absolute value of metric to exceed threshold
value_threshold=tfma.GenericValueThreshold(
lower_bound={"value": eval_accuracy_threshold}
),
# Require the candidate model to be better than
# previous (baseline) model by given margin
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={"value": -1e-10},
),
),
)
]
)
],
)
evaluator = tfx.components.Evaluator( # type:ignore[attr-defined]
examples=example_gen.outputs["examples"],
example_splits=["eval"], # split of examples to use for evaluation
model=trainer.outputs["model"], # candidate model
baseline_model=model_resolver.outputs["model"], # baseline model
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config,
)
yield evaluator
# Pushes the model to a file/endpoint destination if checks passed.
pusher = pipeline_helper.construct_pusher(
model=trainer.outputs["model"], # model to push
model_blessing=evaluator.outputs["blessing"], # required blessing
)
yield pusher
Listing 2: Pipeline definition
How do you find it? For all the interesting things it does, we believe that it's succinct enough. Please notice how only three components (Transform, Trainer, and Tuner) require custom code. The rest of the components use out-of-the-box TFX logic, conforming to our configuration in the pipeline definition.
In the following sections, we will go into the detail of all the custom code callbacks.
The Transform component requires a custom callback that performs data preprocessing. Preprocessing can consist of different operations, expressed as either Tensorflow or Tensorflow Transform22 operations – for example string cleaning and tokenization with plain Tensorflow ops, or vocabulary generation and feature scaling with Tensorflow Transform analyzers.
The signature of the preprocessing callback is quite simple: input tensors come in, output tensors come out.
In our case the preprocessing code (see listings 3, 4, 5) removes HTML tags from the review text, runs the cleaned text through the BERT preprocessor, casts the preprocessor outputs to int64 for TFRecord storage, and encodes the string labels as integers.
def preprocessing_fn(inputs: Dict[str, tf.Tensor]) -> Dict[str, tf.Tensor]:
# We want to preprocess the input text by a ready-made BERT preprocessor
preprocessor = hub.KerasLayer(constants.PREPROCESSOR)
# Reviews are (None, 1) shape, but need to be (None, ) shape
reviews = tf.squeeze(inputs[features.INPUT_TEXT_FEATURE])
# Remove any HTML tags from input
cleaned_reviews = tf.strings.regex_replace(reviews, "<[^>]+>", " ")
# Run the cleaned texts through BERT preprocessor
encoder_inputs = preprocessor(cleaned_reviews)
# This prepares for us 3 tensors (tokens and masks) that we can feed into
# BERT layer directly later.
# Bert tokenizer outputs int32 tensors, but for pipeline storage in TFRecord
# file we need int64, so we convert them here and we will have to convert them
# back into int32 before feeding BERT model.
int64_encoder_inputs = {
key: tf.cast(value, tf.int64) for key, value in encoder_inputs.items()
}
# Encode labels from string "pos" / "neg" into 64bit integers 1 / 0
int64_labels = tf.where(
inputs[features.LABEL_KEY] == features.POSITIVE_LABEL,
tf.constant(1, dtype=tf.int64),
tf.constant(0, dtype=tf.int64),
)
# Return both BERT inputs and label
return {**int64_encoder_inputs, features.LABEL_KEY: int64_labels}
Listing 3: Preprocessing callback
INPUT_TEXT_FEATURE = "review" # name of the text feature
LABEL_KEY = "sentiment" # name of the label feature
POSITIVE_LABEL = "positive" # the value of the positive label
Listing 4: The contents of the features module
SEQ_LEN = 128 # maximum sequence length (in tokens) for BERT input
# The BERT preprocessor from TF HUB to use
PREPROCESSOR = "https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3"
# The BERT encoder from TF HUB to use
ENCODER = "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4"
# Batch sizes for training and validation
TRAIN_BATCH_SIZE = 32
EVAL_BATCH_SIZE = 32
Listing 5: The contents of the constants module
Please note how easy it is to import the BERT preprocessor from Tensorflow Hub.
The Trainer component requires a custom callback that takes care of training the model. In the following sections we will present the elements of this callback: the Keras model creation function, the TFX training callback, the dataset loading function, and the serving signatures.
First, let’s create a pure-Keras24 model creation function:
def _build_keras_model(hparams: keras_tuner.HyperParameters) -> tf.keras.Model:
"""Creates a DNN Keras model for classifying text."""
# Input are matching outputs of BERT preprocessor.
# Since our training examples are stored in TFRecord format
# the inputs are coming as int64.
inputs = {
"input_type_ids": tf.keras.layers.Input(
shape=(constants.SEQ_LEN,), name="input_type_ids", dtype=tf.int64
),
"input_mask": tf.keras.layers.Input(
shape=(constants.SEQ_LEN,), name="input_mask", dtype=tf.int64
),
"input_word_ids": tf.keras.layers.Input(
shape=(constants.SEQ_LEN,), name="input_word_ids", dtype=tf.int64
),
}
# BERT requires int32 inputs. We were explicitly converting them to int64
# in preprocessing, so converting back does not lose any information.
encoder_inputs = {
key: tf.cast(value, dtype=tf.int32) for key, value in inputs.items()
}
# Run the inputs through BERT and get only the pooled (summary) output.
encoder = hub.KerasLayer(
constants.ENCODER,
trainable=False, # we don't allow BERT fine-tuning
)
outputs = encoder(encoder_inputs)
pooled_output = outputs["pooled_output"]
# Then we run the BERT outputs through a simple neural net
# with one hidden layer and ReLU activation
hidden = tf.keras.layers.Dense(hparams.get("hidden_size"), activation="relu")(
pooled_output
)
# And we expect a single output in range <0, 1>
predictions = tf.keras.layers.Dense(1, activation="sigmoid")(hidden)
# Compile the model
model = tf.keras.Model(inputs=inputs, outputs=predictions)
model.compile(
loss="binary_crossentropy", # loss appropriate for binary classification
optimizer=tf.keras.optimizers.Adam(learning_rate=hparams.get("learning_rate")),
metrics=["accuracy"], # additional metrics to compute during training
)
model.summary(print_fn=logging.info)
return model
Listing 6: Model creation function
And a wrapper that builds the model using mirrored strategy25 (useful if you have more than one GPU on the machine):
def _build_keras_model_with_strategy(
hparams: keras_tuner.HyperParameters,
) -> tf.keras.Model:
gpus = tf.config.list_logical_devices("GPU")
logging.info("Available GPU devices %r", gpus)
cpus = tf.config.list_logical_devices("CPU")
logging.info("Available CPU devices %r", cpus)
# The strategy should pick all GPUs if available, otherwise all CPUs automatically
with tf.distribute.MirroredStrategy().scope():
return _build_keras_model(hparams)
Listing 7: Model creation wrapper
So far the code is very standard, by-the-book Tensorflow 2 Keras model building.
Now let’s take a look at the TFX training callback function. The function receives a single argument fn_args
which is a container for all the parameters passed down from the TFX pipeline.
def run_fn(
fn_args: tfx.components.FnArgs, # type:ignore[name-defined]
) -> None:
"""Train the model based on given args."""
# Load the Transform component output
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
# The output contains among other attributes the schema of transformed examples
schema = tf_transform_output.transformed_metadata.schema
# Choose batch sizes
train_batch_size = constants.TRAIN_BATCH_SIZE
eval_batch_size = constants.EVAL_BATCH_SIZE
# Load transformed examples as tf.data.Dataset
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=train_batch_size,
)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=eval_batch_size,
)
# Load best set of hyperparameters from Tuner component
assert fn_args.hyperparameters, "Expected hyperparameters from Tuner"
hparams = keras_tuner.HyperParameters.from_config(fn_args.hyperparameters)
logging.info("Hyper parameters for training: %s", hparams.get_config())
# Build the model using the best hyperparameters
model = _build_keras_model_with_strategy(hparams)
# Write logs together with model, so that we can later view the training
# curves in Tensorboard.
tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir=fn_args.model_run_dir, update_freq="batch"
)
# Configure early stopping on validation loss increase
epochs: int = fn_args.custom_config["epochs"]
patience: int = fn_args.custom_config["patience"]
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
monitor="val_loss", patience=patience, verbose=1, restore_best_weights=True
)
# Train the model in the usual Keras fashion
model.fit(
train_dataset,
# steps_per_epoch=fn_args.train_steps,
epochs=epochs,
validation_data=eval_dataset,
# validation_steps=fn_args.eval_steps,
callbacks=[tensorboard_callback, early_stopping_callback],
)
# We need to manually add the transformation layer to the model instance
# in order for it to be tracked by the model and included in the saved model format.
model.tft_layer = tf_transform_output.transform_features_layer()
# Create signature (endpoints) for the model.
signatures = {
# What to do when serving from Tensorflow Serving
"serving_default": _get_text_serving_signature(
model, schema, tf_transform_output
),
# What to do when processing serialized Examples in TFRecord files
"from_examples": _get_tf_examples_serving_signature(
model, schema, tf_transform_output
),
# How to perform only preprocessing.
"transform_features": _get_transform_features_signature(
model, schema, tf_transform_output
),
}
# Save the model in SavedModel format together with the above signatures.
# This saved model will be used by all other pipeline components that require
# a model (for example Evaluator or Pusher).
model.save(fn_args.serving_model_dir, save_format="tf", signatures=signatures)
Listing 8: Training callback
This is still pretty standard Keras training code: we load the Transform outputs, build tf.data datasets for the train and validation splits, build the model with the best hyperparameters, fit it with Tensorboard and early-stopping callbacks, and finally save it in SavedModel format together with the serving signatures.
Let’s zoom in on how the datasets are loaded:
def _input_fn(
file_pattern: List[str], # TFRecord file names
# TFX dataset loading helper
data_accessor: tfx.components.DataAccessor, # type:ignore[name-defined]
schema: schema_pb2.Schema, # Dataset schema
batch_size: int, # batch size
) -> tf.data.Dataset:
"""Generates features and label for tuning/training."""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size,
# automatically pop the label so the dataset contains tuples (features, labels)
label_key=features.LABEL_KEY,
# We say that the whole dataset is a single epoch. After the first epoch
# of training Tensorflow will remember the size of the dataset
# (number of batches in epoch) and will provide correct ETA times
num_epochs=1,
),
schema, # parse the examples into schema generated by SchemaGen component
)
Listing 9: Dataset loading function
This is very similar to the usual tf.data.Dataset code, but the arguments are passed through a TFX wrapper (DataAccessor).
Serving signatures are a way of providing extra logic on top of the ML model itself. The serving signatures are included in the saved model and can be used by any component that later loads the model. The most widely used signature is serving_default
– this is the signature that will be called by Tensorflow Serving when your model is deployed to an endpoint. You are free to define any number of signatures. Most TFX components allow specifying which signature from the model they should use. Thanks to this you can have one signature that will be used for evaluating the model from Examples in TFRecord files and one for serving online prediction requests where the data already comes in a plain string format.
Now let’s take a look at all the signatures that we are defining. First off – the endpoint for processing serialized Examples
– this one will be used during evaluation:
def _get_tf_examples_serving_signature(
model: tf.keras.Model,
schema: schema_pb2.Schema,
tf_transform_output: tft.TFTransformOutput,
) -> Any:
"""
Returns a serving signature that accepts `tensorflow.Example`.
This signature will be used for evaluation or bulk inference.
"""
@tf.function(
# Receive examples packed into bytes (unparsed)
input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")]
)
def serve_tf_examples_fn(
serialized_tf_example: tf.Tensor,
) -> Dict[str, tf.Tensor]:
"""Returns the output to be used in the serving signature."""
# Load the schema of raw examples.
raw_feature_spec = tf_transform_output.raw_feature_spec()
# Remove label feature since these will not be present at serving time.
raw_feature_spec.pop(features.LABEL_KEY)
# Parse the examples using schema into raw features
raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
# Preprocess the raw features
transformed_features = model.tft_layer(raw_features)
# Run preprocessed inputs through the model to get the prediction
outputs = model(transformed_features)
return {features.LABEL_KEY: outputs}
return serve_tf_examples_fn
Listing 10: Serialized example serving signature
Next, let’s see the online prediction signature:
def _get_text_serving_signature(
model: tf.keras.Model,
schema: schema_pb2.Schema,
tf_transform_output: tft.TFTransformOutput,
) -> Any:
"""
Returns a serving signature that accepts flat text inputs.
This signature will be used for online predictions.
"""
@tf.function(
input_signature=[
tf.TensorSpec(
# Receive texts to analyze as plain string
shape=[None],
dtype=tf.string,
name=features.INPUT_TEXT_FEATURE,
)
]
)
def serve_text_fn(text_tensor: tf.Tensor) -> Dict[str, tf.Tensor]:
"""Returns the output to be used in the serving signature."""
# The transform layer expects (batch, 1) shape tensor, but our flat inputs
# are (batch, ) shape, so we need to add a dimension.
# We also simulate the same features structure as in the training set.
input_features = {features.INPUT_TEXT_FEATURE: tf.expand_dims(text_tensor, 1)}
# Preprocess the raw features
transformed_features = model.tft_layer(input_features)
# Run preprocessed inputs through the model to get the prediction
outputs = model(transformed_features)
# the outputs are of (batch, 1) shape, but for maximum simplicity we want
# to return (batch, ) shape, so we eliminate the extra dimension.
return {features.LABEL_KEY: tf.squeeze(outputs)}
return serve_text_fn
Listing 11: Online prediction serving signature
This signature will allow us to have a super-simple prediction interface: we send in a one-dimensional array of strings and we get back a one-dimensional array of numerical predictions.
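As a quick sanity check after a pipeline run, one could load the exported SavedModel and call this signature directly (a sketch – the version directory name is hypothetical and the scores will differ):

import tensorflow as tf

# Path of one pushed model version inside the local Pusher destination (example path).
loaded = tf.saved_model.load("tfx_pipeline_output/serving_model/1645000000")
predict = loaded.signatures["serving_default"]
result = predict(review=tf.constant(["I loved it!", "The movie was bad"]))
print(result["sentiment"].numpy())  # e.g. [0.87 0.09]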
And finally, we are also exposing a signature that would be able to perform just the preprocessing:
def _get_transform_features_signature(
model: Any, schema: schema_pb2.Schema, tf_transform_output: tft.TFTransformOutput
) -> Any:
"""
Returns a serving signature that applies tf.Transform to features.
This signature can be used to pre-process the inputs.
"""
@tf.function(
# Receive examples packed into bytes (unparsed)
input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")]
)
def transform_features_fn(serialized_tf_example: tf.TensorSpec) -> Any:
"""Returns the transformed features."""
# Load the schema of raw examples.
raw_feature_spec = tf_transform_output.raw_feature_spec()
# Parse the examples using schema into raw features
raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
# Preprocess the raw features
transformed_features = model.tft_layer(raw_features)
return transformed_features
return transform_features_fn
Listing 12: Transform serving signature
TFX uses the keras_tuner26 library for hyperparameter tuning. First, let's check out how to define a simple hyperparameter search space:
def _get_hyperparameters() -> keras_tuner.HyperParameters:
"""Returns hyperparameters for building Keras model."""
hp = keras_tuner.HyperParameters()
# Defines search space.
hp.Choice("learning_rate", [1e-2, 1e-3, 1e-4], default=1e-3)
hp.Choice("hidden_size", [64, 128, 256], default=128)
return hp
Listing 13: Hyperparameter search space definition
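To get a feel for how these defaults behave, and how a chosen set of hyperparameters travels between the Tuner and Trainer components as a serializable config, consider this small sketch built on the function above:

import keras_tuner

hp = _get_hyperparameters()
print(hp.get("learning_rate"))  # 0.001 - the declared default
print(hp.get("hidden_size"))    # 128

# The Tuner stores the winning set as a JSON config; the Trainer restores it like this:
restored = keras_tuner.HyperParameters.from_config(hp.get_config())
print(restored.values)          # {'learning_rate': 0.001, 'hidden_size': 128}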
Now let’s take a look at how to define the TFX tuning callback. The callback receives pipeline parameters as the fn_args
argument. It needs to return a named tuple with a tuner instance and its parameters. TFX will take care of selecting hyperparameter combinations to check, running trials (with extension components on many distributed machines), and selecting and storing the best ones. The user only needs to configure the tuner. The callback bears an obvious resemblance to the training callback shown previously (see section 11.6.2).
def tuner_fn(
fn_args: tfx.components.FnArgs, # type:ignore[name-defined]
) -> tfx.components.TunerFnResult: # type:ignore[name-defined]
"""Build the tuner using the KerasTuner API."""
# Number of different hyperparameter combinations to check
number_of_trials: int = fn_args.custom_config["number_of_trials"]
# RandomSearch is a subclass of keras_tuner.Tuner which inherits from
# BaseTuner.
tuner = keras_tuner.RandomSearch(
_build_keras_model_with_strategy, # model building callback
max_trials=number_of_trials, # number of trials to perform
hyperparameters=_get_hyperparameters(), # hyperparameter search space
allow_new_entries=False, # don't allow requesting parameters outside the search space
# We want to choose the set of hyperparams that causes the fastest convergence
# so we will select validation loss minimization as the objective.
objective=keras_tuner.Objective("val_loss", "min"),
directory=fn_args.working_dir, # operating directory
project_name="sentiment_analysis_tuning", # will be used to add prefix to artifacts
)
# Load the Transform component output
tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
# Get the preprocessed inputs schema
schema = tf_transform_output.transformed_metadata.schema
# Load datasets
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
constants.TRAIN_BATCH_SIZE,
)
eval_dataset = _input_fn(
fn_args.eval_files, fn_args.data_accessor, schema, constants.EVAL_BATCH_SIZE
)
# Configure early stopping
epochs: int = fn_args.custom_config["epochs"]
patience: int = fn_args.custom_config["patience"]
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
monitor="val_loss", patience=patience, verbose=1, restore_best_weights=True
)
# Request hyperparameter tuning
return tfx.components.TunerFnResult( # type:ignore[attr-defined]
tuner=tuner,
fit_kwargs={
"x": train_dataset,
"validation_data": eval_dataset,
"epochs": epochs,
"callbacks": [early_stopping_callback],
},
)
Listing 14: Hyperparameter tuning callback
Here at DLabs.AI we strongly believe in containerization27. We use it to create well-separated, replicable working environments that are straightforward to use when deploying to the cloud. That's why even in the local setup we will be running the pipeline inside a custom-built Docker container. A very important advantage of using a container is the ability to structure your modelling code into multiple importable Python modules. This helps a lot with maintaining code quality.
To make our life easier, our Docker image will be based on the TFX container image28. Thanks to this, all TFX dependencies come pre-installed in compatible versions and we only need to add a handful of extra packages (see the Dockerfile below).
The organization of our code is like this:
models/ – contains modules related to preprocessing and modelling – all TFX callback implementations,
pipeline.py – contains the pipeline definition,
local_runner.py – script for running the pipeline locally,
vertex_ai_runner.py – script for running the pipeline on Vertex AI Pipelines.

Below is our Dockerfile:
# Start from TFX base image!
FROM gcr.io/tfx-oss-public/tfx:1.6.1
# Install additional dependencies
RUN python -m pip install -q 'tfx[kfp]==1.6.1' tensorflow-text tfx-helper==0.0.3
# It takes a while to download BERT, let's display a progress bar during build
ENV TFHUB_DOWNLOAD_PROGRESS=1
# We copy the pipeline creation code into the image, because we will run
# the pipeline through docker
# Copy the pipeline definition into the image
COPY pipeline.py ./pipeline.py
# Copy the runners into the image
COPY local_runner.py vertex_ai_runner.py ./
# We copy the model code into the image, because TFX will try to import the model
# code during pipeline execution
# Copy your modelling code
COPY models ./models
Listing 15: Custom Docker file based on TFX
In our full solution, we are also preloading BERT layers into the docker image so that they don't have to be downloaded from Tensorflow Hub every training run. This step is optional. You can check it out in our full code repository (see section 16).
To build the docker image issue the following command:
docker build -t "sentiment-analysis-image" .
Listing 16: Local container image building command
The code responsible for pipeline creation is runtime-environment-agnostic. This means that the pipeline definition is generic enough so that the same pipeline can be run both locally and in the cloud.
In order to run the pipeline locally, we need to create a runner script that will call the pipeline definition code and schedule the pipeline for execution.
The script needs to read the input and output locations, create a pipeline helper instance of the local flavour, call the pipeline definition with concrete parameter values, and schedule the pipeline run.
def run() -> None:
"""Create and run a pipeline locally."""
# Read the pipeline artifact directory from environment variable
output_dir = os.environ["PIPELINE_OUTPUT"]
# Directory for exporting trained model will be a sub-directory
# of the pipeline artifact directory.
serving_model_dir = os.path.join(output_dir, "serving_model")
# Create pipeline helper instance of local flavour.
pipeline_helper = LocalPipelineHelper(
pipeline_name="sentimentanalysis",
output_dir=output_dir,
# Where should the model be pushed to
model_push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir
)
),
)
# Read the input CSV files directory from environment variable
input_dir = os.environ["INPUT_DIR"]
components = create_pipeline(
# Pass our pipeline helper instance
pipeline_helper,
# The rest of the parameters are pipeline-specific.
data_path=input_dir,
# We want to run only a few hyperparameter optimization trials
number_of_trials=2,
# We don't aim for superior accuracy
eval_accuracy_threshold=0.6,
# Fast tuning runs
tune_epochs=2,
tune_patience=1,
# A bit longer training run
train_epochs=10,
train_patience=3,
use_previous_hparams=False,
)
pipeline_helper.create_and_run_pipeline(components)
Listing 17: Local runner script
When we have the script ready and the image built, we can execute the pipeline locally using Docker. The command for running the pipeline will be a bit complex because we need to expose both the input data directory (as /input_data
inside the container) and the pipeline output directory (as /tfx_pipeline_output
) to the container:
docker run \
-it --rm \
--entrypoint python \
--volume "$(pwd)/tfx_pipeline_output:/tfx_pipeline_output:rw" \
--env "PIPELINE_OUTPUT=/tfx_pipeline_output" \
--volume "$(pwd)/input_data:/input_data:ro" \
--env "INPUT_DIR=/input_data" \
"sentiment-analysis-image" \
-m local_runner
Listing 18: Local pipeline run command
While the pipeline is running you should be able to see a local tfx_pipeline_output
directory filling up with pipeline execution artifacts.
Once the pipeline successfully finishes, there should be a trained model stored in a local directory tfx_pipeline_output/serving_model/
. This should be a totally self-contained model that can be deployed to any server that supports Tensorflow models. We can just spin up a Docker container with Tensorflow Serving to check out how our model works. All we need to do is start a stock Tensorflow Serving that will have the model directory mounted:
docker run \
-it --rm \
-p 8501:8501 \
-v "$(pwd)/tfx_pipeline_output/serving_model:/models/sentiment_analysis" \
-e MODEL_NAME=sentiment_analysis \
tensorflow/serving
Listing 19: Start local Tensorflow Serving command
Once the server is running we can try it out by sending some sample requests to validate whether the model is working in the way we intended:
curl -d '{"instances": ["I loved it!", "The movie was bad", "I hated the whole thing", "Best flick ever"]}' \
-X POST \
http://localhost:8501/v1/models/sentiment_analysis:predict
Listing 20: Query local Tensorflow Serving command
We should get back a response similar to this one:
{
"predictions": [
0.870253503, 0.0926531255, 0.225041032, 0.593504
]
}
Listing 21: Prediction response
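Each number is the predicted probability of positive sentiment, so applying a 0.5 decision threshold recovers the class labels (a small post-processing sketch, not part of the pipeline):

predictions = [0.870253503, 0.0926531255, 0.225041032, 0.593504]
labels = ["positive" if p >= 0.5 else "negative" for p in predictions]
print(labels)  # ['positive', 'negative', 'negative', 'positive']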
In this section of the course, we are going to set up the required Google Cloud services and accounts, push our Docker image to Artifact Registry, create a Vertex AI runner script, schedule a pipeline run on Vertex AI Pipelines, and finally query the deployed model endpoint.
Section 13 is going to explain how to visualize pipeline artifacts produced during a cloud run.
So far our pipeline was working inside a Docker container, but it was using a filesystem for input data, storing pipeline artifacts and ML Metadata, and pushing the trained model to a directory. It was also using the local machine’s resources to execute the computations.
When running our pipeline in Google Cloud Platform's Vertex AI Pipelines, the input data and pipeline artifacts live in Cloud Storage, ML Metadata is managed for us by Vertex AI, the computations run on managed cloud machines (optionally with GPUs), and the trained model is pushed to a Vertex AI Endpoint.
To be able to run the pipeline in the cloud we need to first setup the needed services and accounts.
Install the gcloud CLI on your machine30 and authenticate the gcloud util using your user account31. Then set a few shell variables and create a service account for the pipelines:

# GCP project ID
PROJECT="tfx-article"
# The name of the service account to create
SERVICE_ACCOUNT="pipelines-service-account"
# The name of the bucket with input data and for pipeline artifacts
BUCKET_NAME="tfx-article"
# The local file to save the service account credentials in
KEY_FILE="service_account_key.json"
# create service account
gcloud iam service-accounts create "${SERVICE_ACCOUNT}" \
--description="Service account for Vertex AI Pipelines" \
--display-name="Pipelines" \
--project="${PROJECT}"
Listing 22: Create service account command
# grant access to Vertex AI
gcloud projects add-iam-policy-binding "${PROJECT}" \
--member="serviceAccount:${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com" \
--role="roles/aiplatform.user"
# grant read access to artifact registry
gcloud projects add-iam-policy-binding "${PROJECT}" \
--member="serviceAccount:${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader"
# grant access to storage bucket
gsutil iam ch \
"serviceAccount:${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com:roles/storage.objectAdmin" \
"gs://${BUCKET_NAME}"
# grant the service account permission to submit pipeline jobs on its behalf
gcloud iam service-accounts add-iam-policy-binding \
"${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com" \
--member="serviceAccount:${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountUser"
Listing 23: Grant roles to service account command
Create and download JSON credentials for the created service account.
# create and download service account key
gcloud iam service-accounts keys create "${KEY_FILE}" \
--iam-account="${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com"
Listing 24: Create and download credentials command
Upload your input files to the created Cloud Storage bucket.
To be able to run your pipeline code on Google Cloud resources we need to package our code as a Docker image and push it to a cloud image repository. This is needed so that Google Cloud Platform can have access to our code when it is spinning up containers for each pipeline component during pipeline execution. The following sections are going to show the steps needed to deploy your code to Artifact Registry.
You should have created a Docker repository in Artifact Registry. In order to be able to push to that repository you need to configure authentication between your local docker daemon and the cloud repository. The exact step will vary depending on the location of the repository you created. The easiest way to get it right is to go to the repository details in Google Cloud Platform UI and select Setup instructions (see figure 3).
The help panel should list the command to use. In our case it was:
gcloud auth configure-docker europe-west4-docker.pkg.dev
Listing 25: Setup Docker authentication command
You need to pick a name for your image. Once again, we called ours sentiment-analysis-image. We can then tag the same image we used for local development with the new name (or build and tag it from scratch).
IMAGE="europe-west4-docker.pkg.dev/tfx-article/sentiment-analysis-repo/sentiment-analysis-image:latest"
docker build -t "${IMAGE}" .
docker push "${IMAGE}"
Listing 26: Docker build and push container image to Artifact Registry command
The first time around, the push
command is bound to take longer. You will be effectively making a copy of the TFX base image layers in your repository and will have to push over 6 gigabytes of data to the cloud. If this is something you don’t want to do, you can consider using Cloud Build service to execute the build in the cloud (outside the scope of this course).
In order to run the pipeline in the cloud, on Vertex AI Pipelines, we need to create a runner script that will call the pipeline definition code and schedule the pipeline for execution, just like in section 11.9 where we were running the pipeline locally.
This time though we will have to configure all the cloud parameters – Google Cloud Platform project, region, service account and, more importantly, the types of machines for our distributed training. The TFX helper library (see section 11.3) tries to make this configuration as simple as possible. To add a GPU to your machines, just pass the type of accelerator to the helper initializer. If you don't pass any or pass None, the machine will be CPU-only. Our TFX helper library also enables you to define resource requirements for standard TFX components. If not overridden, the components run on e2-standard-4 machines33.
Here’s our Vertex AI runner script:
def run() -> None:
# save pipeline artifacts to Cloud Storage
output_dir = "gs://tfx-article/pipeline_artifacts"
# minimal (less than the standard `e2-standard-4`) resource for components
# that won't execute computations
minimal_resources = Resources(cpu=1, memory=4)
# create a helper instance of cloud flavour
pipeline_helper = VertexAIPipelineHelper(
pipeline_name="sentimentanalysis",
output_dir=output_dir,
google_cloud_project="tfx-article",
google_cloud_region="europe-west4",
# all the components will use our custom image for running
docker_image="europe-west4-docker.pkg.dev/tfx-article/sentiment-analysis-repo/sentiment-analysis-image:latest",
service_account="pipelines-service-account@tfx-article.iam.gserviceaccount.com",
# name of the Vertex AI Endpoint
serving_endpoint_name="sentimentanalysis",
# Number of parallel hyperparameter tuning trials
num_parallel_trials=2,
# GPU for Trainer and Tuner components
trainer_accelerator_type="NVIDIA_TESLA_T4",
# Machine type for Trainer and Tuner components
trainer_machine_type="n1-standard-4",
# GPU for serving endpoint
serving_accelerator_type="NVIDIA_TESLA_T4",
# Machine type for serving endpoint
serving_machine_type="n1-standard-4",
# Override resource requirements of components. The dictionary key is the ID
# of the component (usually class name, unless changed with `with_id` method).
resource_overrides={
# evaluator needs more RAM than standard machine can provide
"Evaluator": Resources(cpu=16, memory=32),
# training is done as Vertex job on a separate machine
"Trainer": minimal_resources,
# tuning is done as Vertex job on a separate set of machines
"Tuner": minimal_resources,
# pusher is just submitting a job
"Pusher": minimal_resources,
},
)
components = create_pipeline(
pipeline_helper,
# Input data in Cloud Storage
data_path="gs://tfx-article/input_data/",
# Run a few hyperparameter tuning trials
number_of_trials=10,
# Make the trials short
tune_epochs=3,
tune_patience=1,
# Make the final training long (aim for best accuracy)
train_epochs=50,
train_patience=10,
# After the first successful run you can change this to `True` to skip
# tuning in subsequent runs.
use_previous_hparams=False,
)
pipeline_helper.create_and_run_pipeline(components, enable_cache=False)
Listing 27: Vertex AI runner script
The script is still pretty straightforward. Please note that the complexities of constructing parameters for TFX extension components are abstracted away from the user by the TFX helper library.
Before scheduling the pipeline execution remember to re-build and push the custom docker image to the cloud repository (see section 12.3.2)! Otherwise the pipeline will execute an old version of your code.
To schedule the execution of the pipeline we just need to run the runner module inside our custom image. Please note that even though you will run the container on your machine, it will not perform any computations; instead, it will just submit a pipeline job to Vertex AI. We need to share authentication credentials with the container for it to be able to submit the job successfully.
GOOGLE_APPLICATION_CREDENTIALS="$(pwd)/service_account_key.json"
IMAGE="europe-west4-docker.pkg.dev/tfx-article/sentiment-analysis-repo/sentiment-analysis-image:latest"
docker run \
-it --rm \
--entrypoint python \
--volume "${GOOGLE_APPLICATION_CREDENTIALS}:/creds.json:ro" \
--env "GOOGLE_APPLICATION_CREDENTIALS=/creds.json" \
"${IMAGE}" \
-m vertex_ai_runner
Listing 28: Vertex AI pipeline scheduling command
After the job is submitted you can track the pipeline execution in Vertex AI Pipelines UI (see figure 4).
Once the model is pushed, we can use it to make online prediction requests using an HTTP endpoint.
First we need to find out the details of the endpoint. The easiest way is through GCP UI. Go to Vertex AI Endpoints and identify your endpoint (see figure 5).
Then click on Sample request. A panel should open that will contain instructions for making a sample request to the deployed endpoint (see figure 6).
In our case the command can look similar to this:
curl \
-X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://europe-west4-aiplatform.googleapis.com/v1/projects/tfx-article/locations/europe-west4/endpoints/3364804648157315072:predict \
-d '{"instances": ["I hate the main actor", "I loved the movie!", "This is a masterpiece", "I am dissapponted"]}'
Listing 29: Vertex AI endpoint query command
And the response can look like this (see also figure 7):
{
"predictions": [
0.331922054,
0.974850595,
0.958151937,
0.149421304
],
"deployedModelId": "...",
"model": "projects/.../locations/europe-west4/models/...",
"modelDisplayName": "v..."
}
Listing 30: Vertex AI endpoint response
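If you prefer calling the endpoint from Python instead of curl, the Vertex AI SDK offers the same functionality (a sketch assuming the google-cloud-aiplatform package is installed; substitute your own project, region, and numeric endpoint ID):

from google.cloud import aiplatform

aiplatform.init(project="tfx-article", location="europe-west4")

# The numeric endpoint ID can be copied from the Vertex AI Endpoints UI.
endpoint = aiplatform.Endpoint("ENDPOINT_ID")
response = endpoint.predict(
    instances=["I loved the movie!", "I hate the main actor"]
)
print(response.predictions)  # e.g. [0.97..., 0.33...]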
In this section, we are going to show how you can preview the artifacts of a pipeline running on Vertex AI or locally.
For most artifacts we will be using a Jupyter Notebook instance running inside our custom Docker image (TFX base image comes with Jupyter Notebook pre-installed). In order for the code inside the container to be able to authenticate with Google Cloud for the purpose of artifact loading, we need to share the service account credentials with the container. If you don’t want to lose your work when you shut down your notebook server, you should also mount the notebooks directory in read-write mode. Please note that in the command below we are disabling notebook authentication to skip the token copy-paste requirement. This is only a good idea if your Docker setup does not expose the running containers to the outside of your local machine. In all other cases, this is a security concern, and you should use the standard notebook authentication mechanism.
GOOGLE_APPLICATION_CREDENTIALS="$(pwd)/service_account_key.json"
IMAGE="europe-west4-docker.pkg.dev/tfx-article/sentiment-analysis-repo/sentiment-analysis-image:latest"
docker run \
-it --rm \
--entrypoint jupyter \
-p 8888:8888 \
--volume "${GOOGLE_APPLICATION_CREDENTIALS}:/creds.json:ro" \
--env "GOOGLE_APPLICATION_CREDENTIALS=/creds.json" \
--volume "$(pwd)/nbs:/nbs:rw" \
"${IMAGE}" \
notebook \
-y \
--no-browser \
--allow-root \
--ip=0.0.0.0 --port=8888 --port-retries=0 \
--notebook-dir=/nbs \
--NotebookApp.password='' \
--NotebookApp.token=''
Listing 31: Running Jupyter Notebook command
You can then navigate in your browser to http://localhost:8888/ to view the notebooks (see figure 8).
The code listings in the following sections are going to use the Python interactive shell syntax convention (a >>> prompt signifies user input; lines without a prompt are results), but they are meant to be executed in a Jupyter Notebook so that the visualizations can be rendered.
To obtain the URI of the statistics that TFX calculates as part of the pipeline, select the statistics output of the StatisticsGen component (see figure 9).
We will use the tensorflow_data_validation library to load the statistics. TFX stores statistics for each split that we configured. We can list the splits it created:
>>> import os
>>> import tensorflow as tf
>>> import tensorflow_data_validation as tfdv
>>> STATS_URI = "gs://tfx-article/pipeline_artifacts/sentimentanalysis/30425340785/sentimentanalysis-20220217094443/StatisticsGen_2854107885334429696/statistics"
>>> directories = tf.io.gfile.glob(os.path.join(STATS_URI, 'Split-*'))
>>> names = map(os.path.basename, directories)
>>> splits = {name: os.path.join(directory, 'FeatureStats.pb') for name, directory in zip(names, directories)}
>>> splits
{'Split-eval': 'gs://tfx-article/pipeline_artifacts/sentimentanalysis/30425340785/sentimentanalysis-20220217094443/StatisticsGen_2854107885334429696/statistics/Split-eval/FeatureStats.pb',
'Split-train': 'gs://tfx-article/pipeline_artifacts/sentimentanalysis/30425340785/sentimentanalysis-20220217094443/StatisticsGen_2854107885334429696/statistics/Split-train/FeatureStats.pb',
'Split-valid': 'gs://tfx-article/pipeline_artifacts/sentimentanalysis/30425340785/sentimentanalysis-20220217094443/StatisticsGen_2854107885334429696/statistics/Split-valid/FeatureStats.pb'}
Listing 32: Listing statistics splits
Then we can select up to two splits that we want to visualize:
>>> lhs_split = 'Split-valid'
>>> rhs_split = 'Split-eval'
Listing 33: Selecting splits to investigate
And finally, we can load them and display them:
>>> tfdv.visualize_statistics(
lhs_statistics=tfdv.load_stats_binary(splits[lhs_split]),
lhs_name=lhs_split,
rhs_statistics=tfdv.load_stats_binary(splits[rhs_split]),
rhs_name=rhs_split
)
Listing 34: Visualizing splits statistics
See figure 10 to view a sample visualization result.
To obtain the URI of the schema that TFX generates as part of the pipeline, select the schema output of the SchemaGen component (see figure 11).
We will use the tensorflow_data_validation library to load the schema:
>>> import os
>>> import tensorflow_data_validation as tfdv
>>> SCHEMA_URI = "gs://tfx-article/pipeline_artifacts/sentimentanalysis/30425340785/sentimentanalysis-20220217094443/SchemaGen_-4063421142306652160/schema"
>>> schema = tfdv.load_schema_text(os.path.join(SCHEMA_URI, 'schema.pbtxt'))
>>> tfdv.display_schema(schema)
Listing 35: Visualizing schema
See figure 12 to view a schema visualization result.
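As a side note, the loaded schema can also be used to validate the statistics from Listing 32 ad hoc. The sketch below is only a minimal illustration of the underlying tensorflow_data_validation calls (the pipeline presented in this course does not include a dedicated validation component, so this step is purely optional); it assumes the splits dictionary from Listing 32 and the schema loaded above are available in the same notebook session.
>>> stats = tfdv.load_stats_binary(splits['Split-eval'])  # statistics of the eval split
>>> anomalies = tfdv.validate_statistics(statistics=stats, schema=schema)
>>> tfdv.display_anomalies(anomalies)  # renders a table of detected anomalies (if any)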
To obtain the URI of the best hyperparameters found during pipeline execution, select the best_hyperparameters output of the Tuner component (see figure 13).
The parameters are stored as a JSON file. We can load the JSON config using the keras_tuner library:
>>> import os
>>> import json
>>> import keras_tuner
>>> import tensorflow as tf
>>> HPARAMS_URI = "gs://tfx-article/pipeline_artifacts/sentimentanalysis/30425340785/sentimentanalysis-20220216174148/Tuner_-2301950738051366912/best_hyperparameters"
>>> with tf.io.gfile.GFile(os.path.join(HPARAMS_URI, 'best_hyperparameters.txt'), 'r') as f: hparams_config = json.load(f)
>>> hparams = keras_tuner.HyperParameters.from_config(hparams_config)
>>> hparams.values
{'learning_rate': 0.01, 'hidden_size': 64}
Listing 36: Visualizing best hyperparameters
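The loaded HyperParameters object can then be queried by name, for example when rebuilding parts of the model outside the pipeline. The sketch below is only an illustration (it is not the course's trainer code) and assumes the hparams object from Listing 36:
>>> learning_rate = hparams.get('learning_rate')  # 0.01 in this run
>>> hidden_size = hparams.get('hidden_size')  # 64 in this run
>>> optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
>>> hidden_layer = tf.keras.layers.Dense(hidden_size, activation='relu')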
Because we used the Tensorboard callback in our modelling code, we can view the progress of model training live as it is happening. To do so, we need to run Tensorboard while pointing it at the location of the Model Run artifact in the pipeline output. The TFX Docker image should already come with Tensorboard pre-installed, so we just need to spin up a container and set up GCP authentication inside it. The location of the Tensorboard log directory can be obtained from the Vertex AI Pipelines UI by navigating to the model_run output of the Trainer component and copying its URI (see figure 14).
To spin up a Tensorboard container use the following command:
IMAGE="europe-west4-docker.pkg.dev/tfx-article/sentiment-analysis-repo/sentiment-analysis-image:latest"
GOOGLE_APPLICATION_CREDENTIALS="$(pwd)/service_account_key.json"
LOG_DIR="gs://tfx-article/pipeline_artifacts/sentimentanalysis/30425340785/sentimentanalysis-20220217094443/Trainer_-6369264151520346112/model_run"
docker run \
-it --rm \
-p 6006:6006 \
--entrypoint tensorboard \
--volume "${GOOGLE_APPLICATION_CREDENTIALS}:/creds.json:ro" \
--env "GOOGLE_APPLICATION_CREDENTIALS=/creds.json" \
"${IMAGE}" \
--bind_all \
--port 6006 \
--logdir="${LOG_DIR}"
Listing 37: Running Tensorboard to view model training logs
Then, in your browser navigate to http://localhost:6006/#scalars to view the training curves (see figure 15).
To obtain the URI of the model evaluation that TFX generates as part of the pipeline, select the evaluation output of the Evaluator component (see figure 16).
We will use the tensorflow_model_analysis library to visualize the result. First, though, we need to install the Jupyter Notebook extensions that allow the widget to render properly:
jupyter nbextension enable --py widgetsnbextension --sys-prefix
jupyter nbextension install --py --symlink tensorflow_model_analysis --sys-prefix
jupyter nbextension enable --py tensorflow_model_analysis --sys-prefix
Listing 38: Installing notebook extensions
Afterwards, we can load and render the results:
>>> import tensorflow_model_analysis as tfma
>>> EVALUATION_URI = "gs://tfx-article/pipeline_artifacts/sentimentanalysis/30425340785/sentimentanalysis-20220216174148/Evaluator_-9219479765692448768/evaluation"
>>> result = tfma.load_eval_result(output_path=EVALUATION_URI, output_file_format=None)
>>> tfma.view.render_slicing_metrics(result)
Listing 39: Visualizing evaluation results
See figure 17 for a sample evaluation visualization.
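If you prefer plain numbers over the interactive widget, the metrics can also be read programmatically from the loaded result. The sketch below is a minimal illustration assuming a standard single-output Evaluator configuration; the exact nesting of output names and sub keys may differ in your setup.
>>> overall = dict(result.slicing_metrics)[()]  # metrics for the overall (unsliced) data
>>> for output_name, per_sub_key in overall.items():
        for sub_key, metrics in per_sub_key.items():
            print(output_name, sub_key, sorted(metrics))  # list available metric names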
Running the described pipeline in the cloud is definitely the right thing to do. But all good things in life cost money. Running the described pipeline on Google Cloud Platform can incur the following charges:
Our experience shows that Vertex AI charges tend to be the most significant ones. Your costs might vary depending on region, amount of data, number of epochs and so on.
We triggered a single run of the described pipeline with the following parameters:
- n1-standard-4 machines with one NVIDIA_TESLA_T4 GPU each,
- n1-standard-4 machine with one NVIDIA_TESLA_T4 GPU.
The table below presents the execution times of pipeline components in the above-described run:
Component | Execution time |
CsvExampleGen | 0h 11m |
StatisticsGen | 0h 01m |
SchemaGen | 0h 01m |
Transform | 0h 03m |
Tuner | 2h 10m |
Trainer | 4h 49m |
Evaluator | 0h 52m |
Pusher | 0h 20m |
Total | 8h 27m |
Table 1: Pipeline components execution times
The following table presents the costs incurred on Google Cloud Platform for the single run:
Service | Charged amount in PLN | Charged amount in EUR |
Vertex AI | 30.99 PLN | 6.62 EUR |
Cloud Storage | 0.19 PLN | 0.04 EUR |
Artifact Registry | 0.14 PLN | 0.03 EUR |
Cloud Logging | 0.00 PLN | 0.00 EUR |
Total | 31.32 PLN | 6.69 EUR |
Table 2: Cost incurred by single pipeline run
The binary sentiment analysis problem that we presented in this course is just one example of a problem that can be solved using NLP tools like BERT. In fact, BERT can be used for a multitude of other problems. At DLabs, we have so far used it for:
We have used other NLP methods for:
We also recommend you check out the full, open-source code of the pipeline presented in this course at https://github.com/dlabsai/tfx-sentiment-analysis, as well as our TFX helper library at https://github.com/dlabsai/tfx-helper.
The full code contains a Makefile that helps in performing most of the commands described in the course. It features additional tweaks (like preloading BERT into the container image). There, you can also easily see how the whole codebase is organized into modules.
In this course, we showed an end-to-end MLOps pipeline for training and deploying a Machine Learning model for sentiment analysis. We hope that we managed to convince you:
We here at DLabs.AI certainly appreciate the completeness of the created pipeline: going from input CSV files to a deployed live endpoint in one reproducible flow. We can hardly imagine a faster route to production-grade MLOps than TFX.