
Training Data Generation with Stream Ingest API

To backfill data into the online and/or offline store with the Stream Ingest API, a batch_config can be included in the PushSource. It's important to note that the schema of the Batch Source must contain at least all the columns defined in the schema of the Push Source. Alternatively, a post_processor can be used in the batch_config to align column names and data types between the Batch Source and the Push Source schema.

Below is an example PushSource, impressions_event_source, with a batch_config for backfilling data to the online and offline store. The post_processor ensures that the schema of the Batch Source matches the input_schema of the Push Source.

from tecton import PushSource, HiveConfig
from tecton.types import String, Int64, Timestamp, Field

input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]


def post_processor_batch(df):
    from pyspark.sql.functions import col

    # Cast each column to the type declared in input_schema so the Batch
    # Source output matches the Push Source schema.
    df = df.select(
        col("user_id").cast("string").alias("user_id"),
        col("timestamp").cast("timestamp").alias("timestamp"),
        col("clicked").cast("long").alias("clicked"),
    )
    return df


impressions_event_source = PushSource(
    name="impressions_event_source",
    schema=input_schema,
    batch_config=HiveConfig(
        database="demo_ads",
        table="impressions_batch",
        post_processor=post_processor_batch,
        timestamp_field="timestamp",
    ),
)
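
To sanity-check the post_processor, it can be run locally against a toy DataFrame. This is a minimal sketch, assuming an active Spark session (for example, in a notebook attached to your cluster); the sample row is purely illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A raw batch row whose types do not yet match input_schema
# (clicked arrives as a string here).
raw_df = spark.createDataFrame(
    [("user_1", "2022-01-01 00:00:00", "1")],
    ["user_id", "timestamp", "clicked"],
)

aligned_df = post_processor_batch(raw_df)
aligned_df.printSchema()
# root
#  |-- user_id: string (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- clicked: long (nullable = true)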

Below is a Stream Feature View that uses the Push Source above, with additional configurations such as batch_schedule and an optional manual_trigger_backfill_end_time for efficient backfills.

from datetime import datetime, timedelta
from tecton import StreamFeatureView
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

click_events_fv = StreamFeatureView(
    name="click_events_fv",
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 1, 1),
    batch_schedule=timedelta(days=1),
    # Backfill jobs are scheduled automatically up to this end time.
    manual_trigger_backfill_end_time=datetime(2023, 7, 1),
    ttl=timedelta(days=7),
    description="The count of ad clicks for a user",
)
  • When a batch_config is defined in the Push Source, any records in the associated Batch Source will be backfilled to the online and/or offline store from the feature_start_time to the manual_trigger_backfill_end_time.
  • If changes are made to feature_start_time or manual_trigger_backfill_end_time, Tecton will intelligently schedule jobs to backfill only unmaterialized data (instead of fully rematerializing the entire Feature View).
  • If manual_trigger_backfill_end_time is absent from the Feature View definition, Tecton will not automatically schedule backfill materialization jobs; instead, you will need to trigger the jobs manually, as shown in the sketch below. Please refer here for more information on manually triggering materialization jobs.
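
For example, a backfill job for a single batch_schedule interval can be triggered from a notebook with trigger_materialization_job. This is a minimal sketch, assuming the Feature View above has been applied to a workspace named "prod"; the workspace name and time range are illustrative.

import tecton
from datetime import datetime

# Fetch the applied Feature View from the workspace (the workspace name
# is an assumption for this example).
fv = tecton.get_workspace("prod").get_feature_view("click_events_fv")

# Materialize one day of data from the Batch Source to the online and
# offline stores.
job = fv.trigger_materialization_job(
    start_time=datetime(2023, 7, 1),
    end_time=datetime(2023, 7, 2),
    online=True,
    offline=True,
)
print(job)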

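Once the backfill has materialized data to the offline store, the Feature View can serve point-in-time correct training data. This is a minimal sketch, assuming a workspace named "prod" and a Spark DataFrame events with user_id and timestamp columns for the training events.

import tecton

fv = tecton.get_workspace("prod").get_feature_view("click_events_fv")

# Join each event with the feature values as of that event's timestamp
# (events is assumed to exist; see the lead-in above).
training_df = fv.get_historical_features(spine=events).to_spark()
training_df.show()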