# Source code for kedro.framework.cli.pipeline

"""A collection of CLI commands for working with Kedro pipelines."""

from __future__ import annotations

import re
import shutil
from pathlib import Path
from textwrap import indent
from typing import TYPE_CHECKING, Any, NamedTuple

import click

import kedro
from kedro.framework.cli.utils import (
    KedroCliError,
    _clean_pycache,
    command_with_verbosity,
    env_option,
)
from kedro.framework.project import settings

if TYPE_CHECKING:
    from kedro.framework.startup import ProjectMetadata

# Template text for a ``setup.py`` file. The ``{name}``, ``{version}`` and
# ``{install_requires}`` fields look like ``str.format`` placeholders.
# NOTE(review): this constant is not referenced by the code visible here —
# confirm whether it is still used elsewhere before relying on or removing it.
_SETUP_PY_TEMPLATE = """# -*- coding: utf-8 -*-
from setuptools import setup, find_packages

setup(
    name="{name}",
    version="{version}",
    description="Modular pipeline `{name}`",
    packages=find_packages(),
    include_package_data=True,
    install_requires={install_requires},
)
"""


class PipelineArtifacts(NamedTuple):
    """An ordered collection of the filesystem locations that belong to a pipeline.

    Fields, in order:
        pipeline_dir: The pipeline's source package directory.
        pipeline_tests: The pipeline's tests directory.
        pipeline_conf: The configuration environment directory the pipeline's
            config files live in.
    """

    pipeline_dir: Path
    pipeline_tests: Path
    pipeline_conf: Path
def _assert_pkg_name_ok(pkg_name: str) -> None:
    """Check that ``pkg_name`` is acceptable as a Python package name.

    The name must start with an ASCII letter or an underscore, be at least two
    characters long, and contain only word characters (letters, digits and
    underscores; Unicode-aware) after the first character.

    Args:
        pkg_name: Candidate Python package name.

    Raises:
        KedroCliError: If package name violates the requirements.
    """

    base_message = f"'{pkg_name}' is not a valid Python package name."
    if not re.match(r"^[a-zA-Z_]", pkg_name):
        message = base_message + " It must start with a letter or underscore."
        raise KedroCliError(message)
    if len(pkg_name) < 2:  # noqa: PLR2004
        message = base_message + " It must be at least 2 characters long."
        raise KedroCliError(message)
    # ``re.fullmatch`` instead of ``re.match(r"^\w+$", ...)``: ``$`` also matches
    # just before a trailing newline, which would let e.g. "ab\n" pass.
    if not re.fullmatch(r"\w+", pkg_name[1:]):
        message = (
            base_message
            + " It must contain only letters, digits, and/or underscores."
        )
        raise KedroCliError(message)


def _check_pipeline_name(ctx: click.Context, param: Any, value: str) -> str:
    """Click callback that validates the pipeline ``name`` argument.

    Empty values are returned unchanged; non-empty values must pass
    ``_assert_pkg_name_ok`` (which raises ``KedroCliError`` otherwise).
    """
    if value:
        _assert_pkg_name_ok(value)
    return value


# Top-level click group that the ``pipeline`` sub-group below attaches to.
@click.group(name="Kedro")
def pipeline_cli() -> None:  # pragma: no cover
    pass


@pipeline_cli.group()
def pipeline() -> None:
    """Commands for working with pipelines."""


