AbstractVersionedDataset(filepath, version, exists_function=None, glob_function=None)
Bases: AbstractDataset[_DI, _DO], ABC
AbstractVersionedDataset is the base class for all versioned dataset
implementations.
All datasets that implement versioning should extend this
abstract class and implement the methods marked as abstract.
Example:
from pathlib import Path, PurePosixPath
import pandas as pd
from kedro.io import AbstractVersionedDataset
class MyOwnDataset(AbstractVersionedDataset):
    """Example versioned dataset that reads and writes CSV files with pandas."""

    def __init__(self, filepath, version, param1, param2=True):
        # The base class builds versioned paths from a PurePosixPath.
        super().__init__(PurePosixPath(filepath), version)
        self._param1 = param1
        self._param2 = param2

    def load(self) -> pd.DataFrame:
        """Read the resolved (possibly versioned) CSV file into a DataFrame."""
        return pd.read_csv(self._get_load_path())

    def save(self, df: pd.DataFrame) -> None:
        """Write ``df`` as CSV to the resolved (possibly versioned) save path."""
        target = str(self._get_save_path())
        df.to_csv(target)

    def _exists(self) -> bool:
        """Report whether the resolved load path exists on the local filesystem."""
        return Path(self._get_load_path().as_posix()).exists()

    def _describe(self):
        return {
            "version": self._version,
            "param1": self._param1,
            "param2": self._param2,
        }
Example catalog.yml specification:
my_dataset:
  type: <path-to-my-own-dataset>.MyOwnDataset
  filepath: data/01_raw/my_data.csv
  versioned: true
  param1: <param1-value> # param1 is a required argument
  # param2 will be True by default
Parameters:
-
filepath
(PurePosixPath)
–
Filepath in POSIX format to a file.
-
version
(Version | None)
–
If specified, should be an instance of
kedro.io.core.Version. If its load attribute is
None, the latest version will be loaded. If its save
attribute is None, save version will be autogenerated.
-
exists_function
(Callable[[str], bool] | None, default:
None
)
–
Function that is used for determining whether
a path exists in a filesystem.
-
glob_function
(Callable[[str], list[str]] | None, default:
None
)
–
Function that is used for finding all paths
in a filesystem, which match a given pattern.
Source code in kedro/io/core.py
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
def __init__(
    self,
    filepath: PurePosixPath,
    version: Version | None,
    exists_function: Callable[[str], bool] | None = None,
    glob_function: Callable[[str], list[str]] | None = None,
):
    """Creates a new instance of ``AbstractVersionedDataset``.

    Args:
        filepath: Filepath in POSIX format to a file.
        version: If specified, should be an instance of
            ``kedro.io.core.Version``. If its ``load`` attribute is
            None, the latest version will be loaded. If its ``save``
            attribute is None, save version will be autogenerated.
        exists_function: Function that is used for determining whether
            a path exists in a filesystem. Falls back to ``_local_exists``
            when not provided.
        glob_function: Function that is used for finding all paths
            in a filesystem, which match a given pattern. Falls back to
            ``iglob`` when not provided.
    """
    self._filepath = filepath
    self._version = version
    # Use the local-filesystem helpers unless custom (truthy) callables are given.
    self._exists_function = exists_function if exists_function else _local_exists
    self._glob_function = glob_function if glob_function else iglob
    # Resolved versions are filled in lazily by the _fetch_latest_* helpers.
    self._cached_load_version: str | None = None
    self._cached_save_version: str | None = None
|
_cached_load_version
instance-attribute
_cached_load_version = None
_cached_save_version
instance-attribute
_cached_save_version = None
_exists_function
instance-attribute
_exists_function = exists_function or _local_exists
_filepath
instance-attribute
_glob_function
instance-attribute
_glob_function = glob_function or iglob
_version
instance-attribute
_clear_version_cache
Clear both load and save version caches.
Source code in kedro/io/core.py
def _clear_version_cache(self) -> None:
    """Clear both load and save version caches.

    After this, the next ``resolve_load_version`` / ``resolve_save_version``
    call for an unpinned version recomputes it instead of reusing a cached one.
    """
    self._cached_load_version = self._cached_save_version = None
