Skip to content

AbstractVersionedDataset

kedro.io.AbstractVersionedDataset

AbstractVersionedDataset(filepath, version, exists_function=None, glob_function=None)

Bases: AbstractDataset[_DI, _DO], ABC

AbstractVersionedDataset is the base class for all versioned dataset implementations.

All datasets that implement versioning should extend this abstract class and implement the methods marked as abstract.

Example:

from pathlib import Path, PurePosixPath
import pandas as pd
from kedro.io import AbstractVersionedDataset


class MyOwnDataset(AbstractVersionedDataset):
    """Example versioned dataset that stores a pandas DataFrame as CSV."""

    def __init__(self, filepath, version, param1, param2=True):
        """Store the dataset location, its version and two custom parameters.

        Args:
            filepath: Path to the CSV file; converted to ``PurePosixPath``.
            version: Version passed through to ``AbstractVersionedDataset``.
            param1: Required custom argument.
            param2: Optional custom argument, ``True`` by default.
        """
        super().__init__(PurePosixPath(filepath), version)
        self._param1 = param1
        self._param2 = param2

    def load(self) -> pd.DataFrame:
        """Read the CSV file found at the resolved load path."""
        load_path = self._get_load_path()
        return pd.read_csv(load_path)

    def save(self, df: pd.DataFrame) -> None:
        """Write ``df`` as CSV to the resolved save path."""
        save_path = self._get_save_path()
        # A versioned save path has the form <filepath>/<version>/<filename>,
        # so the version directory does not exist yet; create it before writing.
        Path(save_path.as_posix()).parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(str(save_path))

    def _exists(self) -> bool:
        """Return whether the file at the resolved load path exists."""
        path = self._get_load_path()
        return Path(path.as_posix()).exists()

    def _describe(self):
        """Return the attributes that describe this dataset instance."""
        return {
            "version": self._version,
            "param1": self._param1,
            "param2": self._param2,
        }

Example catalog.yml specification:

my_dataset:
    type: <path-to-my-own-dataset>.MyOwnDataset
    filepath: data/01_raw/my_data.csv
    versioned: true
    param1: <param1-value> # param1 is a required argument
    # param2 will be True by default

Parameters:

  • filepath (PurePosixPath) –

    Filepath in POSIX format to a file.

  • version (Version | None) –

    If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, save version will be autogenerated.

  • exists_function (Callable[[str], bool] | None, default: None ) –

    Function that is used for determining whether a path exists in a filesystem.

  • glob_function (Callable[[str], list[str]] | None, default: None ) –

    Function that is used for finding all paths in a filesystem, which match a given pattern.

Source code in kedro/io/core.py
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
def __init__(
    self,
    filepath: PurePosixPath,
    version: Version | None,
    exists_function: Callable[[str], bool] | None = None,
    glob_function: Callable[[str], list[str]] | None = None,
):
    """Set up the state shared by every ``AbstractVersionedDataset``.

    Args:
        filepath: Location of the dataset file, given in POSIX format.
        version: Optional instance of ``kedro.io.core.Version``. When its
            ``load`` attribute is None the latest version is loaded; when
            its ``save`` attribute is None the save version is
            autogenerated.
        exists_function: Callable that tells whether a path exists in a
            filesystem. ``_local_exists`` is used when it is not supplied.
        glob_function: Callable that finds all paths in a filesystem
            matching a given pattern. ``iglob`` is used when it is not
            supplied.
    """
    self._filepath = filepath
    self._version = version

    # Both version caches start out empty.
    self._cached_load_version: str | None = None
    self._cached_save_version: str | None = None

    # Fall back to the default helpers when no callable was passed in.
    self._exists_function = exists_function if exists_function else _local_exists
    self._glob_function = glob_function if glob_function else iglob

_cached_load_version instance-attribute

_cached_load_version = None

_cached_save_version instance-attribute

_cached_save_version = None

_exists_function instance-attribute

_exists_function = exists_function or _local_exists

_filepath instance-attribute

_filepath = filepath

_glob_function instance-attribute

_glob_function = glob_function or iglob

_version instance-attribute

_version = version

_clear_version_cache

_clear_version_cache()

Clear both load and save version caches.