# NOTE: the docstring of a click command is its CLI help text, so it is kept
# verbatim; implementation notes live in comments instead.
@command_with_verbosity(pipeline, "create")
@click.argument("name", nargs=1, callback=_check_pipeline_name)
@click.option(
    "--skip-config",
    is_flag=True,
    help="Skip creation of config files for the new pipeline(s).",
)
@click.option(
    "template_path",
    "-t",
    "--template",
    type=click.Path(file_okay=False, dir_okay=True, exists=True, path_type=Path),
    help="Path to cookiecutter template to use for pipeline(s). Will override any local templates.",
)
@env_option(help="Environment to create pipeline configuration in. Defaults to `base`.")
@click.pass_obj  # this will pass the metadata as first argument
def create_pipeline(
    metadata: ProjectMetadata,
    /,
    name: str,
    template_path: Path | None,
    skip_config: bool,
    env: str,
    **kwargs: Any,  # presumably the verbosity option from command_with_verbosity
) -> None:
    """Create a new modular pipeline by providing a name."""
    package_dir = metadata.source_dir / metadata.package_name
    # ``_copy_pipeline_tests`` writes to ``project_root.parent / "tests"``,
    # which resolves to ``metadata.project_path / "tests"``.
    project_root = metadata.project_path / metadata.project_name
    conf_source = settings.CONF_SOURCE
    project_conf_path = metadata.project_path / conf_source

    base_env = settings.CONFIG_LOADER_ARGS.get("base_env", "base")
    env = env or base_env
    if not skip_config and not (project_conf_path / env).exists():
        raise KedroCliError(
            f"Unable to locate environment '{env}'. "
            f"Make sure it exists in the project configuration."
        )

    # Precedence for template_path is: command line > project templates/pipeline dir > global default
    # If passed on the CLI, click will verify that the path exists so no need to check again
    if template_path is None:
        # No path provided on the CLI, try `PROJECT_PATH/templates/pipeline`
        template_path = metadata.project_path / "templates" / "pipeline"

        if not template_path.exists():
            # and if that folder doesn't exist fall back to the global default
            template_path = Path(kedro.__file__).parent / "templates" / "pipeline"

    click.secho(f"Using pipeline template at: '{template_path}'")

    result_path = _create_pipeline(name, template_path, package_dir / "pipelines")
    _copy_pipeline_tests(name, result_path, project_root)
    _copy_pipeline_configs(result_path, project_conf_path, skip_config, env=env)
    click.secho(f"\nPipeline '{name}' was successfully created.\n", fg="green")


@command_with_verbosity(pipeline, "delete")
@click.argument("name", nargs=1, callback=_check_pipeline_name)
@env_option(
    help="Environment to delete pipeline configuration from. Defaults to 'base'."
)
@click.option(
    "-y", "--yes", is_flag=True, help="Confirm deletion of pipeline non-interactively."
)
@click.pass_obj  # this will pass the metadata as first argument
def delete_pipeline(
    metadata: ProjectMetadata, /, name: str, env: str, yes: bool, **kwargs: Any
) -> None:
    """Delete a modular pipeline by providing a name."""
    package_dir = metadata.source_dir / metadata.package_name
    conf_source = settings.CONF_SOURCE
    project_conf_path = metadata.project_path / conf_source

    base_env = settings.CONFIG_LOADER_ARGS.get("base_env", "base")
    env = env or base_env
    if not (project_conf_path / env).exists():
        raise KedroCliError(
            f"Unable to locate environment '{env}'. "
            f"Make sure it exists in the project configuration."
        )

    pipeline_artifacts = _get_pipeline_artifacts(metadata, pipeline_name=name, env=env)

    files_to_delete = [
        pipeline_artifacts.pipeline_conf / filepath
        for confdir in ("parameters", "catalog")
        # Since we remove nesting in 'parameters' and 'catalog' folders,
        # we want to also del the old project's structure for backward compatibility
        for filepath in (Path(f"{confdir}_{name}.yml"), Path(confdir) / f"{name}.yml")
        if (pipeline_artifacts.pipeline_conf / filepath).is_file()
    ]

    dirs_to_delete = [
        path
        for path in (pipeline_artifacts.pipeline_dir, pipeline_artifacts.pipeline_tests)
        if path.is_dir()
    ]

    if not files_to_delete and not dirs_to_delete:
        raise KedroCliError(f"Pipeline '{name}' not found.")

    if not yes:
        _echo_deletion_warning(
            "The following paths will be removed:",
            directories=dirs_to_delete,
            files=files_to_delete,
        )
        click.echo()
        yes = click.confirm(f"Are you sure you want to delete pipeline '{name}'?")
        click.echo()

    if not yes:
        raise KedroCliError("Deletion aborted!")

    _delete_artifacts(*files_to_delete, *dirs_to_delete)
    click.secho(f"\nPipeline '{name}' was successfully deleted.", fg="green")
    click.secho(
        f"\nIf you added the pipeline '{name}' to 'register_pipelines()' in"
        f""" '{package_dir / "pipeline_registry.py"}', you will need to remove it.""",
        fg="yellow",
    )