|
_fetch_latest_load_version
_fetch_latest_load_version()
Fetch the most recent existing version of this dataset from its versioned filepath.
Results are cached to avoid repeated filesystem operations.
Source code in kedro/io/core.py
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
def _fetch_latest_load_version(self) -> str:
    """Fetch the most recent existing version of this dataset.

    Results are cached in ``_cached_load_version`` to avoid repeated
    filesystem operations until the cache is cleared.

    Returns:
        The version string of the most recent existing version.

    Raises:
        VersionNotFoundError: If no existing versions are found, or if
            ``_get_versions`` fails to list them.
    """
    # Return cached version if available
    if self._cached_load_version is not None:
        return self._cached_load_version

    # When load version is unpinned, use the most recent existing version;
    # ``_get_versions`` already returns existing paths, most recent first.
    version_paths = self._get_versions()
    if not version_paths:
        raise VersionNotFoundError(f"Did not find any versions for {self}")

    # Map path -> version the same way ``list_versions`` does, so overriding
    # ``_get_version_from_path`` keeps load resolution and listing consistent.
    self._cached_load_version = self._get_version_from_path(version_paths[0])
    return self._cached_load_version
|
_fetch_latest_save_version
_fetch_latest_save_version()
Return the cached save version, generating and caching a new one first if none is cached.
Source code in kedro/io/core.py
778
779
780
781
782
783
784
785
def _fetch_latest_save_version(self) -> str:
    """Return the cached save version, generating and caching one if absent.

    A new value comes from ``generate_timestamp()`` and is reused until the
    version cache is cleared.
    """
    cached = self._cached_save_version
    if cached is None:
        # Nothing cached since the last clear: mint a fresh version.
        cached = generate_timestamp()
        self._cached_save_version = cached
    return cached
|
_get_load_path
Source code in kedro/io/core.py
796
797
798
799
800
801
def _get_load_path(self) -> PurePosixPath:
    """Return the path to load from.

    This is the plain ``_filepath`` when versioning is disabled, and
    otherwise the versioned path for ``resolve_load_version()``.
    """
    if self._version:
        return self._get_versioned_path(self.resolve_load_version())  # type: ignore[arg-type]
    # Versioning disabled: read straight from the configured filepath.
    return self._filepath
|
_get_save_path
Source code in kedro/io/core.py
812
813
814
815
816
817
818
819
820
821
822
823
824
825
def _get_save_path(self) -> PurePosixPath:
    """Return the path to save to.

    This is the plain ``_filepath`` when versioning is disabled. Otherwise it
    is the versioned path for ``resolve_save_version()``, which must not exist.

    Raises:
        DatasetError: If versioning is enabled and the versioned save path
            already exists according to ``_exists_function``.
    """
    if not self._version:
        # Versioning disabled: write straight to the configured filepath.
        return self._filepath
    target = self._get_versioned_path(self.resolve_save_version())  # type: ignore[arg-type]
    already_exists = self._exists_function(str(target))
    if not already_exists:
        return target
    # Refuse to overwrite an existing version.
    raise DatasetError(
        f"Save path '{target}' for {self!s} must not exist if "
        f"versioning is enabled."
    )
|
_get_version_from_path
_get_version_from_path(path)
Extract version string from a versioned dataset path.
Parameters:
-
path
(str)
–
Full path to a versioned dataset file.
Returns:
-
str
–
Version string extracted from the path.
Source code in kedro/io/core.py
837
838
839
840
841
842
843
844
845
def _get_version_from_path(self, path: str) -> str:
    """Extract version string from a versioned dataset path.

    Versioned paths are laid out as ``<filepath>/<version>/<filename>``, so
    the version is the name of the file's parent directory.

    Args:
        path: Full path to a versioned dataset file.

    Returns:
        Version string extracted from the path.
    """
    version_dir = PurePath(path).parent
    return version_dir.name
|
_get_versioned_path
_get_versioned_path(version)
Source code in kedro/io/core.py
828
829
830
831
832
833
834
def _get_versioned_path(self, version: str) -> PurePosixPath:
    """Build the path ``<filepath>/<version>/<filename>`` for ``version``.

    Raises:
        DatasetError: If ``_is_unsafe_version(version)`` rejects the string.
    """
    if not _is_unsafe_version(version):
        return self._filepath / version / self._filepath.name
    raise DatasetError(
        f"Version string '{version}' is not allowed. "
        "Version strings must be a single non-empty path component with no "
        "path separators ('/' or '\\') and must not be '.' or '..'."
    )
