tecton.transformation

tecton.transformation(mode, name=None, description=None, owner=None, tags=None)

Declares a Transformation that wraps a user function. Transformations are assembled in a pipeline function of a Feature View.
- Parameters
  mode (str) – The mode for this transformation must be one of "spark_sql", "pyspark", "snowflake_sql", "snowpark", "athena", "pandas" or "python".
  name (Optional[str]) – Unique, human friendly name that identifies the Transformation. Defaults to the function name.
  owner (Optional[str]) – Owner name (typically the email of the primary maintainer).
  tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata).
- Returns
A wrapped transformation
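
To make the parameter defaults concrete, here is a minimal plain-Python sketch of a parameterized decorator that wraps a user function. It is not Tecton's implementation: `sketch_transformation`, `WrappedTransformation` and `ALLOWED_MODES` are hypothetical names chosen for illustration. The only behavior taken from the reference above is the list of accepted modes and the name defaulting to the function name.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional

# Modes listed in the parameter reference above.
ALLOWED_MODES = {
    "spark_sql", "pyspark", "snowflake_sql", "snowpark",
    "athena", "pandas", "python",
}


@dataclass
class WrappedTransformation:
    """Hypothetical container pairing a user function with its metadata."""
    func: Callable
    mode: str
    name: str
    description: Optional[str] = None
    owner: Optional[str] = None
    tags: Dict[str, str] = field(default_factory=dict)

    def __call__(self, *args, **kwargs):
        # Delegate to the wrapped user function.
        return self.func(*args, **kwargs)


def sketch_transformation(mode, name=None, description=None, owner=None, tags=None):
    """Hypothetical stand-in for a decorator with the same signature."""
    if mode not in ALLOWED_MODES:
        raise ValueError(f"Unsupported mode: {mode!r}")

    def decorator(func):
        return WrappedTransformation(
            func=func,
            mode=mode,
            name=name or func.__name__,  # default to the function name
            description=description,
            owner=owner,
            tags=dict(tags or {}),
        )

    return decorator


@sketch_transformation(mode="python", owner="someone@example.com")
def amount_is_high(request):
    return {"amount_is_high": int(request["amount"] >= 10000)}
```

In this sketch the decorated object keeps the user function callable, so `amount_is_high({"amount": 20000})` still runs the original body, while `amount_is_high.name` holds the defaulted name.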
Examples of Spark SQL, Athena, PySpark, Pandas, and Python transformation declarations:
from tecton import transformation
from pyspark.sql import DataFrame
import pandas as pd

# Create a Spark SQL transformation.
@transformation(mode="spark_sql", description="Create new column by splitting the string in an existing column")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f'''
        SELECT
            *,
            split({column_to_split}, {delimiter}) AS {new_column_name}
        FROM {input_data}
        '''

# Create an Athena transformation.
@transformation(mode="athena", description="Create new column by splitting the string in an existing column")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f'''
        SELECT
            *,
            split({column_to_split}, '{delimiter}') AS {new_column_name}
        FROM {input_data}
        '''

# Create a PySpark transformation.
@transformation(mode="pyspark", description="Add a new column 'user_has_good_credit' if score is > 670")
def user_has_good_credit_transformation(credit_scores):
    from pyspark.sql import functions as F

    # Flag rows whose credit score exceeds 670.
    df = credit_scores.withColumn(
        "user_has_good_credit",
        F.when(credit_scores["credit_score"] > 670, 1).otherwise(0),
    )
    return df.select("user_id", df["date"].alias("timestamp"), "user_has_good_credit")

# Create a Pandas transformation.
@transformation(mode="pandas", description="Whether the transaction amount is considered high (over $10000)")
def transaction_amount_is_high(transaction_request):
    import pandas as pd

    df = pd.DataFrame()
    df['amount_is_high'] = (transaction_request['amount'] >= 10000).astype('int64')
    return df

# Create a Python transformation.
@transformation(mode="python", description="Whether the transaction amount is considered high (over $10000)")
def transaction_amount_is_high(transaction_request):
    result = {}
    result['transaction_amount_is_high'] = int(transaction_request['amount'] >= 10000)
    return result
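
The body of a "python" mode transformation is ordinary Python, so its logic can be checked without Tecton installed. The sketch below copies the function body from the example and calls it directly with plain dictionaries. Running it without the `@transformation` decorator is an assumption made for illustration, and the sample amounts are made up.

```python
# Same body as the "python" mode example, without the Tecton decorator.
def transaction_amount_is_high(transaction_request):
    result = {}
    result["transaction_amount_is_high"] = int(transaction_request["amount"] >= 10000)
    return result


# Made-up sample requests around the threshold.
samples = [9999.99, 10000, 25000]
flags = [transaction_amount_is_high({"amount": a})["transaction_amount_is_high"] for a in samples]

for amount, flag in zip(samples, flags):
    print(f"amount={amount} -> transaction_amount_is_high={flag}")
```

Because the comparison is `>=`, an amount of exactly 10000 is flagged as high, even though the description string says "over $10000".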