PySpark integration

This page outlines some best practices when building a Kedro pipeline with PySpark. It assumes a basic understanding of both Kedro and PySpark.

Centralise Spark configuration in conf/base/spark.yml

Spark allows you to specify many different configuration options. We recommend storing all of these options in a file located at conf/base/spark.yml. Below is an example of the contents of this file, which sets the maxResultSize of the Spark driver and enables the FAIR scheduler:

spark.driver.maxResultSize: 3g
spark.scheduler.mode: FAIR

Note

Optimal configuration for Spark depends on the setup of your Spark cluster.

Initialise a SparkSession using a hook

Before any PySpark operations are performed, you should initialise your SparkSession using an after_context_created hook. This ensures that a SparkSession has been initialised before the Kedro pipeline is run.

Below is an example implementation to initialise the SparkSession in src/<package_name>/hooks.py by reading configuration from the spark.yml configuration file created in the previous section:

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """

        # Load the spark configuration in spark.yml using the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")

You should modify this code to adapt it to your cluster’s setup, e.g. setting master to yarn if you are running Spark on YARN.

You can call SparkSession.builder.getOrCreate() to obtain the SparkSession anywhere in your pipeline: since the SparkSession is a global singleton, this returns the same session that the hook initialised.
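
For example, a node that needs the session directly can simply retrieve it. The node and column names below are illustrative:

from pyspark.sql import DataFrame, SparkSession


def create_station_lookup(stations: list[tuple[str, int]]) -> DataFrame:
    """Node that obtains the shared SparkSession to build a small lookup DataFrame."""
    # getOrCreate() returns the session already initialised by SparkHooks
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(stations, schema=["station_name", "station_id"])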

We don’t recommend storing the Spark session on the context object, as it cannot be serialised and therefore prevents the context from being initialised for some plugins.

You will also need to register SparkHooks by updating the HOOKS variable in src/<package_name>/settings.py as follows:

from <package_name>.hooks import SparkHooks

HOOKS = (SparkHooks(),)

Use Kedro’s built-in Spark datasets to load and save raw data

We recommend using Kedro’s built-in Spark datasets to load raw data into a Spark DataFrame, as well as to write it back to storage. Some of our built-in Spark datasets include:

  • spark.SparkDataset
  • spark.SparkHiveDataset
  • spark.SparkJDBCDataset
  • spark.DeltaTableDataset

The example below illustrates how to configure spark.SparkDataset in conf/base/catalog.yml to read a CSV file located on S3 into a DataFrame:

weather:
  type: spark.SparkDataset
  filepath: s3a://your_bucket/data/01_raw/weather*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

Or using the Python API:

import pyspark.sql
from kedro.io import DataCatalog
from kedro_datasets.spark import SparkDataset

spark_ds = SparkDataset(
    filepath="s3a://your_bucket/data/01_raw/weather*",
    file_format="csv",
    load_args={"header": True, "inferSchema": True},
    save_args={"sep": "|", "header": True},
)
catalog = DataCatalog({"weather": spark_ds})

df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)

Spark and Delta Lake interaction

Delta Lake is an open-source project that enables building a Lakehouse architecture on top of data lakes. It provides ACID transactions and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS. To set up PySpark with Delta Lake, have a look at the recommendations in Delta Lake’s documentation.

We recommend the following workflow, which makes use of the transcoding feature in Kedro:

  • To create a Delta table, use a SparkDataset with file_format="delta". You can also use this type of dataset to read from a Delta table or overwrite it.

  • To perform Delta table deletes, updates, and merges, load the data using a DeltaTableDataset and perform the write operations within the node function.

As a result, we end up with a catalog that looks like this:

temperature:
  type: spark.SparkDataset
  filepath: data/01_raw/data.csv
  file_format: "csv"
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

weather@spark:
  type: spark.SparkDataset
  filepath: s3a://my_bucket/03_primary/weather
  file_format: "delta"
  save_args:
    mode: "overwrite"
    versionAsOf: 0

weather@delta:
  type: spark.DeltaTableDataset
  filepath: s3a://my_bucket/03_primary/weather

The DeltaTableDataset does not support the save() operation, as the updates happen in place inside the node function, i.e. through DeltaTable.update(), DeltaTable.delete() or DeltaTable.merge().
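
For illustration, here is a sketch of what update_meteorological_state might look like; the condition, column name and return value are hypothetical:

from delta.tables import DeltaTable
from pyspark.sql.functions import col


def update_meteorological_state(weather_table: DeltaTable) -> str:
    """Node that updates the Delta table in place through the DeltaTable API."""
    # Hypothetical in-place update: cap temperature readings at 60 degrees
    weather_table.update(
        condition=col("temperature") > 60,
        set={"temperature": "60"},
    )
    # Return a marker so that downstream nodes can depend on this operation
    return "first operation complete"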

Note

If you have defined an implementation for the Kedro before_dataset_saved/after_dataset_saved hook, the hook will not be triggered. This is because the save operation happens within the node itself, via the DeltaTable API.

pipeline(
    [
        node(
            func=process_barometer_data, inputs="temperature", outputs="weather@spark"
        ),
        node(
            func=update_meteorological_state,
            inputs="weather@delta",
            outputs="first_operation_complete",
        ),
        node(
            func=estimate_weather_trend,
            inputs=["first_operation_complete", "weather@delta"],
            outputs="second_operation_complete",
        ),
    ]
)

first_operation_complete is a MemoryDataset and it signals that any Delta operations which occur “outside” the Kedro DAG are complete. This can be used as input to a downstream node, to preserve the shape of the DAG. Otherwise, if no downstream nodes need to run after this, the node can simply not return anything:

pipeline(
    [
        node(func=..., inputs="temperature", outputs="weather@spark"),
        node(func=..., inputs="weather@delta", outputs=None),
    ]
)

The following diagram is the visual representation of the workflow explained above:

Spark and Delta Lake workflow

Note

This pattern of creating “dummy” datasets to preserve the data flow also applies to other “out of DAG” execution operations such as SQL operations within a node.
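
As a hypothetical illustration of the same pattern with SQL, a node might run a statement through the SparkSession and return a marker value so that downstream nodes can depend on it (the table name is made up):

from pyspark.sql import SparkSession


def refresh_reporting_table() -> str:
    """Node that runs an "out of DAG" SQL operation and returns a completion marker."""
    spark = SparkSession.builder.getOrCreate()
    # Hypothetical SQL operation executed outside the Kedro DAG
    spark.sql("REFRESH TABLE reporting.weather_summary")
    return "reporting table refreshed"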

Use MemoryDataset for intermediary DataFrames

For nodes that operate on DataFrames and don’t need to perform Spark actions such as writing the DataFrame to storage, we recommend using the default MemoryDataset to hold the DataFrame. In other words, there is no need to specify such datasets in the DataCatalog or catalog.yml. This allows you to take advantage of Spark’s optimiser and lazy evaluation.
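
For example, in the hypothetical two-node pipeline below, cleaned_weather is never declared in catalog.yml, so Kedro keeps it in a MemoryDataset and Spark only executes the accumulated plan when weather_summary (assumed to be declared as a spark.SparkDataset) is saved:

from kedro.pipeline import node, pipeline
from pyspark.sql import DataFrame


def clean_weather(raw_weather: DataFrame) -> DataFrame:
    """Applies lazy transformations only; no Spark action is triggered here."""
    return raw_weather.dropna(subset=["temperature"])


def aggregate_weather(cleaned_weather: DataFrame) -> DataFrame:
    """Builds the summary; Spark runs the full plan only when this output is saved."""
    return cleaned_weather.groupBy("station_id").avg("temperature")


weather_pipeline = pipeline(
    [
        # "cleaned_weather" is not declared in catalog.yml, so it stays in memory
        node(clean_weather, inputs="raw_weather", outputs="cleaned_weather"),
        node(aggregate_weather, inputs="cleaned_weather", outputs="weather_summary"),
    ]
)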

Use MemoryDataset with copy_mode="assign" for non-DataFrame Spark objects

Sometimes, you might want to use Spark objects that aren’t DataFrames as inputs and outputs in your pipeline. For example, suppose you have a train_model node to train a classifier using Spark ML’s RandomForestClassifier and a predict node to make predictions using this classifier. In this scenario, the train_model node will output a fitted RandomForestClassificationModel object, which then becomes the input for the predict node. Below is the code for this pipeline:

from kedro.pipeline import Pipeline, node, pipeline
from pyspark.ml.classification import (
    RandomForestClassificationModel,
    RandomForestClassifier,
)
from pyspark.sql import DataFrame


def train_model(training_data: DataFrame) -> RandomForestClassificationModel:
    """Node for training a random forest model to classify the data."""
    classifier = RandomForestClassifier(numTrees=10)
    return classifier.fit(training_data)


def predict(
    model: RandomForestClassificationModel, testing_data: DataFrame
) -> DataFrame:
    """Node for making predictions given a pre-trained model and a testing dataset."""
    predictions = model.transform(testing_data)
    return predictions


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(train_model, inputs=["training_data"], outputs="example_classifier"),
            node(
                predict,
                inputs=dict(model="example_classifier", testing_data="testing_data"),
                outputs="example_predictions",
            ),
        ]
    )

To make the pipeline work, you will need to specify example_classifier as follows in the catalog.yml:

example_classifier:
  type: MemoryDataset
  copy_mode: assign

The assign copy mode ensures that the MemoryDataset is assigned the Spark object itself rather than a deep copy of it, since deep copying generally doesn’t work with Spark objects.
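
Equivalently, if you assemble the catalog with the Python API, a sketch of the same configuration would be:

from kedro.io import DataCatalog, MemoryDataset

# Assign the classifier object itself rather than attempting a deep copy of it
catalog = DataCatalog({"example_classifier": MemoryDataset(copy_mode="assign")})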

Tips for maximising concurrency using ThreadRunner

Under the hood, every Kedro node that performs a Spark action (e.g. save, collect) is submitted to the Spark cluster as a Spark job through the same SparkSession instance. These jobs can run concurrently if they are submitted by different threads. To achieve this, run your Kedro pipeline with the ThreadRunner:

kedro run --runner=ThreadRunner
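
If you launch runs programmatically rather than through the CLI, a roughly equivalent sketch using the Python API (assuming it is executed from the project root) is:

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.runner import ThreadRunner

project_path = Path.cwd()  # assumes the script is run from the project root
bootstrap_project(project_path)

with KedroSession.create(project_path=project_path) as session:
    # Equivalent to `kedro run --runner=ThreadRunner`
    session.run(runner=ThreadRunner())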

To further increase the concurrency level, if you are using Spark >= 0.8, you can also turn on fair sharing to give each node a roughly equal share of the Spark cluster, and therefore a roughly equal chance of being executed concurrently. By default, jobs are executed in a FIFO manner, which means that a job taking up too many resources can hold up the execution of other jobs. To turn on fair sharing, put the following in the conf/base/spark.yml file created in the Centralise Spark configuration section above:

spark.scheduler.mode: FAIR

For more information, see the Spark documentation on jobs scheduling within an application.