kedro_datasets.partitions.PartitionedDataset
- class kedro_datasets.partitions.PartitionedDataset(*, path, dataset, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False, save_lazily=True, metadata=None)
PartitionedDataset loads and saves partitioned file-like data using the underlying dataset definition. For filesystem level operations it uses fsspec: https://github.com/intake/filesystem_spec. It also supports advanced features like lazy saving.
Example usage for the YAML API:
    station_data:
      type: partitions.PartitionedDataset
      path: data/03_primary/station_data
      dataset:
        type: pandas.CSVDataset
        load_args:
          sep: '\t'
        save_args:
          sep: '\t'
          index: true
      filename_suffix: '.dat'
      save_lazily: True
Example usage for the Python API:
    import tempfile
    from pathlib import Path

    import pandas as pd
    from kedro_datasets.partitions import PartitionedDataset

    # Scratch directory; `tmp_path` is not defined in the original snippet,
    # so a temporary directory is assumed here.
    tmp_path = Path(tempfile.mkdtemp())

    # Create a fake pandas dataframe with 10 rows of data
    df = pd.DataFrame([{"DAY_OF_MONTH": str(i), "VALUE": i} for i in range(1, 11)])

    # Convert it to a dict of pd.DataFrame with DAY_OF_MONTH as the dict key
    dict_df = {
        day_of_month: df[df["DAY_OF_MONTH"] == day_of_month]
        for day_of_month in df["DAY_OF_MONTH"]
    }

    # Save it as small partitions with DAY_OF_MONTH as the partition key
    dataset = PartitionedDataset(
        path=str(tmp_path / "df_with_partition"),
        dataset="pandas.CSVDataset",
        filename_suffix=".csv",
        save_lazily=False,
    )
    # This will create a folder `df_with_partition` and save multiple files
    # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc.
    dataset.save(dict_df)

    # This will create lazy load functions instead of loading data into memory immediately.
    loaded = dataset.load()

    # Load all the partitions
    for partition_id, partition_load_func in loaded.items():
        # The actual function that loads the data
        partition_data = partition_load_func()

        # Add the processing logic for individual partition HERE
        # print(partition_data)
You can also load multiple partitions from a remote storage and combine them like this:
    import pandas as pd
    from kedro_datasets.partitions import PartitionedDataset

    # these credentials will be passed to both 'fsspec.filesystem()' call
    # and the dataset initializer
    credentials = {"key1": "secret1", "key2": "secret2"}

    dataset = PartitionedDataset(
        path="s3://bucket-name/path/to/folder",
        dataset="pandas.CSVDataset",
        credentials=credentials,
    )
    loaded = dataset.load()
    # assert isinstance(loaded, dict)

    combine_all = pd.DataFrame()

    for partition_id, partition_load_func in loaded.items():
        partition_data = partition_load_func()
        combine_all = pd.concat([combine_all, partition_data], ignore_index=True, sort=True)

    new_data = pd.DataFrame({"new": [1, 2]})
    # creates "s3://bucket-name/path/to/folder/new/partition.csv"
    dataset.save({"new/partition.csv": new_data})
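Both examples above depend on how a partition key relates to a file name: on save, the file name is the dict key plus filename_suffix, and on load only files ending with filename_suffix are treated as partitions. The following is a minimal, library-free sketch of that mapping, not Kedro's actual implementation. The helper names partition_to_path, path_to_partition and list_partitions are hypothetical and exist only for illustration.

```python
from __future__ import annotations


def partition_to_path(base_path: str, partition_id: str, filename_suffix: str = "") -> str:
    """Build the file path a partition key would be written to (illustrative only)."""
    return "/".join([base_path.rstrip("/"), partition_id.lstrip("/")]) + filename_suffix


def path_to_partition(base_path: str, file_path: str, filename_suffix: str = "") -> str:
    """Recover the partition key from a file path located under base_path."""
    relative = file_path.split(base_path.rstrip("/"), 1).pop().lstrip("/")
    if filename_suffix and relative.endswith(filename_suffix):
        relative = relative[: -len(filename_suffix)]
    return relative


def list_partitions(file_paths: list[str], filename_suffix: str = "") -> list[str]:
    """Keep only the files whose names end with filename_suffix, preserving order."""
    return [path for path in file_paths if path.endswith(filename_suffix)]


# First example: key "1" with filename_suffix=".csv"
print(partition_to_path("data/df_with_partition", "1", ".csv"))
# Second example: no suffix, so the key already carries the extension
print(partition_to_path("s3://bucket-name/path/to/folder", "new/partition.csv"))
```

Under these assumptions, a key saved with a suffix round-trips to the same key on load, while files without the suffix in the same folder are simply skipped.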
Methods
exists() – Checks whether a dataset's output already exists by calling the provided _exists() method.
from_config(name, config[, load_version, ...]) – Create a dataset instance using the configuration provided.
load() – Loads data by delegation to the provided load method.
release() – Release any cached data.
save(data) – Saves data by delegation to the provided save method.
to_config() – Converts the dataset instance into a dictionary-based configuration for serialization.
- __init__(*, path, dataset, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False, save_lazily=True, metadata=None)
Creates a new instance of PartitionedDataset.
- Parameters:
path (str) – Path to the folder containing partitioned data. If path starts with a protocol (e.g., s3://), then the corresponding fsspec concrete filesystem implementation will be used. If no protocol is specified, fsspec.implementations.local.LocalFileSystem will be used. Note: Some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately before using PartitionedDataset.
dataset (str | type[AbstractDataset] | dict[str, Any]) – Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) a class that inherits from AbstractDataset; b) a string representing the fully qualified name of such a class; c) a dictionary with a type key pointing to a string from b), whose other keys are passed to the dataset initializer. Credentials for the dataset can be explicitly specified in this configuration.
filepath_arg (str) – Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to "filepath".
filename_suffix (str) – If specified, only partitions that end with this string will be processed.
credentials (Optional[dict[str, Any]]) – Protocol-specific options that will be passed to fsspec.filesystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem) and the dataset initializer. If the dataset config contains an explicit credentials spec, that spec takes precedence. All possible credentials management scenarios are documented here: https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#partitioned-dataset-credentials
load_args (Optional[dict[str, Any]]) – Keyword arguments to be passed into the find() method of the filesystem implementation.
fs_args (Optional[dict[str, Any]]) – Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).
overwrite (bool) – If True, any existing partitions will be removed.
save_lazily (bool) – Enables or disables lazy saving; the default is True. When enabled, if a callable object is passed as data to save, the partition's data will not be materialised until it is time to write. Lazy saving example: https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving
metadata (Optional[dict[str, Any]]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
- Raises:
DatasetError – If versioning is enabled for the underlying dataset.
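The save_lazily behaviour described above can be illustrated without the library. The following is a minimal sketch of the semantics only, not Kedro's implementation: save_partitions, write and make_partition are hypothetical names, and the in-memory events list stands in for real file writes.

```python
from __future__ import annotations

from typing import Any, Callable


def save_partitions(
    data: dict[str, Any],
    write: Callable[[str, Any], None],
    save_lazily: bool = True,
) -> None:
    """Write each partition, calling callables only right before their write."""
    for partition_id, partition_data in sorted(data.items()):
        if save_lazily and callable(partition_data):
            # Materialise this partition's data only now, one partition at a time.
            partition_data = partition_data()
        write(partition_id, partition_data)


events: list[str] = []


def make_partition(day: int) -> Callable[[], list[int]]:
    """Return a function that builds the partition's data when called."""
    def build() -> list[int]:
        events.append(f"build {day}")
        return [day] * 3
    return build


def write(partition_id: str, payload: Any) -> None:
    """Stand-in writer that records the write instead of touching a filesystem."""
    events.append(f"write {partition_id}")


save_partitions({"day_1": make_partition(1), "day_2": make_partition(2)}, write)
print(events)  # ['build 1', 'write day_1', 'build 2', 'write day_2']
```

In this sketch each partition is built immediately before it is written, so at most one partition's data needs to be held in memory at a time. With save_lazily=False the callable itself would be handed to the writer unchanged.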
- exists()
Checks whether a dataset’s output already exists by calling the provided _exists() method.
- Return type:
bool
- Returns:
Flag indicating whether the output already exists.
- Raises:
DatasetError – when underlying exists method raises error.
- classmethod from_config(name, config, load_version=None, save_version=None)
Create a dataset instance using the configuration provided.
- Parameters:
name (str) – Dataset name.
config (dict[str, Any]) – Dataset configuration dictionary.
load_version (Optional[str]) – Version string to be used for the load operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.
save_version (Optional[str]) – Version string to be used for the save operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.
- Return type:
AbstractDataset
- Returns:
An instance of an AbstractDataset subclass.
- Raises:
DatasetError – When the function fails to create the dataset from its config.
- release()
Release any cached data.
- Raises:
DatasetError – when underlying release method raises error.
- Return type:
None
- save(data)
Saves data by delegation to the provided save method.
- Parameters:
data (dict[str, Any]) – the value to be saved by the provided save method.
- Raises:
DatasetError – when underlying save method raises error.
FileNotFoundError – when save method got file instead of dir, on Windows.
NotADirectoryError – when save method got file instead of dir, on Unix.
- Return type:
None
- to_config()
Converts the dataset instance into a dictionary-based configuration for serialization. Ensures that any subclass-specific details are handled, with additional logic for versioning and caching implemented for CachedDataset.
Adds a key for the dataset’s type using its module and class name and includes the initialization arguments.
For CachedDataset it extracts the underlying dataset’s configuration, handles the versioned flag and removes unnecessary metadata. It also ensures the embedded dataset’s configuration is appropriately flattened or transformed.
If the dataset has a version key, it sets the versioned flag in the configuration.
Removes the metadata key from the configuration if present.
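The steps described for to_config() can be sketched on a toy class. This is an illustration under stated assumptions, not Kedro's code: ToyDataset is hypothetical, the key names "type", "version", "versioned" and "metadata" are inferred from the wording above, the CachedDataset branch is omitted, and dropping the version key after setting the flag is an assumption of this sketch.

```python
from __future__ import annotations

from typing import Any


class ToyDataset:
    """Hypothetical stand-in for a dataset class; not part of Kedro."""

    def __init__(self, **init_args: Any) -> None:
        # Remember the arguments the instance was created with.
        self._init_args = dict(init_args)

    def to_config(self) -> dict[str, Any]:
        # 1. Type key built from the module and class name.
        config: dict[str, Any] = {
            "type": f"{type(self).__module__}.{type(self).__name__}"
        }
        # 2. Include the initialization arguments.
        config.update(self._init_args)
        # 3. A version key becomes a versioned flag
        #    (removing the key itself is an assumption of this sketch).
        if "version" in config:
            config["versioned"] = True
            del config["version"]
        # 4. Metadata is removed if present.
        config.pop("metadata", None)
        return config


config = ToyDataset(filepath="data/01_raw/cars.csv", metadata={"owner": "me"}).to_config()
print(config)
```

The resulting dictionary carries the class path under the type key plus the constructor arguments, which is the general shape a catalog entry needs in order to be turned back into an instance with from_config().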