CSVDataset

CSVDataset loads and saves data to comma-separated value file(s). It uses Dask remote data services to handle the corresponding load and save operations.

kedro_datasets.dask.CSVDataset

CSVDataset(
    filepath,
    load_args=None,
    save_args=None,
    credentials=None,
    fs_args=None,
    metadata=None,
)

Bases: AbstractDataset[DataFrame, DataFrame]

CSVDataset loads and saves data to comma-separated value file(s). It uses Dask remote data services to handle the corresponding load and save operations: https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html

Examples:

Using the YAML API:

cars:
  type: dask.CSVDataset
  filepath: s3://bucket_name/path/to/folder
  save_args:
    compression: gzip
  credentials:
    client_kwargs:
      aws_access_key_id: YOUR_KEY
      aws_secret_access_key: YOUR_SECRET

Using the Python API:

>>> import dask.dataframe as dd
>>> import numpy as np
>>> import pandas as pd
>>> from kedro_datasets.dask import CSVDataset
>>>
>>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [[5, 6], [7, 8]]})
>>> ddf = dd.from_pandas(data, npartitions=1)
>>>
>>> dataset = CSVDataset(filepath="path/to/folder/*.csv")
>>> dataset.save(ddf)
>>> reloaded = dataset.load()
>>> assert np.array_equal(ddf.compute(), reloaded.compute())

Parameters:

  • filepath (str | PathLike) –

    Filepath in POSIX format to a CSV file, a CSV collection, or the directory of a multipart CSV.

  • load_args (dict[str, Any] | None, default: None ) –

    Additional loading options for dask.dataframe.read_csv: https://docs.dask.org/en/stable/generated/dask.dataframe.read_csv.html

  • save_args (dict[str, Any] | None, default: None ) –

    Additional saving options for dask.dataframe.to_csv: https://docs.dask.org/en/stable/generated/dask.dataframe.to_csv.html

  • credentials (dict[str, Any] | None, default: None ) –

    Credentials required to get access to the underlying filesystem. E.g. for GCSFileSystem it should look like {"token": None}.

  • fs_args (dict[str, Any] | None, default: None ) –

    Optional parameters to the backend file system driver: https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Source code in kedro_datasets/dask/csv_dataset.py
def __init__(  # noqa: PLR0913
    self,
    filepath: str | os.PathLike,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    credentials: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``CSVDataset`` pointing to concrete
    CSV files.

    Args:
        filepath: Filepath in POSIX format to a CSV file,
            a CSV collection, or the directory of a multipart CSV.
        load_args: Additional loading options for `dask.dataframe.read_csv`:
            https://docs.dask.org/en/stable/generated/dask.dataframe.read_csv.html
        save_args: Additional saving options for `dask.dataframe.to_csv`:
            https://docs.dask.org/en/stable/generated/dask.dataframe.to_csv.html
        credentials: Credentials required to get access to the underlying filesystem.
            E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
        fs_args: Optional parameters to the backend file system driver:
            https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    """
    self._filepath = filepath
    self._fs_args = deepcopy(fs_args or {})
    self._credentials = deepcopy(credentials or {})

    self.metadata = metadata

    # Handle default load and save arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}
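
The default handling above can be illustrated in isolation. The following is a minimal sketch, not the library's code: `merge_args` is a hypothetical helper, and the two module-level dictionaries are stand-ins for the class attributes documented on this page. It shows that user-supplied options take precedence over defaults, and why the constructor deep-copies `credentials` and `fs_args`.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any

# Stand-ins for the class attributes documented on this page.
DEFAULT_LOAD_ARGS: dict[str, Any] = {}
DEFAULT_SAVE_ARGS: dict[str, Any] = {"index": False}


def merge_args(defaults: dict[str, Any], overrides: dict[str, Any] | None) -> dict[str, Any]:
    """Hypothetical helper: return a new dict in which overrides win over defaults."""
    return {**defaults, **(overrides or {})}


# No user arguments: the defaults are used unchanged.
assert merge_args(DEFAULT_SAVE_ARGS, None) == {"index": False}

# A new key is added alongside the default.
assert merge_args(DEFAULT_SAVE_ARGS, {"sep": ";"}) == {"index": False, "sep": ";"}

# A key that collides with a default replaces it.
assert merge_args(DEFAULT_SAVE_ARGS, {"index": True}) == {"index": True}

# deepcopy keeps later mutation of the caller's dict from leaking into the copy.
user_credentials = {"client_kwargs": {"aws_access_key_id": "YOUR_KEY"}}
stored = deepcopy(user_credentials)
user_credentials["client_kwargs"]["aws_access_key_id"] = "CHANGED"
assert stored["client_kwargs"]["aws_access_key_id"] == "YOUR_KEY"
```

Because the merge builds a new dictionary each time, the shared class-level defaults are never mutated by an individual instance.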

DEFAULT_LOAD_ARGS class-attribute instance-attribute

DEFAULT_LOAD_ARGS = {}

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {'index': False}

_credentials instance-attribute

_credentials = deepcopy(credentials or {})

_filepath instance-attribute

_filepath = filepath

_fs_args instance-attribute

_fs_args = deepcopy(fs_args or {})

_load_args instance-attribute

_load_args = {**DEFAULT_LOAD_ARGS, **(load_args or {})}

_save_args instance-attribute

_save_args = {**DEFAULT_SAVE_ARGS, **(save_args or {})}

fs_args property

fs_args

Property of optional file system parameters.

Returns:

  • dict[str, Any]

    A dictionary of backend file system parameters, including credentials.
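
The property's source is not shown on this page, so the following is only a sketch of one way such a property could combine the two stored dictionaries. `FsArgsSketch` is a hypothetical class, the `"project"` key is an illustrative placeholder, and the choice to let credentials override same-named `fs_args` keys is an assumption.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any


class FsArgsSketch:
    """Hypothetical illustration; not the kedro_datasets class."""

    def __init__(
        self,
        credentials: dict[str, Any] | None = None,
        fs_args: dict[str, Any] | None = None,
    ) -> None:
        self._fs_args = deepcopy(fs_args or {})
        self._credentials = deepcopy(credentials or {})

    @property
    def fs_args(self) -> dict[str, Any]:
        # Assumption: credentials are layered on top of the file system options.
        merged = deepcopy(self._fs_args)
        merged.update(self._credentials)
        return merged


sketch = FsArgsSketch(credentials={"token": None}, fs_args={"project": "my-project"})
combined = sketch.fs_args  # contains both the "project" and "token" keys
```

Returning a fresh copy on each access means callers can modify the result without altering the dataset's stored configuration.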

metadata instance-attribute

metadata = metadata

_describe

_describe()
Source code in kedro_datasets/dask/csv_dataset.py
def _describe(self) -> dict[str, Any]:
    return {
        "filepath": self._filepath,
        "load_args": self._load_args,
        "save_args": self._save_args,
    }

_exists

_exists()
Source code in kedro_datasets/dask/csv_dataset.py
def _exists(self) -> bool:
    protocol = get_protocol_and_path(self._filepath)[0]
    file_system = fsspec.filesystem(protocol=protocol, **self.fs_args)
    try:
        files = file_system.glob(self._filepath)
    except FileNotFoundError:
        return False
    return bool(files)
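
The existence check treats the filepath as a glob pattern and reports whether anything matches. As a rough local-only analogy, the sketch below uses the standard library's `glob` module as a stand-in for the fsspec file system's `glob`; `any_match` is a hypothetical helper and the file name `part-0.csv` is illustrative.

```python
import glob
import os
import tempfile


def any_match(pattern: str) -> bool:
    """Hypothetical helper: True if at least one path matches the glob pattern."""
    return bool(glob.glob(pattern))


with tempfile.TemporaryDirectory() as tmp:
    pattern = os.path.join(tmp, "*.csv")

    # Nothing has been written yet, so the pattern matches no files.
    before = any_match(pattern)

    # Write a single CSV part, as a one-partition save might.
    with open(os.path.join(tmp, "part-0.csv"), "w") as fh:
        fh.write("col1,col2\n1,4\n")

    after = any_match(pattern)
```

An empty match list is falsy, which is why converting the result to `bool` is enough to answer whether the dataset exists.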

load

load()
Source code in kedro_datasets/dask/csv_dataset.py
def load(self) -> dd.DataFrame:
    return dd.read_csv(
        self._filepath, storage_options=self.fs_args, **self._load_args
    )

save

save(data)
Source code in kedro_datasets/dask/csv_dataset.py
def save(self, data: dd.DataFrame) -> None:
    data.to_csv(self._filepath, storage_options=self.fs_args, **self._save_args)