Skip to content

kedro.io.SharedMemoryDataCatalog

kedro.io.SharedMemoryDataCatalog

SharedMemoryDataCatalog(datasets=None, raw_data=None, config_resolver=None, load_versions=None, save_version=None)

Bases: DataCatalog

A specialized DataCatalog for managing datasets in a shared memory context.

The SharedMemoryDataCatalog extends the base DataCatalog to support multiprocessing by ensuring that datasets are serializable and synchronized across threads or processes. It provides additional functionality for managing shared memory datasets, such as setting a multiprocessing manager and validating dataset compatibility with multiprocessing.

Attributes:

  • default_runtime_patterns (ClassVar) –

    A dictionary defining the default runtime pattern for datasets of type kedro.io.SharedMemoryDataset.

Example: ::

>>> from multiprocessing.managers import SyncManager
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> # Create a shared memory catalog
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> # Set a multiprocessing manager
>>> manager = SyncManager()
>>> manager.start()
>>> catalog.set_manager_datasets(manager)
>>>
>>> # Validate the catalog for multiprocessing compatibility
>>> catalog.validate_catalog()
Source code in kedro/io/data_catalog.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def __init__(
    self,
    datasets: dict[str, AbstractDataset] | None = None,
    raw_data: dict[str, Any] | None = None,
    config_resolver: CatalogConfigResolver | None = None,
    load_versions: dict[str, str] | None = None,
    save_version: str | None = None,
) -> None:
    """``DataCatalog`` stores instances of ``AbstractDataset``
    implementations to provide ``load`` and ``save`` capabilities from
    anywhere in the program. To use a ``DataCatalog``, you need to
    instantiate it with a dictionary of datasets. Then it will act as a
    single point of reference for your calls, relaying load and save
    functions to the underlying datasets.

    Note: ``DataCatalog`` is an experimental feature and is under active development.
    Therefore, it is possible we'll introduce breaking changes to this class, so be mindful
    of that if you decide to use it already.

    Args:
        datasets: A dictionary of dataset names and dataset instances.
        raw_data: A dictionary with data to be added in memory as `MemoryDataset`` instances.
            Keys represent dataset names and the values are raw data.
        config_resolver: An instance of CatalogConfigResolver to resolve dataset factory patterns and configurations.
        load_versions: A mapping between dataset names and versions
            to load. Has no effect on datasets without enabled versioning.
        save_version: Version string to be used for ``save`` operations
            by all datasets with enabled versioning. It must: a) be a
            case-insensitive string that conforms with operating system
            filename limitations, b) always return the latest version when
            sorted in lexicographical order.

    Example:
    ::

        >>> from kedro.io import DataCatalog, MemoryDataset
        >>> from kedro_datasets.pandas import CSVDataset

        >>> # Define datasets
        >>> datasets = {
        ...     "cars": CSVDataset(filepath="cars.csv"),
        ...     "planes": MemoryDataset(data={"type": "jet", "capacity": 200}),
        ... }

        >>> # Define raw data
        >>> raw_data = {
        ...     "raw_numbers": [1, 2, 3, 4, 5],
        ... }

        >>> # Initialize the catalog
        >>> catalog = DataCatalog(
        ...     datasets=datasets,
        ...     raw_data=raw_data,
        ...     load_versions={"cars": "2023-01-01T00.00.00"},
        ...     save_version="2023-01-02T00.00.00",
        ... )

        >>> print(catalog)
    """
    self._config_resolver = config_resolver or CatalogConfigResolver(
        default_runtime_patterns=self.default_runtime_patterns
    )
    self._datasets: dict[str, AbstractDataset] = datasets or {}
    self._lazy_datasets: dict[str, _LazyDataset] = {}
    self._load_versions, self._save_version = self._validate_versions(
        datasets, load_versions or {}, save_version
    )

    self._use_rich_markup = _has_rich_handler()

    for ds_name, ds_config in self._config_resolver.config.items():
        self._add_from_config(ds_name, ds_config)

    raw_data = raw_data or {}
    for ds_name, data in raw_data.items():
        self[ds_name] = data  # type: ignore[has-type]

default_runtime_patterns class-attribute instance-attribute

