
polars.PolarsDatabaseDataset

kedro_datasets_experimental.polars.PolarsDatabaseDataset

PolarsDatabaseDataset(
    *,
    sql=None,
    credentials=None,
    load_args=None,
    fs_args=None,
    filepath=None,
    table_name=None,
    save_args=None,
    metadata=None
)

Bases: AbstractDataset[None, DataFrame]

PolarsDatabaseDataset loads data from a SQL query (given inline, read from a file, or built from a table name) and writes data to a database table.

It accepts the options of polars.read_database (through load_args) and polars.DataFrame.write_database (through save_args). The dataset builds a SQLAlchemy Engine from a connection string and hands it to Polars, so you must pass a SQLAlchemy-compatible connection string as con in credentials (see the credentials example below); the constructor raises DatasetError if it is missing. Connection string formats supported by SQLAlchemy can be found here: https://docs.sqlalchemy.org/core/engines.html#database-urls

Provide at least one of sql, filepath, or table_name (sql and filepath are mutually exclusive). load runs sql, or the query read from filepath, when either is given; otherwise it runs SELECT * FROM <table_name>. save always writes to table_name.
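The precedence above can be sketched as two small pure functions. This is a minimal sketch, not the dataset's code. resolve_load_query and resolve_save_target are hypothetical names. sql_file_text stands in for the text the dataset would read from filepath, and ValueError stands in for DatasetError.

```python
from __future__ import annotations


def resolve_load_query(
    sql: str | None = None,
    sql_file_text: str | None = None,
    table_name: str | None = None,
) -> str:
    """Return the query a load would run (arguments assumed already validated)."""
    # sql and filepath are mutually exclusive, so at most one of these two is set.
    if sql_file_text:
        return sql_file_text  # text read from the .sql file at filepath
    if sql:
        return sql
    # table_name-only mode: load the whole table.
    return f"SELECT * FROM {table_name}"


def resolve_save_target(table_name: str | None) -> str:
    """Return the table a save would write to; sql and filepath play no part."""
    if not table_name:
        raise ValueError("'table_name' argument is required to save datasets.")
    return table_name
```

Because save only ever looks at table_name, a dataset configured with sql alone can load but not save.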

Schema-qualified tables can be passed directly to table_name using the schema_name.table_name form; there is no separate schema argument.
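A schema-qualified name needs no extra handling on the load side. You can check this by running the same SELECT * FROM <table_name> shape against a database that has schemas. The sketch below uses the standard-library sqlite3 module rather than the dataset. It relies on SQLite's attached databases as the schema mechanism, since SQLite has no CREATE SCHEMA. The spaceflights and shuttles names are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# An attached database plays the role of a schema in SQLite.
conn.execute("ATTACH DATABASE ':memory:' AS spaceflights")
conn.execute("CREATE TABLE spaceflights.shuttles (shuttle TEXT, shuttle_id INTEGER)")
conn.executemany(
    "INSERT INTO spaceflights.shuttles VALUES (?, ?)",
    [("Falcon", 1), ("Dragon", 2)],
)

table_name = "spaceflights.shuttles"  # the value you would pass as table_name
query = f"SELECT * FROM {table_name}"  # same shape as the table_name-only load query
rows = conn.execute(query).fetchall()
conn.close()
```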

Example usage for the YAML API:

Load-and-save against a single table:

shuttles_table:
    type: polars.PolarsDatabaseDataset
    table_name: shuttles
    credentials: db_credentials

Load via a custom (here schema-qualified) SQL query:

shuttle_id_dataset:
    type: polars.PolarsDatabaseDataset
    sql: "SELECT shuttle, shuttle_id FROM spaceflights.shuttles;"
    credentials: db_credentials

Pass extra arguments to the underlying polars methods via load_args and save_args:

shuttles_configured:
    type: polars.PolarsDatabaseDataset
    table_name: shuttles
    credentials: db_credentials
    load_args:
        batch_size: 10000
        schema_overrides:
            shuttle_id: Int64
    save_args:
        if_table_exists: append

Sample database credentials entry in credentials.yml:

db_credentials:
    con: postgresql://scott:tiger@localhost/test  # pragma: allowlist secret
    pool_size: 10 # additional parameters
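The constructor splits this entry in two. con becomes the connection string, and every other key is kept as an engine argument, which create_connection later passes to sqlalchemy.create_engine. The minimal sketch below reproduces that split, using a plain dict in place of the loaded credentials.yml entry:

```python
from __future__ import annotations

from typing import Any

# Mirrors the credentials entry above, loaded as a Python dict.
credentials: dict[str, Any] = {
    "con": "postgresql://scott:tiger@localhost/test",  # pragma: allowlist secret
    "pool_size": 10,
}

# "con" is the connection string; all remaining keys become engine keyword arguments.
connection_str = credentials["con"]
connection_args = {k: v for k, v in credentials.items() if k != "con"}
```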

Example usage for the Python API:
>>> from pathlib import Path
>>> import polars as pl
>>>
>>> from kedro_datasets_experimental.polars import PolarsDatabaseDataset
>>>
>>> data = pl.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>> sql = "SELECT * FROM table_a"
>>> tmp_path = Path.cwd() / "tmp"
>>> tmp_path.mkdir(parents=True, exist_ok=True)
>>> credentials = {"con": f"sqlite:///{tmp_path / 'test.db'}"}
>>> dataset = PolarsDatabaseDataset(sql=sql, credentials=credentials, table_name="table_a")
>>>
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>>
>>> assert data.equals(reloaded)

Parameters:

  • sql (str | None, default: None) –

    SQL query to execute on load. Mutually exclusive with filepath. If neither is given, table_name must be provided and the dataset will load via SELECT * FROM <table_name>.

  • credentials (dict[str, Any] | None, default: None) –

    A dictionary holding a SQLAlchemy connection string under the required key con. For all supported connection string formats, see https://docs.sqlalchemy.org/core/engines.html#database-urls. Any other keys are passed as keyword arguments to the SQLAlchemy engine.

  • load_args (dict[str, Any] | None, default: None) –

    Passed to the underlying polars.read_database function along with the connection. For all supported arguments, see https://docs.pola.rs/api/python/stable/reference/api/polars.read_database.html

  • fs_args (dict[str, Any] | None, default: None) –

    Extra arguments passed to fsspec.filesystem when filepath points to a SQL file. A credentials key, if present, is popped and its contents are also passed to fsspec.filesystem (e.g. credentials for remote storage).

  • filepath (str | None, default: None) –

    Path to a .sql file containing the query to execute on load. Mutually exclusive with sql.

  • table_name (str | None, default: None) –

    Target table for save. When sql and filepath are not provided, also used as the load source via SELECT * FROM <table_name>. Schema-qualified tables can be passed as schema_name.table_name.

  • save_args (dict[str, Any] | None, default: None) –

    Passed to the underlying polars.DataFrame.write_database method along with the connection. For all supported arguments, see https://docs.pola.rs/api/python/stable/reference/api/polars.DataFrame.write_database.html. Defaults to {"if_table_exists": "replace"}, so writes overwrite the target table unless overridden (e.g. if_table_exists: append).

  • metadata (dict[str, Any] | None, default: None) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
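The save_args default is merged with user input by dictionary unpacking. Any key you supply therefore overrides the default, and defaults you do not mention stay in place. This is a minimal sketch of that merge; merge_save_args is a hypothetical name.

```python
from __future__ import annotations

from typing import Any


def merge_save_args(save_args: dict[str, Any] | None) -> dict[str, Any]:
    # Documented default: overwrite the target table on every save.
    default_save_args: dict[str, Any] = {"if_table_exists": "replace"}
    if save_args is None:
        return default_save_args
    # User keys are unpacked last, so they win over the defaults.
    return {**default_save_args, **save_args}
```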

Raises:

  • DatasetError

    When both sql and filepath are provided, when none of sql/filepath/table_name is provided, or when con is missing from credentials.
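The three checks above run in the constructor in a fixed order, so a bad configuration fails when the dataset object is created rather than on first load. The minimal sketch below reproduces them outside Kedro. check_args and raises are hypothetical helpers, and DatasetError here is a local stand-in class, not an import of Kedro's exception.

```python
from __future__ import annotations

from typing import Any


class DatasetError(Exception):
    """Local stand-in for Kedro's DatasetError so the sketch runs without Kedro."""


def check_args(
    sql: str | None = None,
    filepath: str | None = None,
    table_name: str | None = None,
    credentials: dict[str, Any] | None = None,
) -> None:
    # Same three checks, in the same order, as the constructor.
    if sql and filepath:
        raise DatasetError("'sql' and 'filepath' arguments cannot both be provided.")
    if not (sql or filepath or table_name):
        raise DatasetError("Provide at least one of 'sql', 'filepath', or 'table_name'.")
    if not (credentials and credentials.get("con")):
        raise DatasetError("'con' argument cannot be empty.")


def raises(**kwargs: Any) -> bool:
    """Return True if check_args rejects the given arguments."""
    try:
        check_args(**kwargs)
    except DatasetError:
        return True
    return False
```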

Source code in kedro_datasets_experimental/polars/polars_database_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    sql: str | None = None,
    credentials: dict[str, Any] | None = None,
    load_args: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    filepath: str | None = None,
    table_name: str | None = None,
    save_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new ``PolarsDatabaseDataset``.

    Args:
        sql: SQL query to execute on load. Mutually exclusive with
            ``filepath``. If neither is given, ``table_name`` must be
            provided and the dataset will load via ``SELECT * FROM <table_name>``.
        credentials: A dictionary with a ``SQLAlchemy`` connection string.
            Users are supposed to provide the connection string ``con``
            through credentials. To find all supported connection string
            formats, see here:
            https://docs.sqlalchemy.org/core/engines.html#database-urls
            Additional parameters for the sqlalchemy engine can be provided
            alongside the ``con`` parameter.
        load_args: Provided to the underlying ``polars.read_database``
            function along with the connection. To find all supported
            arguments, see here:
            https://docs.pola.rs/api/python/stable/reference/api/polars.read_database.html
        fs_args: Extra arguments passed to ``fsspec`` when ``filepath``
            points to a SQL file (e.g. credentials for remote storage).
        filepath: Path to a ``.sql`` file containing the query to execute
            on load. Mutually exclusive with ``sql``.
        table_name: Target table for ``save``. When ``sql`` and
            ``filepath`` are not provided, also used as the load source via
            ``SELECT * FROM <table_name>``. Schema-qualified tables can be
            passed as ``schema_name.table_name``.
        save_args: Provided to the underlying ``polars.DataFrame.write_database``
            method along with the connection. To find all supported
            arguments, see here:
            https://docs.pola.rs/api/python/stable/reference/api/polars.DataFrame.write_database.html
            Defaults to ``{"if_table_exists": "replace"}``, so writes overwrite
            the target table unless overridden (e.g. ``if_table_exists: append``).
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        DatasetError: When both ``sql`` and ``filepath`` are provided, when
            none of ``sql``/``filepath``/``table_name`` is provided, or
            when ``con`` is missing from ``credentials``.
    """
    if sql and filepath:
        raise DatasetError(
            "'sql' and 'filepath' arguments cannot both be provided. "
            "Please only provide one."
        )

    if not (sql or filepath or table_name):
        raise DatasetError(
            "Provide at least one of 'sql', 'filepath', or 'table_name'. "
            "When only 'table_name' is given, the dataset will load the whole table."
        )

    if not (credentials and "con" in credentials and credentials["con"]):
        raise DatasetError(
            "'con' argument cannot be empty. Please "
            "provide a SQLAlchemy connection string."
        )

    default_load_args: dict[str, Any] = {}
    default_save_args: dict[str, Any] = {
        "if_table_exists": "replace"
    }

    self._load_args = (
        {**default_load_args, **load_args}
        if load_args is not None
        else default_load_args
    )

    self.table_name = table_name
    self._save_args = (
        {**default_save_args, **save_args}
        if save_args is not None
        else default_save_args
    )

    self.metadata = metadata

    if sql:
        self._load_args["sql"] = sql
        self._filepath = None
    elif filepath:
        # filesystem for loading sql file
        _fs_args = copy.deepcopy(fs_args) or {}
        _fs_credentials = _fs_args.pop("credentials", {})
        protocol, path = get_protocol_and_path(str(filepath))

        self._protocol = protocol
        self._fs = fsspec.filesystem(self._protocol, **_fs_credentials, **_fs_args)
        self._filepath = path
    else:
        # table_name-only mode: load() will build "SELECT * FROM <table_name>".
        self._filepath = None
    self._connection_str = credentials["con"]
    self._connection_args = {
        k: credentials[k] for k in credentials.keys() if k != "con"
    }

_connection_args instance-attribute

_connection_args = {
    k: credentials[k]
    for k in credentials.keys()
    if k != "con"
}

_connection_str instance-attribute

_connection_str = credentials['con']

_filepath instance-attribute

_filepath = None

_fs instance-attribute

_fs = fsspec.filesystem(
    self._protocol, **_fs_credentials, **_fs_args
)

_load_args instance-attribute

_load_args = (
    {**default_load_args, **load_args}
    if load_args is not None
    else default_load_args
)

_protocol instance-attribute

_protocol = protocol

_save_args instance-attribute

_save_args = (
    {**default_save_args, **save_args}
    if save_args is not None
    else default_save_args
)

engine property

engine

The Engine object for the dataset's connection string.

engines class-attribute instance-attribute

engines = {}

metadata instance-attribute

metadata = metadata

table_name instance-attribute

table_name = table_name

_describe

_describe()
Source code in kedro_datasets_experimental/polars/polars_database_dataset.py
def _describe(self) -> dict[str, Any]:
    load_args = copy.deepcopy(self._load_args)
    return {
        "sql": str(load_args.pop("sql", None)),
        "filepath": str(self._filepath),
        "load_args": str(load_args),
        "table_name": self.table_name,
        "save_args": str(self._save_args),
    }
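_describe renders every field as a string for logging. Missing values therefore appear as the text "None" rather than being omitted. sql is removed from the copied load_args so it is not shown twice. The minimal sketch below applies the same logic as a standalone function; describe is a hypothetical name.

```python
from __future__ import annotations

import copy
from typing import Any


def describe(
    load_args: dict[str, Any],
    filepath: str | None,
    table_name: str | None,
    save_args: dict[str, Any],
) -> dict[str, Any]:
    # Copy first so popping "sql" does not mutate the dataset's stored load_args.
    load_args = copy.deepcopy(load_args)
    return {
        "sql": str(load_args.pop("sql", None)),
        "filepath": str(filepath),
        "load_args": str(load_args),
        "table_name": table_name,
        "save_args": str(save_args),
    }
```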

create_connection classmethod

create_connection(connection_str, connection_args=None)

Given a connection string, create a SQLAlchemy Engine and cache it in the class-level engines dict, keyed by connection string, so that all instances of PolarsDatabaseDataset connecting to the same source share it.

Source code in kedro_datasets_experimental/polars/polars_database_dataset.py
@classmethod
def create_connection(
    cls, connection_str: str, connection_args: dict | None = None
) -> None:
    """Given a connection string, create a singleton ``Engine``
    to be used across all instances of `PolarsDatabaseDataset` that
    need to connect to the same source.
    """
    connection_args = connection_args or {}
    try:
        engine = create_engine(connection_str, **connection_args)
    except ImportError as import_error:
        raise _get_missing_module_error(import_error) from import_error
    except NoSuchModuleError as exc:
        raise _get_sql_alchemy_missing_error() from exc

    cls.engines[connection_str] = engine
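create_connection stores one engine per connection string on the class-level engines dict. The engine property's source is not shown on this page, so the lazy lookup below is an assumption about how the property uses that cache. The sketch replaces SQLAlchemy with a hypothetical FakeEngine so it runs with the standard library only. CachedConnectionDataset is also hypothetical.

```python
from __future__ import annotations

from typing import Any, ClassVar


class FakeEngine:
    """Hypothetical stand-in for sqlalchemy.engine.Engine."""

    def __init__(self, url: str, **kwargs: Any) -> None:
        self.url = url
        self.kwargs = kwargs


class CachedConnectionDataset:
    """Hypothetical class showing a per-connection-string engine cache."""

    # Lives on the class, so every instance sees the same cache.
    engines: ClassVar[dict[str, FakeEngine]] = {}

    def __init__(self, con: str, **connection_args: Any) -> None:
        self._connection_str = con
        self._connection_args = connection_args

    @classmethod
    def create_connection(cls, connection_str: str, connection_args: dict | None = None) -> None:
        connection_args = connection_args or {}
        cls.engines[connection_str] = FakeEngine(connection_str, **connection_args)

    @property
    def engine(self) -> FakeEngine:
        # Assumed behaviour: build the engine only the first time this string is seen.
        if self._connection_str not in self.engines:
            self.create_connection(self._connection_str, self._connection_args)
        return self.engines[self._connection_str]
```

The cache key is the connection string alone. Under this assumed behaviour, a second dataset with the same con but different engine arguments (for example pool_size) would reuse the first engine and its settings.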

load

load()
Source code in kedro_datasets_experimental/polars/polars_database_dataset.py
def load(self) -> pl.DataFrame:
    load_args = copy.deepcopy(self._load_args)

    if self._filepath:
        load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol)
        with self._fs.open(load_path, mode="r") as fs_file:
            query = fs_file.read()
    elif "sql" in load_args:
        query = load_args.pop("sql")
    else:
        query = f"SELECT * FROM {self.table_name}"  # nosec B608

    return pl.read_database(
        query=query,
        connection=self.engine,
        **load_args
    )
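When filepath is set, load reads the whole file through fsspec and runs its text unchanged as the query. The sketch below shows that flow with the standard library only. pathlib stands in for fsspec (equivalent for a local path), and sqlite3 stands in for polars.read_database. The file name and table are illustrative.

```python
import sqlite3
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # Illustrative query file; with the dataset you would pass its path as filepath.
    query_file = Path(tmp) / "shuttle_ids.sql"
    query_file.write_text("SELECT shuttle, shuttle_id FROM shuttles ORDER BY shuttle_id;")

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE shuttles (shuttle TEXT, shuttle_id INTEGER)")
    conn.executemany("INSERT INTO shuttles VALUES (?, ?)", [("Falcon", 1), ("Dragon", 2)])

    # Read the full file text and execute it as-is, as load() does with the fsspec file.
    query = query_file.read_text()
    rows = conn.execute(query).fetchall()
    conn.close()
```

In table_name-only mode, the name is formatted directly into the query string (hence the nosec B608 marker) because SQL identifiers cannot be bound as query parameters. Only use trusted table names.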

save

save(data)
Source code in kedro_datasets_experimental/polars/polars_database_dataset.py
def save(self, data: pl.DataFrame) -> None:
    if not self.table_name:
        raise DatasetError(
            "'table_name' argument is required to save datasets."
        )

    data.write_database(
        table_name=self.table_name,
        connection=self.engine,
        **self._save_args
    )
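The if_table_exists save argument decides what happens when the target table already exists. The sketch below emulates the three documented modes ("fail", "replace", "append") on the standard-library sqlite3 module. It illustrates the semantics only and is not how Polars implements them. write_rows is a hypothetical helper, and its explicit column list is a simplification.

```python
from __future__ import annotations

import sqlite3


def write_rows(
    conn: sqlite3.Connection,
    table_name: str,
    columns: list[str],
    rows: list[tuple],
    if_table_exists: str = "replace",  # same default as the dataset's save_args
) -> None:
    """Hypothetical helper emulating the three if_table_exists modes on SQLite."""
    exists = (
        conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
            (table_name,),
        ).fetchone()
        is not None
    )
    if exists and if_table_exists == "fail":
        raise ValueError(f"table {table_name!r} already exists")
    if exists and if_table_exists == "replace":
        conn.execute(f"DROP TABLE {table_name}")  # discard the old rows and schema
        exists = False
    if not exists:
        conn.execute(f"CREATE TABLE {table_name} ({', '.join(columns)})")
    placeholders = ", ".join("?" for _ in columns)
    conn.executemany(f"INSERT INTO {table_name} VALUES ({placeholders})", rows)
```

With the dataset's default of "replace", every save leaves only the latest data in the table. Set if_table_exists to append in save_args to accumulate rows across runs instead.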