Add Kedro features to a notebook¶
This page describes how to add Kedro features incrementally to a notebook.
It starts with a notebook example which does NOT use Kedro. It then explains how to convert portions of the code to use Kedro features while remaining runnable within a notebook. For that part of the example, you need to have set up Kedro.
NOTE: If you want to experiment with the code in a notebook, you can find it in the notebook-example folder on GitHub. Be sure to download the entire folder, or clone the entire repo, because the add_kedro_to_spaceflights_notebook.ipynb notebook relies upon files stored in the notebook-example folder.
Kedro spaceflights¶
The Kedro spaceflights tutorial introduces the basics of Kedro through a complete Kedro project, that is, a set of .py files. The premise is as follows:
It is 2160, and the space tourism industry is booming. Globally, thousands of space shuttle companies take tourists to the Moon and back. You have been able to source data that lists the amenities offered in each space shuttle, customer reviews, and company information.
Project: You want to construct a model that predicts the price for each trip to the Moon and the corresponding return flight.
The notebook example¶
The full example code is given below. To run it, you will need the pandas, openpyxl, and scikit-learn packages installed, plus the data files the code reads (companies.csv, reviews.csv, and shuttles.xlsx) in a local data folder.
import pandas as pd
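# Load the raw data from the local data folder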
companies = pd.read_csv("data/companies.csv")
reviews = pd.read_csv("data/reviews.csv")
shuttles = pd.read_excel("data/shuttles.xlsx", engine="openpyxl")
# Data processing
companies["iata_approved"] = companies["iata_approved"] == "t"
companies["company_rating"] = (
companies["company_rating"].str.replace("%", "").astype(float)
)
shuttles["d_check_complete"] = shuttles["d_check_complete"] == "t"
shuttles["moon_clearance_complete"] = shuttles["moon_clearance_complete"] == "t"
shuttles["price"] = (
shuttles["price"].str.replace("$", "").str.replace(",", "").astype(float)
)
rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
model_input_table = rated_shuttles.merge(companies, left_on="company_id", right_on="id")
model_input_table = model_input_table.dropna()
model_input_table.head()
# Model training
from sklearn.model_selection import train_test_split
X = model_input_table[
[
"engines",
"passenger_capacity",
"crew",
"d_check_complete",
"moon_clearance_complete",
"iata_approved",
"company_rating",
"review_scores_rating",
]
]
y = model_input_table["price"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=3)
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train, y_train)
model.predict(X_test)
# Model evaluation
from sklearn.metrics import r2_score
y_pred = model.predict(X_test)
r2_score(y_test, y_pred)
Use Kedro for data processing¶
Even if you’re not ready to work with a full Kedro project, you can still use Kedro’s Data Catalog for data handling within an existing notebook. This section shows you how.
Kedro’s Data Catalog is a registry of all data sources available for use by the project. It offers a separate place to declare details of the datasets your projects use. Kedro provides built-in datasets for different file types and file systems so you don’t have to write any of the logic for reading or writing data.
Kedro offers a range of datasets, including CSV, Excel, Parquet, Feather, HDF5, JSON, Pickle, SQL tables, SQL queries, Spark DataFrames, and more. They are supported through the APIs of pandas, spark, networkx, matplotlib, yaml, and beyond, and rely on fsspec to read and save data from a variety of data stores, including local file systems, network file systems, cloud object stores, and Hadoop. You can pass arguments to load and save operations, and use versioning and credentials for data access.
To start using the Data Catalog, you’ll need a catalog.yml file to define the datasets your functions can use. One is included in the same folder as your notebook:
companies:
type: pandas.CSVDataset
filepath: data/companies.csv
reviews:
type: pandas.CSVDataset
filepath: data/reviews.csv
shuttles:
type: pandas.ExcelDataset
filepath: data/shuttles.xlsx
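Each entry gives the dataset a name, a type, and a filepath which, thanks to fsspec, could equally be a cloud path such as s3://my-bucket/shuttles.xlsx. Entries can also customise loading and turn on versioning. A hypothetical variant of the shuttles entry, not needed for this example:
shuttles:
  type: pandas.ExcelDataset
  filepath: data/shuttles.xlsx
  load_args:
    engine: openpyxl  # passed through to pandas.read_excel
  versioned: true  # keep a timestamped copy of each save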
By using Kedro to load the catalog.yml
file, you can reference the Data Catalog in your notebook as you load the data for data processing.
# Using Kedro's DataCatalog
from kedro.io import DataCatalog
import yaml
# load the configuration file
with open("catalog.yml") as f:
conf_catalog = yaml.safe_load(f)
# Create the DataCatalog instance from the configuration
catalog = DataCatalog.from_config(conf_catalog)
# Load the datasets
companies = catalog.load("companies")
reviews = catalog.load("reviews")
shuttles = catalog.load("shuttles")
The rest of the spaceflights notebook code from above, for data processing, model training, and evaluation, can now run as before.
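The catalog handles saving as well as loading. As a sketch, if catalog.yml contained a hypothetical model_input_table entry, you could persist the processed table back through the catalog:
# Save a DataFrame via the catalog (assumes a "model_input_table" entry in catalog.yml)
catalog.save("model_input_table", model_input_table)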
Use a YAML configuration file¶
Use a configuration file for “magic numbers”¶
When writing exploratory code, it’s tempting to hard-code values to save time, but it makes code harder to maintain in the longer term. The example code for model training above calls sklearn.model_selection.train_test_split(), passing in the model input table and returning the test and train datasets. Hard-coded values are supplied to test_size and random_state.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=3)
Good software engineering practice suggests that we extract ‘magic numbers’ into named constants. These could be defined at the top of a file or in a utility file, in a format such as yaml.
# params.yml
model_options:
test_size: 0.3
random_state: 3
The params.yml
file is included in the example folder so you can reference the values with notebook code as follows:
import yaml
with open("params.yml", encoding="utf-8") as yaml_file:
params = yaml.safe_load(yaml_file)
test_size = params["model_options"]["test_size"]
random_state = params["model_options"]["random_state"]
features = [
"engines",
"passenger_capacity",
"crew",
"d_check_complete",
"moon_clearance_complete",
"iata_approved",
"company_rating",
"review_scores_rating",
]
X = model_input_table[features]
y = model_input_table["price"]
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
The rest of the model training and evaluation code can now run as before.
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train, y_train)
model.predict(X_test)
from sklearn.metrics import r2_score
y_pred = model.predict(X_test)
r2_score(y_test, y_pred)
Use a configuration file for all “magic values”¶
If we extend the concept of magic numbers to encompass magic values in general, it seems possible that the variable features
might also be reusable elsewhere. Extracting it from code into the configuration file named parameters.yml
leads to the following:
# parameters.yml
model_options:
test_size: 0.3
random_state: 3
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- moon_clearance_complete
- iata_approved
- company_rating
- review_scores_rating
The parameters.yml
file is included in the example folder so you can reference the values with notebook code as follows:
import yaml
with open("parameters.yml", encoding="utf-8") as yaml_file:
parameters = yaml.safe_load(yaml_file)
test_size = parameters["model_options"]["test_size"]
random_state = parameters["model_options"]["random_state"]
X = model_input_table[parameters["model_options"]["features"]]
y = model_input_table["price"]
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
The rest of the model training and evaluation code can now run as before.
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train, y_train)
model.predict(X_test)
from sklearn.metrics import r2_score
y_pred = model.predict(X_test)
r2_score(y_test, y_pred)
Use Kedro configuration¶
Kedro offers a configuration loader that abstracts loading values from a yaml file. You can use Kedro’s configuration loading without a full Kedro project, and this approach replaces the need to load the configuration file with yaml.safe_load.
Use Kedro’s configuration loader to load “magic values”¶
To use Kedro’s OmegaConfigLoader to load parameters.yml, the code is as follows:
from kedro.config import OmegaConfigLoader
conf_loader = OmegaConfigLoader(conf_source=".")
conf_params = conf_loader["parameters"]
test_size = conf_params["model_options"]["test_size"]
random_state = conf_params["model_options"]["random_state"]
X = model_input_table[conf_params["model_options"]["features"]]
y = model_input_table["price"]
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
The rest of the model training and evaluation code can now run as before.
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train, y_train)
model.predict(X_test)
from sklearn.metrics import r2_score
y_pred = model.predict(X_test)
r2_score(y_test, y_pred)
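Because OmegaConfigLoader is built on OmegaConf, the yaml files it loads can use OmegaConf features such as variable interpolation. A hypothetical fragment, not part of this example’s parameters.yml:
# parameters.yml (hypothetical)
model_options:
  test_size: 0.3
  random_state: 3
reporting_options:
  holdout_fraction: ${model_options.test_size}  # resolved by OmegaConf when loaded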
Use Kedro’s configuration loader to load the Data Catalog¶
Earlier in the example, we saw how to load the catalog.yml file with yaml.safe_load and pass the result to the DataCatalog class.
# Using Kedro's DataCatalog
from kedro.io import DataCatalog
import yaml
# load the configuration file
with open("catalog.yml") as f:
conf_catalog = yaml.safe_load(f)
# Create the DataCatalog instance from the configuration
catalog = DataCatalog.from_config(conf_catalog)
# Load the datasets
...
It’s also possible to use Kedro’s OmegaConfigLoader to initialise the Data Catalog. To load catalog.yml, the code is as follows:
# Now we are using Kedro's OmegaConfigLoader alongside the DataCatalog
from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog
conf_loader = OmegaConfigLoader(conf_source=".")
conf_catalog = conf_loader["catalog"]
# Create the DataCatalog instance from the configuration
catalog = DataCatalog.from_config(conf_catalog)
# Load the datasets
companies = catalog.load("companies")
reviews = catalog.load("reviews")
shuttles = catalog.load("shuttles")
Where next?¶
At this point in the notebook, we’ve introduced Kedro data management (using the Data Catalog) and the configuration loader. You have now “Kedro-ised” the notebook code to make it more reusable in future. You can go further if your ultimate goal is to migrate code out of the notebook and use it in a full-blown Kedro project.
Refactor your code into functions¶
Code in a Kedro project runs in one or more pipelines, where a pipeline is a series of “nodes” that wrap discrete functions, so migrating notebook code starts by refactoring it into functions. One option is to put everything into a single function. Let’s try this.
# Use Kedro for data management and configuration
from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog
conf_loader = OmegaConfigLoader(conf_source=".")
conf_catalog = conf_loader["catalog"]
conf_params = conf_loader["parameters"]
# Create the DataCatalog instance from the configuration
catalog = DataCatalog.from_config(conf_catalog)
# Load the datasets
companies = catalog.load("companies")
reviews = catalog.load("reviews")
shuttles = catalog.load("shuttles")
# Load the configuration data
test_size = conf_params["model_options"]["test_size"]
random_state = conf_params["model_options"]["random_state"]
def big_function():
####################
# Data processing #
####################
companies["iata_approved"] = companies["iata_approved"] == "t"
companies["company_rating"] = (
companies["company_rating"].str.replace("%", "").astype(float)
)
shuttles["d_check_complete"] = shuttles["d_check_complete"] == "t"
shuttles["moon_clearance_complete"] = shuttles["moon_clearance_complete"] == "t"
shuttles["price"] = (
shuttles["price"].str.replace("$", "").str.replace(",", "").astype(float)
)
rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
model_input_table = rated_shuttles.merge(
companies, left_on="company_id", right_on="id"
)
model_input_table = model_input_table.dropna()
model_input_table.head()
X = model_input_table[conf_params["model_options"]["features"]]
y = model_input_table["price"]
##################################
# Model training and evaluation #
##################################
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train, y_train)
model.predict(X_test)
from sklearn.metrics import r2_score
y_pred = model.predict(X_test)
print(r2_score(y_test, y_pred))
# Call the one big function
big_function()
In truth, this code is not much more maintainable than previous versions.
Maybe we could do better with a series of smaller functions that map to the Kedro vision of a pipeline of nodes. A node should behave consistently, repeatably, and predictably, so that a given input to a node always returns the same output. For those in the know, this is the definition of a pure function; a minimal illustration follows. Nodes should be small, single-responsibility functions that each perform one specific task.
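As a minimal illustration, here is a pure function alongside an impure one (neither is part of the spaceflights code):
# Pure: the output depends only on the inputs, with no side effects
def parse_price(price: str) -> float:
    return float(price.replace("$", "").replace(",", ""))

# Impure: reads and mutates state outside the function, so repeated
# calls with the same input do not behave identically
total = 0.0
def add_to_total(price: float) -> None:
    global total
    total += price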
Let’s try this with our code. We’ll split it into a set of data-processing functions based on the code in big_function, each with a single responsibility. Then we’ll add a set of data science functions that split the model training and evaluation code into three separate functions.
####################
# Data processing #
####################
import pandas as pd
def _is_true(x: pd.Series) -> pd.Series:
return x == "t"
def _parse_percentage(x: pd.Series) -> pd.Series:
x = x.str.replace("%", "")
x = x.astype(float) / 100
return x
def _parse_money(x: pd.Series) -> pd.Series:
x = x.str.replace("$", "").str.replace(",", "")
x = x.astype(float)
return x
def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
companies["iata_approved"] = _is_true(companies["iata_approved"])
companies["company_rating"] = _parse_percentage(companies["company_rating"])
return companies
def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
shuttles["price"] = _parse_money(shuttles["price"])
return shuttles
def create_model_input_table(
shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
model_input_table = rated_shuttles.merge(
companies, left_on="company_id", right_on="id"
)
model_input_table = model_input_table.dropna()
return model_input_table
##################################
# Model training and evaluation #
##################################
from typing import Dict, Tuple
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
X = data[parameters["features"]]
y = data["price"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
)
return X_train, X_test, y_train, y_test
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
regressor = LinearRegression()
regressor.fit(X_train, y_train)
return regressor
def evaluate_model(
regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
y_pred = regressor.predict(X_test)
print(r2_score(y_test, y_pred))
# Call data processing functions
preprocessed_companies = preprocess_companies(companies)
preprocessed_shuttles = preprocess_shuttles(shuttles)
model_input_table = create_model_input_table(
preprocessed_shuttles, preprocessed_companies, reviews
)
# Call model evaluation functions
X_train, X_test, y_train, y_test = split_data(
model_input_table, conf_params["model_options"]
)
regressor = train_model(X_train, y_train)
evaluate_model(regressor, X_test, y_test)
And that’s it. The notebook code has been refactored into a series of functions. Let’s reproduce it all in one big notebook cell for reference. Compare it to the notebook code at the top of this page that began this example.
# Kedro setup for data management and configuration
from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog
conf_loader = OmegaConfigLoader(conf_source=".")
conf_catalog = conf_loader["catalog"]
conf_params = conf_loader["parameters"]
# Create the DataCatalog instance from the configuration
catalog = DataCatalog.from_config(conf_catalog)
# Load the datasets
companies = catalog.load("companies")
reviews = catalog.load("reviews")
shuttles = catalog.load("shuttles")
# Load the configuration data
test_size = conf_params["model_options"]["test_size"]
random_state = conf_params["model_options"]["random_state"]
####################
# Data processing #
####################
import pandas as pd
def _is_true(x: pd.Series) -> pd.Series:
return x == "t"
def _parse_percentage(x: pd.Series) -> pd.Series:
x = x.str.replace("%", "")
x = x.astype(float) / 100
return x
def _parse_money(x: pd.Series) -> pd.Series:
x = x.str.replace("$", "").str.replace(",", "")
x = x.astype(float)
return x
def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
companies["iata_approved"] = _is_true(companies["iata_approved"])
companies["company_rating"] = _parse_percentage(companies["company_rating"])
return companies
def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
shuttles["price"] = _parse_money(shuttles["price"])
return shuttles
def create_model_input_table(
shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
model_input_table = rated_shuttles.merge(
companies, left_on="company_id", right_on="id"
)
model_input_table = model_input_table.dropna()
return model_input_table
##################################
# Model training and evaluation #
##################################
from typing import Dict, Tuple
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
X = data[parameters["features"]]
y = data["price"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
)
return X_train, X_test, y_train, y_test
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
regressor = LinearRegression()
regressor.fit(X_train, y_train)
return regressor
def evaluate_model(
regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
y_pred = regressor.predict(X_test)
print(r2_score(y_test, y_pred))
# Call data processing functions
preprocessed_companies = preprocess_companies(companies)
preprocessed_shuttles = preprocess_shuttles(shuttles)
model_input_table = create_model_input_table(
preprocessed_shuttles, preprocessed_companies, reviews
)
# Call model evaluation functions
X_train, X_test, y_train, y_test = split_data(
model_input_table, conf_params["model_options"]
)
regressor = train_model(X_train, y_train)
evaluate_model(regressor, X_test, y_test)
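If your ultimate goal is a full-blown Kedro project, each of these functions can become a node in a pipeline. The following is a sketch of the wiring rather than runnable notebook code: it assumes a Kedro project in which the dataset names and params:model_options are declared in the project’s catalog and parameters configuration.
from kedro.pipeline import node, pipeline

# A sketch of how the refactored functions could map onto a Kedro pipeline.
# The dataset names and "params:model_options" are assumed to be declared in
# the project's catalog.yml and parameters.yml.
spaceflights = pipeline(
    [
        node(preprocess_companies, inputs="companies", outputs="preprocessed_companies"),
        node(preprocess_shuttles, inputs="shuttles", outputs="preprocessed_shuttles"),
        node(
            create_model_input_table,
            inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
            outputs="model_input_table",
        ),
        node(
            split_data,
            inputs=["model_input_table", "params:model_options"],
            outputs=["X_train", "X_test", "y_train", "y_test"],
        ),
        node(train_model, inputs=["X_train", "y_train"], outputs="regressor"),
        node(evaluate_model, inputs=["regressor", "X_test", "y_test"], outputs=None),
    ]
)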