You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include the following:
- Missing values can lead to failures in production systems that require non-null values
- Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
- Aggregations of incorrect data can lead to wrong business decisions
In this post, we introduce PyDeequ, an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries available in Python. Furthermore, PyDeequ offers a fluid interface with pandas DataFrames rather than restricting you to Apache Spark DataFrames.
Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (billions of rows) that typically live in a data lake, distributed file system, or a data warehouse. PyDeequ gives you access to this capability, but also allows you to use it from the familiar environment of your Python Jupyter notebook.
Deequ at Amazon
Deequ is used internally at Amazon to verify the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues don’t propagate to consumer data pipelines, reducing their blast radius.
Deequ is also used within Amazon SageMaker Model Monitor. Now with the availability of PyDeequ, you can use it from a broader set of environments: Amazon SageMaker notebooks, AWS Glue, Amazon EMR, and more.
Overview of PyDeequ
Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram):
- Metrics computation – Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon Simple Storage Service (Amazon S3) and compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
- Constraint verification – As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
- Constraint suggestion – You can choose to define your own custom data quality constraints or use the automated constraint suggestion methods that profile the data to infer useful constraints.
- Python wrappers – You can call each Deequ function using Python syntax. The wrappers translate the commands to the underlying Deequ calls and return their response.
Use case overview
As a running example, we use a customer review dataset provided by Amazon on Amazon S3. We intentionally follow the example in the post Test data quality at scale with Deequ to show the similarity in functionality and implementation. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook.
If you’d like to follow along with a live Jupyter notebook, check out the notebook on our GitHub repo.
During the data exploration phase, you want to easily answer some basic questions about the data:
- Are the fields that are supposed to contain unique values really unique? Are there fields that are missing values?
- How many distinct categories are there in the categorical fields?
- Are there correlations between some key features?
- If there are two supposedly similar datasets (such as different categories or different time periods), are they really similar?
We also show you how to scale this approach to large-scale datasets, using the same code on an Amazon EMR cluster. This is how you’d likely do your ML training, and later as you move into a production setting.
Starting a PySpark session in a SageMaker notebook
To follow along with this post, open up a SageMaker notebook instance, clone the PyDeequ GitHub repository onto the SageMaker notebook instance, and run the test_data_quality_at_scale.ipynb notebook from the tutorials directory of the PyDeequ repository.
Let’s install our dependencies first in a terminal window:
Next, in a cell of our SageMaker notebook, we need to create a PySpark session:
Load the dataset containing reviews for the category `Electronics` into our Jupyter notebook:
After you load the DataFrame, you can run `df.printSchema()` to view the schema of the dataset:
Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the AnalysisRunner to capture the metrics you’re interested in:
The following table summarizes our findings.
From this, we learn the following:
- `review_id` has no missing values and approximately 3,010,972 unique values
- 9% of reviews have a `star_rating` of 4 or higher
- `total_votes` and `star_rating` are not correlated
- `helpful_votes` and `total_votes` are strongly correlated
- The average `star_rating` is 4.0
- The dataset contains 3,120,938 reviews
Defining and running tests for data
After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it. For our dataset, we verify the following properties:
- At least 3 million rows in total
- `review_id` is never NULL
- `star_rating` has a minimum of 1.0 and a maximum of 5.0
- `year` does not contain negative values
This is the code that reflects the previous statements. For information about all available checks, see the GitHub repo. You can run this directly in the Spark shell as previously explained:
After calling `run()`, PyDeequ translates your test description into Deequ, which translates it into a series of Spark jobs that compute metrics on the data. Afterwards, it invokes your assertion functions (for example, `lambda x: x == 1.0` for the minimum star rating check) on these metrics to see if the constraints hold on the data. The following table summarizes our findings.
We learn that the `review_id` column isn't unique, which resulted in a failure of the check on uniqueness. We can also look at all the metrics that Deequ computed for this check by running the following:
The following table summarizes our findings.
Automated constraint suggestion
If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see the GitHub repo.
The result contains a list of constraints with descriptions and Python code, so that you can directly apply it in your data quality checks. Call `print(json.dumps(result_json))` to inspect the suggested constraints; the following table shows a subset.
You can explore the other tutorials in the PyDeequ GitHub repo.
Scaling to production
So far, we’ve shown you how to use these capabilities in the context of data exploration using a Jupyter notebook running on a SageMaker notebook instance. As your project matures, you need to use the same capabilities on larger and larger datasets, and in a production environment. With PyDeequ, it’s easy to make that transition. The following diagram illustrates deployment options for local and production purposes on AWS.
Amazon EMR and AWS Glue interface with PyDeequ through the PySpark drivers that PyDeequ utilizes as its main engine. PyDeequ can run as a PySpark application in both contexts when the Deequ JAR is added to the Spark context. You can run PyDeequ’s data validation toolkit after the Spark context and drivers are configured and your data is loaded into a DataFrame. We describe the Amazon EMR configuration options and use cases in this section (configurations 2 and 3 in the diagram).
Data exploration from a SageMaker notebook via an EMR cluster
As shown in configuration 2 in the diagram, you can connect to an EMR cluster from a SageMaker notebook to run PyDeequ. This enables you to explore much larger volumes of data than you can using a single notebook. Your Amazon EMR cluster must be running Spark v2.4.6, available with Amazon EMR version 5.31 or higher, in order to work with PyDeequ. After you have a running cluster that has those components and a SageMaker notebook, you configure a `SparkSession` object using the following template to connect to your cluster. For more information about connecting a SageMaker notebook to Amazon EMR or the necessary IAM permissions, see Submitting User Applications with spark-submit.
In the SageMaker notebook, run the following JSON in a cell before you start your `SparkSession` to configure your EMR cluster:
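For example, with a sparkmagic kernel you can pass the Deequ JAR coordinates to the cluster via `%%configure`. The coordinates below are an assumption (Deequ 1.0.3 for Spark 2.4); adjust the versions to match your cluster:

```
%%configure -f
{ "conf": {
    "spark.jars.packages": "com.amazon.deequ:deequ:1.0.3",
    "spark.jars.excludes": "net.sourceforge.f2j:arpack_combined_all"
}}
```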
Start the `SparkSession` object in a cell after the preceding configuration by running `spark`. Then install PyDeequ onto your EMR cluster using the `SparkContext` (named `sc` by default) with the following command:
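On EMR notebooks, the `SparkContext` exposes an `install_pypi_package` method for exactly this purpose; a sketch (assumes the EMR-provided `sc` is available in the notebook):

```python
# EMR notebook-only API: installs the package for the cluster's executors
sc.install_pypi_package("pydeequ")
```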
Now you can start using PyDeequ from your notebook to run the same statements as before, but with much larger volumes of data.
Running a transient EMR cluster
Another way to leverage the power of an EMR cluster is to treat it as a transient cluster and run it in a headless configuration, as shown in configuration 3 in the diagram. We use `spark-submit` in an EMR `add-step` to run PyDeequ on Amazon EMR. For each of the following steps, make sure to replace the values in brackets accordingly.
- Create a bootstrap shell script and upload it to an S3 bucket. The following code is an example bootstrap script:
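A minimal bootstrap sketch (the file name `pydeequ-emr-bootstrap.sh` is a hypothetical choice); it installs PyDeequ on every node as the cluster starts:

```shell
#!/bin/bash
# pydeequ-emr-bootstrap.sh: install PyDeequ on each cluster node
sudo python3 -m pip install pydeequ
```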
- Create an EMR cluster via the AWS Command Line Interface (AWS CLI):
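A sketch of the CLI call, with bracketed placeholders for your own bucket, key pair, and any hypothetical file names; the EMR release and instance sizing are assumptions:

```shell
aws emr create-cluster \
    --name "pydeequ-emr" \
    --release-label emr-5.31.0 \
    --applications Name=Spark \
    --instance-type m5.xlarge \
    --instance-count 2 \
    --use-default-roles \
    --bootstrap-actions Path="s3://[your-bucket]/pydeequ-emr-bootstrap.sh" \
    --ec2-attributes KeyName=[your-key-pair]
```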
- Create your PySpark PyDeequ run script and upload it into Amazon S3. The following code is an example run script:
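A sketch of such a headless run script (the file name `pydeequ-test.py`, the bucket paths, and the chosen checks are assumptions); it runs a verification and writes the results to S3:

```python
# pydeequ-test.py: a headless PyDeequ job submitted via spark-submit
import pydeequ
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = (SparkSession.builder
             .appName("pydeequ-emr-run")
             .config("spark.jars.packages", pydeequ.deequ_maven_coord)
             .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
             .getOrCreate())

    df = spark.read.parquet("s3a://[your-input-bucket]/[your-dataset]/")

    check = (Check(spark, CheckLevel.Warning, "Review Check")
             .isComplete("review_id")
             .isUnique("review_id"))

    result = VerificationSuite(spark).onData(df).addCheck(check).run()
    result_df = VerificationResult.checkResultsAsDataFrame(spark, result)

    # Last line: write results to the S3 output path you will read later
    result_df.repartition(1).write.csv("s3a://[your-output-bucket]/pydeequ-out.csv", sep="|")
```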
- When your cluster is running and in the `WAITING` state, submit your Spark job to Amazon EMR via the AWS CLI:
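A sketch of the step submission (cluster ID, bucket, and the script name are placeholders; the script path matches the hypothetical run script above):

```shell
aws emr add-steps \
    --cluster-id [your-cluster-id] \
    --steps Type=Spark,Name="pydeequ-step",ActionOnFailure=CONTINUE,\
Args=[--deploy-mode,cluster,--master,yarn,s3://[your-bucket]/pydeequ-test.py]
```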
Congratulations, you have now submitted a PyDeequ PySpark job to Amazon EMR. Give the job a few minutes to run, after which you can view your results at the S3 output path specified on the last line of the run script.
Afterwards, remember to clean up your results and spin down the EMR cluster using the following command:
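A sketch of the cleanup (paths and IDs are placeholders matching the earlier steps):

```shell
# Remove the job output, then terminate the transient cluster
aws s3 rm s3://[your-output-bucket]/pydeequ-out.csv --recursive
aws emr terminate-clusters --cluster-ids [your-cluster-id]
```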
Now you can use Amazon EMR to process large datasets in batch using PyDeequ to plug into your pipelines and provide scalable tests on your data.
More examples on GitHub
You can find examples of more advanced features on the Deequ GitHub page:
- Deequ provides more than data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
- Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
- If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
This post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. PyDeequ is available via `pip install` and on GitHub now for you to build your own data quality management pipeline.
Learn more about the inner workings of Deequ in the VLDB 2018 paper Automating large-scale data quality verification.
Stay tuned for another post demonstrating production workflows on AWS Glue.
About the Authors
Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.
Chris Ghyzel is a Data Engineer for AWS Professional Services. Currently, he is working with customers to integrate machine learning solutions on AWS into their production pipelines.
Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.