Source code for kedro.runner.sequential_runner

"""``SequentialRunner`` is an ``AbstractRunner`` implementation. It can be
used to run the ``Pipeline`` in a sequential manner using a topological sort
of provided nodes.
from __future__ import annotations

from collections import Counter
from itertools import chain
from typing import Any

from pluggy import PluginManager

from import DataCatalog
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner, run_node

[docs] class SequentialRunner(AbstractRunner): """``SequentialRunner`` is an ``AbstractRunner`` implementation. It can be used to run the ``Pipeline`` in a sequential manner using a topological sort of provided nodes. """
[docs] def __init__( self, is_async: bool = False, extra_dataset_patterns: dict[str, dict[str, Any]] | None = None, ): """Instantiates the runner class. Args: is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog during the run. This is used to set the default datasets to MemoryDataset for `SequentialRunner`. """ default_dataset_pattern = {"{default}": {"type": "MemoryDataset"}} self._extra_dataset_patterns = extra_dataset_patterns or default_dataset_pattern super().__init__( is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns )
def _run( self, pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, session_id: str | None = None, ) -> None: """The method implementing sequential pipeline running. Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. session_id: The id of the session. Raises: Exception: in case of any downstream node failure. """ if not self._is_async: "Using synchronous mode for loading and saving data. Use the --async flag " "for potential performance gains." ) nodes = pipeline.nodes done_nodes = set() load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) for exec_index, node in enumerate(nodes): try: run_node(node, catalog, hook_manager, self._is_async, session_id) done_nodes.add(node) except Exception: self._suggest_resume_scenario(pipeline, done_nodes, catalog) raise # decrement load counts and release any data sets we've finished with for dataset in node.inputs: load_counts[dataset] -= 1 if load_counts[dataset] < 1 and dataset not in pipeline.inputs(): catalog.release(dataset) for dataset in node.outputs: if load_counts[dataset] < 1 and dataset not in pipeline.outputs(): catalog.release(dataset) "Completed %d out of %d tasks", exec_index + 1, len(nodes) )