How to integrate Amazon SageMaker into your Kedro pipeline

This tutorial explains how to integrate a Kedro project with Amazon SageMaker in order to train a machine learning model. It shows how to build machine learning pipelines in Kedro and while taking advantage of the power of SageMaker for potentially compute-intensive machine learning tasks.

The Kedro project will still run locally (or on one of many supported workflow engines like Argo, Prefect, Kubeflow, AWS Batch and others), but the model training step will be offloaded onto SageMaker.

Why would you use Amazon SageMaker?

Machine learning development is a complex, expensive, and labour-intensive process with very specific requirements for the execution environment (ML tools and libraries, hardware requirements for CPU or GPU-optimised algorithms). Amazon SageMaker provides the components used for machine learning in a single toolset so models get to production faster with much less effort and at lower cost. Sagemaker supports both classical machine learning libraries like Scikit-Learn or XGBoost, and Deep Learning frameworks such as TensorFlow or PyTorch.

Amazon SageMaker is a fully-managed service and its features are covered by the official service documentation. In this tutorial we will focus on training a simple machine learning model on SageMaker as part of Kedro pipeline execution.

Prerequisites

To use Amazon SageMaker, make sure you have the following prerequisites in place:

Prepare the environment

Install SageMaker package dependencies

First, you should add extra package dependencies that are required to communicate with SageMaker via its Python SDK.

If you have run kedro install at least once for your project, you should already have the src/requirements.in file, so you need to modify that. Otherwise, if you have never run kedro install for your project, you should modify src/requirements.txt. Open the corresponding file with a text editor and add the following lines at the end of the file:

sagemaker>=2.13.0
s3fs>=0.3.0, <0.4.1  # will be needed to work with AWS S3

Since you have added two extra dependencies, you should compile and install the updated project dependencies by running the following from your terminal:

cd <project_root>
kedro install --build-reqs

Note

All CLI commands in the following sections should be executed from the project root directory.

Create SageMaker execution role

  1. Sign into the AWS Management Console and open the IAM console

  2. In the left navigation pane, choose Roles

  3. Choose Create role

  4. For role type, select AWS Service, find and choose SageMaker, and then pick the SageMaker - Execution use case, then click Next: Permissions

  5. On the Attach permissions policy page, select AmazonSageMakerFullAccess managed policy, then click Next: Review

  6. Give it a name (for example, AmazonSageMaker-ExecutionRole) and choose Create role

IAM role creation wizard

Create S3 bucket

You should create a new S3 bucket rather than use an existing one because SageMaker jobs will save source script data to the bucket root. Having a dedicated bucket for this tutorial makes the cleanup easier.

Your bucket name should contain the word sagemaker, this way the role that we created earlier will automatically have all necessary access permissions to it. Your new bucket does not require public access, so you can leave Block all public access setting enabled and preserve other defaults.

It’s generally a good practice to create AWS resources (like S3 bucket above) for the tutorial within the same region that you have in your local configuration where possible. It helps reduce the network transmission latency and ensure that different services can talk to each other.

Update the Kedro project

Create the configuration environment

Configuration in Kedro is logically separated into configuration environments which are loaded in specific order where the project is run. To separate SageMaker-specific configuration from the default one, let’s create a new configuration environment. Go ahead and create a conf/sagemaker folder and then create the following files in it.

Note

${key} in the YAML snippets below is a special syntax which allows you to template the project configuration. You don’t need to replace those values, just paste them as-is.

  • catalog.yml - defines the datasets that need to be saved into S3 (rather than kept in memory):

X_train@pickle:
  type: pickle.PickleDataSet
  filepath: ${s3.train_path}/X_train.pickle

X_train@path:
  type: MemoryDataSet
  data: ${s3.train_path}/X_train.pickle

y_train:
  type: pickle.PickleDataSet
  filepath: ${s3.train_path}/y_train.pickle

Node: @pickle and @path in the dataset names above correspond to the dataset transcoding feature of Kedro. This allows to pass S3 path to the X_train dataset instead of the actual data itself to the train_model_sagemaker node that you will create shortly.

