Stream Feature View Examples
Row-Level SQL Transformation
from tecton import stream_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    description="Last user transaction amount (stream calculated)",
)
def last_transaction_amount_sql(transactions):
    return f"""
        SELECT
            timestamp,
            user_id,
            amt
        FROM
            {transactions}
        """
Row-Level PySpark Transformation
from tecton import stream_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pyspark",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    description="Last user transaction amount (stream calculated)",
)
def last_transaction_amount_pyspark(transactions):
    from pyspark.sql import functions as f

    return transactions.select("timestamp", "user_id", "amt")
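Because the transformation runs in pyspark mode, the full PySpark API is available for derived columns. A hypothetical variant using the f alias imported above (the rounding logic is illustrative, not part of the example):

def rounded_transaction_amount_pyspark(transactions):
    from pyspark.sql import functions as f

    # Round the amount to whole currency units before materialization.
    return transactions.select(
        "timestamp",
        "user_id",
        f.round(f.col("amt"), 0).alias("amt"),
    )

This function body would be wrapped in the same @stream_feature_view decorator shown above.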
Time-Windowed Aggregations
from tecton import stream_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    # Defines how frequently feature values get updated in the online store
    aggregation_interval=timedelta(minutes=10),
    # Defines how frequently batch jobs are scheduled to ingest into the offline store
    batch_schedule=timedelta(days=1),
    aggregations=[
        Aggregation(column="amt", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=3)),
        Aggregation(column="amt", function="mean", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=3)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Transaction amount statistics and total over a series of time windows, updated every 10 minutes.",
)
def user_transaction_amount_metrics(transactions):
    return f"""
        SELECT
            user_id,
            amt,
            timestamp
        FROM
            {transactions}
        """
Time-Windowed Aggregations with Continuous Mode
For applications that require the most up-to-date feature data, "continuous mode" for Stream Feature Views can update feature values in less than a second after the event is available in the stream data source. With continuous mode, Tecton will process each event as it arrives, rather than waiting for the slide period to complete.
See the Stream Feature View Overview documentation for details on how a Stream Feature View works.
Feature Definition Example
- Set aggregation_mode=AggregationMode.CONTINUOUS to enable continuous event processing.
- Optionally set instance_availability="on_demand" within the stream_cluster_config (see the sketch after this list). Spot Instances may lead to feature processing delays due to spot termination or waiting for an available instance, so On-Demand Instances deliver more consistent performance.
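A sketch of such a stream cluster config, assuming Databricks (the instance type and worker count are illustrative assumptions):

from tecton import DatabricksClusterConfig

stream_cluster = DatabricksClusterConfig(
    instance_type="m5.xlarge",  # assumed instance type
    instance_availability="on_demand",
    number_of_workers=2,  # assumed worker count
)

This object would then be passed as stream_cluster_config=stream_cluster in the @stream_feature_view decorator.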
This example Feature View shows how to configure the decorator parameters.
from tecton import stream_feature_view, FilteredSource, Aggregation, AggregationMode
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines a continuous streaming feature.
# It counts the number of transactions per user over 1-minute, 30-minute, and 1-hour time windows.
# The expected freshness for these features is ~5 seconds.
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_mode=AggregationMode.CONTINUOUS,
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=timedelta(minutes=1)),
        Aggregation(column="transaction", function="count", time_window=timedelta(minutes=30)),
        Aggregation(column="transaction", function="count", time_window=timedelta(hours=1)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Number of transactions a user has made recently",
)
def user_continuous_transaction_count(transactions):
    return f"""
        SELECT
            user_id,
            1 AS transaction,
            timestamp
        FROM
            {transactions}
        """
Stream Data Source Configuration
If your stream data source is Kinesis, we suggest lowering the default buffering to avoid delays in event processing.
Here are some suggested parameter values for a KinesisConfig:

- For Databricks users:
  maxFetchDuration="200ms"
  maxFetchRate="2"
  minFetchPeriod="200ms"
- For EMR users:
  kinesis.executor.idleTimeBetweenReadsInMs="200"
  kinesis.executor.maxFetchTimeInMs="200"

This example data source shows how to configure the stream options on a KinesisConfig with Databricks.
from tecton import (
    HiveConfig,
    KinesisConfig,
    StreamSource,
    BatchSource,
    DatetimePartitionColumn,
)
from datetime import timedelta


def raw_data_deserialization(df):
    # Deserialize the raw Kinesis payload (a JSON string) into typed columns.
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StructField, StringType

    payload_schema = StructType(
        [
            StructField("user_id", StringType(), False),
            StructField("transaction_id", StringType(), False),
            StructField("category", StringType(), False),
            StructField("amt", StringType(), False),
            StructField("is_fraud", StringType(), False),
            StructField("merchant", StringType(), False),
            StructField("merch_lat", StringType(), False),
            StructField("merch_long", StringType(), False),
            StructField("timestamp", StringType(), False),
        ]
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            col("payload.is_fraud").cast("long").alias("is_fraud"),
            col("payload.merchant").alias("merchant"),
            col("payload.merch_lat").cast("double").alias("merch_lat"),
            col("payload.merch_long").cast("double").alias("merch_long"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


partition_columns = [
    DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]

batch_config = HiveConfig(
    database="demo_fraud_v2",
    table="transactions",
    timestamp_field="timestamp",
    datetime_partition_columns=partition_columns,
)

transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=KinesisConfig(
        stream_name="tecton-demo-fraud-data-stream",
        region="us-west-2",
        initial_stream_position="latest",
        watermark_delay_threshold=timedelta(hours=24),
        timestamp_field="timestamp",
        post_processor=raw_data_deserialization,
        options={
            "roleArn": "arn:aws:iam::706752053316:role/tecton-demo-fraud-data-cross-account-kinesis-ro",
            # Lower the default Kinesis buffering, per the suggested Databricks values above.
            "maxFetchDuration": "200ms",
            "maxFetchRate": "2",
            "minFetchPeriod": "200ms",
        },
    ),
    batch_config=batch_config,
)

# The batch source shares the same batch_config, keeping batch and stream features consistent.
transactions_batch = BatchSource(
    name="transactions_batch",
    batch_config=batch_config,
)
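In a notebook, the shared batch_config can be sanity-checked by reading from the batch source. A minimal sketch, assuming the repo has been applied to a workspace named "prod" (the workspace name is an assumption):

import tecton

ws = tecton.get_workspace("prod")  # assumed workspace name
ds = ws.get_data_source("transactions_batch")

# Read the underlying Hive table to verify the deserialized schema.
df = ds.get_dataframe().to_pandas()
print(df.head())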