ParquetDataset

ParquetDataset loads and saves data to parquet file(s). It uses Dask remote data services to handle the corresponding load and save operations.

kedro_datasets.dask.ParquetDataset

ParquetDataset(
    *,
    filepath,
    load_args=None,
    save_args=None,
    credentials=None,
    fs_args=None,
    metadata=None
)

Bases: AbstractDataset[DataFrame, DataFrame]

ParquetDataset loads and saves data to parquet file(s). It uses Dask remote data services to handle the corresponding load and save operations: https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html

Examples:

Using the YAML API:

cars:
  type: dask.ParquetDataset
  filepath: s3://bucket_name/path/to/folder
  save_args:
    compression: GZIP
  credentials:
    client_kwargs:
      aws_access_key_id: YOUR_KEY
      aws_secret_access_key: YOUR_SECRET

Using the Python API:

>>> import dask.dataframe as dd
>>> import pandas as pd
>>> import numpy as np
>>> from kedro_datasets.dask import ParquetDataset
>>>
>>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [6, 7]})
>>> ddf = dd.from_pandas(data, npartitions=2)
>>>
>>> dataset = ParquetDataset(
...     filepath=tmp_path / "path/to/folder", save_args={"compression": "GZIP"}
... )
>>> dataset.save(ddf)
>>> reloaded = dataset.load()
>>> assert np.array_equal(ddf.compute(), reloaded.compute())

The output schema can also be specified explicitly using Triad's schema grammar. The specification is processed to map specific columns to PyArrow field types, or to a full PyArrow schema. For instance:

parquet_dataset:
  type: dask.ParquetDataset
  filepath: "s3://bucket_name/path/to/folder"
  credentials:
    client_kwargs:
      aws_access_key_id: YOUR_KEY
      aws_secret_access_key: YOUR_SECRET
  save_args:
    compression: GZIP
    schema:
      col1: [int32]
      col2: [int32]
      col3: [[int32]]

Parameters:

  • filepath (str | PathLike) –

    Filepath in POSIX format to a parquet file, a parquet collection, or the directory of a multipart parquet.

  • load_args (dict[str, Any] | None, default: None ) –

    Additional loading options for dask.dataframe.read_parquet: https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html

  • save_args (dict[str, Any] | None, default: None ) –

    Additional saving options for dask.dataframe.to_parquet: https://docs.dask.org/en/stable/generated/dask.dataframe.to_parquet.html

  • credentials (dict[str, Any] | None, default: None ) –

    Credentials required to get access to the underlying filesystem. E.g. for GCSFileSystem it should look like {"token": None}.

  • fs_args (dict[str, Any] | None, default: None ) –

    Optional parameters to the backend file system driver: https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Source code in kedro_datasets/dask/parquet_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    filepath: str | os.PathLike,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    credentials: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``ParquetDataset`` pointing to concrete
    parquet files.

    Args:
        filepath: Filepath in POSIX format to a parquet file, a parquet
            collection, or the directory of a multipart parquet.
        load_args: Additional loading options for `dask.dataframe.read_parquet`:
            https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html
        save_args: Additional saving options for `dask.dataframe.to_parquet`:
            https://docs.dask.org/en/stable/generated/dask.dataframe.to_parquet.html
        credentials: Credentials required to get access to the underlying filesystem.
            E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
        fs_args: Optional parameters to the backend file system driver:
            https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    """
    self._filepath = filepath
    self._fs_args = deepcopy(fs_args or {})
    self._credentials = deepcopy(credentials or {})

    self.metadata = metadata

    # Handle default load and save arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}

DEFAULT_LOAD_ARGS class-attribute instance-attribute

DEFAULT_LOAD_ARGS = {}

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {'write_index': False}
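The two class-level defaults are combined with user-supplied arguments in `__init__` using dictionary unpacking, so a user value wins whenever a key appears in both. The following is a minimal sketch of that merge in isolation; `merge_args` is a hypothetical helper introduced only for illustration and is not part of `ParquetDataset`.

```python
from __future__ import annotations

from typing import Any

# Values copied from the class attributes documented above.
DEFAULT_LOAD_ARGS: dict[str, Any] = {}
DEFAULT_SAVE_ARGS: dict[str, Any] = {"write_index": False}


def merge_args(defaults: dict[str, Any], user_args: dict[str, Any] | None) -> dict[str, Any]:
    """Hypothetical helper mirroring the merge expression used in __init__."""
    # Later unpacking wins, so user-supplied keys override the defaults.
    return {**defaults, **(user_args or {})}


# A new key is added alongside the default.
save_args = merge_args(DEFAULT_SAVE_ARGS, {"compression": "GZIP"})

# An existing key is overridden by the user value.
overridden = merge_args(DEFAULT_SAVE_ARGS, {"write_index": True})

# Passing None falls back to the defaults alone.
load_args = merge_args(DEFAULT_LOAD_ARGS, None)
```

Because the merge builds a new dictionary each time, the class-level defaults themselves are left unchanged.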

_credentials instance-attribute

_credentials = deepcopy(credentials or {})

_filepath instance-attribute

_filepath = filepath

_fs_args instance-attribute

_fs_args = deepcopy(fs_args or {})

_load_args instance-attribute

_load_args = {**DEFAULT_LOAD_ARGS, **(load_args or {})}

_save_args instance-attribute

_save_args = {**DEFAULT_SAVE_ARGS, **(save_args or {})}

fs_args property

fs_args

Property of optional file system parameters.

Returns:

  • dict[str, Any]

    A dictionary of backend file system parameters, including credentials.
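The property body is not shown in this section, so the following is only a sketch of how such a combined dictionary could be produced. It assumes that credentials are overlaid on top of a copy of the stored file system arguments; that precedence, the helper name `combined_fs_args`, and the `anon` option used in the example are all assumptions made for illustration.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any


def combined_fs_args(
    fs_args: dict[str, Any] | None, credentials: dict[str, Any] | None
) -> dict[str, Any]:
    """Hypothetical stand-in for the fs_args property (assumed behaviour)."""
    # Copy first so the caller's dictionaries are never mutated.
    merged = deepcopy(fs_args or {})
    # Assumption: credentials take precedence over plain fs_args keys.
    merged.update(deepcopy(credentials or {}))
    return merged


stored_fs_args = {"anon": False}  # illustrative backend option
stored_credentials = {
    "client_kwargs": {
        "aws_access_key_id": "YOUR_KEY",
        "aws_secret_access_key": "YOUR_SECRET",
    }
}

storage_options = combined_fs_args(stored_fs_args, stored_credentials)
```

In the source listings on this page, the resulting dictionary is what `load` and `save` pass to Dask as `storage_options`, and what `_exists` passes to `fsspec.filesystem`.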

metadata instance-attribute

metadata = metadata

_describe

_describe()
Source code in kedro_datasets/dask/parquet_dataset.py
def _describe(self) -> dict[str, Any]:
    return {
        "filepath": self._filepath,
        "load_args": self._load_args,
        "save_args": self._save_args,
    }

_exists

_exists()
Source code in kedro_datasets/dask/parquet_dataset.py
def _exists(self) -> bool:
    protocol = get_protocol_and_path(self._filepath)[0]
    file_system = fsspec.filesystem(protocol=protocol, **self.fs_args)
    return file_system.exists(self._filepath)

_process_schema

_process_schema()

This method processes the schema in the catalog.yml or the API, if provided. This assumes that the schema is specified using Triad's grammar for schema definition.

When the value of the schema variable is a string, it is assumed that it corresponds to the full schema specification for the data.

Alternatively, if the schema is specified as a dictionary, then only the columns that are specified will be strictly mapped to a field type. The other unspecified columns, if present, will be inferred from the data.

This method converts the Triad-parsed schema into a pyarrow schema. The output directly supports Dask's specifications for providing a schema when saving to a parquet file.

Note that if a pa.Schema object is passed directly in the schema argument, no processing will be done. Additionally, the behavior when passing a pa.Schema object is assumed to be consistent with how Dask sees it. That is, it should fully define the schema for all fields.

Source code in kedro_datasets/dask/parquet_dataset.py
def _process_schema(self) -> None:
    """This method processes the schema in the catalog.yml or the API, if provided.
    This assumes that the schema is specified using Triad's grammar for
    schema definition.

    When the value of the `schema` variable is a string, it is assumed that
    it corresponds to the full schema specification for the data.

    Alternatively, if the `schema` is specified as a dictionary, then only the
    columns that are specified will be strictly mapped to a field type. The other
    unspecified columns, if present, will be inferred from the data.

    This method converts the Triad-parsed schema into a pyarrow schema.
    The output directly supports Dask's specifications for providing a schema
    when saving to a parquet file.

    Note that if a `pa.Schema` object is passed directly in the `schema` argument, no
    processing will be done. Additionally, the behavior when passing a `pa.Schema`
    object is assumed to be consistent with how Dask sees it. That is, it should fully
    define the schema for all fields.
    """
    schema = self._save_args.get("schema")

    if isinstance(schema, dict):
        # The schema may contain values of different types, e.g., pa.DataType, Python types,
        # strings, etc. String values require a transformation first; triad handles all
        # other value types directly.

        # Create a schema from values that triad can handle directly
        triad_schema = triad.Schema(
            {k: v for k, v in schema.items() if not isinstance(v, str)}
        )

        # Handle the schema keys that are represented as string and add them to the triad schema
        triad_schema.update(
            triad.Schema(
                ",".join(
                    [f"{k}:{v}" for k, v in schema.items() if isinstance(v, str)]
                )
            )
        )

        # Update the schema argument with the normalized schema
        self._save_args["schema"].update(
            {col: field.type for col, field in triad_schema.items()}
        )

    elif isinstance(schema, str):
        self._save_args["schema"] = triad.Schema(schema).pyarrow_schema

load

load()
Source code in kedro_datasets/dask/parquet_dataset.py
def load(self) -> dd.DataFrame:
    return dd.read_parquet(
        self._filepath, storage_options=self.fs_args, **self._load_args
    )

save

save(data)
Source code in kedro_datasets/dask/parquet_dataset.py
def save(self, data: dd.DataFrame) -> None:
    self._process_schema()
    data.to_parquet(
        path=self._filepath, storage_options=self.fs_args, **self._save_args
    )