sklearn_estimator_kwargs:
  entry_point: src/kedro_tutorial/sagemaker_entry_point.py  # you will create this file later
  role: <your_sagemaker_role_name>  # put the name of the role you've created earlier
  instance_type: ml.m4.xlarge
  instance_count: 1
  framework_version: 0.23-1
  output_path: ${s3.output_path}
  • globals.yml - contains the values that will be used to extrapolate the values in the config files above (you will need to replace the bucket name in the snippet below):

s3:
  train_path: s3://<your_s3_bucket_name>/train
  output_path: s3://<your_s3_bucket_name>/output

Update the project hooks

Now you need to tell Kedro to use the TemplatedConfigLoader instead of the default ConfigLoader class to read the project configuration. It is very easy to do via Kedro hooks - open src/kedro_tutorial/hooks.py file, locate the definition of ProjectHooks and add the following method to it:

from typing import Iterable

from kedro.config import TemplatedConfigLoader
from kedro.framework.hooks import hook_impl


class ProjectHooks:
    # <other hooks>

    @hook_impl
    def register_config_loader(
        self, conf_paths: Iterable[str]
    ) -> TemplatedConfigLoader:
        return TemplatedConfigLoader(conf_paths, globals_pattern="*globals.yml")

Update the data science pipeline

Now modify the data science pipeline in the project to send the training job to Amazon SageMaker and to process the resulting model artifact afterwards.

Create node functions

Open src/kedro_tutorial/pipelines/data_science/nodes.py and add these new node functions:

Click to expand
import pickle
import tarfile
from typing import Any, Dict

import fsspec
from sagemaker.sklearn.estimator import SKLearn
from sklearn.linear_model import LinearRegression

# <other node functions>


def train_model_sagemaker(
    X_train_path: str, sklearn_estimator_kwargs: Dict[str, Any]
) -> str:
    """Train the linear regression model on SageMaker.

    Args:
        X_train_path: Full S3 path to `X_train` dataset.
        sklearn_estimator_kwargs: Keyword arguments that will be used
            to instantiate SKLearn estimator.

    Returns:
        Full S3 path to `model.tar.gz` file containing the model artifact.

    """
    sklearn_estimator = SKLearn(**sklearn_estimator_kwargs)

    # we need a path to the directory containing both
    # X_train (feature table) and y_train (target variable)
    inputs_dir = X_train_path.rsplit("/", 1)[0]
    inputs = {"train": inputs_dir}

    # wait=True ensures that the execution is blocked
    # until the job finishes on SageMaker
    sklearn_estimator.fit(inputs=inputs, wait=True)

    training_job = sklearn_estimator.latest_training_job
    job_description = training_job.describe()
    model_path = job_description["ModelArtifacts"]["S3ModelArtifacts"]
    return model_path


def untar_model(model_path: str) -> LinearRegression:
    """Unarchive the linear regression model artifact produced
    by the training job on SageMaker.

        Args:
            model_path: Full S3 path to `model.tar.gz` file containing
                the model artifact.

        Returns:
            Trained model.

    """
    with fsspec.open(model_path) as s3_file, tarfile.open(
        fileobj=s3_file, mode="r:gz"
    ) as tar:
        # we expect to have only one file inside the `model.tar.gz` archive
        filename = tar.getnames()[0]
        model_obj = tar.extractfile(filename)
        return pickle.load(model_obj)

Update the pipeline definition

Open src/kedro_tutorial/pipelines/data_science/pipeline.py and replace its contents with the following:

Click to expand
from kedro.pipeline import Pipeline, node

from .nodes import (
    evaluate_model,
    split_data,
    train_model_sagemaker,
    untar_model,
)


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "parameters"],
                outputs=["X_train@pickle", "X_test", "y_train", "y_test"],
            ),
            node(
                func=train_model_sagemaker,
                inputs=["X_train@path", "params:sklearn_estimator_kwargs"],
                outputs="model_path",
            ),
            node(untar_model, inputs="model_path", outputs="regressor"),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
            ),
        ]
    )

Great, you are almost ready to run your pipeline with the SageMaker integration. The last step before we can do that will be to create the entry point script.

Create the SageMaker entry point