Source code in kedro/io/core.py
738
739
740
741
def _clear_version_cache(self) -> None:
    """Reset the cached load version and the cached save version to None."""
    self._cached_load_version = self._cached_save_version = None

_fetch_latest_load_version

_fetch_latest_load_version()

Fetch the most recent existing version from the given path. Results are cached to avoid repeated filesystem operations.

Source code in kedro/io/core.py
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
def _fetch_latest_load_version(self) -> str:
    """Return the most recent existing version, remembering the answer.

    The first successful lookup is stored in ``_cached_load_version`` so
    that later calls do not repeat the filesystem operations.

    Raises:
        VersionNotFoundError: If no version is found.
    """
    cached = self._cached_load_version
    if cached is not None:
        return cached

    # The load version is unpinned: take the first entry of the version
    # listing, which ``_get_versions`` sorts in reverse order.
    newest = next(iter(self._get_versions()), None)
    if not newest:
        raise VersionNotFoundError(f"Did not find any versions for {self}")

    # The version is the name of the directory that contains the file.
    resolved = PurePath(newest).parent.name
    self._cached_load_version = resolved
    return resolved

_fetch_latest_save_version

_fetch_latest_save_version()

Generate and cache the current save version

Source code in kedro/io/core.py
778
779
780
781
782
783
784
785
786
def _fetch_latest_save_version(self) -> str:
    """Return the current save version, generating it on first use.

    The generated timestamp is kept in ``_cached_save_version`` and is
    returned as-is while that cache is set.
    """
    if self._cached_save_version is None:
        # Nothing cached yet: generate a new timestamp and remember it.
        self._cached_save_version = generate_timestamp()
    return self._cached_save_version

_get_load_path

_get_load_path()
Source code in kedro/io/core.py
796
797
798
799
800
801
802
def _get_load_path(self) -> PurePosixPath:
    """Return the path the dataset should be loaded from.

    Returns:
        The original filepath when versioning is disabled, otherwise the
        versioned path built for the resolved load version.
    """
    if self._version:
        resolved = self.resolve_load_version()
        return self._get_versioned_path(resolved)  # type: ignore[arg-type]

    # Versioning is disabled, so the original filepath is used unchanged.
    return self._filepath

_get_save_path

_get_save_path()
Source code in kedro/io/core.py
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
def _get_save_path(self) -> PurePosixPath:
    """Return the path the dataset should be saved to.

    Returns:
        The original filepath when versioning is disabled, otherwise the
        versioned path built for the resolved save version.

    Raises:
        DatasetError: If versioning is enabled and the versioned save
            path already exists.
    """
    if not self._version:
        # Versioning is disabled, so the original filepath is used unchanged.
        return self._filepath

    resolved = self.resolve_save_version()
    target = self._get_versioned_path(resolved)  # type: ignore[arg-type]

    # A versioned save must never overwrite an existing version.
    already_present = self._exists_function(str(target))
    if not already_present:
        return target

    raise DatasetError(
        f"Save path '{target}' for {self!s} must not exist if "
        f"versioning is enabled."
    )

_get_version_from_path

_get_version_from_path(path)

Extract version string from a versioned dataset path.

Parameters:

  • path (str) –

    Full path to a versioned dataset file.

Returns:

  • str

    Version string extracted from the path.

Source code in kedro/io/core.py
837
838
839
840
841
842
843
844
845
846
def _get_version_from_path(self, path: str) -> str:
    """Return the version component of a versioned dataset path.

    The version is taken to be the name of the directory that directly
    contains the file.

    Args:
        path: Full path to a versioned dataset file.

    Returns:
        The version string found in the path.
    """
    version_dir = PurePath(path).parent
    return version_dir.name

_get_versioned_path

_get_versioned_path(version)
Source code in kedro/io/core.py
828
829
830
831
832
833
834
835
def _get_versioned_path(self, version: str) -> PurePosixPath:
    """Build the versioned path ``<filepath>/<version>/<filename>``.

    Args:
        version: Version string to insert into the path.

    Returns:
        The dataset filepath extended with the version and the file name.

    Raises:
        DatasetError: If ``_is_unsafe_version`` rejects the version string.
    """
    if not _is_unsafe_version(version):
        base = self._filepath
        return base / version / base.name

    raise DatasetError(
        f"Version string '{version}' is not allowed. "
        "Version strings must be a single non-empty path component with no "
        "path separators ('/' or '\\') and must not be '.' or '..'."
    )

