Skip to content

How to validate data using Pandera in your Kedro workflow

Pandera is an open-source data validation library that enables schema validation with expressive and flexible checks. It supports pandas, PySpark, Polars, Dask, Modin, and other dataframe libraries. It helps ensure data quality by validating types, ranges, business rules, and statistical properties of your datasets.

Adding Pandera to a Kedro project enables you to catch data quality issues early in your pipeline. For example, you can validate that CSV columns have the correct types and enforce business rules (like "price must be positive"). When collaborating with others on a Kedro project, Pandera schemas serve as documentation of your data contracts.

Prerequisites

You will need the following:

  • A working Kedro project. The examples in this document assume the spaceflights-pandas starter. If you're unfamiliar with the Spaceflights project, check out our tutorial.
  • Pandera installed into the project.

To set yourself up, create a new Kedro project:

kedro new --starter=spaceflights-pandas --name spaceflights-pandera

Navigate to your project directory and add Pandera with pandas support to your requirements.txt:

pandera[pandas]>=0.18.0

Install the project dependencies:

uv pip install -r requirements.txt

Use cases

This section explains how you can use Pandera to validate your Kedro datasets.

Basic validation of Kedro datasets using Pandera schemas

The simplest way to add validation is to define schemas for your datasets and validate them using Kedro hooks.

First, create a directory structure for your schemas:

mkdir -p src/spaceflights_pandera/schemas
touch src/spaceflights_pandera/schemas/__init__.py

Create src/spaceflights_pandera/schemas/raw.py with validation schemas for your raw datasets:

"""Pandera schemas for raw data validation."""
from pandera import Column, DataFrameSchema, Check

# Schema for the raw "companies" dataset: a unique, non-null integer id,
# a non-negative fleet count, and an IATA flag restricted to "t"/"f".
companies_schema = DataFrameSchema(
    columns={
        "id": Column(
            int,
            nullable=False,
            unique=True,
        ),
        "total_fleet_count": Column(
            float,
            nullable=True,
            checks=Check.greater_than_or_equal_to(0),
        ),
        "iata_approved": Column(
            str,
            nullable=True,
            checks=Check.isin(["t", "f"]),
        ),
    },
    strict=False,  # Allow additional columns not in schema
    name="companies_raw",
)

# Schema for the raw "shuttles" dataset: a unique, non-null integer id,
# at least one passenger seat, and a non-negative (possibly missing) engine count.
shuttles_schema = DataFrameSchema(
    columns={
        "id": Column(int, nullable=False, unique=True),
        "passenger_capacity": Column(
            int,
            nullable=False,
            checks=Check.greater_than_or_equal_to(1),
        ),
        "engines": Column(
            float,  # Use float to handle nullable integers (NaN compatibility)
            nullable=True,
            checks=Check.greater_than_or_equal_to(0),
        ),
    },
    strict=False,  # Columns not listed here are allowed
    name="shuttles_raw",
)

# Schema for the raw "reviews" dataset: a unique, non-null shuttle_id,
# a rating checked against the range 0-100, and a non-negative review count.
reviews_schema = DataFrameSchema(
    columns={
        "shuttle_id": Column(int, nullable=False, unique=True),
        "review_scores_rating": Column(
            float,
            nullable=True,
            checks=Check.in_range(0, 100),
        ),
        # NOTE(review): this column is typed int yet nullable=True; presumably the
        # sample data has no missing values here. If it can, a float dtype (as used
        # for "engines" above) may be needed - TODO confirm.
        "number_of_reviews": Column(
            int,
            nullable=True,
            checks=Check.greater_than_or_equal_to(0),
        ),
    },
    strict=False,  # Columns not listed here are allowed
    name="reviews_raw",
)

Tip

Alternative schema style: Instead of defining schemas as DataFrameSchema objects, Pandera also supports a Pydantic-like API with DataFrameModel classes and type annotations. This can give you IDE/type-checking support. See the Typed Schemas guide.

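Now create a hook to validate datasets. If your project does not already have a hooks package, create one first (mkdir -p src/spaceflights_pandera/hooks && touch src/spaceflights_pandera/hooks/__init__.py), then create src/spaceflights_pandera/hooks/validation.py: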

"""Kedro hooks for Pandera validation integration."""
import logging
from typing import Any, Dict

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from pandera.errors import SchemaError, SchemaErrors

from ..schemas.raw import companies_schema, shuttles_schema, reviews_schema

logger = logging.getLogger(__name__)


class PanderaValidationHook:
    """Kedro hook that runs Pandera schemas against node inputs."""

    def __init__(self):
        # Lookup table: catalog dataset name -> schema that validates it
        self.raw_schemas = dict(
            companies=companies_schema,
            shuttles=shuttles_schema,
            reviews=reviews_schema,
        )

    @hook_impl
    def before_node_run(
        self,
        node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
    ) -> Dict[str, Any]:
        """Check each node input that has a registered schema.

        Inputs without a schema are handed back untouched; inputs with one
        are replaced by the object returned from ``schema.validate``.

        Raises:
            SchemaError, SchemaErrors: Logged and re-raised when an input
                does not satisfy its schema.
        """
        checked = {}

        for input_name, payload in inputs.items():
            schema = self.raw_schemas.get(input_name)

            if schema is not None:
                logger.info(f"Validating '{input_name}' for node '{node.name}'")
                try:
                    # lazy=True gathers every failure before raising
                    payload = schema.validate(payload, lazy=True)
                except (SchemaError, SchemaErrors) as e:
                    logger.error("Validation failed for '%s': %s", input_name, e)
                    raise
                logger.info(f"✓ Validation passed for '{input_name}'")

            checked[input_name] = payload

        return checked

Register the hook in src/spaceflights_pandera/settings.py:

"""Project settings."""
from spaceflights_pandera.hooks.validation import PanderaValidationHook

# Hook instances registered for the project; the validation hook is
# instantiated once, when this settings module is imported.
HOOKS = (PanderaValidationHook(),)

From this point, when you execute kedro run you will see the validation logs:

[10/01/25 11:10:13] INFO     Validating 'companies' for node 'preprocess_companies_node'
                    INFO     ✓ Validation passed for 'companies'
                    INFO     Validating 'shuttles' for node 'preprocess_shuttles_node'
                    INFO     ✓ Validation passed for 'shuttles'
                    INFO     Validating 'reviews' for node 'create_model_input_table_node'
                    INFO     ✓ Validation passed for 'reviews'

What happens when validation fails?

Let's make one schema rule fail on purpose so you can see Pandera in action.

In src/spaceflights_pandera/schemas/raw.py, change the total_fleet_count column check:

- checks=Check.greater_than_or_equal_to(0),
+ checks=Check.greater_than(100),  # intentionally too strict, so validation fails

Now run the pipeline again:

kedro run

You should see Pandera raise a SchemaErrors exception, reporting which rows failed:

SchemaErrors: Column 'total_fleet_count' failed check 'greater_than(100)'
failure cases: [1.0, 2.0, 5.0, ...]

Because the hook uses lazy=True, Pandera collects all errors in one go instead of stopping at the first failure, making it easier to spot every problem.

Validating data before saving with before_dataset_saved

So far, we validated datasets when they were loaded into a node (before_node_run). You can also validate data before it is written back to the catalog, using before_dataset_saved.

The main difference is in what Kedro passes to the hook:

  • before_node_run → gets all node inputs as a dictionary {dataset_name: data}
  • before_dataset_saved → gets a single dataset name and data being saved
@hook_impl
def before_dataset_saved(self, dataset_name: str, data: Any) -> None:
    """Validate data before saving to catalog.

    Only datasets that have an entry in ``self.raw_schemas`` are checked;
    every other dataset is saved without validation.

    Args:
        dataset_name: Catalog name of the dataset about to be saved.
        data: The object about to be saved.

    Raises:
        pandera.errors.SchemaErrors: If a schema is registered for
            ``dataset_name`` and ``data`` does not satisfy it.
    """
    schema = self.raw_schemas.get(dataset_name)
    # Compare against None explicitly (as before_node_run does) instead of
    # relying on the truthiness of a schema object.
    if schema is not None:
        # lazy=True collects every failure before raising.
        schema.validate(data, lazy=True)

Info

  • Use before_node_run if you want to guarantee that downstream nodes always receive valid inputs.
  • Use before_dataset_saved if you want to enforce contracts on the data you persist to the catalog (for example, preventing invalid data from being stored in S3, a database, or parquet files).
  • You can also combine both hooks to validate at different stages of the pipeline.

Tip

Function-level validation: Instead of using Kedro hooks, you can validate directly at function boundaries with Pandera's decorators (shown here with the conventional import pandera as pa alias):

  • @pa.check_input
  • @pa.check_output
  • @pa.check_io

This can be useful for validating node functions in isolation.

Validating inside a Kedro node

If you prefer the validation step to be an explicit node in the pipeline, you can create a validation node that uses your Pandera schemas. It will fail the run when validation does not pass.

Add a node to src/spaceflights_pandera/pipelines/data_processing/nodes.py:

import logging

import pandas as pd
from pandera.errors import SchemaErrors

# Absolute import through the project package, matching how settings.py
# imports the hook; a bare ``schemas.raw`` is not importable from nodes.py.
from spaceflights_pandera.schemas.raw import companies_schema, shuttles_schema, reviews_schema

logger = logging.getLogger(__name__)


def validate_datasets(companies: pd.DataFrame, shuttles: pd.DataFrame, reviews: pd.DataFrame) -> None:
    """Validate multiple datasets using Pandera schemas and raise on failure.

    Datasets are validated in the order companies, shuttles, reviews; the
    first dataset that fails stops the run.

    Args:
        companies: Companies dataframe.
        shuttles: Shuttles dataframe.
        reviews: Reviews dataframe.

    Raises:
        pandera.errors.SchemaErrors: If any validation fails.
    """
    expected = {
        "companies": (companies_schema, companies),
        "shuttles": (shuttles_schema, shuttles),
        "reviews": (reviews_schema, reviews),
    }

    for name, (schema, df) in expected.items():
        try:
            # lazy=True collects every failure for this dataset before raising.
            schema.validate(df, lazy=True)
        except SchemaErrors:
            # Record which dataset failed, then re-raise so Kedro stops the
            # pipeline; aggregate results here instead if you prefer to
            # report on all datasets before raising.
            logger.error("Validation failed for dataset '%s'", name)
            raise

Then add the node to your pipeline (example pipeline change):

def create_pipeline(**kwargs) -> Pipeline:
    """Build the pipeline with the validation node listed ahead of the existing nodes."""
    return Pipeline(
        [
            # Validation-only node: takes the three raw datasets as inputs
            # and declares no outputs.
            Node(
                func=validate_datasets,
                inputs=["companies", "shuttles", "reviews"],
                outputs=None,
                name="validate_datasets_node",
            ),
            # ...existing nodes...
        ]
    )
  • Use a validation node when you want the validation step to be visible in the DAG, or when you want to produce explicit validated outputs (a validation node can pass data through by returning the validated data).
  • Prefer loading schemas from the schemas module (as shown) or from config; avoid hard-coding rules inside hook/node bodies.
  • Decide on eager vs. lazy behaviour: this example uses lazy=True to collect all errors; you can switch to lazy=False for fail-fast behaviour.

Advanced use cases

Controlling validation behaviour with environment variables

You can control validation behaviour without modifying code by using environment variables.

Create configuration helpers in src/spaceflights_pandera/schemas/__init__.py:

"""Configuration helpers for validation behaviour."""
import os


# Values (compared case-insensitively, ignoring surrounding whitespace) that
# switch a flag on. Anything else, including the empty string, switches it off.
_TRUTHY_VALUES = frozenset({"1", "true", "yes", "on"})


def _env_flag(name: str, default: bool = True) -> bool:
    """Return the boolean value of the environment variable *name*.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is not set at all.

    Returns:
        ``default`` if the variable is unset; otherwise ``True`` only when the
        stripped, lower-cased value is one of ``_TRUTHY_VALUES``.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUTHY_VALUES


def is_validation_enabled() -> bool:
    """Control validation via VALIDATION_ENABLED environment variable.

    Set to 'false' to disable all validation (useful for quick debugging).
    Accepted "on" values: 1, true, yes, on (case-insensitive).
    Default: true
    """
    return _env_flag("VALIDATION_ENABLED")


def should_fail_on_error() -> bool:
    """Control whether validation errors stop the pipeline.

    - true: Pipeline stops on validation errors (recommended for CI/CD)
    - false: Log errors but continue (useful for data exploration)

    Set via VALIDATION_FAIL_ON_ERROR environment variable.
    Accepted "on" values: 1, true, yes, on (case-insensitive).
    Default: true
    """
    return _env_flag("VALIDATION_FAIL_ON_ERROR")


def get_lazy_flag() -> bool:
    """Control Pandera's lazy validation mode.

    - true: Collect ALL validation errors before failing (better error visibility)
    - false: Fail on first validation error (faster for CI/CD)

    Set via VALIDATION_LAZY environment variable.
    Accepted "on" values: 1, true, yes, on (case-insensitive).
    Default: true (collect all errors)
    """
    return _env_flag("VALIDATION_LAZY")

Update your hook to use these configuration helpers:

from pandera.errors import SchemaError, SchemaErrors
from ..schemas import is_validation_enabled, should_fail_on_error, get_lazy_flag

class PanderaValidationHook:
    """Kedro hook that validates node inputs with Pandera, configurable via environment flags."""

    def __init__(self):
        # The flags are read once, when the hook is constructed.
        self.enabled = is_validation_enabled()
        self.fail_on_error = should_fail_on_error()
        self.lazy = get_lazy_flag()

        # Map catalog dataset names to their validation schemas
        self.raw_schemas = {
            "companies": companies_schema,
            "shuttles": shuttles_schema,
            "reviews": reviews_schema,
        }

    @hook_impl
    def before_node_run(
        self,
        node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
    ) -> Dict[str, Any]:
        """Validate input data before node execution.

        Returns:
            The node inputs, with each successfully validated dataset replaced
            by the object returned from ``schema.validate``. When validation
            is disabled, ``inputs`` is returned unchanged.

        Raises:
            SchemaError, SchemaErrors: When an input fails validation and
                ``self.fail_on_error`` is true.
        """

        if not self.enabled:
            return inputs

        validated_inputs = {}

        for input_name, data in inputs.items():
            schema = self.raw_schemas.get(input_name)

            if schema is None:
                validated_inputs[input_name] = data
                continue

            logger.info("Validating '%s' for node '%s'", input_name, node.name)

            try:
                validated_data = schema.validate(data, lazy=self.lazy)
            except (SchemaError, SchemaErrors) as e:
                logger.error("Validation failed for '%s': %s", input_name, e)
                if self.fail_on_error:
                    raise
                # Exploration mode: continue with original data
                logger.warning(
                    "Continuing despite validation errors because "
                    "VALIDATION_FAIL_ON_ERROR=false"
                )
                validated_inputs[input_name] = data
            except (TypeError, AttributeError) as e:
                # Best-effort pass-through for objects the schema cannot
                # handle. Log it so a skipped validation is never silent.
                logger.warning(
                    "Skipping validation for '%s': could not validate object "
                    "of type %s (%s)",
                    input_name,
                    type(data).__name__,
                    e,
                )
                validated_inputs[input_name] = data
            else:
                logger.info("✓ Validation passed for '%s'", input_name)
                validated_inputs[input_name] = validated_data

        return {**inputs, **validated_inputs}

Now you can control validation behaviour:

# Disable validation entirely (for quick debugging)
export VALIDATION_ENABLED=false
kedro run

# Log errors but don't stop pipeline (for data exploration)
export VALIDATION_FAIL_ON_ERROR=false
kedro run

# Fail on first error (for faster CI/CD)
export VALIDATION_LAZY=false
kedro run

Caching validation results: To avoid re-validating datasets used by multiple nodes, add caching to your hook:

class PanderaValidationHook:
    """Skeleton showing how to remember which datasets have already been validated."""

    def __init__(self):
        # ... existing init code ...
        # Names of datasets that have already passed validation.
        self._validated_datasets = set()  # Add this line

    @hook_impl
    def after_context_created(self, context) -> None:
        """Reset cache at pipeline start."""
        # NOTE(review): assumes this hook fires once per run - confirm against
        # the Kedro hook specifications.
        self._validated_datasets.clear()

    @hook_impl
    def before_node_run(self, node, catalog, inputs, is_async):
        """Validate node inputs, skipping datasets already recorded in the cache."""
        # ... existing code ...
        for input_name, data in inputs.items():
            # Skip if already validated
            if input_name in self._validated_datasets:
                # `validated_inputs` is expected to be created by the elided code above.
                validated_inputs[input_name] = data
                continue

            # ... validation logic ...

            # After successful validation:
            self._validated_datasets.add(input_name)

Enhanced error reporting for validation failures

By default, Pandera error messages can be verbose. You can improve error reporting by inspecting the raised exception (SchemaError, or SchemaErrors when lazy validation is enabled) and logging only the most relevant information.

Add this helper method to your PanderaValidationHook class:

def _handle_validation_error(self, dataset_name: str, error: "SchemaError | SchemaErrors") -> None:
    """Parse and log validation errors in a readable format.

    Args:
        dataset_name: Name of the dataset that failed validation.
        error: The Pandera exception that was caught.

    Raises:
        The given ``error`` itself, when ``self.fail_on_error`` is true.
    """
    separator = "=" * 60
    logger.error(separator)
    logger.error(f"VALIDATION FAILED: {dataset_name}")
    logger.error(separator)

    # Extract key error information
    failure_cases = getattr(error, "failure_cases", None)
    if failure_cases is not None:
        logger.error("\nFailed checks summary:")
        logger.error(f"\n{failure_cases.to_string()}")

    logger.error(f"\nFull error:\n{error}")
    logger.error(separator)

    if self.fail_on_error:
        # Raise the exception we were given. A bare ``raise`` only works while
        # the caller is still inside its ``except`` block and fails with
        # RuntimeError otherwise.
        raise error

Then use it in your validation logic:

try:
    # Use the hook's configured flag (self.lazy, set in __init__), not an
    # attribute of the schema object.
    validated_data = schema.validate(data, lazy=self.lazy)
    logger.info(f"✓ Validation passed for '{input_name}'")
    validated_inputs[input_name] = validated_data
except (SchemaError, SchemaErrors) as e:
    # Lazy validation raises SchemaErrors, eager validation raises
    # SchemaError, so catch both.
    self._handle_validation_error(input_name, e)
    # Only reached when the handler did not re-raise: keep the original data.
    validated_inputs[input_name] = data

DataFrame-level checks for business rules

You can add custom business rules that validate relationships across rows or columns. For example, let's add a rule: "Type F5 shuttles cannot have more than 16 passengers."

In your schema file, define a custom check function:

from pandera import Column, DataFrameSchema, Check
import pandas as pd

def check_f5_shuttle_capacity(df: pd.DataFrame) -> bool:
    """Return whether every "Type F5" shuttle carries at most 16 passengers.

    Rows of any other shuttle type never count as violations.
    """
    f5_rows = df["shuttle_type"] == "Type F5"
    within_limit = df["passenger_capacity"] <= 16
    # A violation is an F5 row whose capacity is not within the limit.
    violations = f5_rows & ~within_limit
    return (~violations).all()

# Shuttles schema combining per-column rules with a DataFrame-level check.
shuttles_schema = DataFrameSchema(
    columns={
        "id": Column(int, nullable=False, unique=True),
        "shuttle_type": Column(str, nullable=True),
        "passenger_capacity": Column(int, nullable=False),
    },
    # Checks listed here are attached to the schema itself rather than to a
    # single column, so the check function is given the DataFrame.
    checks=[
        Check(
            check_f5_shuttle_capacity,
            error="Type F5 shuttles cannot have more than 16 passengers"
        ),
    ],
    strict=False,  # Columns not listed here are allowed
    name="shuttles_raw",
)

Tip

Distribution checks and hypothesis testing: Beyond basic rules, Pandera supports statistical hypothesis tests (for example, two-sample tests) to validate whether two datasets come from the same distribution. This is useful for detecting data drift between training and serving environments. See Hypothesis Testing.

Writing tests for your schemas

To ensure your schemas match your actual data, write tests.

Create src/spaceflights_pandera/schemas/test_schemas.py:

"""Test utilities for validating schemas against sample data.

Run with: python -m pytest src/spaceflights_pandera/schemas/test_schemas.py -v
"""
import pandas as pd
import pytest
from pandera.errors import SchemaError, SchemaErrors

from .raw import companies_schema


class TestRawSchemas:
    """Test raw data schemas against sample data."""

    def test_companies_schema_valid_data(self):
        """Test companies schema accepts valid data."""
        valid_data = pd.DataFrame({
            "id": [1, 2, 3],
            "company_rating": ["100%", "50%", "75%"],
            "company_location": ["USA", "UK", "France"],
            "total_fleet_count": [10.0, 5.0, 8.0],
            "iata_approved": ["t", "f", "t"],
        })

        validated = companies_schema.validate(valid_data)
        assert validated.shape == valid_data.shape

    def test_companies_schema_rejects_invalid_iata(self):
        """Test companies schema rejects invalid IATA values."""
        invalid_data = pd.DataFrame({
            "id": [1],
            "company_rating": ["100%"],
            "company_location": ["USA"],
            "total_fleet_count": [10.0],
            "iata_approved": ["invalid"],  # Should be 't' or 'f'
        })

        # lazy=True makes Pandera raise SchemaErrors (plural).
        with pytest.raises(SchemaErrors) as exc_info:
            companies_schema.validate(invalid_data, lazy=True)

        assert "iata_approved" in str(exc_info.value)

    def test_companies_schema_rejects_negative_fleet(self):
        """Test companies schema rejects negative fleet counts."""
        invalid_data = pd.DataFrame({
            "id": [1],
            "company_rating": ["100%"],
            "company_location": ["USA"],
            "total_fleet_count": [-5.0],  # Must be >= 0
            "iata_approved": ["t"],
        })

        with pytest.raises(SchemaErrors) as exc_info:
            companies_schema.validate(invalid_data, lazy=True)

        # Confirm the failure is about the intended column, so the test
        # cannot pass because of an unrelated schema violation.
        assert "total_fleet_count" in str(exc_info.value)

Run your tests:

python -m pytest src/spaceflights_pandera/schemas/test_schemas.py -v

Further reading

For a declarative approach to validation using catalog metadata, see the community-maintained kedro-pandera plugin (last updated July 2024).