Skip to content

AbstractRunner

kedro.runner.AbstractRunner

AbstractRunner(is_async=False)

Bases: ABC

AbstractRunner is the base class for all Pipeline runner implementations.

Parameters:

  • is_async (bool, default: False ) –

    If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False.

Source code in kedro/runner/runner.py
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, is_async: bool = False):
    """Create the runner and record its data loading/saving mode.

    Args:
        is_async: Whether node inputs and outputs should be loaded and
            saved asynchronously with threads. The value is stored on the
            instance as ``_is_async``. Defaults to False.
    """
    self._is_async = is_async

_is_async instance-attribute

_is_async = is_async

_logger property

_logger

_get_executor abstractmethod

_get_executor(max_workers)

Abstract method to provide the correct executor (e.g., ThreadPoolExecutor, ProcessPoolExecutor or None if running sequentially).

Source code in kedro/runner/runner.py
148
149
150
151
@abstractmethod  # pragma: no cover
def _get_executor(self, max_workers: int) -> Executor | None:
    """Provide the executor on which this runner schedules its tasks.

    This is an abstract method; the body here is only a placeholder.

    Args:
        max_workers: The worker count requested for the executor.

    Returns:
        The correct executor (e.g., ThreadPoolExecutor or
        ProcessPoolExecutor), or None if running sequentially.
    """
    return None

_get_required_workers_count

_get_required_workers_count(pipeline)
Source code in kedro/runner/runner.py
334
335
def _get_required_workers_count(self, pipeline: Pipeline) -> int:
    """Return the number of workers to request for ``pipeline``.

    This implementation does not inspect ``pipeline`` and always
    returns 1.
    """
    required_workers = 1
    return required_workers

_raise_runtime_error staticmethod

_raise_runtime_error(todo_nodes, done_nodes, ready, done)
Source code in kedro/runner/runner.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
@staticmethod
def _raise_runtime_error(
    todo_nodes: set[Node],
    done_nodes: set[Node],
    ready: set[Node],
    done: set[Future[Node]] | None,
) -> None:
    """Raise a ``RuntimeError`` reporting that no further task can be scheduled.

    The error message lists each piece of scheduling state on its own line
    as ``<label> = <value>``.

    Args:
        todo_nodes: Nodes that have not been scheduled yet.
        done_nodes: Nodes that have finished.
        ready: Nodes selected as ready in the current scheduling round.
        done: Futures reported as done by the last wait, or None.

    Raises:
        RuntimeError: Always.
    """
    labelled_state = (
        ("todo_nodes", todo_nodes),
        ("done_nodes", done_nodes),
        ("ready_nodes", ready),
        ("done_futures", done),
    )
    report_lines = [f"{label} = {value}" for label, value in labelled_state]
    report = "\n".join(report_lines)
    raise RuntimeError(
        "Unable to schedule new tasks although some nodes "
        f"have not been run:\n{report}"
    )

_release_datasets staticmethod

_release_datasets(node, catalog, load_counts, pipeline)

Decrement dataset load counts and release any datasets we've finished with

Source code in kedro/runner/runner.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
@staticmethod
def _release_datasets(
    node: Node,
    catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
    load_counts: dict,
    pipeline: Pipeline,
) -> None:
    """Update load counts for a finished node and release spent datasets.

    Each input of ``node`` has its entry in ``load_counts`` decremented by
    one. An input is released from ``catalog`` once its count drops below 1,
    unless it is one of the pipeline's inputs. An output of ``node`` is
    released when its count is below 1, unless it is one of the pipeline's
    outputs.

    Args:
        node: The node that has just finished.
        catalog: The catalog whose ``release`` method is called.
        load_counts: Mapping of dataset name to remaining load count;
            mutated in place.
        pipeline: The pipeline being run; its ``inputs()`` and ``outputs()``
            are never released.
    """
    for input_name in node.inputs:
        load_counts[input_name] -= 1
        if load_counts[input_name] >= 1:
            # Another node still has to load this dataset.
            continue
        if input_name in pipeline.inputs():
            continue
        catalog.release(input_name)

    for output_name in node.outputs:
        if load_counts[output_name] >= 1:
            continue
        if output_name not in pipeline.outputs():
            catalog.release(output_name)

_run abstractmethod

_run(pipeline, catalog, hook_manager=None, run_id=None)

The abstract interface for running pipelines, assuming that the inputs have already been checked and normalized by run(). This contains the common pipeline execution logic using an executor.

