Version: Beta 🚧

Source Logging with Stream Ingest API

You can also generate a historical event log of the records sent to the Stream Ingest API for a Stream Source and use that log for training data generation, instead of supplying a batch_config. Source logging is enabled by setting log_offline=True on the source's PushConfig, as in the example Stream Source below.

from tecton import PushConfig, StreamSource
from tecton.types import String, Int64, Timestamp, Field

input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]

stream_config_log = PushConfig(log_offline=True)
impressions_event_source = StreamSource(
    name="impressions_event_source", schema=input_schema, stream_config=stream_config_log
)
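
Because the logged records later feed training data, it can help to check that a record matches the schema above before sending it. The following is a minimal sketch in plain Python and is not part of the Tecton SDK: EXPECTED_FIELDS and validate_record are hypothetical names, and the mapping from Tecton types to Python types is an assumption made for illustration.

```python
from datetime import datetime, timezone

# Hypothetical mapping of the schema's field names to Python types
# (String -> str, Timestamp -> datetime, Int64 -> int). This is an assumption.
EXPECTED_FIELDS = {"user_id": str, "timestamp": datetime, "clicked": int}


def validate_record(record):
    """Return a list of problems found in the record; an empty list means it matches."""
    problems = []
    for name, expected_type in EXPECTED_FIELDS.items():
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected_type):
            problems.append(f"wrong type for {name}: {type(value).__name__}")
    for name in record:
        if name not in EXPECTED_FIELDS:
            problems.append(f"unexpected field: {name}")
    return problems


# Example record with illustrative values only.
record = {
    "user_id": "user_123",
    "timestamp": datetime(2023, 1, 1, tzinfo=timezone.utc),
    "clicked": 1,
}
print(validate_record(record))
```

A check like this only guards the shape of the data on the client side; how the record is serialized and sent is defined by the Stream Ingest API itself.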

Below is a Stream Feature View using the above Stream Source.

from datetime import datetime, timedelta
from tecton import StreamFeatureView
from tecton.types import Field, Int64, String, Timestamp
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]

click_events_fv = StreamFeatureView(
    name="click_events_fv",
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    description="The count of ad clicks for a user",
    schema=schema,
)

Training Data Generation

To retrieve historical data, call the Feature View's get_historical_features() method in the Python SDK with from_source=True. Feature Views that are applied later against the same Stream Source can also retrieve the data that was previously ingested.
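
The retrieval call described above might look like the following sketch. The helper fetch_logged_history is a hypothetical wrapper, not an SDK function, and the workspace name and time range in the commented usage are placeholders. The start_time and end_time arguments are assumed to be supported by the installed SDK version, so check the SDK reference before relying on them.

```python
from datetime import datetime


def fetch_logged_history(feature_view, start_time, end_time):
    """Request historical features for a time range with from_source=True.

    `feature_view` is expected to be a Tecton Feature View object, for
    example one fetched from a workspace in a notebook.
    """
    return feature_view.get_historical_features(
        start_time=start_time,
        end_time=end_time,
        from_source=True,
    )


# Usage in a notebook connected to Tecton (placeholder names, not executed here):
#
#   import tecton
#   ws = tecton.get_workspace("prod")  # hypothetical workspace name
#   fv = ws.get_feature_view("click_events_fv")
#   df = fetch_logged_history(fv, datetime(2022, 1, 1), datetime(2022, 1, 8)).to_pandas()
```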
