Create a data science pipeline¶
This section explains the following:

- How to add a second Kedro pipeline for data science code that extends the default project pipeline
- How to ‘slice’ the project to run just part of the entire pipeline
- (Optional) How to make a modular pipeline
- (Optional) How to specify the way the pipeline nodes are run: sequentially or in parallel
Data science nodes¶
The data science pipeline uses the LinearRegression
implementation from the scikit-learn library.
The data science pipeline is made up of the following:

- Two Python files within src/spaceflights/pipelines/data_science:
  - nodes.py (for the node functions that form the data science pipeline)
  - pipeline.py (to build the pipeline)
- A YAML file, conf/base/parameters/data_science.yml, to define the parameters used when running the pipeline
- __init__.py files in the required folders to ensure that Python can import the pipeline
First, take a look at the functions for the data science nodes in src/spaceflights/pipelines/data_science/nodes.py:
import logging
from typing import Dict, Tuple
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
"""Splits data into features and targets training and test sets.
Args:
data: Data containing features and target.
parameters: Parameters defined in parameters/data_science.yml.
Returns:
Split data.
"""
X = data[parameters["features"]]
y = data["price"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
)
return X_train, X_test, y_train, y_test
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
"""Trains the linear regression model.
Args:
X_train: Training data of independent features.
y_train: Training data for price.
Returns:
Trained model.
"""
regressor = LinearRegression()
regressor.fit(X_train, y_train)
return regressor
def evaluate_model(
regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
"""Calculates and logs the coefficient of determination.
Args:
regressor: Trained model.
X_test: Testing data of independent features.
y_test: Testing data for price.
"""
y_pred = regressor.predict(X_test)
score = r2_score(y_test, y_pred)
logger = logging.getLogger(__name__)
logger.info("Model has a coefficient R^2 of %.3f on test data.", score)
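Because these node functions are plain Python, you can also exercise them outside Kedro, which is a handy way to see how they chain together. The following is a minimal sketch, not part of the tutorial; the tiny single-feature DataFrame and its values are invented purely for illustration:

import pandas as pd

from spaceflights.pipelines.data_science.nodes import (
    evaluate_model,
    split_data,
    train_model,
)

# Hypothetical stand-in for the real model input table
data = pd.DataFrame(
    {
        "engines": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        "price": [1.0, 2.1, 2.9, 4.2, 5.1, 5.9, 7.0, 8.1, 8.9, 10.2],
    }
)
parameters = {"test_size": 0.2, "random_state": 3, "features": ["engines"]}

X_train, X_test, y_train, y_test = split_data(data, parameters)
regressor = train_model(X_train, y_train)
evaluate_model(regressor, X_test, y_test)  # logs the R^2 score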
Input parameter configuration¶
Parameters that are used by the DataCatalog when the pipeline executes are stored in conf/base/parameters/data_science.yml:
model_options:
test_size: 0.2
random_state: 3
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- moon_clearance_complete
- iata_approved
- company_rating
- review_scores_rating
Here, the parameters test_size
and random_state
are used as part of the train-test split, and features
gives the names of columns in the model input table to use as features.
More information about parameters is available in the configuration documentation.
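If a node needs just one value from this block, Kedro can also resolve nested parameters with dot notation, so you do not have to pass the whole model_options dictionary. A sketch with a hypothetical node (the function and node name are invented for illustration):

from kedro.pipeline import node


def report_test_size(test_size: float) -> None:
    # Hypothetical helper: receives only the nested value
    print(f"test_size = {test_size}")


report_node = node(
    func=report_test_size,
    inputs="params:model_options.test_size",  # dot notation into model_options
    outputs=None,
    name="report_test_size_node",
)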
Model registration¶
The following definition in conf/base/catalog.yml
registers the dataset that saves the trained model:
regressor:
type: pickle.PickleDataSet
filepath: data/06_models/regressor.pickle
versioned: true
By setting versioned to true, versioning is enabled for regressor. This means that the pickled output of the regressor is saved every time the pipeline runs, which stores the history of the models built using this pipeline. You can learn more in the Versioning section.
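After at least one run, you can retrieve the most recent version of the saved model interactively. A minimal sketch, assuming a kedro ipython session, which provides the catalog variable:

# Inside `kedro ipython`: `load` returns the latest version of a
# versioned dataset by default.
regressor = catalog.load("regressor")
print(regressor.coef_, regressor.intercept_)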
Data science pipeline¶
The data science pipeline is defined in src/spaceflights/pipelines/data_science/pipeline.py:
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import evaluate_model, split_data, train_model
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=split_data,
inputs=["model_input_table", "params:model_options"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split_data_node",
),
node(
func=train_model,
inputs=["X_train", "y_train"],
outputs="regressor",
name="train_model_node",
),
node(
func=evaluate_model,
inputs=["regressor", "X_test", "y_test"],
outputs=None,
name="evaluate_model_node",
),
]
)
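To sanity-check the assembled pipeline from a plain Python session, you can print its structure. A sketch, assuming the project package is importable (for example after pip install -e .):

from spaceflights.pipelines.data_science.pipeline import create_pipeline

# describe() returns a readable summary of the nodes in execution order
print(create_pipeline().describe())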
Test the pipelines¶
When you created your project with kedro new, one of the files generated was src/<package_name>/pipeline_registry.py, which constructs a __default__ pipeline that includes every pipeline in the project.
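For reference, recent Kedro versions generate a registry along these lines; this is a sketch using the find_pipelines autodiscovery helper, not necessarily the exact contents of your generated file:

from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> dict[str, Pipeline]:
    """Register the project's pipelines."""
    pipelines = find_pipelines()  # discovers each package's create_pipeline()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines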
This means that you do not need to instruct Kedro to run each pipeline individually; you can execute the default pipeline, which runs the data processing pipeline followed by the data science pipeline:
kedro run
You should see output similar to the following:
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 16:56:15] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 16:56:19] INFO Completed 3 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'params:model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:model_options]) ->
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 4 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: train_model([X_train,y_train]) -> node.py:327
[regressor]
[08/09/22 16:56:20] INFO Saving data to 'regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 5 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([regressor,X_test,y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:55
INFO Completed 6 out of 6 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
As you can see, the data_processing and data_science pipelines ran successfully, generated a model and evaluated it.
Slice a pipeline¶
There may be occasions when you want to run just part of the default pipeline. For example, you could skip data_processing
execution and run only the data_science
pipeline to tune the hyperparameters of the price prediction model.
You can ‘slice’ the pipeline and specify just the portion you want to run by using the --pipeline option. For example, to only run the pipeline named data_science (as labelled automatically in register_pipelines), execute the following command:
kedro run --pipeline=data_science
A range of options for running sections of the default pipeline is described in the pipeline slicing documentation and the kedro run CLI documentation.
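For example, using the node names defined earlier in this tutorial, you can also slice by node boundaries with the --from-nodes and --to-nodes options:

kedro run --from-nodes=split_data_node
kedro run --to-nodes=train_model_node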
Modular pipelines¶
In many typical Kedro projects, a single (“main”) pipeline increases in complexity as the project evolves. To keep your project fit for purpose, we recommend that you create modular pipelines, which are logically isolated and can be reused. You can instantiate a modular pipeline multiple times as a “template” pipeline that runs with different inputs, outputs, and parameters.

Modular pipelines are easier to develop, test and maintain. They are reusable within the same codebase and also portable across projects via micro-packaging, which makes them a scalable way to work with Kedro pipelines.
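For example, in Kedro 0.18.x micro-packaging is exposed through the kedro micropkg commands (this interface has evolved between versions, so check the documentation for yours):

kedro micropkg package pipelines.data_science
kedro micropkg pull <path-to-sdist-file>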
Optional: Extend the project with namespacing and a modular pipeline¶
This code is optional, so it is not provided in the spaceflights starter. If you want to see it in action, copy and paste the code as instructed.
First, add namespaces to the modelling component of the data science pipeline to instantiate it as a template with different parameters for an active_modelling_pipeline and a candidate_modelling_pipeline, to test the model using different combinations of features.
Update your catalog to add namespaces to the outputs of each instance. In the conf/base/catalog.yml file, replace the regressor key with the following two new dataset keys:
active_modelling_pipeline.regressor:
type: pickle.PickleDataSet
filepath: data/06_models/regressor_active.pickle
versioned: true
candidate_modelling_pipeline.regressor:
type: pickle.PickleDataSet
filepath: data/06_models/regressor_candidate.pickle
versioned: true
Update the parameters file for the data science pipeline in conf/base/parameters/data_science.yml to replace the existing contents for model_options with the following, covering the two instances of the template pipeline:
active_modelling_pipeline:
model_options:
test_size: 0.2
random_state: 3
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- moon_clearance_complete
- iata_approved
- company_rating
- review_scores_rating
candidate_modelling_pipeline:
model_options:
test_size: 0.2
random_state: 8
features:
- engines
- passenger_capacity
- crew
- review_scores_rating
Replace the code in pipelines/data_science/pipeline.py with the snippet below:
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline
from .nodes import evaluate_model, split_data, train_model
def create_pipeline(**kwargs) -> Pipeline:
pipeline_instance = pipeline(
[
node(
func=split_data,
inputs=["model_input_table", "params:model_options"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split_data_node",
),
node(
func=train_model,
inputs=["X_train", "y_train"],
outputs="regressor",
name="train_model_node",
),
node(
func=evaluate_model,
inputs=["regressor", "X_test", "y_test"],
outputs=None,
name="evaluate_model_node",
),
]
)
ds_pipeline_1 = pipeline(
pipe=pipeline_instance,
inputs="model_input_table",
namespace="active_modelling_pipeline",
)
ds_pipeline_2 = pipeline(
pipe=pipeline_instance,
inputs="model_input_table",
namespace="candidate_modelling_pipeline",
)
return ds_pipeline_1 + ds_pipeline_2
Execute kedro run from the terminal. You should see output as follows:
[11/02/22 10:41:08] INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: preprocess_companies([companies]) -> node.py:327
[preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (ParquetDataSet)... data_catalog.py:382
INFO Completed 1 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[11/02/22 10:41:13] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) -> node.py:327
[preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (ParquetDataSet)... data_catalog.py:382
INFO Completed 2 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (ParquetDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (ParquetDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,reviews]) ->
[model_input_table]
[11/02/22 10:41:14] INFO Saving data to 'model_input_table' (ParquetDataSet)... data_catalog.py:382
[11/02/22 10:41:15] INFO Completed 3 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (ParquetDataSet)... data_catalog.py:343
INFO Loading data from 'params:active_modelling_pipeline.model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:active_modelling_pipeline.model_options]) ->
[active_modelling_pipeline.X_train,active_modelling_pipeline.X_test,active_modelling_pipeline.y_t
rain,active_modelling_pipeline.y_test]
INFO Saving data to 'active_modelling_pipeline.X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 4 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (ParquetDataSet)... data_catalog.py:343
INFO Loading data from 'params:candidate_modelling_pipeline.model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:candidate_modelling_pipeline.model_options]) ->
[candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.X_test,candidate_modelling_pip
eline.y_train,candidate_modelling_pipeline.y_test]
INFO Saving data to 'candidate_modelling_pipeline.X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 5 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'active_modelling_pipeline.X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: node.py:327
train_model([active_modelling_pipeline.X_train,active_modelling_pipeline.y_train]) ->
[active_modelling_pipeline.regressor]
INFO Saving data to 'active_modelling_pipeline.regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 6 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'candidate_modelling_pipeline.X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: node.py:327
train_model([candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.y_train]) ->
[candidate_modelling_pipeline.regressor]
INFO Saving data to 'candidate_modelling_pipeline.regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 7 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'active_modelling_pipeline.regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([active_modelling_pipeline.regressor,active_modelling_pipeline.X_test,active_model
ling_pipeline.y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:60
INFO Completed 8 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'candidate_modelling_pipeline.regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([candidate_modelling_pipeline.regressor,candidate_modelling_pipeline.X_test,candid
ate_modelling_pipeline.y_test]) -> None
INFO Model has a coefficient R^2 of 0.449 on test data. nodes.py:60
INFO Completed 9 out of 9 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully.
How it works: the modular pipeline() wrapper¶
The import you added to the code introduces the pipeline wrapper, which enables you to instantiate multiple instances of pipelines with static structure, but dynamic inputs/outputs/parameters:
from kedro.pipeline.modular_pipeline import pipeline
The pipeline() wrapper method takes the following arguments:

| Keyword argument | Description |
| --- | --- |
| pipe | The Pipeline object you want to copy |
| inputs | Any overrides provided to this instance of the underlying wrapped Pipeline object |
| outputs | Any overrides provided to this instance of the underlying wrapped Pipeline object |
| parameters | Any overrides provided to this instance of the underlying wrapped Pipeline object |
| namespace | The namespace that will be encapsulated by this pipeline instance |
You can see this snippet as part of the code you added to the example:
...
ds_pipeline_1 = pipeline(
pipe=pipeline_instance,
inputs="model_input_table",
namespace="active_modelling_pipeline",
)
ds_pipeline_2 = pipeline(
pipe=pipeline_instance,
inputs="model_input_table",
namespace="candidate_modelling_pipeline",
)
The code instantiates the template pipeline twice, but passes in a different namespace each time. The pipeline_instance variable is the template pipeline, and ds_pipeline_1 and ds_pipeline_2 are the two separately parameterised instantiations.
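To see the outputs override from the table above in action, here is a hypothetical third instance (the "challenger" names are invented for illustration) that remaps the trained model to a custom catalog entry as well as applying a namespace:

ds_pipeline_3 = pipeline(
    pipe=pipeline_instance,
    inputs="model_input_table",                     # shared, so not namespaced
    outputs={"regressor": "challenger_regressor"},  # remapped, so not namespaced
    namespace="challenger_modelling_pipeline",
)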
How do namespaces affect parameters?¶
All inputs and outputs within the nodes of ds_pipeline_1 have the active_modelling_pipeline prefix:

- params:model_options turns into params:active_modelling_pipeline.model_options
- X_train turns into active_modelling_pipeline.X_train
- X_test turns into active_modelling_pipeline.X_test, and so on
There is a separate set of parameters for ds_pipeline_2 with the candidate_modelling_pipeline prefix:

- params:model_options turns into params:candidate_modelling_pipeline.model_options
- X_train turns into candidate_modelling_pipeline.X_train
- X_test turns into candidate_modelling_pipeline.X_test, and so on
However, model_input_table is not given a prefix: it is declared in the wrapper's inputs argument because it needs to be shared between instances, so it stays frozen outside the scope of the namespace wrappers.
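One quick way to confirm these renames is to build the pipeline and print each node's datasets. A sketch, assuming the project package is importable:

from spaceflights.pipelines.data_science.pipeline import create_pipeline

for n in create_pipeline().nodes:
    print(n.name, "inputs:", sorted(n.inputs), "outputs:", sorted(n.outputs))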
This renders as follows using kedro viz (hover over the datasets to see their full path):
Optional: Kedro runners¶
There are three different Kedro runners that can run the pipeline:

- SequentialRunner - runs nodes sequentially; once a node has completed its task then the next one starts.
- ParallelRunner - runs nodes in parallel; independent nodes are able to run at the same time, which is more efficient when there are independent branches in your pipeline and enables you to take advantage of multiple CPU cores.
- ThreadRunner - runs nodes in parallel, similarly to ParallelRunner, but uses multithreading instead of multiprocessing.
By default, Kedro uses a SequentialRunner
, which is instantiated when you execute kedro run
from the terminal. If you decide to use ParallelRunner
, ThreadRunner
or a custom runner, you can do so through the --runner
flag as follows:
kedro run --runner=ParallelRunner
kedro run --runner=ThreadRunner
kedro run --runner=module.path.to.my.runner
ParallelRunner
performs task parallelisation via multiprocessing, while ThreadRunner
is intended for use with remote execution engines such as Spark and Dask.
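You can also supply a runner programmatically rather than through the CLI. A sketch, assuming a Kedro 0.18-style project executed from the project root:

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.runner import ParallelRunner

bootstrap_project(Path.cwd())  # read the project settings from pyproject.toml
with KedroSession.create(project_path=Path.cwd()) as session:
    session.run(pipeline_name="data_science", runner=ParallelRunner())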
You can find out more about the runners Kedro provides, and how to create your own, in the pipeline documentation about runners.