kedro_datasets.spark.SparkStreamingDataset

class kedro_datasets.spark.SparkStreamingDataset(*, filepath='', file_format='', save_args=None, load_args=None, metadata=None)

SparkStreamingDataset loads data into Spark Streaming DataFrame objects.

Example usage for the YAML API:

raw.new_inventory:
  type: spark.SparkStreamingDataset
  filepath: data/01_raw/stream/inventory/
  file_format: json
  save_args:
    output_mode: append
    checkpoint: data/04_checkpoint/raw_new_inventory
    header: True
  load_args:
    schema:
        filepath: data/01_raw/schema/inventory_schema.json

Attributes

DEFAULT_LOAD_ARGS

DEFAULT_SAVE_ARGS

Methods

exists()

Checks whether a dataset's output already exists by calling the provided _exists() method.

from_config(name, config[, load_version, ...])

Create a dataset instance using the configuration provided.

load()

Loads data from filepath.

release()

Release any cached data.

save(data)

Saves a PySpark streaming DataFrame.

DEFAULT_LOAD_ARGS: dict[str, Any] = {}
DEFAULT_SAVE_ARGS: dict[str, Any] = {}
__init__(*, filepath='', file_format='', save_args=None, load_args=None, metadata=None)

Creates a new instance of SparkStreamingDataset.

Parameters:
  • filepath (str) – Filepath in POSIX format to a Spark dataframe. When using Databricks, specify filepaths starting with /dbfs/. For message brokers such as Kafka, filepath is not required.

  • file_format (str) – File format used during load and save operations. These are formats supported by the running SparkContext including parquet, csv, and delta. For a list of supported formats please refer to the Apache Spark documentation at https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

  • load_args (Optional[dict[str, Any]]) – Load args passed to the Spark DataFrameReader load method. They depend on the selected file format. The read options for each format are listed in the Spark documentation, see https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html. Note that a schema is mandatory for a streaming DataFrame unless schema inference is enabled (spark.sql.streaming.schemaInference set to true).

  • save_args (Optional[dict[str, Any]]) – Save args passed to the Spark DataFrame writer as write options. Like load_args, they depend on the selected file format. You can pass mode and partitionBy to specify your overwrite mode and partitioning respectively. The write options for each format are listed in the Spark documentation, see https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

  • metadata (Optional[dict[str, Any]]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
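
Because a schema is normally mandatory, the file referenced by load_args.schema.filepath has to be created somewhere. The sketch below writes one with the standard library only. It assumes the file holds the JSON representation of a Spark StructType (the layout produced by `StructType.json()`); the field names `sku`, `quantity` and `updated_at` and the helper `write_schema` are hypothetical.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical inventory fields, in the JSON layout of a Spark StructType.
inventory_schema = {
    "type": "struct",
    "fields": [
        {"name": "sku", "type": "string", "nullable": False, "metadata": {}},
        {"name": "quantity", "type": "integer", "nullable": True, "metadata": {}},
        {"name": "updated_at", "type": "timestamp", "nullable": True, "metadata": {}},
    ],
}


def write_schema(path: Path) -> Path:
    """Write the schema as JSON, creating parent directories if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(inventory_schema, indent=2), encoding="utf-8")
    return path


# Demonstrate in a temporary directory rather than the real project tree.
with tempfile.TemporaryDirectory() as tmp:
    schema_path = write_schema(Path(tmp) / "schema" / "inventory_schema.json")
    round_tripped = json.loads(schema_path.read_text(encoding="utf-8"))
```

In a project, the target path would be the one given under load_args.schema.filepath in the catalog entry.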

exists()

Checks whether a dataset’s output already exists by calling the provided _exists() method.

Return type:

bool

Returns:

Flag indicating whether the output already exists.

Raises:

DatasetError – when underlying exists method raises error.

classmethod from_config(name, config, load_version=None, save_version=None)

Create a dataset instance using the configuration provided.

Parameters:
  • name (str) – Dataset name.

  • config (dict[str, Any]) – Dataset config dictionary.

  • load_version (Optional[str]) – Version string to be used for load operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.

  • save_version (Optional[str]) – Version string to be used for save operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.

Return type:

AbstractDataset

Returns:

An instance of an AbstractDataset subclass.

Raises:

DatasetError – When the function fails to create the dataset from its config.
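
As a rough illustration of from_config, the sketch below passes a config dictionary whose `type` key names the dataset class and whose remaining keys mirror the constructor arguments. The wrapper `dataset_from_config` is hypothetical, and importing `AbstractDataset` from `kedro.io` assumes a Kedro version that exposes that name there. The import is deferred so the snippet can be imported without Kedro or Spark installed.

```python
from typing import Any

# "type" selects the dataset class; the other keys mirror the constructor arguments.
inventory_config: dict[str, Any] = {
    "type": "spark.SparkStreamingDataset",
    "filepath": "data/01_raw/stream/inventory/",
    "file_format": "json",
    "load_args": {
        "schema": {"filepath": "data/01_raw/schema/inventory_schema.json"},
    },
    "save_args": {
        "output_mode": "append",
        "checkpoint": "data/04_checkpoint/raw_new_inventory",
    },
}


def dataset_from_config(name: str, config: dict[str, Any]):
    """Hypothetical wrapper around the from_config classmethod."""
    # Imported lazily; assumes kedro.io exposes AbstractDataset.
    from kedro.io import AbstractDataset

    # Raises DatasetError if the dataset cannot be created from its config.
    return AbstractDataset.from_config(name, config)
```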

load()

Loads data from filepath. If the connector type is kafka, no filepath is required. The schema needs to be separated from load_args.

Return type:

DataFrame

Returns:

Data from filepath as a PySpark DataFrame.

release()

Release any cached data.

Raises:

DatasetError – when underlying release method raises error.

Return type:

None

save(data)

Saves a PySpark streaming DataFrame.

Parameters:
  • data (DataFrame) – PySpark streaming DataFrame to save.

Return type:

None
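
A minimal sketch of how load() and save(data) might be combined outside a Kedro pipeline. The function `copy_stream` and the filter expression are hypothetical; `source` and `sink` are assumed to be SparkStreamingDataset instances, but the function only relies on their load() and save(data) methods. How save behaves once called (for example, whether it blocks) is not covered by this page, so the sketch makes no assumption about it.

```python
def copy_stream(source, sink, condition=None):
    """Load a streaming DataFrame from `source` and save it to `sink`.

    `condition` is an optional SQL filter expression applied before saving.
    """
    stream = source.load()
    if condition is not None:
        # DataFrame.where accepts a SQL expression string.
        stream = stream.where(condition)
    sink.save(stream)
```

For example, `copy_stream(raw_inventory, clean_inventory, "quantity > 0")` would forward only rows matching the filter, assuming both datasets are configured and the column exists.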