default_runtime_patterns = {'{default}': {'type': 'kedro.io.SharedMemoryDataset'}}

set_manager_datasets

set_manager_datasets(manager)

Associate a multiprocessing manager with all shared memory datasets in the catalog.

This method iterates through all datasets in the catalog and sets the provided multiprocessing manager for datasets of type SharedMemoryDataset. This ensures that these datasets are properly synchronized across threads or processes.

Parameters:

  • manager (SyncManager) –

    A multiprocessing manager to be associated with shared memory datasets.

Example: ::

>>> from multiprocessing.managers import SyncManager
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> manager = SyncManager()
>>> manager.start()
>>> catalog.set_manager_datasets(manager)
>>> print(catalog)
# {'shared_data': kedro.io.memory_dataset.MemoryDataset(data='<list>')}
Source code in kedro/io/data_catalog.py
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
def set_manager_datasets(self, manager: SyncManager) -> None:
    """
    Associate a multiprocessing manager with all shared memory datasets in the catalog.

    This method iterates through all datasets in the catalog and sets the provided
    multiprocessing manager for datasets of type `SharedMemoryDataset`. This ensures
    that these datasets are properly synchronized across threads or processes.

    Args:
        manager: A multiprocessing manager to be associated with
            shared memory datasets.

    Example:
    ::

        >>> from multiprocessing.managers import SyncManager
        >>> from kedro.io.data_catalog import SharedMemoryDataCatalog
        >>>
        >>> catalog = SharedMemoryDataCatalog(
        ...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
        ... )
        >>>
        >>> manager = SyncManager()
        >>> manager.start()
        >>> catalog.set_manager_datasets(manager)
        >>> print(catalog)
        # {'shared_data': kedro.io.memory_dataset.MemoryDataset(data='<list>')}
    """
    for _, ds in self._datasets.items():
        if isinstance(ds, SharedMemoryDataset):
            ds.set_manager(manager)

validate_catalog

validate_catalog()

Validate the catalog to ensure all datasets are serializable and compatible with multiprocessing.

This method checks that all datasets in the catalog are serializable and do not include non-proxied memory datasets as outputs. Non-serializable datasets or datasets that rely on single-process memory cannot be used in a multiprocessing context. If any such datasets are found, an exception is raised with details.

Raises:

  • AttributeError

    If any datasets are found to be non-serializable or incompatible with multiprocessing.

Example: ::

>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> try:
...     catalog.validate_catalog()
... except AttributeError as e:
...     print(f"Validation failed: {e}")
# No error
Source code in kedro/io/data_catalog.py
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
def validate_catalog(self) -> None:
    """
    Validate the catalog to ensure all datasets are serializable and compatible with multiprocessing.

    This method checks that all datasets in the catalog are serializable and do not
    include non-proxied memory datasets as outputs. Non-serializable datasets or
    datasets that rely on single-process memory cannot be used in a multiprocessing
    context. If any such datasets are found, an exception is raised with details.

    Raises:
        AttributeError: If any datasets are found to be non-serializable or incompatible
            with multiprocessing.

    Example:
    ::

        >>> from kedro.io.data_catalog import SharedMemoryDataCatalog
        >>>
        >>> catalog = SharedMemoryDataCatalog(
        ...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
        ... )
        >>>
        >>> try:
        ...     catalog.validate_catalog()
        ... except AttributeError as e:
        ...     print(f"Validation failed: {e}")
        # No error
    """
    unserialisable = []
    for name, dataset in self._datasets.items():
        if getattr(dataset, "_SINGLE_PROCESS", False):  # SKIP_IF_NO_SPARK
            unserialisable.append(name)
            continue
        try:
            ForkingPickler.dumps(dataset)
        except (AttributeError, PicklingError):
            unserialisable.append(name)

    if unserialisable:
        raise AttributeError(
            f"The following datasets cannot be used with multiprocessing: "
            f"{sorted(unserialisable)}\nIn order to utilize multiprocessing you "
            f"need to make sure all datasets are serialisable, i.e. datasets "
            f"should not make use of lambda functions, nested functions, closures "
            f"etc.\nIf you are using custom decorators ensure they are correctly "
            f"decorated using functools.wraps()."
        )