Version: Beta 🚧

Creating a Stream Source with Push Config

This guide shows you how to create a StreamSource that processes records sent to the Stream Ingest API.

Defining a Stream Source with Push Config

A StreamSource is used by a StreamFeatureView to generate feature values using data from both the stream and batch sources. A StreamFeatureView applies the same transformation to both data sources.

In addition to the metadata needed to define Tecton objects, a StreamSource needs a schema and optionally a batch_config:

  • schema: The schema of the stream source, i.e. the schema of the events that will be sent to the Stream Ingest API. Tecton uses this schema to validate the JSON that users send. The types supported by the API are the same ones supported by the Feature Server.
  • batch_config: The configuration for a batch source that backs the push source. Must contain at least all the columns defined in the schema.
    • The batch source defined by the batch_config contains the stream's historical data and is used to backfill feature data into the online store. You can use post_processor to transform its columns so that they match the push source schema.
    • If the batch_config is not specified, then the only way to populate the data source for testing is by sending records to the Stream Ingest API.
    • The value of the batch_config can be a config object (such as HiveConfig) or a Data Source Function. A Data Source Function offers more flexibility than a config object.
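
To make the idea of schema validation concrete, here is a minimal sketch in plain Python of checking a JSON record against a schema. It is only an illustration, not Tecton's implementation: the names EXPECTED_TYPES and validate_record are hypothetical, the mapping to Python types is an assumption, and so is the choice of ISO 8601 strings for timestamps.

```python
from datetime import datetime

# Hypothetical stand-in for a schema: field name -> expected Python type.
# The three entries loosely mirror String, Timestamp, and Int64 fields;
# this is NOT how Tecton represents or checks types internally.
EXPECTED_TYPES = {
    "user_id": str,
    "timestamp": datetime,
    "clicked": int,
}


def validate_record(record, expected_types=EXPECTED_TYPES):
    """Return a list of problems; an empty list means the record matches."""
    problems = []
    for name, expected in expected_types.items():
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        if expected is datetime:
            # JSON has no timestamp type, so this sketch assumes ISO 8601 strings.
            try:
                datetime.fromisoformat(value)
            except (TypeError, ValueError):
                problems.append(f"{name}: expected ISO 8601 timestamp string")
        elif isinstance(value, bool) or not isinstance(value, expected):
            # bool is a subclass of int in Python, so reject it explicitly.
            problems.append(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return problems


good = {"user_id": "u1", "timestamp": "2023-01-01T00:00:00+00:00", "clicked": 1}
bad = {"user_id": "u1", "clicked": "yes"}
print(validate_record(good))
print(validate_record(bad))
```

Running a check like this on the client side before sending records can surface missing fields or mistyped values early, though the Stream Ingest API remains the authority on what it accepts.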

The following example declares a StreamSource object.

from tecton import StreamSource, HiveConfig, PushConfig
from tecton.types import String, Int64, Timestamp, Field

input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]


def post_processor_batch(df):
    from pyspark.sql.functions import col

    df = df.select(
        col("user_id").cast("string").alias("user_id"),
        col("timestamp").cast("timestamp").alias("timestamp"),
        col("clicked").cast("long").alias("clicked"),
    )
    return df


stream_config = PushConfig(log_offline=False)

impressions_event_source = StreamSource(
    name="impressions_event_source",
    schema=input_schema,
    batch_config=HiveConfig(
        database="demo_ads",
        table="impressions_batch",
        post_processor=post_processor_batch,
        timestamp_field="timestamp",
    ),
    stream_config=stream_config,
    description="Sample Push Source for ad impression events",
)
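
Because the batch source must contain at least every column in the schema, a quick check of column names can help before applying a definition like the one above. The following is a minimal sketch in plain Python: missing_columns is a hypothetical helper, not part of the Tecton SDK, and the ad_id column is an invented extra column used only for illustration.

```python
def missing_columns(schema_columns, batch_columns):
    """Return the schema columns absent from the batch source, in schema order."""
    available = set(batch_columns)
    return [name for name in schema_columns if name not in available]


# Column names from the schema in the example above.
schema_columns = ["user_id", "timestamp", "clicked"]

# Hypothetical batch table columns; extra columns such as ad_id are harmless.
batch_columns = ["user_id", "timestamp", "clicked", "ad_id"]

print(missing_columns(schema_columns, batch_columns))
print(missing_columns(schema_columns, ["user_id", "timestamp"]))
```

An empty result means the batch table covers the schema. Any names that come back would need to be produced by the post_processor or added to the batch table.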