|
_get_versions
Get all existing versions for this dataset, sorted by most recent first.
Source code in kedro/io/core.py
743
744
745
746
747
748
749
750
751
752
753
754
def _get_versions(self) -> list[str]:
    """Get all existing versions for this dataset, sorted by most recent first.

    Globs ``<filepath>/*/<filename>``, sorts the matches in descending order,
    and keeps only those for which ``_exists_function`` is truthy.

    Raises:
        VersionNotFoundError: If the glob function raises any exception.
    """
    glob_pattern = str(self._get_versioned_path("*"))
    try:
        candidates = sorted(self._glob_function(glob_pattern), reverse=True)
    except Exception as exc:
        raise VersionNotFoundError(
            f"Did not find any versions for {self}. This could be "
            f"due to insufficient permission. Exception: {exc}"
        ) from exc
    return list(filter(self._exists_function, candidates))
|
_release
Source code in kedro/io/core.py
def _release(self) -> None:
    """Run the parent class's release logic, then clear the version caches."""
    super()._release()
    # Drop cached load/save versions so unpinned versions are re-resolved
    # the next time they are needed.
    self._clear_version_cache()
|
_save_wrapper
classmethod
Decorate save_func with logging and error handling code.
Source code in kedro/io/core.py
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
@classmethod
def _save_wrapper(
    cls, save_func: Callable[[Self, _DI], None]
) -> Callable[[Self, _DI], None]:
    """Decorate `save_func` with logging and error handling code.

    The returned ``save`` delegates to the parent class's ``_save_wrapper``
    and adds versioning behaviour:

    * clears the version caches and resolves the save version before saving,
      so the save path and the later consistency check use the same version;
    * turns ``FileNotFoundError`` / ``NotADirectoryError`` into a
      ``DatasetError`` explaining that an unversioned file likely occupies
      the path where the versioned directory should be;
    * warns with ``_CONSISTENCY_WARNING`` when the load version resolved
      after saving differs from the save version;
    * clears the version caches again once the save and check complete.

    Args:
        save_func: The subclass's ``save`` implementation to wrap.

    Returns:
        The wrapped ``save`` function.
    """
    @wraps(save_func)
    def save(self: Self, data: _DI) -> None:
        # Start from an empty cache so the save version is resolved afresh
        # (pinned ``Version.save`` or a newly generated one).
        self._clear_version_cache()
        save_version = (
            self.resolve_save_version()
        ) # Make sure last save version is set
        try:
            # Zero-arg super() works here: ``save`` is defined inside the class
            # body and its first argument is ``self``.
            super()._save_wrapper(save_func)(self, data)
        except (FileNotFoundError, NotADirectoryError) as err:
            # FileNotFoundError raised in Win, NotADirectoryError raised in Unix
            _default_version = "YYYY-MM-DDThh.mm.ss.sssZ"
            raise DatasetError(
                f"Cannot save versioned dataset '{self._filepath.name}' to "
                f"'{self._filepath.parent.as_posix()}' because a file with the same "
                f"name already exists in the directory. This is likely because "
                f"versioning was enabled on a dataset already saved previously. Either "
                f"remove '{self._filepath.name}' from the directory or manually "
                f"convert it into a versioned dataset by placing it in a versioned "
                f"directory (e.g. with default versioning format "
                f"'{self._filepath.as_posix()}/{_default_version}/{self._filepath.name}"
                f"')."
            ) from err
        # Warn if a subsequent load would not read what was just saved
        # (e.g. when a pinned load version differs from the save version).
        load_version = self.resolve_load_version()
        if load_version != save_version:
            warnings.warn(
                _CONSISTENCY_WARNING.format(save_version, load_version, str(self))
            )
        # Reset caches so later loads/saves re-resolve their versions.
        self._clear_version_cache()
    return save
