Skip to content

DeltaTableDataset

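DeltaTableDataset loads Delta tables as DeltaTable objects using Apache Spark. It is a read-only dataset: calling save raises a DatasetError, so write the underlying data with a SparkDataset using file_format "delta" (as in the examples below).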

kedro_datasets.spark.DeltaTableDataset

DeltaTableDataset(*, filepath, metadata=None)

Bases: AbstractDataset[None, DeltaTable]

DeltaTableDataset loads data into DeltaTable objects.

Examples:

Using the YAML API:

weather@spark:
  type: spark.SparkDataset
  filepath: data/02_intermediate/data.parquet
  file_format: "delta"

weather@delta:
  type: spark.DeltaTableDataset
  filepath: data/02_intermediate/data.parquet

Using the Python API:

>>> from delta import DeltaTable
>>> from kedro_datasets.spark import DeltaTableDataset, SparkDataset
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import StructField, StringType, IntegerType, StructType
>>>
>>> schema = StructType(
...     [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
... )
>>> data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
>>>
>>> filepath = (tmp_path / "test_data").as_posix()
>>> dataset = SparkDataset(filepath=filepath, file_format="delta")
>>> dataset.save(spark_df)
>>> deltatable_dataset = DeltaTableDataset(filepath=filepath)
>>> delta_table = deltatable_dataset.load()
>>> assert isinstance(delta_table, DeltaTable)

Parameters:

  • filepath (str) –

    Filepath in POSIX format to a Spark dataframe. When using Databricks and working with data written to mount points, specify filepaths for (versioned) SparkDatasets starting with /dbfs/mnt.

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Source code in kedro_datasets/spark/deltatable_dataset.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __init__(
    self, *, filepath: str, metadata: dict[str, Any] | None = None
) -> None:
    """Initialise a ``DeltaTableDataset`` that points at ``filepath``.

    Args:
        filepath: POSIX-style filepath to a Spark dataframe. On Databricks,
            when the data was written to a mount point, give
            ``filepath``s for (versioned) ``SparkDataset``s that start
            with ``/dbfs/mnt``.
        metadata: Arbitrary metadata. Kedro itself ignores it, but users
            or external plugins may consume it.
    """
    # The prefix and the remaining path are stored separately; the
    # methods that access the table concatenate them again.
    fs_prefix, filepath = split_filepath(filepath)

    self.metadata = metadata
    self._filepath = PurePosixPath(filepath)
    self._fs_prefix = fs_prefix

_SINGLE_PROCESS class-attribute instance-attribute

_SINGLE_PROCESS = True

_filepath instance-attribute

_filepath = PurePosixPath(filepath)

_fs_prefix instance-attribute

_fs_prefix = fs_prefix

metadata instance-attribute

metadata = metadata

_describe

_describe()
Source code in kedro_datasets/spark/deltatable_dataset.py
102
103
def _describe(self) -> dict[str, Any]:
    """Return the attributes that describe this dataset instance.

    Returns:
        A dictionary holding the dataset's ``filepath`` (converted to a
        string) and its ``fs_prefix``.
    """
    return {"filepath": str(self._filepath), "fs_prefix": self._fs_prefix}

_exists

_exists()
Source code in kedro_datasets/spark/deltatable_dataset.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def _exists(self) -> bool:
    """Check whether a Delta table can be read at this dataset's path.

    Returns:
        ``True`` when Spark can load the path in ``delta`` format;
        ``False`` when Spark reports that the path does not exist or is
        not a Delta table.

    Raises:
        AnalysisException: Re-raised unchanged when the failure is not one
            of the two recognised "missing table" messages.
    """
    target = strip_dbfs_prefix(self._fs_prefix + str(self._filepath))

    try:
        get_spark().read.load(path=target, format="delta")
    except AnalysisException as exc:
        # `AnalysisException.desc` is deprecated with pyspark >= 3.4
        if hasattr(exc, "desc"):
            reason = exc.desc
        else:
            reason = str(exc)

        missing_markers = ("Path does not exist:", "is not a Delta table")
        if any(marker in reason for marker in missing_markers):
            return False
        raise
    else:
        return True

load

load()
Source code in kedro_datasets/spark/deltatable_dataset.py
81
82
83
def load(self) -> DeltaTable:
    """Load the Delta table stored at this dataset's path.

    Returns:
        The ``DeltaTable`` obtained from ``DeltaTable.forPath`` for the
        current Spark session and the dataset's path.
    """
    # Apply the same ``strip_dbfs_prefix`` normalisation as ``_exists`` so
    # both methods resolve the identical location; otherwise a ``/dbfs/...``
    # filepath could pass the existence check yet be loaded from a
    # different path.
    load_path = strip_dbfs_prefix(self._fs_prefix + str(self._filepath))
    return DeltaTable.forPath(get_spark(), load_path)

save

save(data)
Source code in kedro_datasets/spark/deltatable_dataset.py
85
86
def save(self, data: None) -> NoReturn:
    """Reject every save attempt, because this dataset type is read only.

    Args:
        data: Unused; present only to satisfy the dataset interface.

    Raises:
        DatasetError: Always.
    """
    dataset_name = self.__class__.__name__
    raise DatasetError(f"{dataset_name} is a read only dataset type")