def _echo_deletion_warning(message: str, **paths: list[Path]) -> None:
    """Print ``message`` followed by each non-empty group of ``paths``.

    Each keyword name becomes a capitalised heading with its paths listed
    underneath, indented by two spaces. Empty groups are dropped; if every
    group is empty, nothing is printed at all.
    """
    paths = {key: values for key, values in paths.items() if values}

    if paths:
        click.secho(message, bold=True)

    for key, values in paths.items():
        click.echo(f"\n{key.capitalize()}:")
        paths_str = "\n".join(str(value) for value in values)
        click.echo(indent(paths_str, " " * 2))


def _create_pipeline(name: str, template_path: Path, output_dir: Path) -> Path:
    """Render the pipeline cookiecutter template into ``output_dir``.

    Args:
        name: Pipeline name, passed to the template as ``pipeline_name``.
        template_path: Cookiecutter template directory.
        output_dir: Directory the rendered pipeline is written into.

    Returns:
        The path cookiecutter reports for the rendered output.

    Raises:
        KedroCliError: Wrapping any exception raised by cookiecutter.
    """
    # Imported here rather than at module level, so cookiecutter is only loaded
    # when a pipeline is actually being created.
    from cookiecutter.main import cookiecutter

    cookie_context = {"pipeline_name": name, "kedro_version": kedro.__version__}

    click.echo(f"Creating the pipeline '{name}': ", nl=False)

    try:
        cookiecutter_result = cookiecutter(
            str(template_path),
            output_dir=str(output_dir),
            no_input=True,
            extra_context=cookie_context,
        )
    except Exception as exc:
        click.secho("FAILED", fg="red")
        cls = exc.__class__
        raise KedroCliError(f"{cls.__module__}.{cls.__qualname__}: {exc}") from exc

    click.secho("OK", fg="green")
    result_path = Path(cookiecutter_result)
    message = indent(f"Location: '{result_path.resolve()}'", " " * 2)
    click.secho(message, bold=True)

    _clean_pycache(result_path)

    return result_path


def _sync_dirs(
    source: Path, target: Path, prefix: str = "", overwrite: bool = False
) -> None:
    """Recursively copies `source` directory (or file) into `target` directory
    without overwriting any existing files/directories in the target using the
    following rules:
        1) Skip any files/directories which names match with files in target,
           unless overwrite=True.
        2) Copy all files from source to target.
        3) Recursively copy all directories from source to target.

    A source file whose name matches an existing directory in the target is
    always skipped, regardless of ``overwrite``.

    Args:
        source: A local directory to copy from, must exist.
        target: A local directory to copy to, will be created if doesn't exist yet.
        prefix: Prefix for CLI message indentation.
        overwrite: If True, files whose names match existing target files are
            copied over them, at every nesting level.
    """

    existing = list(target.iterdir()) if target.is_dir() else []
    existing_files = {f.name for f in existing if f.is_file()}
    existing_folders = {f.name for f in existing if f.is_dir()}

    if source.is_dir():
        content = list(source.iterdir())
    elif source.is_file():
        content = [source]
    else:
        # nothing to copy
        content = []  # pragma: no cover

    for source_path in content:
        source_name = source_path.name
        target_path = target / source_name
        click.echo(indent(f"Creating '{target_path}': ", prefix), nl=False)

        if (  # rule #1
            (not overwrite and source_name in existing_files)
            or (source_path.is_file() and source_name in existing_folders)
        ):
            click.secho("SKIPPED (already exists)", fg="yellow")
        elif source_path.is_file():  # rule #2
            try:
                target.mkdir(exist_ok=True, parents=True)
                shutil.copyfile(str(source_path), str(target_path))
            except Exception:
                click.secho("FAILED", fg="red")
                raise
            click.secho("OK", fg="green")
        else:  # source_path is a directory, rule #3
            click.echo()
            new_prefix = (prefix or "") + " " * 2
            # Propagate ``overwrite`` so rule #1 applies consistently to nested files.
            _sync_dirs(source_path, target_path, prefix=new_prefix, overwrite=overwrite)


