Source code for kedro_datasets.ibis.file_dataset

"""Provide file loading and saving functionality for Ibis's backends."""
from __future__ import annotations

from copy import deepcopy
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractVersionedDataset, DatasetError, Version

from kedro_datasets._utils import ConnectionMixin

if TYPE_CHECKING:
    from ibis import BaseBackend


[docs] class FileDataset(ConnectionMixin, AbstractVersionedDataset[ir.Table, ir.Table]): """``FileDataset`` loads/saves data from/to a specified file format. Example usage for the `YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_: .. code-block:: yaml cars: type: ibis.FileDataset filepath: data/01_raw/company/cars.csv file_format: csv table_name: cars connection: backend: duckdb database: company.db load_args: sep: "," nullstr: "#NA" save_args: sep: "," nullstr: "#NA" motorbikes: type: ibis.FileDataset filepath: s3://your_bucket/data/02_intermediate/company/motorbikes/ file_format: delta table_name: motorbikes connection: backend: polars Example usage for the `Python API <https://docs.kedro.org/en/stable/data/\ advanced_data_catalog_usage.html>`_: .. code-block:: pycon >>> import ibis >>> from kedro_datasets.ibis import FileDataset >>> >>> data = ibis.memtable({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}) >>> >>> dataset = FileDataset( ... filepath=tmp_path / "test.csv", ... file_format="csv", ... table_name="test", ... connection={"backend": "duckdb", "database": tmp_path / "file.db"}, ... ) >>> dataset.save(data) >>> reloaded = dataset.load() >>> assert data.execute().equals(reloaded.execute()) """ DEFAULT_CONNECTION_CONFIG: ClassVar[dict[str, Any]] = { "backend": "duckdb", "database": ":memory:", } DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {} DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {} _CONNECTION_GROUP: ClassVar[str] = "ibis"
[docs] def __init__( # noqa: PLR0913 self, filepath: str, file_format: str = "parquet", *, table_name: str | None = None, connection: dict[str, Any] | None = None, load_args: dict[str, Any] | None = None, save_args: dict[str, Any] | None = None, version: Version | None = None, metadata: dict[str, Any] | None = None, ) -> None: """Creates a new ``FileDataset`` pointing to the given filepath. ``FileDataset`` connects to the Ibis backend object constructed from the connection configuration. The `backend` key provided in the config can be any of the `supported backends <https://ibis-\ project.org/install>`_. The remaining dictionary entries will be passed as arguments to the underlying ``connect()`` method (e.g. `ibis.duckdb.connect() <https://ibis-project.org/backends/duckdb\ #ibis.duckdb.connect>`_). The read method corresponding to the given ``file_format`` (e.g. `read_csv() <https://ibis-project.org/backends/\ duckdb#ibis.backends.duckdb.Backend.read_csv>`_) is used to load the file with the backend. Note that only the data is loaded; no link to the underlying file exists past ``FileDataset.load()``. Args: filepath: Path to a file to register as a table. Most useful for loading data into your data warehouse (for testing). On save, the backend exports data to the specified path. file_format: String specifying the file format for the file. Defaults to writing execution results to a Parquet file. table_name: The name to use for the created table (on load). connection: Configuration for connecting to an Ibis backend. If not provided, connect to DuckDB in in-memory mode. load_args: Additional arguments passed to the Ibis backend's `read_{file_format}` method. save_args: Additional arguments passed to the Ibis backend's `to_{file_format}` method. version: If specified, should be an instance of ``kedro.io.core.Version``. If its ``load`` attribute is None, the latest version will be loaded. If its ``save`` attribute is None, save version will be autogenerated. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ self._file_format = file_format self._table_name = table_name self._connection_config = connection or self.DEFAULT_CONNECTION_CONFIG self.metadata = metadata super().__init__( filepath=PurePosixPath(filepath), version=version, exists_function=lambda filepath: Path(filepath).exists(), ) # Set load and save arguments, overwriting defaults if provided. self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) if load_args is not None: self._load_args.update(load_args) self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) if save_args is not None: self._save_args.update(save_args)
def _connect(self) -> BaseBackend: import ibis config = deepcopy(self._connection_config) backend = getattr(ibis, config.pop("backend")) return backend.connect(**config) @property def connection(self) -> BaseBackend: """The ``Backend`` instance for the connection configuration.""" return self._connection
[docs] def load(self) -> ir.Table: load_path = self._get_load_path() reader = getattr(self.connection, f"read_{self._file_format}") return reader(load_path, self._table_name, **self._load_args)
[docs] def save(self, data: ir.Table) -> None: save_path = self._get_save_path() Path(save_path).parent.mkdir(parents=True, exist_ok=True) writer = getattr(self.connection, f"to_{self._file_format}") writer(data, save_path, **self._save_args)
def _describe(self) -> dict[str, Any]: return { "filepath": self._filepath, "file_format": self._file_format, "table_name": self._table_name, "backend": self._connection_config["backend"], "load_args": self._load_args, "save_args": self._save_args, "version": self._version, } def _exists(self) -> bool: try: load_path = self._get_load_path() except DatasetError: return False return Path(load_path).exists()