Parameters:

  • pipeline (Pipeline) –

    The Pipeline to run.

  • catalog (CatalogProtocol | SharedMemoryCatalogProtocol) –

    An implemented instance of CatalogProtocol or SharedMemoryCatalogProtocol from which to fetch data.

  • hook_manager (PluginManager | None, default: None ) –

    The PluginManager to activate hooks.

  • run_id (str | None, default: None ) –

    The id of the run.

Source code in kedro/runner/runner.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
@abstractmethod  # pragma: no cover
def _run(
    self,
    pipeline: Pipeline,
    catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
    hook_manager: PluginManager | None = None,
    run_id: str | None = None,
) -> None:
    """The abstract interface for running pipelines, assuming that the
    inputs have already been checked and normalized by run().
    This contains the common pipeline execution logic using an executor.

    If ``_get_executor`` returns ``None``, the nodes are executed one after
    another in the order of ``pipeline.nodes``. Otherwise every node whose
    dependencies have all completed is submitted to the executor, and the
    method waits for the first future(s) to complete before scheduling
    again. After each completed node its datasets are released through
    ``_release_datasets``.

    Args:
        pipeline: The ``Pipeline`` to run.
        catalog: An implemented instance of ``CatalogProtocol`` or ``SharedMemoryCatalogProtocol`` from which to fetch data.
        hook_manager: The ``PluginManager`` to activate hooks.
        run_id: The id of the run.

    Raises:
        Exception: Any exception raised while executing a node is re-raised
            after ``_suggest_resume_scenario`` has been called.
    """

    nodes = pipeline.nodes

    self._validate_catalog(catalog)
    self._validate_nodes(nodes)
    self._set_manager_datasets(catalog)

    # Number of times each dataset appears as a node input; decremented by
    # `_release_datasets` as nodes complete.
    load_counts = Counter(chain.from_iterable(n.inputs for n in pipeline.nodes))
    node_dependencies = pipeline.node_dependencies
    todo_nodes = set(node_dependencies.keys())
    done_nodes: set[Node] = set()
    futures = set()
    done = None
    max_workers = self._get_required_workers_count(pipeline)

    pool = self._get_executor(max_workers)
    if pool is None:
        # No executor: run every node in the order given by `pipeline.nodes`.
        for node in nodes:
            try:
                Task(
                    node=node,
                    catalog=catalog,
                    hook_manager=hook_manager,
                    is_async=self._is_async,
                    run_id=run_id,
                ).execute()
                done_nodes.add(node)
            except Exception:
                self._suggest_resume_scenario(pipeline, done_nodes, catalog)
                raise
            self._logger.info("Completed node: %s", node.name)
            self._logger.info(
                "Completed %d out of %d tasks", len(done_nodes), len(nodes)
            )
            self._release_datasets(node, catalog, load_counts, pipeline)

        return  # Exit early since everything runs sequentially

    with pool as executor:
        while True:
            # A node is ready once all of its dependencies are done.
            ready = {n for n in todo_nodes if node_dependencies[n] <= done_nodes}
            todo_nodes -= ready
            for node in ready:
                task = Task(
                    node=node,
                    catalog=catalog,
                    hook_manager=hook_manager,
                    is_async=self._is_async,
                    run_id=run_id,
                )
                if isinstance(executor, ProcessPoolExecutor):
                    task.parallel = True
                futures.add(executor.submit(task))
            if not futures:
                # Nothing is in flight: either everything has run, or the
                # remaining nodes can never become ready.
                if todo_nodes:
                    self._raise_runtime_error(todo_nodes, done_nodes, ready, done)
                break
            done, futures = wait(futures, return_when=FIRST_COMPLETED)
            for future in done:
                try:
                    node = future.result()
                except Exception:
                    self._suggest_resume_scenario(pipeline, done_nodes, catalog)
                    raise
                done_nodes.add(node)
                self._logger.info("Completed node: %s", node.name)
                self._logger.info(
                    "Completed %d out of %d tasks", len(done_nodes), len(nodes)
                )
                self._release_datasets(node, catalog, load_counts, pipeline)

_set_manager_datasets

_set_manager_datasets(catalog)
Source code in kedro/runner/runner.py
328
329
330
331
332
def _set_manager_datasets(
    self, catalog: CatalogProtocol | SharedMemoryCatalogProtocol
) -> None:
    """Placeholder for setting up any necessary manager datasets.

    This implementation does nothing with ``catalog``.
    """
    return None

_suggest_resume_scenario

_suggest_resume_scenario(pipeline, done_nodes, catalog)

Suggest a command to the user to resume a run after it fails. The run should be started from the point closest to the failure for which persisted input exists.

