Create a data processing pipeline
Note
Don’t forget to check the tutorial FAQ if you run into problems, or ask the community for help if you need it!
This section explains the following:
How to create a Kedro node from a Python function
How to construct a Kedro pipeline from a set of nodes
How to run the pipeline
Note
If you are using the tutorial created by the spaceflights starter, you can omit the copy/paste steps below, but it is worth reviewing the files described.
Data processing pipeline
You will use the data to train a model that predicts the price of shuttle hire, but before you can train the model, you need to prepare the data for model building by combining the files to create a model input table.
You previously registered the raw datasets for your Kedro project, so you can now create nodes to preprocess two of them, companies.csv and shuttles.xlsx, to prepare the data for modelling.
Generate a new pipeline template
In the terminal, run the following command to generate a new pipeline for data processing:
kedro pipeline create data_processing
This command generates all the files you need for the pipeline (the resulting layout is sketched after the list):
Two Python files within src/kedro_tutorial/pipelines/data_processing: nodes.py (for the node functions that form the data processing) and pipeline.py (to build the pipeline)
A YAML file, conf/base/parameters/data_processing.yml, to define the parameters used when running the pipeline
A folder for test code: src/tests/pipelines/data_processing
__init__.py files in the required folders to ensure that Python can import the pipeline
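Taken together, the files named above give a layout like the following. This is only a sketch; extra generated files (such as test stubs or a README) may vary between Kedro versions:

src/kedro_tutorial/pipelines/data_processing/
├── __init__.py
├── nodes.py
└── pipeline.py
conf/base/parameters/data_processing.yml
src/tests/pipelines/data_processing/
└── __init__.py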
Add node functions
Open src/kedro_tutorial/pipelines/data_processing/nodes.py and add the code below, which provides two functions (preprocess_companies and preprocess_shuttles). Each takes a raw DataFrame as input, converts the data in several columns to different types, and outputs a DataFrame containing the preprocessed data:
import pandas as pd


def _is_true(x: pd.Series) -> pd.Series:
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: pd.Series) -> pd.Series:
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles
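As a quick sanity check before wiring these functions into a pipeline, you can call them directly in a Python session. The snippet below is an illustrative sketch: the column names match the functions above, but the toy values are made up.

import pandas as pd

# Assumes preprocess_companies is importable from the module above, e.g.
# from kedro_tutorial.pipelines.data_processing.nodes import preprocess_companies

toy_companies = pd.DataFrame(
    {"iata_approved": ["t", "f"], "company_rating": ["100%", "67%"]}
)
# iata_approved becomes True/False; company_rating becomes 1.0/0.67
print(preprocess_companies(toy_companies))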
Assemble nodes into the data processing pipeline
The next steps are to create a node for each function and to create a modular pipeline for data processing:
First, add import statements for your functions by adding them to the beginning of pipeline.py:
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import preprocess_companies, preprocess_shuttles
Next, add the following to src/kedro_tutorial/pipelines/data_processing/pipeline.py, so the create_pipeline() function is as follows:
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
        ]
    )
Note that the inputs for companies and shuttles refer to the datasets defined in conf/base/catalog.yml. They are the inputs to the preprocess_companies and preprocess_shuttles functions. Kedro uses the named node inputs (and outputs) to determine the dependencies between nodes and their execution order.
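For reference, the raw dataset entries registered in conf/base/catalog.yml in the previous section look something like the following; the exact load arguments may differ depending on how you set the project up:

companies:
  type: pandas.CSVDataSet
  filepath: data/01_raw/companies.csv

reviews:
  type: pandas.CSVDataSet
  filepath: data/01_raw/reviews.csv

shuttles:
  type: pandas.ExcelDataSet
  filepath: data/01_raw/shuttles.xlsx
  load_args:
    engine: openpyxl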
Test the example
Run the following command in your terminal window to test the node named preprocess_companies_node:
kedro run --node=preprocess_companies_node
You should see output similar to the below:
[08/09/22 16:43:10] INFO Kedro project kedro-tutorial session.py:346
[08/09/22 16:43:11] INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 1 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
You can run the preprocess_shuttles node similarly. To test both nodes together as the complete data processing pipeline:
kedro run
You should see output similar to the following:
[08/09/22 16:45:46] INFO Kedro project kedro-tutorial session.py:346
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 2 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 16:46:08] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 2 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
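Once your project contains more than one pipeline and you want to run only this one, you can restrict the run with the --pipeline option, assuming the pipeline is registered under the name used when it was created:

kedro run --pipeline=data_processing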
Optional: save the preprocessed data
Each of the nodes outputs a new dataset (preprocessed_companies and preprocessed_shuttles).
When Kedro runs the pipeline, it determines that neither dataset is registered in the data catalog, so it stores them in memory as temporary Python objects using the MemoryDataSet class. Once all nodes that depend on a temporary dataset have executed, the dataset is cleared and the Python garbage collector releases the memory.
If you prefer to save the preprocessed data to file, add the following to the end of conf/base/catalog.yml (if you are using the tutorial created by the spaceflights starter, you can omit the copy/paste):
preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq

preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq
Adding the data to the catalog declares explicitly that Kedro should use pandas.ParquetDataSet instead of MemoryDataSet. The Data Catalog automatically saves the datasets (in Parquet format) to the specified paths the next time the pipeline runs. There is no need to change any code in your preprocessing functions to accommodate this change.
We chose the Apache Parquet format for working with processed and typed data, and we recommend getting your data out of CSV as soon as possible. Parquet supports things like compression, partitioning and types out of the box. While you lose the ability to view the file as text, the benefits greatly outweigh the drawbacks.
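After the next kedro run, you can confirm that the files were written by reading one back with pandas; the path below assumes the catalog entries shown above:

import pandas as pd

preprocessed_companies = pd.read_parquet(
    "data/02_intermediate/preprocessed_companies.pq"
)
print(preprocessed_companies.head())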
Extend the data processing pipeline
The next step in the tutorial is to add another node for a function that joins together the three datasets into a single model input table. You'll add some code for a function and node called create_model_input_table, which Kedro processes as follows:
Kedro uses the preprocessed_shuttles, preprocessed_companies, and reviews datasets as inputs
Kedro saves the output as a dataset called model_input_table
First, add the create_model_input_table() function from the snippet below to src/kedro_tutorial/pipelines/data_processing/nodes.py.
def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        Model input table.
    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    model_input_table = model_input_table.dropna()
    return model_input_table
Add an import statement for create_model_input_table at the top of src/kedro_tutorial/pipelines/data_processing/pipeline.py:
from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles
Add the code below to include the new node in the pipeline:
node(
    func=create_model_input_table,
    inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
    outputs="model_input_table",
    name="create_model_input_table_node",
),
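For reference, after adding this node the complete create_pipeline() function in pipeline.py looks like the following, assembled from the snippets above:

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )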
Optional: save the model input table
If you want the model input table data to be saved to file (in data/03_primary) rather than used in memory, add an entry to conf/base/catalog.yml:
model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
Test the example
To test the progress of the example:
kedro run
You should see output similar to the following:
[08/09/22 17:00:54] INFO Kedro project kedro-tutorial session.py:346
[08/09/22 17:01:10] INFO Reached after_catalog_created hook plugin.py:17
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 17:01:25] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 17:01:29] INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
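If you registered model_input_table in the catalog as described above, you can also inspect the saved table interactively. For example, a kedro ipython session exposes a catalog variable you can load datasets from:

# inside a `kedro ipython` session
catalog.load("model_input_table").head()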
Checkpoint
This is an excellent place to take a breath and summarise what you have done so far:
Created a new project and installed dependencies
Added three datasets to the project and set up the Kedro Data Catalog
Created a data processing pipeline with three nodes to transform and merge the input datasets and create a model input table
The next step is to create the data science pipeline for spaceflight price prediction.