Batch Feature View
A Batch Feature View defines transformations against one or more Batch Data Sources (e.g., Snowflake, Redshift, BigQuery, S3) and computes features on a schedule. It can also run automatic backfills when it is first created.
Feature Examples:
- determining if a user's credit score is over a pre-defined threshold
- counting the total number of transactions over a time window in batch
- batch ingesting pre-computed feature values from an existing batch source
Batch Feature Transformations
Batch Feature Views can define row-level transformations for filtering and projection. They can also define aggregations using Tecton's Aggregation Engine or plain SQL.
We will look at them one by one.
Row-Level Transformations
Batch Feature Views can define row-level filtering and projection transformations using Pandas, Snowflake SQL, or Spark SQL.
Here is a simple row-level transformation example against a single data source:
- Python (Private Preview)
- Snowflake
- Spark
from tecton import batch_feature_view
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta
@batch_feature_view(
sources=[transactions],
entities=[user],
mode="pandas",
batch_schedule=timedelta(days=1),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
def user_last_transaction_amount(transactions):
return transactions[["user_id", "timestamp", "amt"]]
from tecton import batch_feature_view
@batch_feature_view(sources=[transactions], entities=[user], mode="snowflake_sql")
def user_last_transaction_amount(transactions):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions}
"""
from tecton import batch_feature_view
@batch_feature_view(sources=[transactions], entities=[user], mode="spark_sql")
def user_last_transaction_amount(transactions):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions}
"""
Of course, you can always join multiple batch data sources together:
- Python (Private Preview)
- Snowflake
- Spark
from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta
import pandas as pd
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[FilteredSource(transactions_batch), fraud_users_batch],
entities=[user],
mode="pandas",
feature_start_time=datetime(2022, 5, 1),
batch_schedule=timedelta(days=1),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("user_is_adult", Int64)],
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
fraud_users_batch["dob"] = pd.to_datetime(fraud_users_batch["dob"])
transactions_batch["timestamp"] = pd.to_datetime(transactions_batch["timestamp"])
joined_df = pd.merge(transactions_batch, fraud_users_batch, on="user_id")
joined_df["user_is_adult"] = ((joined_df["timestamp"] - joined_df["dob"]).dt.days > 18 * 365).astype(int)
return joined_df[["user_id", "timestamp", "user_is_adult"]]
from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[FilteredSource(transactions_batch), fraud_users_batch],
entities=[user],
mode="snowflake_sql",
feature_start_time=datetime(2022, 5, 1),
batch_schedule=timedelta(days=1),
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
return f"""
select
timestamp,
t.user_id,
IFF(datediff(day, to_date(dob), timestamp) > (18 * 365), 1, 0) as user_is_adult
from {transactions_batch} t join {fraud_users_batch} u on t.user_id=u.user_id
"""
from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.fraud_users import fraud_users_batch
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
# For every transaction, the following Feature View precomputes a feature that indicates
# whether a user was an adult as of the time of the transaction
@batch_feature_view(
sources=[FilteredSource(transactions_batch), fraud_users_batch],
entities=[user],
mode="spark_sql",
feature_start_time=datetime(2022, 5, 1),
batch_schedule=timedelta(days=1),
description="Whether the user performing the transaction is over 18 years old.",
)
def transaction_user_is_adult(transactions_batch, fraud_users_batch):
return f"""
select
timestamp,
t.user_id,
IF (datediff(timestamp, to_date(dob)) > (18*365), 1, 0) as user_is_adult
from {transactions_batch} t join {fraud_users_batch} u on t.user_id=u.user_id
"""
Time-Window Aggregations
Batch Feature Views can also run aggregation transformations using Tecton's Aggregation Engine:
from tecton import batch_feature_view, FilteredSource, Aggregation, TimeWindow
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta
@batch_feature_view(
sources=[FilteredSource(transactions_batch)],
entities=[user],
mode="pandas",
aggregation_interval=timedelta(days=1),
aggregations=[
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=1))),
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=30))),
Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=90))),
],
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("transaction", Int64)],
online=True,
offline=True,
feature_start_time=datetime(2022, 5, 1),
description="User transaction totals over a series of time windows, updated daily.",
)
def user_transaction_counts(transactions):
transactions["transaction"] = 1
return transactions[["user_id", "transaction", "timestamp"]]
The aggregation_interval parameter defines the interval by which the window advances (e.g., 1 day in the example above). Tecton updates feature values whenever the aggregation interval elapses (e.g., daily in the example above).
Custom ETL features (such as arbitrary SQL aggregations)
In general, if you can express your aggregation features using Tecton's Aggregation Engine, you should do so: the engine offers significant performance, efficiency, simplicity, and accuracy benefits.
However, in some cases, you may not want to use Tecton's Aggregation Engine. The most common reasons are:
- You are migrating an existing feature into Tecton and don't want to make any changes
- You need aggregation functions that Tecton's Engine doesn't support
- You want to precompute several nested layers of aggregations (e.g., the standard deviation, per country, of each user's maximum lifetime transaction amount; see the sketch below)
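To make the nested-aggregation case concrete, here is a hedged sketch. The country entity and the country and amt columns are illustrative and not defined elsewhere on this page:
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta

# Sketch only: a nested aggregation (the std dev, per country, of each user's
# maximum lifetime transaction amount). `country` and `amt` are illustrative.
@batch_feature_view(
    sources=[transactions_batch],
    entities=[country],
    mode="spark_sql",
    batch_schedule=timedelta(days=1),
    incremental_backfills=True,
    feature_start_time=datetime(2022, 4, 1),
    ttl=timedelta(days=2),
    description="Std dev of per-user max lifetime transaction amount, by country.",
)
def country_stddev_max_user_amount(transactions_batch, context=materialization_context()):
    return f"""
        SELECT
            country,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS timestamp,
            STDDEV(max_user_amt) AS stddev_max_user_amount
        FROM (
            -- Inner aggregation: each user's maximum lifetime transaction amount.
            SELECT country, user_id, MAX(amt) AS max_user_amt
            FROM {transactions_batch}
            GROUP BY country, user_id
        )
        GROUP BY country
    """
The complete example below uses a simpler, single-level aggregation but shows the same custom ETL pattern end to end, including incremental backfills.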
Here is a simple example:
- Python (Private Preview)
- Snowflake
- Spark
from tecton import batch_feature_view, FilteredSource, materialization_context
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from tecton.types import Field, String, Timestamp, Int64
from datetime import datetime, timedelta
import pandas as pd
@batch_feature_view(
sources=[FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))],
entities=[user],
mode="pandas",
feature_start_time=datetime(2022, 4, 1),
incremental_backfills=True,
batch_schedule=timedelta(days=1),
schema=[
Field("user_id", String),
Field("timestamp", Timestamp),
Field("distinct_merchant_transaction_count_30d", Int64),
],
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()):
end_time = pd.to_datetime(context.end_time)
start_time = end_time - pd.Timedelta(days=30)
recent_transactions = transactions_batch[
(transactions_batch["timestamp"] >= start_time) & (transactions_batch["timestamp"] < end_time)
]
result = (
recent_transactions.groupby("user_id")
.agg(distinct_merchant_transaction_count_30d=("merchant", "nunique"))
.reset_index()
)
result["timestamp"] = end_time - pd.Timedelta(microseconds=1)
return result
from tecton import batch_feature_view, FilteredSource, materialization_context
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))],
entities=[user],
mode="snowflake_sql",
feature_start_time=datetime(2022, 4, 1),
incremental_backfills=True,
batch_schedule=timedelta(days=1),
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()):
return f"""
SELECT
user_id,
TIMESTAMPADD(microsecond, -1, TO_TIMESTAMP('{context.end_time}')) as timestamp,
COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
FROM {transactions_batch}
GROUP BY
user_id
"""
from tecton import batch_feature_view, FilteredSource, materialization_context
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))],
entities=[user],
mode="spark_sql",
feature_start_time=datetime(2022, 4, 1),
incremental_backfills=True,
batch_schedule=timedelta(days=1),
ttl=timedelta(days=2),
description="How many transactions the user has made to distinct merchants in the last 30 days.",
)
def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()):
return f"""
SELECT
user_id,
TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND as timestamp,
COUNT(DISTINCT merchant) AS distinct_merchant_transaction_count_30d
FROM {transactions_batch}
GROUP BY
user_id
"""
Please review this guide for more details on how to develop custom ETL features in a performant and cost-efficient way.
Materialization to the Online and Offline Feature Store
Online Materialization
You can easily make batch features available for low-latency online retrieval to feed an online model: simply set online=True. If you want Tecton to backfill the feature, also define feature_start_time.
Offline Materialization
Tecton also supports offline materialization, which can speed up some expensive queries considerably. When you set offline=True, Tecton will materialize offline feature data to an offline table according to the Feature View's configured schedule and feature_start_time.
If you don’t materialize Batch Feature Views offline, Tecton will execute your transformation directly against the upstream raw data sources when you use the Tecton SDK to generate offline data. Speaking in SQL terms, a Batch Feature View without offline materialization is simply a “View”. A Batch Feature View with offline materialization is a “Materialized View”.
Offline materialization has additional benefits including:
- Online-offline skew is minimized because the data in the online and offline store is updated at the same time.
- Offline feature data is saved so it is resilient to any losses of historical data upstream and training datasets can always be regenerated.
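Putting these settings together, here is a minimal sketch of a Batch Feature View materialized both online and offline. It reuses the transactions source and user entity names from the examples above as placeholders:
from tecton import batch_feature_view, FilteredSource
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[FilteredSource(transactions)],
    entities=[user],
    mode="spark_sql",
    online=True,   # serve feature values from the Online Feature Store at low latency
    offline=True,  # persist feature values to the Offline Feature Store ("Materialized View" behavior)
    feature_start_time=datetime(2022, 5, 1),  # how far back Tecton backfills
    batch_schedule=timedelta(days=1),
)
def user_transaction_amounts(transactions):
    return f"""
        SELECT user_id, timestamp, amt
        FROM {transactions}
    """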
Materialization Job Scheduling Behavior
During the initial backfill of your feature, Tecton tries to minimize the number of backfill jobs in order to drastically reduce backfill costs. For example, if you define a feature with a batch_schedule of 1 day that needs to backfill 1 year's worth of data, you will find that Tecton schedules just ~10 distinct backfill jobs, rather than the 365 you might typically expect. You can modify Tecton's backfill job splitting behavior by setting the max_backfill_interval parameter.
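For instance, a hedged sketch of overriding the splitting behavior might look like the following; it assumes max_backfill_interval is passed directly to the @batch_feature_view decorator, and the values are illustrative:
from tecton import batch_feature_view, FilteredSource
from datetime import datetime, timedelta

# Sketch only: assumes max_backfill_interval is a @batch_feature_view parameter,
# as referenced above; values are illustrative.
@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="spark_sql",
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2022, 1, 1),
    max_backfill_interval=timedelta(days=30),  # each backfill job covers at most ~30 days
)
def user_transactions(transactions_batch):
    return f"""
        SELECT user_id, timestamp, amt
        FROM {transactions_batch}
    """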
If you're used to common data engineering tools like Airflow, you may expect Tecton to schedule one backfill job for every batch_schedule interval in your backfill period. For instance, you may expect a feature with a daily batch_schedule that needs to be backfilled for the past year to kick off 365 distinct backfill jobs. This can be very expensive, even though it is common practice and the default behavior of most off-the-shelf ETL solutions (like Airflow). You can force Tecton to use this naive backfill mode by setting incremental_backfills to True. Please visit this guide, which discusses a valid use case for this mode.
For each steady-state forward fill of your feature, Tecton schedules exactly one materialization job.
Feature Data Timestamp Expectations
Every materialization run is expected to produce feature values for a specific time range. This time range is known as the “materialization window”. The materialization window is different for backfills and incremental runs:
- During the initial backfill of feature data to the Online and Offline Feature Store, the materialization time window starts with feature_start_time and ends with Tecton's "current wall clock time" at which the feature's materialization is enabled.
- On incremental runs, the materialization time window starts with the previous run's start_time and ends with start_time + batch_schedule.
Tecton only materializes feature values that fall within the materialization time window. It automatically filters out unexpected feature values, as shown with the WHERE clause below:
--Tecton applies this filter to the user-provided transformation
SELECT * FROM {batch_feature_view_transformation}
WHERE {timestamp_field} >= {start_time}
AND {timestamp_field} < {end_time}
The start time of the window is inclusive and the end time is exclusive. This means that a feature value whose timestamp is exactly equal to the end_time is not part of the window.
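To make the window semantics concrete, here is a small standalone Python sketch (not Tecton code) that enumerates the half-open windows for a daily batch_schedule:
from datetime import datetime, timedelta

# Standalone illustration of incremental materialization windows
# (inclusive start, exclusive end) for a daily batch_schedule.
feature_start_time = datetime(2022, 5, 1)
batch_schedule = timedelta(days=1)

def materialization_windows(start, schedule, num_runs):
    """Yield (start_time, end_time) for the first `num_runs` incremental runs."""
    window_start = start
    for _ in range(num_runs):
        window_end = window_start + schedule
        yield window_start, window_end  # a value with timestamp == window_end is excluded
        window_start = window_end

for start_time, end_time in materialization_windows(feature_start_time, batch_schedule, 3):
    print(f"{start_time} <= timestamp < {end_time}")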
Efficient Incremental Materialization
In many cases, incremental materialization runs do not need to process all of the input source's raw data.
For example, an incremental materialization run of a row-level transformation that processes raw data at midnight every day should only look at the event data of the past 24 hours, and not the entire event history.
Automatic filtering using FilteredSource
For convenience, Tecton offers a FilteredSource class that automatically pushes timestamp and partition filtering down to the data source. As a result, your Feature View transformation does not need to manually filter out raw data that's not required for the current materialization window. Behind the scenes, Tecton will automatically filter the data source's data based on its timestamp_field and, if applicable, its datetime_partition_columns.
Here is an example that shows how to use FilteredSource in practice:
- Python (Private Preview)
- Snowflake
- Spark
from tecton import batch_feature_view, FilteredSource
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta
@batch_feature_view(
sources=[FilteredSource(transactions)],
entities=[user],
mode="pandas",
online=True,
batch_schedule=timedelta(days=1),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
feature_start_time=datetime(2020, 10, 10),
offline=True,
)
def user_last_transaction_amount(transactions):
return transactions[["user_id", "timestamp", "amt"]]
from tecton import batch_feature_view, FilteredSource
from datetime import datetime, timedelta
@batch_feature_view(
sources=[FilteredSource(transactions)],
entities=[user],
mode="snowflake_sql",
online=True,
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2020, 10, 10),
offline=True,
)
def user_last_transaction_amount(transactions):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions}
"""
from tecton import batch_feature_view, FilteredSource
from datetime import datetime, timedelta
@batch_feature_view(
sources=[FilteredSource(transactions)],
entities=[user],
mode="spark_sql",
online=True,
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2020, 10, 10),
offline=True,
)
def user_last_transaction_amount(transactions):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions}
"""
By default, FilteredSource filters for data between context.start_time and context.end_time.
Manual filtering
If FilteredSource isn't an option for you, you can manually filter for the raw data needed to produce feature values on each run by leveraging a context object that Tecton passes into the transformation function. context.start_time and context.end_time are equal to the expected materialization time window.
The example transformation below filters for the required raw data in the WHERE clause.
- Python (Private Preview)
- Snowflake
- Spark
from tecton import batch_feature_view, materialization_context
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta
@batch_feature_view(
sources=[transactions],
entities=[user],
online=True,
mode="pandas",
batch_schedule=timedelta(days=1),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
feature_start_time=datetime(2020, 10, 10),
offline=True,
)
def user_last_transaction_amount(transactions, context=materialization_context()):
df = transactions[["user_id", "amt", "timestamp"]]
return df[(df["timestamp"] >= context.start_time) & (df["timestamp"] < context.end_time)]
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta
@batch_feature_view(
sources=[transactions],
entities=[user],
online=True,
mode="snowflake_sql",
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2020, 10, 10),
offline=True,
)
def user_last_transaction_amount(transactions, context=materialization_context()):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions}
WHERE TIMESTAMP >= TO_TIMESTAMP('{context.start_time}')
AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
"""
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta
@batch_feature_view(
sources=[transactions],
entities=[user],
online=True,
mode="spark_sql",
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2020, 10, 10),
offline=True,
)
def user_last_transaction_amount(transactions, context=materialization_context()):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions}
WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}")
AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
"""
In cases where you read from a time-partitioned data source, such as a Glue table or partitioned data on S3, you will typically also want to filter by partition columns.
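For example, a manual partition filter in Spark SQL might look like the following sketch; the daily ds partition column (format yyyy-MM-dd) is hypothetical and not part of the examples above:
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta

# Sketch: filter on both the timestamp and a hypothetical daily string
# partition column `ds` formatted as yyyy-MM-dd.
@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
)
def user_last_transaction_amount(transactions, context=materialization_context()):
    return f"""
        SELECT USER_ID, AMOUNT, TIMESTAMP
        FROM {transactions}
        WHERE ds >= date_format(TO_TIMESTAMP("{context.start_time}"), "yyyy-MM-dd")  -- partition pruning
          AND ds <= date_format(TO_TIMESTAMP("{context.end_time}"), "yyyy-MM-dd")    -- conservative upper bound
          AND TIMESTAMP >= TO_TIMESTAMP("{context.start_time}")                      -- exact window filter
          AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
    """
The timestamp predicates still define the exact materialization window; the partition predicates only help the query engine skip irrelevant partitions.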
Late Arriving Data
By default, incremental materialization jobs for Batch Feature Views run immediately at the end of the batch schedule period. To override this default, set the data_delay parameter, which is specified in the data source configuration (the batch_config object referenced in the BatchSource object used by the Batch Feature View). data_delay configures how long jobs wait after the end of the batch schedule period before starting. This is typically used to ensure that all data has landed. For example, if a Batch Feature View has a batch_schedule of 1 day and its data source input has data_delay=timedelta(hours=1) set, then incremental batch materialization jobs will run at 01:00 UTC.
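As a sketch, data_delay lives on the data source's batch_config; the Hive-style config and the database/table names below are placeholders, not an exact prescription:
from tecton import BatchSource, HiveConfig
from datetime import timedelta

# Placeholder names; data_delay delays each incremental job by one hour
# after the end of its batch schedule period so late data can land.
transactions_batch = BatchSource(
    name="transactions_batch",
    batch_config=HiveConfig(
        database="fraud",
        table="transactions",
        timestamp_field="timestamp",
        data_delay=timedelta(hours=1),
    ),
)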
If your upstream data delay is unpredictable, you can trigger materialization with an API call; please follow these instructions.
Full list of parameters
See the API reference for the full list of parameters.