PySpark integration

This page outlines some best practices when building a Kedro pipeline with PySpark. It assumes a basic understanding of both Kedro and PySpark.

Get started with the PySpark starter

You can use the built-in PySpark starter with the kedro new command to create a working Spark-based project:

uvx kedro new --name=spaceflights-databricks --tools=pyspark --example=y

This starter is designed specifically for Spark. It replaces pandas-based datasets with SparkDatasetV2 in the Data Catalog and implements data transformations using Spark.

The starter supports multiple execution modes, including running Kedro directly on Databricks and using a remote Spark cluster from your local machine with Databricks Connect.

If you want to run the project locally, install PySpark first. It is not included in the starter dependencies by default. You can install it with kedro-datasets:

uv pip install kedro-datasets[spark-local]

Use Kedro’s built-in Spark datasets to load and save raw data

We recommend using Kedro’s built-in Spark datasets to load raw data into Spark DataFrames and to write the results back to storage.

Full documentation for the main Spark dataset, including examples for both Data Catalog configuration and Python API usage, is available in the kedro-datasets documentation.

Kedro also provides several platform-specific Spark datasets:

Spark and Delta Lake interaction

Delta Lake is an open-source project that enables building a lakehouse architecture on top of data lakes. It provides ACID transactions and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS. To set up PySpark with Delta Lake, review the recommendations in Delta Lake's documentation. You may have to update the SparkHooks class in your src/<package_name>/hooks.py to set up the SparkSession with Delta Lake support. In the diff below, lines marked + are added and the line marked - is removed:

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession
+ from delta import configure_spark_with_delta_pip

class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """

        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
-       _spark_session = spark_session_conf.getOrCreate()
+       _spark_session = configure_spark_with_delta_pip(spark_session_conf).getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")

For more detail on using Kedro with Delta Lake, refer to the Delta Lake integration guide.

Use MemoryDataset for intermediary DataFrame

For nodes whose output DataFrame does not need a Spark action, such as being written to storage, we recommend using the default MemoryDataset to hold the DataFrame. In other words, there is no need to specify it in the DataCatalog or catalog.yml. This allows you to take advantage of Spark's optimiser and lazy evaluation.

Use MemoryDataset with copy_mode="assign" for non-DataFrame Spark objects

Sometimes, you might want to use Spark objects that aren't DataFrames as inputs and outputs in your pipeline. For example, suppose you have a train_model node to train a classifier using the Spark ML RandomForestClassifier and a predict node to make predictions using this classifier. In this scenario, the train_model node will output the fitted model (a RandomForestClassificationModel object), which then becomes the input for the predict node. Below is the code for this pipeline:

from kedro.pipeline import Node, Pipeline
from pyspark.ml.classification import (
    RandomForestClassificationModel,
    RandomForestClassifier,
)
from pyspark.sql import DataFrame


def train_model(training_data: DataFrame) -> RandomForestClassificationModel:
    """Node for training a random forest model to classify the data."""
    classifier = RandomForestClassifier(numTrees=10)
    # fit() returns the fitted model, not the estimator itself
    return classifier.fit(training_data)


def predict(
    model: RandomForestClassificationModel, testing_data: DataFrame
) -> DataFrame:
    """Node for making predictions given a pre-trained model and a testing dataset."""
    predictions = model.transform(testing_data)
    return predictions


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            Node(train_model, inputs=["training_data"], outputs="example_classifier"),
            Node(
                predict,
                inputs=dict(model="example_classifier", testing_data="testing_data"),
                outputs="example_predictions",
            ),
        ]
    )

To make the pipeline work, you will need to specify example_classifier as follows in the catalog.yml:

example_classifier:
  type: MemoryDataset
  copy_mode: assign

The assign copy mode ensures that the MemoryDataset holds a reference to the Spark object itself rather than a deep copy of it, because deep copying generally does not work with Spark objects.
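The difference between copying and assigning can be illustrated without a Spark cluster. The sketch below uses a stand-in class rather than a real Spark object: the name `JvmBackedModel` and its lock attribute are hypothetical, and the lock only mimics a handle that cannot be duplicated. It illustrates the failure mode under that assumption and does not describe Spark's internals.

```python
import copy
import threading


class JvmBackedModel:
    """Hypothetical stand-in for an object that wraps a non-copyable handle."""

    def __init__(self) -> None:
        # A lock cannot be pickled or deep-copied; it is used here to
        # mimic a handle to an external resource.
        self.handle = threading.Lock()


model = JvmBackedModel()

# Deep copying fails because the wrapped handle cannot be duplicated.
try:
    copy.deepcopy(model)
    deepcopy_failed = False
except TypeError:
    deepcopy_failed = True

# Plain assignment keeps a reference to the very same object, which is
# the behaviour requested by copy_mode "assign".
stored = model

print(deepcopy_failed)
print(stored is model)
```

Because the stored object and the original are the same object, a downstream node that mutates the object also changes what earlier nodes see.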

Tips for maximising concurrency using ThreadRunner

Under the hood, every Kedro node that performs a Spark action (for example, save, collect) is submitted to the Spark cluster as a Spark job through the same SparkSession instance. These jobs can run concurrently if they are submitted from different threads. To submit them from different threads, run your Kedro pipeline with the kedro.runner.ThreadRunner:

kedro run --runner=ThreadRunner

To further increase the concurrency level, you can enable fair scheduling in Spark ≥ 0.8. This gives each node an equal share of the Spark cluster and increases the chance that jobs run concurrently. By default, Spark uses FIFO scheduling, so a job that consumes excessive resources can block other jobs. To enable fair scheduling, set the spark.scheduler.mode option to FAIR in your project's Spark settings file.
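As a rough sketch of that setting, the helper below adds the option to a dictionary of Spark settings of the kind loaded from the project's Spark configuration. The function name `enable_fair_scheduling` and the existing `spark.driver.maxResultSize` entry are hypothetical; only the `spark.scheduler.mode` key and its `FAIR` value come from Spark itself.

```python
def enable_fair_scheduling(parameters: dict[str, str]) -> dict[str, str]:
    """Return a copy of the Spark settings with fair scheduling switched on."""
    settings = dict(parameters)
    settings["spark.scheduler.mode"] = "FAIR"
    return settings


# Hypothetical settings, standing in for the loaded Spark configuration
parameters = {"spark.driver.maxResultSize": "3g"}
settings = enable_fair_scheduling(parameters)
print(settings)

# The resulting pairs can be passed to SparkConf().setAll(settings.items()),
# in the same way as in the SparkHooks example earlier on this page.
```

Adding the same key and value directly to the settings file has the same effect and avoids the extra helper.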

For more information, see the Spark documentation on job scheduling within an application.