Test a Kedro project¶
It is important to test our Kedro projects to validate and verify that our nodes and pipelines behave as we expect them to. In this section we look at some example tests for the spaceflights project.
This section explains the following:
How to test a Kedro node
How to test a Kedro pipeline
Testing best practices
This section does not cover:
Automating your tests - instead read our automated testing documentation.
More advanced features of testing, including mocking and parameterising tests.
Writing tests for Kedro nodes: Unit testing¶
Kedro expects node functions to be pure functions; a pure function is one whose output follows solely from its inputs, without any observable side effects. Testing these functions checks that a node will behave as expected - for a given set of input values, a node will produce the expected output. These tests are referred to as unit tests.
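For illustration, here is a minimal sketch (hypothetical, not part of the spaceflights project) contrasting a pure function with one that has a side effect:
import pandas as pd


def add_total_column(data: pd.DataFrame) -> pd.DataFrame:
    """Pure: the output is derived only from the input, which is left untouched."""
    return data.assign(total=data["price"] * data["passenger_capacity"])


def add_total_column_in_place(data: pd.DataFrame) -> None:
    """Not pure: mutates its input in place, a side effect that makes behaviour harder to test."""
    data["total"] = data["price"] * data["passenger_capacity"]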
Let us explore what this looks like in practice. Consider the node function split_data defined in the data science pipeline:
def split_data(data: pd.DataFrame, parameters: dict[str, Any]) -> Tuple:
    """Splits data into features and targets training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters_data_science.yml.
    Returns:
        Split data.
    """
    X = data[parameters["features"]]
    y = data["price"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return X_train, X_test, y_train, y_test
The function takes a pandas DataFrame and a dictionary of parameters as input, and splits the input data into four different data objects as per the parameters provided. We recommend following pytest's anatomy of a test, which breaks a test down into four steps: arrange, act, assert, and cleanup. For this specific function, these steps will be:
Arrange: Prepare the inputs data and parameters.
Act: Make a call to split_data and capture the outputs with X_train, X_test, y_train, and y_test.
Assert: Ensure that the lengths of the outputs match the expected lengths.
The cleanup step becomes necessary in a test when any of the previous steps make modifications that may influence other tests - e.g. by modifying a file used as input for several tests. This is not the case for the example tests below, and so the cleanup step is omitted.
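As a sketch of what cleanup can look like, the example below (hypothetical, not part of the spaceflights tests) uses a pytest fixture with a yield statement so that teardown code runs after the test, together with pytest's built-in tmp_path fixture, which provides a per-test temporary directory:
# Hypothetical sketch: cleanup with a yield fixture and pytest's tmp_path.
import pandas as pd
import pytest


@pytest.fixture
def csv_path(tmp_path):
    # Arrange: write a temporary input file in a per-test directory.
    path = tmp_path / "dummy.csv"
    pd.DataFrame({"price": [120, 290, 30]}).to_csv(path, index=False)
    yield path
    # Cleanup: runs after the test finishes; tmp_path itself is also removed by pytest.
    path.unlink(missing_ok=True)


def test_reads_prices(csv_path):
    data = pd.read_csv(csv_path)
    assert "price" in data.columns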
Remember to import the function being tested and any necessary modules at the top of the file.
When we put these steps together, we have the following test:
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.

import pandas as pd

from spaceflights.pipelines.data_science.nodes import split_data


def test_split_data():
    # Arrange
    dummy_data = pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )
    dummy_parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }

    # Act
    X_train, X_test, y_train, y_test = split_data(dummy_data, dummy_parameters["model_options"])

    # Assert
    assert len(X_train) == 2
    assert len(y_train) == 2
    assert len(X_test) == 1
    assert len(y_test) == 1
This test is an example of positive testing - it tests that a valid input produces the expected output. The inverse, testing that an invalid input is appropriately rejected, is called negative testing and is equally important.
Using the same steps as above, we can write the following test to validate that an error is raised when price data is not available:
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.

import pandas as pd
import pytest

from spaceflights.pipelines.data_science.nodes import split_data


def test_split_data_missing_price():
    # Arrange
    dummy_data = pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            # Note the missing price data
        }
    )
    dummy_parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }

    with pytest.raises(KeyError) as e_info:
        # Act
        X_train, X_test, y_train, y_test = split_data(dummy_data, dummy_parameters["model_options"])

    # Assert
    assert "price" in str(e_info.value)  # checks that the error is about the missing price data
Writing tests for Kedro pipelines: Integration testing¶
Writing tests for each node ensures each node will behave as expected when run individually. However, we must also consider how nodes in a pipeline interact with each other - this is called integration testing. Integration testing combines individual units as a group and checks whether they communicate, share data, and work together as expected. Let us look at this in practice.
Consider the data science pipeline as a whole:
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
The pipeline takes a pandas DataFrame and a dictionary of parameters as input, splits the data in accordance with the parameters, and uses it to train and evaluate a regression model. With an integration test, we can validate that this sequence of nodes runs as expected.
From earlier in this tutorial we know that a successful pipeline run concludes with the message Pipeline execution completed successfully. being logged. To validate that this message is logged during our test, we make use of pytest's caplog fixture to capture the logs generated during execution.
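If you have not used caplog before, the following minimal sketch (independent of Kedro, with an illustrative logger name and message) shows how it captures log output:
import logging


def do_work():
    logging.getLogger("example").info("Work completed successfully.")


def test_do_work_logs_success(caplog):
    caplog.set_level(logging.INFO, logger="example")
    do_work()
    assert "Work completed successfully." in caplog.text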
As we did with our unit tests, we break this down into several steps:
Arrange: Prepare the runner and its inputs pipeline and catalog, and any additional test setup.
Act: Run the pipeline.
Assert: Ensure a successful run message was logged.
When we put this together, we get the following test:
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.

import logging

import pandas as pd
from kedro.io import DataCatalog
from kedro.runner import SequentialRunner

from spaceflights.pipelines.data_science import create_pipeline as create_ds_pipeline


def test_data_science_pipeline(caplog):  # Note: caplog is passed as an argument
    # Arrange pipeline
    pipeline = create_ds_pipeline()

    # Arrange data catalog
    catalog = DataCatalog()

    dummy_data = pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )
    dummy_parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }
    catalog.add_feed_dict(
        {
            "model_input_table": dummy_data,
            "params:model_options": dummy_parameters["model_options"],
        }
    )

    # Arrange the log testing setup
    caplog.set_level(logging.DEBUG, logger="kedro")  # Ensure all logs produced by Kedro are captured
    successful_run_msg = "Pipeline execution completed successfully."

    # Act
    SequentialRunner().run(pipeline, catalog)

    # Assert
    assert successful_run_msg in caplog.text
Testing best practices¶
Where to write your tests¶
We recommend creating a tests directory within the root directory of your project. Its structure should mirror the directory structure of /src/spaceflights:
src
│ ...
└───spaceflights
│ └───pipelines
│ └───data_science
│ │ __init__.py
│ │ nodes.py
│ │ pipeline.py
│
tests
│ ...
└───pipelines
│ └───data_science
│ │ test_data_science_pipeline.py
Using fixtures¶
In our tests, we can see that dummy_data and dummy_parameters have been defined three times with (mostly) the same values. Instead, we can define these outside of our tests as pytest fixtures:
import pandas as pd
import pytest


@pytest.fixture
def dummy_data():
    return pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )


@pytest.fixture
def dummy_parameters():
    parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }
    return parameters
We can then access these through the test arguments:
def test_split_data(dummy_data, dummy_parameters):
    ...
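If the same fixtures are needed across several test files, they can also be moved into a conftest.py file (for example tests/pipelines/data_science/conftest.py; the location shown here is an assumption based on the directory structure above), where pytest discovers them automatically without any imports:
# tests/pipelines/data_science/conftest.py (assumed location)
# Fixtures defined here are discovered by pytest and shared by every test in this directory.
import pandas as pd
import pytest


@pytest.fixture
def dummy_data():
    return pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )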
Pipeline slicing¶
In the test test_data_science_pipeline we test that the data science pipeline, as currently defined, can be run successfully. However, as pipelines are not static, this test is not robust. Instead, we should be specific about how we define the pipeline to be tested; we do this by using pipeline slicing to specify the pipeline's start and end:
def test_data_science_pipeline(caplog, dummy_data, dummy_parameters):
    # Arrange pipeline
    pipeline = create_ds_pipeline().from_nodes("split_data_node").to_nodes("evaluate_model_node")
    ...
This ensures that the test will still perform as designed, even with the addition of more nodes to the pipeline.
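Note that from_nodes and to_nodes are not the only ways to slice a pipeline; Pipeline objects also provide methods such as only_nodes, to_outputs, and from_inputs. A short sketch using the node and dataset names from the pipeline above:
from spaceflights.pipelines.data_science import create_pipeline as create_ds_pipeline

full_pipeline = create_ds_pipeline()

# Keep only the named node(s).
split_only = full_pipeline.only_nodes("split_data_node")

# Everything required to produce the named output(s).
up_to_training = full_pipeline.to_outputs("regressor")

# Everything downstream of the named input dataset(s).
from_training_data = full_pipeline.from_inputs("X_train", "y_train")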
After incorporating these testing practices, our test file tests/pipelines/data_science/test_data_science_pipeline.py becomes:
# tests/pipelines/data_science/test_data_science_pipeline.py
import logging

import pandas as pd
import pytest
from kedro.io import DataCatalog
from kedro.runner import SequentialRunner

from spaceflights.pipelines.data_science import create_pipeline as create_ds_pipeline
from spaceflights.pipelines.data_science.nodes import split_data


@pytest.fixture
def dummy_data():
    return pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )


@pytest.fixture
def dummy_parameters():
    parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }
    return parameters


def test_split_data(dummy_data, dummy_parameters):
    X_train, X_test, y_train, y_test = split_data(
        dummy_data, dummy_parameters["model_options"]
    )
    assert len(X_train) == 2
    assert len(y_train) == 2
    assert len(X_test) == 1
    assert len(y_test) == 1


def test_split_data_missing_price(dummy_data, dummy_parameters):
    dummy_data_missing_price = dummy_data.drop(columns="price")
    with pytest.raises(KeyError) as e_info:
        X_train, X_test, y_train, y_test = split_data(dummy_data_missing_price, dummy_parameters["model_options"])

    assert "price" in str(e_info.value)


def test_data_science_pipeline(caplog, dummy_data, dummy_parameters):
    pipeline = (
        create_ds_pipeline()
        .from_nodes("split_data_node")
        .to_nodes("evaluate_model_node")
    )
    catalog = DataCatalog()
    catalog.add_feed_dict(
        {
            "model_input_table": dummy_data,
            "params:model_options": dummy_parameters["model_options"],
        }
    )

    caplog.set_level(logging.DEBUG, logger="kedro")
    successful_run_msg = "Pipeline execution completed successfully."

    SequentialRunner().run(pipeline, catalog)

    assert successful_run_msg in caplog.text
Run your tests¶
First, confirm that your project has been installed locally. This can be done by navigating to the project root and running the following command:
pip install -e .
This step allows pytest to accurately resolve the import statements in your test files.
NOTE: The option -e installs an editable version of your project, allowing you to make changes to the project files without needing to re-install them each time.
Ensure you have pytest installed. Please see our automated testing documentation for more information on getting set up with pytest.
To run your tests, run pytest from within your project's root directory.
cd <project_root>
pytest tests/pipelines/data_science/test_data_science_pipeline.py
You should see the following output in your shell.
============================= test session starts ==============================
...
collected 3 items
tests/pipelines/data_science/test_data_science_pipeline.py ...          [100%]
============================== 3 passed in 4.38s ===============================
This output indicates that all tests in the file tests/pipelines/data_science/test_data_science_pipeline.py ran successfully.