kedro_datasets.partitions.PartitionedDataset
- class kedro_datasets.partitions.PartitionedDataset(*, path, dataset, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False, metadata=None)
PartitionedDataset loads and saves partitioned file-like data using the underlying dataset definition. For filesystem-level operations it uses fsspec: https://github.com/intake/filesystem_spec. It also supports advanced features like lazy saving.
Example usage for the YAML API:
    station_data:
      type: partitions.PartitionedDataset
      path: data/03_primary/station_data
      dataset:
        type: pandas.CSVDataset
        load_args:
          sep: '\t'
        save_args:
          sep: '\t'
          index: true
      filename_suffix: '.dat'
Example usage for the Python API:
    import tempfile
    from pathlib import Path

    import pandas as pd
    from kedro_datasets.partitions import PartitionedDataset

    # `tmp_path` is not defined in the original snippet; a temporary
    # directory is assumed here so the example can run on its own.
    tmp_path = Path(tempfile.mkdtemp())

    # Create a fake pandas dataframe with 10 rows of data
    df = pd.DataFrame([{"DAY_OF_MONTH": str(i), "VALUE": i} for i in range(1, 11)])

    # Convert it to a dict of pd.DataFrame with DAY_OF_MONTH as the dict key
    dict_df = {
        day_of_month: df[df["DAY_OF_MONTH"] == day_of_month]
        for day_of_month in df["DAY_OF_MONTH"]
    }

    # Save it as small partitions with DAY_OF_MONTH as the partition key
    dataset = PartitionedDataset(
        path=str(tmp_path / "df_with_partition"),
        dataset="pandas.CSVDataset",
        filename_suffix=".csv",
    )
    # This will create a folder `df_with_partition` and save multiple files
    # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc.
    dataset.save(dict_df)

    # This will create lazy load functions instead of loading data into memory immediately.
    loaded = dataset.load()

    # Load all the partitions
    for partition_id, partition_load_func in loaded.items():
        # The actual function that loads the data
        partition_data = partition_load_func()

        # Add the processing logic for individual partition HERE
        # print(partition_data)
You can also load multiple partitions from a remote storage and combine them like this:
    import pandas as pd
    from kedro_datasets.partitions import PartitionedDataset

    # these credentials will be passed to both 'fsspec.filesystem()' call
    # and the dataset initializer
    credentials = {"key1": "secret1", "key2": "secret2"}

    dataset = PartitionedDataset(
        path="s3://bucket-name/path/to/folder",
        dataset="pandas.CSVDataset",
        credentials=credentials,
    )
    loaded = dataset.load()
    # assert isinstance(loaded, dict)

    combine_all = pd.DataFrame()

    for partition_id, partition_load_func in loaded.items():
        partition_data = partition_load_func()
        combine_all = pd.concat([combine_all, partition_data], ignore_index=True, sort=True)

    new_data = pd.DataFrame({"new": [1, 2]})
    # creates "s3://bucket-name/path/to/folder/new/partition.csv"
    dataset.save({"new/partition.csv": new_data})
Methods
exists() – Checks whether a data set's output already exists by calling the provided _exists() method.
from_config(name, config[, load_version, ...]) – Create a data set instance using the configuration provided.
load() – Loads data by delegation to the provided load method.
release() – Release any cached data.
save(data) – Saves data by delegation to the provided save method.
- __init__(*, path, dataset, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False, metadata=None)
Creates a new instance of PartitionedDataset.
- Parameters:
path (str) – Path to the folder containing partitioned data. If path starts with a protocol (e.g., s3://), the corresponding fsspec concrete filesystem implementation will be used. If no protocol is specified, fsspec.implementations.local.LocalFileSystem will be used. Note: Some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately before using PartitionedDataset.
dataset (str | type[AbstractDataset] | dict[str, Any]) – Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) a class that inherits from AbstractDataset; b) a string representing a fully qualified class name of such a class; c) a dictionary with a type key pointing to a string from b), where the other keys are passed to the dataset initializer. Credentials for the dataset can be explicitly specified in this configuration.
filepath_arg (str) – Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to "filepath".
filename_suffix (str) – If specified, only partitions that end with this string will be processed.
credentials (dict[str, Any]) – Protocol-specific options that will be passed to fsspec.filesystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem) and to the dataset initializer. If the dataset config contains an explicit credentials spec, that spec takes precedence. All possible credentials management scenarios are documented here: https://kedro.readthedocs.io/en/stable/data/kedro_io.html#partitioned-dataset-credentials
load_args (dict[str, Any]) – Keyword arguments to be passed into the find() method of the filesystem implementation.
fs_args (dict[str, Any]) – Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).
overwrite (bool) – If True, any existing partitions will be removed.
metadata (dict[str, Any]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
- Raises:
DatasetError – If versioning is enabled for the underlying dataset.
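The relationship between path, filename_suffix and partition ids can be illustrated with a small standalone sketch. This is not the library's implementation: partition_ids is a hypothetical helper, and the file names are made up. It only mimics the behavior described above and in the usage examples, where files that do not end with the suffix are skipped and the file name equals the partition id plus filename_suffix.

```python
from pathlib import PurePosixPath


def partition_ids(root: str, files: list[str], filename_suffix: str = "") -> list[str]:
    """Hypothetical helper: map file paths under `root` to partition ids."""
    ids = []
    for file in files:
        if not file.endswith(filename_suffix):
            continue  # files without the suffix are not treated as partitions
        relative = PurePosixPath(file).relative_to(root).as_posix()
        # Drop the suffix so that id + filename_suffix gives back the file name.
        ids.append(relative[: len(relative) - len(filename_suffix)])
    return sorted(ids)


# Made-up listing of a partition folder, including one file with another suffix.
files = [
    "data/03_primary/station_data/1.dat",
    "data/03_primary/station_data/2.dat",
    "data/03_primary/station_data/notes.txt",
    "data/03_primary/station_data/2024/3.dat",
]
ids = partition_ids("data/03_primary/station_data", files, filename_suffix=".dat")
print(ids)
```

With an empty filename_suffix every file under the folder counts as a partition and the id keeps the file extension, which matches the remote-storage example where the key "new/partition.csv" is saved as-is.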
- exists()
Checks whether a data set’s output already exists by calling the provided _exists() method.
- Return type:
bool
- Returns:
Flag indicating whether the output already exists.
- Raises:
DatasetError – when underlying exists method raises error.
- classmethod from_config(name, config, load_version=None, save_version=None)
Create a data set instance using the configuration provided.
- Parameters:
name (str) – Data set name.
config (dict[str, Any]) – Data set config dictionary.
load_version (str | None) – Version string to be used for the load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
save_version (str | None) – Version string to be used for the save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
- Return type:
AbstractDataset
- Returns:
An instance of an AbstractDataset subclass.
- Raises:
DatasetError – When the function fails to create the data set from its config.
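As a rough illustration of the config argument, the sketch below builds a trimmed-down Python counterpart of the YAML catalog entry shown earlier. It assumes that the type key selects the class and that the remaining keys are forwarded as constructor keyword arguments; the from_config call itself is left commented out so the snippet runs without kedro-datasets installed.

```python
# Trimmed-down Python counterpart of a catalog entry; the keys mirror the
# constructor parameters documented above.
config = {
    "type": "partitions.PartitionedDataset",
    "path": "data/03_primary/station_data",
    "dataset": {"type": "pandas.CSVDataset"},
    "filename_suffix": ".dat",
}

# Assumption: everything except "type" ends up as constructor keyword arguments.
init_kwargs = {key: value for key, value in config.items() if key != "type"}
print(sorted(init_kwargs))

# With kedro-datasets installed, the instance could then be created roughly
# like this (not executed here):
# from kedro_datasets.partitions import PartitionedDataset
# dataset = PartitionedDataset.from_config("station_data", config)
```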
- load()
Loads data by delegation to the provided load method.
- Return type:
TypeVar(_DO)
- Returns:
Data returned by the provided load method.
- Raises:
DatasetError – When underlying load method raises error.
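As the usage examples show, for PartitionedDataset the loaded value is a dict that maps each partition id to a function that loads that partition. Because nothing is read until a function is called, it is possible to load only some partitions. The sketch below is one way to do that; load_selected is a hypothetical helper, and the fake loaders stand in for the result of dataset.load() so the snippet runs without Kedro.

```python
from typing import Any, Callable


def load_selected(
    loaded: dict[str, Callable[[], Any]],
    keep: Callable[[str], bool],
) -> dict[str, Any]:
    """Hypothetical helper: call only the load functions whose id passes `keep`."""
    return {
        partition_id: load_func()
        for partition_id, load_func in sorted(loaded.items())
        if keep(partition_id)
    }


calls = []  # records which partitions were actually loaded


def make_loader(partition_id: str) -> Callable[[], str]:
    def _load() -> str:
        calls.append(partition_id)
        return f"data for {partition_id}"

    return _load


# Stand-in for the dict returned by `dataset.load()`.
fake_loaded = {pid: make_loader(pid) for pid in ["2024/01", "2024/02", "2023/12"]}
selected = load_selected(fake_loaded, keep=lambda pid: pid.startswith("2024/"))
print(selected)
```

Only the two 2024 partitions are loaded here; the loader for "2023/12" is never called.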
- release()
Release any cached data.
- Raises:
DatasetError – when underlying release method raises error.
- Return type:
None
- save(data)
Saves data by delegation to the provided save method.
- Parameters:
data (TypeVar(_DI)) – the value to be saved by provided save method.
- Raises:
DatasetError – when underlying save method raises error.
FileNotFoundError – when save method got file instead of dir, on Windows.
NotADirectoryError – when save method got file instead of dir, on Unix.
- Return type:
None
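The class description mentions lazy saving. A minimal sketch of the idea, assuming the dict passed to save() may hold zero-argument callables in place of materialised data: each callable is invoked only when its partition is about to be written. build_lazy_partitions and the partition names are hypothetical, and the final dict comprehension is only a stand-in for what would happen at save time.

```python
from typing import Any, Callable


def build_lazy_partitions(days: list[int]) -> dict[str, Callable[[], Any]]:
    """Hypothetical node body: return callables instead of materialised data."""
    partitions = {}
    for day in days:
        # Bind `day` as a default argument; a plain closure would see only
        # the last value of the loop variable when it is finally called.
        partitions[f"day_{day}"] = lambda day=day: {"DAY_OF_MONTH": day, "VALUE": day * 10}
    return partitions


lazy = build_lazy_partitions([1, 2, 3])

# Rough stand-in for save time: each callable is invoked right before
# its partition would be written.
materialised = {
    partition_id: (value() if callable(value) else value)
    for partition_id, value in lazy.items()
}
print(materialised["day_2"])
```

With a real dataset the dict of callables would be passed to save() directly, for example dataset.save(lazy), so that only one partition's data needs to be held in memory at a time.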