
IncrementalDataset

IncrementalDataset is used to manage datasets that grow incrementally over time.

kedro_datasets.partitions.IncrementalDataset

IncrementalDataset(
    *,
    path,
    dataset,
    checkpoint=None,
    filepath_arg="filepath",
    filename_suffix="",
    credentials=None,
    load_args=None,
    fs_args=None,
    metadata=None
)

Bases: PartitionedDataset

IncrementalDataset inherits from PartitionedDataset, which loads and saves partitioned file-like data using the underlying dataset definition. For filesystem-level operations it uses fsspec: https://github.com/intake/filesystem_spec. IncrementalDataset also stores the ID of the last processed partition in a so-called checkpoint. By default the checkpoint is persisted in the same location as the data partitions, so that subsequent pipeline runs load only the partitions that come after the checkpoint.
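The checkpoint idea can be modelled with the standard library alone. This is a minimal sketch, not the kedro_datasets implementation. It assumes a local folder whose file names serve as partition IDs and a plain-text file named CHECKPOINT. The helpers `new_partitions` and `confirm` are hypothetical.

```python
from __future__ import annotations

import operator
import tempfile
from pathlib import Path

CHECKPOINT = "CHECKPOINT"  # mirrors DEFAULT_CHECKPOINT_FILENAME


def new_partitions(folder: Path) -> list[str]:
    """Hypothetical helper: partition IDs past the checkpoint, in sorted order."""
    ckpt_file = folder / CHECKPOINT
    checkpoint = ckpt_file.read_text() if ckpt_file.exists() else None
    # The checkpoint file itself is never treated as a partition
    ids = sorted(p.name for p in folder.iterdir() if p.is_file() and p.name != CHECKPOINT)
    if checkpoint is None:
        return ids  # nothing was processed yet
    return [pid for pid in ids if operator.gt(pid, checkpoint)]


def confirm(folder: Path, processed: list[str]) -> None:
    """Hypothetical helper: persist the last processed ID (no-op if nothing was processed)."""
    if processed:
        (folder / CHECKPOINT).write_text(processed[-1])


with tempfile.TemporaryDirectory() as tmp:
    data = Path(tmp)
    for name in ("2024-01-01.csv", "2024-01-02.csv"):
        (data / name).write_text("a\n1\n")

    run1 = new_partitions(data)  # first run sees every partition
    confirm(data, run1)
    run2 = new_partitions(data)  # nothing new has arrived yet
    (data / "2024-01-03.csv").write_text("a\n2\n")
    run3 = new_partitions(data)  # only the newly added partition
```

Because the file names are ISO dates, lexicographic order matches chronological order, which is what makes the `operator.gt` comparison meaningful here.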

Examples:

Using the Python API:

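>>> import tempfile
>>> from pathlib import Path
>>>
>>> import pandas as pd
>>> from kedro_datasets.partitions import IncrementalDataset
>>>
>>> # tmp_path is a pytest fixture in the test suite; create one by hand here
>>> tmp_path = Path(tempfile.mkdtemp())
>>> (tmp_path / "test_data").mkdir()
>>> for name in ("p1.csv", "p2.csv"):
...     pd.DataFrame({"a": [1, 2]}).to_csv(tmp_path / "test_data" / name, index=False)
...
>>> dataset = IncrementalDataset(
...     path=str(tmp_path / "test_data"), dataset="pandas.CSVDataset"
... )
>>> loaded = dataset.load()  # loads all available partitions
>>> assert isinstance(loaded, dict) and len(loaded) == 2
>>>
>>> dataset.confirm()  # update checkpoint value to the last processed partition ID
>>> reloaded = dataset.load()  # still loads all partitions: the partition list is cached
>>>
>>> dataset.release()  # clears load cache
>>> # returns an empty dictionary as no new partitions were added
>>> assert dataset.load() == {}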

Parameters:

  • path (str) –

    Path to the folder containing partitioned data. If path starts with the protocol (e.g., s3://) then the corresponding fsspec concrete filesystem implementation will be used. If protocol is not specified, fsspec.implementations.local.LocalFileSystem will be used. Note: Some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately prior to usage of the PartitionedDataset.

  • dataset (str | type[AbstractDataset] | dict[str, Any]) –

    Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) a class that inherits from AbstractDataset; b) a string with the fully qualified name of such a class (a shortened name such as "pandas.CSVDataset", as in the example above, is also accepted); c) a dictionary whose type key points to a string as in b), with all other keys passed to the dataset initializer. Credentials for the dataset can be specified explicitly in this configuration.

  • checkpoint (str | dict[str, Any] | None, default: None ) –

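    Optional checkpoint configuration. Accepts either a string, which is used as a forced checkpoint value (force_checkpoint), or a dictionary with the corresponding dataset definition; unlike the dataset argument, this definition includes filepath. The dictionary may also contain force_checkpoint and comparison_func (a callable or its import path as a string; defaults to operator.gt). Checkpoint configuration is described at https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#checkpoint-configuration. Credentials for the checkpoint can be specified explicitly in this configuration.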

  • filepath_arg (str, default: 'filepath' ) –

    Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to "filepath".

  • filename_suffix (str, default: '' ) –

    If specified, only partitions that end with this string will be processed.

  • credentials (dict[str, Any] | None, default: None ) –

    Protocol-specific options that will be passed to fsspec.filesystem https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem, the dataset initializer and the checkpoint. If the dataset or the checkpoint configuration contains explicit credentials spec, then such spec will take precedence. All possible credentials management scenarios are documented here: https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#partitioned-dataset-credentials

  • load_args (dict[str, Any] | None, default: None ) –

    Keyword arguments to be passed to the find() method of the filesystem implementation.

  • fs_args (dict[str, Any] | None, default: None ) –

    Extra arguments to pass to the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
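The credentials precedence rule can be illustrated with a small sketch modelled on the merge in `_parse_checkpoint_config` further down. The helper `merge_credentials` is hypothetical, and the key name `"credentials"` is an assumption standing in for Kedro's `CREDENTIALS_KEY`.

```python
from __future__ import annotations

import logging
from copy import deepcopy
from typing import Any

logger = logging.getLogger("credentials_sketch")

CREDENTIALS_KEY = "credentials"  # assumed to match Kedro's CREDENTIALS_KEY


def merge_credentials(
    shared: dict[str, Any] | None, explicit_config: dict[str, Any]
) -> dict[str, Any]:
    """Hypothetical helper: shared credentials are defaults; explicit ones win."""
    defaults: dict[str, Any] = {}
    if shared:
        # Copy so later mutation of the merged config cannot leak back
        defaults[CREDENTIALS_KEY] = deepcopy(shared)
    if CREDENTIALS_KEY in defaults and CREDENTIALS_KEY in explicit_config:
        logger.warning("Explicit %s override the shared ones", CREDENTIALS_KEY)
    # Shallow merge: the explicit credentials dict replaces the shared one entirely
    return {**defaults, **explicit_config}


shared = {"key": "shared-key", "secret": "shared-secret"}
only_type = merge_credentials(shared, {"type": "text.TextDataset"})
overridden = merge_credentials(shared, {"credentials": {"key": "ckpt-key"}})
```

Because the merge is shallow, explicit credentials are not combined key by key with the shared ones: `overridden` keeps only `{"key": "ckpt-key"}`.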

Raises:

  • DatasetError

    If versioning is enabled for the checkpoint dataset, or if the checkpoint filepath resolves to a location outside the dataset directory.

Source code in kedro_datasets/partitions/incremental_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    path: str,
    dataset: str | type[AbstractDataset] | dict[str, Any],
    checkpoint: str | dict[str, Any] | None = None,
    filepath_arg: str = "filepath",
    filename_suffix: str = "",
    credentials: dict[str, Any] | None = None,
    load_args: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``IncrementalDataset``.

    Args:
        path: Path to the folder containing partitioned data.
            If path starts with the protocol (e.g., ``s3://``) then the
            corresponding ``fsspec`` concrete filesystem implementation will
            be used. If protocol is not specified,
            ``fsspec.implementations.local.LocalFileSystem`` will be used.
            **Note:** Some concrete implementations are bundled with ``fsspec``,
            while others (like ``s3`` or ``gcs``) must be installed separately
            prior to usage of the ``PartitionedDataset``.
        dataset: Underlying dataset definition. This is used to instantiate
            the dataset for each file located inside the ``path``.
            Accepted formats are:
            a) object of a class that inherits from ``AbstractDataset``
            b) a string representing a fully qualified class name to such class
            c) a dictionary with ``type`` key pointing to a string from b),
            other keys are passed to the Dataset initializer.
            Credentials for the dataset can be explicitly specified in
            this configuration.
        checkpoint: Optional checkpoint configuration. Accepts a dictionary
            with the corresponding dataset definition including ``filepath``
            (unlike ``dataset`` argument). Checkpoint configuration is
            described here:
            https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#checkpoint-configuration
            Credentials for the checkpoint can be explicitly specified
            in this configuration.
        filepath_arg: Underlying dataset initializer argument that will
            contain a path to each corresponding partition file.
            If unspecified, defaults to "filepath".
        filename_suffix: If specified, only partitions that end with this
            string will be processed.
        credentials: Protocol-specific options that will be passed to
            ``fsspec.filesystem``
            https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem,
            the dataset initializer and the checkpoint. If
            the dataset or the checkpoint configuration contains explicit
            credentials spec, then such spec will take precedence.
            All possible credentials management scenarios are documented here:
            https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#partitioned-dataset-credentials
        load_args: Keyword arguments to be passed into ``find()`` method of
            the filesystem implementation.
        fs_args: Extra arguments to pass into underlying filesystem class constructor
            (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        DatasetError: If versioning is enabled for the checkpoint dataset.
    """

    super().__init__(
        path=path,
        dataset=dataset,
        filepath_arg=filepath_arg,
        filename_suffix=filename_suffix,
        credentials=credentials,
        load_args=load_args,
        fs_args=fs_args,
    )

    self._checkpoint_config = self._parse_checkpoint_config(checkpoint)
    self._force_checkpoint = self._checkpoint_config.pop("force_checkpoint", None)
    self.metadata = metadata

    comparison_func = self._checkpoint_config.pop("comparison_func", operator.gt)
    if isinstance(comparison_func, str):
        comparison_func = load_obj(comparison_func)
    self._comparison_func = comparison_func
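The constructor above pops comparison_func from the checkpoint configuration, defaulting to operator.gt, and resolves string values with load_obj. The sketch below imitates that behaviour with importlib. `resolve_callable` is a hypothetical stand-in for load_obj, and `past_checkpoint` is a hypothetical helper.

```python
import importlib
import operator
from typing import Any, Callable


def resolve_callable(path: str) -> Callable[..., Any]:
    """Hypothetical stand-in for load_obj: turn 'module.attr' into the object."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def past_checkpoint(ids, checkpoint, comparison_func=operator.gt):
    """Hypothetical helper: keep IDs for which comparison_func(id, checkpoint) holds."""
    if isinstance(comparison_func, str):
        comparison_func = resolve_callable(comparison_func)
    return [pid for pid in sorted(ids) if comparison_func(pid, checkpoint)]


ids = ["2024-01-01", "2024-01-02", "2024-01-03"]
strict = past_checkpoint(ids, "2024-01-02")  # default operator.gt
inclusive = past_checkpoint(ids, "2024-01-02", "operator.ge")  # re-includes the checkpoint
```

Switching to `operator.ge` makes the checkpointed partition itself load again on the next run, which can help when the last partition may still be receiving data.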

DEFAULT_CHECKPOINT_FILENAME class-attribute instance-attribute

DEFAULT_CHECKPOINT_FILENAME = 'CHECKPOINT'

DEFAULT_CHECKPOINT_TYPE class-attribute instance-attribute

DEFAULT_CHECKPOINT_TYPE = 'kedro_datasets.text.TextDataset'
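Together, these two defaults place a text checkpoint at `<path>/CHECKPOINT`. The sketch below mirrors the path join in `_parse_checkpoint_config`. The helper `default_checkpoint_path` is hypothetical, and it assumes that the filesystem separator is `"/"`.

```python
def default_checkpoint_path(path: str, sep: str = "/", filename: str = "CHECKPOINT") -> str:
    """Hypothetical helper: strip trailing separators, then append the checkpoint name."""
    return sep.join([path.rstrip(sep), filename])


local = default_checkpoint_path("data/01_raw/events")
cloud = default_checkpoint_path("s3://bucket/events/")
```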

_checkpoint property

_checkpoint

_checkpoint_config instance-attribute

_checkpoint_config = _parse_checkpoint_config(checkpoint)

_comparison_func instance-attribute

_comparison_func = comparison_func

_force_checkpoint instance-attribute

_force_checkpoint = self._checkpoint_config.pop("force_checkpoint", None)

metadata instance-attribute

metadata = metadata

_list_partitions

_list_partitions()
Source code in kedro_datasets/partitions/incremental_dataset.py
def _list_partitions(self) -> list[str]:
    if self._cached_partitions is not None:
        return self._cached_partitions

    checkpoint = self._read_checkpoint()
    checkpoint_path = self._filesystem._strip_protocol(
        self._checkpoint_config[self._filepath_arg]
    )
    dataset_is_versioned = VERSION_KEY in self._dataset_config

    def _is_valid_partition(partition) -> bool:
        if not partition.endswith(self._filename_suffix):
            return False
        if partition == checkpoint_path:
            return False
        if checkpoint is None:
            # nothing was processed yet
            return True
        partition_id = self._path_to_partition(partition)
        return self._comparison_func(partition_id, checkpoint)

    self._cached_partitions = sorted(
        _grandparent(path) if dataset_is_versioned else path
        for path in self._filesystem.find(self._normalized_path, **self._load_args)
        if _is_valid_partition(path)
    )
    return self._cached_partitions
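The filtering in `_list_partitions` applies three rules. A path must end with the suffix, it must not be the checkpoint file, and its partition ID must compare as greater than the checkpoint (if one exists). The sketch below reproduces those rules on a plain list of paths and leaves out versioning and caching. `list_new_partitions` is a hypothetical helper. It also assumes the ID rule (strip the base directory, then the suffix), which in the library lives in the parent class.

```python
from __future__ import annotations

import operator
from typing import Callable


def list_new_partitions(
    paths: list[str],
    checkpoint: str | None,
    checkpoint_path: str,
    base: str,
    suffix: str = "",
    comparison_func: Callable[[str, str], bool] = operator.gt,
) -> list[str]:
    """Hypothetical helper reproducing the _is_valid_partition rules."""

    def to_id(path: str) -> str:
        # Assumed ID rule: drop the base directory, then the suffix
        pid = path[len(base):].lstrip("/")
        return pid[: -len(suffix)] if suffix and pid.endswith(suffix) else pid

    def is_valid(path: str) -> bool:
        if not path.endswith(suffix):
            return False
        if path == checkpoint_path:
            return False  # never treat the checkpoint file as data
        if checkpoint is None:
            return True  # nothing was processed yet
        return comparison_func(to_id(path), checkpoint)

    return sorted(p for p in paths if is_valid(p))


base = "data/events"
ckpt = f"{base}/CHECKPOINT"
paths = [f"{base}/a.csv", f"{base}/b.csv", f"{base}/c.csv", f"{base}/notes.txt", ckpt]
first = list_new_partitions(paths, None, ckpt, base, ".csv")
later = list_new_partitions(paths, "a", ckpt, base, ".csv")
no_suffix = list_new_partitions(paths, None, ckpt, base)
```

Without a suffix filter, every file except the checkpoint counts as a partition, including `notes.txt`. This is why setting `filename_suffix` is often worthwhile.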

_parse_checkpoint_config

_parse_checkpoint_config(checkpoint_config)
Source code in kedro_datasets/partitions/incremental_dataset.py
def _parse_checkpoint_config(
    self, checkpoint_config: str | dict[str, Any] | None
) -> dict[str, Any]:
    checkpoint_config = deepcopy(checkpoint_config)
    if isinstance(checkpoint_config, str):
        checkpoint_config = {"force_checkpoint": checkpoint_config}
    checkpoint_config = checkpoint_config or {}

    for key in {VERSION_KEY, VERSIONED_FLAG_KEY} & checkpoint_config.keys():
        raise DatasetError(
            f"'{self.__class__.__name__}' does not support versioning of the "
            f"checkpoint. Please remove '{key}' key from the checkpoint definition."
        )

    default_checkpoint_path = self._sep.join(
        [self._normalized_path.rstrip(self._sep), self.DEFAULT_CHECKPOINT_FILENAME]
    )
    default_config = {
        "type": self.DEFAULT_CHECKPOINT_TYPE,
        self._filepath_arg: default_checkpoint_path,
    }
    if self._credentials:
        default_config[CREDENTIALS_KEY] = deepcopy(self._credentials)

    if CREDENTIALS_KEY in default_config.keys() & checkpoint_config.keys():
        self._logger.warning(
            KEY_PROPAGATION_WARNING,
            {"keys": CREDENTIALS_KEY, "target": "checkpoint"},
        )

    merged = {**default_config, **checkpoint_config}

    if self._filepath_arg in checkpoint_config:
        user_filepath = str(checkpoint_config[self._filepath_arg])
        if self._protocol == "file":
            # Local filesystem: use os.path directly so Windows drive
            # letters are handled correctly across all fsspec versions.
            base = os.path.normcase(os.path.normpath(self._normalized_path))
            if os.path.isabs(user_filepath):
                resolved = os.path.normcase(os.path.normpath(user_filepath))
            else:
                resolved = os.path.normcase(
                    os.path.normpath(os.path.join(base, user_filepath))
                )
            if not (resolved == base or resolved.startswith(base + os.sep)):
                raise DatasetError(
                    f"Checkpoint filepath '{user_filepath}' resolves to "
                    f"'{resolved}' which is outside the dataset "
                    f"directory '{base}'."
                )
        else:
            # Cloud filesystem (S3, GCS, Azure, …): strip protocol from
            # both paths, then do a direct normpath prefix check — the same
            # pattern as the local branch — so same-bucket traversal (../)
            # and cross-bucket paths are both caught without path joining.
            dir_path = self._filesystem._strip_protocol(
                self._normalized_path
            ).rstrip(self._sep)
            fp_stripped = self._filesystem._strip_protocol(user_filepath).replace(
                "\\", "/"
            )
            normalized_dir = posixpath.normpath(dir_path)
            normalized_fp = posixpath.normpath(fp_stripped)
            if not (
                normalized_fp == normalized_dir
                or normalized_fp.startswith(normalized_dir + "/")
            ):
                raise DatasetError(
                    f"Checkpoint filepath '{user_filepath}' resolves to "
                    f"'{normalized_fp}' which is outside the dataset "
                    f"directory '{normalized_dir}'."
                )

    return merged
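The containment check in the cloud branch above normalizes both paths and then requires that the checkpoint either equals the dataset directory or starts with that directory followed by `"/"`. The sketch below isolates that check. The helper `is_inside` is hypothetical, and it assumes that the protocol has already been stripped from both paths.

```python
import posixpath


def is_inside(directory: str, candidate: str) -> bool:
    """Hypothetical helper: True if candidate normalizes to directory or below it."""
    base = posixpath.normpath(directory.rstrip("/"))
    target = posixpath.normpath(candidate.replace("\\", "/"))
    # Appending "/" stops sibling directories sharing a prefix from matching
    return target == base or target.startswith(base + "/")


inside = is_inside("bucket/data", "bucket/data/CHECKPOINT")
traversal = is_inside("bucket/data", "bucket/data/../other/CHECKPOINT")
sibling = is_inside("bucket/data", "bucket/data-old/CHECKPOINT")
other_bucket = is_inside("bucket/data", "other-bucket/data/CHECKPOINT")
```

The `base + "/"` test matters. A bare `startswith(base)` check would accept `bucket/data-old/CHECKPOINT`, even though that path lies outside the dataset directory.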

_read_checkpoint

_read_checkpoint()
Source code in kedro_datasets/partitions/incremental_dataset.py
def _read_checkpoint(self) -> str | None:
    if self._force_checkpoint is not None:
        return self._force_checkpoint
    try:
        return self._checkpoint.load()
    except DatasetError:
        return None
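`_read_checkpoint` resolves the checkpoint in a fixed order. A forced value wins first. Otherwise the stored checkpoint is used. If loading fails, the result is `None`. The sketch below reproduces that order. `parse_checkpoint` and `read_checkpoint` are hypothetical helpers, and `FileNotFoundError` stands in for `DatasetError`.

```python
from __future__ import annotations

from typing import Any, Callable


def parse_checkpoint(config: str | dict[str, Any] | None) -> dict[str, Any]:
    """Hypothetical helper: a bare string is shorthand for force_checkpoint."""
    if isinstance(config, str):
        config = {"force_checkpoint": config}
    return dict(config or {})


def read_checkpoint(config: dict[str, Any], load_stored: Callable[[], str]) -> str | None:
    """Hypothetical helper: forced value, then stored value, then None."""
    forced = config.get("force_checkpoint")
    if forced is not None:
        return forced
    try:
        return load_stored()
    except FileNotFoundError:  # stand-in for DatasetError
        return None


def missing() -> str:
    raise FileNotFoundError("no checkpoint yet")


forced = read_checkpoint(parse_checkpoint("2024-01-01"), lambda: "2024-03-01")
stored = read_checkpoint(parse_checkpoint(None), lambda: "2024-03-01")
absent = read_checkpoint(parse_checkpoint({}), missing)
reset = read_checkpoint(parse_checkpoint(""), lambda: "2024-03-01")
```

An empty string is not `None`, so it still counts as a forced checkpoint. Every non-empty partition ID compares greater than `""` under `operator.gt`, so all partitions are reprocessed.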

confirm

confirm()

Confirm the dataset by updating the checkpoint value to the latest processed partition ID. If no partitions were processed, the checkpoint is left unchanged.

Source code in kedro_datasets/partitions/incremental_dataset.py
def confirm(self) -> None:
    """Confirm the dataset by updating the checkpoint value to the latest
    processed partition ID"""
    partition_ids = [self._path_to_partition(p) for p in self._list_partitions()]
    if partition_ids:
        self._checkpoint.save(partition_ids[-1])  # checkpoint to last partition
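`confirm` saves the last ID in sorted order. With the default string comparison, that order is lexicographic, so the naming of partitions matters. The sketch below demonstrates a pitfall with unpadded numeric IDs. `confirm_value` is a hypothetical helper.

```python
from __future__ import annotations


def confirm_value(partition_ids: list[str]) -> str | None:
    """Hypothetical helper: the ID that confirm() would persist, if any."""
    ordered = sorted(partition_ids)
    return ordered[-1] if ordered else None


unpadded = confirm_value(["part-9", "part-10"])  # "part-10" sorts before "part-9"
padded = confirm_value(["part-09", "part-10"])
nothing = confirm_value([])  # confirm() would leave the checkpoint untouched
is_new = "part-11" > unpadded  # a later partition compared with operator.gt
```

Here `is_new` is `False`, so `part-11` would be silently skipped. Zero-padded or ISO-date partition names avoid this problem, and so does a custom `comparison_func`.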

load

load()
Source code in kedro_datasets/partitions/incremental_dataset.py
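def load(self) -> dict[str, Any]:
    # Unlike PartitionedDataset.load, partitions are loaded eagerly:
    # values are the loaded data, not load callables.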
    partitions: dict[str, Any] = {}

    for partition in self._list_partitions():
        partition_id = self._path_to_partition(partition)
        kwargs = deepcopy(self._dataset_config)
        # join the protocol back since PySpark may rely on it
        kwargs[self._filepath_arg] = self._join_protocol(partition)
        partitions[partition_id] = self._dataset_type(  # type: ignore
            **kwargs
        ).load()

    return partitions
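`load` above calls `.load()` on every partition dataset immediately. `PartitionedDataset` takes a different approach and returns zero-argument load callables, which defers reading. The sketch below contrasts the two styles using a hypothetical `read_partition` function that records when each read happens.

```python
from functools import partial
from typing import Any, Callable


def read_partition(name: str, calls: list) -> str:
    """Hypothetical reader that records every actual read."""
    calls.append(name)
    return f"data from {name}"


calls: list = []
ids = ["a", "b"]

# Lazy style (PartitionedDataset): nothing is read until a value is called
lazy: dict[str, Callable[[], Any]] = {pid: partial(read_partition, pid, calls) for pid in ids}
reads_after_lazy = len(calls)

# Eager style (IncrementalDataset.load): every partition is read up front
eager: dict[str, Any] = {pid: read_partition(pid, calls) for pid in ids}
reads_after_eager = len(calls)
```

A node that consumes an IncrementalDataset therefore receives the data directly and should not call the dictionary values.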