Source code for kedro_datasets.dask.csv_dataset

"""``CSVDataset`` is a dataset used to load and save data to CSV files using Dask
dataframe"""
from __future__ import annotations

from copy import deepcopy
from typing import Any

import dask.dataframe as dd
import fsspec
from kedro.io.core import AbstractDataset, get_protocol_and_path


[docs] class CSVDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]): """``CSVDataset`` loads and saves data to comma-separated value file(s). It uses Dask remote data services to handle the corresponding load and save operations: https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html Example usage for the `YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_: .. code-block:: yaml cars: type: dask.CSVDataset filepath: s3://bucket_name/path/to/folder save_args: compression: GZIP credentials: client_kwargs: aws_access_key_id: YOUR_KEY aws_secret_access_key: YOUR_SECRET Example usage for the `Python API <https://docs.kedro.org/en/stable/data/\ advanced_data_catalog_usage.html>`_: .. code-block:: pycon >>> from kedro_datasets.dask import CSVDataset >>> import pandas as pd >>> import numpy as np >>> import dask.dataframe as dd >>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [[5, 6], [7, 8]]}) >>> ddf = dd.from_pandas(data, npartitions=1) >>> dataset = CSVDataset(filepath="path/to/folder/*.csv") >>> dataset.save(ddf) >>> reloaded = dataset.load() >>> assert np.array_equal(ddf.compute(), reloaded.compute()) """ DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {"index": False}
[docs] def __init__( # noqa: PLR0913 self, filepath: str, load_args: dict[str, Any] | None = None, save_args: dict[str, Any] | None = None, credentials: dict[str, Any] | None = None, fs_args: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, ) -> None: """Creates a new instance of ``CSVDataset`` pointing to concrete CSV files. Args: filepath: Filepath in POSIX format to a CSV file CSV collection or the directory of a multipart CSV. load_args: Additional loading options `dask.dataframe.read_csv`: https://docs.dask.org/en/latest/generated/dask.dataframe.read_csv.html save_args: Additional saving options for `dask.dataframe.to_csv`: https://docs.dask.org/en/latest/generated/dask.dataframe.to_csv.html credentials: Credentials required to get access to the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Optional parameters to the backend file system driver: https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ self._filepath = filepath self._fs_args = deepcopy(fs_args) or {} self._credentials = deepcopy(credentials) or {} self.metadata = metadata # Handle default load and save arguments self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}
@property def fs_args(self) -> dict[str, Any]: """Property of optional file system parameters. Returns: A dictionary of backend file system parameters, including credentials. """ fs_args = deepcopy(self._fs_args) fs_args.update(self._credentials) return fs_args def _describe(self) -> dict[str, Any]: return { "filepath": self._filepath, "load_args": self._load_args, "save_args": self._save_args, }
[docs] def load(self) -> dd.DataFrame: return dd.read_csv( self._filepath, storage_options=self.fs_args, **self._load_args )
[docs] def save(self, data: dd.DataFrame) -> None: data.to_csv(self._filepath, storage_options=self.fs_args, **self._save_args)
def _exists(self) -> bool: protocol = get_protocol_and_path(self._filepath)[0] file_system = fsspec.filesystem(protocol=protocol, **self.fs_args) files = file_system.glob(self._filepath) return bool(files)