SageMaker job requires an entry point script that it will execute to perform the actual model training. This script will be automatically uploaded into S3 and run as part of the training job. SageMaker will also automatically download the training data from S3 to the container before running the job and then upload the trained model artifact back into S3 after the job is complete. Therefore the entry point script does not need to worry about data transfer from and to S3, but it will need to serialise/deserialise such data using pickle.

Create the file src/kedro_tutorial/sagemaker_entry_point.py and paste the following into it:

Click to expand
import argparse
import pickle
from os import getenv
from pathlib import Path
from typing import Any

from sklearn.linear_model import LinearRegression


def _pickle(path: Path, data: Any) -> None:
    """Pickle the object and save it to disk"""
    with path.open("wb") as f:
        pickle.dump(data, f)


def _unpickle(path: Path) -> Any:
    """Unpickle the object from a given file"""
    with path.open("rb") as f:
        return pickle.load(f)


def _get_arg_parser() -> argparse.ArgumentParser:
    """Instantiate the command line argument parser"""
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--output-data-dir", type=str, default=getenv("SM_OUTPUT_DATA_DIR")
    )
    parser.add_argument("--model-dir", type=str, default=getenv("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=getenv("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=getenv("SM_CHANNEL_TEST"))

    return parser


def main():
    """The main script entry point which will be called by SageMaker
    when running the training job
    """
    parser = _get_arg_parser()
    args = parser.parse_args()

    data_path = Path(args.train)
    X_train = _unpickle(data_path / "X_train.pickle")
    y_train = _unpickle(data_path / "y_train.pickle")

    regressor = LinearRegression()
    regressor.fit(X_train, y_train)

    model_dir = Path(args.model_dir)
    _pickle(model_dir / "regressor.pickle", regressor)


if __name__ == "__main__":
    # SageMaker will run this script as the main program
    main()

Run the project

You are now ready to run your project! To do that, execute the following CLI command:

kedro run --env sagemaker

The first 4 nodes of the pipeline will still run locally, but then you should see a similar output in the terminal:

2020-10-06 21:20:25,696 - kedro.runner.sequential_runner - INFO - Completed 4 out of 7 tasks
2020-10-06 21:20:25,696 - kedro.io.data_catalog - INFO - Loading data from `X_train@path` (MemoryDataSet)...
2020-10-06 21:20:25,696 - kedro.io.data_catalog - INFO - Loading data from `params:sklearn_estimator_kwargs` (MemoryDataSet)...
2020-10-06 21:20:25,697 - kedro.pipeline.node - INFO - Running node: train_model_sagemaker([X_train@path,params:sklearn_estimator_kwargs]) -> [model_path]
2020-10-06 21:20:25,713 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2020-10-06 21:20:25,793 - sagemaker.image_uris - INFO - Same images used for training and inference. Defaulting to image scope: inference.
2020-10-06 21:20:27,197 - sagemaker - INFO - Creating training-job with name: sagemaker-scikit-learn-2020-10-06-20-20-25-801
2020-10-06 20:20:27 Starting - Starting the training job...
2020-10-06 20:20:33 Starting - Launching requested ML instances...
2020-10-06 20:21:31 Starting - Preparing the instances for training...
2020-10-06 20:22:27 Downloading - Downloading input data...
2020-10-06 20:23:02 Training - Downloading the training image...

... [SageMaker Job Logs] ...

2020-10-06 20:23:56 Uploading - Uploading generated training model
2020-10-06 20:23:56 Completed - Training job completed
2020-10-06 21:24:20,485 - kedro.io.data_catalog - INFO - Saving data to `model_path` (MemoryDataSet)...
2020-10-06 21:24:20,486 - kedro.runner.sequential_runner - INFO - Completed 5 out of 7 tasks

You should also find your training job if you open SageMaker console and choose Training jobs tab from the left.

Now you know how to run serverless machine learning jobs using SageMaker right from your Kedro pipeline!

Cleanup

To cleanup the resources, simply delete the S3 bucket and, optionally, the IAM role you’ve created earlier (IAM resources are free). The job details of an already completed SageMaker training job cannot be deleted, but such jobs don’t incur any costs.