Source code for kedro_datasets.ibis.table_dataset

"""Provide data loading and saving functionality for Ibis's backends."""
from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractDataset

from kedro_datasets._utils import ConnectionMixin

if TYPE_CHECKING:
    from ibis import BaseBackend


[docs] class TableDataset(ConnectionMixin, AbstractDataset[ir.Table, ir.Table]): """``TableDataset`` loads/saves data from/to Ibis table expressions. Example usage for the `YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_: .. code-block:: yaml cars: type: ibis.TableDataset table_name: cars connection: backend: duckdb database: company.db save_args: materialized: table motorbikes: type: ibis.TableDataset table_name: motorbikes connection: backend: duckdb database: company.db Example usage for the `Python API <https://docs.kedro.org/en/stable/data/\ advanced_data_catalog_usage.html>`_: .. code-block:: pycon >>> import ibis >>> from kedro_datasets.ibis import TableDataset >>> >>> data = ibis.memtable({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}) >>> >>> dataset = TableDataset( ... table_name="test", ... connection={"backend": "duckdb", "database": tmp_path / "file.db"}, ... save_args={"materialized": "table"}, ... ) >>> dataset.save(data) >>> reloaded = dataset.load() >>> assert data.execute().equals(reloaded.execute()) """ DEFAULT_CONNECTION_CONFIG: ClassVar[dict[str, Any]] = { "backend": "duckdb", "database": ":memory:", } DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {} DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = { "materialized": "view", "overwrite": True, } _CONNECTION_GROUP: ClassVar[str] = "ibis"
[docs] def __init__( # noqa: PLR0913 self, *, table_name: str, database: str | None = None, connection: dict[str, Any] | None = None, load_args: dict[str, Any] | None = None, save_args: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, ) -> None: """Creates a new ``TableDataset`` pointing to a table. ``TableDataset`` connects to the Ibis backend object constructed from the connection configuration. The `backend` key provided in the config can be any of the `supported backends <https://ibis-\ project.org/install>`_. The remaining dictionary entries will be passed as arguments to the underlying ``connect()`` method (e.g. `ibis.duckdb.connect() <https://ibis-project.org/backends/duckdb\ #ibis.duckdb.connect>`_). The dataset establishes a connection to the relevant table for the execution backend. Therefore, Ibis doesn't fetch data on load; all compute is deferred until materialization, when the expression is saved. In practice, this happens when another ``TableDataset`` instance is saved, after running code defined across one more more nodes. Args: table_name: The name of the table or view to read or create. database: The name of the database to read the table or view from or create the table or view in. If not passed, then the current database is used. Provide a tuple of strings (e.g. `("catalog", "database")`) or a dotted string path (e.g. `"catalog.database"`) to reference a table or view in a multi-level table hierarchy. connection: Configuration for connecting to an Ibis backend. If not provided, connect to DuckDB in in-memory mode. load_args: Additional arguments passed to the Ibis backend's `read_{file_format}` method. save_args: Additional arguments passed to the Ibis backend's `create_{materialized}` method. By default, ``ir.Table`` objects are materialized as views. To save a table using a different materialization strategy, supply a value for `materialized` in `save_args`. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ self._table_name = table_name self._database = database self._connection_config = connection or self.DEFAULT_CONNECTION_CONFIG self.metadata = metadata # Set load and save arguments, overwriting defaults if provided. self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) if load_args is not None: self._load_args.update(load_args) if database is not None: self._load_args["database"] = database self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) if save_args is not None: self._save_args.update(save_args) if database is not None: self._save_args["database"] = database self._materialized = self._save_args.pop("materialized")
def _connect(self) -> BaseBackend: import ibis config = deepcopy(self._connection_config) backend = getattr(ibis, config.pop("backend")) return backend.connect(**config) @property def connection(self) -> BaseBackend: """The ``Backend`` instance for the connection configuration.""" return self._connection
[docs] def load(self) -> ir.Table: return self.connection.table(self._table_name, **self._load_args)
[docs] def save(self, data: ir.Table) -> None: writer = getattr(self.connection, f"create_{self._materialized}") writer(self._table_name, data, **self._save_args)
def _describe(self) -> dict[str, Any]: load_args = deepcopy(self._load_args) save_args = deepcopy(self._save_args) load_args.pop("database", None) save_args.pop("database", None) return { "table_name": self._table_name, "database": self._database, "backend": self._connection_config["backend"], "load_args": load_args, "save_args": save_args, "materialized": self._materialized, } def _exists(self) -> bool: return ( self._table_name is not None and self._table_name in self.connection.tables )