How to Set Up Streaming Ingestion with Kafka or Kinesis Sources in Feast

Feast implements streaming ingestion as a client-side two-step pipeline that combines stream sources (KafkaSource or KinesisSource), stream feature views with optional transformation UDFs, and pluggable stream processors to materialize real-time events into the online store.

Setting up streaming ingestion with Kafka or Kinesis sources in Feast requires orchestrating three core components defined in the feast-dev/feast repository: the streaming data source configuration, the stream feature view declaration, and the stream processor implementation. Unlike batch ingestion, streaming pipelines run entirely client-side, meaning you deploy and monitor the streaming job while Feast provides the abstraction layers to write transformed features to the online store.

Architectural Overview

Feast's streaming architecture consists of four components defined across the repository, of which the batch source is optional:

  1. Streaming Data Source: The KafkaSource and KinesisSource classes in sdk/python/feast/data_source.py (lines 418-447 and 703-735 respectively) hold connection metadata: bootstrap servers and topic for Kafka, region and stream name for Kinesis, the message format (for example JSON or Avro), and, for Kafka, a watermark delay threshold.

  2. Batch Source (Optional): Feast recommends pairing your stream source with a batch source (e.g., FileSource, BigQuerySource) to supply historical data for point-in-time joins and back-filling capabilities.

  3. Stream Feature View: The @stream_feature_view decorator in sdk/python/feast/stream_feature_view.py binds your streaming source to a schema, transformation UDF, TTL, and entity definitions.

  4. Stream Processor: The StreamProcessor abstract base class in sdk/python/feast/infra/contrib/stream_processor.py (lines 31-84) defines the consumption interface. Feast provides SparkKafkaProcessor in sdk/python/feast/infra/contrib/spark_kafka_processor.py for Kafka, while Kinesis requires a custom implementation.

The processor reads messages, decodes them according to the message_format, applies the optional UDF, deduplicates rows using entity join keys and event timestamps, and writes the latest values to the online store via FeatureStore.write_to_online_store.
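
The deduplication step can be pictured with a small pure-Python sketch. It is not Feast's implementation (the built-in processor works on Spark micro-batches); the function name latest_per_entity and the sample rows are hypothetical, and the join key and timestamp field are borrowed from the examples in this article.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, Any]


def latest_per_entity(
    rows: Iterable[Row],
    join_keys: Sequence[str],
    timestamp_field: str,
) -> List[Row]:
    """Keep only the newest row for each entity key within one micro-batch."""
    latest: Dict[Tuple[Any, ...], Row] = {}
    for row in rows:
        key = tuple(row[k] for k in join_keys)
        current = latest.get(key)
        # Replace the stored row only when this one is strictly newer.
        if current is None or row[timestamp_field] > current[timestamp_field]:
            latest[key] = row
    return list(latest.values())


batch = [
    {"driver_id": 1001, "event_timestamp": datetime(2024, 1, 1, 10, 0), "conv_rate": 0.50},
    {"driver_id": 1001, "event_timestamp": datetime(2024, 1, 1, 10, 5), "conv_rate": 0.55},
    {"driver_id": 1002, "event_timestamp": datetime(2024, 1, 1, 10, 1), "conv_rate": 0.30},
    # Out-of-order row for driver 1001: older than the 10:05 row, so it is dropped.
    {"driver_id": 1001, "event_timestamp": datetime(2024, 1, 1, 9, 59), "conv_rate": 0.45},
]

deduped = latest_per_entity(batch, join_keys=["driver_id"], timestamp_field="event_timestamp")
```

Only the rows left in deduped would be handed to the online store write for that micro-batch.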

Setting Up Kafka Streaming Ingestion

Defining Batch and Stream Sources

First, define a batch source for historical data, then create the KafkaSource referencing it:

from feast import FileSource, KafkaSource
from feast.data_format import JsonFormat
from datetime import timedelta

# Historical batch source for back-filling
driver_stats_batch = FileSource(
    name="driver_stats_batch",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

# Kafka streaming source configuration
driver_stats_stream = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="localhost:9092",
    topic="drivers",
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch,
    message_format=JsonFormat(
        schema_json=(
            "driver_id integer, event_timestamp timestamp, "
            "conv_rate double, acc_rate double, created timestamp"
        )
    ),
    watermark_delay_threshold=timedelta(minutes=5),
)
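
When testing the source locally, it helps to know what a message on the drivers topic should look like. The sketch below builds one JSON payload with the five fields declared in schema_json. The helper name build_driver_event and the sample values are made up, and the ISO 8601 timestamp encoding is an assumption about what your decoder accepts. Publishing the bytes is left to whichever Kafka client you use.

```python
import json
from datetime import datetime, timezone


def build_driver_event(driver_id: int, conv_rate: float, acc_rate: float, now: datetime) -> bytes:
    """Serialize one event carrying the fields declared in schema_json."""
    ts = now.isoformat()  # assumed timestamp encoding; adjust to your decoder
    payload = {
        "driver_id": driver_id,
        "event_timestamp": ts,
        "conv_rate": conv_rate,
        "acc_rate": acc_rate,
        "created": ts,
    }
    return json.dumps(payload).encode("utf-8")


message = build_driver_event(1001, 0.42, 0.91, datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc))
decoded = json.loads(message)
```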

Creating the Stream Feature View

Register a transformation UDF using the @stream_feature_view decorator:

from datetime import timedelta

from feast import Field
from feast.stream_feature_view import stream_feature_view
from feast.types import Float32

@stream_feature_view(
    entities=[driver],  # Assume driver entity defined elsewhere
    ttl=timedelta(days=365),
    mode="spark",
    schema=[
        Field(name="conv_percentage", dtype=Float32),
        Field(name="acc_percentage", dtype=Float32),
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=driver_stats_stream,
)
def driver_hourly_stats(df):
    from pyspark.sql.functions import col
    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
          .withColumn("acc_percentage", col("acc_rate") * 100.0)
          .drop("conv_rate", "acc_rate")
    )
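
To check expected values without starting Spark, the same arithmetic can be mirrored on a plain dictionary. This is only a row-level illustration of what the UDF above computes; the function name to_percentages and the sample row are hypothetical.

```python
from typing import Any, Dict


def to_percentages(row: Dict[str, Any]) -> Dict[str, Any]:
    """Row-level mirror of the Spark UDF: add percentage fields, drop the raw rates."""
    out = {k: v for k, v in row.items() if k not in ("conv_rate", "acc_rate")}
    out["conv_percentage"] = row["conv_rate"] * 100.0
    out["acc_percentage"] = row["acc_rate"] * 100.0
    return out


sample = {"driver_id": 1001, "conv_rate": 0.5, "acc_rate": 0.25}
result = to_percentages(sample)
```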

Running the Spark Kafka Processor

Instantiate and launch the built-in processor using get_stream_processor_object defined in sdk/python/feast/infra/contrib/stream_processor.py (lines 88-107):

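from pyspark.sql import SparkSession

from feast import FeatureStore
from feast.data_source import PushMode
from feast.infra.contrib.spark_kafka_processor import SparkProcessorConfig
from feast.infra.contrib.stream_processor import get_stream_processor_object

fs = FeatureStore(repo_path=".")
sfv = fs.get_stream_feature_view("driver_hourly_stats")
spark = SparkSession.builder.appName("FeastKafkaIngestion").getOrCreate()

# SparkKafkaProcessor expects a SparkProcessorConfig, which adds the Spark session
# and micro-batch settings to ProcessorConfig; the values below are illustrative
config = SparkProcessorConfig(
    mode="spark",
    source="kafka",
    spark_session=spark,
    processing_time="30 seconds",
    query_timeout=15,
)
processor = get_stream_processor_object(config=config, fs=fs, sfv=sfv)

# Materialize to the online store, or use PushMode.ONLINE_AND_OFFLINE for both stores
processor.ingest_stream_feature_view(to=PushMode.ONLINE)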
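
Once the processor is running, a quick way to confirm that events are landing is to read the values back. The sketch below uses FeatureStore.get_online_features with the view and feature names from this article; the helper names feature_refs and read_latest are hypothetical, and the store is passed in as an argument so the function can also be exercised with a stand-in object.

```python
from typing import Any, Dict, List


def feature_refs(view_name: str, feature_names: List[str]) -> List[str]:
    """Build 'view:feature' references for an online lookup."""
    return [f"{view_name}:{name}" for name in feature_names]


def read_latest(store: Any, driver_id: int) -> Dict[str, List[Any]]:
    """Fetch the current online values for one driver.

    `store` is expected to be a feast.FeatureStore (or any object exposing
    the same get_online_features method).
    """
    response = store.get_online_features(
        features=feature_refs("driver_hourly_stats", ["conv_percentage", "acc_percentage"]),
        entity_rows=[{"driver_id": driver_id}],
    )
    return response.to_dict()
```

With a real store this would be called as read_latest(FeatureStore(repo_path="."), 1001), where 1001 is a made-up driver ID.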

Setting Up Kinesis Streaming Ingestion

Configuring the Kinesis Source

Define a KinesisSource similar to Kafka, referencing your batch source:

from feast import KinesisSource
from feast.data_format import JsonFormat

# Note: watermark_delay_threshold is a KafkaSource parameter and is omitted here
driver_stats_kinesis = KinesisSource(
    name="driver_stats_kinesis",
    stream_name="drivers",
    region="us-east-1",  # AWS region hosting the stream
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch,
    record_format=JsonFormat(
        schema_json=(
            "driver_id integer, event_timestamp timestamp, "
            "conv_rate double, acc_rate double, created timestamp"
        )
    ),
)

Use the same @stream_feature_view pattern as Kafka, substituting the Kinesis source as the source parameter.

Building a Custom Kinesis Processor

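Feast does not provide a built-in Kinesis processor. You can subclass StreamProcessor or, as in the standalone function here, reproduce its steps with Spark Structured Streaming and a Kinesis connector. The connector format name and option keys differ between connectors, so treat them as placeholders: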

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

from feast import FeatureStore


def run_kinesis_ingestion(sfv_name: str, checkpoint_dir: str = "/tmp/kinesis_checkpoint"):
    fs = FeatureStore(repo_path=".")
    sfv = fs.get_stream_feature_view(sfv_name)
    # Stream name, region, and record format live on the source's kinesis_options
    options = sfv.stream_source.kinesis_options

    spark = SparkSession.builder.appName("FeastKinesisIngestion").getOrCreate()

    # Option keys depend on the Spark Kinesis connector you install
    kinesis_opts = {
        "streamName": options.stream_name,
        "region": options.region,
        "initialPosition": "LATEST",
    }

    # Read from the Kinesis stream
    raw_df = spark.readStream.format("kinesis").options(**kinesis_opts).load()

    # Decode JSON using the schema from the source definition
    schema = options.record_format.schema_json
    json_df = (
        raw_df.select(from_json(col("data").cast("string"), schema).alias("data"))
        .select("data.*")
    )

    # Apply the transformation UDF if one is defined
    transformed = sfv.udf(json_df) if sfv.udf else json_df

    def batch_write(batch_df, batch_id):
        # Write each micro-batch to the online store through Feast
        rows = batch_df.toPandas()
        fs.write_to_online_store(sfv.name, rows)

    query = (
        transformed.writeStream.foreachBatch(batch_write)
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("update")
        .start()
    )
    query.awaitTermination()

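Deploy this processor via spark-submit or your cluster manager. It uses the same write_to_online_store API as the built-in Kafka implementation. Unlike the built-in processor, it does not deduplicate rows within a micro-batch, so add that step in batch_write if your stream can carry duplicates.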

Summary

  • Feast streaming ingestion requires defining a stream source (KafkaSource or KinesisSource), creating a stream feature view with the @stream_feature_view decorator, and running a stream processor to materialize data.
  • Kafka has built-in support through SparkKafkaProcessor, accessible via get_stream_processor_object with a Spark processor config that sets mode="spark" and source="kafka".
  • Kinesis requires implementing a custom processor following the StreamProcessor abstract base class pattern, typically using Spark Structured Streaming with the Kinesis connector.
  • All processing occurs client-side; Feast does not manage the streaming job lifecycle, so you must handle deployment, scaling, and monitoring via spark-submit or your orchestration tool.
  • Always pair streaming sources with a batch source to enable historical back-filling and point-in-time joins.

Frequently Asked Questions

Does Feast provide a built-in Kinesis stream processor?

No. While KinesisSource is fully supported in sdk/python/feast/data_source.py for source configuration, Feast does not include a built-in Kinesis processor implementation. You must create a custom processor using the StreamProcessor interface, typically adapting the SparkKafkaProcessor pattern to use Spark's Kinesis connector instead of Kafka.

What is the purpose of the batch_source parameter in KafkaSource and KinesisSource?

The batch_source parameter associates your streaming source with a historical batch source (such as FileSource or BigQuerySource). This linkage enables Feast to perform point-in-time joins against historical data and back-fill features for training datasets. While optional, omitting it limits your ability to retrieve historical feature values.
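
The idea behind a point-in-time join can be shown on a toy history table. This is a simplified sketch, not Feast's retrieval code: the function name point_in_time_lookup, the sample rows, and the inclusive TTL window are all assumptions made for illustration.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


def point_in_time_lookup(
    history: List[Dict[str, Any]],
    driver_id: int,
    as_of: datetime,
    ttl: timedelta,
    timestamp_field: str = "event_timestamp",
) -> Optional[Dict[str, Any]]:
    """Return the newest row for driver_id at or before as_of, within ttl."""
    candidates = [
        row
        for row in history
        if row["driver_id"] == driver_id
        and as_of - ttl <= row[timestamp_field] <= as_of
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda row: row[timestamp_field])


history = [
    {"driver_id": 1001, "event_timestamp": datetime(2024, 1, 1, 9, 0), "conv_percentage": 40.0},
    {"driver_id": 1001, "event_timestamp": datetime(2024, 1, 1, 11, 0), "conv_percentage": 45.0},
    {"driver_id": 1001, "event_timestamp": datetime(2024, 1, 1, 13, 0), "conv_percentage": 50.0},
]

# A training label observed at 12:00 must not see the 13:00 row.
row = point_in_time_lookup(history, 1001, as_of=datetime(2024, 1, 1, 12, 0), ttl=timedelta(hours=2))
```

Without a batch source there is no history table to run this kind of lookup against, which is why training data retrieval depends on it.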

How does Feast handle duplicate events in the streaming pipeline?

The stream processor deduplicates incoming rows using the entity join keys and the event timestamp field defined in your stream feature view. Within each micro-batch, only the latest row per entity key is written to the online store, so duplicate or out-of-order messages in the same batch do not overwrite a newer value.

Who is responsible for running and monitoring the streaming ingestion job?

You are responsible for the entire lifecycle. Feast operates as a client-side library; it does not launch, schedule, or monitor streaming jobs. You must deploy your processor (via spark-submit, Kubernetes, or similar) and ensure it has network access to both the streaming service (Kafka/Kinesis) and the Feast online store.
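
As one example of the monitoring you would own, a Spark Structured Streaming job exposes per-batch metrics through the query's lastProgress attribute. The sketch below compares arrival and processing rates from such a progress dictionary; the function name is_falling_behind and the 0.9 tolerance are hypothetical choices, and the two rate keys are assumed to be present in the progress report of your Spark version.

```python
from typing import Any, Mapping, Optional


def is_falling_behind(progress: Optional[Mapping[str, Any]], tolerance: float = 0.9) -> bool:
    """True when rows are processed noticeably slower than they arrive."""
    if not progress:
        return False  # no micro-batch has completed yet
    arriving = progress.get("inputRowsPerSecond") or 0.0
    processed = progress.get("processedRowsPerSecond") or 0.0
    if arriving == 0.0:
        return False  # nothing is arriving, so there is nothing to fall behind on
    return processed < arriving * tolerance


lagging = is_falling_behind({"inputRowsPerSecond": 1000.0, "processedRowsPerSecond": 400.0})
healthy = is_falling_behind({"inputRowsPerSecond": 1000.0, "processedRowsPerSecond": 1200.0})
```

In a running job the argument would be query.lastProgress, polled on a schedule and wired to whatever alerting you already use.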
