Skip to content

kedro.server

kedro.server

kedro.server provides an optional HTTP server for triggering Kedro pipelines.

Requires the server extra: pip install "kedro[server]".

Name Type Description
create_http_server Function Create and configure the FastAPI HTTP server application.

kedro.server.http_server.create_http_server

create_http_server(project_path=None, env=None, conf_source=None)

Create and configure the FastAPI HTTP server.

Programmatic values passed to this factory take precedence over environment-variable defaults set by the CLI.

Parameters:

  • project_path (str | Path | None, default: None ) –

    Optional path to the Kedro project to serve. If not provided, it will be resolved from the KEDRO_PROJECT_PATH environment variable.

  • env (str | None, default: None ) –

    Optional Kedro environment to use. Overrides the KEDRO_SERVER_ENV environment variable if provided.

  • conf_source (str | None, default: None ) –

    Optional configuration source to use. Overrides the KEDRO_SERVER_CONF_SOURCE environment variable if provided.

Returns: Configured FastAPI application instance.

Source code in kedro/server/http_server.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def create_http_server(
    project_path: str | Path | None = None,
    env: str | None = None,
    conf_source: str | None = None,
) -> FastAPI:
    """Build the FastAPI application that serves a Kedro project over HTTP.

    Values passed explicitly to this factory win over the environment-variable
    defaults set by the CLI: an environment variable is consulted only when
    the matching argument is ``None``.

    The returned application registers three routes: ``GET /health``,
    ``GET /snapshot`` and ``POST /run``.

    Args:
        project_path: Optional path to the Kedro project to serve. If not
            provided, it will be resolved from the KEDRO_PROJECT_PATH
            environment variable.
        env: Optional Kedro environment to use. Overrides the
            KEDRO_SERVER_ENV environment variable if provided.
        conf_source: Optional configuration source to use. Overrides the
            KEDRO_SERVER_CONF_SOURCE environment variable if provided.

    Returns:
        Configured FastAPI application instance.
    """
    # Fall back to the environment only for arguments the caller left unset.
    if env is None:
        env = os.environ.get(KEDRO_SERVER_ENV)
    if conf_source is None:
        conf_source = os.environ.get(KEDRO_SERVER_CONF_SOURCE)

    app = FastAPI(
        title="Kedro Server",
        description="HTTP API for triggering pipeline runs, inspecting project metadata, and more",
        version=kedro_version,
        lifespan=lifespan,
    )

    # Server-wide defaults shared by every request handler below.
    state = app.state
    state.default_env = env
    state.default_conf_source = conf_source
    state.project_path = _resolve_project_path(project_path)
    state.session_lock = threading.Lock()

    # NOTE: the endpoint function names and docstrings below are kept verbatim;
    # FastAPI publishes them in the generated OpenAPI schema.

    @app.get("/health", response_model=HealthResponse, tags=["health"])
    async def health() -> HealthResponse:
        """Health check endpoint.

        Returns server status and Kedro version information.
        """
        return HealthResponse(status="healthy", kedro_version=kedro_version)

    @app.get("/snapshot", response_model=SnapshotResponse, tags=["inspection"])
    def get_snapshot() -> SnapshotResponse:
        """Return a read-only snapshot of the Kedro project.

        Uses the server-level environment configured at startup (equivalent to
        the ``env`` passed to ``create_http_server`` or the ``KEDRO_SERVER_ENV``
        environment variable).

        Returns:
            `SnapshotResponse` with project metadata, pipelines, datasets,
            and parameter keys, or error details on failure.
        """
        # Building the success payload stays inside the ``try`` so that any
        # failure while assembling it is also reported as a SnapshotFailure.
        try:
            project_snapshot = get_project_snapshot(
                env=app.state.default_env,
                conf_source=app.state.default_conf_source,
                metadata=app.state.metadata,
            )
            return SnapshotSuccess(
                status="success",
                metadata=project_snapshot.metadata,
                pipelines=project_snapshot.pipelines,
                datasets=project_snapshot.datasets,
                parameters=project_snapshot.parameters,
            )
        except Exception as exc:
            # Logged at ERROR level together with the traceback.
            logger.exception("Snapshot request failed: %s", str(exc))
            failure_detail = ErrorDetail(
                type=type(exc).__qualname__,
                message=str(exc),
            )
            return SnapshotFailure(status="failure", error=failure_detail)

    @app.post("/run", response_model=RunResponse, tags=["pipeline"])
    def run_pipeline(request: RunRequest) -> RunResponse:
        """Execute a Kedro pipeline.

        This endpoint accepts a `RunRequest` with parameters for pipeline execution,
        runs the pipeline using the session, and returns a `RunResponse` with the result.
        First run request creates a new `KedroServiceSession`, and subsequent requests reuse it.

        Args:
            request: Pipeline execution parameters.

        Returns:
            `RunResponse` with run_id, status, duration, and optional error details.
        """
        # The lock covers only the lazy creation of the shared session, so
        # concurrent first requests create it once; the run itself happens
        # after the lock has been released.
        with app.state.session_lock:
            session_missing = not hasattr(app.state, "session")
            if session_missing:
                app.state.session = KedroServiceSession.create(
                    project_path=app.state.project_path,
                    env=app.state.default_env,
                    conf_source=app.state.default_conf_source,
                )

        shared_session = app.state.session
        return _execute_pipeline(session=shared_session, request=request)

    return app