How to Integrate Feast with Great Expectations for Data Validation
To integrate Feast with Great Expectations for data validation, install the feast[ge] extra and define a profiler function with the @ge_profiler decorator that returns a Great Expectations ExpectationSuite. Then create a validation reference from a saved dataset and pass that reference to RetrievalJob.to_df(), which validates the retrieved historical features against your expectations.
Feast's Data Quality Monitoring (DQM) module, part of the feast-dev/feast repository, provides native integration with Great Expectations, allowing you to enforce data quality constraints on historical feature retrievals. The integration validates retrieved feature datasets against Great Expectations ExpectationSuite objects that user-defined profiler functions generate from a reference dataset.
Architecture Overview
The integration architecture centers on three core components implemented in sdk/python/feast/dqm/:
- GEProfiler and GEProfile: Located in sdk/python/feast/dqm/profilers/ge_profiler.py, the GEProfiler class wraps user-defined functions into a Profiler object that produces a GEProfile. The GEProfile.validate() method executes the Great Expectations suite against incoming DataFrames.
- @ge_profiler decorator: This decorator, defined in ge_profiler.py lines 94-104, registers a function that accepts a PandasDataset and returns an ExpectationSuite, converting it into a Feast-compatible profiler.
- validation_reference: A SavedDataset annotated with a profiler via as_reference(), stored as a reference point for subsequent validation runs.
The data flow follows this path: historical retrieval job generates a DataFrame → GEProfile.validate() invokes great_expectations.validate() → produces a ValidationReport → raises ValidationFailed if expectations are violated.
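The flow above can be pictured with a small, self-contained sketch. It uses no Feast or Great Expectations code: ToyProfile, ToyReport, and ToyValidationFailed are hypothetical stand-ins that only mirror the shape of the pipeline (a profile validates a batch, returns a report, and a failure is raised when the report is not successful).

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ToyReport:
    """Hypothetical stand-in for a validation report."""
    errors: List[str] = field(default_factory=list)

    @property
    def is_success(self) -> bool:
        return not self.errors


class ToyValidationFailed(Exception):
    """Hypothetical stand-in for the exception raised on failed validation."""

    def __init__(self, report: ToyReport):
        super().__init__(f"{len(report.errors)} expectation(s) failed")
        self.validation_report = report


@dataclass
class ToyProfile:
    """Holds named checks, playing the role of an expectation suite."""
    checks: Dict[str, Callable[[List[dict]], bool]]

    def validate(self, rows: List[dict]) -> ToyReport:
        # Collect the names of all checks that the batch does not satisfy.
        failed = [name for name, check in self.checks.items() if not check(rows)]
        return ToyReport(errors=failed)


def retrieve_with_validation(rows: List[dict], profile: ToyProfile) -> List[dict]:
    """Return the rows if every check passes, otherwise raise."""
    report = profile.validate(rows)
    if not report.is_success:
        raise ToyValidationFailed(report)
    return rows


profile = ToyProfile(
    checks={
        "avg_speed_in_range": lambda rows: all(0 <= r["avg_speed"] <= 60 for r in rows),
        "trip_count_non_negative": lambda rows: all(r["trip_count"] >= 0 for r in rows),
    }
)

good_rows = [{"avg_speed": 30.0, "trip_count": 4}]
bad_rows = [{"avg_speed": 95.0, "trip_count": 4}]
```

Calling retrieve_with_validation(good_rows, profile) returns the rows unchanged, while the same call with bad_rows raises ToyValidationFailed carrying the names of the failed checks.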
Step-by-Step Integration
Install Feast with Great Expectations Support
Install the optional Great Expectations dependency using the ge extra declared in pyproject.toml:
pip install 'feast[ge]'
This installs great_expectations>=0.15.41,<1 as specified in the project's dependency configuration.
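To confirm that the installed Great Expectations version falls inside the range quoted above, a quick check along these lines may help. This is a sketch: parse_version, in_supported_range, and installed_ge_version are hypothetical helper names, the parser reads only the leading numeric part of a version string, and the bounds are copied from the constraint stated above.

```python
import re
from importlib import metadata
from typing import Optional, Tuple

LOWER = (0, 15, 41)  # inclusive lower bound, taken from the constraint above
UPPER = (1,)         # exclusive upper bound


def parse_version(text: str) -> Tuple[int, ...]:
    """Parse the leading numeric part of a version, e.g. '0.15.50' -> (0, 15, 50).

    Suffixes such as 'rc1' are ignored, which is a simplification.
    """
    match = re.match(r"\d+(?:\.\d+)*", text)
    if match is None:
        raise ValueError(f"unrecognised version string: {text!r}")
    return tuple(int(part) for part in match.group().split("."))


def in_supported_range(version: str) -> bool:
    """Return True when LOWER <= version < UPPER."""
    return LOWER <= parse_version(version) < UPPER


def installed_ge_version() -> Optional[str]:
    """Return the installed great_expectations version, or None if it is absent."""
    try:
        return metadata.version("great_expectations")
    except metadata.PackageNotFoundError:
        return None


version = installed_ge_version()
if version is None:
    print("great_expectations is not installed")
elif in_supported_range(version):
    print(version, "is inside the expected range")
else:
    print(version, "is outside the expected range")
```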
Define a Great Expectations Profiler
Create a profiler function decorated with @ge_profiler that receives a PandasDataset and returns an ExpectationSuite. This example from sdk/python/feast/dqm/profilers/ge_profiler.py demonstrates statistical profiling:
from feast.dqm.profilers.ge_profiler import ge_profiler
from great_expectations.core import ExpectationSuite
from great_expectations.dataset import PandasDataset

DELTA = 0.1  # tolerance for mean/quantile checks

@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    # Range checks for specific columns
    ds.expect_column_values_to_be_between("avg_speed", min_value=0, max_value=60, mostly=0.99)
    ds.expect_column_values_to_be_between("total_miles_travelled", min_value=0, max_value=500, mostly=0.99)

    # Mean-based expectations with tolerance
    mean_trip = ds.trip_count.mean()
    ds.expect_column_mean_to_be_between(
        "trip_count",
        min_value=mean_trip * (1 - DELTA),
        max_value=mean_trip * (1 + DELTA),
    )
    mean_earn = ds.earned_per_hour.mean()
    ds.expect_column_mean_to_be_between(
        "earned_per_hour",
        min_value=mean_earn * (1 - DELTA),
        max_value=mean_earn * (1 + DELTA),
    )

    # Quantile checks
    qs = [0.5, 0.75, 0.9, 0.95]
    q_vals = ds.avg_fare.quantile(qs)
    ds.expect_column_quantile_values_to_be_between(
        "avg_fare",
        quantile_ranges={
            "quantiles": qs,
            "value_ranges": [[None, v] for v in q_vals],
        },
    )

    return ds.get_expectation_suite()
The @ge_profiler decorator converts this function into a GEProfiler instance compatible with Feast's DQM pipeline.
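The mostly=0.99 argument in the range checks relaxes an expectation so that it passes when at least that fraction of non-null values satisfies it. The sketch below re-implements that rule in plain Python to make the arithmetic concrete; passes_between is a hypothetical helper, not a Great Expectations function, and its handling of an all-null column is an assumption made for this sketch.

```python
from typing import Optional, Sequence


def passes_between(
    values: Sequence[Optional[float]],
    min_value: float,
    max_value: float,
    mostly: float = 1.0,
) -> bool:
    """Return True when the share of non-null values inside
    [min_value, max_value] is at least `mostly`."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return True  # assumption for this sketch: nothing to check counts as a pass
    in_range = sum(1 for v in non_null if min_value <= v <= max_value)
    return in_range / len(non_null) >= mostly


# 200 speeds with a single outlier: 199 of 200 values are in range.
speeds = [30.0] * 199 + [120.0]

strict = passes_between(speeds, 0, 60)                # requires every value in range
relaxed = passes_between(speeds, 0, 60, mostly=0.99)  # tolerates up to 1% outliers
```

With these made-up values, the strict check fails because of the single outlier, while the relaxed check passes. This is why the profiler can tolerate a few anomalous rows without rejecting the whole retrieval.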
Create a Reference Dataset
Generate a validation reference from a saved historical dataset, attaching your profiler:
# Assuming `store` is a FeatureStore instance
ds = store.get_saved_dataset("my_training_ds")
validation_reference = ds.as_reference(name="reference_2021_jun", profiler=stats_profiler)
This creates a reference point containing the ExpectationSuite generated by your profiler against the reference data.
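Because the profiler runs against the reference data, bounds such as those in the mean checks are fixed at reference time, and later retrievals are compared against them. A rough plain-Python illustration of that idea, using made-up numbers and the hypothetical helpers mean_bounds and mean_within:

```python
from statistics import mean
from typing import Sequence, Tuple

DELTA = 0.1  # relative tolerance, matching the profiler example


def mean_bounds(reference: Sequence[float], delta: float = DELTA) -> Tuple[float, float]:
    """Derive the accepted range for the mean from the reference data."""
    m = mean(reference)
    return m * (1 - delta), m * (1 + delta)


def mean_within(batch: Sequence[float], bounds: Tuple[float, float]) -> bool:
    """Check whether a new batch's mean falls inside the stored bounds."""
    low, high = bounds
    return low <= mean(batch) <= high


# Bounds are computed once, from the reference dataset (mean 10, so roughly 9 to 11).
reference_trip_counts = [10, 12, 8, 10]
bounds = mean_bounds(reference_trip_counts)

similar_batch = [9, 11, 10, 10]   # mean 10, inside the band
drifted_batch = [14, 15, 13, 14]  # mean 14, outside the band
```

Here mean_within(similar_batch, bounds) holds and mean_within(drifted_batch, bounds) does not, which is the kind of drift the reference is meant to catch.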
Run Retrieval with Validation
Execute a historical feature retrieval job, passing the validation reference to trigger automatic validation:
from feast.dqm.errors import ValidationFailed

# Create retrieval job
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ],
)

# Retrieve with validation
try:
    df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as exc:
    print("Validation errors:")
    print(exc.validation_report)  # Contains list of failing ValidationErrors
else:
    print("All expectations passed – dataset is valid")
When to_df() or to_arrow() is called with validation_reference, Feast automatically runs the Great Expectations suite against the retrieved DataFrame.
Handle Validation Failures
The ValidationFailed exception, imported from feast.dqm.errors, contains a validation_report attribute detailing which expectations failed. This report includes observed values, missing counts, and other diagnostic information from Great Expectations, allowing you to inspect data quality issues programmatically.
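One way to inspect the report programmatically is to flatten it into plain dictionaries for logging or alerting. The helper below is a sketch that assumes the report exposes an errors list whose items carry check_name, column_name, and missing_count attributes; these attribute names are assumptions here and should be verified against the installed Feast version. summarize_report is a hypothetical helper, and the stand-in objects exist only to make the example runnable without Feast.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def summarize_report(report: Any) -> List[Dict[str, Any]]:
    """Flatten a validation report into one dictionary per failed check.

    Attribute names are assumptions; missing attributes become None.
    """
    rows = []
    for err in getattr(report, "errors", []):
        rows.append(
            {
                "check": getattr(err, "check_name", None),
                "column": getattr(err, "column_name", None),
                "missing_count": getattr(err, "missing_count", None),
            }
        )
    return rows


# Stand-in report with one failed check, used only for illustration.
fake_report = SimpleNamespace(
    errors=[
        SimpleNamespace(
            check_name="expect_column_values_to_be_between",
            column_name="avg_speed",
            missing_count=3,
        )
    ]
)

summary = summarize_report(fake_report)
```

Inside an except ValidationFailed as exc handler, the equivalent call would be summarize_report(exc.validation_report).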
Complete Working Example
Here is a minimal end-to-end implementation combining all components:
# 1️⃣ Install (run once)
# !pip install 'feast[ge]'

# 2️⃣ Imports
from feast import FeatureStore
from feast.dqm.profilers.ge_profiler import ge_profiler
from feast.dqm.errors import ValidationFailed
from great_expectations.core import ExpectationSuite
from great_expectations.dataset import PandasDataset
import pandas as pd

# 3️⃣ Define profiler
@ge_profiler
def my_profiler(ds: PandasDataset) -> ExpectationSuite:
    ds.expect_column_values_to_be_between("avg_speed", min_value=0, max_value=60)
    ds.expect_column_values_to_be_between("total_miles_travelled", min_value=0, max_value=500)
    return ds.get_expectation_suite()

# 4️⃣ Load store & create reference
store = FeatureStore(".")
ref_ds = store.get_saved_dataset("my_training_ds")
reference = ref_ds.as_reference(name="ref_june_2021", profiler=my_profiler)

# 5️⃣ Prepare entity dataframe
entity_df = pd.DataFrame({
    "taxi_id": ["taxi_1", "taxi_2"],
    "event_timestamp": pd.to_datetime(["2021-06-01", "2021-06-02"]),
})

# 6️⃣ Retrieve features with validation
job = store.get_historical_features(
    entity_df=entity_df,
    features=["trip_stats:total_miles_travelled", "on_demand_stats:avg_fare"],
)
try:
    df = job.to_df(validation_reference=reference)
    print(df.head())
except ValidationFailed as err:
    print("Data quality issues detected:")
    print(err.validation_report)
Summary
Integrating Feast with Great Expectations for data validation involves five key steps:
- Install the feast[ge] extra to include Great Expectations as a dependency.
- Define a profiler function using the @ge_profiler decorator that creates an ExpectationSuite from a PandasDataset.
- Create a validation reference from a saved dataset using as_reference(), attaching your profiler to establish a baseline.
- Trigger validation by passing the reference to RetrievalJob.to_df() or to_arrow(), which automatically runs expectations against retrieved data.
- Handle failures by catching the ValidationFailed exception from feast.dqm.errors and inspecting the validation_report for detailed diagnostics.
This integration leverages the GEProfiler and GEProfile classes in sdk/python/feast/dqm/profilers/ge_profiler.py to bridge Feast's feature store operations with Great Expectations' validation engine.
Frequently Asked Questions
How do I install Feast with Great Expectations support?
Install Feast with the ge extra using pip: pip install 'feast[ge]'. This installs great_expectations>=0.15.41,<1 as defined in the project's pyproject.toml, enabling the Data Quality Monitoring module that connects Feast with Great Expectations.
What is the role of the @ge_profiler decorator in Feast?
The @ge_profiler decorator, defined in sdk/python/feast/dqm/profilers/ge_profiler.py, converts a user-defined function into a GEProfiler instance. This function must accept a Great Expectations PandasDataset and return an ExpectationSuite, allowing Feast to automatically run these expectations during feature retrieval.
How do I create a validation reference in Feast?
Create a validation reference by calling as_reference() on a saved dataset object, passing a name and your profiler function: validation_reference = ds.as_reference(name="reference_2021_jun", profiler=stats_profiler). This stores the ExpectationSuite generated from the reference data for comparison against future retrievals.
What happens when Great Expectations validation fails in Feast?
When validation fails, Feast raises a ValidationFailed exception from feast.dqm.errors. This exception contains a validation_report attribute with detailed information about which expectations failed, including observed values and missing counts from Great Expectations, allowing you to programmatically handle data quality issues.