Parameters:

  • pipeline (Pipeline) –

    the Pipeline of the run.

  • done_nodes (Iterable[Node]) –

    the Nodes that executed successfully.

  • catalog (CatalogProtocol | SharedMemoryCatalogProtocol) –

    an implemented instance of CatalogProtocol or SharedMemoryCatalogProtocol of the run.

Source code in kedro/runner/runner.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def _suggest_resume_scenario(
    self,
    pipeline: Pipeline,
    done_nodes: Iterable[Node],
    catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
) -> None:
    """
    Log a warning telling the user how to resume a run after it fails.

    When no node has completed, the warning simply advises repeating the
    previous command. Otherwise it reports how many nodes have not run and
    proposes a ``--from-nodes`` argument built from the sorted node names
    returned by ``_find_nodes_to_resume_from``.

    Args:
        pipeline: the ``Pipeline`` of the run.
        done_nodes: the ``Node``s that executed successfully.
        catalog: an implemented instance of ``CatalogProtocol`` or ``SharedMemoryCatalogProtocol`` of the run.

    """
    remaining_nodes = set(pipeline.nodes) - set(done_nodes)

    if not done_nodes:
        self._logger.warning(
            "No nodes ran. Repeat the previous command to attempt a new run."
        )
        return

    resume_from = _find_nodes_to_resume_from(
        pipeline=pipeline,
        unfinished_nodes=remaining_nodes,
        catalog=catalog,
    )
    joined_names = ",".join(sorted(resume_from))
    from_nodes_arg = f'  --from-nodes "{joined_names}"'

    self._logger.warning(
        f"There are {len(remaining_nodes)} nodes that have not run.\n"
        "You can resume the pipeline run from the nearest nodes with "
        "persisted inputs by adding the following "
        f"argument to your previous command:\n{from_nodes_arg}"
    )

_validate_catalog

_validate_catalog(catalog)
Source code in kedro/runner/runner.py
318
319
320
321
322
def _validate_catalog(
    self, catalog: CatalogProtocol | SharedMemoryCatalogProtocol
) -> None:
    """Placeholder for catalog validation logic.

    This implementation performs no checks on ``catalog``.
    """
    return None

_validate_max_workers classmethod

_validate_max_workers(max_workers)

Validates and returns the number of workers. Sets to os.cpu_count() or 1 if max_workers is None, and limits max_workers to 61 on Windows.

Parameters:

  • max_workers (int | None) –

    Desired number of workers. If None, defaults to os.cpu_count() or 1.

Returns:

  • int

    A valid number of workers to use.

Raises:

  • ValueError

    If max_workers is set and is not positive.

Source code in kedro/runner/runner.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
@classmethod
def _validate_max_workers(cls, max_workers: int | None) -> int:
    """
    Validate the requested number of workers and return the number to use.

    An explicit ``max_workers`` is returned unchanged once it has been
    checked to be positive. When ``max_workers`` is None, the result is
    ``os.cpu_count() or 1``, capped at ``_MAX_WINDOWS_WORKERS`` on Windows.

    Args:
        max_workers: Desired number of workers, or None to derive it from
            the CPU count.

    Returns:
        A valid number of workers to use.

    Raises:
        ValueError: If max_workers is set and is not positive.
    """
    if max_workers is not None:
        if max_workers <= 0:
            raise ValueError("max_workers should be positive")
        return max_workers

    detected = os.cpu_count() or 1
    if sys.platform == "win32":
        return min(_MAX_WINDOWS_WORKERS, detected)
    return detected

_validate_nodes

_validate_nodes(node)
Source code in kedro/runner/runner.py
324
325
326
def _validate_nodes(self, node: Iterable[Node]) -> None:
    """Placeholder for node validation logic.

    This implementation performs no checks on the given nodes.
    """
    return None

run

run(pipeline, catalog, hook_manager=None, run_id=None)

Run the Pipeline using the datasets provided by catalog and save results back to the same objects.

Parameters:

  • pipeline (Pipeline) –

    The Pipeline to run.

  • catalog (CatalogProtocol | SharedMemoryCatalogProtocol) –

    An implemented instance of CatalogProtocol or SharedMemoryCatalogProtocol from which to fetch data.

  • hook_manager (PluginManager | None, default: None ) –

    The PluginManager to activate hooks.

  • run_id (str | None, default: None ) –

    The id of the run.

Raises:

  • ValueError

    Raised when Pipeline inputs cannot be satisfied.

Returns:

  • dict[str, Any]

    Dictionary with pipeline outputs, where keys are dataset names and values are dataset objects.

