
LazyPolarsDataset

LazyPolarsDataset loads data from a file as a Polars LazyFrame and saves either a LazyFrame or an eager DataFrame.

kedro_datasets.polars.LazyPolarsDataset

LazyPolarsDataset(
    *,
    filepath,
    file_format,
    load_args=None,
    save_args=None,
    version=None,
    credentials=None,
    fs_args=None,
    metadata=None
)

Bases: AbstractVersionedDataset[LazyFrame, LazyFrame | DataFrame]

LazyPolarsDataset loads and saves data from and to a file on an underlying filesystem (for example local, S3 or GCS), using Polars to handle the read/write target selected by file_format. Loading uses the Polars Lazy API and returns a LazyFrame; saving accepts either a LazyFrame, which is collected before writing, or an eager DataFrame.

Examples:

Using the YAML API:

cars:
  type: polars.LazyPolarsDataset
  filepath: data/01_raw/company/cars.csv
  file_format: csv
  load_args:
    separator: ","
    try_parse_dates: False
  save_args:
    include_header: False
    null_value: "somenullstring"

motorbikes:
  type: polars.LazyPolarsDataset
  filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
  file_format: csv
  credentials: dev_s3

Using the Python API:

>>> import tempfile
>>> from pathlib import Path
>>>
>>> import polars as pl
>>> from kedro_datasets.polars import LazyPolarsDataset
>>>
>>> tmp_path = Path(tempfile.mkdtemp())
>>> data = pl.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> dataset = LazyPolarsDataset(
...     filepath=(tmp_path / "test.csv").as_posix(), file_format="csv"
... )
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.equals(reloaded.collect())

Parameters:

  • filepath (str) –

    Filepath in POSIX format to a file, optionally prefixed with a protocol such as s3://. If no prefix is provided, the file protocol (local filesystem) is used. The prefix can be any protocol supported by fsspec. Key assumption: the first argument of the load/save method points to a filepath, buffer or I/O-type location. Read/write targets such as 'clipboard' or 'records' will fail because they do not take a filepath-like argument.

  • file_format (str) –

    String used to match the appropriate load/save method on a best-effort basis. For example, if 'csv' is passed, polars.scan_csv (for local files) and polars.DataFrame.write_csv are used. The value is lower-cased, and a DatasetError is raised if it is not one of the accepted formats (ACCEPTED_FILE_FORMATS).

  • load_args (dict[str, Any] | None, default: None ) –

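    Polars options for loading files. For local files they are passed to polars.scan_{file_format} (any partitioning key is dropped); for other protocols they are passed to pyarrow.dataset.dataset. All defaults are preserved. Available Polars arguments: https://pola-rs.github.io/polars/py-polars/html/reference/io.html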

  • save_args (dict[str, Any] | None, default: None ) –

    Polars options for saving files, passed to polars.DataFrame.write_{file_format}. All defaults are preserved. Available arguments: https://pola-rs.github.io/polars/py-polars/html/reference/io.html

  • version (Version | None, default: None ) –

    If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version is loaded. If its save attribute is None, the save version is autogenerated.

  • credentials (dict[str, Any] | None, default: None ) –

    Credentials required to get access to the underlying filesystem. E.g. for GCSFileSystem it should look like {"token": None}.

  • fs_args (dict[str, Any] | None, default: None ) –

    Extra arguments to pass to the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem), and to the filesystem's open method through the nested keys open_args_load and open_args_save. All defaults are preserved, except mode, which is set to wb when saving. A storage_options key in load_args or save_args is dropped with a warning; specify those options here or in credentials instead. Available arguments for open: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises: DatasetError: Raised if file_format is not one of the accepted Polars file formats (ACCEPTED_FILE_FORMATS).
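The fs_args and credentials parameters are split and merged in a few steps in the constructor. The following is a minimal, standard-library-only sketch of that handling, mirroring the __init__ source listed next. The function name split_fs_args is hypothetical and not part of kedro-datasets; the real class stores these values on private attributes rather than returning them.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any


def split_fs_args(
    protocol: str,
    fs_args: dict[str, Any] | None = None,
    credentials: dict[str, Any] | None = None,
) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
    """Hypothetical helper returning (storage_options, open_args_load, open_args_save)."""
    fs_args = deepcopy(fs_args) or {}
    credentials = deepcopy(credentials) or {}
    # Nested open() arguments are removed before the rest reaches the filesystem constructor.
    open_args_load = fs_args.pop("open_args_load", {})
    open_args_save = fs_args.pop("open_args_save", {})
    if protocol == "file":
        # Local paths create parent directories automatically unless overridden.
        fs_args.setdefault("auto_mkdir", True)
    # Credentials and the remaining fs_args form one dict; fs_args wins on key clashes.
    storage_options = {**credentials, **fs_args}
    # Saving defaults to binary write mode; user-supplied open_args_save override it.
    open_args_save = {"mode": "wb", **open_args_save}
    return storage_options, open_args_load, open_args_save


opts, load_open, save_open = split_fs_args(
    "gs",
    fs_args={"project": "my-project", "open_args_load": {"encoding": "utf-8"}},
    credentials={"token": None},
)
```

Here opts combines the GCS token with the project name, while the open() arguments are kept apart for the load and save paths.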

Source code in kedro_datasets/polars/lazy_polars_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    filepath: str,
    file_format: str,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    version: Version | None = None,
    credentials: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``LazyPolarsDataset`` pointing to a concrete
    data file on a specific filesystem.

    Args:
        filepath: Filepath in POSIX format to a file prefixed with a protocol like
            `s3://`.
            If prefix is not provided, `file` protocol (local filesystem)
            will be used.
            The prefix should be any protocol supported by ``fsspec``.
            Key assumption: The first argument of either load/save method points to
            a filepath/buffer/io type location. There are some read/write targets
            such as 'clipboard' or 'records' that will fail since they do not take a
            filepath like argument.
        file_format: String which is used to match the appropriate load/save method
            on a best effort basis. For example, if 'csv' is passed, the
            `polars.scan_csv` (local files) and `polars.DataFrame.write_csv`
            methods will be used. A ``DatasetError`` is raised if the
            lower-cased value is not in ``ACCEPTED_FILE_FORMATS``.
        load_args: Polars options for loading files.
            Here you can find all available arguments:
            https://pola-rs.github.io/polars/py-polars/html/reference/io.html
            All defaults are preserved.
        save_args: Polars options for saving files.
            Here you can find all available arguments:
            https://pola-rs.github.io/polars/py-polars/html/reference/io.html
            All defaults are preserved.
        version: If specified, should be an instance of
            ``kedro.io.core.Version``. If its ``load`` attribute is
            None, the latest version will be loaded. If its ``save``
            attribute is None, save version will be autogenerated.
        credentials: Credentials required to get access to the underlying filesystem.
            E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
        fs_args: Extra arguments to pass into underlying filesystem class constructor
            (e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as
            to pass to the filesystem's `open` method through nested keys
            `open_args_load` and `open_args_save`.
            Here you can find all available arguments for `open`:
            https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open
            All defaults are preserved, except `mode`, which is set to `wb` when saving.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    Raises:
        DatasetError: Raised if `file_format` is not one of the accepted
            formats in ``ACCEPTED_FILE_FORMATS``.
    """
    self._file_format = file_format.lower()

    if self._file_format not in ACCEPTED_FILE_FORMATS:
        raise DatasetError(
            f"'{self._file_format}' is not an accepted format "
            f"({ACCEPTED_FILE_FORMATS}); ensure that your 'file_format' parameter "
            "has been defined correctly as per the Polars API "
            "https://pola-rs.github.io/polars/py-polars/html/reference/io.html"
        )

    _fs_args = deepcopy(fs_args) or {}
    _fs_open_args_load = _fs_args.pop("open_args_load", {})
    _fs_open_args_save = _fs_args.pop("open_args_save", {})
    _credentials = deepcopy(credentials) or {}

    protocol, path = get_protocol_and_path(filepath, version)
    if protocol == "file":
        _fs_args.setdefault("auto_mkdir", True)

    self._protocol = protocol
    self._storage_options = {**_credentials, **_fs_args}
    self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

    self.metadata = metadata

    super().__init__(
        filepath=PurePosixPath(path),
        version=version,
        exists_function=self._fs.exists,
        glob_function=self._fs.glob,
    )

    # Handle default load and save and fs arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}
    self._fs_open_args_load = {
        **self.DEFAULT_FS_ARGS.get("open_args_load", {}),
        **(_fs_open_args_load or {}),
    }
    self._fs_open_args_save = {
        **self.DEFAULT_FS_ARGS.get("open_args_save", {}),
        **(_fs_open_args_save or {}),
    }

    if "storage_options" in self._save_args or "storage_options" in self._load_args:
        logger.warning(
            "Dropping 'storage_options' for %s, "
            "please specify them under 'fs_args' or 'credentials'.",
            self._filepath,
        )
        self._save_args.pop("storage_options", None)
        self._load_args.pop("storage_options", None)
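The constructor above performs two checks: it lower-cases and validates file_format, and it strips any storage_options key from load_args and save_args. A minimal standard-library sketch of both checks follows. The contents of ACCEPTED_FILE_FORMATS are not shown on this page, so the list below is an assumption, and DatasetError is a local stand-in for kedro.io.core.DatasetError.

```python
from __future__ import annotations

import logging
from typing import Any

logger = logging.getLogger(__name__)

# Assumed value for illustration; the real constant lives in kedro_datasets.
ACCEPTED_FILE_FORMATS = ["csv", "parquet"]


class DatasetError(Exception):
    """Local stand-in for kedro.io.core.DatasetError."""


def validate_format(file_format: str) -> str:
    fmt = file_format.lower()  # "CSV" and "csv" are treated the same
    if fmt not in ACCEPTED_FILE_FORMATS:
        raise DatasetError(f"'{fmt}' is not an accepted format ({ACCEPTED_FILE_FORMATS})")
    return fmt


def drop_storage_options(load_args: dict[str, Any], save_args: dict[str, Any]) -> None:
    # Filesystem options belong in fs_args/credentials, so they are removed in place.
    if "storage_options" in load_args or "storage_options" in save_args:
        logger.warning("Dropping 'storage_options'; use 'fs_args' or 'credentials'.")
        load_args.pop("storage_options", None)
        save_args.pop("storage_options", None)
```

Validating in the constructor means a misconfigured catalog entry fails when the dataset is created rather than at the first load or save.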

DEFAULT_FS_ARGS class-attribute instance-attribute

DEFAULT_FS_ARGS = {'open_args_save': {'mode': 'wb'}}

DEFAULT_LOAD_ARGS class-attribute

DEFAULT_LOAD_ARGS = {}

DEFAULT_SAVE_ARGS class-attribute

DEFAULT_SAVE_ARGS = {}

_file_format instance-attribute

_file_format = file_format.lower()

_fs instance-attribute

_fs = fsspec.filesystem(self._protocol, **self._storage_options)

_fs_open_args_load instance-attribute

_fs_open_args_load = {
    **self.DEFAULT_FS_ARGS.get("open_args_load", {}),
    **(_fs_open_args_load or {}),
}

_fs_open_args_save instance-attribute

_fs_open_args_save = {
    **self.DEFAULT_FS_ARGS.get("open_args_save", {}),
    **(_fs_open_args_save or {}),
}

_load_args instance-attribute

_load_args = {
    **self.DEFAULT_LOAD_ARGS,
    **(load_args or {}),
}

_protocol instance-attribute

_protocol = protocol

_save_args instance-attribute

_save_args = {
    **self.DEFAULT_SAVE_ARGS,
    **(save_args or {}),
}

_storage_options instance-attribute

_storage_options = {**_credentials, **_fs_args}

metadata instance-attribute

metadata = metadata
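The attributes above are built by dict unpacking, where keys unpacked later take precedence. A small standard-library sketch of that precedence rule, using the DEFAULT_FS_ARGS value listed on this page; the merge helper itself is hypothetical.

```python
from __future__ import annotations

from typing import Any

# Class-level defaults as listed above.
DEFAULT_LOAD_ARGS: dict[str, Any] = {}
DEFAULT_FS_ARGS: dict[str, dict[str, Any]] = {"open_args_save": {"mode": "wb"}}


def merge(defaults: dict[str, Any], overrides: dict[str, Any] | None) -> dict[str, Any]:
    # Later keys win, so user values replace defaults; a new dict is returned
    # and the class-level defaults are left untouched.
    return {**defaults, **(overrides or {})}


gz = merge(DEFAULT_FS_ARGS.get("open_args_save", {}), {"compression": "gzip"})
# {"mode": "wb", "compression": "gzip"}
append = merge(DEFAULT_FS_ARGS.get("open_args_save", {}), {"mode": "ab"})
# {"mode": "ab"}
```

Because a fresh dict is built per instance, overriding an argument on one dataset does not leak into the shared class defaults.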

_describe

_describe()
Source code in kedro_datasets/polars/lazy_polars_dataset.py
def _describe(self) -> dict[str, Any]:
    return {
        "filepath": self._filepath,
        "protocol": self._protocol,
        "load_args": self._load_args,
        "save_args": self._save_args,
        "version": self._version,
    }

_exists

_exists()
Source code in kedro_datasets/polars/lazy_polars_dataset.py
def _exists(self) -> bool:
    try:
        load_path = get_filepath_str(self._get_load_path(), self._protocol)
    except DatasetError:  # pragma: no cover
        return False

    return self._fs.exists(load_path)

_invalidate_cache

_invalidate_cache()

Invalidate underlying filesystem caches.

Source code in kedro_datasets/polars/lazy_polars_dataset.py
def _invalidate_cache(self) -> None:
    """Invalidate underlying filesystem caches."""
    filepath = get_filepath_str(self._filepath, self._protocol)
    self._fs.invalidate_cache(filepath)

_release

_release()
Source code in kedro_datasets/polars/lazy_polars_dataset.py
def _release(self) -> None:
    super()._release()
    self._invalidate_cache()
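_invalidate_cache and _release exist because some filesystems cache metadata, so an existence check made before a write can keep returning a stale answer. The sketch below uses a hypothetical toy filesystem that caches exists() results, loosely modelled on fsspec's listings cache; it is not the fsspec API, only an illustration of why the cache is invalidated after saving.

```python
from __future__ import annotations


class CachingFS:
    """Hypothetical toy filesystem that caches exists() results (not fsspec)."""

    def __init__(self) -> None:
        self._files: set[str] = set()
        self._cache: dict[str, bool] = {}

    def write(self, path: str) -> None:
        # The backing store changes, but cached answers are left untouched.
        self._files.add(path)

    def exists(self, path: str) -> bool:
        if path not in self._cache:
            self._cache[path] = path in self._files
        return self._cache[path]

    def invalidate_cache(self, path: str | None = None) -> None:
        # Drop one cached entry, or everything when no path is given.
        if path is None:
            self._cache.clear()
        else:
            self._cache.pop(path, None)


fs = CachingFS()
before = fs.exists("data/cars.csv")   # False, and now cached
fs.write("data/cars.csv")
stale = fs.exists("data/cars.csv")    # still False: stale cache entry
fs.invalidate_cache("data/cars.csv")  # the role _invalidate_cache() plays after a save
fresh = fs.exists("data/cars.csv")    # True
```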

load

load()
Source code in kedro_datasets/polars/lazy_polars_dataset.py
def load(self) -> pl.LazyFrame:
    load_path = str(self._get_load_path())
    if not self._exists():
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), load_path)

    if self._protocol == "file":
        # With local filesystems, we can use Polars' built-in lazy scan method:
        self._load_args.pop("partitioning", None)
        load_method = getattr(pl, f"scan_{self._file_format}", None)
        return load_method(load_path, **self._load_args)  # type: ignore[misc]

    # For object storage, we use pyarrow for I/O:
    dataset = ds.dataset(
        load_path, filesystem=self._fs, format=self._file_format, **self._load_args
    )
    return pl.scan_pyarrow_dataset(dataset)
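load takes one of two paths depending on the protocol: local files go to a scan_{file_format} function looked up by name, while object storage goes through a pyarrow dataset. The following standard-library sketch mirrors that branch with hypothetical stand-ins (fake_pl, fake_pyarrow_dataset, choose_loader) that only report which call would be made; they are not the polars or pyarrow APIs.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any

# Hypothetical stand-ins for polars.scan_* and pyarrow.dataset.dataset.
fake_pl = SimpleNamespace(
    scan_csv=lambda path, **kw: ("scan_csv", path, kw),
    scan_parquet=lambda path, **kw: ("scan_parquet", path, kw),
)


def fake_pyarrow_dataset(path: str, *, filesystem: Any, format: str, **kw: Any) -> tuple:
    return ("pyarrow.dataset", path, format, kw)


def choose_loader(protocol: str, file_format: str, load_args: dict[str, Any], path: str) -> tuple:
    """Hypothetical helper mirroring the branch in LazyPolarsDataset.load."""
    if protocol == "file":
        # Local files: drop 'partitioning', then call scan_<format> looked up by name.
        args = {k: v for k, v in load_args.items() if k != "partitioning"}
        scan = getattr(fake_pl, f"scan_{file_format}", None)
        if scan is None:
            raise ValueError(f"no scan_{file_format} function")
        return scan(path, **args)
    # Object storage: all load_args go to the pyarrow dataset constructor.
    return fake_pyarrow_dataset(path, filesystem=None, format=file_format, **load_args)
```

One design difference: the original pops partitioning from self._load_args in place, whereas this sketch filters a copy so the caller's dict is unchanged.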

save

save(data)
Source code in kedro_datasets/polars/lazy_polars_dataset.py
def save(self, data: pl.DataFrame | pl.LazyFrame) -> None:
    save_path = get_filepath_str(self._get_save_path(), self._protocol)

    collected_data = None
    if isinstance(data, pl.LazyFrame):
        collected_data = data.collect()
    else:
        collected_data = data

    # Note: Polars supports writing partitioned Parquet files by leveraging
    # Arrow, see e.g.
    # https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_parquet.html
    save_method = getattr(collected_data, f"write_{self._file_format}", None)
    if save_method:
        if self._save_args.get("use_pyarrow") is True:
            pyarrow_opts = self._save_args.get("pyarrow_options", {})
            pa_fs = PyFileSystem(FSSpecHandler(self._fs))
            pyarrow_opts["filesystem"] = pa_fs
            self._save_args["pyarrow_options"] = pyarrow_opts
            save_method(file=save_path, **self._save_args)
        else:
            with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
                save_method(file=fs_file, **self._save_args)
            self._invalidate_cache()
    # Because `__init__` already validates `file_format` against
    # ACCEPTED_FILE_FORMATS, this else branch is never reached, so it is
    # excluded from the coverage report but kept for consistency between
    # the Eager and Lazy classes
    else:  # pragma: no cover
        raise DatasetError(
            f"Unable to retrieve 'polars.DataFrame.write_{self._file_format}' "
            "method, please ensure that your 'file_format' parameter has been "
            "defined correctly as per the Polars API "
            "https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html"
        )
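save first materialises a LazyFrame with collect(), then resolves write_{file_format} by name and either injects a filesystem into pyarrow_options (when use_pyarrow is True) or writes through a file handle opened on the filesystem. The sketch below reproduces that flow with hypothetical stand-in classes (EagerFrame, LazyFrame) and an in-memory buffer in place of self._fs.open; none of these names are the polars or fsspec APIs.

```python
from __future__ import annotations

import io
from typing import Any, Callable


class EagerFrame:
    """Hypothetical stand-in for pl.DataFrame, reduced to a write_csv method."""

    def __init__(self, rows: list[tuple[Any, ...]]) -> None:
        self.rows = rows

    def write_csv(self, file: io.StringIO, **_: Any) -> None:
        for row in self.rows:
            file.write(",".join(map(str, row)) + "\n")


class LazyFrame:
    """Hypothetical stand-in for pl.LazyFrame: rows are built only on collect()."""

    def __init__(self, build: Callable[[], list[tuple[Any, ...]]]) -> None:
        self._build = build

    def collect(self) -> EagerFrame:
        return EagerFrame(self._build())


def resolve_save_args(save_args: dict[str, Any], filesystem: Any) -> dict[str, Any]:
    # With use_pyarrow=True the filesystem travels inside pyarrow_options.
    if save_args.get("use_pyarrow") is True:
        opts = {**save_args.get("pyarrow_options", {}), "filesystem": filesystem}
        return {**save_args, "pyarrow_options": opts}
    return dict(save_args)


def save(data: EagerFrame | LazyFrame, file_format: str, save_args: dict[str, Any]) -> str:
    # Lazy inputs are collected first; eager frames pass straight through.
    frame = data.collect() if isinstance(data, LazyFrame) else data
    writer = getattr(frame, f"write_{file_format}", None)
    if writer is None:
        raise ValueError(f"no write_{file_format} method")
    buffer = io.StringIO()  # stands in for self._fs.open(save_path, mode="wb")
    writer(buffer, **save_args)
    return buffer.getvalue()
```

Unlike the original, which stores the updated pyarrow_options back into self._save_args, resolve_save_args returns a new dict, so repeated saves do not accumulate state.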