# How to Set Up Streaming Ingestion with Kafka or Kinesis Sources in Feast

> Learn to set up streaming ingestion with Kafka or Kinesis sources in Feast. This guide explains the client-side pipeline for real-time feature engineering and materialization.

- Repository: [Feast/feast](https://github.com/feast-dev/feast)
- Tags: how-to-guide
- Published: 2026-03-01

---

**Feast implements streaming ingestion as a client-side two-step pipeline that combines stream sources (`KafkaSource` or `KinesisSource`), stream feature views with optional transformation UDFs, and pluggable stream processors to materialize real-time events into the online store.**

Setting up streaming ingestion with Kafka or Kinesis sources in Feast requires three core components from the feast-dev/feast repository: a streaming data source configuration, a stream feature view declaration, and a stream processor implementation. The stream source is also paired with a batch source, as described below. Feast does not run the streaming job for you. You deploy and monitor it, while Feast provides the abstractions that transform events and write features to the online store.

## Architectural Overview

Feast's streaming architecture consists of four primary components defined across the repository:

1. **Streaming Data Source**: The `KafkaSource` and `KinesisSource` classes in [`sdk/python/feast/data_source.py`](https://github.com/feast-dev/feast/blob/main/sdk/python/feast/data_source.py) (lines 418-447 and 703-735 respectively) hold connection metadata. This includes Kafka bootstrap servers and topic, or the Kinesis stream name and region. It also includes the payload format (`message_format` for Kafka, `record_format` for Kinesis, for example JSON or Avro) and, on `KafkaSource`, an optional `watermark_delay_threshold`.

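2. **Batch Source**: Pair your stream source with a batch source (e.g., `FileSource`, `BigQuerySource`) through the `batch_source` argument. The batch source supplies historical data for point-in-time joins and back-filling. The argument is optional on the source constructors, but Feast's `FeatureView` constructor raises an error when a `KafkaSource` or `KinesisSource` without a `batch_source` is used as its source.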

3. **Stream Feature View**: The `@stream_feature_view` decorator in [`sdk/python/feast/stream_feature_view.py`](https://github.com/feast-dev/feast/blob/main/sdk/python/feast/stream_feature_view.py) binds your streaming source to a schema, transformation UDF, TTL, and entity definitions.

4. **Stream Processor**: The `StreamProcessor` abstract base class in [`sdk/python/feast/infra/contrib/stream_processor.py`](https://github.com/feast-dev/feast/blob/main/sdk/python/feast/infra/contrib/stream_processor.py) (lines 31-84) defines the consumption interface. Feast provides `SparkKafkaProcessor` in [`sdk/python/feast/infra/contrib/spark_kafka_processor.py`](https://github.com/feast-dev/feast/blob/main/sdk/python/feast/infra/contrib/spark_kafka_processor.py) for Kafka, while Kinesis requires a custom implementation.

The processor reads messages, decodes them according to the `message_format`, applies the optional UDF, deduplicates rows using entity join keys and event timestamps, and writes the latest values to the online store via `FeatureStore.write_to_online_store`.
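The four steps above can be modeled without Spark or Feast. The sketch below is a hypothetical in-memory stand-in: `process_micro_batch` and the dict-based store are illustrative, not Feast APIs. It assumes JSON messages with ISO-8601 timestamps. The real processor performs the same work on Spark and pandas DataFrames and writes through `FeatureStore.write_to_online_store`.

```python
import json
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, object]


def process_micro_batch(
    messages: List[bytes],
    join_keys: List[str],
    timestamp_field: str,
    udf: Optional[Callable[[List[Row]], List[Row]]],
    online_store: Dict[Tuple[object, ...], Row],
) -> int:
    """Hypothetical stand-in for one micro-batch of a stream processor."""
    # 1. Decode: assume each message is a UTF-8 JSON object.
    rows: List[Row] = [json.loads(m.decode("utf-8")) for m in messages]
    for row in rows:
        # Parse ISO-8601 timestamps so rows compare chronologically.
        row[timestamp_field] = datetime.fromisoformat(str(row[timestamp_field]))

    # 2. Transform: apply the optional UDF to the whole batch.
    if udf is not None:
        rows = udf(rows)

    # 3. Deduplicate: keep only the newest row per entity key in this batch.
    latest: Dict[Tuple[object, ...], Row] = {}
    for row in rows:
        key = tuple(row[k] for k in join_keys)
        if key not in latest or row[timestamp_field] > latest[key][timestamp_field]:
            latest[key] = row

    # 4. Write: overwrite the stored value for each entity key.
    online_store.update(latest)
    return len(latest)


# Usage: two events for driver 1 collapse to the later one.
store: Dict[Tuple[object, ...], Row] = {}
batch = [
    b'{"driver_id": 1, "event_timestamp": "2024-01-01T00:00:00", "conv_rate": 0.1}',
    b'{"driver_id": 1, "event_timestamp": "2024-01-01T00:05:00", "conv_rate": 0.2}',
    b'{"driver_id": 2, "event_timestamp": "2024-01-01T00:01:00", "conv_rate": 0.3}',
]
written = process_micro_batch(batch, ["driver_id"], "event_timestamp", None, store)
```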

## Setting Up Kafka Streaming Ingestion

### Defining Batch and Stream Sources

First, define a batch source for historical data, then create the `KafkaSource` referencing it:

```python
from feast import FileSource, KafkaSource
from feast.data_format import JsonFormat
from datetime import timedelta

# Historical batch source for back-filling
driver_stats_batch = FileSource(
    name="driver_stats_batch",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

# Kafka streaming source configuration
driver_stats_stream = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="localhost:9092",
    topic="drivers",
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch,
    message_format=JsonFormat(
        schema_json=(
            "driver_id integer, event_timestamp timestamp, "
            "conv_rate double, acc_rate double, created timestamp"
        )
    ),
    watermark_delay_threshold=timedelta(minutes=5),
)
```
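`watermark_delay_threshold` sets how far behind the newest observed event time a record may arrive before it is treated as late. The model below is a per-event simplification for intuition only. Spark Structured Streaming, whose `withWatermark` implements this idea, advances its watermark between micro-batches, and late rows are dropped only by stateful operations such as windowed aggregations. The function name `split_late_events` is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple


def split_late_events(
    event_times: Iterable[datetime], delay: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Simplified watermark model: an event is late if it is older than
    (newest event time seen so far - delay)."""
    on_time: List[datetime] = []
    late: List[datetime] = []
    max_seen: Optional[datetime] = None
    for ts in event_times:
        if max_seen is not None and ts < max_seen - delay:
            late.append(ts)
        else:
            on_time.append(ts)
        # Advance the high-water mark only when a newer event arrives.
        if max_seen is None or ts > max_seen:
            max_seen = ts
    return on_time, late


# Usage with a 5-minute threshold, mirroring the KafkaSource above.
base = datetime(2024, 1, 1, 12, 0)
events = [
    base,
    base + timedelta(minutes=10),
    base + timedelta(minutes=7),  # within 5 minutes of 12:10, so on time
    base + timedelta(minutes=3),  # older than 12:05, so late
]
on_time, late = split_late_events(events, timedelta(minutes=5))
```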

### Creating the Stream Feature View

Register a transformation UDF using the `@stream_feature_view` decorator:

```python
from datetime import timedelta

from feast import Entity, Field, stream_feature_view
from feast.types import Float32

# Entity keyed on the driver_id column of the stream payload
driver = Entity(name="driver", join_keys=["driver_id"])

@stream_feature_view(
    entities=[driver],
    ttl=timedelta(days=365),
    mode="spark",
    schema=[
        Field(name="conv_percentage", dtype=Float32),
        Field(name="acc_percentage", dtype=Float32),
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=driver_stats_stream,
)
def driver_hourly_stats(df):
    # Imported inside the UDF so the definition file does not need pyspark at import time
    from pyspark.sql.functions import col

    # Scale the rates to percentages and drop the raw columns
    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
        .drop("conv_rate", "acc_rate")
    )
```
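Because the UDF body is plain column arithmetic, its logic can be checked without a Spark session. The hypothetical helper below mirrors `driver_hourly_stats` on a list of dicts, which is convenient for a quick unit test of the math. Feast itself only calls the Spark version.

```python
from typing import Any, Dict, List


def to_percentages(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Plain-Python mirror of driver_hourly_stats: add percentage columns
    and drop the raw rate columns, leaving other columns untouched."""
    out: List[Dict[str, Any]] = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k not in ("conv_rate", "acc_rate")}
        new_row["conv_percentage"] = row["conv_rate"] * 100.0
        new_row["acc_percentage"] = row["acc_rate"] * 100.0
        out.append(new_row)
    return out


result = to_percentages([{"driver_id": 1, "conv_rate": 0.5, "acc_rate": 0.25}])
```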

### Running the Spark Kafka Processor

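Instantiate and launch the built-in processor using `get_stream_processor_object`, defined in [`sdk/python/feast/infra/contrib/stream_processor.py`](https://github.com/feast-dev/feast/blob/main/sdk/python/feast/infra/contrib/stream_processor.py) (lines 88-107). For the Spark/Kafka combination, the config must be a `SparkProcessorConfig` from `spark_kafka_processor.py`. It carries the Spark session, the micro-batch trigger interval, and a query timeout:

```python
from pyspark.sql import SparkSession

from feast import FeatureStore
from feast.data_source import PushMode
from feast.infra.contrib.spark_kafka_processor import SparkProcessorConfig
from feast.infra.contrib.stream_processor import get_stream_processor_object

fs = FeatureStore(repo_path=".")
sfv = fs.get_stream_feature_view("driver_hourly_stats")

spark = SparkSession.builder.appName("FeastKafkaIngestion").getOrCreate()

# The spark/kafka processor rejects a plain ProcessorConfig
config = SparkProcessorConfig(
    mode="spark",
    source="kafka",
    spark_session=spark,
    processing_time="30 seconds",  # micro-batch trigger interval
    query_timeout=15,  # seconds to wait on the streaming query
)
processor = get_stream_processor_object(config=config, fs=fs, sfv=sfv)

# Write to the online store; PushMode.ONLINE_AND_OFFLINE writes to both stores
processor.ingest_stream_feature_view(to=PushMode.ONLINE)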
```
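The `to` argument selects which stores receive each deduplicated micro-batch. The sketch below mirrors how the built-in processor in `spark_kafka_processor.py` appears to branch on `PushMode`. The enum is a local stand-in for `feast.data_source.PushMode`, and `route_batch` is hypothetical.

```python
from enum import Enum
from typing import Callable, List


class PushMode(Enum):
    # Local stand-in for feast.data_source.PushMode
    ONLINE = 1
    OFFLINE = 2
    ONLINE_AND_OFFLINE = 3


def route_batch(
    rows: List[dict],
    to: PushMode,
    write_online: Callable[[List[dict]], None],
    write_offline: Callable[[List[dict]], None],
) -> None:
    # Skip empty micro-batches entirely.
    if not rows:
        return
    if to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE):
        write_online(rows)
    if to in (PushMode.OFFLINE, PushMode.ONLINE_AND_OFFLINE):
        write_offline(rows)


# Usage: list.extend stands in for the online and offline writers.
online_rows: List[dict] = []
offline_rows: List[dict] = []
route_batch([{"driver_id": 1}], PushMode.ONLINE, online_rows.extend, offline_rows.extend)
route_batch([{"driver_id": 2}], PushMode.ONLINE_AND_OFFLINE, online_rows.extend, offline_rows.extend)
route_batch([], PushMode.ONLINE, online_rows.extend, offline_rows.extend)
```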

## Setting Up Kinesis Streaming Ingestion

### Configuring the Kinesis Source

Define a `KinesisSource` much like the Kafka source, referencing your batch source. Note the Kinesis-specific `record_format` argument (instead of `message_format`) and the required `region`:

```python
from feast import KinesisSource
from feast.data_format import JsonFormat

driver_stats_kinesis = KinesisSource(
    name="driver_stats_kinesis",
    stream_name="drivers",
    region="us-east-1",
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch,  # batch source defined in the Kafka section
    record_format=JsonFormat(
        schema_json=(
            "driver_id integer, event_timestamp timestamp, "
            "conv_rate double, acc_rate double, created timestamp"
        )
    ),
)
```

Use the same `@stream_feature_view` pattern as Kafka, substituting the Kinesis source as the `source` parameter.
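Kinesis delivers each record payload as raw bytes, which the Spark job must decode against `schema_json`. To make that step concrete, the sketch below decodes a single record in plain Python. `parse_schema`, `decode_record`, and the type table are hypothetical helpers that only understand the simple `name type` pairs used in this guide. Spark's `from_json` handles the full DDL grammar and has its own rules for mismatched types.

```python
import json
from datetime import datetime
from typing import Any, Callable, Dict

# Hypothetical mapping from the DDL type names used in schema_json to Python casts.
CASTS: Dict[str, Callable[[Any], Any]] = {
    "integer": int,
    "double": float,
    "string": str,
    "timestamp": lambda v: datetime.fromisoformat(str(v)),
}


def parse_schema(schema_json: str) -> Dict[str, Callable[[Any], Any]]:
    """Turn 'name type, name type, ...' into an ordered {name: cast} dict."""
    fields: Dict[str, Callable[[Any], Any]] = {}
    for part in schema_json.split(","):
        name, type_name = part.split()
        fields[name] = CASTS[type_name.lower()]
    return fields


def decode_record(data: bytes, schema: Dict[str, Callable[[Any], Any]]) -> Dict[str, Any]:
    """Decode one payload; absent or null fields become None, extra keys are ignored."""
    payload = json.loads(data.decode("utf-8"))
    return {
        name: None if payload.get(name) is None else cast(payload[name])
        for name, cast in schema.items()
    }


schema = parse_schema(
    "driver_id integer, event_timestamp timestamp, "
    "conv_rate double, acc_rate double, created timestamp"
)
row = decode_record(
    b'{"driver_id": 7, "event_timestamp": "2024-01-01T00:00:00", "conv_rate": 0.5, "acc_rate": 1}',
    schema,
)
```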

### Building a Custom Kinesis Processor

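Feast does not provide a built-in Kinesis processor. The sketch below follows the same overall flow as `SparkKafkaProcessor`, but as a standalone function rather than a `StreamProcessor` subclass. It assumes a Spark Structured Streaming Kinesis connector registered under the `kinesis` format name, such as the one bundled with Databricks Runtime. Option names and the payload column (`data` here) can differ between connectors: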

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

from feast import FeatureStore


def run_kinesis_ingestion(sfv_name: str, checkpoint_dir: str = "/tmp/kinesis_checkpoint"):
    fs = FeatureStore(repo_path=".")
    sfv = fs.get_stream_feature_view(sfv_name)
    # KinesisSource keeps stream_name, region, and record_format in kinesis_options
    kinesis = sfv.stream_source.kinesis_options
    join_keys = [fs.get_entity(name).join_key for name in sfv.entities]

    spark = SparkSession.builder.appName("FeastKinesisIngestion").getOrCreate()

    # Read from the Kinesis stream (option names follow the Databricks connector)
    raw_df = (
        spark.readStream.format("kinesis")
        .option("streamName", kinesis.stream_name)
        .option("region", kinesis.region)
        .option("initialPosition", "latest")
        .load()
    )

    # Decode the binary payload as JSON using the schema from the source definition
    json_df = raw_df.select(
        from_json(col("data").cast("string"), kinesis.record_format.schema_json).alias("record")
    ).select("record.*")

    # Apply the transformation UDF if one is defined
    transformed = sfv.udf(json_df) if sfv.udf else json_df

    def batch_write(batch_df, batch_id):
        rows = batch_df.toPandas()
        if rows.empty:
            return
        # Keep only the latest row per entity key within this micro-batch
        rows = rows.sort_values(by=[*join_keys, sfv.timestamp_field], ascending=False)
        rows = rows.drop_duplicates(subset=join_keys, keep="first")
        fs.write_to_online_store(sfv.name, rows)

    query = (
        transformed.writeStream.foreachBatch(batch_write)
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("update")
        .start()
    )
    query.awaitTermination()
```

Deploy this processor via `spark-submit` or your cluster manager. The processor uses the same `write_to_online_store` API as the built-in Kafka implementation.
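If you prefer to package the job as a `StreamProcessor` subclass, the contract is small. Based on `stream_processor.py`, the abstract methods appear to be `ingest_stream_feature_view` plus three hooks for reading, transforming, and writing. The sketch below reproduces that shape with a local ABC and a toy in-memory implementation, so the control flow runs without Spark. `StreamProcessorShape`, `InMemoryProcessor`, and the string `to` values are hypothetical simplifications; Feast uses `PushMode` and Spark DataFrames.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class StreamProcessorShape(ABC):
    """Local stand-in assumed to mirror the abstract methods of
    feast.infra.contrib.stream_processor.StreamProcessor."""

    @abstractmethod
    def ingest_stream_feature_view(self, to: str = "online") -> None: ...

    @abstractmethod
    def _ingest_stream_data(self) -> Any: ...

    @abstractmethod
    def _construct_transformation_plan(self, table: Any) -> Any: ...

    @abstractmethod
    def _write_stream_data(self, table: Any, to: str) -> None: ...


class InMemoryProcessor(StreamProcessorShape):
    """Toy processor: reads a fixed list, derives a column, writes to a dict."""

    def __init__(self, records: List[Dict[str, Any]]):
        self.records = records
        self.online: Dict[Any, Dict[str, Any]] = {}

    def ingest_stream_feature_view(self, to: str = "online") -> None:
        # Same read -> transform -> write pipeline as the real processors.
        table = self._ingest_stream_data()
        table = self._construct_transformation_plan(table)
        self._write_stream_data(table, to)

    def _ingest_stream_data(self) -> List[Dict[str, Any]]:
        return list(self.records)

    def _construct_transformation_plan(self, table: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        return [{**r, "conv_percentage": r["conv_rate"] * 100.0} for r in table]

    def _write_stream_data(self, table: List[Dict[str, Any]], to: str) -> None:
        if to == "online":
            for r in table:
                self.online[r["driver_id"]] = r


processor = InMemoryProcessor([{"driver_id": 1, "conv_rate": 0.5}])
processor.ingest_stream_feature_view()
```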

## Summary

- **Feast streaming ingestion** requires defining a stream source (`KafkaSource` or `KinesisSource`), creating a stream feature view with the `@stream_feature_view` decorator, and running a stream processor to materialize data.
- **Kafka** has built-in support through `SparkKafkaProcessor`, accessible via `get_stream_processor_object` with a `SparkProcessorConfig(mode="spark", source="kafka", ...)` that also supplies the Spark session, trigger interval, and query timeout.
- **Kinesis** requires implementing a custom processor following the `StreamProcessor` abstract base class pattern, typically using Spark Structured Streaming with the Kinesis connector.
- All processing occurs **client-side**; Feast does not manage the streaming job lifecycle, so you must handle deployment, scaling, and monitoring via `spark-submit` or your orchestration tool.
- Always pair streaming sources with a **batch source** to enable historical back-filling and point-in-time joins.

## Frequently Asked Questions

### Does Feast provide a built-in Kinesis stream processor?

No. While `KinesisSource` is fully supported in [`sdk/python/feast/data_source.py`](https://github.com/feast-dev/feast/blob/main/sdk/python/feast/data_source.py) for source configuration, Feast does not include a built-in Kinesis processor implementation. You must create a custom processor using the `StreamProcessor` interface, typically adapting the `SparkKafkaProcessor` pattern to use Spark's Kinesis connector instead of Kafka.

### What is the purpose of the batch_source parameter in KafkaSource and KinesisSource?

The `batch_source` parameter associates your streaming source with a historical batch source (such as `FileSource` or `BigQuerySource`). This linkage lets Feast perform point-in-time joins against historical data and back-fill features for training datasets. The parameter is optional on the source constructors, but Feast's `FeatureView` constructor raises an error when a stream source without a `batch_source` is used as its source, so treat it as required in practice.
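A point-in-time join asks, for each training row with an entity key and timestamp `t`, which feature value was already known at `t`. The single-entity sketch below shows that rule, assuming a history sorted by timestamp and a TTL window like the feature view's `ttl`. `point_in_time_value` is hypothetical; Feast's offline stores perform the equivalent join over whole DataFrames or SQL tables.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


def point_in_time_value(
    history: List[Tuple[datetime, float]], as_of: datetime, ttl: timedelta
) -> Optional[float]:
    """Return the latest value with timestamp <= as_of that is no older than ttl.
    `history` holds (event_timestamp, value) pairs for ONE entity, sorted by time."""
    times = [ts for ts, _ in history]
    idx = bisect_right(times, as_of)  # number of events at or before as_of
    if idx == 0:
        return None  # nothing was known yet at as_of
    ts, value = history[idx - 1]
    if ts < as_of - ttl:
        return None  # the latest value has expired under the TTL
    return value


history = [(datetime(2024, 1, 1, 9), 0.1), (datetime(2024, 1, 1, 11), 0.2)]
value_at_10am = point_in_time_value(history, datetime(2024, 1, 1, 10), timedelta(days=1))
```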

### How does Feast handle duplicate events in the streaming pipeline?

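The built-in Spark Kafka processor deduplicates each micro-batch using the entity join keys and the event timestamp field of your stream feature view. It sorts rows by those columns and writes only the latest row per entity key to the online store. This handles duplicate or out-of-order messages within a single micro-batch. It does not compare against values written by earlier batches, so whether a late event in a later batch can overwrite a newer stored value depends on the online store implementation.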

### Who is responsible for running and monitoring the streaming ingestion job?

You are responsible for the entire lifecycle. Feast operates as a client-side library; it does not launch, schedule, or monitor streaming jobs. You must deploy your processor (via `spark-submit`, Kubernetes, or similar) and ensure it has network access to both the streaming service (Kafka/Kinesis) and the Feast online store.