_get_versions

_get_versions()

Get all existing versions for this dataset, sorted by most recent first.

Source code in kedro/io/core.py
743
744
745
746
747
748
749
750
751
752
753
754
755
def _get_versions(self) -> list[str]:
    """Return the paths of all existing versions of this dataset.

    The paths matched by the glob function are sorted in reverse order
    (most recent first) and then filtered with the exists function.

    Raises:
        VersionNotFoundError: If globbing or sorting the version paths
            raises any exception.
    """
    glob_pattern = str(self._get_versioned_path("*"))
    try:
        candidates = list(self._glob_function(glob_pattern))
        candidates.sort(reverse=True)
    except Exception as exc:
        raise VersionNotFoundError(
            f"Did not find any versions for {self}. This could be "
            f"due to insufficient permission. Exception: {exc}"
        ) from exc

    # Keep only the matches that the exists function confirms.
    return [candidate for candidate in candidates if self._exists_function(candidate)]

_release

_release()
Source code in kedro/io/core.py
949
950
951
def _release(self) -> None:
    """Release the dataset and drop the cached load and save versions."""
    # Run the parent class's release logic first.
    super()._release()
    # Forget the cached versions so they are resolved afresh when next needed.
    self._clear_version_cache()

_save_wrapper classmethod

_save_wrapper(save_func)

Decorate save_func with logging and error handling code.

Source code in kedro/io/core.py
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
@classmethod
def _save_wrapper(
    cls, save_func: Callable[[Self, _DI], None]
) -> Callable[[Self, _DI], None]:
    """Decorate `save_func` with logging and error handling code.

    The returned function clears the version cache, resolves the save
    version, delegates to the parent class's wrapper around ``save_func``
    and finally compares the resolved load version with the save version.

    Args:
        save_func: The save implementation to wrap.

    Returns:
        The wrapped save function.

    Raises:
        DatasetError: Raised by the wrapped function when the underlying
            save fails with ``FileNotFoundError`` or ``NotADirectoryError``.
    """

    @wraps(save_func)
    def save(self: Self, data: _DI) -> None:
        # Start from a clean cache so that the versions are resolved afresh
        # for this save.
        self._clear_version_cache()
        save_version = (
            self.resolve_save_version()
        )  # Make sure last save version is set
        try:
            # Delegate the actual save to the parent class's wrapper.
            super()._save_wrapper(save_func)(self, data)
        except (FileNotFoundError, NotADirectoryError) as err:
            # FileNotFoundError raised in Win, NotADirectoryError raised in Unix
            # Placeholder showing the default version format in the message.
            _default_version = "YYYY-MM-DDThh.mm.ss.sssZ"
            raise DatasetError(
                f"Cannot save versioned dataset '{self._filepath.name}' to "
                f"'{self._filepath.parent.as_posix()}' because a file with the same "
                f"name already exists in the directory. This is likely because "
                f"versioning was enabled on a dataset already saved previously. Either "
                f"remove '{self._filepath.name}' from the directory or manually "
                f"convert it into a versioned dataset by placing it in a versioned "
                f"directory (e.g. with default versioning format "
                f"'{self._filepath.as_posix()}/{_default_version}/{self._filepath.name}"
                f"')."
            ) from err

        # After saving, the version that would be loaded should be the one
        # that was just written; warn if it is not.
        load_version = self.resolve_load_version()
        if load_version != save_version:
            warnings.warn(
                _CONSISTENCY_WARNING.format(save_version, load_version, str(self))
            )
            # Drop the mismatching cached versions.
            self._clear_version_cache()

    return save

exists

exists()

Checks whether a dataset's output already exists by calling the provided _exists() method.

Returns:

  • bool

    Flag indicating whether the output already exists.

Raises:

  • DatasetError

    when underlying exists method raises error.

