kedro_datasets.spark.SparkDataset

class kedro_datasets.spark.SparkDataset(*, filepath, file_format='parquet', load_args=None, save_args=None, version=None, credentials=None, metadata=None)[source]

SparkDataset loads and saves Spark dataframes.

Example usage for the YAML API:

weather:
  type: spark.SparkDataset
  filepath: s3a://your_bucket/data/01_raw/weather/*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

weather_with_schema:
  type: spark.SparkDataset
  filepath: s3a://your_bucket/data/01_raw/weather/*
  file_format: csv
  load_args:
    header: True
    schema:
      filepath: path/to/schema.json
  save_args:
    sep: '|'
    header: True

weather_cleaned:
  type: spark.SparkDataset
  filepath: data/02_intermediate/data.parquet
  file_format: parquet
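
Catalog entries like those above can also be built programmatically through Kedro's DataCatalog. A minimal sketch, assuming the weather entry shown above (DataCatalog.from_config accepts the same dictionary structure as catalog.yml):

 from kedro.io import DataCatalog

 catalog = DataCatalog.from_config(
     {
         "weather": {
             "type": "spark.SparkDataset",
             "filepath": "s3a://your_bucket/data/01_raw/weather/*",
             "file_format": "csv",
             "load_args": {"header": True, "inferSchema": True},
         }
     }
 )
 weather = catalog.load("weather")  # returns a pyspark.sql.DataFrame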

Example usage for the Python API:

 from pyspark.sql import SparkSession
 from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType

 from kedro_datasets.spark import SparkDataset

 schema = StructType(
     [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
 )

 data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]

 spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)

 # tmp_path is any temporary directory, e.g. pytest's tmp_path fixture (a pathlib.Path)
 dataset = SparkDataset(filepath=(tmp_path / "test_data").as_posix())
 dataset.save(spark_df)
 reloaded = dataset.load()

 assert Row(name="Bob", age=12) in reloaded.take(4)

Attributes

DEFAULT_LOAD_ARGS

DEFAULT_SAVE_ARGS

Methods

exists()

Checks whether a data set's output already exists by calling the provided _exists() method.

from_config(name, config[, load_version, ...])

Create a data set instance using the configuration provided.

load()

Loads data by delegation to the provided load method.

release()

Release any cached data.

resolve_load_version()

Compute the version the dataset should be loaded with.

resolve_save_version()

Compute the version the dataset should be saved with.

save(data)

Saves data by delegation to the provided save method.

DEFAULT_LOAD_ARGS: dict[str, Any] = {}
DEFAULT_SAVE_ARGS: dict[str, Any] = {}
__init__(*, filepath, file_format='parquet', load_args=None, save_args=None, version=None, credentials=None, metadata=None)[source]

Creates a new instance of SparkDataset.

Parameters:
  • filepath (str) – Filepath in POSIX format to a Spark dataframe. When using Databricks, specify filepaths starting with /dbfs/.

  • file_format (str) – File format used during load and save operations. Supported formats are those available to the running SparkContext, e.g. parquet, csv, delta. For a list of supported formats, please refer to the Apache Spark documentation at https://spark.apache.org/docs/latest/sql-programming-guide.html

  • load_args (Optional[dict[str, Any]]) – Load args passed to the Spark DataFrameReader load method. These depend on the selected file format. You can find a list of read options for each supported format in the Spark DataFrame read documentation: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html

  • save_args (Optional[dict[str, Any]]) – Save args passed to the Spark DataFrame write options. As with load_args, these depend on the selected file format. You can pass mode and partitionBy to specify your overwrite mode and partitioning respectively (see the sketch after this parameter list). You can find a list of options for each format in the Spark DataFrame write documentation: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html

  • version (Optional[Version]) – If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, the save version will be autogenerated.

  • credentials (Optional[dict[str, Any]]) – Credentials to access the S3 bucket, such as key and secret, if the filepath prefix is s3a:// or s3n://. Optional keyword arguments passed to hdfs.client.InsecureClient if the filepath prefix is hdfs://. Ignored otherwise.

  • metadata (Optional[dict[str, Any]]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
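
The save_args above can be combined to control overwrite behaviour and partitioning. A hedged sketch (the path and partition column are illustrative, not part of the API):

 from kedro_datasets.spark import SparkDataset

 dataset = SparkDataset(
     filepath="data/02_intermediate/weather.parquet",
     file_format="parquet",
     save_args={"mode": "overwrite", "partitionBy": ["year"]},
 )
 dataset.save(spark_df)  # spark_df: a pyspark.sql.DataFrame, e.g. from the example above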

exists()

Checks whether a data set’s output already exists by calling the provided _exists() method.

Return type:

bool

Returns:

Flag indicating whether the output already exists.

Raises:

DatasetError – when the underlying exists method raises an error.
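
A small usage sketch, reusing dataset and spark_df from the Python example above:

 if not dataset.exists():
     dataset.save(spark_df)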

classmethod from_config(name, config, load_version=None, save_version=None)

Create a data set instance using the configuration provided.

Parameters:
  • name (str) – Data set name.

  • config (dict[str, Any]) – Data set config dictionary.

  • load_version (str | None) – Version string to be used for load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

  • save_version (str | None) – Version string to be used for save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

Return type:

AbstractDataset

Returns:

An instance of an AbstractDataset subclass.

Raises:

DatasetError – When the function fails to create the data set from its config.
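
A minimal sketch of creating the dataset from a plain config dictionary (the same keys as a catalog.yml entry; the name and path are illustrative):

 from kedro_datasets.spark import SparkDataset

 dataset = SparkDataset.from_config(
     name="weather",
     config={
         "type": "spark.SparkDataset",
         "filepath": "s3a://your_bucket/data/01_raw/weather/*",
         "file_format": "csv",
         "load_args": {"header": True},
     },
 )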

load()

Loads data by delegation to the provided load method.

Return type:

TypeVar(_DO)

Returns:

Data returned by the provided load method.

Raises:

DatasetError – when the underlying load method raises an error.

release()

Release any cached data.

Raises:

DatasetError – when the underlying release method raises an error.

Return type:

None

resolve_load_version()

Compute the version the dataset should be loaded with.

Return type:

str | None

resolve_save_version()

Compute the version the dataset should be saved with.

Return type:

str | None
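
A sketch of version resolution, assuming a versioned dataset created with Version(None, None): the load version resolves to the latest available one and the save version is autogenerated (a timestamp string):

 from kedro.io.core import Version
 from kedro_datasets.spark import SparkDataset

 versioned = SparkDataset(
     filepath="data/02_intermediate/weather.parquet",
     version=Version(load=None, save=None),
 )
 save_version = versioned.resolve_save_version()  # e.g. an autogenerated timestamp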

save(data)

Saves data by delegation to the provided save method.

Parameters:

data (TypeVar(_DI)) – the value to be saved by the provided save method.

Raises:
  • DatasetError – when the underlying save method raises an error.

  • FileNotFoundError – when the save method gets a file instead of a directory, on Windows.

  • NotADirectoryError – when the save method gets a file instead of a directory, on Unix.

Return type:

None