Validating Data in a Production Pipeline: The TFX Way | By Akila S | June 2024

Learn more about validating data with TensorFlow Data Validation


Imagine this: you have a perfectly working machine learning pipeline, no issues there. So you decide to push it to production. Everything is fine in production, but one day, a small change is made to one of the components that generates input data for your pipeline, and your pipeline breaks. Oops!!!

Photo by Sarah Kilian on Unsplash

Why did this happen?

ML models depend heavily on the data they consume, so remember the old adage “Garbage In, Garbage Out”. With the right data the pipeline works well, but any change in that data tends to break it.

The data passed down the pipeline is primarily generated through automated systems, which means less control over the type of data generated.

What should I do?

Data validation is the answer.

Data validation is a safeguard that verifies whether data is in the correct format for the pipeline to consume.

Read this article to understand why validation is important in an ML pipeline and the five stages of machine learning validation.

TensorFlow Data Validation (TFDV) is part of the TFX ecosystem and can be used to validate data in your ML pipelines.

TFDV computes descriptive statistics from training and serving data, infers a schema, and identifies anomalies, ensuring that training and serving data are consistent so the pipeline does not break or produce unexpected predictions.

The folks at Google wanted TFDV to be used as early as possible in the ML process, which is why they made it available for use in notebooks, and we'll do the same here.

First, you need to install the tensorflow-data-validation library using pip. Preferably, create a virtual environment and start with the installation:

Important: check the version compatibility of TFDV with your TFX library before installing.

pip install tensorflow-data-validation
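To confirm the installation (and the version you ended up with), you can print the library version and check it against the TFX compatibility matrix:

import tensorflow_data_validation as tfdv

# Print the installed TFDV version to verify compatibility
print('TFDV version: {}'.format(tfdv.version.__version__))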

The data validation process follows these steps:

  1. Generate statistics from the training data
  2. Infer a schema from the training data
  3. Generate statistics for the evaluation data and compare them with the training data
  4. Identify and fix anomalies
  5. Check for drift and skew
  6. Save the schema

We use three types of datasets here to mimic real-time usage: training data, evaluation data, and serving data. The ML model is trained using the training data. Evaluation data (also called test data) is a portion of data designated to test the model as soon as the training phase is completed. Serving data is presented to the model in production to make predictions.

The entire code discussed in this article is available in my GitHub repository, which you can download here.

We will be using the Spaceship Titanic dataset from Kaggle; for more information and to download the dataset, please see this link.

Sample views of the Spaceship Titanic dataset

The data consists of a mixture of numerical and categorical features. It is a classification dataset, and the class label is Transported, which takes the value True or False.

Data Description

The necessary imports are done and the paths of the CSV files are defined. The actual dataset contains training and test data. I manually introduced some errors and saved the file as 'titanic_test_anomalies.csv' (this file is not available on Kaggle; you can download it from my GitHub repository link).

Here, we use ANOMALOUS_DATA as the evaluation data and TEST_DATA as the serving data.

import tensorflow_data_validation as tfdv
import tensorflow as tf

TRAIN_DATA = '/data/titanic_train.csv'
TEST_DATA = '/data/titanic_test.csv'
ANOMALOUS_DATA = '/data/titanic_test_anomalies.csv'

The first step is to analyze the training data and identify its statistical properties. The generate_statistics_from_csv function reads data directly from a CSV file; TFDV also provides generate_statistics_from_tfrecord if your data is in the TFRecord format.

The visualize_statistics function displays an 8-point summary, along with handy charts, to help you understand the basic statistics of your data. This is called a Facets view. Important details that require your attention are highlighted in red. Many other features for analyzing your data are available here as well. Play around and get a better understanding.

# Generate statistics for training data
train_stats=tfdv.generate_statistics_from_csv(TRAIN_DATA)
tfdv.visualize_statistics(train_stats)
Statistics generated for the dataset

Here we can see that the Age and RoomService features have missing values and need to be imputed. We also see that RoomService has 65.52% zeros. Since this is how this particular feature is distributed, we do not consider it an anomaly and move on.
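As a quick aside, the imputation itself happens outside TFDV. Here is a minimal pandas sketch of what it might look like; the fill strategies (median for Age, zero for RoomService) are my assumptions, not something the dataset prescribes:

import pandas as pd

df = pd.read_csv(TRAIN_DATA)

# Assumed strategies: median for Age, 0.0 for RoomService
# (most passengers spent nothing, so zero is a plausible default)
df['Age'] = df['Age'].fillna(df['Age'].median())
df['RoomService'] = df['RoomService'].fillna(0.0)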

Once all issues have been resolved satisfactorily, we infer a schema from the statistics using the infer_schema function.

schema=tfdv.infer_schema(statistics=train_stats)
tfdv.display_schema(schema=schema)

A schema is typically presented in two sections: the first section gives details such as the data type, presence, valency, and domain of each feature; the second section lists the values that each domain comprises.

Section 1: Feature Details
Section 2: Domain Values

This is an initial raw schema that we will refine in later steps.
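If you prefer to inspect a single entry programmatically rather than through the table, tfdv.get_feature returns the underlying schema protocol buffer for a feature (Age is just an example pick from this dataset):

# Peek at one feature's schema entry: type, presence, domain, etc.
age_feature = tfdv.get_feature(schema, 'Age')
print(age_feature)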

Now we take the evaluation data and generate statistics. We use ANOMALOUS_DATA as the evaluation data because we need to understand how anomalies should be handled. Anomalies have been manually introduced into this data.

Once the statistics are generated, visualize them. You can visualize the evaluation data on its own (as we did for the training data), but it makes more sense to compare the evaluation statistics with the training statistics, so you can see how different the evaluation data is from the training data.

# Generate statistics for evaluation data
eval_stats=tfdv.generate_statistics_from_csv(ANOMALOUS_DATA)

tfdv.visualize_statistics(lhs_statistics=train_stats, rhs_statistics=eval_stats,
                          lhs_name="Training Data", rhs_name="Evaluation Data")

Comparing training and evaluation data statistics

Here we can see that the RoomService feature is not present in the evaluation data (a big red flag). The other features look pretty OK, as they show a similar distribution to the training data.

But in a production environment, visual inspection alone is not enough, so we let TFDV actually analyze it and report back if there are any issues.

The next step is to validate the statistics obtained from the evaluation data against the schema generated from the training data. The display_anomalies function displays the anomalies identified by TFDV, along with a description of each, in a tabular format.

# Identifying Anomalies
anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)
Anomaly list provided by TFDV

From the table, we can see that the evaluation data is missing two columns (Transported and RoomService); the domain of the Destination feature has an additional value, “Anomaly”, that is not present in the training data; the CryoSleep and VIP features contain the uppercase values “TRUE” and “FALSE”, which are not present in the training data; and finally, five features contain integer values where the schema expects floating-point values.

That's a lot of work. So let's get to work.

There are two ways to fix anomalies: either process the evaluation data (manually) so it conforms to the schema, or modify the schema so that these anomalies are accepted. Again, a domain expert has to decide which anomalies are acceptable and which require data processing.

Let's start with the Destination feature. We found a new value, “Anomaly”, that was not in the domain list derived from the training data. Let's add it to the domain, making it an acceptable value for the feature.

# Adding a new value for 'Destination'
destination_domain=tfdv.get_domain(schema, 'Destination')
destination_domain.value.append('Anomaly')

anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

This anomaly has been removed and is no longer visible in the anomalies list. Let's move on to the next anomaly.

The Destination anomaly has been resolved

If you look at the domains of VIP and CryoSleep, you'll see that the training data has lowercase values while the evaluation data has the same values in uppercase. One option is to preprocess the data so that everything is converted to a single case; here, instead, we add the uppercase values to the domain. Since VIP and CryoSleep use the same set of values (true and false), we set CryoSleep's domain to reuse VIP's domain.

# Adding data in CAPS to domain for VIP and CryoSleep

vip_domain=tfdv.get_domain(schema, 'VIP')
vip_domain.value.extend(['TRUE','FALSE'])

# Setting domain of one feature to another
tfdv.set_domain(schema, 'CryoSleep', vip_domain)

anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

Resolved CryoSleep and VIP issues

It is fairly safe to treat integer features as floating-point numbers, so we ask TFDV to infer the evaluation data's types from the schema of the training data, which resolves all the data-type-related issues.

# INT can be safely converted to FLOAT. So we can safely ignore it and ask TFDV to use schema

options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
eval_stats=tfdv.generate_statistics_from_csv(ANOMALOUS_DATA, stats_options=options)

anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

Resolved data type issue

Finally, we arrive at our final set of anomalies: two columns that are present in the training data are not present in the evaluation data.

“Transported” is the class label and is not available in the evaluation data. When you know that your training and serving features will legitimately differ from each other, you can define multiple environments. Here, we create a Training environment and a Serving environment, and specify that the “Transported” feature is available in the Training environment but not in the Serving environment.

# Transported is the class label and will not be available in evaluation data.
# To indicate that, we set up two environments: Training and Serving.

schema.default_environment.append('Training')
schema.default_environment.append('Serving')

tfdv.get_feature(schema, 'Transported').not_in_environment.append('Serving')

serving_anomalies_with_environment=tfdv.validate_statistics(
    statistics=eval_stats, schema=schema, environment='Serving')

tfdv.display_anomalies(serving_anomalies_with_environment)

“RoomService”, however, is a required feature that is missing from the evaluation data. Cases like this cannot simply be waved away in the schema; they call for manual intervention by a domain expert, typically by fixing the upstream component that dropped the column.
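If a domain expert decides the pipeline should tolerate the missing column instead, one option, sketched below rather than applied here, is to relax the feature's required presence in the schema:

# Sketch only: relax RoomService's presence so its absence is tolerated.
# In most cases the right fix is to repair the upstream component instead.
room_service = tfdv.get_feature(schema, 'RoomService')
room_service.presence.min_fraction = 0.0
room_service.presence.min_count = 0

anomalies = tfdv.validate_statistics(statistics=eval_stats, schema=schema,
                                     environment='Serving')
tfdv.display_anomalies(anomalies)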

Keep solving the problem until you get this output.

All anomalies have been resolved


The next step is to check for drift and skew. Skew is a discrepancy between the distributions of two datasets, such as the training and serving data. Drift is a change in the data distribution over time: when you first deploy your model the predictions may be accurate, but as the distribution of incoming data shifts away from what the model was trained on, misclassification errors start to increase. Both issues signal that it may be time to retrain your model.

The L-infinity distance is used to measure skew and drift. A threshold is set on this distance: if the difference for a feature between the training and serving statistics exceeds the threshold, the feature is flagged as having drifted. A similar threshold-based approach is used for skew. In this example, the thresholds for both drift and skew are set to 0.01.
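To build intuition for the metric: the L-infinity distance between two categorical distributions is simply the largest absolute difference in any single value's frequency. A toy illustration (the frequencies below are made up):

def l_infinity_distance(p, q):
    """Largest absolute difference between two value-frequency maps."""
    keys = set(p) | set(q)
    return max(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in keys)

# Hypothetical value frequencies for a categorical feature like CryoSleep
train_freq = {'True': 0.64, 'False': 0.36}
serve_freq = {'True': 0.52, 'False': 0.48}

print(l_infinity_distance(train_freq, serve_freq))  # 0.12 > 0.01, would be flagged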

serving_stats = tfdv.generate_statistics_from_csv(TEST_DATA)

# Skew Comparator
spa_analyze=tfdv.get_feature(schema, 'Spa')
spa_analyze.skew_comparator.infinity_norm.threshold=0.01

# Drift Comparator
cryosleep_analyze=tfdv.get_feature(schema, 'CryoSleep')
cryosleep_analyze.drift_comparator.infinity_norm.threshold=0.01

skew_anomalies=tfdv.validate_statistics(statistics=train_stats, schema=schema,
                                        previous_statistics=eval_stats,
                                        serving_statistics=serving_stats)
tfdv.display_anomalies(skew_anomalies)

We can see that “Spa” exhibits an acceptable level of skew (since it is not listed in the anomaly list). However, “CryoSleep” exhibits a high level of drift. If we were to create an automated pipeline, these anomalies could be used as triggers for automated model retraining.

High drift in CryoSleep
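A minimal sketch of that trigger idea: the Anomalies proto returned by validate_statistics carries an anomaly_info map, so an orchestration step can branch on it (trigger_retraining is a hypothetical hook, not a TFDV function):

# Sketch: use detected drift/skew anomalies to gate automated retraining
if skew_anomalies.anomaly_info:
    print('Drift/skew detected for:', list(skew_anomalies.anomaly_info.keys()))
    # trigger_retraining()  # hypothetical orchestrator hook
else:
    print('No drift or skew detected.')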

After you resolve all anomalies, you can save the schema as an artifact or store it in your metadata repository for use in your ML pipeline.

# Saving the Schema
import os  # needed for os.path.join below
from tensorflow.python.lib.io import file_io
from google.protobuf import text_format

file_io.recursive_create_dir('schema')
schema_file = os.path.join('schema', 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)

# Loading the Schema
loaded_schema = tfdv.load_schema_text(schema_file)
loaded_schema
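Once loaded, the schema can be reused directly. As a minimal sketch (assuming the same environments set up earlier), you could validate a fresh batch of serving data against it:

# Validate a new batch of serving data against the stored schema
new_stats = tfdv.generate_statistics_from_csv(TEST_DATA)
new_anomalies = tfdv.validate_statistics(statistics=new_stats, schema=loaded_schema,
                                         environment='Serving')
tfdv.display_anomalies(new_anomalies)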

You can download the notebook and data files from my GitHub repository using this link.


Thank you for reading my article, if you liked it, please clap and encourage me, and if you disagree, please let me know in the comments what you think I could improve. Ciao.

All images are by the author unless otherwise noted.


