"""``DeltaTableDataset`` loads/saves delta tables from/to a filesystem (e.g.: local,
S3, GCS), Databricks unity catalog and AWS Glue catalog respectively. It handles
load and save using a pandas dataframe.
"""
import warnings
from copy import deepcopy
from typing import Any, Dict, List, Optional
import pandas as pd
from deltalake import DataCatalog, DeltaTable, Metadata
from deltalake.exceptions import TableNotFoundError
from deltalake.writer import write_deltalake
from kedro_datasets import KedroDeprecationWarning
from kedro_datasets._io import AbstractDataset, DatasetError
class DeltaTableDataset(AbstractDataset):
    """``DeltaTableDataset`` loads/saves delta tables from/to a filesystem (e.g.: local,
    S3, GCS), Databricks unity catalog and AWS Glue catalog respectively. It handles
    load and save using a pandas dataframe. When saving data, you can specify one of two
    modes: overwrite(default), append. If you wish to alter the schema as a part of
    overwrite, pass overwrite_schema=True. You can overwrite a specific partition by using
    mode=overwrite together with partition_filters. This will remove all files within the
    matching partition and insert your data as new files.

    Example usage for the
    `YAML API <https://kedro.readthedocs.io/en/stable/data/\
    data_catalog_yaml_examples.html>`_:

    .. code-block:: yaml

        boats_filesystem:
          type: pandas.DeltaTableDataset
          filepath: data/01_raw/boats
          credentials: dev_creds
          load_args:
            version: 7
          save_args:
            mode: overwrite

        boats_databricks_unity_catalog:
          type: pandas.DeltaTableDataset
          credentials: dev_creds
          catalog_type: UNITY
          database: simple_database
          table: simple_table
          save_args:
            mode: overwrite

        trucks_aws_glue_catalog:
          type: pandas.DeltaTableDataset
          credentials: dev_creds
          catalog_type: AWS
          catalog_name: main
          database: db_schema
          table: db_table
          save_args:
            mode: overwrite

    Example usage for the
    `Python API <https://kedro.readthedocs.io/en/stable/data/\
    advanced_data_catalog_usage.html>`_:
    ::

        >>> from kedro_datasets.pandas import DeltaTableDataset
        >>> import pandas as pd
        >>>
        >>> data = pd.DataFrame({'col1': [1, 2], 'col2': [4, 5], 'col3': [5, 6]})
        >>> dataset = DeltaTableDataset(filepath="test")
        >>>
        >>> dataset.save(data)
        >>> reloaded = dataset.load()
        >>> assert data.equals(reloaded)
        >>>
        >>> new_data = pd.DataFrame({'col1': [7, 8], 'col2': [9, 10], 'col3': [11, 12]})
        >>> dataset.save(new_data)
        >>> dataset.get_loaded_version()

    """

    DEFAULT_WRITE_MODE = "overwrite"
    ACCEPTED_WRITE_MODES = ("overwrite", "append")

    # Class-level defaults; always deep-copied per instance before being updated.
    DEFAULT_LOAD_ARGS: Dict[str, Any] = {}
    DEFAULT_SAVE_ARGS: Dict[str, Any] = {"mode": DEFAULT_WRITE_MODE}

    def __init__(  # noqa: PLR0913
        self,
        filepath: Optional[str] = None,
        catalog_type: Optional[DataCatalog] = None,
        catalog_name: Optional[str] = None,
        database: Optional[str] = None,
        table: Optional[str] = None,
        load_args: Optional[Dict[str, Any]] = None,
        save_args: Optional[Dict[str, Any]] = None,
        credentials: Optional[Dict[str, Any]] = None,
        fs_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Creates a new instance of ``DeltaTableDataset``

        Args:
            filepath (str): Filepath to a delta lake file with the following accepted protocol:
                ``S3``: `s3://<bucket>/<path>`, `s3a://<bucket>/<path>`
                ``Azure``: `az://<container>/<path>`, `adl://<container>/<path>`,
                `abfs://<container>/<path>`
                ``GCS``: `gs://<bucket>/<path>`
                If any of the prefix above is not provided, `file` protocol (local filesystem)
                will be used.
            catalog_type (DataCatalog, Optional): `AWS` or `UNITY` if filepath is not provided.
                Either the member name (as written in YAML) or a ``DataCatalog`` member
                is accepted. Defaults to None.
            catalog_name (str, Optional): the name of catalog in AWS Glue or Databricks Unity.
                Defaults to None.
            database (str, Optional): the name of the database (also referred to as schema).
                Defaults to None.
            table (str, Optional): the name of the table.
            load_args (Dict[str, Any], Optional): Additional options for loading file(s)
                into DeltaTableDataset. `load_args` accepts `version` to load the appropriate
                version when loading from a filesystem.
            save_args (Dict[str, Any], Optional): Additional saving options for saving into
                Delta lake. Here you can find all available arguments:
                https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables
            credentials (Dict[str, Any], Optional): Credentials required to get access to
                the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like
                `{"token": None}`.
            fs_args (Dict[str, Any], Optional): Extra arguments to pass into underlying
                filesystem class constructor.
                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).

        Raises:
            DatasetError: Invalid configuration supplied (through DeltaTableDataset
                validation): an unsupported write mode, both or neither of
                ``filepath``/``catalog_type`` given, or an unknown catalog type name.
        """
        self._filepath = filepath
        self._catalog_type = catalog_type
        self._catalog_name = catalog_name
        self._database = database
        self._table = table
        self._fs_args = deepcopy(fs_args) or {}
        self._credentials = deepcopy(credentials) or {}

        # DeltaTable cannot be instantiated from an empty directory
        # for the first time creation from filepath, we need to delay the instantiation
        self.is_empty_dir: bool = False
        self._delta_table: Optional[DeltaTable] = None

        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args:
            self._load_args.update(load_args)

        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args:
            self._save_args.update(save_args)

        write_mode = self._save_args.get("mode", None)
        if write_mode not in self.ACCEPTED_WRITE_MODES:
            raise DatasetError(
                f"Write mode {write_mode} is not supported, "
                f"Please use any of the following accepted modes "
                f"{self.ACCEPTED_WRITE_MODES}"
            )

        self._version = self._load_args.get("version", None)

        if self._filepath and self._catalog_type:
            raise DatasetError(
                "DeltaTableDataset can either load from "
                "filepath or catalog_type. Please provide "
                "one of either filepath or catalog_type."
            )

        # Without this guard the catalog branch below would fail with an
        # unhelpful ``KeyError`` from ``DataCatalog[None]``.
        if not self._filepath and not self._catalog_type:
            raise DatasetError(
                "DeltaTableDataset requires either filepath or catalog_type. "
                "Please provide one of either filepath or catalog_type."
            )

        if self._filepath:
            try:
                self._delta_table = DeltaTable(
                    table_uri=self._filepath,
                    storage_options=self.fs_args,
                    version=self._version,
                )
            except TableNotFoundError:
                # No table at this path yet: the first ``_save`` will create it.
                self.is_empty_dir = True
        else:
            resolved_catalog = self._catalog_type
            if not isinstance(resolved_catalog, DataCatalog):
                # Configuration (e.g. YAML) supplies the member *name*.
                try:
                    resolved_catalog = DataCatalog[resolved_catalog]
                except KeyError as err:
                    accepted = tuple(member.name for member in DataCatalog)
                    raise DatasetError(
                        f"Catalog type {self._catalog_type!r} is not supported, "
                        f"Please use any of the following accepted catalog types "
                        f"{accepted}"
                    ) from err
            self._delta_table = DeltaTable.from_data_catalog(
                data_catalog=resolved_catalog,
                data_catalog_id=self._catalog_name,
                database_name=self._database,
                table_name=self._table,
            )

    @property
    def fs_args(self) -> Dict[str, Any]:
        """Returns a copy of ``fs_args`` merged with the credentials.

        Credentials are applied last, so they win over ``fs_args`` entries
        that share the same key. The stored dictionaries are not modified.
        """
        fs_args = deepcopy(self._fs_args)
        fs_args.update(self._credentials)
        return fs_args

    @property
    def schema(self) -> Dict[str, Any]:
        """Returns the schema of the DeltaTableDataset as a dictionary.

        The value is whatever ``DeltaTable.schema().json()`` produces.
        Raises ``AttributeError`` while no delta table is attached, i.e. when
        the filepath did not contain a table and nothing has been saved yet.
        """
        return self._delta_table.schema().json()

    @property
    def metadata(self) -> Metadata:
        """Returns the metadata of the DeltaTableDataset as a ``Metadata`` object.

        Metadata contains the following:
        1. A unique id
        2. A name, if provided
        3. A description, if provided
        4. The list of partition_columns.
        5. The created_time of the table
        6. A map of table configuration. This includes fields such as delta.appendOnly,
        which if true indicates the table is not meant to have data deleted from it.

        Raises ``AttributeError`` while no delta table is attached, i.e. when
        the filepath did not contain a table and nothing has been saved yet.

        Returns: Metadata object containing the above metadata attributes.
        """
        return self._delta_table.metadata()

    @property
    def history(self) -> List[Dict[str, Any]]:
        """Returns the history of actions on DeltaTableDataset as a list of dictionaries.

        Raises ``AttributeError`` while no delta table is attached, i.e. when
        the filepath did not contain a table and nothing has been saved yet.
        """
        return self._delta_table.history()

    def get_loaded_version(self) -> int:
        """Returns the version of the DeltaTableDataset that is currently loaded.

        Raises ``AttributeError`` while no delta table is attached, i.e. when
        the filepath did not contain a table and nothing has been saved yet.
        """
        return self._delta_table.version()

    def _load(self) -> pd.DataFrame:
        """Reads the attached delta table into a pandas dataframe."""
        return self._delta_table.to_pandas()

    def _save(self, data: pd.DataFrame) -> None:
        """Writes ``data`` to the delta table using the configured ``save_args``.

        When the filepath held no table at construction time, the table is
        created by writing to the path and is then opened so later calls can
        use it. Otherwise the write goes through the already opened table.
        """
        if self.is_empty_dir:
            # first time creation of delta table
            write_deltalake(
                self._filepath,
                data,
                storage_options=self.fs_args,
                **self._save_args,
            )
            self.is_empty_dir = False
            self._delta_table = DeltaTable(
                table_uri=self._filepath,
                storage_options=self.fs_args,
                version=self._version,
            )
        else:
            write_deltalake(
                self._delta_table,
                data,
                storage_options=self.fs_args,
                **self._save_args,
            )

    def _describe(self) -> Dict[str, Any]:
        """Returns the dataset configuration; credentials and fs_args are left out."""
        return {
            "filepath": self._filepath,
            "catalog_type": self._catalog_type,
            "catalog_name": self._catalog_name,
            "database": self._database,
            "table": self._table,
            "load_args": self._load_args,
            "save_args": self._save_args,
            "version": self._version,
        }
# Old class names that still resolve (with a deprecation warning) through the
# module-level ``__getattr__``, mapped to the class that replaced them.
_DEPRECATED_CLASSES = {"DeltaTableDataSet": DeltaTableDataset}
def __getattr__(name):
    """Resolve deprecated class aliases on module attribute access.

    Names listed in ``_DEPRECATED_CLASSES`` return the renamed class after
    emitting a ``KedroDeprecationWarning``; any other name raises
    ``AttributeError``.
    """
    if name not in _DEPRECATED_CLASSES:
        raise AttributeError(f"module {repr(__name__)} has no attribute {repr(name)}")

    replacement = _DEPRECATED_CLASSES[name]
    message = (
        f"{repr(name)} has been renamed to {repr(replacement.__name__)}, "
        f"and the alias will be removed in Kedro-Datasets 2.0.0"
    )
    # stacklevel=2 attributes the warning to the code that accessed the alias.
    warnings.warn(message, KedroDeprecationWarning, stacklevel=2)
    return replacement