Source code in kedro/io/core.py
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
def exists(self) -> bool:
    """Report whether the dataset's output already exists.

    The check is delegated to the provided ``_exists()`` method. A
    ``VersionNotFoundError`` raised by it is treated as "does not exist".

    Returns:
        Flag indicating whether the output already exists.

    Raises:
        DatasetError: when the underlying exists method raises any other
            exception.

    """
    self._logger.debug("Checking whether target of %s exists", str(self))
    try:
        found = self._exists()
    except VersionNotFoundError:
        found = False
    except Exception as err:  # SKIP_IF_NO_SPARK
        raise DatasetError(
            f"Failed during exists check for dataset {self!s}.\n{err!s}"
        ) from err
    return found

list_versions

list_versions(full_path=True)

List all available versions of this dataset.

This method allows you to retrieve all existing versions of a versioned dataset. It's useful for tracking dataset history, auditing changes, or implementing custom version selection logic.

Parameters:

  • full_path (bool, default: True ) –

    If True, returns the full path to each version. If False, returns only the version strings (timestamps).

Returns:

  • list[str]

    A list of version paths (if full_path=True) or version strings (if full_path=False), sorted in reverse chronological order (most recent first).

Raises:

  • VersionNotFoundError

    If the versions cannot be listed, for example because of permission issues accessing the version directory.

Example:

>>> dataset = MyVersionedDataset(
...     filepath="data/model.pkl",
...     version=Version(None, None)
... )
>>> # After saving multiple versions...
>>> versions = dataset.list_versions(full_path=False)
>>> print(versions)
['2024-01-15T10.30.00.000Z', '2024-01-14T09.15.00.000Z']
>>> # Get full paths
>>> paths = dataset.list_versions(full_path=True)
>>> print(paths[0])
data/model.pkl/2024-01-15T10.30.00.000Z/model.pkl
Source code in kedro/io/core.py
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
def list_versions(self, full_path: bool = True) -> list[str]:
    """List all available versions of this dataset.

    Use this method to retrieve every existing version of a versioned
    dataset, for example to track dataset history, audit changes, or
    implement custom version selection logic.

    Args:
        full_path: If True, returns the full path to each version.
            If False, returns only the version strings (timestamps).

    Returns:
        A list of version paths (if full_path=True) or version strings
            (if full_path=False), sorted in reverse chronological order
            (most recent first). The list is empty when no version exists.

    Raises:
        VersionNotFoundError: If looking up the versions fails, for
            example because of insufficient permissions on the version
            directory.

    Example:
        ::

            >>> dataset = MyVersionedDataset(
            ...     filepath="data/model.pkl",
            ...     version=Version(None, None)
            ... )
            >>> # After saving multiple versions...
            >>> versions = dataset.list_versions(full_path=False)
            >>> print(versions)
            ['2024-01-15T10.30.00.000Z', '2024-01-14T09.15.00.000Z']
            >>> # Get full paths
            >>> paths = dataset.list_versions(full_path=True)
            >>> print(paths[0])
            data/model.pkl/2024-01-15T10.30.00.000Z/model.pkl
    """
    existing_paths = self._get_versions()

    if not full_path:
        # Reduce every path to its version component.
        return list(map(self._get_version_from_path, existing_paths))

    return existing_paths

resolve_load_version

resolve_load_version()

Compute the version the dataset should be loaded with.

Source code in kedro/io/core.py
788
789
790
791
792
793
794
def resolve_load_version(self) -> str | None:
    """Work out which version the dataset should be loaded with.

    Returns:
        None when versioning is disabled, the pinned load version when
        one is set, otherwise the latest existing version.
    """
    version = self._version
    if not version:
        return None
    if not version.load:
        # No load version is pinned: fall back to the latest one.
        return self._fetch_latest_load_version()
    return version.load  # type: ignore[no-any-return]

resolve_save_version

resolve_save_version()

Compute the version the dataset should be saved with.

Source code in kedro/io/core.py
804
805
806
807
808
809
810
def resolve_save_version(self) -> str | None:
    """Work out which version the dataset should be saved with.

    Returns:
        None when versioning is disabled, the pinned save version when
        one is set, otherwise the generated (and cached) save version.
    """
    version = self._version
    if not version:
        return None
    if not version.save:
        # No save version is pinned: use the generated one.
        return self._fetch_latest_save_version()
    return version.save  # type: ignore[no-any-return]