Pipeline objects¶
We previously introduced Nodes as building blocks that represent tasks, and can be combined in a pipeline to build your workflow. A pipeline organises the dependencies and execution order of your collection of nodes, and connects inputs and outputs while keeping your code modular. The pipeline resolves dependencies to determine the node execution order, and does not necessarily run the nodes in the order in which they are passed in.
To benefit from Kedro's automatic dependency resolution, you can chain your nodes into a Pipeline, which is a list of nodes that use a shared set of variables. A Pipeline can be created with the pipeline function, from nodes or from other pipelines (in which case all nodes from those pipelines are used).
The following sections explain how to create and use Kedro pipelines:
- How to use describe to discover what nodes are part of the pipeline
- How to receive information about pipeline inputs and outputs
How to build a pipeline¶
In the following example, we construct a simple pipeline that computes the variance of a set of numbers. In practice, pipelines can use more complicated node definitions, and the variables they use usually correspond to entire datasets:
from kedro.pipeline import pipeline, node

def mean(xs, n):
    return sum(xs) / n

def mean_sos(xs, n):
    return sum(x**2 for x in xs) / n

def variance(m, m2):
    return m2 - m * m

variance_pipeline = pipeline(
    [
        node(len, "xs", "n"),
        node(mean, ["xs", "n"], "m", name="mean_node"),
        node(mean_sos, ["xs", "n"], "m2", name="mean_sos"),
        node(variance, ["m", "m2"], "v", name="variance_node"),
    ]
)
Kedro determines the order of execution of these nodes based on the inputs and outputs specified for each node. In this example:
- The first node computes the length of xs and outputs it as n.
- The second node calculates the mean using xs and n, and outputs it as m.
- The third node computes the mean sum of squares (mean_sos) using xs and n, and outputs it as m2.
- The fourth node calculates the variance using the mean (m) and mean sum of squares (m2), and outputs it as v.
Kedro’s dependency resolution algorithm ensures that each node runs only after its required inputs are available from the outputs of previous nodes. This way, the nodes are executed in the correct order automatically, based on the defined dependencies.
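For instance, the same pipeline can be declared with its nodes in any order; the following sketch, which reuses the functions defined above, produces the same execution order as variance_pipeline:
reordered_pipeline = pipeline(
    [
        node(variance, ["m", "m2"], "v", name="variance_node"),
        node(mean_sos, ["xs", "n"], "m2", name="mean_sos"),
        node(mean, ["xs", "n"], "m", name="mean_node"),
        node(len, "xs", "n"),
    ]
)
# Dependency resolution orders the nodes by their inputs and outputs,
# not by the order in which they were passed in.
print(reordered_pipeline.describe())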
How to use describe to discover what nodes are part of the pipeline¶
You can use the describe method to get an overview of the nodes in your pipeline and their execution order.
print(variance_pipeline.describe())
The output is as follows:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean_node
mean_sos
variance_node
Outputs: v
##################################
How to merge multiple pipelines¶
You can merge multiple pipelines as shown below. Note that, in this case, pipeline_de and pipeline_ds are expanded to a list of their underlying nodes and these are merged together:
pipeline_de = pipeline([node(len, "xs", "n"), node(mean, ["xs", "n"], "m")])
pipeline_ds = pipeline(
    [node(mean_sos, ["xs", "n"], "m2"), node(variance, ["m", "m2"], "v")]
)
last_node = node(print, "v", None)
pipeline_all = pipeline([pipeline_de, pipeline_ds, last_node])
print(pipeline_all.describe())
The output is as follows:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean([n,xs]) -> [m]
mean_sos([n,xs]) -> [m2]
variance([m,m2]) -> [v]
print([v]) -> None
Outputs: None
##################################
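Pipelines can also be combined with the + operator, which produces a new pipeline containing the nodes of both operands. A brief sketch that is equivalent to the example above (the single node is wrapped in a pipeline so it can be added):
# Equivalent to pipeline([pipeline_de, pipeline_ds, last_node]):
pipeline_all = pipeline_de + pipeline_ds + pipeline([last_node])
print(pipeline_all.describe())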
How to receive information about the nodes in a pipeline¶
Pipelines provide access to their nodes in a topological order to enable custom functionality, e.g. pipeline visualisation. Each node has information about its inputs and outputs:
nodes = variance_pipeline.nodes
nodes
The output is as follows:
[
    Node(len, "xs", "n", None),
    Node(mean, ["xs", "n"], "m", "mean_node"),
    Node(mean_sos, ["xs", "n"], "m2", "mean_sos"),
    Node(variance, ["m", "m2"], "v", "variance_node"),
]
To find out about the inputs:
nodes[0].inputs
You should see the following:
["xs"]
How to receive information about pipeline inputs and outputs¶
In a similar way to the above, you can use inputs() and outputs() to check the inputs and outputs of a pipeline:
variance_pipeline.inputs()
Gives the following:
Out[7]: {'xs'}
variance_pipeline.outputs()
Displays the output:
Out[8]: {'v'}
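Note that inputs() and outputs() return only the pipeline's free inputs and outputs, that is, datasets that are not produced or consumed within the pipeline itself. The related all_inputs() and all_outputs() methods also include the intermediate datasets; a brief sketch (results shown as comments):
variance_pipeline.all_inputs()   # {'xs', 'n', 'm', 'm2'}
variance_pipeline.all_outputs()  # {'n', 'm', 'm2', 'v'}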
How to tag a pipeline¶
You can also tag your pipeline by providing the tags argument, which will tag all of the pipeline's nodes. In the following example, both nodes are tagged with pipeline_tag.
tagged_pipeline = pipeline(
    [node(..., name="node1"), node(..., name="node2")], tags="pipeline_tag"
)
You can combine pipeline tagging with node tagging. In the following example, node1 and node2 are tagged with pipeline_tag, while node2 also has a node_tag.
tagged_pipeline = pipeline(
    [node(..., name="node1"), node(..., name="node2", tags="node_tag")],
    tags="pipeline_tag",
)
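Tags can later be used to select a subset of nodes with only_nodes_with_tags, which returns a new pipeline containing only the matching nodes. A short sketch with illustrative names, reusing the mean and mean_sos functions defined earlier:
stats_pipeline = pipeline(
    [
        node(mean, ["xs", "n"], "m", name="node1"),
        node(mean_sos, ["xs", "n"], "m2", name="node2", tags="node_tag"),
    ],
    tags="pipeline_tag",
)
# The pipeline-level tag applies to both nodes; the node-level tag only to node2.
print(stats_pipeline.only_nodes_with_tags("pipeline_tag").describe())  # both nodes
print(stats_pipeline.only_nodes_with_tags("node_tag").describe())      # only node2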
How to avoid creating bad pipelines¶
A pipeline can usually resolve its dependencies without issue. In some cases, however, resolution is not possible; such a pipeline is not well-formed.
Pipeline with bad nodes¶
In this case, we have a pipeline consisting of a single node with no inputs and no outputs:
try:
    pipeline([node(lambda: print("!"), None, None)])
except Exception as e:
    print(e)
Gives the following output:
Invalid Node definition: it must have some `inputs` or `outputs`.
Format should be: node(function, inputs, outputs)
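A minimal fix is to declare at least one output for the node, for example (the dataset name below is illustrative):
valid_pipeline = pipeline([node(lambda: "!", None, "exclamation")])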
Pipeline with circular dependencies¶
If one variable depends on a second, the second must not also depend, directly or indirectly, on the first; otherwise, the circular dependency prevents the pipeline from being compiled.
The first node captures the relationship of how to calculate y from x, and the second captures how to calculate x knowing y. The pair of nodes cannot co-exist in the same pipeline:
try:
    pipeline(
        [
            node(lambda x: x + 1, "x", "y", name="first node"),
            node(lambda y: y - 1, "y", "x", name="second node"),
        ]
    )
except Exception as e:
    print(e)
The output is as follows:
Circular dependencies exist among these items: ['first node: <lambda>([x]) -> [y]', 'second node: <lambda>([y]) -> [x]']
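One way to break the cycle is to write the second node's result to a differently named dataset (the name below is illustrative):
pipeline(
    [
        node(lambda x: x + 1, "x", "y", name="first node"),
        node(lambda y: y - 1, "y", "x_reconstructed", name="second node"),
    ]
)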
Pipeline nodes named with the dot notation¶
Nodes named with dot notation may behave strangely.
pipeline([node(lambda x: x, inputs="input1kedro", outputs="output1.kedro")])
Nodes that are created with input or output names that contain . risk a disconnected pipeline or an improperly formatted Kedro structure. This is because . has a special meaning internally and indicates a namespaced pipeline. In the example, the output becomes disconnected because its name implies there is an "output1" namespace pipeline. The input is not namespaced, but the output is via its dot notation, which leads to Kedro processing each separately. For this example, a better approach would have been to write both names as input1_kedro and output1_kedro.
We recommend using characters such as _ instead of . as name separators.
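Following that recommendation, the earlier example could be written as:
pipeline([node(lambda x: x, inputs="input1_kedro", outputs="output1_kedro")])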
How to store pipeline code in a Kedro project¶
When managing your Kedro project, we recommend grouping related tasks into individual pipelines to achieve modularity. A project typically contains many tasks, and organising frequently executed tasks into separate pipelines helps maintain order and efficiency. Each pipeline should ideally live in its own folder, which makes it easy to copy and reuse within a project. Simply put: one pipeline, one folder. To assist with this, Kedro introduces the concept of Modular Pipelines, which are described in the next section.
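For example, a pipeline folder created with the kedro pipeline create command conventionally exposes a create_pipeline function in its pipeline.py. A minimal sketch, where the node contents and name are placeholders:
from kedro.pipeline import Pipeline, node, pipeline

def create_pipeline(**kwargs) -> Pipeline:
    # Assemble and return this folder's pipeline from its nodes.
    return pipeline(
        [
            node(len, "xs", "n", name="example_node"),
        ]
    )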