Source code in kedro/runner/runner.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def run(
    self,
    pipeline: Pipeline,
    catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
    hook_manager: PluginManager | None = None,
    run_id: str | None = None,
) -> dict[str, Any]:
    """Execute ``pipeline`` against ``catalog`` and return its outputs.

    Every dataset of the pipeline is first looked up in the catalog as a
    warm-up, then the run is delegated to ``_run`` and its duration is
    logged.

    Args:
        pipeline: The ``Pipeline`` to run.
        catalog: An implemented instance of ``CatalogProtocol`` or ``SharedMemoryCatalogProtocol`` from which to fetch data.
        hook_manager: The ``PluginManager`` to activate hooks. When it is
            falsy, a ``_NullPluginManager`` is used instead.
        run_id: The id of the run.

    Raises:
        ValueError: Raised when ``Pipeline`` inputs cannot be satisfied
            (not raised directly here; presumably by the catalog or ``_run``).

    Returns:
        Dictionary with pipeline outputs, where keys are dataset names
        and values are the corresponding ``catalog`` entries (dataset
        objects, not loaded data).
    """
    # Warm-up: materialize all datasets in the catalog before the run.
    for dataset_name in pipeline.datasets():
        catalog.get(dataset_name, fallback_to_runtime_pattern=True)

    active_hooks = hook_manager if hook_manager else _NullPluginManager()

    if self._is_async:
        self._logger.info(
            "Asynchronous mode is enabled for loading and saving data."
        )

    started = perf_counter()
    self._run(pipeline, catalog, active_hooks, run_id)  # type: ignore[arg-type]
    elapsed = perf_counter() - started

    self._logger.info(
        f"Pipeline execution completed successfully in {elapsed:.1f} sec."
    )

    # Return all pipeline outputs without loading the datasets' data.
    outputs = {}
    for output_name in pipeline.outputs():
        outputs[output_name] = catalog[output_name]
    return outputs

run_only_missing

run_only_missing(pipeline, catalog, hook_manager=None)

Run only the missing outputs from the Pipeline using the datasets provided by catalog, and save results back to the same objects.

Parameters:

  • pipeline (Pipeline) –

    The Pipeline to run.

  • catalog (CatalogProtocol | SharedMemoryCatalogProtocol) –

    An implemented instance of CatalogProtocol or SharedMemoryCatalogProtocol from which to fetch data.

  • hook_manager (PluginManager | None, default: None ) –

    The PluginManager to activate hooks.

Raises:

  • ValueError

    Raised when Pipeline inputs cannot be satisfied.

Returns:

  • dict[str, Any]

    Any node outputs that cannot be processed by the catalog. These are returned in a dictionary, where the keys are defined by the node outputs.

Source code in kedro/runner/runner.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def run_only_missing(
    self,
    pipeline: Pipeline,
    catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
    hook_manager: PluginManager | None = None,
) -> dict[str, Any]:
    """Run the part of ``pipeline`` needed to build its missing outputs.

    The datasets to build are the pipeline outputs that are not catalog
    keys, plus every catalog entry for which ``catalog.exists`` is false.
    The nodes producing those datasets and the nodes depending on them are
    selected, extended with the nodes needed to produce any unregistered
    inputs of that selection, and the resulting pipeline is passed to
    ``run``.

    Args:
        pipeline: The ``Pipeline`` to run.
        catalog: An implemented instance of ``CatalogProtocol`` or ``SharedMemoryCatalogProtocol`` from which to fetch data.
        hook_manager: The ``PluginManager`` to activate hooks.

    Raises:
        ValueError: Raised when ``Pipeline`` inputs cannot be
            satisfied (not raised directly here; presumably by ``run``).

    Returns:
        Whatever ``run`` returns for the reduced pipeline.

    """
    unsaved_outputs = pipeline.outputs() - set(catalog.keys())

    absent = set()
    for ds_name in catalog:
        if not catalog.exists(ds_name):
            absent.add(ds_name)

    targets = unsaved_outputs | absent
    producers = pipeline.only_nodes_with_outputs(*targets)
    consumers = pipeline.from_inputs(*targets)
    to_rerun = producers + consumers

    # The reduced pipeline may itself depend on datasets that are not in the
    # catalog; pull in the nodes that produce them, including any chains of
    # such datasets.
    unregistered = pipeline.datasets() - set(catalog.keys())
    unregistered_producers = pipeline.only_nodes_with_outputs(*unregistered)
    needed_unregistered = to_rerun.inputs() & unregistered
    to_rerun += unregistered_producers.to_outputs(*needed_unregistered)

    return self.run(to_rerun, catalog, hook_manager)