# ⚡️ Building Streaming Features
Real-time data can make all the difference for real-time models, but leveraging it can be quite the challenge.

With Tecton you can build millisecond-fresh features using plain Python and without any complex streaming infrastructure! Best of all, you can test everything locally and iterate in a notebook to quickly train better models that behave consistently online and offline.
This tutorial assumes some basic familiarity with Tecton. If you are new to Tecton, we recommend first checking out the Quickstart Tutorial which walks through an end-to-end journey of building a real-time ML application with Tecton.
Most of this tutorial is intended to be run in a notebook. Some steps will explicitly note to run commands in your terminal.
Be sure to install the Tecton SDK before getting started. You will also need to install the following packages, used for reading data from S3:
```bash
pip install s3fs fsspec
```
In this tutorial we will:
- Create a streaming data source
- Define and test streaming features
- Query data online and offline
Before you start, run the following command in your CLI, filling in your organization's Tecton account name:

```bash
tecton login [your-org-account-name].tecton.ai
```
Let's get started by running the code below!
```python
import tecton
import pandas as pd
from datetime import datetime
from pprint import pprint

tecton.set_validation_mode("auto")
```
## 🌊 Create a Stream Source for ingesting real-time data
First, let's define a local Stream Source that supports ingesting real-time data. Once productionized, this will give us an online HTTP endpoint to which we can push events in real time; Tecton will then transform those events into features for online inference.
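To make the push model concrete, here is a sketch of what a single event payload for that HTTP endpoint might look like. The endpoint URL, auth, and exact field names are assumptions for illustration only (consult Tecton's Stream Ingest API reference for the real schema); the workspace name `"prod"` is likewise hypothetical.

```python
import json
from datetime import datetime, timezone

# One transaction event, matching the schema of the Stream Source
# defined below (user_id, timestamp, amt).
event = {
    "user_id": "user_884240387242",
    "timestamp": datetime(2023, 6, 20, 10, 26, 41, tzinfo=timezone.utc).isoformat(),
    "amt": 68.23,
}

# Assumed payload shape: records are grouped under the push source's name.
payload = {
    "workspace_name": "prod",  # hypothetical workspace name
    "records": {"transactions": [{"record": event}]},
}

body = json.dumps(payload)
print(body)
```

In production this JSON body would be POSTed to the ingest endpoint; here we only construct it to show the shape of an event.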
As part of our Stream Source, we also register a historical log of the stream via the `batch_config` parameter. Tecton uses this historical log for backfills and offline development.

Alternatively, you can have Tecton maintain this historical log for you! Simply add the `log_offline=True` parameter to the `PushConfig` and omit the `batch_config`. With this setup, Tecton will log all ingested events and use those to backfill any features that use this source.
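For example, the log-offline variant of the source might look like this (a sketch assuming the same `name` and `schema` as the source defined below):

```python
from tecton import StreamSource, PushConfig
from tecton.types import Field, String, Timestamp, Float64

# Sketch: let Tecton keep the historical event log itself.
# With log_offline=True there is no batch_config; backfills read
# from Tecton's log of ingested events instead of an external file.
transactions_logged = StreamSource(
    name="transactions",
    stream_config=PushConfig(log_offline=True),
    schema=[
        Field("user_id", String),
        Field("timestamp", Timestamp),
        Field("amt", Float64),
    ],
)
```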
```python
from tecton import StreamSource, PushConfig, FileConfig
from tecton.types import Field, String, Timestamp, Float64

transactions = StreamSource(
    name="transactions",
    stream_config=PushConfig(),
    batch_config=FileConfig(
        uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
```
## 🔎 Test the new Stream Source
We can pull a range of offline data from a Stream Source's historical event log using `get_dataframe()`.
```python
start = datetime(2023, 5, 1)
end = datetime(2023, 8, 1)

df = transactions.get_dataframe(start, end).to_pandas()
display(df)
```
| | user_id | transaction_id | category | amt | is_fraud | merchant | merch_lat | merch_long | timestamp |
|---|---|---|---|---|---|---|---|---|---|
| 0 | user_884240387242 | 3eb88afb219c9a10f5130d0b89a13451 | gas_transport | 68.23 | 0 | fraud_Kutch, Hermiston and Farrell | 42.71 | -78.3386 | 2023-06-20 10:26:41 |
| 1 | user_268514844966 | 72e23b9193f97c2ba654854a66890432 | misc_pos | 32.98 | 0 | fraud_Lehner, Reichert and Mills | 39.1536 | -122.364 | 2023-06-20 12:57:20 |
| 2 | user_722584453020 | db7a41ce2d16a4452c973418d9e544b1 | home | 4.5 | 0 | fraud_Koss, Hansen and Lueilwitz | 33.0332 | -105.746 | 2023-06-20 14:49:59 |
| 3 | user_337750317412 | edfc42f7bc4b86d8c142acefb88c4565 | misc_pos | 7.68 | 0 | fraud_Buckridge PLC | 40.6828 | -88.8084 | 2023-06-20 14:50:13 |
| 4 | user_934384811883 | 93d28b6d2e5afebf9c40304aa709ab29 | kids_pets | 68.97 | 1 | fraud_Lubowitz-Walter | 39.1443 | -96.125 | 2023-06-20 15:55:09 |
## 👩‍💻 Define and test streaming features locally
Now that we have a Stream Source defined, we are ready to create some features.
Let's use this data source to create the following 3 features:
- A user's total transaction amount in the last 1 minute
- A user's total transaction amount in the last 1 hour
- A user's total transaction amount in the last 30 days
To build these features, we will define a Stream Feature View that consumes from our `transactions` Stream Source.
The Stream Feature View transformation operates on events in a Pandas DataFrame and can apply arbitrary projections, filters, or expressions as needed. It's just Python!

The Python transformation runs before the aggregations, so you can transform data as needed before it is aggregated.
```python
from tecton import Entity, stream_feature_view, Aggregation
from tecton.types import Field, String, Timestamp, Float64
from datetime import datetime, timedelta

user = Entity(name="user", join_keys=["user_id"])


@stream_feature_view(
    source=transactions,
    entities=[user],
    mode="pandas",
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
    aggregations=[
        Aggregation(function="sum", column="amt", time_window=timedelta(minutes=1)),
        Aggregation(function="sum", column="amt", time_window=timedelta(hours=1)),
        Aggregation(function="sum", column="amt", time_window=timedelta(days=30)),
    ],
)
def user_transaction_amount_totals(transactions):
    # Project to only the columns the aggregations need.
    return transactions[["user_id", "timestamp", "amt"]]
```
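For intuition, the windowed sums configured above are roughly equivalent to the following plain-pandas computation over a toy DataFrame. This only illustrates the semantics of a time-windowed sum per user; it is not how Tecton executes aggregations.

```python
import pandas as pd

# Toy events for one user, with timestamps inside and outside a 1-hour window.
events = pd.DataFrame(
    {
        "user_id": ["u1", "u1", "u1"],
        "timestamp": pd.to_datetime(
            ["2023-06-20 10:00:00", "2023-06-20 10:30:00", "2023-06-20 12:00:00"]
        ),
        "amt": [10.0, 5.0, 7.0],
    }
)


def amt_sum_1h(df, user_id, as_of):
    """Sum of a user's transaction amounts in the hour ending at `as_of`."""
    window_start = as_of - pd.Timedelta(hours=1)
    mask = (
        (df["user_id"] == user_id)
        & (df["timestamp"] > window_start)
        & (df["timestamp"] <= as_of)
    )
    return df.loc[mask, "amt"].sum()


# Events at 10:00 and 10:30 fall in the window (09:45, 10:45]: 10.0 + 5.0
print(amt_sum_1h(events, "u1", pd.Timestamp("2023-06-20 10:45:00")))  # 15.0
```

The 1-minute and 30-day features follow the same pattern with different `pd.Timedelta` windows; Tecton maintains these sums incrementally as events stream in rather than rescanning history.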
## 🧪 Test features interactively
Now that we've defined and validated our Feature View, we can use `get_historical_features` to produce a range of feature values and check out the feature data.

These features are calculated against the Stream Source's historical event log.
```python
start = datetime(2022, 1, 1)
end = datetime(2022, 2, 1)

df = user_transaction_amount_totals.get_historical_features(start_time=start, end_time=end).to_pandas()
display(df)
```
| | user_id | amt_sum_1m_continuous | amt_sum_1h_continuous | amt_sum_30d_continuous | timestamp | _effective_timestamp |
|---|---|---|---|---|---|---|
| 0 | user_268308151877 | 108.64 | 108.64 | 590.93 | 2022-01-01 18:08:24 | 2022-01-02 00:00:00 |
| 1 | user_268308151877 | 91.75 | 91.75 | 682.68 | 2022-01-04 17:57:30 | 2022-01-05 00:00:00 |
| 2 | user_268308151877 | 4.29 | 4.29 | 686.97 | 2022-01-05 08:39:52 | 2022-01-06 00:00:00 |
| 3 | user_268308151877 | 5.14 | 5.14 | 682.77 | 2022-01-12 08:03:12 | 2022-01-13 00:00:00 |
| 4 | user_268308151877 | 54.39 | 54.39 | 737.16 | 2022-01-12 18:05:31 | 2022-01-13 00:00:00 |