kedro.pipeline.Pipeline
- class kedro.pipeline.Pipeline(nodes, *, tags=None)
A Pipeline is defined as a collection of Node objects. This class treats nodes as part of a graph representation and provides inputs, outputs and execution order.
Attributes
grouped_nodes – Return a list of the pipeline nodes in topologically ordered groups, i.e. if node A needs to be run before node B, it will appear in an earlier group.
grouped_nodes_by_namespace – Return a dictionary of the pipeline nodes grouped by namespace with information about the nodes, their type, and dependencies.
node_dependencies – All dependencies of nodes, as a dictionary mapping each node to the set of nodes it directly depends on.
nodes – Return a list of the pipeline nodes in topological order, i.e. if node A needs to be run before node B, it will appear earlier in the list.
Methods
all_inputs() – All inputs for all nodes in the pipeline.
all_outputs() – All outputs of all nodes in the pipeline.
datasets() – The names of all datasets used by the Pipeline, including inputs and outputs.
describe([names_only]) – Obtain the order of execution and expected free input variables in a loggable pre-formatted string.
filter([tags, from_nodes, to_nodes, ...]) – Creates a new Pipeline object with the nodes that meet all of the specified filtering conditions.
from_inputs(*inputs) – Create a new Pipeline object with the nodes which depend directly or transitively on the provided inputs.
from_nodes(*node_names) – Create a new Pipeline object with the nodes which depend directly or transitively on the provided nodes.
inputs() – The names of free inputs that must be provided at runtime so that the pipeline is runnable.
only_nodes(*node_names) – Create a new Pipeline which will contain only the specified nodes by name.
only_nodes_with_inputs(*inputs) – Create a new Pipeline object with the nodes which depend directly on the provided inputs.
only_nodes_with_namespace(node_namespace) – Creates a new Pipeline containing only nodes with the specified namespace.
only_nodes_with_outputs(*outputs) – Create a new Pipeline object with the nodes which are directly required to produce the provided outputs.
only_nodes_with_tags(*tags) – Creates a new Pipeline object with the nodes which contain any of the provided tags.
outputs() – The names of outputs produced when the whole pipeline is run.
tag(tags) – Tags all the nodes in the pipeline.
to_json() – Return a JSON representation of the pipeline.
to_nodes(*node_names) – Create a new Pipeline object with the nodes required directly or transitively by the provided nodes.
to_outputs(*outputs) – Create a new Pipeline object with the nodes which are directly or transitively required to produce the provided outputs.
- __init__(nodes, *, tags=None)
Initialise Pipeline with a list of Node instances.
- Parameters:
nodes (Iterable[Node | Pipeline]) – The iterable of nodes the Pipeline will be made of. If you provide pipelines among the list of nodes, those pipelines will be expanded and all their nodes will become part of this new pipeline.
tags (str | Iterable[str] | None) – Optional set of tags to be applied to all the pipeline nodes.
- Raises:
ValueError – When an empty list of nodes is provided, or when not all nodes have unique names.
CircularDependencyError – When visiting all the nodes is not possible due to the existence of a circular dependency.
OutputNotUniqueError – When multiple Node instances produce the same output.
ConfirmNotUniqueError – When multiple Node instances attempt to confirm the same dataset.
Example:
from kedro.pipeline import Pipeline
from kedro.pipeline import node

# In the following scenario first_ds and second_ds
# are datasets provided by io. Pipeline will pass these
# datasets to the first_node function and provide the result
# to second_node as input.

def first_node(first_ds, second_ds):
    return dict(third_ds=first_ds + second_ds)

def second_node(third_ds):
    return third_ds

pipeline = Pipeline([
    # first_node returns a dict, so its outputs are declared as a dict too
    node(first_node, ["first_ds", "second_ds"], dict(third_ds="third_ds")),
    node(second_node, dict(third_ds="third_ds"), "fourth_ds"),
])

print(pipeline.describe())
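The constructor rules above (nested pipelines are expanded into their nodes, node names must be unique, and no two nodes may produce the same output) can be illustrated with a toy model in plain Python. This is a sketch, not Kedro's implementation. `ToyNode` and `build` are hypothetical, nested lists stand in for nested `Pipeline` objects, and every failure is reported as a plain `ValueError` rather than Kedro's specific exception types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    """Hypothetical stand-in for a Kedro Node: a name plus dataset names."""
    name: str
    inputs: tuple = ()
    outputs: tuple = ()


def build(items):
    """Expand nested lists of ToyNodes, then apply the uniqueness checks."""
    flat = []
    for item in items:
        if isinstance(item, list):  # a nested "pipeline": splice in its nodes
            flat.extend(build(item))
        else:
            flat.append(item)
    names = [n.name for n in flat]
    if len(names) != len(set(names)):
        raise ValueError("node names must be unique")
    outputs = [o for n in flat for o in n.outputs]
    if len(outputs) != len(set(outputs)):
        raise ValueError("an output is produced by more than one node")
    return flat


inner = [ToyNode("a", ("x",), ("y",))]
combined = build([inner, ToyNode("b", ("y",), ("z",))])
print([n.name for n in combined])  # ['a', 'b']
```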
- describe(names_only=True)
Obtain the order of execution and expected free input variables in a loggable pre-formatted string. The order of nodes matches the order of execution given by the topological sort.
- Parameters:
names_only (bool) – If True (the default), list each node by its name only; if False, list each node using its full string representation, which also shows its inputs and outputs.
Example:
import logging

pipeline = Pipeline([ ... ])
logger = logging.getLogger(__name__)
logger.info(pipeline.describe())
After invocation the following will be printed as an info level log statement:
#### Pipeline execution order ####
Inputs: C, D

func1([C]) -> [A]
func2([D]) -> [B]
func3([A, D]) -> [E]

Outputs: B, E
##################################
- Return type:
str
- Returns:
The pipeline description as a formatted string.
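The layout in the example above can be reproduced with a toy model. It also shows where the `Inputs:` and `Outputs:` lines come from: free inputs are consumed but never produced, and final outputs are produced but never consumed. `ToyNode` and `describe` are hypothetical. The `func([inputs]) -> [outputs]` node format and the alphabetical ordering of names are assumptions read off the example output, and the nodes are assumed to be passed in execution order already.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    func: str      # function name shown in the description
    inputs: tuple
    outputs: tuple

    def __str__(self):
        ins, outs = ", ".join(self.inputs), ", ".join(self.outputs)
        return f"{self.func}([{ins}]) -> [{outs}]"


def describe(nodes):
    """Format nodes (already in execution order) like the example above."""
    produced = {o for n in nodes for o in n.outputs}
    consumed = {i for n in nodes for i in n.inputs}
    free_inputs = ", ".join(sorted(consumed - produced)) or "None"
    final_outputs = ", ".join(sorted(produced - consumed)) or "None"
    body = "\n".join(str(n) for n in nodes)
    return (
        "#### Pipeline execution order ####\n"
        f"Inputs: {free_inputs}\n\n"
        f"{body}\n\n"
        f"Outputs: {final_outputs}\n"
        "##################################"
    )


example = [
    ToyNode("func1", ("C",), ("A",)),
    ToyNode("func2", ("D",), ("B",)),
    ToyNode("func3", ("A", "D"), ("E",)),
]
print(describe(example))
```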
- filter(tags=None, from_nodes=None, to_nodes=None, node_names=None, from_inputs=None, to_outputs=None, node_namespace=None)
Creates a new Pipeline object with the nodes that meet all of the specified filtering conditions. The new pipeline object is the intersection of pipelines that meet each filtering condition. This is distinct from chaining multiple filters together, where each filter is applied to the result of the previous one.
- Parameters:
tags (Iterable[str] | None) – A list of node tags which should be used to look up the nodes of the new Pipeline.
from_nodes (Iterable[str] | None) – A list of node names which should be used as a starting point of the new Pipeline.
to_nodes (Iterable[str] | None) – A list of node names which should be used as an end point of the new Pipeline.
node_names (Iterable[str] | None) – A list of node names which should be selected for the new Pipeline.
from_inputs (Iterable[str] | None) – A list of inputs which should be used as a starting point of the new Pipeline.
to_outputs (Iterable[str] | None) – A list of outputs which should be the final outputs of the new Pipeline.
node_namespace (str | None) – One node namespace which should be used to select nodes in the new Pipeline.
- Return type:
Pipeline
- Returns:
A new Pipeline object with nodes that meet all of the specified filtering conditions.
- Raises:
ValueError – The filtered Pipeline has no nodes.
Example:
from kedro.pipeline import Pipeline, node

def func(x):  # placeholder function shared by every node
    return x

pipeline = Pipeline(
    [
        node(func, "A", "B", name="node1"),
        node(func, "B", "C", name="node2"),
        node(func, "C", "D", name="node3"),
    ]
)
pipeline.filter(node_names=["node1", "node3"], from_inputs=["A"])
# Gives a new pipeline object containing node1 and node3.
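The statement that filter() differs from chaining can be checked with a toy model of the example above. This sketch uses hypothetical `ToyNode`, `from_inputs` and `only_nodes` helpers that return sets of node names, not Kedro's implementation. Each condition is evaluated against the full pipeline and the results are intersected, which keeps node1 and node3. Applying the same conditions one after the other runs `from_inputs` on a pipeline that no longer contains node2, so the walk from A stops at node1.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def from_inputs(nodes, *inputs):
    """Names of nodes depending directly or transitively on `inputs`."""
    selected, available = set(), set(inputs)
    changed = True
    while changed:  # sweep until no new node becomes reachable
        changed = False
        for n in nodes:
            if n.name not in selected and available & set(n.inputs):
                selected.add(n.name)
                available |= set(n.outputs)
                changed = True
    return selected


def only_nodes(nodes, *names):
    return {n.name for n in nodes if n.name in names}


pipeline = [
    ToyNode("node1", ("A",), ("B",)),
    ToyNode("node2", ("B",), ("C",)),
    ToyNode("node3", ("C",), ("D",)),
]

# filter(): evaluate each condition on the full pipeline, then intersect.
filtered = only_nodes(pipeline, "node1", "node3") & from_inputs(pipeline, "A")

# Chaining: the second condition only sees what the first one kept.
kept = only_nodes(pipeline, "node1", "node3")
chained = from_inputs([n for n in pipeline if n.name in kept], "A")

print(sorted(filtered), sorted(chained))  # ['node1', 'node3'] ['node1']
```

With Kedro itself the chained form would be written `pipeline.only_nodes("node1", "node3").from_inputs("A")`. The toy model suggests it selects fewer nodes than the single `filter()` call.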
- from_inputs(*inputs)
Create a new Pipeline object with the nodes which depend directly or transitively on the provided inputs. If a transcoded input is given by name only, without a format, all nodes that use inputs with that name are included; otherwise only the fully-qualified name (i.e. name@format) is matched.
- Parameters:
*inputs (str) – A list of inputs which should be used as a starting point of the new Pipeline.
- Raises:
ValueError – Raised when any of the given inputs do not exist in the Pipeline object.
- Return type:
Pipeline
- Returns:
A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided inputs are being copied.
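The transcoding rule for from_inputs can be sketched as follows. This toy model makes three assumptions. A bare name such as `cars` matches every `cars@<format>` variant. `cars@pandas` matches only itself. Once the starting nodes are found, the walk downstream compares dataset names with the `@format` suffix removed. `ToyNode`, `base`, `matches` and `from_inputs` are hypothetical helpers, and unlike Kedro the toy only accepts names that some node consumes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def base(name):
    """Strip a transcoding suffix: 'cars@spark' -> 'cars'."""
    return name.split("@", 1)[0]


def matches(requested, dataset):
    # A bare name matches every transcoded variant; 'name@format' only itself.
    if "@" in requested:
        return requested == dataset
    return requested == base(dataset)


def from_inputs(nodes, *inputs):
    """Names of nodes that depend directly or transitively on `inputs`."""
    for requested in inputs:
        if not any(matches(requested, i) for n in nodes for i in n.inputs):
            raise ValueError(f"{requested!r} is not an input of this pipeline")
    frontier = {
        n for n in nodes
        if any(matches(r, i) for r in inputs for i in n.inputs)
    }
    selected = set()
    while frontier:
        selected |= frontier
        produced = {base(o) for n in frontier for o in n.outputs}
        # Downstream edges compare names without the @format suffix.
        frontier = {
            n for n in nodes
            if n not in selected and produced & {base(i) for i in n.inputs}
        }
    return {n.name for n in selected}


nodes = [
    ToyNode("clean", ("raw",), ("cars@spark",)),
    ToyNode("summarise", ("cars@pandas",), ("report",)),
    ToyNode("train", ("cars@spark",), ("model",)),
]
print(sorted(from_inputs(nodes, "cars")))         # ['summarise', 'train']
print(sorted(from_inputs(nodes, "cars@pandas")))  # ['summarise']
print(sorted(from_inputs(nodes, "raw")))          # ['clean', 'summarise', 'train']
```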
- from_nodes(*node_names)
Create a new Pipeline object with the nodes which depend directly or transitively on the provided nodes.
- Parameters:
*node_names (str) – A list of node names which should be used as a starting point of the new Pipeline.
- Raises:
ValueError – Raised when any of the given names do not exist in the Pipeline object.
- Return type:
Pipeline
- Returns:
A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided nodes are being copied.
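from_nodes walks the graph forwards from named nodes rather than from datasets. The toy model below uses hypothetical `ToyNode` and `from_nodes` helpers, not Kedro's code. It assumes the named nodes are kept together with every node that consumes one of their outputs, directly or transitively.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def from_nodes(nodes, *node_names):
    """The named nodes plus every node downstream of them."""
    by_name = {n.name: n for n in nodes}
    missing = sorted(set(node_names) - set(by_name))
    if missing:
        raise ValueError(f"unknown node names: {missing}")
    consumers = {}  # dataset name -> names of the nodes that read it
    for n in nodes:
        for dataset in n.inputs:
            consumers.setdefault(dataset, []).append(n.name)
    selected, pending = set(), list(node_names)
    while pending:  # walk forwards along output -> input edges
        name = pending.pop()
        if name not in selected:
            selected.add(name)
            for dataset in by_name[name].outputs:
                pending.extend(consumers.get(dataset, []))
    return selected


nodes = [
    ToyNode("a", ("x",), ("y",)),
    ToyNode("b", ("y",), ("z",)),
    ToyNode("c", ("w",), ("v",)),
]
print(sorted(from_nodes(nodes, "a")))  # ['a', 'b']
```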
- property grouped_nodes: list[list[Node]]
Return a list of the pipeline nodes in topologically ordered groups, i.e. if node A needs to be run before node B, it will appear in an earlier group.
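One way to build such groups is a layered topological sort. You repeatedly collect every node whose parent nodes have all been placed, and stop with an error if no node is ready, which means there is a cycle. The sketch uses a hypothetical `ToyNode` and returns node names. Sorting names inside a group is this sketch's own choice, not documented Kedro behaviour.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def grouped_names(nodes):
    """Group node names into layers; each layer only needs earlier layers."""
    producer = {o: n.name for n in nodes for o in n.outputs}
    parents = {
        n.name: {producer[i] for i in n.inputs if i in producer} for n in nodes
    }
    groups, done = [], set()
    while len(done) < len(parents):
        # every node whose parents have all been placed joins the next group
        ready = sorted(
            name for name, deps in parents.items()
            if name not in done and deps <= done
        )
        if not ready:
            raise ValueError("circular dependency between nodes")
        groups.append(ready)
        done.update(ready)
    return groups


nodes = [
    ToyNode("func1", ("C",), ("A",)),
    ToyNode("func2", ("D",), ("B",)),
    ToyNode("func3", ("A", "D"), ("E",)),
]
print(grouped_names(nodes))  # [['func1', 'func2'], ['func3']]
```

Flattening the groups in order gives one valid topological order, of the kind the nodes property describes.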
- property grouped_nodes_by_namespace: dict[str, dict[str, Any]]
Return a dictionary of the pipeline nodes grouped by namespace with information about the nodes, their type, and dependencies. The structure of the dictionary is: {'node_name/namespace_name': {'name': 'node_name/namespace_name', 'type': 'namespace' or 'node', 'nodes': [list of nodes], 'dependencies': [list of dependencies]}}. This property is intended to be used by deployment plugins to group nodes by namespace.
- inputs()
The names of free inputs that must be provided at runtime so that the pipeline is runnable. Does not include intermediate inputs which are produced and consumed by the inner pipeline nodes. Resolves transcoded names where necessary.
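The phrase "resolves transcoded names" matters when one node writes `cars@spark` and another reads `cars@pandas`. Both names refer to the same dataset, so `cars@pandas` is intermediate rather than free. The toy model below uses hypothetical `ToyNode`, `base` and `free_inputs` helpers and assumes names are compared with the `@format` suffix stripped.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def base(name):
    """Strip a transcoding suffix: 'cars@pandas' -> 'cars'."""
    return name.split("@", 1)[0]


def free_inputs(nodes):
    """Inputs not produced by any node, comparing names without the @format."""
    produced = {base(o) for n in nodes for o in n.outputs}
    return {i for n in nodes for i in n.inputs if base(i) not in produced}


nodes = [
    ToyNode("clean", ("raw",), ("cars@spark",)),
    ToyNode("report", ("cars@pandas", "params"), ("summary",)),
]
print(sorted(free_inputs(nodes)))  # ['params', 'raw']
```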
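- property node_dependencies: dict[Node, set[Node]]
All dependencies of nodes, as a dictionary in which each key is a node and its value is the set of nodes it directly depends on (its parent nodes). Nodes without dependencies map to an empty set.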
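The toy model below makes the direction of the mapping concrete. It uses hypothetical `ToyNode` and `node_dependencies` helpers and is keyed by node name instead of `Node` objects. Each node is mapped to the nodes whose outputs it consumes, and a node that reads only free inputs gets an empty set.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def node_dependencies(nodes):
    """Map each node name to the names of the nodes it directly depends on."""
    deps = {n.name: set() for n in nodes}  # independent nodes keep set()
    for parent in nodes:
        for child in nodes:
            # child depends on parent if it reads something parent writes
            if set(parent.outputs) & set(child.inputs):
                deps[child.name].add(parent.name)
    return deps


nodes = [
    ToyNode("func1", ("C",), ("A",)),
    ToyNode("func2", ("D",), ("B",)),
    ToyNode("func3", ("A", "D"), ("E",)),
]
print(node_dependencies(nodes))
# {'func1': set(), 'func2': set(), 'func3': {'func1'}}
```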
- property nodes: list[Node]
Return a list of the pipeline nodes in topological order, i.e. if node A needs to be run before node B, it will appear earlier in the list.
- only_nodes(*node_names)
Create a new Pipeline which will contain only the specified nodes by name.
- Parameters:
*node_names (str) – One or more node names. The returned Pipeline will only contain these nodes.
- Raises:
ValueError – When some invalid node name is given.
- Return type:
Pipeline
- Returns:
A new Pipeline, containing only the specified nodes.
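only_nodes selects by name only. It does not pull in the nodes that produce the selected nodes' inputs, so those datasets become free inputs of the new pipeline. The toy model below illustrates this with hypothetical `ToyNode`, `only_nodes` and `free_inputs` helpers, not Kedro's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def only_nodes(nodes, *node_names):
    """Keep exactly the named nodes; their upstream producers are not added."""
    unknown = sorted(set(node_names) - {n.name for n in nodes})
    if unknown:
        raise ValueError(f"unknown node names: {unknown}")
    return [n for n in nodes if n.name in node_names]


def free_inputs(nodes):
    """Inputs that no node in `nodes` produces."""
    produced = {o for n in nodes for o in n.outputs}
    return {i for n in nodes for i in n.inputs} - produced


nodes = [ToyNode("a", ("x",), ("y",)), ToyNode("b", ("y",), ("z",))]
subset = only_nodes(nodes, "b")
print([n.name for n in subset], free_inputs(subset))  # ['b'] {'y'}
```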
- only_nodes_with_inputs(*inputs)
Create a new Pipeline object with the nodes which depend directly on the provided inputs. If a transcoded input is given by name only, without a format, all nodes that use inputs with that name are included; otherwise only the fully-qualified name (i.e. name@format) is matched.
- Parameters:
*inputs (str) – A list of inputs which should be used as a starting point of the new Pipeline.
- Raises:
ValueError – Raised when any of the given inputs do not exist in the Pipeline object.
- Return type:
Pipeline
- Returns:
A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly on the provided inputs are being copied.
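On a two-node chain you can see the difference between only_nodes_with_inputs, which keeps direct consumers only, and from_inputs, which keeps direct and transitive ones. The sketch below ignores transcoding. It uses hypothetical helpers, `only_nodes_with_inputs` and `downstream_of`, over a toy `ToyNode` type.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def only_nodes_with_inputs(nodes, *inputs):
    """Nodes that read at least one of `inputs` directly."""
    wanted = set(inputs)
    return {n.name for n in nodes if wanted & set(n.inputs)}


def downstream_of(nodes, *inputs):
    """For contrast: the direct *and* transitive walk of from_inputs."""
    selected, available = set(), set(inputs)
    changed = True
    while changed:
        changed = False
        for n in nodes:
            if n.name not in selected and available & set(n.inputs):
                selected.add(n.name)
                available |= set(n.outputs)
                changed = True
    return selected


nodes = [ToyNode("a", ("x",), ("y",)), ToyNode("b", ("y",), ("z",))]
print(sorted(only_nodes_with_inputs(nodes, "x")))  # ['a']
print(sorted(downstream_of(nodes, "x")))           # ['a', 'b']
```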
- only_nodes_with_namespace(node_namespace)
Creates a new Pipeline containing only nodes with the specified namespace.
- Parameters:
node_namespace (str) – One node namespace.
- Raises:
ValueError – When the pipeline contains no nodes with the specified namespace.
- Return type:
Pipeline
- Returns:
A new Pipeline containing nodes with the specified namespace.
- only_nodes_with_outputs(*outputs)
Create a new Pipeline object with the nodes which are directly required to produce the provided outputs. If a transcoded dataset is given by name only, without a format, all nodes that output to that name are included; otherwise only the fully-qualified name (i.e. name@format) is matched.
- Parameters:
*outputs (str) – A list of outputs which should be the final outputs of the new Pipeline.
- Raises:
ValueError – Raised when any of the given outputs do not exist in the Pipeline object.
- Return type:
Pipeline
- Returns:
A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly required to produce the provided outputs are being copied.
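The transcoding rule for only_nodes_with_outputs can be sketched like this. The sketch assumes that a bare name such as `cars` matches an output declared as `cars@spark`, while a fully-qualified name must match exactly. `ToyNode`, `base`, `matches` and `only_nodes_with_outputs` are hypothetical. Raising `ValueError` for a name that no node produces is this toy's simplification.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def base(name):
    """Strip a transcoding suffix: 'cars@spark' -> 'cars'."""
    return name.split("@", 1)[0]


def matches(requested, produced):
    # 'cars' matches 'cars@spark'; 'cars@spark' matches only itself.
    if "@" in requested:
        return requested == produced
    return requested == base(produced)


def only_nodes_with_outputs(nodes, *outputs):
    """Names of the nodes that directly produce any of `outputs`."""
    selected = set()
    for requested in outputs:
        hits = {n.name for n in nodes if any(matches(requested, o) for o in n.outputs)}
        if not hits:
            raise ValueError(f"{requested!r} is not produced by this pipeline")
        selected |= hits
    return selected


nodes = [
    ToyNode("clean", ("raw",), ("cars@spark",)),
    ToyNode("train", ("cars@pandas",), ("model",)),
]
print(sorted(only_nodes_with_outputs(nodes, "cars")))        # ['clean']
print(sorted(only_nodes_with_outputs(nodes, "cars@spark")))  # ['clean']
```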
- only_nodes_with_tags(*tags)
Creates a new Pipeline object with the nodes which contain any of the provided tags. The resulting Pipeline is empty if no tags are provided.
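only_nodes_with_tags uses "any of" semantics. A node is kept if it carries at least one requested tag, and an empty request selects nothing. Here is a toy model with a hypothetical `ToyNode` that holds a frozenset of tags, not Kedro's `Node`:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    tags: frozenset = frozenset()


def only_nodes_with_tags(nodes, *tags):
    """Keep nodes carrying ANY of `tags`; no tags selects nothing."""
    wanted = set(tags)
    return [n.name for n in nodes if n.tags & wanted]


nodes = [
    ToyNode("a", frozenset({"etl"})),
    ToyNode("b", frozenset({"ml"})),
    ToyNode("c", frozenset({"etl", "ml"})),
]
print(only_nodes_with_tags(nodes, "etl"))        # ['a', 'c']
print(only_nodes_with_tags(nodes, "etl", "ml"))  # ['a', 'b', 'c']
print(only_nodes_with_tags(nodes))               # []
```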
- outputs()
The names of outputs produced when the whole pipeline is run. Does not include intermediate outputs that are consumed by other pipeline nodes. Resolves transcoded names where necessary.
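outputs() drops intermediate outputs, whereas the full set of node outputs keeps them. In the toy model below, `cars@spark` is read downstream as `cars@pandas`. Assuming names are compared without the `@format` suffix, it is treated as intermediate and only `summary` remains. `ToyNode`, `base`, `every_output` and `final_outputs` are hypothetical helpers.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def base(name):
    """Strip a transcoding suffix: 'cars@spark' -> 'cars'."""
    return name.split("@", 1)[0]


def every_output(nodes):
    """Every dataset written by some node, intermediates included."""
    return {o for n in nodes for o in n.outputs}


def final_outputs(nodes):
    """Outputs that no node reads, comparing names without the @format."""
    consumed = {base(i) for n in nodes for i in n.inputs}
    return {o for o in every_output(nodes) if base(o) not in consumed}


nodes = [
    ToyNode("clean", ("raw",), ("cars@spark",)),
    ToyNode("report", ("cars@pandas",), ("summary",)),
]
print(sorted(every_output(nodes)))   # ['cars@spark', 'summary']
print(sorted(final_outputs(nodes)))  # ['summary']
```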
- to_nodes(*node_names)
Create a new Pipeline object with the nodes required directly or transitively by the provided nodes.
- Parameters:
*node_names (str) – A list of node names which should be used as an end point of the new Pipeline.
- Raises:
ValueError – Raised when any of the given names do not exist in the Pipeline object.
- Return type:
Pipeline
- Returns:
A new Pipeline object, containing a subset of the nodes of the current one such that only nodes required directly or transitively by the provided nodes are being copied.
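to_nodes is the mirror image of from_nodes: it walks upstream from the named nodes through the producers of their inputs. Here is a toy model with hypothetical `ToyNode` and `to_nodes` helpers, which assumes the named nodes themselves are part of the result:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def to_nodes(nodes, *node_names):
    """The named nodes plus every node they need, directly or transitively."""
    by_name = {n.name: n for n in nodes}
    missing = sorted(set(node_names) - set(by_name))
    if missing:
        raise ValueError(f"unknown node names: {missing}")
    producer = {o: n.name for n in nodes for o in n.outputs}
    selected, pending = set(), list(node_names)
    while pending:  # walk upstream through the producer of each input
        name = pending.pop()
        if name not in selected:
            selected.add(name)
            pending.extend(producer[i] for i in by_name[name].inputs if i in producer)
    return selected


nodes = [
    ToyNode("a", ("x",), ("y",)),
    ToyNode("b", ("y",), ("z",)),
    ToyNode("c", ("z",), ("w",)),
]
print(sorted(to_nodes(nodes, "b")))  # ['a', 'b']
```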
- to_outputs(*outputs)
Create a new Pipeline object with the nodes which are directly or transitively required to produce the provided outputs. If a transcoded dataset is given by name only, without a format, all nodes that output to that name are included; otherwise only the fully-qualified name (i.e. name@format) is matched.
- Parameters:
*outputs (str) – A list of outputs which should be the final outputs of the new Pipeline.
- Raises:
ValueError – Raised when any of the given outputs do not exist in the Pipeline object.
- Return type:
Pipeline
- Returns:
A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly or transitively required to produce the provided outputs are being copied.
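to_outputs keeps the producers of the requested outputs and everything upstream of them, whereas only_nodes_with_outputs keeps the producers alone. The toy model below uses hypothetical `ToyNode` and `to_outputs` helpers and ignores transcoding. It shows the difference on a small chain, and the unrelated node `c` is left out either way.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyNode:
    name: str
    inputs: tuple
    outputs: tuple


def to_outputs(nodes, *outputs):
    """Producers of `outputs` plus every node upstream of them."""
    producer = {o: n for n in nodes for o in n.outputs}
    missing = sorted(set(outputs) - set(producer))
    if missing:
        raise ValueError(f"not produced by this pipeline: {missing}")
    selected, pending = set(), [producer[o] for o in outputs]
    while pending:  # walk upstream through the producer of each input
        n = pending.pop()
        if n.name not in selected:
            selected.add(n.name)
            pending.extend(producer[i] for i in n.inputs if i in producer)
    return selected


nodes = [
    ToyNode("a", ("x",), ("y",)),
    ToyNode("b", ("y",), ("z",)),
    ToyNode("c", ("q",), ("r",)),
]
direct = {n.name for n in nodes if "z" in n.outputs}  # producers only
print(sorted(direct), sorted(to_outputs(nodes, "z")))  # ['b'] ['a', 'b']
```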