Node
kedro.pipeline.node ¶
This module provides user-friendly functions for creating nodes as parts of Kedro pipelines.
GroupedNodes
dataclass
¶
GroupedNodes(name, type, nodes=list(), dependencies=list())
Represents a logical group of nodes, typically by namespace or a custom grouping. A group can also consist of a single node. This is used to support deployment—for example, by executing the entire group in a single container run.
Node ¶
Node(func, inputs, outputs, *, name=None, tags=None, confirms=None, namespace=None, preview_fn=None)
Node is an auxiliary class facilitating the operations required to
run user-provided functions as part of Kedro pipelines.
Parameters:
-
func(Callable) –A function that corresponds to the node logic. The function should have at least one input or output.
-
inputs(str | list[str] | dict[str, str] | None) –The name or the list of the names of variables used as inputs to the function. The number of names should match the number of arguments in the definition of the provided function. When dict[str, str] is provided, variable names will be mapped to function argument names.
-
outputs(str | list[str] | dict[str, str] | None) –The name or the list of the names of variables used as outputs of the function. The number of names should match the number of outputs returned by the provided function. When dict[str, str] is provided, variable names will be mapped to the named outputs the function returns.
-
name(str | None, default:None) –Optional node name to be used when displaying the node in logs or any other visualisations. Valid node name must contain only letters, digits, hyphens, underscores and/or fullstops.
-
tags(str | Iterable[str] | None, default:None) –Optional set of tags to be applied to the node. Valid node tag must contain only letters, digits, hyphens, underscores and/or fullstops.
-
confirms(str | list[str] | None, default:None) –Optional name or the list of the names of the datasets that should be confirmed. This will result in calling
confirm()method of the corresponding dataset instance. Specified dataset names do not necessarily need to be present in the nodeinputsoroutputs. -
namespace(str | None, default:None) –Optional node namespace.
-
preview_fn(Callable[..., PreviewPayload] | None, default:None) –Optional preview function that returns one of the valid preview types (TextPreview, MermaidPreview, ImagePreview, or CustomPreview). This is an experimental feature.
Raises:
-
ValueError–Raised in the following cases: a) When the provided arguments do not conform to the format suggested by the type hint of the argument. b) When the node produces multiple outputs with the same name. c) When an input has the same name as an output. d) When the given node name violates the requirements: it must contain only letters, digits, hyphens, underscores and/or fullstops.
Source code in kedro/pipeline/node.py
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 | |
confirms
property
¶
confirms
Return dataset names to confirm as a list.
Returns:
-
list[str]–Dataset names to confirm as a list.
func
property
writable
¶
func
Exposes the underlying function of the node.
Returns:
-
Callable–Return the underlying function of the node.
inputs
cached
property
¶
inputs
Return node inputs as a list, in the order required to bind them properly to the node's function.
Returns:
-
list[str]–Node input names as a list.
name
property
¶
name
Node's name.
Returns:
-
str–Node's name if provided or the name of its function.
namespace
property
¶
namespace
Node's namespace.
Returns:
-
str | None–String representing node's namespace, typically from outer to inner scopes.
namespace_prefixes
cached
property
¶
namespace_prefixes
Return all hierarchical prefixes of the node's namespace.
Returns:
-
list[str]–A list of namespace prefixes, from shortest to longest.
-
list[str]–For example, a namespace 'a.b.c' would return ['a', 'a.b', 'a.b.c'].
-
list[str]–If the node has no namespace, returns an empty list.
outputs
property
¶
outputs
Return node outputs as a list preserving the original order if possible.
Returns:
-
list[str]–Node output names as a list.
short_name
property
¶
short_name
Node's name.
Returns:
-
str–Returns a short, user-friendly name that is not guaranteed to be unique.
-
str–The namespace is stripped out of the node name.
tags
property
¶
tags
Return the tags assigned to the node.
Returns:
-
set[str]–Return the set of all assigned tags to the node.
preview ¶
preview()
Execute the preview function if available and validate its return type.
Returns:
-
PreviewPayload | None–A preview payload (one of TextPreview, MermaidPreview, ImagePreview, or CustomPreview) if preview_fn is set, None otherwise.
Raises:
-
ValueError–If the preview function does not return one of the valid preview types.
Examples:
from kedro.pipeline.preview_contract import (
MermaidPreview,
ImagePreview,
)
# Define your preview methods
# Example 1: Mermaid diagram
def preview_pipeline_flow() -> MermaidPreview:
steps = ["Load", "Validate", "Transform", "Save"]
mermaid = "graph LR\n"
for i, step in enumerate(steps):
if i < len(steps) - 1:
mermaid += f" {step} --> {steps[i+1]}\n"
return MermaidPreview(content=mermaid)
# Example 2: Image preview (URL or data URI)
def preview_image() -> ImagePreview:
return ImagePreview(
content="https://example.com/chart.png",
# or use data URI: "data:image/png;base64,iVBORw0KGgo..."
)
# Define your node which uses the preview_fn
my_node = node(
func=process_data,
inputs="raw_data",
outputs="processed_data",
preview_fn=your_preview_function,
)
# Receive the preview payload
payload = my_node.preview()
# Serialize for frontend/API use:
json_dict = payload.to_dict() # Returns JSONObject
Source code in kedro/pipeline/node.py
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 | |
run ¶
run(inputs=None)
Run this node using the provided inputs and return its results in a dictionary.
Parameters:
-
inputs(dict[str, Any] | None, default:None) –Dictionary of inputs as specified at the creation of the node.
Raises:
-
ValueError–In the following cases: a) The node function inputs are incompatible with the node input definition. Example 1: node definition input is a list of 2 DataFrames, whereas only 1 was provided or 2 different ones were provided. b) The node function outputs are incompatible with the node output definition. Example 1: node function definition is a dictionary, whereas function returns a list. Example 2: node definition output is a list of 5 strings, whereas the function returns a list of 4 objects.
-
Exception–Any exception thrown during execution of the node.
Returns:
-
dict[str, Any]–All produced node outputs are returned in a dictionary, where the
-
dict[str, Any]–keys are defined by the node outputs.
Source code in kedro/pipeline/node.py
503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 | |
tag ¶
tag(tags)
Create a new Node which is an exact copy of the current one,
but with more tags added to it.
Parameters:
-
tags(str | Iterable[str]) –The tags to be added to the new node.
Returns:
-
Node–A copy of the current
Nodeobject with the tags added.
Source code in kedro/pipeline/node.py
308 309 310 311 312 313 314 315 316 317 318 319 | |
node ¶
node(func, inputs, outputs, *, name=None, tags=None, confirms=None, namespace=None, preview_fn=None)
Create a node in the pipeline by providing a function to be called along with variable names for inputs and/or outputs.
Parameters:
-
func(Callable) –A function that corresponds to the node logic. The function should have at least one input or output.
-
inputs(str | list[str] | dict[str, str] | None) –The name or the list of the names of variables used as inputs to the function. The number of names should match the number of arguments in the definition of the provided function. When dict[str, str] is provided, variable names will be mapped to function argument names.
-
outputs(str | list[str] | dict[str, str] | None) –The name or the list of the names of variables used as outputs to the function. The number of names should match the number of outputs returned by the provided function. When dict[str, str] is provided, variable names will be mapped to the named outputs the function returns.
-
name(str | None, default:None) –Optional node name to be used when displaying the node in logs or any other visualisations.
-
tags(str | Iterable[str] | None, default:None) –Optional set of tags to be applied to the node.
-
confirms(str | list[str] | None, default:None) –Optional name or the list of the names of the datasets that should be confirmed. This will result in calling
confirm()method of the corresponding dataset instance. Specified dataset names do not necessarily need to be present in the nodeinputsoroutputs. -
namespace(str | None, default:None) –Optional node namespace.
-
preview_fn(Callable[..., PreviewPayload] | None, default:None) –Optional preview function that returns one of the valid preview types (TextPreview, MermaidPreview, ImagePreview, or CustomPreview). This is an experimental feature.
Returns:
-
Node–A Node object with mapped inputs, outputs and function.
Example:
import pandas as pd
import numpy as np
def clean_data(cars: pd.DataFrame, boats: pd.DataFrame) -> dict[str, pd.DataFrame]:
return dict(cars_df=cars.dropna(), boats_df=boats.dropna())
def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
return np.array_split(data, 2)
nodes = [
node(
clean_data,
inputs=["cars2017", "boats2017"],
outputs=dict(cars_df="clean_cars2017", boats_df="clean_boats2017"),
),
node(halve_dataframe, "clean_cars2017", ["train_cars2017", "test_cars2017"]),
node(
halve_dataframe,
dict(data="clean_boats2017"),
["train_boats2017", "test_boats2017"],
),
]
Source code in kedro/pipeline/node.py
780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 | |