Stream Feature View
This feature is not supported in Tecton on Snowflake.
If you are interested in this functionality, please file a feature request.
A Stream Feature View defines transformations against a Stream Data Source and can compute features in near-real-time. It processes raw data from a streaming source (e.g. Kafka or Kinesis) and can be backfilled from a batch source that contains a historical log of stream events.
Stream Feature Transformations
Stream Feature Views can run row-level Spark SQL or PySpark transformations and can apply optional time-windowed aggregations (see section below). Tecton executes these transformations as Spark jobs on your connected data platform (Databricks or EMR).
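from tecton import stream_feature_view

# transactions_stream (a stream data source) and user (an entity) are assumed
# to be defined elsewhere in the feature repository.
@stream_feature_view(source=transactions_stream, entities=[user], mode="spark_sql")
def user_last_transaction_amount(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """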
Materialization to the Online and Offline Feature Store
Online Materialization
When online=True is set, Tecton will run the Stream Feature View transformation on each event that comes in from the underlying stream source and write the result to the online store. Any previous value for the same key is overwritten, so the online store only holds the most recent value. Feature data will be backfilled from the Stream Data Source's log of historical events (configured via its batch_config). The feature_start_time parameter specifies how far back to backfill features.
from tecton import stream_feature_view
from datetime import datetime

# transactions_stream and user are assumed to be defined elsewhere in the repo.
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    feature_start_time=datetime(2020, 10, 10),  # backfill starts here
)
def user_last_transaction_amount(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """
Offline Materialization
Feature data can also be materialized to the Offline Feature Store in order to speed up offline queries (for testing and training data generation). When offline=True is set, Tecton will run the same Stream Feature View transformation pipeline against the batch source (the historical log of stream events) that backs the stream source. The batch_schedule parameter determines how often Tecton will run offline materialization jobs.
from tecton import stream_feature_view
from datetime import datetime, timedelta

# transactions_stream and user are assumed to be defined elsewhere in the repo.
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),  # run offline materialization daily
)
def user_last_transaction_amount(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """
Time-Windowed Aggregations
Tecton provides built-in implementations of common time-windowed aggregations that can be applied to Stream Feature Views. These time-windowed aggregations are optimized for performance and efficiency and are applied consistently online and offline.
For a technical deep dive, check out our two-part blog post on Real-Time Aggregation Features for Machine Learning.
Time-windowed aggregations can be specified in the Stream Feature View decorator using the aggregations parameter. Tecton expects the Stream Feature View transformation to select the raw events (with timestamps) to be aggregated.
Aggregations can be updated either on a sliding time window or with continuous processing for fresher feature values.
For a list of available time-window aggregation functions and other information about the functions, refer to the Time-Window Aggregation Functions Reference.
If the aggregation you need is not supported, please make a feature request.
Sliding Time-Windows
Sliding time-windows are configured with the aggregation_interval parameter. Tecton will update the feature value in the online store after the aggregation interval has elapsed, assuming there was at least one event for that key.
from tecton import stream_feature_view, Aggregation
from datetime import datetime, timedelta

# transactions_stream and user are assumed to be defined elsewhere in the repo.
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregation_interval=timedelta(minutes=10),  # tile size / online update interval
    aggregations=[
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=1),
            name="average_amount_1h",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=12),
            name="average_amount_12h",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=24),
            name="average_amount_24h",
        ),
    ],
)
def user_transaction_amount_averages(transactions_stream):
    # Select the raw events; Tecton applies the aggregations above.
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """
When using a sliding time-window, Tecton stores partial aggregations in the form of tiles. The tile size is defined by the aggregation_interval parameter. At feature request time, Tecton's online and offline feature serving capabilities automatically roll up the persisted tiles. This design has several key benefits:
- Significantly reduced storage requirements if you define several time windows.
- Reduced stream job memory requirements.
- Streaming features with a time window that exceeds the streaming source's retention period can be backfilled from the batch source (the historical log of stream events) that backs the stream source. Tecton will backfill historical tiles from the batch source and, at request time, transparently combine them with tiles written by the streaming job.
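Here is a minimal sketch of the tiling technique for the mean aggregations in the example above. It assumes epoch-aligned 10-minute tiles and request times that fall on a tile boundary. A mean cannot be merged directly, so each tile stores a partial (sum, count) pair, and the rollup adds those pairs across the completed tiles inside each window. All names are hypothetical. This illustrates the general technique, not Tecton's tile format.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
AGGREGATION_INTERVAL = timedelta(minutes=10)  # tile size, as in the example above

# Partial aggregates for "mean": a [sum, count] pair per (user_id, tile_start).
tiles = defaultdict(lambda: [0.0, 0])


def tile_start(ts):
    """Floor a timestamp to the start of its tile (epoch-aligned in this sketch)."""
    return EPOCH + ((ts - EPOCH) // AGGREGATION_INTERVAL) * AGGREGATION_INTERVAL


def ingest(user_id, amount, ts):
    """Fold one raw event into its tile's partial aggregate."""
    tile = tiles[(user_id, tile_start(ts))]
    tile[0] += amount
    tile[1] += 1


def rolled_up_mean(user_id, request_time, time_window):
    """Roll up completed tiles inside [request_time - time_window, request_time)."""
    window_start = request_time - time_window
    total, count = 0.0, 0
    for (uid, start), (tile_sum, tile_count) in tiles.items():
        if uid == user_id and start >= window_start and start + AGGREGATION_INTERVAL <= request_time:
            total += tile_sum
            count += tile_count
    return total / count if count else None


ingest("u1", 100.0, datetime(2020, 10, 9, 12, 0))
ingest("u1", 10.0, datetime(2020, 10, 10, 8, 5))
ingest("u1", 30.0, datetime(2020, 10, 10, 8, 52))
ingest("u1", 20.0, datetime(2020, 10, 10, 8, 58))  # same tile as the 08:52 event

now = datetime(2020, 10, 10, 9, 0)
for hours in (1, 12, 24):
    print(hours, rolled_up_mean("u1", now, timedelta(hours=hours)))
```

Only three tiles are stored for the key, yet they answer the 1h, 12h, and 24h windows. This is the source of the storage savings when several windows are defined.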
Continuous Processing for Low-Latency Ingestion
Continuous processing for Stream Feature Views can update feature values within a few seconds of when the event is available in the stream data source, rather than waiting for the aggregation interval to complete. To enable continuous mode, set stream_processing_mode=StreamProcessingMode.CONTINUOUS as shown in the example below:
from tecton import stream_feature_view, Aggregation, StreamProcessingMode
from datetime import datetime, timedelta

# transactions_stream and user are assumed to be defined elsewhere in the repo.
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),
    # Continuous mode: no aggregation_interval; features update within seconds.
    stream_processing_mode=StreamProcessingMode.CONTINUOUS,
    aggregations=[
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=1),
            name="average_amount_1h",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=12),
            name="average_amount_12h",
        ),
        Aggregation(
            column="AMOUNT",
            function="mean",
            time_window=timedelta(hours=24),
            name="average_amount_24h",
        ),
    ],
)
def user_transaction_amount_averages(transactions_stream):
    return f"""
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        """
When using continuous processing, Tecton will store all transformed events in the online store and run the full aggregations at the time of request.
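Continuous processing as described above can be modeled by keeping every transformed event per key and running the full aggregation only when a request arrives. The event log, the helpers, and the half-open window boundary are hypothetical choices for this sketch. It shows why an event that arrived seconds before a request is already reflected in the result.

```python
from datetime import datetime, timedelta

# Hypothetical per-key event log: every transformed event is kept as-is.
events_by_user = {}


def ingest(user_id, amount, ts):
    """Store the raw event; no aggregation happens at write time."""
    events_by_user.setdefault(user_id, []).append((ts, amount))


def mean_at_request(user_id, request_time, time_window):
    """Aggregate raw events in (request_time - time_window, request_time] at request time."""
    amounts = [
        amount
        for ts, amount in events_by_user.get(user_id, [])
        if request_time - time_window < ts <= request_time
    ]
    return sum(amounts) / len(amounts) if amounts else None


ingest("u1", 10.0, datetime(2020, 10, 10, 8, 30, 0))
ingest("u1", 50.0, datetime(2020, 10, 10, 9, 0, 2))  # arrived 3 seconds before the request

print(mean_at_request("u1", datetime(2020, 10, 10, 9, 0, 5), timedelta(hours=1)))  # 30.0
```

The tradeoff is visible in the data structure: storage and request-time work grow with the number of events in the window rather than with the number of tiles.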
When to Use Continuous Processing
You should use continuous processing if your model performance depends on extremely fresh features. For example, in a fraud detection use case it may be important for features to include previous transactions made only a few seconds earlier.
You should not use continuous processing if your model can tolerate features updating every few minutes and you are trying to optimize costs. Continuous processing may lead to higher infrastructure costs due to more frequent feature writes and checkpointing updates. A longer aggregation interval can lower costs, especially when a single key may have multiple events in a short period of time that can be grouped into a single interval.
Continuous mode can cause significant S3 costs for customers using Tecton with EMR due to the frequency of writing Spark Streaming checkpoints to S3. The Databricks implementation of Spark Streaming has much lower checkpointing costs.
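The grouping effect on cost can be made concrete by counting online-store writes for one key. The helper below is a hypothetical and deliberately crude cost model. It counts one write per event for continuous processing and one write per non-empty epoch-aligned interval otherwise. It ignores checkpointing and every other cost.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def count_online_writes(event_times, aggregation_interval=None):
    """Rough count of online-store writes for a single key.

    aggregation_interval=None models continuous processing (one write per event);
    otherwise one write per distinct interval that received at least one event.
    """
    if aggregation_interval is None:
        return len(event_times)
    return len({(ts - EPOCH) // aggregation_interval for ts in event_times})


# A burst of 6 events within 3 minutes, then 2 more events half an hour later.
burst = [datetime(2020, 10, 10, 9, 0) + timedelta(seconds=30 * i) for i in range(6)]
later = [datetime(2020, 10, 10, 9, 31), datetime(2020, 10, 10, 9, 33)]
events = burst + later

print(count_online_writes(events))                         # 8 (continuous)
print(count_online_writes(events, timedelta(minutes=1)))   # 5
print(count_online_writes(events, timedelta(minutes=10)))  # 2
```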
Productionizing a Stream
For a Stream Feature View used in production where late data loss is unacceptable, it is recommended to set the Stream Data Source's watermark_delay_threshold to your stream retention period, or to at least 24 hours. This configures Spark Structured Streaming not to drop data when it processes events late or out of order. The tradeoff of a longer watermark delay is a greater amount of in-memory state used by the streaming job.
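The effect of the watermark delay can be illustrated with a simplified event-time watermark. In this model, an event is dropped if it is older than the largest event time seen so far minus the delay. This is an assumption-laden sketch and the function name is hypothetical. Real Spark Structured Streaming advances the watermark between micro-batches, and it only guarantees that data within the delay is not dropped. Data beyond the delay may or may not be dropped.

```python
from datetime import datetime, timedelta


def apply_watermark(arrivals, watermark_delay):
    """Split events into kept/dropped under a simplified per-event watermark.

    arrivals: event timestamps in the order the streaming job processes them.
    An event older than (max event time seen so far - watermark_delay)
    is treated as too late and dropped.
    """
    kept, dropped = [], []
    max_seen = None
    for ts in arrivals:
        if max_seen is not None and ts < max_seen - watermark_delay:
            dropped.append(ts)
        else:
            kept.append(ts)
            max_seen = ts if max_seen is None else max(max_seen, ts)
    return kept, dropped


# The 10:30 event is processed after the 12:00 event, i.e. late and out of order.
arrivals = [
    datetime(2020, 10, 10, 10, 0),
    datetime(2020, 10, 10, 12, 0),
    datetime(2020, 10, 10, 10, 30),
]
_, dropped_short = apply_watermark(arrivals, timedelta(hours=1))
_, dropped_long = apply_watermark(arrivals, timedelta(hours=24))
print(len(dropped_short), len(dropped_long))  # 1 0
```

The longer delay keeps the late event, but a real streaming job would have to retain aggregation state for a correspondingly longer span of event time.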
The ttl (time-to-live) parameter
The value of ttl (a Stream Feature View parameter) affects the availability of feature data in the online store, the generation of training feature data, and the deletion of feature values from the online store.
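This sketch covers only the online-availability effect of ttl. It treats a stored value as unavailable once the value is older than ttl at request time. The online_lookup helper is hypothetical, and its exact boundary behavior is an assumption. For the authoritative semantics, refer to The ttl Parameter in Feature Views.

```python
from datetime import datetime, timedelta


def online_lookup(stored, request_time, ttl):
    """Return a stored feature value only if it is still within ttl at request time.

    stored: a (value, event_timestamp) pair, or None if the key was never written.
    """
    if stored is None:
        return None
    value, event_ts = stored
    return value if request_time - event_ts < ttl else None


stored = (99.0, datetime(2020, 10, 10, 9, 7))
ttl = timedelta(days=30)
print(online_lookup(stored, datetime(2020, 10, 20), ttl))  # 99.0
print(online_lookup(stored, datetime(2020, 11, 20), ttl))  # None
```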
For more details, see The ttl Parameter in Feature Views.
Full list of parameters
See the API reference for the full list of parameters.