Aggregation Windows
When using Tecton's Aggregation Engine, you can choose among several types of aggregation windows.
Sliding Window
The most common aggregation features aggregate over a fixed-length time window.
Example:
from datetime import datetime, timedelta

from tecton import StreamFeatureView, Entity, Aggregation, PushSource
from tecton.types import String, Timestamp, Float64, Field

# Define the entity for this feature
user = Entity(name="User", join_keys=["user_id"])

# Define a Streaming Push Source
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="transaction_amount", dtype=Float64),
]
transactions_source = PushSource(
    name="transactions_source",
    schema=input_schema,
)

# Time window aggregation feature
transaction_aggregations = StreamFeatureView(
    name="transaction_aggregations",
    source=transactions_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2018, 1, 1),
    aggregations=[
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=30)),
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="count", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="last", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="max", time_window=timedelta(days=365 * 4)),
        Aggregation(column="transaction_amount", function="min", time_window=timedelta(days=365 * 4)),
    ],
)
The Stream Feature View example above aggregates over multiple different time windows. Each one of those time windows is of a fixed length.
The time windows slide forward continuously and provide you with the freshest possible streaming time window aggregation features in Tecton.
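To make the idea of a fixed-length window that slides with the query time concrete, here is a minimal plain-Python sketch. It does not use Tecton; the `trailing_sum` helper, the sample events, and the choice of an exclusive start and inclusive end are all assumptions made for illustration.

```python
from datetime import datetime, timedelta


def trailing_sum(events, query_time, window):
    """Sum amounts whose timestamp falls in (query_time - window, query_time].

    The boundary handling is an assumption for this sketch.
    """
    start = query_time - window
    return sum(amount for ts, amount in events if start < ts <= query_time)


# Hypothetical (timestamp, transaction_amount) pairs for one user
events = [
    (datetime(2024, 1, 1, 12, 0), 10.0),
    (datetime(2024, 1, 20, 9, 30), 25.0),
    (datetime(2024, 2, 5, 18, 0), 40.0),
]

query_time = datetime(2024, 2, 10, 0, 0)
sum_30d = trailing_sum(events, query_time, timedelta(days=30))    # 65.0
sum_180d = trailing_sum(events, query_time, timedelta(days=180))  # 75.0
```

Moving `query_time` forward moves both ends of each window by the same amount, so the window length never changes while its contents do.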
Sliding Windows that move by a fixed interval
If you don't want your time windows to slide forward continuously, you can set a fixed interval by which they move forward.
Specify the interval with the aggregation_interval parameter.
Example:
from tecton import stream_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=10),  # Defines how frequently feature values get updated in the online store
    batch_schedule=timedelta(
        days=1
    ),  # Defines how frequently batch jobs are scheduled to ingest into the offline store
    aggregations=[
        Aggregation(column="amt", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=3)),
        Aggregation(column="amt", function="mean", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=3)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Transaction amount statistics and total over a series of time windows, updated every 10 minutes.",
)
def user_transaction_amount_metrics(transactions):
    return f"""
        SELECT
            user_id,
            amt,
            timestamp
        FROM
            {transactions}
        """
The time windows of this StreamFeatureView move forward at a fixed interval of 10 minutes, as specified by aggregation_interval.
Time windows that move forward by a fixed interval are currently supported only with Spark-based Stream Feature Views.
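One way to picture a window that moves in fixed steps is to floor the query time to the most recent interval boundary and end the window there. The plain-Python sketch below does this; the `aligned_window` helper is hypothetical, and counting boundaries from the Unix epoch is an assumption of this sketch rather than something stated on this page.

```python
from datetime import datetime, timedelta, timezone

# Assumption for this sketch: interval boundaries are counted from the Unix epoch
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def aligned_window(query_time, window, interval):
    """Return (start, end), with end floored to the latest interval boundary."""
    elapsed = query_time - EPOCH
    end = EPOCH + (elapsed // interval) * interval
    return end - window, end


query_time = datetime(2022, 5, 1, 13, 7, tzinfo=timezone.utc)
start, end = aligned_window(query_time, timedelta(hours=1), timedelta(minutes=10))
# start is 12:00 UTC and end is 13:00 UTC; both stay put until the query time reaches 13:10
```

Under this assumption, every query made between 13:00 and 13:10 sees the same window, which is why a larger interval trades freshness for fewer updates.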
Sliding Windows that lag by a fixed interval
A common use case in feature engineering is to calculate lagging time windows. In effect, you ask Tecton to anchor the end of your time window not to the feature query time but to a point in time a fixed distance before it.
Example:
from datetime import timedelta

from tecton import stream_feature_view, Aggregation, TimeWindow


# `transactions` (a stream data source) and `user` (an entity) are assumed to be defined elsewhere
@stream_feature_view(
    source=transactions,
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=5),
    aggregations=[
        Aggregation(
            function="sum",
            column="amt",
            time_window=TimeWindow(window_size=timedelta(days=7), offset=timedelta(days=-1)),
        )
    ],
)
def user_transaction_amount_averages(transactions):
    return f"""
        SELECT user_id, timestamp, amt
        FROM {transactions}
        """
The example above produces a 7-day trailing sum aggregation that lags by 1 day, as specified by the offset parameter. If you fetch the feature value at 1pm today, Tecton returns a value that aggregates the data between 1pm 8 days ago and 1pm yesterday.
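The window bounds in that example follow from simple date arithmetic: the negative offset shifts the end of the window back from the query time, and the window size then sets the start. A minimal plain-Python sketch, where `lagged_window` is a hypothetical helper and not part of the Tecton SDK:

```python
from datetime import datetime, timedelta


def lagged_window(query_time, window_size, offset):
    """Return (start, end) for a window whose end is shifted by a negative offset."""
    end = query_time + offset   # offset is negative, so the end moves into the past
    start = end - window_size
    return start, end


# Query at 1pm on an arbitrary example date
query_time = datetime(2024, 3, 10, 13, 0)
start, end = lagged_window(query_time, timedelta(days=7), timedelta(days=-1))
# start is 1pm on 2024-03-02 (8 days before), end is 1pm on 2024-03-09 (1 day before)
```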
Lifetime Window (Coming soon)
Lifetime windows continuously expand and are not of a fixed length.
The lifetime start time is specified by the feature_start_time parameter.
Example:
from tecton import LifetimeWindow

# Time window aggregation feature
transaction_aggregations = StreamFeatureView(
    name="transaction_aggregations",
    source=transactions_source,
    entities=[user],
    online=True,
    offline=True,
    aggregations=[
        Aggregation(
            column="transaction_amount", function="sum", time_window=LifetimeWindow(start_time=datetime(2018, 1, 1))
        )
    ],
)
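As an illustration of a window that keeps expanding, the plain-Python sketch below sums every event from a fixed start time up to the query time. It does not use Tecton; `lifetime_sum`, the sample events, and the inclusive bounds are assumptions made for this sketch.

```python
from datetime import datetime


def lifetime_sum(events, start_time, query_time):
    """Sum amounts from a fixed start time up to the query time (inclusive bounds assumed)."""
    return sum(amount for ts, amount in events if start_time <= ts <= query_time)


# Hypothetical (timestamp, transaction_amount) pairs for one user
events = [
    (datetime(2017, 12, 31), 5.0),   # before the lifetime start, never counted
    (datetime(2018, 3, 1), 10.0),
    (datetime(2019, 6, 15), 20.0),
    (datetime(2021, 1, 10), 30.0),
]

lifetime_start = datetime(2018, 1, 1)
early = lifetime_sum(events, lifetime_start, datetime(2019, 1, 1))  # 10.0
late = lifetime_sum(events, lifetime_start, datetime(2022, 1, 1))   # 60.0
```

The start of the window stays fixed while the end follows the query time, so later queries cover a longer span.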
Sliding Window Series (Coming soon)
A Sliding Window Series computes a series of fixed-length time windows, one per step, across a given time range:
Example:
from datetime import timedelta

from tecton import stream_feature_view, Aggregation, SlidingWindowSeries


# `transactions` (a stream data source) and `user` (an entity) are assumed to be defined elsewhere
@stream_feature_view(
    source=transactions,
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=5),
    aggregations=[
        Aggregation(
            function="sum",
            column="amt",
            time_window=SlidingWindowSeries(
                start=timedelta(days=-7),
                stop=timedelta(days=-3),
                step_size=timedelta(days=1),
                window_size=timedelta(days=2),
            ),
        )
    ],
)
def user_transaction_amount_averages(transactions):
    return f"""
        SELECT user_id, timestamp, amt
        FROM {transactions}
        """
The feature defined above produces several hopping time window aggregations for any given point in time. For instance, at feature request time, the example above will produce an array of the following time window aggregations:
- Start: Now - 5 days. End: Now - 3 days
- Start: Now - 6 days. End: Now - 4 days
- Start: Now - 7 days. End: Now - 5 days
- Start: Now - 8 days. End: Now - 6 days
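The four windows listed above can be reproduced with a short plain-Python sketch. Because this page does not spell out how the series parameters map to window bounds, the sketch takes the newest window end and the number of windows as explicit inputs; `window_series`, `newest_end_offset`, and `count` are hypothetical names and not part of the Tecton SDK.

```python
from datetime import datetime, timedelta


def window_series(now, newest_end_offset, window_size, step_size, count):
    """Return (start, end) pairs, newest first, each hopping back by step_size."""
    windows = []
    for i in range(count):
        end = now + newest_end_offset - i * step_size
        windows.append((end - window_size, end))
    return windows


now = datetime(2024, 3, 10)  # arbitrary example request time
series = window_series(
    now,
    newest_end_offset=timedelta(days=-3),
    window_size=timedelta(days=2),
    step_size=timedelta(days=1),
    count=4,
)
for start, end in series:
    print(f"Start: Now - {(now - start).days} days. End: Now - {(now - end).days} days")
```

Each window has the same length; only its position shifts by one step from one entry to the next.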
You configure the produced series with the following parameters:
- window_size: The fixed length of each time window over which data is aggregated
- step_size: The hopping interval between consecutive windows
- start: The start of the time window series
- stop: The end of the time window series