
DeltaTableDataset

DeltaTableDataset loads and saves data to/from Delta tables using pandas.

kedro_datasets.pandas.DeltaTableDataset

DeltaTableDataset(
    *,
    filepath=None,
    catalog_type=None,
    catalog_name=None,
    database=None,
    table=None,
    load_args=None,
    save_args=None,
    credentials=None,
    fs_args=None
)

Bases: AbstractDataset

DeltaTableDataset loads and saves Delta tables from and to a filesystem (e.g. local, S3, GCS), a Databricks Unity Catalog, or an AWS Glue catalog. Loading and saving go through a pandas DataFrame. When saving, you can choose one of two modes: overwrite (the default) or append. To alter the schema as part of an overwrite, pass overwrite_schema=True. To overwrite a specific partition, use mode=overwrite together with partition_filters; this removes all files within the matching partition and inserts your data as new files.
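The save options described above can be sketched in plain Python. This is a minimal illustration rather than the library's code: `resolve_save_args` is a hypothetical helper that mirrors how the defaults are merged and the mode is checked, and it raises `ValueError` where the dataset itself raises `DatasetError`. The `partition_filters` value assumes the list-of-tuples form accepted by delta-rs, and the `year` column is made up for the example; check the delta-rs version you have installed.

```python
from __future__ import annotations

from typing import Any

# Values taken from the class attributes documented on this page.
ACCEPTED_WRITE_MODES = ("overwrite", "append")
DEFAULT_SAVE_ARGS = {"mode": "overwrite"}


def resolve_save_args(save_args: dict[str, Any] | None = None) -> dict[str, Any]:
    """Hypothetical helper: merge user save_args over the defaults, then validate the mode."""
    merged = {**DEFAULT_SAVE_ARGS, **(save_args or {})}
    mode = merged.get("mode")
    if mode not in ACCEPTED_WRITE_MODES:
        # The dataset raises DatasetError here; ValueError keeps the sketch dependency-free.
        raise ValueError(
            f"Write mode {mode} is not supported; use one of {ACCEPTED_WRITE_MODES}"
        )
    return merged


# No save_args at all: the default mode applies.
default_args = resolve_save_args()
# Append new rows instead of replacing the table.
append_args = resolve_save_args({"mode": "append"})
# Overwrite and allow the schema to change.
schema_change_args = resolve_save_args({"overwrite_schema": True})
# Overwrite only the partition where the (hypothetical) column year equals "2024".
partition_args = resolve_save_args(
    {"mode": "overwrite", "partition_filters": [("year", "=", "2024")]}
)
```

Each resulting dictionary is the kind of value you would pass as `save_args`, either in Python or under the `save_args:` key of a catalog entry.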

Examples:

Using the YAML API:

boats_filesystem:
  type: pandas.DeltaTableDataset
  filepath: data/01_raw/boats
  credentials: dev_creds
  load_args:
    version: 7
  save_args:
    mode: overwrite

boats_databricks_unity_catalog:
  type: pandas.DeltaTableDataset
  credentials: dev_creds
  catalog_type: UNITY
  database: simple_database
  table: simple_table
  save_args:
    mode: overwrite

trucks_aws_glue_catalog:
  type: pandas.DeltaTableDataset
  credentials: dev_creds
  catalog_type: AWS
  catalog_name: main
  database: db_schema
  table: db_table
  save_args:
    mode: overwrite

Using the Python API:

>>> from kedro_datasets.pandas import DeltaTableDataset
>>> import pandas as pd
>>>
>>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> dataset = DeltaTableDataset(filepath=tmp_path / "test")
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.equals(reloaded)
>>>
>>> new_data = pd.DataFrame({"col1": [7, 8], "col2": [9, 10], "col3": [11, 12]})
>>> dataset.save(new_data)
>>> assert isinstance(dataset.get_loaded_version(), int)

Parameters:

  • filepath (str, default: None ) –

    Filepath to a Delta table. The following protocols are accepted:
      S3: s3://<bucket>/<path>, s3a://<bucket>/<path>
      Azure: az://<container>/<path>, adl://<container>/<path>, abfs://<container>/<path>
      GCS: gs://<bucket>/<path>
    If none of the prefixes above is given, the file protocol (local filesystem) is used.

  • catalog_type ((DataCatalog, Optional), default: None ) –

    AWS or UNITY if filepath is not provided. Defaults to None.

  • catalog_name ((str, Optional), default: None ) –

    the name of the catalog in AWS Glue or Databricks Unity. Defaults to None.

  • database ((str, Optional), default: None ) –

    the name of the database (also referred to as schema). Defaults to None.

  • table ((str, Optional), default: None ) –

    the name of the table.

  • load_args ((Dict[str, Any], Optional), default: None ) –

    Additional options for loading file(s) into DeltaTableDataset. load_args accepts version to load the appropriate version when loading from a filesystem.

  • save_args ((Dict[str, Any], Optional), default: None ) –

    Additional saving options for saving into Delta lake. Here you can find all available arguments: https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables

  • credentials ((Dict[str, Any], Optional), default: None ) –

    Credentials required to get access to the underlying filesystem. E.g. for GCSFileSystem it should look like {"token": None}.

  • fs_args ((Dict[str, Any], Optional), default: None ) –

    Extra arguments to pass into underlying filesystem class constructor. (e.g. {"project": "my-project"} for GCSFileSystem).

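Raises: DatasetError: Invalid configuration supplied (through DeltaTableDataset validation), for example an unsupported write mode in save_args, or both filepath and catalog_type being provided. ValueError: catalog_type is neither AWS nor UNITY.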
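The prefix rule for `filepath` in the parameter list above can be illustrated with a small helper. This is a sketch under stated assumptions, not how the dataset or delta-rs actually resolves paths: `infer_protocol` and `REMOTE_PROTOCOLS` are hypothetical names, and the helper only classifies a string by its prefix.

```python
from urllib.parse import urlsplit

# Prefixes listed for the filepath parameter on this page.
REMOTE_PROTOCOLS = {"s3", "s3a", "az", "adl", "abfs", "gs"}


def infer_protocol(filepath: str) -> str:
    """Hypothetical helper: return the listed prefix, or "file" for anything else."""
    scheme = urlsplit(filepath).scheme.lower()
    return scheme if scheme in REMOTE_PROTOCOLS else "file"


local = infer_protocol("data/01_raw/boats")
s3 = infer_protocol("s3a://bucket/boats")
azure = infer_protocol("abfs://container/boats")
gcs = infer_protocol("gs://bucket/boats")
```

A path without one of the listed prefixes, such as the relative path used in the YAML example, falls through to the local filesystem.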

Source code in kedro_datasets/pandas/deltatable_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    filepath: str | None = None,
    catalog_type: str | None = None,
    catalog_name: str | None = None,
    database: str | None = None,
    table: str | None = None,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    credentials: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``DeltaTableDataset``

    Args:
        filepath (str): Filepath to a delta lake file with the following accepted protocol:
            ``S3``: `s3://<bucket>/<path>`, `s3a://<bucket>/<path>`
            ``Azure``: `az://<container>/<path>`, `adl://<container>/<path>`,
            `abfs://<container>/<path>`
            ``GCS``: `gs://<bucket>/<path>`
            If any of the prefix above is not provided, `file` protocol (local filesystem)
            will be used.
        catalog_type (DataCatalog, Optional): `AWS` or `UNITY` if filepath is not provided.
            Defaults to None.
        catalog_name (str, Optional): the name of catalog in AWS Glue or Databricks Unity.
            Defaults to None.
        database (str, Optional): the name of the database (also referred to as schema).
            Defaults to None.
        table (str, Optional): the name of the table.
        load_args (Dict[str, Any], Optional): Additional options for loading file(s)
            into DeltaTableDataset. `load_args` accepts `version` to load the appropriate
            version when loading from a filesystem.
        save_args (Dict[str, Any], Optional): Additional saving options for saving into
            Delta lake. Here you can find all available arguments:
            https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables
        credentials (Dict[str, Any], Optional): Credentials required to get access to
            the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like
            `{"token": None}`.
        fs_args (Dict[str, Any], Optional): Extra arguments to pass into underlying
            filesystem class constructor.
            (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
    Raises:
        DatasetError: Invalid configuration supplied (through DeltaTableDataset validation)

    """
    self._filepath = filepath
    self._catalog_type = catalog_type
    self._catalog_name = catalog_name
    self._database = database
    self._table = table
    self._fs_args = deepcopy(fs_args or {})
    self._credentials = deepcopy(credentials or {})

    # DeltaTable cannot be instantiated from an empty directory
    # for the first time creation from filepath, we need to delay the instantiation
    self.is_empty_dir: bool = False
    self._delta_table: DeltaTable | None = None

    # Handle default load and save arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}

    write_mode = self._save_args.get("mode", None)
    if write_mode not in self.ACCEPTED_WRITE_MODES:
        raise DatasetError(
            f"Write mode {write_mode} is not supported, "
            f"Please use any of the following accepted modes "
            f"{self.ACCEPTED_WRITE_MODES}"
        )

    self._version = self._load_args.get("version", None)

    if self._filepath and self._catalog_type:
        raise DatasetError(
            "DeltaTableDataset can either load from "
            "filepath or catalog_type. Please provide "
            "one of either filepath or catalog_type."
        )

    if self._filepath:
        try:
            self._delta_table = DeltaTable(
                table_uri=self._filepath,
                storage_options=self.fs_args,
                version=self._version,
            )
        except TableNotFoundError:
            self.is_empty_dir = True
    elif self._catalog_type:
        if self._catalog_type.upper() == "AWS":
            table_uri = f"glue:///{self._database}/{self._table}"
        elif self._catalog_type.upper() == "UNITY":
            table_uri = (
                f"unity://{self._catalog_name}/{self._database}/{self._table}"
            )
        else:
            raise ValueError(f"Unsupported catalog type: {self._catalog_type}")

        self._delta_table = DeltaTable(
            table_uri=table_uri,
            storage_options=self.fs_args,
            version=self._version,
        )
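The catalog branch at the end of `__init__` builds a table URI from the catalog settings. The standalone sketch below reproduces just that string construction so the resulting URIs are easy to inspect; `build_table_uri` is a hypothetical name, and nothing here contacts AWS Glue or Databricks.

```python
from __future__ import annotations


def build_table_uri(
    catalog_type: str,
    catalog_name: str | None,
    database: str | None,
    table: str | None,
) -> str:
    """Hypothetical helper mirroring the URI formats used in __init__ above."""
    kind = catalog_type.upper()
    if kind == "AWS":
        # catalog_name is not part of the Glue URI.
        return f"glue:///{database}/{table}"
    if kind == "UNITY":
        return f"unity://{catalog_name}/{database}/{table}"
    raise ValueError(f"Unsupported catalog type: {catalog_type}")


glue_uri = build_table_uri("AWS", "main", "db_schema", "db_table")
unity_uri = build_table_uri("unity", "main", "simple_database", "simple_table")
# Leaving catalog_name unset puts the literal text "None" into a Unity URI.
unity_without_name = build_table_uri("UNITY", None, "simple_database", "simple_table")
```

Because the values are interpolated with f-strings, an unset `catalog_name` is not rejected at this point, so it is worth setting it explicitly for Unity tables.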

ACCEPTED_WRITE_MODES class-attribute instance-attribute

ACCEPTED_WRITE_MODES = ('overwrite', 'append')

DEFAULT_LOAD_ARGS class-attribute instance-attribute

DEFAULT_LOAD_ARGS = {}

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {'mode': DEFAULT_WRITE_MODE}

DEFAULT_WRITE_MODE class-attribute instance-attribute

DEFAULT_WRITE_MODE = 'overwrite'

_catalog_name instance-attribute

_catalog_name = catalog_name

_catalog_type instance-attribute

_catalog_type = catalog_type

_credentials instance-attribute

_credentials = deepcopy(credentials or {})

_database instance-attribute

_database = database

_delta_table instance-attribute

_delta_table = DeltaTable(
    table_uri=_filepath,
    storage_options=fs_args,
    version=_version,
)

_filepath instance-attribute

_filepath = filepath

_fs_args instance-attribute

_fs_args = deepcopy(fs_args or {})

_load_args instance-attribute

_load_args = {**DEFAULT_LOAD_ARGS, **(load_args or {})}

_save_args instance-attribute

_save_args = {**DEFAULT_SAVE_ARGS, **(save_args or {})}

_table instance-attribute

_table = table

_version instance-attribute

_version = _load_args.get('version', None)

fs_args property

fs_args

Returns fs_args with the filesystem credentials added.
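The source of this property is not shown on this page, so the following is only a sketch of one plausible reading: copy the stored `fs_args`, then add the credentials on top. `merged_storage_options` is a hypothetical name, and the assumption that credentials win on a key clash is mine.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any


def merged_storage_options(
    fs_args: dict[str, Any] | None,
    credentials: dict[str, Any] | None,
) -> dict[str, Any]:
    """Hypothetical helper: combine fs_args and credentials without mutating either."""
    options = deepcopy(fs_args or {})
    # Assumption: a credential overrides an fs_args entry with the same key.
    options.update(credentials or {})
    return options


fs_args = {"project": "my-project"}
credentials = {"token": None}
storage_options = merged_storage_options(fs_args, credentials)
```

In the constructor and in `save`, the value of `fs_args` is what gets passed as `storage_options` to `DeltaTable` and `write_deltalake`.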

history property

history

Returns the history of actions on DeltaTableDataset as a list of dictionaries.

is_empty_dir instance-attribute

is_empty_dir = False

metadata property

metadata

Returns the metadata of the DeltaTableDataset as a dictionary. Metadata contains the following:

  1. A unique id
  2. A name, if provided
  3. A description, if provided
  4. The list of partition_columns
  5. The created_time of the table
  6. A map of table configuration. This includes fields such as delta.appendOnly, which, if true, indicates that the table is not meant to have data deleted from it.

Returns: Metadata object containing the above metadata attributes.

schema property

schema

Returns the schema of the DeltaTableDataset as a dictionary.

_describe

_describe()
Source code in kedro_datasets/pandas/deltatable_dataset.py
def _describe(self) -> dict[str, Any]:
    return {
        "filepath": self._filepath,
        "catalog_type": self._catalog_type,
        "catalog_name": self._catalog_name,
        "database": self._database,
        "table": self._table,
        "load_args": self._load_args,
        "save_args": self._save_args,
        "version": self._version,
    }

get_loaded_version

get_loaded_version()

Returns the version of the DeltaTableDataset that is currently loaded.

Source code in kedro_datasets/pandas/deltatable_dataset.py
def get_loaded_version(self) -> int | None:
    """Returns the version of the DeltaTableDataset that is currently loaded."""
    return self._delta_table.version() if self._delta_table else None

load

load()
Source code in kedro_datasets/pandas/deltatable_dataset.py
def load(self) -> pd.DataFrame:
    return self._delta_table.to_pandas() if self._delta_table else None
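As the source above shows, `load` returns `None` when no Delta table has been instantiated, which is the case for a filepath that has not been written to yet. Callers that want a fallback value can guard for this. The sketch below uses a stand-in class instead of a real dataset so that it runs without kedro-datasets installed; `load_or_default` and `_UncreatedTable` are hypothetical names.

```python
from typing import Any


class _UncreatedTable:
    """Stand-in for a dataset whose Delta table has not been written yet."""

    def load(self) -> None:
        return None


def load_or_default(dataset: Any, default: Any) -> Any:
    """Hypothetical helper: return the loaded data, or default if load() gave None."""
    loaded = dataset.load()
    return default if loaded is None else loaded


rows = load_or_default(_UncreatedTable(), default=[])
```

With a real `DeltaTableDataset`, the default would typically be an empty pandas DataFrame with the expected columns.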

save

save(data)
Source code in kedro_datasets/pandas/deltatable_dataset.py
def save(self, data: pd.DataFrame) -> None:
    if self.is_empty_dir:
        # first time creation of delta table
        write_deltalake(
            self._filepath or "",
            data,
            storage_options=self.fs_args,
            **self._save_args,
        )
        self.is_empty_dir = False
        self._delta_table = DeltaTable(
            table_uri=self._filepath or "",
            storage_options=self.fs_args,
            version=self._version,
        )
    else:
        write_deltalake(
            self._delta_table or "",
            data,
            storage_options=self.fs_args,
            **self._save_args,
        )