Unit Tests
If you are using Spark and plan to run unit tests, install the Tecton CLI with one of the following commands:
- To install with PySpark 3.1:
pip install 'tecton[pyspark]'
- To install with PySpark 3.2:
pip install 'tecton[pyspark3.2]'
- To install with PySpark 3.3:
pip install tecton pyspark==3.3
Overview
Unit tests are stored in feature repositories, in files whose path matches the pattern **/tests/*.py.
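To make the path pattern concrete, here is a minimal sketch that checks a few repository-relative paths against it. It approximates **/tests/*.py with the standard library's PurePosixPath.match, which anchors a relative pattern at the right-hand end of the path. The helper name and the file layout are hypothetical, and Tecton's own discovery logic may differ in details.

```python
from pathlib import PurePosixPath


def looks_like_test_file(repo_relative_path: str) -> bool:
    """Return True for .py files that sit directly inside a directory named tests.

    PurePosixPath.match anchors a relative pattern at the right-hand end of the
    path, so "tests/*.py" accepts a tests directory at any depth.
    """
    return PurePosixPath(repo_relative_path).match("tests/*.py")


# Hypothetical repository layout, for illustration only.
paths = [
    "fraud/features/tests/transaction_amount_is_high.py",
    "tests/test_smoke.py",
    "fraud/features/transaction_amount_is_high.py",
    "fraud/tests/data/sample.csv",
    "fraud/tests/helpers/util.py",
]

# Keep only the paths that would be treated as test files under this approximation.
matching = [p for p in paths if looks_like_test_file(p)]
```

Under this approximation only the first two paths match: the file must end in .py and its immediate parent directory must be named tests.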
Tests run when the following commands are executed:
- tecton apply: Runs the tests and applies the repo if the tests pass.
- tecton plan: Runs the tests and shows the changes that would be made to the repo if the changes were applied.
- tecton test: Runs the tests only.
On-Demand Feature View Unit Test
Testing an On-Demand Feature View is straightforward. All we need is the On-Demand Feature View and a test file located in a tests directory.
For example, let's say we have a feature view that determines if a transaction amount is high:
from tecton import RequestSource, on_demand_feature_view
from tecton.types import Field, Float64, Int64
import pandas

# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])

# Define the output schema
output_schema = [Field("transaction_amount_is_high", Int64)]

# This On-Demand Feature View evaluates a transaction amount and declares it "high" if it is 10,000 or more
@on_demand_feature_view(
    sources=[transaction_request],
    mode='pandas',
    schema=output_schema,
    description='Whether the transaction amount is considered high ($10,000 or more)'
)
def transaction_amount_is_high(transaction_request: pandas.DataFrame):
    import pandas as pd

    df = pd.DataFrame()
    # Cast the boolean comparison to int64 so the output matches the Int64 schema
    df['transaction_amount_is_high'] = (transaction_request['amount'] >= 10000).astype('int64')
    return df
With the above feature view, we can define a unit test that mocks up some sample inputs and asserts that we get the expected result.
### tests/transaction_amount_is_high.py ###
from fraud.features.on_demand_feature_views.transaction_amount_is_high import transaction_amount_is_high
import pandas as pd
from pandas.testing import assert_frame_equal

# Testing the 'transaction_amount_is_high' feature which depends on request data ('amount') as input
def test_transaction_amount_is_high():
    transaction_request = pd.DataFrame({'amount': [124, 10001, 34235436234]})

    actual = transaction_amount_is_high.run(transaction_request=transaction_request)
    expected = pd.DataFrame({'transaction_amount_is_high': [0, 1, 1]})

    assert_frame_equal(actual, expected)
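The sample inputs above sit well away from the cutoff, so it can be worth adding a case that probes values right at the threshold. The sketch below is an illustration only: amount_is_high is a hypothetical plain-pandas copy of the transformation logic (no Tecton decorator), used so the example runs without a feature repository. In a real test file you would call the feature view's run method as shown above.

```python
import pandas as pd
from pandas.testing import assert_frame_equal

HIGH_AMOUNT_THRESHOLD = 10000  # same cutoff as the comparison in the feature view


def amount_is_high(transaction_request: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for the feature view's transformation logic."""
    flags = (transaction_request["amount"] >= HIGH_AMOUNT_THRESHOLD).astype("int64")
    return pd.DataFrame({"transaction_amount_is_high": flags})


def test_amount_is_high_at_threshold():
    # Values just below, exactly at, and just above the cutoff.
    request = pd.DataFrame({"amount": [9999.99, 10000.0, 10000.01]})

    actual = amount_is_high(request)
    expected = pd.DataFrame({"transaction_amount_is_high": [0, 1, 1]})

    # assert_frame_equal also compares dtypes by default, so the int64 cast matters.
    assert_frame_equal(actual, expected)
```

Because assert_frame_equal checks dtypes as well as values by default, a transformation that returned booleans instead of int64 would fail this test even though the values look equivalent.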
Spark Feature View Unit Test
Creating a unit test for a PySpark or Spark SQL feature view is similar to the above example, except that we also need to install the Java Development Kit (JDK) locally and provide a SparkSession in the test code.
Installing the JDK
Unit tests using a PySpark or Spark SQL feature view require JDK version 8 (u201 or later) or JDK version 11 to run.
As noted on the Oracle web site, older versions of the JDK are provided to help developers debug issues in older systems. They are not updated with the latest security patches and are not recommended for use in production.
Unit tests using a PySpark or Spark SQL feature view are not supported in environments that have Databricks Connect installed.
Install the JDK and then set the JAVA_HOME environment variable.
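Before running Spark tests, it can help to confirm that JAVA_HOME points at a usable JDK. The following is a minimal sketch using only the Python standard library. The helper java_executable is hypothetical and not part of Tecton, and it assumes the conventional JDK layout in which the java binary lives under JAVA_HOME/bin.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def java_executable(env: Optional[Mapping[str, str]] = None) -> Optional[Path]:
    """Return the java binary under JAVA_HOME, or None if it cannot be found."""
    env = os.environ if env is None else env
    java_home = env.get("JAVA_HOME")
    if not java_home:
        return None
    # Check both the POSIX and the Windows name of the binary.
    for name in ("java", "java.exe"):
        candidate = Path(java_home) / "bin" / name
        if candidate.is_file():
            return candidate
    return None


if __name__ == "__main__":
    found = java_executable()
    print(found if found else "JAVA_HOME is not set or does not contain bin/java")
```

This check only confirms that a java binary exists. It does not verify that the JDK version is one of the supported versions listed above.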
For example, let's say we have a feature view that determines if a user has good credit:
### user_has_good_credit.py ###
from tecton import batch_feature_view, FilteredSource
from fraud.entities import user
from fraud.data_sources.credit_scores_batch import credit_scores_batch
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[FilteredSource(source=credit_scores_batch)],
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 1, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=120)
)
def user_has_good_credit(credit_scores):
    return f'''
        SELECT
            user_id,
            IF (credit_score > 670, 1, 0) as user_has_good_credit,
            timestamp
        FROM
            {credit_scores}
        '''
Because this is a Spark SQL feature view, we'll need a SparkSession to test it. Tecton provides the tecton_pytest_spark_session pytest fixture, which creates a SparkSession.
Finally, we can define the actual unit test that mocks up some sample inputs and asserts that we get the expected result.
from fraud.features.batch_feature_views.user_has_good_credit import user_has_good_credit

def test_user_has_good_credit(tecton_pytest_spark_session):
    # Mock input rows: (user_id, timestamp, credit_score)
    mock_data = [
        ('user_id1', "2020-10-28 05:02:11", 700),
        ('user_id2', "2020-10-28 05:02:11", 650)
    ]
    input_df = tecton_pytest_spark_session.createDataFrame(mock_data, ['user_id', 'timestamp', 'credit_score'])

    output = user_has_good_credit.run(tecton_pytest_spark_session, credit_scores=input_df)
    output = output.toPandas()

    # Columns follow the SELECT order: user_id, user_has_good_credit, timestamp
    vals = output.values.tolist()
    expected = [['user_id1', 1, '2020-10-28 05:02:11'], ['user_id2', 0, '2020-10-28 05:02:11']]

    assert vals == expected
Just like in the example above, this test will now run when we execute tecton apply, tecton plan, or tecton test.
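The assertion in the Spark test compares rows in the order Spark returns them. Spark does not guarantee row order unless the query includes an ORDER BY, so a test can be made more robust by sorting both sides before comparing. The sketch below shows one way to do that in plain Python. normalize_rows is a hypothetical helper, and it assumes the values in each column can be compared with one another (for example, no None values).

```python
from typing import Iterable, List, Sequence, Tuple


def normalize_rows(rows: Iterable[Sequence]) -> List[Tuple]:
    """Convert rows to tuples and sort them so that row order no longer matters."""
    return sorted(tuple(row) for row in rows)


# Same values as the Spark test, but with the rows returned in a different order.
vals = [['user_id2', 0, '2020-10-28 05:02:11'], ['user_id1', 1, '2020-10-28 05:02:11']]
expected = [['user_id1', 1, '2020-10-28 05:02:11'], ['user_id2', 0, '2020-10-28 05:02:11']]

# An order-sensitive comparison fails even though the contents are identical.
assert vals != expected
# An order-insensitive comparison passes.
assert normalize_rows(vals) == normalize_rows(expected)
```

In the Spark test, this would amount to replacing the final assertion with a comparison of normalize_rows(vals) and normalize_rows(expected).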
Skip Tests
Specifying the --skip-tests flag when running tecton apply, tecton plan, or tecton test will skip execution of Tecton tests.