IncrementalDataset
IncrementalDataset is used to manage datasets that grow incrementally over time.
kedro_datasets.partitions.IncrementalDataset
IncrementalDataset(
*,
path,
dataset,
checkpoint=None,
filepath_arg="filepath",
filename_suffix="",
credentials=None,
load_args=None,
fs_args=None,
metadata=None
)
Bases: PartitionedDataset
IncrementalDataset inherits from PartitionedDataset, which loads
and saves partitioned file-like data using the underlying dataset
definition. For filesystem-level operations it uses fsspec:
https://github.com/intake/filesystem_spec. IncrementalDataset also stores
information about the last processed partition in a so-called checkpoint,
which is persisted to the location of the data partitions by default, so that
a subsequent pipeline run loads only the new partitions past the checkpoint.
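To make "partitions past the checkpoint" concrete, here is a minimal sketch of the filtering idea. It is not the library's implementation: the function name `partitions_past_checkpoint` is hypothetical, and it assumes partition IDs are plain strings compared with greater-than.

```python
import operator
from typing import Callable, Iterable, Optional


def partitions_past_checkpoint(
    partition_ids: Iterable[str],
    checkpoint: Optional[str],
    is_newer: Callable[[str, str], bool] = operator.gt,
) -> list[str]:
    """Return the sorted partition IDs considered newer than the checkpoint.

    Hypothetical helper for illustration; assumes string comparison of IDs.
    """
    if checkpoint is None:
        # No checkpoint stored yet: every partition counts as new.
        return sorted(partition_ids)
    return sorted(p for p in partition_ids if is_newer(p, checkpoint))


all_ids = ["2024-01-01.csv", "2024-01-02.csv", "2024-01-03.csv"]
first_run = partitions_past_checkpoint(all_ids, checkpoint=None)
second_run = partitions_past_checkpoint(all_ids, checkpoint="2024-01-02.csv")
```

Under these assumptions, the first run sees every partition and the second run sees only the partition whose ID sorts after the stored checkpoint.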
Examples:
Using the Python API:
>>> from kedro_datasets.partitions import IncrementalDataset
>>>
>>> dataset = IncrementalDataset(
... path=str(tmp_path / "test_data"), dataset="pandas.CSVDataset"
... )
>>> loaded = dataset.load() # loads all available partitions
>>> # assert isinstance(loaded, dict)
>>>
>>> dataset.confirm() # update checkpoint value to the last processed partition ID
>>> reloaded = dataset.load() # still loads all available partitions
>>>
>>> dataset.release() # clears load cache
>>> # returns an empty dictionary as no new partitions were added
>>> assert dataset.load() == {}
Parameters:
- path (str) – Path to the folder containing partitioned data. If path starts with the protocol (e.g., s3://), then the corresponding fsspec concrete filesystem implementation will be used. If protocol is not specified, fsspec.implementations.local.LocalFileSystem will be used. Note: Some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately prior to usage of the PartitionedDataset.
- dataset (str | type[AbstractDataset] | dict[str, Any]) – Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) object of a class that inherits from AbstractDataset; b) a string representing a fully qualified class name to such class; c) a dictionary with type key pointing to a string from b), other keys are passed to the Dataset initializer. Credentials for the dataset can be explicitly specified in this configuration.
- checkpoint (str | dict[str, Any] | None, default: None) – Optional checkpoint configuration. Accepts a dictionary with the corresponding dataset definition including filepath (unlike dataset argument). Checkpoint configuration is described here: https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#checkpoint-configuration Credentials for the checkpoint can be explicitly specified in this configuration.
- filepath_arg (str, default: 'filepath') – Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to "filepath".
- filename_suffix (str, default: '') – If specified, only partitions that end with this string will be processed.
- credentials (dict[str, Any] | None, default: None) – Protocol-specific options that will be passed to fsspec.filesystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem), the dataset initializer and the checkpoint. If the dataset or the checkpoint configuration contains explicit credentials spec, then such spec will take precedence. All possible credentials management scenarios are documented here: https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#partitioned-dataset-credentials
- load_args (dict[str, Any] | None, default: None) – Keyword arguments to be passed into find() method of the filesystem implementation.
- fs_args (dict[str, Any] | None, default: None) – Extra arguments to pass into underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).
- metadata (dict[str, Any] | None, default: None) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
Raises:
- DatasetError – If versioning is enabled for the checkpoint dataset.
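As a rough illustration of how the parameters above fit together, the sketch below builds a set of constructor keyword arguments with an explicit checkpoint location. All paths are hypothetical placeholders. The checkpoint type reuses the documented default, `kedro_datasets.text.TextDataset`, and no versioning option is set on the checkpoint, since that raises `DatasetError`.

```python
# Hypothetical paths and values, for illustration only.
incremental_kwargs = {
    # Folder that holds the partition files.
    "path": "data/01_raw/daily_reports",
    # Dictionary form of the underlying dataset definition ("type" key).
    "dataset": {"type": "pandas.CSVDataset"},
    # Only partitions ending with this suffix are processed.
    "filename_suffix": ".csv",
    # Checkpoint definition; unlike `dataset`, it includes a filepath.
    "checkpoint": {
        "type": "kedro_datasets.text.TextDataset",
        "filepath": "data/01_raw/daily_reports_checkpoint/CHECKPOINT",
    },
}

# With kedro-datasets installed, these would be passed as:
#   IncrementalDataset(**incremental_kwargs)
```

Keeping the checkpoint outside the partition folder is only one possible layout; by default the checkpoint is persisted alongside the partitions.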
Source code in kedro_datasets/partitions/incremental_dataset.py
DEFAULT_CHECKPOINT_FILENAME class-attribute instance-attribute
DEFAULT_CHECKPOINT_FILENAME = 'CHECKPOINT'
DEFAULT_CHECKPOINT_TYPE class-attribute instance-attribute
DEFAULT_CHECKPOINT_TYPE = 'kedro_datasets.text.TextDataset'
_list_partitions
_list_partitions()
_parse_checkpoint_config
_parse_checkpoint_config(checkpoint_config)
_read_checkpoint
_read_checkpoint()
confirm
confirm()
Confirm the dataset by updating the checkpoint value to the latest processed partition ID.
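The interplay between load(), confirm() and release() can be mimicked with a small in-memory stand-in. This is a sketch under stated assumptions, not the class's actual code: `ToyIncrementalStore` is hypothetical, partition IDs are compared as strings, and the partition listing is assumed to be cached until release() is called, as the example comments in this page suggest.

```python
from typing import Optional


class ToyIncrementalStore:
    """Hypothetical in-memory stand-in for the load/confirm/release cycle."""

    def __init__(self, partitions: dict[str, str]) -> None:
        self.partitions = partitions  # partition ID -> data
        self.checkpoint: Optional[str] = None  # last confirmed partition ID
        self._cached_ids: Optional[list[str]] = None

    def _list_partitions(self) -> list[str]:
        # The listing is computed once and cached until release() is called.
        if self._cached_ids is None:
            self._cached_ids = sorted(
                pid
                for pid in self.partitions
                if self.checkpoint is None or pid > self.checkpoint
            )
        return self._cached_ids

    def load(self) -> dict[str, str]:
        return {pid: self.partitions[pid] for pid in self._list_partitions()}

    def confirm(self) -> None:
        # Store the last listed partition ID as the new checkpoint.
        ids = self._list_partitions()
        if ids:
            self.checkpoint = ids[-1]

    def release(self) -> None:
        # Drop the cached listing so the next load() re-reads the partitions.
        self._cached_ids = None


store = ToyIncrementalStore({"p1": "a", "p2": "b"})
first = store.load()          # all partitions
store.confirm()               # checkpoint moves to the last listed ID
cached = store.load()         # listing is still cached
store.release()
after_release = store.load()  # nothing newer than the checkpoint
store.partitions["p3"] = "c"
store.release()
new_only = store.load()       # only the partition added after confirm()
```

In this toy model, confirm() on its own does not change what the next load() returns; the new checkpoint takes effect only once the cached listing is released.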
load
load()