|
exists
Checks whether a dataset's output already exists by calling
the provided _exists() method.
Returns:
-
bool
–
Flag indicating whether the output already exists.
Raises:
-
DatasetError
–
When the underlying _exists() method raises an error.
Source code in kedro/io/core.py
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
def exists(self) -> bool:
    """Checks whether a dataset's output already exists by calling
    the provided _exists() method.

    A ``VersionNotFoundError`` from ``_exists()`` is treated as "does not
    exist" rather than as a failure.

    Returns:
        Flag indicating whether the output already exists.

    Raises:
        DatasetError: when underlying exists method raises error.
    """
    self._logger.debug("Checking whether target of %s exists", str(self))
    try:
        found = self._exists()
    except VersionNotFoundError:
        # No resolvable version (e.g. nothing saved yet): report absence.
        found = False
    except Exception as exc:  # SKIP_IF_NO_SPARK
        raise DatasetError(
            f"Failed during exists check for dataset {self!s}.\n{exc!s}"
        ) from exc
    return found
|
list_versions
list_versions(full_path=True)
List all available versions of this dataset.
This method allows you to retrieve all existing versions of a versioned
dataset. It's useful for tracking dataset history, auditing changes, or
implementing custom version selection logic.
Parameters:
-
full_path
(bool, default:
True
)
–
If True, returns the full path to each version.
If False, returns only the version strings (timestamps).
Returns:
-
list[str]
–
A list of version paths (if full_path=True) or version strings
(if full_path=False), sorted in reverse chronological order
(most recent first).
Raises:
-
VersionNotFoundError
–
If the version directory cannot be listed (for example, due to
insufficient permissions). If the dataset has no versions, an empty
list is returned instead.
Example
::
>>> dataset = MyVersionedDataset(
... filepath="data/model.pkl",
... version=Version(None, None)
... )
>>> # After saving multiple versions...
>>> versions = dataset.list_versions(full_path=False)
>>> print(versions)
['2024-01-15T10.30.00.000Z', '2024-01-14T09.15.00.000Z']
>>> # Get full paths
>>> paths = dataset.list_versions(full_path=True)
>>> print(paths[0])
data/model.pkl/2024-01-15T10.30.00.000Z/model.pkl
Source code in kedro/io/core.py
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
def list_versions(self, full_path: bool = True) -> list[str]:
    """List all available versions of this dataset.

    This method allows you to retrieve all existing versions of a versioned
    dataset. It's useful for tracking dataset history, auditing changes, or
    implementing custom version selection logic.

    Args:
        full_path: If True, returns the full path to each version.
            If False, returns only the version strings (timestamps with the
            default versioning format).

    Returns:
        A list of version paths (if full_path=True) or version strings
        (if full_path=False), ordered by version in descending order. This is
        reverse chronological (most recent first) for timestamp versions.
        The list is empty if the dataset has no versions yet.

    Raises:
        VersionNotFoundError: If the version directory cannot be listed,
            for example due to insufficient permissions.

    Example:
    ::

        >>> dataset = MyVersionedDataset(
        ...     filepath="data/model.pkl",
        ...     version=Version(None, None)
        ... )
        >>> # After saving multiple versions...
        >>> versions = dataset.list_versions(full_path=False)
        >>> print(versions)
        ['2024-01-15T10.30.00.000Z', '2024-01-14T09.15.00.000Z']
        >>> # Get full paths
        >>> paths = dataset.list_versions(full_path=True)
        >>> print(paths[0])
        data/model.pkl/2024-01-15T10.30.00.000Z/model.pkl
    """
    # ``_get_versions`` returns existing paths (possibly none), newest first.
    version_paths = self._get_versions()
    if full_path:
        return version_paths
    return [self._get_version_from_path(path) for path in version_paths]
|
resolve_load_version
Compute the version the dataset should be loaded with.
Source code in kedro/io/core.py
788
789
790
791
792
793
794 | def resolve_load_version(self) -> str | None:
"""Compute the version the dataset should be loaded with."""
if not self._version:
return None
if self._version.load:
return self._version.load # type: ignore[no-any-return]
return self._fetch_latest_load_version()
|
resolve_save_version
Compute the version the dataset should be saved with.
Source code in kedro/io/core.py
804
805
806
807
808
809
810 | def resolve_save_version(self) -> str | None:
"""Compute the version the dataset should be saved with."""
if not self._version:
return None
if self._version.save:
return self._version.save # type: ignore[no-any-return]
return self._fetch_latest_save_version()
|