Data Quality Monitoring with Great Expectations

A full tutorial covering a basic workflow
great_expectations
tutorial
long
Author

Matt Triano

Published

June 23, 2023

Modified

June 27, 2023

Imports and path definitions
from collections import Counter
import datetime as dt
from pathlib import Path
from urllib.request import urlretrieve

import geopandas as gpd
import pandas as pd

PROJECT_DIR = Path(".").resolve()
PROJECT_DATA_DIR = PROJECT_DIR.joinpath("data")

Great Expectations (or GX for short) is an open-source, Python-based library that brings the idea of “testing” to your data. It enables you to define expectations for properties of your datasets (such as the number of records per batch, the distribution of values in a column, or the set of columns in a table) and to check that the data still meets those expectations whenever it is updated.
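
To make that concrete, here's a minimal sketch using GX's fluent pandas API (the inspections.csv file and inspection_id column are hypothetical stand-ins, not the dataset used below):

import great_expectations as gx

# An in-memory context is enough for a quick, throwaway check.
context = gx.get_context()
# Read a (hypothetical) CSV into a Validator via the default pandas Datasource.
validator = context.sources.pandas_default.read_csv("inspections.csv")
# State a verifiable property of the data and check it against the data immediately.
result = validator.expect_column_values_to_not_be_null("inspection_id")
print(result.success)

The rest of this post builds up the same ideas with a persistent project configuration, batched files, and reusable validation runs.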

Workflow Overview

The high-level great_expectations workflow follows this pattern:

  1. Install great_expectations.
  2. Create (or load) a DataContext for your project.
  3. Connect Datasources to your DataContext.
  4. Define DataAssets in your Datasources.
  5. Define Expectations for your DataAssets.
  6. Set Checkpoints to validate your DataAssets.

In this post, I’ll demonstrate this workflow with local-file-based DataAssets in a local filesystem DataContext (although great_expectations also works with SQL-based and cloud-bucket-based DataAssets).

Sample Data Collection and Preparation

Many data pipelines process data in discrete, periodic batches. I’m going to simulate that situation by downloading a dataset, splitting it into 1-month batches (based on a datetime-like column), and then writing each of those 1-month batches to file. I’m going to use the Food Inspection dataset on Chicago’s public data portal.

Step 0: Great Expectations Setup

First, you’ll need to install great_expectations. If you already have conda installed on your machine, you can easily set up a conda env just like the one used to run this notebook by: 1. copying the gx_env_environment.yml file (in the same dir as this notebook file) to your machine, 2. opening a terminal and navigating to the dir with that new file, and 3. running the command conda env create -f gx_env_environment.yml

If you don’t have conda but would like to, check out my opinionated conda install and configuration post.

Collecting and preprocessing sample data for this post
PROJECT_DATA_DIR.mkdir(exist_ok=True)

# First, we need to download the data to our local machine.
url = "https://data.cityofchicago.org/api/geospatial/4ijn-s7e5?method=export&format=GeoJSON"
full_file_path = PROJECT_DATA_DIR.joinpath("full_food_inspections.geojson")
if not full_file_path.is_file():
    urlretrieve(url=url, filename=full_file_path)
food_inspection_gdf = gpd.read_file(full_file_path)

# For some reason, Socrata adds these four always-null location columns to
#   geospatial exports. I'm going to remove them.
location_cols = ["location_state", "location_zip", "location_address", "location_city"]
# uncomment the lines below to confirm those columns are always empty
# print("Rows with a non-null value in these location_xxx columns:")
# display(food_inspection_gdf[location_cols].notnull().sum())
food_inspection_gdf = food_inspection_gdf.drop(columns=location_cols)

# That column ordering is a bit chaotic, so I'll reorder them (for readability).
col_order = [
    "inspection_id", "inspection_date", "dba_name", "aka_name", "license_", "facility_type",
    "risk", "inspection_type", "results", "address", "city", "state", "zip", "violations",
    "longitude", "latitude", "geometry"
]
food_inspection_gdf = food_inspection_gdf[col_order].copy()

# I also want to break this into batches based on the dates, so I need to cast
#   the `inspection_date` to a datetime type.
food_inspection_gdf["inspection_date"] = pd.to_datetime(
    food_inspection_gdf["inspection_date"]
)

# I'll also cast string and numeric features to their proper dtypes.
# food_inspection_gdf = food_inspection_gdf.convert_dtypes()
food_inspection_gdf["inspection_id"] = food_inspection_gdf["inspection_id"].astype("Int64")
food_inspection_gdf["longitude"] = food_inspection_gdf["longitude"].astype(float)
food_inspection_gdf["latitude"] = food_inspection_gdf["latitude"].astype(float)

# I'll also make all strings uppercase (to reduce cardinality)
str_cols = list(food_inspection_gdf.head(2).select_dtypes(include="object").columns)
food_inspection_gdf[str_cols] = food_inspection_gdf[str_cols].apply(lambda x: x.str.upper())

In the (folded up) cell below, we split the dataset into batches and write each batch to file in this post’s ./data directory.

Splitting the dataset into 1-month batches and writing each batch to file
# I want to split the data into 1-month batches, so I need the first day of each month
#   from the month of the earliest inspection through the month of the latest inspection
#   in our food inspection dataset.
month_start_dates = pd.date_range(
    start=food_inspection_gdf["inspection_date"].min() + pd.DateOffset(months=-1),
    end=food_inspection_gdf["inspection_date"].max(),
    freq="MS",
)

# Here, we'll iterate through each of those month_start_dates, extract the batch of data,
#   format a filename containing the month_start_date, and write the batch to file.
for month_start_date in month_start_dates:
    batch_period = pd.to_datetime(month_start_date).strftime("%Y_%m")
    batch_data = food_inspection_gdf.loc[
        food_inspection_gdf["inspection_date"].between(
            left=month_start_date,
            right=month_start_date + pd.DateOffset(months=1),
            inclusive="left")
    ].copy()
    batch_file_path = PROJECT_DATA_DIR.joinpath(f"food_inspection_batch_{batch_period}.parquet")
    if not batch_file_path.is_file():
        batch_data.to_parquet(batch_file_path, index=False)

Step 1: Create or Load Great Expectations DataContext

A DataContext is your primary tool for configuring your project and accessing project resources and GX methods. When you first create a DataContext for your project, GX will create a directory named great_expectations/ inside the project_root_dir directory.

The code below will create a new DataContext if one doesn’t already exist in the PROJECT_DIR directory, and then load a DataContext instance from that PROJECT_DIR. Great Expectations defaults to collecting anonymized usage statistics, but you can disable that for your context by setting usage_statistics_enabled=False.

import great_expectations as gx
from great_expectations.data_context import FileDataContext

context = FileDataContext.create(project_root_dir=PROJECT_DIR, usage_statistics_enabled=False)

This tutorial uses a local FileDataContext, but GX also supports CloudDataContexts and EphemeralDataContexts.

Kinds of DataContexts
[el for el in dir(gx.data_context) if el.endswith("Context")]
['AbstractDataContext',
 'BaseDataContext',
 'CloudDataContext',
 'DataContext',
 'EphemeralDataContext',
 'FileDataContext']

Step 2: Create or load a Datasource

A GX Datasource connects you to a source of data and gives you methods to define and access DataAssets.

The code below will check the DataContext context for a Datasource with the given datasource_name, and either load or create a local filesystem Datasource instance.

datasource_name = "food_inspection_datasource"

if any(el["name"] == datasource_name for el in context.list_datasources()):
    print(f"Datasource with name '{datasource_name}' found; loading now")
    datasource = context.get_datasource(datasource_name)
else:
    print(f"No Datasource with name '{datasource_name}' found; creating now")
    datasource = context.sources.add_pandas_filesystem(
        name=datasource_name,
        base_directory=PROJECT_DATA_DIR
    )
No Datasource with name 'food_inspection_datasource' found; creating now
Other kinds of GX Datasources
[el for el in dir(context.sources) if el.startswith("add_") and "_update_" not in el]
['add_pandas',
 'add_pandas_abs',
 'add_pandas_dbfs',
 'add_pandas_filesystem',
 'add_pandas_gcs',
 'add_pandas_s3',
 'add_postgres',
 'add_spark',
 'add_spark_abs',
 'add_spark_dbfs',
 'add_spark_filesystem',
 'add_spark_gcs',
 'add_spark_s3',
 'add_sql',
 'add_sqlite']

Step 3: Define DataAssets in that Datasource

A GX DataAsset specifies a collection of records in a Datasource and the method for accessing those records.

The code below checks if a DataAsset with the given name exists in the datasource, loading it if it exists, or specifying it if not. In the part that specifies the DataAsset, note that we set the name of the asset, specify that the data is in parquet files, and provide a regex pattern that both matches the file names and defines named groups for the year and month parts of each file name. We can use those year and month variables to specify how the DataAsset should be split into batches and how those batches should be ordered (see the short sketch at the end of this step).

data_asset_name = "food_inspections_asset"

if data_asset_name not in datasource.get_asset_names():
    print(f"Creating data asset {data_asset_name}")
    data_asset = datasource.add_parquet_asset(
        name=data_asset_name,
        batching_regex = r"food_inspection_batch_(?P<year>\d{4})_(?P<month>\d{2})\.parquet"
    )
else:
    data_asset = datasource.get_asset(data_asset_name)
data_asset = data_asset.add_sorters(["+year", "+month"])
Creating data asset food_inspections_asset
Other data file formats GX supports
[el for el in dir(datasource) if el.startswith("add_")]
['add_csv_asset',
 'add_excel_asset',
 'add_feather_asset',
 'add_fwf_asset',
 'add_hdf_asset',
 'add_html_asset',
 'add_json_asset',
 'add_orc_asset',
 'add_parquet_asset',
 'add_pickle_asset',
 'add_sas_asset',
 'add_spss_asset',
 'add_stata_asset',
 'add_xml_asset']
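
As an aside, the year and month groups captured by the batching_regex also let you request a single monthly batch on demand. A minimal sketch (the specific month is hypothetical and assumes a matching file exists in ./data):

# Request only the batch whose filename matched year=2023 and month=01.
jan_2023_request = data_asset.build_batch_request(options={"year": "2023", "month": "01"})
jan_2023_batches = data_asset.get_batch_list_from_batch_request(jan_2023_request)
print(len(jan_2023_batches))  # 1, if a matching parquet file exists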

Step 4: Create Expectations for a DataAsset

A GX Expectation is a verifiable assertion about some property of a DataAsset. Defining Expectations both enables GX to check that data meets them and gives domain experts an explicit way to represent and communicate their expectations for the data.

GX supports hundreds of different Expectations and catalogs them in the Expectation Gallery (although not all Expectations are implemented for all kinds of Datasources). GX also provides tools to aid in several workflows for defining suites of Expectations, including the GX Data Assistant workflow (used below), which builds a suite of Expectations by profiling batches of data.
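
While this post uses the Data Assistant, you can also author Expectations by hand through a Validator. Here's a minimal sketch (assuming the context and data_asset defined above; the suite name and latitude bounds are purely illustrative):

manual_suite_name = "food_inspections_manual_suite"  # hypothetical suite name
context.add_or_update_expectation_suite(expectation_suite_name=manual_suite_name)
validator = context.get_validator(
    batch_request=data_asset.build_batch_request(),
    expectation_suite_name=manual_suite_name,
)
# Each expect_* call checks the active batch and records the Expectation in the suite.
validator.expect_column_values_to_not_be_null("inspection_id")
validator.expect_column_values_to_be_between("latitude", min_value=41.6, max_value=42.1)
# Persist the hand-written Expectations back to the DataContext.
validator.save_expectation_suite(discard_failed_expectations=False)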

In the code below, we create a new Expectation Suite, build a batch request that organizes our DataAsset’s batches of data, and use the Data Assistant to profile those batches and propose Expectations.

expectation_suite_name = "food_inspections_suite"

expectation_suite = context.add_or_update_expectation_suite(
    expectation_suite_name=expectation_suite_name
)
batch_request = data_asset.build_batch_request()
batches = data_asset.get_batch_list_from_batch_request(batch_request)
data_assistant_result = context.assistants.onboarding.run(
    batch_request=batch_request,
    exclude_column_names=["inspection_date", "geometry"],
)

The profiler will sequentially generate a lot of progress bars as it profiles dataset features.

Profiler output
data_assistant_result.plot_expectations_and_metrics()
64 Expectations produced, 23 Expectation and Metric plots implemented
Use DataAssistantResult.show_expectations_by_domain_type() or
DataAssistantResult.show_expectations_by_expectation_type() to show all produced Expectations

Data Assistant Plot Inspector plots

After the Data Assistant finishes profiling, it returns its results (which we assigned to data_assistant_result), and you can explore those results across batches by calling data_assistant_result.plot_expectations_and_metrics() and selecting the expectation and column you’re interested in.

[Plot Inspector panels: total row count per batch; unique facility_type, inspection_type, and results values per batch]

Extracting, [optionally] Editing, and Committing our Expectation Suite to our DataContext

If we’re content with the Expectations generated by the Data Assistant’s profiler, we can simply extract the Expectations and add them to our context via

expectation_suite = data_assistant_result.get_expectation_suite(
    expectation_suite_name=expectation_suite_name
)
saved_suite = context.add_or_update_expectation_suite(expectation_suite=expectation_suite)

In a future post I’ll go into further depth on methods for editing Expectations, but here I’ll show how to inspect and remove Expectations.

expectation_suite = data_assistant_result.get_expectation_suite(
    expectation_suite_name=expectation_suite_name
)
Counts of Expectations by Type
print(f"Counts of Expectations by Expectation-type:")
expecs_by_type = expectation_suite.get_grouped_and_ordered_expectations_by_expectation_type()
display(Counter([ex._expectation_type for ex in expecs_by_type]))
Counts of Expectations by Expectation-type:
Counter({'expect_column_value_lengths_to_be_between': 12,
         'expect_column_values_to_match_regex': 11,
         'expect_column_proportion_of_unique_values_to_be_between': 7,
         'expect_column_unique_value_count_to_be_between': 7,
         'expect_column_values_to_be_in_set': 7,
         'expect_column_values_to_not_be_null': 4,
         'expect_column_max_to_be_between': 2,
         'expect_column_mean_to_be_between': 2,
         'expect_column_median_to_be_between': 2,
         'expect_column_min_to_be_between': 2,
         'expect_column_quantile_values_to_be_between': 2,
         'expect_column_stdev_to_be_between': 2,
         'expect_column_values_to_be_between': 2,
         'expect_table_columns_to_match_set': 1,
         'expect_table_row_count_to_be_between': 1})
Counts of Expectations by Column
print(f"Counts of Expectations by Column-name:")
expecs_by_col = expectation_suite.get_grouped_and_ordered_expectations_by_column()
expec_count_by_col = {col: len(col_expecs) for col, col_expecs in expecs_by_col[0].items()}
display(sorted(expec_count_by_col.items(), key=lambda x: x[1], reverse=True))
Counts of Expectations by Column-name:
[('longitude', 7),
 ('latitude', 7),
 ('inspection_type', 6),
 ('results', 6),
 ('facility_type', 5),
 ('risk', 5),
 ('city', 5),
 ('state', 5),
 ('zip', 5),
 ('dba_name', 3),
 ('_nocolumn', 2),
 ('address', 2),
 ('aka_name', 2),
 ('license_', 2),
 ('violations', 2)]
Expectation-related methods on our expectation_suite
[el for el in dir(expectation_suite) if "_expectation" in el]
['_add_expectation',
 '_get_expectations_by_domain_using_accessor_method',
 '_validate_expectation_configuration_before_adding',
 'add_expectation',
 'add_expectation_configurations',
 'append_expectation',
 'find_expectation_indexes',
 'find_expectations',
 'get_column_expectations',
 'get_column_pair_expectations',
 'get_grouped_and_ordered_expectations_by_column',
 'get_grouped_and_ordered_expectations_by_domain_type',
 'get_grouped_and_ordered_expectations_by_expectation_type',
 'get_multicolumn_expectations',
 'get_table_expectations',
 'patch_expectation',
 'remove_all_expectations_of_type',
 'remove_expectation',
 'replace_expectation',
 'show_expectations_by_domain_type',
 'show_expectations_by_expectation_type']
Inspecting Expectation types for a given column
col_name = "longitude"
[ex["expectation_type"] for ex in expectation_suite.get_column_expectations() if ex["kwargs"]["column"] == col_name]
['expect_column_min_to_be_between',
 'expect_column_max_to_be_between',
 'expect_column_values_to_be_between',
 'expect_column_quantile_values_to_be_between',
 'expect_column_median_to_be_between',
 'expect_column_mean_to_be_between',
 'expect_column_stdev_to_be_between']

Some Expectations are redundant; for example, expect_column_min_to_be_between and expect_column_max_to_be_between are largely covered by expect_column_values_to_be_between for this column. I’ll remove them.

col_name = "longitude"
expectation_types_to_remove = [
    "expect_column_min_to_be_between", "expect_column_max_to_be_between"
]
col_expectations_w_type = [
    ex for ex in expectation_suite.get_column_expectations()
    if (ex["kwargs"]["column"] == col_name) and (ex["expectation_type"] in expectation_types_to_remove)
]
col_expectations_w_type
[{"kwargs": {"column": "longitude", "strict_min": false, "min_value": -87.91442843927047, "strict_max": false, "max_value": -87.81649552747085}, "meta": {"profiler_details": {"metric_configuration": {"metric_name": "column.min", "domain_kwargs": {"column": "longitude"}, "metric_value_kwargs": null}, "num_batches": 162}}, "expectation_type": "expect_column_min_to_be_between"},
 {"kwargs": {"column": "longitude", "strict_min": false, "min_value": -87.5510612280602, "strict_max": false, "max_value": -87.5250941359867}, "meta": {"profiler_details": {"metric_configuration": {"metric_name": "column.max", "domain_kwargs": {"column": "longitude"}, "metric_value_kwargs": null}, "num_batches": 162}}, "expectation_type": "expect_column_max_to_be_between"}]
print(f"Expectations prior to removal: {len(expectation_suite.expectations)}")
for expectation_to_remove in col_expectations_w_type:
    removed_expec = expectation_suite.remove_expectation(expectation_to_remove)
print(f"Expectations after removal:    {len(expectation_suite.expectations)}")
Expectations prior to removal: 64
Expectations after removal:    62

We can also get rid of every instance of an Expectation type.

print(f"Removing Expectation types:")
for expec_type in expectation_types_to_remove:
    print(f"  - {expec_type}")
print(f"Expectations prior to removal: {len(expectation_suite.expectations)}")
removed_expecs = expectation_suite.remove_all_expectations_of_type(expectation_types=expectation_types_to_remove)
print(f"Expectations after removal:    {len(expectation_suite.expectations)}")
Removing Expectation types:
  - expect_column_min_to_be_between
  - expect_column_max_to_be_between
Expectations prior to removal: 62
Expectations after removal:    60

After reviewing and editing Expectations, the Expectation Suite must be committed to the DataContext.

saved_suite = context.add_or_update_expectation_suite(expectation_suite=expectation_suite)

Step 5: Set up a Checkpoint to check Expectations

A GX Checkpoint configures the validation process for an Expectation Suite.

Specifically, a Checkpoint defines:

  * the Expectation Suite to evaluate,
  * the data Batches to evaluate against, and
  * the actions to take after evaluation.

We’ll take the actions of compiling a report of results and committing our Checkpoint to our DataContext, but you can also configure a Checkpoint to send results via an email or Slack notification.
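
For reference, here's a hedged sketch of a Slack-notifying variant (not used in this post; the checkpoint name is illustrative and the webhook URL is a placeholder you'd get from Slack):

slack_checkpoint = gx.checkpoint.SimpleCheckpoint(
    name="food_inspections_slack_checkpoint",  # hypothetical name
    data_context=context,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        },
    ],
    slack_webhook="https://hooks.slack.com/services/...",  # placeholder webhook URL
    notify_on="failure",  # only notify the channel when validation fails
)

The Checkpoint we define next just keeps the SimpleCheckpoint defaults: store validation results, store evaluation parameters, and update the Data Docs.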

checkpoint_name = "food_inspections_checkpoint"

checkpoint = gx.checkpoint.SimpleCheckpoint(
    name=checkpoint_name,
    data_context=context,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        },
    ],
)
checkpoint_result = checkpoint.run()
context.build_data_docs()
{'local_site': 'file:///home/matt/projects/blogs/quarto_blog/posts/006_great_expectations_setup/great_expectations/uncommitted/data_docs/local_site/index.html'}

To view the generated validation report (or Data Docs), open the file .../great_expectations/uncommitted/data_docs/local_site/index.html and select the validation run you want to review. You may have to click Trust HTML (in the upper-left corner in JupyterLab) to navigate the document.
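
Alternatively, if you're working locally with a browser available, you can open the Data Docs site straight from the context:

# Opens the local Data Docs site in your default web browser.
context.open_data_docs()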

[Screenshots: the Data Docs index and a sample view of a validation report]

context.add_checkpoint(checkpoint=checkpoint)
{
  "action_list": [
    {
      "name": "store_validation_result",
      "action": {
        "class_name": "StoreValidationResultAction"
      }
    },
    {
      "name": "store_evaluation_params",
      "action": {
        "class_name": "StoreEvaluationParametersAction"
      }
    },
    {
      "name": "update_data_docs",
      "action": {
        "class_name": "UpdateDataDocsAction"
      }
    }
  ],
  "batch_request": {},
  "class_name": "SimpleCheckpoint",
  "config_version": 1.0,
  "evaluation_parameters": {},
  "module_name": "great_expectations.checkpoint",
  "name": "food_inspections_checkpoint",
  "profilers": [],
  "runtime_configuration": {},
  "validations": [
    {
      "batch_request": {
        "datasource_name": "food_inspection_datasource",
        "data_asset_name": "food_inspections_asset",
        "options": {}
      },
      "expectation_suite_name": "food_inspections_suite"
    }
  ]
}

You can easily integrate a defined Checkpoint into a pipeline with just a few lines of code (and another dependency).

import great_expectations as gx

context = gx.get_context(context_root_dir=PROJECT_DIR.joinpath("great_expectations"))
retrieved_checkpoint = context.get_checkpoint(name="food_inspections_checkpoint")
retrieved_checkpoint_result = retrieved_checkpoint.run()
if not retrieved_checkpoint_result["success"]:
    print(f"Failed Validation Checkpoint!")
    # or raise Exception("if you'd rather handle validation failures that way")

Summary

In this notebook, we’ve built data infrastructure that communicates expectations for our data to data users and enables us to check that those expectations are still being met after every update.

Next Steps

In future posts, I’ll dive deeper into the Expectation-setting process, demonstrate a workflow with a PostgreSQL Datasource and DataAssets (where GX really shines), and explore strategies for integrating data monitoring into production ETL/ELT pipelines (like those in my personal data warehousing platform).