Source Logging with Stream Ingest API
You can also generate a historical event log of the records sent to the
Stream Ingest API for a Stream Source and use it for training data generation,
instead of providing a batch_config. Below is an example Stream Source that has
source logging enabled.
from tecton import PushConfig, StreamSource
from tecton.types import String, Int64, Timestamp, Field

# Schema of the records accepted for this Stream Source.
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]

# log_offline=True enables source logging for the ingested records.
stream_config_log = PushConfig(log_offline=True)

impressions_event_source = StreamSource(
    name="impressions_event_source",
    schema=input_schema,
    stream_config=stream_config_log,
)
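To make concrete what kind of record ends up in the event log, here is a minimal sketch that builds a request body for the source above. It assumes the Stream Ingest API body shape of `workspace_name`, `dry_run`, and `records` keyed by Stream Source name; the helper `build_ingest_payload`, the workspace name, and the sample values are hypothetical, and no request is actually sent.

```python
import json
from datetime import datetime, timezone


def build_ingest_payload(workspace_name, source_name, events, dry_run=True):
    """Wrap plain event dicts in the assumed Stream Ingest API body shape."""
    records = [{"record": event} for event in events]
    return {
        "workspace_name": workspace_name,
        "dry_run": dry_run,
        "records": {source_name: records},
    }


# One event matching input_schema: user_id (String), timestamp, clicked (Int64).
event = {
    "user_id": "user_123",  # hypothetical sample value
    "timestamp": datetime(2023, 1, 4, 0, 25, 6, tzinfo=timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    ),
    "clicked": 1,
}

# "my_workspace" is a placeholder; use the workspace the source is applied to.
payload = build_ingest_payload("my_workspace", "impressions_event_source", [event])
body = json.dumps(payload)  # JSON string that would be sent as the request body
```

Check the endpoint URL, authentication header, and exact field names against the Stream Ingest API reference for your Tecton version before sending anything.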
Below is a Stream Feature View using the above Stream Source.
from datetime import datetime, timedelta

from tecton import StreamFeatureView
from tecton.types import Field, String, Int64, Timestamp

from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

# Output schema of the Feature View; it mirrors the Stream Source schema.
schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]

click_events_fv = StreamFeatureView(
    name="click_events_fv",
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    description="The count of ad clicks for a user",
    schema=schema,
)
Training Data Generation
Historical data can be retrieved via the Feature View's get_historical_features()
method in the Python SDK with from_source=True. Note that newly applied
Feature Views using the same Stream Source can also retrieve historical data
previously ingested.
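The retrieval call can be sketched as a small helper. This is an illustration under assumptions, not the definitive workflow: `fetch_logged_history` is a hypothetical name, and the helper only relies on the Feature View object exposing `get_historical_features()` with `start_time`, `end_time`, and `from_source` arguments.

```python
from datetime import datetime


def fetch_logged_history(feature_view, start_time, end_time):
    """Request features computed from the source's logged events.

    from_source=True asks for features to be computed from the source data
    rather than read from previously materialized feature data.
    """
    return feature_view.get_historical_features(
        start_time=start_time,
        end_time=end_time,
        from_source=True,
    )


# Example usage in a notebook (assumes a configured Tecton environment and a
# hypothetical workspace name):
#
#   import tecton
#   fv = tecton.get_workspace("my_workspace").get_feature_view("click_events_fv")
#   df = fetch_logged_history(fv, datetime(2022, 1, 1), datetime(2022, 2, 1))
#   training_data = df.to_pandas()
```

Keeping the time range explicit makes it easier to confirm that events ingested before a new Feature View was applied are included in the result.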