Skip to content

The pipeline registry

Projects generated using Kedro 0.17.2 or later define their pipelines in src/<package_name>/pipeline_registry.py. This populates the pipelines variable in kedro.framework.project that the Kedro CLI and plugins use to access project pipelines. The pipeline_registry module must contain a top-level register_pipelines() function that returns a mapping from pipeline names to Pipeline objects.

For example, the pipeline registry in the Kedro starter for the completed spaceflights tutorial defines the following register_pipelines() function. It exposes the data processing pipeline, the data science pipeline, and a default pipeline that combines both:

import spaceflights.pipelines.data_processing as dp
import spaceflights.pipelines.data_science as ds


def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    data_processing_pipeline = dp.create_pipeline()
    data_science_pipeline = ds.create_pipeline()

    return {
        "__default__": data_processing_pipeline + data_science_pipeline,
        "data_processing": data_processing_pipeline,
        "data_science": data_science_pipeline,
    }

As a reminder, running kedro run without the --pipeline option runs the default pipeline.

Note

The order in which you add the pipelines together is not significant (data_science_pipeline + data_processing_pipeline would produce the same result), since Kedro automatically detects the data-centric execution order for all the nodes in the resulting pipeline.

Pipeline autodiscovery

In the above example, you need to update the register_pipelines() function whenever you create a pipeline that should be returned as part of the project's pipelines. Since Kedro 0.18.3, you can achieve the same result with less code using find_pipelines(). The updated pipeline registry contains no project-specific code:

def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines

Under the hood, the find_pipelines() function traverses the src/<package_name>/pipelines/ directory and returns a mapping from pipeline directory name to Pipeline object by:

  1. Importing the <package_name>.pipelines.<pipeline_name> module
  2. Calling the create_pipeline() function exposed by the <package_name>.pipelines.<pipeline_name> module
  3. Validating that the constructed object is a Pipeline

By default, if any of these steps fail, find_pipelines() (or find_pipelines(raise_errors=False)) raises an appropriate warning and skips the current pipeline but continues traversal. During development, this enables you to run your project with some pipelines, even if other pipelines are broken or works in progress.

If you specify find_pipelines(raise_errors=True), the autodiscovery process will fail upon the first error. In production, this ensures errors are caught up front and prevents pipelines from being excluded.

The mapping returned by find_pipelines() can be modified, meaning you are not limited to the pipelines returned by each of the create_pipeline() functions found above. For example, to add a data engineering pipeline that isn't part of the default pipeline, add it to the dictionary after constructing the default pipeline:

def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    pipelines["data_engineering"] = Pipeline(
        pipelines["data_processing"], tags="data_engineering"
    )
    return pipelines

Note

In the case above, kedro run --tags data_engineering will not run the data engineering pipeline, as it is not part of the default pipeline. To run the data engineering pipeline, you need to specify kedro run --pipeline data_engineering --tags data_engineering.

You can also change pipelines before assigning pipelines["__default__"] = sum(pipelines.values()), which includes them in the default pipeline. For example, you can update the data_processing pipeline with the data_engineering tag in pipeline_registry.py and also include this change in the default pipeline:

def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["data_processing"] = Pipeline(
        pipelines["data_processing"], tags="data_engineering"
    )
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines

Selective pipeline loading

In large projects, you may want to load a subset of pipelines rather than discovering every one of them. You can do this by passing a list of pipeline names to find_pipelines() through the pipelines_to_find parameter:

def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines(pipelines_to_find=["data_science", "reporting"])
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines

Passing None (the default) or a list containing "__default__" loads all pipelines, which is the same as calling find_pipelines() with no arguments.

Note

If a pipeline name in pipelines_to_find does not match any pipeline in src/<package_name>/pipelines/, find_pipelines() will raise a warning and skip it. Set raise_errors=True to turn that into an error instead.