def _get_pipeline_artifacts(
    project_metadata: ProjectMetadata, pipeline_name: str, env: str
) -> PipelineArtifacts:
    """Return the source, tests and config locations of ``pipelines.<pipeline_name>``."""
    artifacts = _get_artifacts_to_package(
        project_metadata, f"pipelines.{pipeline_name}", env
    )
    return PipelineArtifacts(*artifacts)


def _get_artifacts_to_package(
    project_metadata: ProjectMetadata, module_path: str, env: str
) -> tuple[Path, Path, Path]:
    """From existing project, returns in order: source_path, tests_path, config_paths

    ``module_path`` is a dotted path relative to the project package (e.g.
    ``"pipelines.data_processing"``). The returned paths are the matching
    directory under the package, the matching directory under
    ``<project_path>/tests``, and the ``env`` directory of the project's
    configuration source.
    """
    package_dir = project_metadata.source_dir / project_metadata.package_name
    project_root = project_metadata.project_path
    project_conf_path = project_metadata.project_path / settings.CONF_SOURCE
    artifacts = (
        Path(package_dir, *module_path.split(".")),
        Path(project_root, "tests", *module_path.split(".")),
        project_conf_path / env,
    )
    return artifacts


def _copy_pipeline_tests(
    pipeline_name: str, result_path: Path, project_root: Path
) -> None:
    """Move the rendered ``tests`` folder into the project's tests directory.

    Files are copied into ``project_root.parent / "tests" / "pipelines" /
    pipeline_name`` without overwriting existing ones. The ``tests`` folder is
    then removed from the rendered pipeline package, even if copying fails.
    """
    tests_source = result_path / "tests"
    tests_target = project_root.parent / "tests" / "pipelines" / pipeline_name
    try:
        _sync_dirs(tests_source, tests_target)
    finally:
        # A custom template may not ship a ``tests`` folder; don't crash on it.
        if tests_source.exists():
            shutil.rmtree(tests_source)


def _copy_pipeline_configs(
    result_path: Path, conf_path: Path, skip_config: bool, env: str
) -> None:
    """Move the rendered ``config`` folder into ``conf_path / env``.

    When ``skip_config`` is True nothing is copied. In every case the
    ``config`` folder is removed from the rendered pipeline package afterwards.
    """
    config_source = result_path / "config"
    try:
        if not skip_config:
            config_target = conf_path / env
            _sync_dirs(config_source, config_target)
    finally:
        # A custom template may not ship a ``config`` folder; don't crash on it.
        if config_source.exists():
            shutil.rmtree(config_source)


def _delete_artifacts(*artifacts: Path) -> None:
    """Delete each given directory (recursively) or file, reporting progress.

    Raises:
        KedroCliError: Wrapping the first deletion failure; artifacts after the
            failing one are not attempted.
    """
    for artifact in artifacts:
        click.echo(f"Deleting '{artifact}': ", nl=False)
        try:
            if artifact.is_dir():
                shutil.rmtree(artifact)
            else:
                artifact.unlink()
        except Exception as exc:
            click.secho("FAILED", fg="red")
            cls = exc.__class__
            raise KedroCliError(f"{cls.__module__}.{cls.__qualname__}: {exc}") from exc
        click.secho("OK", fg="green")