Source code for kedro_datasets.redis.redis_dataset

"""``PickleDataset`` loads/saves data from/to a Redis database. The underlying
functionality is supported by the redis library, so it supports all allowed
options for instantiating the redis app ``from_url`` and setting a value."""
import importlib
import os
from copy import deepcopy
from typing import Any

import redis
from kedro.io.core import AbstractDataset, DatasetError


class PickleDataset(AbstractDataset[Any, Any]):
    """``PickleDataset`` loads/saves data from/to a Redis database. The
    underlying functionality is supported by the redis library, so it supports
    all allowed options for instantiating the redis app ``from_url`` and
    setting a value.

    Connection options for ``redis.Redis.from_url`` go under
    ``redis_args.from_url_args``, and options for ``SET`` (e.g. ``ex``) go
    under ``redis_args.set_args``. ``save_args``/``load_args`` are forwarded
    to the serialisation backend's ``dumps``/``loads`` and are NOT Redis
    options.

    Example usage for the
    `YAML API <https://kedro.readthedocs.io/en/stable/data/\
    data_catalog_yaml_examples.html>`_:

    .. code-block:: yaml

        my_python_object: # simple example
          type: redis.PickleDataset
          key: my_object
          redis_args:
            from_url_args:
              url: redis://127.0.0.1:6379

        final_python_object: # example with Redis SET args
          type: redis.PickleDataset
          key: my_final_object
          redis_args:
            from_url_args:
              url: redis://127.0.0.1:6379
              db: 1
            set_args:
              ex: 10

    Example usage for the
    `Python API <https://kedro.readthedocs.io/en/stable/data/\
    advanced_data_catalog_usage.html>`_:

    .. code-block:: pycon

        >>> from kedro_datasets.redis import PickleDataset
        >>> import pandas as pd
        >>>
        >>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
        >>>
        >>> my_data = PickleDataset(key="my_data")
        >>> my_data.save(data)
        >>> reloaded = my_data.load()
        >>> assert data.equals(reloaded)

    """

    # Evaluated once, when the class body runs (i.e. at import time); setting
    # ``REDIS_URL`` later in the same process does not change this default.
    DEFAULT_REDIS_URL = os.getenv("REDIS_URL", "redis://127.0.0.1:6379")
    DEFAULT_LOAD_ARGS: dict[str, Any] = {}
    DEFAULT_SAVE_ARGS: dict[str, Any] = {}

    def __init__(  # noqa: PLR0913
        self,
        *,
        key: str,
        backend: str = "pickle",
        load_args: dict[str, Any] = None,
        save_args: dict[str, Any] = None,
        credentials: dict[str, Any] = None,
        redis_args: dict[str, Any] = None,
        metadata: dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``PickleDataset``. This loads/saves data
        from/to a Redis database while deserialising/serialising. Supports
        custom backends to serialise/deserialise objects.

        Example backends that are compatible (non-exhaustive):
            * `pickle`
            * `dill`
            * `compress_pickle`
            * `cloudpickle`

        Example backends that are incompatible:
            * `torch`

        Args:
            key: The key to use for saving/loading object to Redis.
            backend: Backend to use, must be an import path to a module which
                satisfies the ``pickle`` interface. That is, contains a `loads`
                and `dumps` function. Defaults to 'pickle'.
            load_args: Pickle options for loading pickle files.
                You can pass in arguments that the backend load function
                specified accepts, e.g:
                pickle.loads: https://docs.python.org/3/library/pickle.html#pickle.loads
                dill.loads: https://dill.readthedocs.io/en/latest/index.html#dill.loads
                compress_pickle.loads:
                https://lucianopaz.github.io/compress_pickle/html/api/compress_pickle.html#compress_pickle.compress_pickle.loads
                cloudpickle.loads:
                https://github.com/cloudpipe/cloudpickle/blob/master/tests/cloudpickle_test.py
                All defaults are preserved.
            save_args: Pickle options for saving pickle files.
                You can pass in arguments that the backend dump function
                specified accepts, e.g:
                pickle.dumps: https://docs.python.org/3/library/pickle.html#pickle.dump
                dill.dumps: https://dill.readthedocs.io/en/latest/index.html#dill.dumps
                compress_pickle.dumps:
                https://lucianopaz.github.io/compress_pickle/html/api/compress_pickle.html#compress_pickle.compress_pickle.dumps
                cloudpickle.dumps:
                https://github.com/cloudpipe/cloudpickle/blob/master/tests/cloudpickle_test.py
                All defaults are preserved. These are NOT passed to Redis; use
                ``redis_args["set_args"]`` for ``SET`` options such as ``ex``.
            credentials: Credentials required to get access to the redis
                server, passed as extra keyword arguments to
                ``redis.Redis.from_url``. E.g. `{"password": None}`.
            redis_args: Redis options, read only from two nested keys:
                ``from_url_args`` is passed to the client constructor
                ``redis.Redis.from_url`` (e.g. `{"socket_timeout": 10}`), and
                ``set_args`` is passed to ``redis.Redis.set``. Any other
                top-level key is ignored.
                Here you can find all available arguments for `from_url`:
                https://redis-py.readthedocs.io/en/stable/connections.html?highlight=from_url#redis.Redis.from_url
                All defaults are preserved, except `url`, which is set to
                ``DEFAULT_REDIS_URL``: the env variable ``REDIS_URL`` as read
                at import time, else `redis://127.0.0.1:6379`.
            metadata: Any arbitrary metadata.
                This is ignored by Kedro, but may be consumed by users or
                external plugins.

        Raises:
            ValueError: If ``backend`` does not satisfy the `pickle` interface.
            ImportError: If the ``backend`` module could not be imported.
        """
        try:
            imported_backend = importlib.import_module(backend)
        except ImportError as exc:
            raise ImportError(
                f"Selected backend '{backend}' could not be imported. "
                "Make sure it is installed and importable."
            ) from exc

        if not (
            hasattr(imported_backend, "loads") and hasattr(imported_backend, "dumps")
        ):
            raise ValueError(
                f"Selected backend '{backend}' should satisfy the pickle interface. "
                "Missing one of 'loads' and 'dumps' on the backend."
            )

        # Only the module path is stored; the module is re-resolved (from the
        # import cache) in _load/_save.
        self._backend = backend

        self._key = key

        self.metadata = metadata

        # Deep-copy so later mutation (pop/setdefault) never touches the
        # caller's dicts.
        _redis_args = deepcopy(redis_args) or {}
        self._redis_from_url_args = _redis_args.pop("from_url_args", {})
        self._redis_from_url_args.setdefault("url", self.DEFAULT_REDIS_URL)
        self._redis_set_args = _redis_args.pop("set_args", {})
        _credentials = deepcopy(credentials) or {}

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        self._redis_db = redis.Redis.from_url(
            **self._redis_from_url_args, **_credentials
        )

    def _describe(self) -> dict[str, Any]:
        """Return the key plus the ``from_url`` arguments (including ``url``)."""
        return {"key": self._key, **self._redis_from_url_args}

    # `redis_db` mypy does not work since it is optional and optional is not
    # accepted by pickle.loads.
    def _load(self) -> Any:
        """Fetch the value stored under ``key`` and deserialise it.

        Raises:
            DatasetError: If the key does not exist, or vanished (e.g. expired)
                between the existence check and the read.
        """
        if not self.exists():
            raise DatasetError(f"The provided key {self._key} does not exists.")
        raw = self._redis_db.get(self._key)
        # The key can expire or be deleted between the EXISTS and GET round
        # trips; `get` then returns None, which the backend cannot deserialise.
        if raw is None:
            raise DatasetError(f"The provided key {self._key} does not exists.")
        imported_backend = importlib.import_module(self._backend)
        return imported_backend.loads(raw, **self._load_args)  # type: ignore

    def _save(self, data: Any) -> None:
        """Serialise ``data`` with the backend and ``SET`` it under ``key``.

        Raises:
            DatasetError: If serialisation or the Redis write fails.
        """
        try:
            imported_backend = importlib.import_module(self._backend)
            self._redis_db.set(
                self._key,
                imported_backend.dumps(data, **self._save_args),  # type: ignore
                **self._redis_set_args,
            )
        except Exception as exc:
            raise DatasetError(
                f"{data.__class__} was not serialised due to: {exc}"
            ) from exc

    def _exists(self) -> bool:
        """Return whether ``key`` is present in Redis.

        Raises:
            DatasetError: If the Redis query itself fails.
        """
        try:
            return bool(self._redis_db.exists(self._key))
        except Exception as exc:
            raise DatasetError(
                f"The existence of key {self._key} could not be established due to: {exc}"
            ) from exc