tecton.declarative.KinesisConfig
class tecton.declarative.KinesisConfig(stream_name, region, post_processor, timestamp_field, initial_stream_position, watermark_delay_threshold=datetime.timedelta(days=1), deduplication_columns=None, options=None)

Configuration used to reference a Kinesis stream.
The KinesisConfig class is used to create a reference to an AWS Kinesis stream. This class is used as an input to a StreamSource's stream_config parameter. It is not a Tecton Object: it is a grouping of parameters. Declaring this class alone will not register a data source. Instead, declare it as part of a StreamSource that takes this configuration class instance as a parameter.
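To make this relationship concrete, below is a minimal sketch of the pattern. The batch_config argument, the HiveConfig details, and the identity post_processor are illustrative assumptions, not guarantees about this class's API; the full, canonical example appears at the bottom of this page.

from tecton import StreamSource, HiveConfig, KinesisConfig

# Minimal sketch: the KinesisConfig below registers nothing by itself; only the
# StreamSource that receives it as `stream_config` becomes a Tecton object.
# `batch_config` (a batch counterpart commonly used for backfills) and all
# names here are assumptions for illustration.
minimal_stream_source = StreamSource(
    name='minimal_transactions_stream',
    stream_config=KinesisConfig(
        stream_name='transaction_events',
        region='us-west-2',
        initial_stream_position='latest',
        timestamp_field='timestamp',
        post_processor=lambda df: df,  # identity translator, illustration only
    ),
    batch_config=HiveConfig(
        database='demo',
        table='transactions',
        timestamp_field='timestamp',
    ),
)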
Methods

__init__ – Instantiates a new KinesisConfig.
__init__(stream_name, region, post_processor, timestamp_field, initial_stream_position, watermark_delay_threshold=datetime.timedelta(days=1), deduplication_columns=None, options=None)

Instantiates a new KinesisConfig.
Parameters

- stream_name (str) – Name of the Kinesis stream.
- region (str) – AWS region of the stream, e.g. “us-west-2”.
- post_processor – Python user-defined function f(DataFrame) -> DataFrame that takes in the raw PySpark data source DataFrame and translates it to the DataFrame to be consumed by the Feature View. See an example of a post_processor in the User Guide.
- timestamp_field (str) – Name of the column containing the timestamp used for watermarking.
- initial_stream_position (str) – Initial position in the stream, e.g. “latest” or “trim_horizon”. More information is available in the Spark Kinesis documentation.
- watermark_delay_threshold (timedelta) – (Default: 24h) Watermark time interval, e.g. timedelta(hours=36), used by Spark Structured Streaming to account for late-arriving data. See: Productionizing a Stream.
- deduplication_columns (Optional[List[str]]) – Columns in the stream data that uniquely identify data records. Used for de-duplication.
- options (Optional[Dict[str, str]]) – A map of additional Spark readStream options.
Returns

A KinesisConfig class instance.
Example of a KinesisConfig declaration:
import pyspark
from tecton import KinesisConfig

# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add('amount', StringType(), False)
        .add('isFraud', StringType(), False)
        .add('timestamp', StringType(), False)
    )

    return (
        df.selectExpr('cast (data as STRING) jsonData')
        .select(from_json('jsonData', payload_schema).alias('payload'))
        .select(
            col('payload.amount').cast('long').alias('amount'),
            col('payload.isFraud').cast('long').alias('isFraud'),
            from_utc_timestamp('payload.timestamp', 'UTC').alias('timestamp')
        )
    )

# Declare KinesisConfig instance object that can be used as argument in `StreamSource`
stream_config = KinesisConfig(
    stream_name='transaction_events',
    region='us-west-2',
    initial_stream_position='latest',
    timestamp_field='timestamp',
    post_processor=raw_data_deserialization,
    options={'roleArn': 'arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro'}
)
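The example above leaves watermark_delay_threshold and deduplication_columns at their defaults. A hedged variant that sets them might look as follows; the 36-hour watermark and the record_id column are made-up illustrative values, and raw_data_deserialization is reused from the example above.

from datetime import timedelta
from tecton import KinesisConfig

# Sketch only: `record_id` and the 36-hour watermark are illustrative values.
stream_config_late_data = KinesisConfig(
    stream_name='transaction_events',
    region='us-west-2',
    initial_stream_position='trim_horizon',
    timestamp_field='timestamp',
    post_processor=raw_data_deserialization,
    watermark_delay_threshold=timedelta(hours=36),  # tolerate data up to 36 hours late
    deduplication_columns=['record_id'],            # drop records that share the same id
)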