PartitionedDataset

PartitionedDataset is used to manage datasets that are partitioned into multiple files or directories.

kedro_datasets.partitions.PartitionedDataset

PartitionedDataset(
    *,
    path,
    dataset,
    filepath_arg="filepath",
    filename_suffix="",
    credentials=None,
    load_args=None,
    fs_args=None,
    overwrite=False,
    save_lazily=True,
    metadata=None
)

Bases: AbstractDataset[dict[str, Any], dict[str, Callable[[], Any]]]

PartitionedDataset loads and saves partitioned file-like data using the underlying dataset definition. For filesystem-level operations it uses fsspec: https://github.com/intake/filesystem_spec.

Loading is lazy: load() returns a dictionary that maps each partition ID to a function that loads that partition. Saving can also be lazy (see the save_lazily parameter).

Examples:

Using the YAML API:

station_data:
  type: partitions.PartitionedDataset
  path: data/03_primary/station_data
  dataset:
    type: pandas.CSVDataset
    load_args:
      sep: '\t'
    save_args:
      sep: '\t'
      index: true
  filename_suffix: '.dat'
  save_lazily: True

Using the Python API:

>>> import pandas as pd
>>> from kedro_datasets.partitions import PartitionedDataset
>>>
>>> # Create a fake pandas dataframe with 10 rows of data
>>> df = pd.DataFrame([{"DAY_OF_MONTH": str(i), "VALUE": i} for i in range(1, 11)])
>>>
>>> # Convert it to a dict of pd.DataFrame with DAY_OF_MONTH as the dict key
>>> dict_df = {
...     day_of_month: df[df["DAY_OF_MONTH"] == day_of_month]
...     for day_of_month in df["DAY_OF_MONTH"]
... }
>>>
>>> # Save it as small partitions with DAY_OF_MONTH as the partition key
>>> dataset = PartitionedDataset(
...     path=str(tmp_path / "df_with_partition"),
...     dataset="pandas.CSVDataset",
...     filename_suffix=".csv",
...     save_lazily=False,
... )
>>> # This will create a folder `df_with_partition` and save multiple files
>>> # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc.
>>> dataset.save(dict_df)
>>>
>>> # This will create lazy load functions instead of loading data into memory immediately.
>>> loaded = dataset.load()
>>>
>>> # Load all the partitions
>>> for partition_id, partition_load_func in loaded.items():
...     # The actual function that loads the data
...     partition_data = partition_load_func()
...     # Add the processing logic for an individual partition here,
...     # e.g. print(partition_data)
...

You can also load multiple partitions from a remote storage and combine them like this:

>>> import pandas as pd
>>> from kedro_datasets.partitions import PartitionedDataset
>>>
>>> # These credentials are passed to both the 'fsspec.filesystem()' call
>>> # and the dataset initializer
>>> credentials = {"key1": "secret1", "key2": "secret2"}
>>>
>>> dataset = PartitionedDataset(
...     path="s3://bucket-name/path/to/folder",
...     dataset="pandas.CSVDataset",
...     credentials=credentials,
... )
>>> loaded = dataset.load()
>>> # assert isinstance(loaded, dict)
>>>
>>> combine_all = pd.DataFrame()
>>>
>>> for partition_id, partition_load_func in loaded.items():
...     partition_data = partition_load_func()
...     combine_all = pd.concat([combine_all, partition_data], ignore_index=True, sort=True)
...
>>> new_data = pd.DataFrame({"new": [1, 2]})
>>> # creates "s3://bucket-name/path/to/folder/new/partition.csv"
>>> dataset.save({"new/partition.csv": new_data})

Parameters:

  • path (str) –

    Path to the folder containing partitioned data. If path starts with the protocol (e.g., s3://) then the corresponding fsspec concrete filesystem implementation will be used. If protocol is not specified, fsspec.implementations.local.LocalFileSystem will be used. Note: Some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately prior to usage of the PartitionedDataset.

  • dataset (str | type[AbstractDataset] | dict[str, Any]) –

    Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) a class that inherits from AbstractDataset; b) a string representing the fully qualified name of such a class; c) a dictionary with a type key pointing to a string from b), where the other keys are passed to the dataset initializer. Credentials for the dataset can be specified explicitly in this configuration.

  • filepath_arg (str, default: 'filepath' ) –

    Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to "filepath".

  • filename_suffix (str, default: '' ) –

    If specified, only partitions whose path ends with this string will be processed. The suffix is stripped from partition IDs on load and appended to them on save.

  • credentials (dict[str, Any] | None, default: None ) –

    Protocol-specific options that will be passed to fsspec.filesystem https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem and the dataset initializer. If the dataset config contains explicit credentials spec, then such spec will take precedence. All possible credentials management scenarios are documented here: https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#partitioned-dataset-credentials

  • load_args (dict[str, Any] | None, default: None ) –

    Keyword arguments passed to the find() method of the filesystem implementation when listing partitions.

  • fs_args (dict[str, Any] | None, default: None ) –

    Extra arguments to pass into underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).

  • overwrite (bool, default: False ) –

    If True, any existing partitions under path are removed before the new partitions are saved.

  • save_lazily (bool, default: True ) –

    Whether to enable lazy saving (default: True). When enabled, if a callable is passed as the data for a partition, it is called only when that partition is about to be written, so the partition's data is not materialised earlier. Lazy saving example: https://docs.kedro.org/en/stable/catalog-data/partitioned_and_incremental_datasets/#partitioned-dataset-lazy-saving

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:

  • DatasetError

    If versioning is enabled for the underlying dataset.
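
The precedence rule for credentials described under Parameters can be sketched in plain Python. This is a standalone illustration rather than the library code: `propagate_credentials` is a hypothetical helper, and the configuration values are made up.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any


def propagate_credentials(
    dataset_config: dict[str, Any], credentials: dict[str, Any] | None
) -> dict[str, Any]:
    """Hypothetical helper: copy top-level credentials into the dataset
    config unless the config already declares its own."""
    config = deepcopy(dataset_config)
    if credentials and "credentials" not in config:
        config["credentials"] = deepcopy(credentials)
    return config


top_level = {"key": "top-level-key"}  # illustrative value only

# No explicit credentials in the dataset config: the top-level ones are used.
inherited = propagate_credentials({"type": "pandas.CSVDataset"}, top_level)

# Explicit credentials in the dataset config take precedence.
explicit = propagate_credentials(
    {"type": "pandas.CSVDataset", "credentials": {"key": "dataset-key"}},
    top_level,
)
```

In the second case the top-level credentials would still be used for the filesystem itself; only the underlying dataset keeps its own.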

Source code in kedro_datasets/partitions/partitioned_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    path: str,
    dataset: str | type[AbstractDataset] | dict[str, Any],
    filepath_arg: str = "filepath",
    filename_suffix: str = "",
    credentials: dict[str, Any] | None = None,
    load_args: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    overwrite: bool = False,
    save_lazily: bool = True,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``PartitionedDataset``.

    Args:
        path: Path to the folder containing partitioned data.
            If path starts with the protocol (e.g., ``s3://``) then the
            corresponding ``fsspec`` concrete filesystem implementation will
            be used. If protocol is not specified,
            ``fsspec.implementations.local.LocalFileSystem`` will be used.
            **Note:** Some concrete implementations are bundled with ``fsspec``,
            while others (like ``s3`` or ``gcs``) must be installed separately
            prior to usage of the ``PartitionedDataset``.
        dataset: Underlying dataset definition. This is used to instantiate
            the dataset for each file located inside the ``path``.
            Accepted formats are:
            a) object of a class that inherits from ``AbstractDataset``
            b) a string representing a fully qualified class name to such class
            c) a dictionary with ``type`` key pointing to a string from b),
            other keys are passed to the Dataset initializer.
            Credentials for the dataset can be explicitly specified in
            this configuration.
        filepath_arg: Underlying dataset initializer argument that will
            contain a path to each corresponding partition file.
            If unspecified, defaults to "filepath".
        filename_suffix: If specified, only partitions that end with this
            string will be processed.
        credentials: Protocol-specific options that will be passed to
            ``fsspec.filesystem``
            https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
            and the dataset initializer. If the dataset config contains
            explicit credentials spec, then such spec will take precedence.
            All possible credentials management scenarios are documented here:
            https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#partitioned-dataset-credentials
        load_args: Keyword arguments to be passed into ``find()`` method of
            the filesystem implementation.
        fs_args: Extra arguments to pass into underlying filesystem class constructor
            (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
        overwrite: If True, any existing partitions will be removed.
        save_lazily: Parameter to enable/disable lazy saving, the default is True. Meaning that if callable object
            is passed as data to save, the partition’s data will not be materialised until it is time to write.
            Lazy saving example:
            https://docs.kedro.org/en/stable/catalog-data/partitioned_and_incremental_datasets/#partitioned-dataset-lazy-saving
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        DatasetError: If versioning is enabled for the underlying dataset.
    """
    # for performance reasons
    from fsspec.utils import infer_storage_options  # noqa: PLC0415

    super().__init__()

    self._path = path
    self._filename_suffix = filename_suffix
    self._overwrite = overwrite
    self._protocol = infer_storage_options(self._path)["protocol"]
    self._partition_cache: Cache = Cache(maxsize=1)
    self._save_lazily = save_lazily
    self.metadata = metadata

    dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
    self._dataset_type, self._dataset_config = parse_dataset_definition(dataset)

    if credentials:
        if CREDENTIALS_KEY in self._dataset_config:
            self._logger.warning(
                KEY_PROPAGATION_WARNING,
                {"keys": CREDENTIALS_KEY, "target": "underlying dataset"},
            )
        else:
            self._dataset_config[CREDENTIALS_KEY] = deepcopy(credentials)

    self._credentials = deepcopy(credentials or {})

    self._fs_args = deepcopy(fs_args or {})
    if self._fs_args:
        if "fs_args" in self._dataset_config:
            self._logger.warning(
                KEY_PROPAGATION_WARNING,
                {"keys": "filesystem arguments", "target": "underlying dataset"},
            )
        else:
            self._dataset_config["fs_args"] = deepcopy(self._fs_args)

    self._filepath_arg = filepath_arg
    if self._filepath_arg in self._dataset_config:
        warn(
            f"'{self._filepath_arg}' key must not be specified in the dataset "
            f"definition as it will be overwritten by partition path"
        )

    self._load_args = deepcopy(load_args or {})
    self._sep = self._filesystem.sep
    # since some filesystem implementations may implement a global cache
    self._invalidate_caches()

_credentials instance-attribute

_credentials = deepcopy(credentials or {})

_filename_suffix instance-attribute

_filename_suffix = filename_suffix

_filepath_arg instance-attribute

_filepath_arg = filepath_arg

_filesystem property

_filesystem

_fs_args instance-attribute

_fs_args = deepcopy(fs_args or {})

_load_args instance-attribute

_load_args = deepcopy(load_args or {})

_normalized_path property

_normalized_path

_overwrite instance-attribute

_overwrite = overwrite

_partition_cache instance-attribute

_partition_cache = Cache(maxsize=1)

_path instance-attribute

_path = path

_protocol instance-attribute

_protocol = infer_storage_options(_path)['protocol']

_save_lazily instance-attribute

_save_lazily = save_lazily

_sep instance-attribute

_sep = sep

metadata instance-attribute

metadata = metadata

__repr__

__repr__()
Source code in kedro_datasets/partitions/partitioned_dataset.py
def __repr__(self) -> str:
    object_description = self._describe()

    # Dummy object to call _pretty_repr
    # Only clean_dataset_config parameters are exposed
    kwargs = deepcopy(self._dataset_config)
    kwargs[self._filepath_arg] = ""
    dataset = self._dataset_type(**kwargs)  # type: ignore

    object_description_repr = {
        "filepath": object_description["path"],
        "dataset": dataset._pretty_repr(object_description["dataset_config"]),
    }

    return self._pretty_repr(object_description_repr)

_describe

_describe()
Source code in kedro_datasets/partitions/partitioned_dataset.py
def _describe(self) -> dict[str, Any]:
    clean_dataset_config = (
        {k: v for k, v in self._dataset_config.items() if k != CREDENTIALS_KEY}
        if isinstance(self._dataset_config, dict)
        else self._dataset_config
    )
    return {
        "path": self._path,
        "dataset_type": self._dataset_type.__name__,
        "dataset_config": clean_dataset_config,
    }
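
The effect of the dictionary comprehension in `_describe` is that credentials never appear in the description. A minimal sketch of the same filtering, assuming the credentials key is the literal string "credentials" (the value of CREDENTIALS_KEY is not shown on this page) and using a hypothetical `scrub_credentials` helper:

```python
from typing import Any


def scrub_credentials(config: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: return a copy of the config without credentials."""
    return {k: v for k, v in config.items() if k != "credentials"}


config = {
    "type": "pandas.CSVDataset",
    "load_args": {"sep": "\t"},
    "credentials": {"key": "secret"},  # illustrative value only
}
clean = scrub_credentials(config)
```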

_exists

_exists()
Source code in kedro_datasets/partitions/partitioned_dataset.py
def _exists(self) -> bool:
    return bool(self._list_partitions())

_invalidate_caches

_invalidate_caches()
Source code in kedro_datasets/partitions/partitioned_dataset.py
def _invalidate_caches(self) -> None:
    self._partition_cache.clear()
    self._filesystem.invalidate_cache(self._normalized_path)

_join_protocol

_join_protocol(path)
Source code in kedro_datasets/partitions/partitioned_dataset.py
def _join_protocol(self, path: str) -> str:
    protocol_prefix = f"{self._protocol}://"
    if self._path.startswith(protocol_prefix) and not path.startswith(
        protocol_prefix
    ):
        return f"{protocol_prefix}{path}"
    return path
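
To see what `_join_protocol` does to a listed path, here is a standalone version of the same logic that takes the base path and protocol as arguments instead of reading them from the instance. The function name and the example paths are illustrative, and the "file" protocol for a local path is an assumption about what `infer_storage_options` reports.

```python
def join_protocol(base_path: str, protocol: str, path: str) -> str:
    """Re-attach the protocol prefix if the base path had one and the
    listed path lost it."""
    protocol_prefix = f"{protocol}://"
    if base_path.startswith(protocol_prefix) and not path.startswith(protocol_prefix):
        return f"{protocol_prefix}{path}"
    return path


# Remote base path: the prefix is restored on the listed partition path.
remote = join_protocol(
    "s3://bucket-name/path/to/folder", "s3", "bucket-name/path/to/folder/a.csv"
)

# Local base path without a prefix: the partition path is returned unchanged.
local = join_protocol(
    "data/03_primary/station_data", "file", "data/03_primary/station_data/a.csv"
)
```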

_list_partitions

_list_partitions()
Source code in kedro_datasets/partitions/partitioned_dataset.py
@cachedmethod(cache=operator.attrgetter("_partition_cache"))
def _list_partitions(self) -> list[str]:
    dataset_is_versioned = VERSION_KEY in self._dataset_config
    return [
        _grandparent(path) if dataset_is_versioned else path
        for path in self._filesystem.find(self._normalized_path, **self._load_args)
        if path.endswith(self._filename_suffix)
    ]
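
The suffix filter in `_list_partitions` relies on `str.endswith`, which is also why the default empty `filename_suffix` keeps every file. A small sketch with made-up paths and a hypothetical `filter_by_suffix` helper:

```python
def filter_by_suffix(paths: list[str], suffix: str) -> list[str]:
    """Keep only the paths that end with the given suffix."""
    return [path for path in paths if path.endswith(suffix)]


paths = ["folder/1.csv", "folder/2.csv", "folder/notes.txt"]

csv_only = filter_by_suffix(paths, ".csv")

# Every string ends with the empty string, so nothing is filtered out.
everything = filter_by_suffix(paths, "")
```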

_partition_to_path

_partition_to_path(path)
Source code in kedro_datasets/partitions/partitioned_dataset.py
def _partition_to_path(self, path: str):
    dir_path = self._path.rstrip(self._sep)
    path = path.lstrip(self._sep)
    full_path = self._sep.join([dir_path, path]) + self._filename_suffix
    return full_path

_path_to_partition

_path_to_partition(path)
Source code in kedro_datasets/partitions/partitioned_dataset.py
def _path_to_partition(self, path: str) -> str:
    dir_path = self._filesystem._strip_protocol(self._normalized_path)
    path = path.split(dir_path, 1).pop().lstrip(self._sep)
    if self._filename_suffix and path.endswith(self._filename_suffix):
        path = path[: -len(self._filename_suffix)]
    return path
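
`_partition_to_path` and `_path_to_partition` are inverses of each other: one turns a partition ID into a file path on save, the other recovers the ID on load. The sketch below reproduces that round trip with standalone functions. It assumes "/" as the separator and skips the protocol stripping done by the real method; the function names and paths are illustrative.

```python
SEP = "/"  # assumption: the filesystem separator


def partition_to_path(base_path: str, partition_id: str, suffix: str) -> str:
    """Build the file path for a partition ID, appending the suffix."""
    dir_path = base_path.rstrip(SEP)
    partition_id = partition_id.lstrip(SEP)
    return SEP.join([dir_path, partition_id]) + suffix


def path_to_partition(base_path: str, path: str, suffix: str) -> str:
    """Recover the partition ID from a file path, dropping the suffix."""
    relative = path.split(base_path, 1).pop().lstrip(SEP)
    if suffix and relative.endswith(suffix):
        relative = relative[: -len(suffix)]
    return relative


base = "data/df_with_partition"

flat_path = partition_to_path(base, "1", ".csv")
flat_id = path_to_partition(base, flat_path, ".csv")

# Partition IDs may contain separators, which become subfolders.
nested_path = partition_to_path(base, "2024/01", ".csv")
nested_id = path_to_partition(base, nested_path, ".csv")
```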

_release

_release()
Source code in kedro_datasets/partitions/partitioned_dataset.py
def _release(self) -> None:
    super()._release()
    self._invalidate_caches()

load

load()
Source code in kedro_datasets/partitions/partitioned_dataset.py
def load(self) -> dict[str, Callable[[], Any]]:
    self._invalidate_caches()  # Invalidate cache before listing partitions

    partitions = {}

    # The _list_partitions() call will now always perform a fresh scan
    # because its cache was just cleared.
    for partition_file_path in self._list_partitions():
        kwargs = deepcopy(self._dataset_config)
        # join the protocol back since PySpark may rely on it
        kwargs[self._filepath_arg] = self._join_protocol(partition_file_path)
        dataset = self._dataset_type(**kwargs)  # type: ignore
        partition_id = self._path_to_partition(partition_file_path)
        partitions[partition_id] = dataset.load

    if not partitions:
        raise DatasetError(f"No partitions found in '{self._path}'")

    return partitions
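
Because `load` returns the bound `dataset.load` method for each partition rather than the data, nothing is read until a loader is called. The following sketch imitates that contract without Kedro: `storage`, `read_partition` and `reads` are stand-ins invented for the illustration.

```python
from functools import partial

# Pretend storage: partition ID -> rows. Purely illustrative data.
storage = {"1": [10], "2": [20, 30]}
reads = []  # records which partitions were actually read


def read_partition(partition_id: str) -> list[int]:
    """Stand-in for the underlying dataset's load method."""
    reads.append(partition_id)
    return storage[partition_id]


# Same shape as the dictionary returned by load():
# partition ID -> zero-argument callable.
loaded = {pid: partial(read_partition, pid) for pid in storage}
reads_before = list(reads)  # still empty: no partition has been read yet

combined = []
for partition_id, load_func in sorted(loaded.items()):
    combined.extend(load_func())  # the read happens here
```

A consumer can therefore skip partitions it does not need, or process them one at a time to bound memory use.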

save

save(data)
Source code in kedro_datasets/partitions/partitioned_dataset.py
def save(self, data: dict[str, Any]) -> None:
    if self._overwrite and self._filesystem.exists(self._normalized_path):
        self._filesystem.rm(self._normalized_path, recursive=True)

    for partition_id, partition_data in sorted(data.items()):
        kwargs = deepcopy(self._dataset_config)
        partition = self._partition_to_path(partition_id)
        # join the protocol back since tools like PySpark may rely on it
        kwargs[self._filepath_arg] = self._join_protocol(partition)
        dataset = self._dataset_type(**kwargs)  # type: ignore
        if callable(partition_data) and self._save_lazily:
            partition_data = partition_data()  # noqa: PLW2901
        dataset.save(partition_data)
    self._invalidate_caches()
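
The loop in `save` processes partitions in sorted key order and, when lazy saving is enabled, calls each callable just before its partition is written. A minimal sketch of that ordering, with a hypothetical `save_partitions` function and an in-memory `write` callback standing in for the underlying dataset:

```python
from typing import Any, Callable


def save_partitions(
    data: dict[str, Any],
    write: Callable[[str, Any], None],
    save_lazily: bool = True,
) -> None:
    """Hypothetical stand-in for the save loop shown above."""
    for partition_id, partition_data in sorted(data.items()):
        if callable(partition_data) and save_lazily:
            partition_data = partition_data()  # materialise just in time
        write(partition_id, partition_data)


events = []   # order in which partitions were built and written
written = {}  # in-memory stand-in for the saved files


def make_partition(n: int) -> Callable[[], list[int]]:
    def build() -> list[int]:
        events.append(f"build {n}")
        return list(range(n))
    return build


def write(partition_id: str, payload: Any) -> None:
    events.append(f"write {partition_id}")
    written[partition_id] = payload


save_partitions({"b": make_partition(2), "a": make_partition(1)}, write)
```

Each partition is built immediately before it is written, so at most one partition's data needs to be held in memory at a time. With `save_lazily=False` the callable itself would be handed to `write` uncalled.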