Create a data science pipeline¶
Note
Don’t forget to check the tutorial FAQ if you run into problems, or ask the community for help if you need it!
This section explains the following:

- How to add a second Kedro pipeline for data science code that extends the default project pipeline
- How to 'slice' the project and run just part of the default pipeline
- (Optional) How to make a modular pipeline
- (Optional) How to specify the way the pipeline nodes are run: sequentially or in parallel
Note
If you are using the tutorial created by the spaceflights starter, you can omit the copy/paste steps below, but it is worth reviewing the files described.
Data science pipeline¶
The data science pipeline uses the `LinearRegression` implementation from the scikit-learn library.
Generate a new pipeline template¶
Run the following command to create the `data_science` pipeline:

```bash
kedro pipeline create data_science
```
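The command generates a skeleton for the new pipeline. The exact files created can vary slightly between Kedro versions, but the layout looks roughly like this:

```text
src/kedro_tutorial/pipelines/data_science/
├── __init__.py
├── nodes.py
└── pipeline.py
conf/base/parameters/data_science.yml
```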
Add the following code to the `src/kedro_tutorial/pipelines/data_science/nodes.py` file:
```python
import logging
from typing import Dict, Tuple

import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
    """Splits data into features and targets training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters/data_science.yml.

    Returns:
        Split data.
    """
    X = data[parameters["features"]]
    y = data["price"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return X_train, X_test, y_train, y_test


def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
    """Trains the linear regression model.

    Args:
        X_train: Training data of independent features.
        y_train: Training data for price.

    Returns:
        Trained model.
    """
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    return regressor


def evaluate_model(
    regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
    """Calculates and logs the coefficient of determination.

    Args:
        regressor: Trained model.
        X_test: Testing data of independent features.
        y_test: Testing data for price.
    """
    y_pred = regressor.predict(X_test)
    score = r2_score(y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info("Model has a coefficient R^2 of %.3f on test data.", score)
```
Configure the input parameters¶
You now need to add some parameters that are used by the `DataCatalog` when the pipeline executes.

Add the following to `conf/base/parameters/data_science.yml`:
```yaml
model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating
```
Here, the parameters `test_size` and `random_state` are used as part of the train-test split, and `features` gives the names of the columns in the model input table to use as features.

More information about parameters is available in the later documentation on advanced usage.
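If you want to check how these values are resolved at runtime, one option is to inspect them from an interactive session. This is an optional, illustrative snippet; it assumes a `kedro ipython` session, where the `context` variable is pre-loaded:

```python
# Inside a `kedro ipython` session, `context` is pre-loaded.
# `context.params` merges all parameters files into a single dictionary.
context.params["model_options"]["test_size"]  # 0.2
context.params["model_options"]["features"]   # ['engines', 'passenger_capacity', ...]
```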
Assemble the data science pipeline¶
To create a modular pipeline for the price prediction model, replace the contents of `src/kedro_tutorial/pipelines/data_science/pipeline.py` with the following:
```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
```
Register the dataset¶
The next step is to register the dataset that will save the trained model, by adding the following definition to `conf/base/catalog.yml`:
```yaml
regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor.pickle
  versioned: true
```
By setting `versioned` to `true`, versioning is enabled for `regressor`. This means that the pickled output of the `regressor` is saved every time the pipeline runs, which stores the history of the models built using this pipeline. You can learn more in the Versioning section.
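With versioning enabled, each pipeline run writes its output into a timestamped subdirectory instead of overwriting a single file. As a rough illustration (the timestamps below are made up), the resulting layout on disk looks like this:

```text
data/06_models/regressor.pickle/
├── 2022-08-09T16.56.20.326Z/
│   └── regressor.pickle
└── 2022-08-10T09.12.04.112Z/
    └── regressor.pickle
```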
Test the pipelines¶
You can now instruct Kedro to run the default pipeline, which automatically executes the data processing and then the data science pipeline in turn:

```bash
kedro run
```

You should see output similar to the following:
```text
[08/09/22 16:56:00] INFO Kedro project kedro-tutorial session.py:346
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 16:56:15] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 16:56:19] INFO Completed 3 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'params:model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:model_options]) ->
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 4 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: train_model([X_train,y_train]) -> node.py:327
[regressor]
[08/09/22 16:56:20] INFO Saving data to 'regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 5 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([regressor,X_test,y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:55
INFO Completed 6 out of 6 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
```
As you can see, we successfully ran both the `data_processing` and `data_science` pipelines, generating a model and evaluating it.
Slice a pipeline¶
When you created your project with `kedro new`, one of the files generated was `src/<project_name>/pipeline_registry.py`, which constructs the `__default__` pipeline from every pipeline in the project, so you do not need to manually instruct Kedro to run each pipeline.
However, there may be occasions when you want to run just part of the default pipeline. For example, you could skip `data_processing` execution and run only the `data_science` pipeline to tune the hyperparameters of the price prediction model.
You can 'slice' the pipeline and specify just the portion you want to run by using the `--pipeline` command line option. For example, to run only the pipeline named `data_science` (as labelled automatically in `register_pipelines`), execute the following command:

```bash
kedro run --pipeline=data_science
```
Note
This will only work if you have persisted the input to the `data_science` pipeline, which is the `model_input_table`. This was an optional step in the previous chapter.
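If you skipped that step, the dataset can be persisted by adding an entry like the following to `conf/base/catalog.yml` (this mirrors the definition used in the data processing chapter):

```yaml
model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
```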
There are a range of options to run sections of the default pipeline, as described in the pipeline slicing documentation and the `kedro run` CLI documentation.
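For example, the following commands illustrate a few of those options; the flag names shown here are from recent Kedro versions and may differ slightly in older releases:

```bash
# Run a single named node
kedro run --nodes=train_model_node

# Run everything up to and including a given node
kedro run --to-nodes=split_data_node

# Run from a given node to the end of the default pipeline
kedro run --from-nodes=create_model_input_table_node
```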
Optional: modular pipelines¶
In many typical Kedro projects, a single (“main”) pipeline increases in complexity as the project evolves. To keep your project fit for purpose, we recommend that you create modular pipelines, which are logically isolated and can be reused. You can instantiate a modular pipeline multiple times as a “template” pipeline that can run with different inputs/outputs/parameters.
Modular pipelines are easier to develop, test and maintain. They are reusable within the same codebase but also portable across projects via micro-packaging. This is a scalable way to use Kedro, and will change how you think about Kedro pipelines.
Extend the project with namespacing and a modular pipeline¶
We first add some namespaces to the modelling component of the data science pipeline to instantiate it as a template with different parameters for an `active_modelling_pipeline` and a `candidate_modelling_pipeline`, in order to test the model using different combinations of features.
Note
This is optional code and is not provided in the spaceflights starter, so unlike the rest of the tutorial, you need to copy and paste the code as instructed if you want to see it in action.
Update your catalog to add namespaces to the outputs of each instance. Replace the `regressor` key with the following two new dataset keys in the `conf/base/catalog.yml` file:
```yaml
active_modelling_pipeline.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor_active.pickle
  versioned: true

candidate_modelling_pipeline.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor_candidate.pickle
  versioned: true
```
Update the parameters file for the data science pipeline in `conf/base/parameters/data_science.yml` to replace the existing contents for `model_options` with the following, for the two instances of the template pipeline:
```yaml
active_modelling_pipeline:
  model_options:
    test_size: 0.2
    random_state: 3
    features:
      - engines
      - passenger_capacity
      - crew
      - d_check_complete
      - moon_clearance_complete
      - iata_approved
      - company_rating
      - review_scores_rating

candidate_modelling_pipeline:
  model_options:
    test_size: 0.2
    random_state: 8
    features:
      - engines
      - passenger_capacity
      - crew
      - review_scores_rating
```
Replace the code in `pipelines/data_science/pipeline.py` with the snippet below:
```python
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    pipeline_instance = pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    ds_pipeline_1 = pipeline(
        pipe=pipeline_instance,
        inputs="model_input_table",
        namespace="active_modelling_pipeline",
    )
    ds_pipeline_2 = pipeline(
        pipe=pipeline_instance,
        inputs="model_input_table",
        namespace="candidate_modelling_pipeline",
    )
    return ds_pipeline_1 + ds_pipeline_2
```
After executing `kedro run`, you should see output as follows:
```text
[11/02/22 10:41:06] WARNING /Users/<username>/opt/anaconda3/envs/py38/lib/python3.8/site-packages/plotly/graph_objects/ warnings.py:109
__init__.py:288: DeprecationWarning: distutils Version classes are deprecated. Use
packaging.version instead.
if LooseVersion(ipywidgets.__version__) >= LooseVersion("7.0.0"):
[11/02/22 10:41:07] INFO Kedro project kedro-tutorial session.py:340
[11/02/22 10:41:08] INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: preprocess_companies([companies]) -> node.py:327
[preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (ParquetDataSet)... data_catalog.py:382
INFO Completed 1 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[11/02/22 10:41:13] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) -> node.py:327
[preprocessed_shuttles]
WARNING /Users/<username>/Documents/kedro-projects/kedro-tutorial/src/kedro_tutorial/pipelines/data warnings.py:109
_processing/nodes.py:19: FutureWarning: The default value of regex will change from True to
False in a future version. In addition, single character regular expressions will *not* be
treated as literal strings when regex=True.
x = x.str.replace("$", "").str.replace(",", "")
INFO Saving data to 'preprocessed_shuttles' (ParquetDataSet)... data_catalog.py:382
INFO Completed 2 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (ParquetDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (ParquetDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,reviews]) ->
[model_input_table]
[11/02/22 10:41:14] INFO Saving data to 'model_input_table' (ParquetDataSet)... data_catalog.py:382
[11/02/22 10:41:15] INFO Completed 3 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (ParquetDataSet)... data_catalog.py:343
INFO Loading data from 'params:active_modelling_pipeline.model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:active_modelling_pipeline.model_options]) ->
[active_modelling_pipeline.X_train,active_modelling_pipeline.X_test,active_modelling_pipeline.y_t
rain,active_modelling_pipeline.y_test]
INFO Saving data to 'active_modelling_pipeline.X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 4 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (ParquetDataSet)... data_catalog.py:343
INFO Loading data from 'params:candidate_modelling_pipeline.model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:candidate_modelling_pipeline.model_options]) ->
[candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.X_test,candidate_modelling_pip
eline.y_train,candidate_modelling_pipeline.y_test]
INFO Saving data to 'candidate_modelling_pipeline.X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 5 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'active_modelling_pipeline.X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: node.py:327
train_model([active_modelling_pipeline.X_train,active_modelling_pipeline.y_train]) ->
[active_modelling_pipeline.regressor]
INFO Saving data to 'active_modelling_pipeline.regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 6 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'candidate_modelling_pipeline.X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: node.py:327
train_model([candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.y_train]) ->
[candidate_modelling_pipeline.regressor]
INFO Saving data to 'candidate_modelling_pipeline.regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 7 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'active_modelling_pipeline.regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([active_modelling_pipeline.regressor,active_modelling_pipeline.X_test,active_model
ling_pipeline.y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:60
INFO Completed 8 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'candidate_modelling_pipeline.regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([candidate_modelling_pipeline.regressor,candidate_modelling_pipeline.X_test,candid
ate_modelling_pipeline.y_test]) -> None
INFO Model has a coefficient R^2 of 0.449 on test data. nodes.py:60
INFO Completed 9 out of 9 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully.
```
How it works: the modular `pipeline()` wrapper¶
The import you added to the code introduces the `pipeline` wrapper, which enables you to instantiate multiple instances of pipelines with static structure, but dynamic inputs/outputs/parameters:

```python
from kedro.pipeline.modular_pipeline import pipeline
```

The `pipeline()` wrapper method takes the following arguments:
| Keyword argument | Description |
| --- | --- |
| `pipe` | The `Pipeline` object to be wrapped |
| `inputs` | Any overrides provided to this instance of the underlying wrapped `Pipeline` object |
| `outputs` | Any overrides provided to this instance of the underlying wrapped `Pipeline` object |
| `parameters` | Any overrides provided to this instance of the underlying wrapped `Pipeline` object |
| `namespace` | The namespace that will be encapsulated by this pipeline instance |
You can see this snippet as part of the code you added to the example:

```python
...
ds_pipeline_1 = pipeline(
    pipe=pipeline_instance,
    inputs="model_input_table",
    namespace="active_modelling_pipeline",
)
ds_pipeline_2 = pipeline(
    pipe=pipeline_instance,
    inputs="model_input_table",
    namespace="candidate_modelling_pipeline",
)
```
We instantiate the template pipeline twice, but pass in different parameters. The `pipeline_instance` variable is the template pipeline, and `ds_pipeline_1` and `ds_pipeline_2` are the two separately parameterised instantiations.
So let’s go through in detail how those namespaces affect our catalog references:
- All `inputs` and `outputs` within the nodes of our `ds_pipeline_1` have the `active_modelling_pipeline` prefix:
  - `params:model_options` turns into `params:active_modelling_pipeline.model_options`
  - `X_train` turns into `active_modelling_pipeline.X_train`
  - `X_test` turns into `active_modelling_pipeline.X_test`, and so on
- There is a separate set of parameters for `ds_pipeline_2` with the `candidate_modelling_pipeline` prefix:
  - `params:model_options` turns into `params:candidate_modelling_pipeline.model_options`
  - `X_train` turns into `candidate_modelling_pipeline.X_train`
  - `X_test` turns into `candidate_modelling_pipeline.X_test`, and so on

However, `model_input_table` does not get namespaced, as it needs to be shared between instances, so it is frozen outside the scope of the namespace wrappers.
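If you want to verify this renaming yourself, one quick, illustrative check (not part of the tutorial code) is to build the pipeline and print its free inputs; `Pipeline.inputs()` returns the datasets that are not produced by any node:

```python
# Illustrative check of the namespaced dataset names.
from kedro_tutorial.pipelines.data_science.pipeline import create_pipeline

ds_pipeline = create_pipeline()
print(sorted(ds_pipeline.inputs()))
# ['model_input_table',
#  'params:active_modelling_pipeline.model_options',
#  'params:candidate_modelling_pipeline.model_options']
```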
This renders as follows using `kedro viz` (hover over the datasets to see their full path):
Optional: Kedro runners¶
There are three different Kedro runners that can run the pipeline:

- `SequentialRunner` - runs your nodes sequentially; once a node has completed its task then the next one starts.
- `ParallelRunner` - runs your nodes in parallel; independent nodes are able to run at the same time, which is more efficient when there are independent branches in your pipeline and enables you to take advantage of multiple CPU cores.
- `ThreadRunner` - runs your nodes in parallel, similarly to `ParallelRunner`, but uses multithreading instead of multiprocessing.
By default, Kedro uses a `SequentialRunner`, which is instantiated when you execute `kedro run` from the command line. If you decide to use `ParallelRunner`, `ThreadRunner` or a custom runner, you can do so through the `--runner` flag as follows:

```bash
kedro run --runner=ParallelRunner
kedro run --runner=ThreadRunner
kedro run --runner=module.path.to.my.runner
```
Note
`ParallelRunner` performs task parallelisation via multiprocessing, while `ThreadRunner` is intended for use with remote execution engines such as Spark and Dask.
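The runners can also be invoked programmatically rather than through the CLI. The sketch below is illustrative, not part of the tutorial; it assumes you already have a `DataCatalog` named `catalog` and a `Pipeline` named `pipeline_obj`:

```python
# Illustrative sketch: run a pipeline with an explicitly chosen runner.
# `pipeline_obj` (kedro.pipeline.Pipeline) and `catalog` (kedro.io.DataCatalog)
# are assumed to exist; constructing them is out of scope here.
from kedro.runner import SequentialRunner

outputs = SequentialRunner().run(pipeline_obj, catalog)
# `outputs` holds any node outputs that are not registered in the catalog.
```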
You can find out more about the runners Kedro provides, and how to create your own, in the pipeline documentation about runners.