SQLQueryDataset

SQLQueryDataset loads data from a provided SQL query using pandas. It is read-only.

kedro_datasets.pandas.SQLQueryDataset

SQLQueryDataset(
    sql=None,
    credentials=None,
    load_args=None,
    fs_args=None,
    filepath=None,
    execution_options=None,
    metadata=None,
    encoding=None,
)

Bases: AbstractDataset[None, DataFrame]

SQLQueryDataset loads data from a provided SQL query. It returns a pandas.DataFrame built with read_sql_query, so it supports all options that pandas allows on read_sql_query. Because pandas uses SQLAlchemy behind the scenes, a compatible connection string must be passed when instantiating SQLQueryDataset, under the con key of credentials (see the example code snippet below). Connection string formats supported by SQLAlchemy are listed here: https://docs.sqlalchemy.org/core/engines.html#database-urls

It does not support the save method, so it is a read-only dataset. To save data to a SQL server, use SQLTableDataset.

Examples:

Using the YAML API:

shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
  credentials: db_credentials

Advanced example using the stream_results and chunksize options to reduce memory usage:

shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
  credentials: db_credentials
  execution_options:
    stream_results: true
  load_args:
    chunksize: 1000
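When chunksize is set, pandas read_sql_query returns an iterator of DataFrames rather than a single DataFrame, so the loaded object has to be consumed chunk by chunk. The following is only a standard-library analogy of that batch-wise pattern, using sqlite3 and cursor.fetchmany in place of pandas and SQLAlchemy; the helper name iter_chunks and the in-memory table are assumptions made for illustration.

```python
import sqlite3


def iter_chunks(connection, sql, chunksize):
    """Yield lists of rows, each holding at most ``chunksize`` rows."""
    cursor = connection.execute(sql)
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            break
        yield rows


# Illustrative in-memory table; not part of the dataset itself.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE shuttles(shuttle TEXT, shuttle_id INTEGER)")
con.executemany(
    "INSERT INTO shuttles VALUES (?, ?)",
    [(f"shuttle-{i}", i) for i in range(5)],
)

# Only one chunk of rows is held in memory at a time.
chunk_sizes = [
    len(chunk)
    for chunk in iter_chunks(con, "SELECT shuttle, shuttle_id FROM shuttles", 2)
]
print(chunk_sizes)  # [2, 2, 1]
```

With the dataset itself, the same idea would apply to the iterator returned by load(): process each DataFrame chunk in turn instead of concatenating them all.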

Sample database credentials entry in credentials.yml:

db_credentials:
  con: postgresql://scott:tiger@localhost/test
  pool_size: 10 # additional parameters

Using the Python API:

>>> import sqlite3
>>>
>>> import pandas as pd
>>> from kedro_datasets.pandas import SQLQueryDataset
>>>
>>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> sql = "SELECT * FROM table_a"
>>> credentials = {"con": f"sqlite:///{tmp_path / 'test.db'}"}
>>> dataset = SQLQueryDataset(sql=sql, credentials=credentials)
>>>
>>> con = sqlite3.connect(tmp_path / "test.db")
>>> cur = con.cursor()
>>> cur.execute("CREATE TABLE table_a(col1, col2, col3)")
<sqlite3.Cursor object at 0x...>
>>> cur.execute("INSERT INTO table_a VALUES (1, 4, 5), (2, 5, 6)")
<sqlite3.Cursor object at 0x...>
>>> con.commit()
>>> reloaded = dataset.load()
>>> assert data.equals(reloaded)

Using MSSQL:

>>> credentials = {
...     "server": "localhost",
...     "port": "1433",
...     "database": "TestDB",
...     "user": "SA",
...     "password": "StrongPassword",
... }
>>>
>>> def _make_mssql_connection_str(
...     server: str, port: str, database: str, user: str, password: str
... ) -> str:
...     import pyodbc
...     from sqlalchemy.engine import URL
...     driver = pyodbc.drivers()[-1]
...     connection_str = (
...         f"DRIVER={driver};SERVER={server},{port};DATABASE={database};"
...         f"ENCRYPT=yes;UID={user};PWD={password};"
...         f"TrustServerCertificate=yes;"
...     )
...     return URL.create("mssql+pyodbc", query={"odbc_connect": connection_str})
...
>>> connection_str = _make_mssql_connection_str(**credentials)
>>> dataset = SQLQueryDataset(
...     credentials={"con": connection_str}, sql="SELECT TOP 5 * FROM TestTable;"
... )
>>> df = dataset.load()

In addition, here is an example of a catalog entry with date parsing:

mssql_dataset:
  type: kedro_datasets.pandas.SQLQueryDataset
  credentials: mssql_credentials
  sql: >
    SELECT *
    FROM  DateTable
    WHERE date >= ? AND date <= ?
    ORDER BY date
  load_args:
    params:
      - ${begin}
      - ${end}
    index_col: date
    parse_dates:
      date: "%Y-%m-%d %H:%M:%S.%f0 %z"

Parameters:

  • sql (str | None, default: None ) –

    The sql query statement.

  • credentials (dict[str, Any] | None, default: None ) –

    A dictionary with a SQLAlchemy connection string. The connection string must be provided under the 'con' key. It overwrites the con parameter in load_args and save_args, if one is provided there. For all supported connection string formats, see: https://docs.sqlalchemy.org/core/engines.html#database-urls. Additional parameters for the SQLAlchemy engine can be provided alongside the 'con' parameter.

  • load_args (dict[str, Any] | None, default: None ) –

    Passed to the underlying pandas read_sql_query function along with the connection string. For all supported arguments, see: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_query.html. For all supported connection string formats, see: https://docs.sqlalchemy.org/core/engines.html#database-urls

  • fs_args (dict[str, Any] | None, default: None ) –

    Extra arguments to pass to the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem), as well as to the filesystem's open method through the nested keys open_args_load and open_args_save. All available arguments for open are listed here: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open. All defaults are preserved, except mode, which is set to r when loading.

  • filepath (str | PathLike | None, default: None ) –

    A path to a file with a sql query statement. Can be a string or a PathLike object.

  • execution_options (dict[str, Any] | None, default: None ) –

    A dictionary with non-SQL advanced options for the connection, applied to the underlying engine. For all supported execution options, see: https://docs.sqlalchemy.org/core/connections.html#sqlalchemy.engine.Connection.execution_options. Note that this is not a standard argument supported by the pandas API, but it can be useful for handling large datasets.

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

  • encoding (str | None, default: None ) –

    The encoding used when reading the SQL query file given by filepath.

Raises:

  • DatasetError

    When sql and filepath are both provided or both empty, or when the 'con' entry in credentials is empty.
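The argument checks performed by the constructor can be sketched in isolation. This is a minimal standalone sketch, not the library code: DatasetError is redefined locally so the snippet runs without kedro, and check_query_args and error_for are hypothetical helper names introduced for illustration.

```python
class DatasetError(Exception):
    """Local stand-in for kedro's DatasetError (assumption for this sketch)."""


def check_query_args(sql=None, filepath=None, credentials=None):
    """Hypothetical helper mirroring the three constructor checks."""
    # Exactly one of `sql` and `filepath` must be given.
    if sql and filepath:
        raise DatasetError("'sql' and 'filepath' arguments cannot both be provided.")
    if not (sql or filepath):
        raise DatasetError("'sql' and 'filepath' arguments cannot both be empty.")
    # A non-empty connection string is required under the 'con' key.
    if not (credentials and credentials.get("con")):
        raise DatasetError("'con' argument cannot be empty.")
    return "sql" if sql else "filepath"


def error_for(**kwargs):
    """Return the error message for a set of arguments, or None if they pass."""
    try:
        check_query_args(**kwargs)
    except DatasetError as exc:
        return str(exc)
    return None


creds = {"con": "sqlite:///test.db"}
print(error_for(sql="SELECT 1", credentials=creds))
print(error_for(sql="SELECT 1", filepath="query.sql", credentials=creds))
print(error_for(sql="SELECT 1"))
```

The first call passes all checks, the second fails because both sql and filepath are given, and the third fails because no connection string is supplied.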

Source code in kedro_datasets/pandas/sql_dataset.py
def __init__(  # noqa: PLR0913
    self,
    sql: str | None = None,
    credentials: dict[str, Any] | None = None,
    load_args: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    filepath: str | os.PathLike | None = None,
    execution_options: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    encoding: str | None = None,
) -> None:
    """Creates a new ``SQLQueryDataset``.

    Args:
        sql: The sql query statement.
        credentials: A dictionary with a ``SQLAlchemy`` connection string.
            Users are supposed to provide the connection string 'con'
            through credentials. It overwrites `con` parameter in
            ``load_args`` and ``save_args`` in case it is provided. To find
            all supported connection string formats, see here:
            https://docs.sqlalchemy.org/core/engines.html#database-urls
            Additional parameters for the sqlalchemy engine can be provided
            alongside the 'con' parameter.
        load_args: Provided to underlying pandas ``read_sql_query``
            function along with the connection string.
            To find all supported arguments, see here:
            https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_query.html
            To find all supported connection string formats, see here:
            https://docs.sqlalchemy.org/core/engines.html#database-urls
        fs_args: Extra arguments to pass into underlying filesystem class constructor
            (e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as
            to pass to the filesystem's `open` method through nested keys
            `open_args_load` and `open_args_save`.
            Here you can find all available arguments for `open`:
            https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open
            All defaults are preserved, except `mode`, which is set to `r` when loading.
        filepath: A path to a file with a sql query statement. Can be a string or a PathLike object.
        execution_options: A dictionary with non-SQL advanced options for the connection to
            be applied to the underlying engine. To find all supported execution
            options, see here:
            https://docs.sqlalchemy.org/core/connections.html#sqlalchemy.engine.Connection.execution_options
            Note that this is not a standard argument supported by pandas API, but could be
            useful for handling large datasets.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        DatasetError: When either ``sql`` or ``con`` parameters is empty.
    """
    if sql and filepath:
        raise DatasetError(
            "'sql' and 'filepath' arguments cannot both be provided."
            "Please only provide one."
        )

    if not (sql or filepath):
        raise DatasetError(
            "'sql' and 'filepath' arguments cannot both be empty."
            "Please provide a sql query or path to a sql query file."
        )

    if not (credentials and "con" in credentials and credentials["con"]):
        raise DatasetError(
            "'con' argument cannot be empty. Please "
            "provide a SQLAlchemy connection string."
        )

    default_load_args: dict[str, Any] = {}

    self._load_args = (
        {**default_load_args, **load_args}
        if load_args is not None
        else default_load_args
    )

    self.metadata = metadata
    self.encoding = encoding

    # load sql query from file
    if sql:
        self._load_args["sql"] = sql
        self._filepath = None
    else:
        # filesystem for loading sql file
        _fs_args = copy.deepcopy(fs_args) or {}
        _fs_credentials = _fs_args.pop("credentials", {})
        protocol, path = get_protocol_and_path(str(filepath))

        self._protocol = protocol
        self._fs = fsspec.filesystem(self._protocol, **_fs_credentials, **_fs_args)
        self._filepath = path
    self._connection_str = credentials["con"]
    self._connection_args = {
        k: credentials[k] for k in credentials.keys() if k != "con"
    }
    self._execution_options = execution_options or {}
    if "mssql" in self._connection_str:
        self.adapt_mssql_date_params()

_connection_args instance-attribute

_connection_args = {
    k: credentials[k] for k in credentials.keys() if k != "con"
}

_connection_str instance-attribute

_connection_str = credentials['con']

_execution_options instance-attribute

_execution_options = execution_options or {}

_filepath instance-attribute

_filepath = None

_fs instance-attribute

_fs = fsspec.filesystem(_protocol, **_fs_credentials, **_fs_args)

_load_args instance-attribute

_load_args = (
    {**default_load_args, **load_args}
    if load_args is not None
    else default_load_args
)

_protocol instance-attribute

_protocol = protocol

encoding instance-attribute

encoding = encoding

engine property

engine

The Engine object for the dataset's connection string.

engines class-attribute instance-attribute

engines = {}

metadata instance-attribute

metadata = metadata

_describe

_describe()
Source code in kedro_datasets/pandas/sql_dataset.py
def _describe(self) -> dict[str, Any]:
    load_args = copy.deepcopy(self._load_args)
    return {
        "sql": str(load_args.pop("sql", None)),
        "filepath": str(self._filepath),
        "load_args": str(load_args),
        "execution_options": str(self._execution_options),
    }

adapt_mssql_date_params

adapt_mssql_date_params()

Changes the format of datetime parameters. MSSQL expects datetimes in the exact format %Y-%m-%dT%H:%M:%S; plain dates are also accepted and converted to midnight of that day. pyodbc does not accept named parameters, so they must be provided as a list.

Source code in kedro_datasets/pandas/sql_dataset.py
def adapt_mssql_date_params(self) -> None:
    """We need to change the format of datetime parameters.
    MSSQL expects datetime in the exact format %y-%m-%dT%H:%M:%S.
    Here, we also accept plain dates.
    `pyodbc` does not accept named parameters, they must be provided as a list."""
    params = self._load_args.get("params", [])
    if not isinstance(params, list):
        raise DatasetError(
            "Unrecognized `params` format. It can be only a `list`, "
            f"got {type(params)!r}"
        )
    new_load_args = []
    for value in params:
        try:
            as_date = dt.date.fromisoformat(value)
            new_val = dt.datetime.combine(as_date, dt.time.min)
            new_load_args.append(new_val.strftime("%Y-%m-%dT%H:%M:%S"))
        except (TypeError, ValueError):
            new_load_args.append(value)
    if new_load_args:
        self._load_args["params"] = tuple(new_load_args)
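The per-parameter conversion can be tried on its own. The sketch below lifts the loop into a free function, adapt_date_params (a hypothetical name), and always returns a tuple, whereas the method above only overwrites params when the list is non-empty.

```python
import datetime as dt


def adapt_date_params(params):
    """Standalone sketch of the conversion applied to each parameter."""
    adapted = []
    for value in params:
        try:
            # ISO dates such as "2023-01-15" become midnight datetimes.
            as_date = dt.date.fromisoformat(value)
            midnight = dt.datetime.combine(as_date, dt.time.min)
            adapted.append(midnight.strftime("%Y-%m-%dT%H:%M:%S"))
        except (TypeError, ValueError):
            # Non-strings and strings that are not ISO dates pass through unchanged.
            adapted.append(value)
    return tuple(adapted)


print(adapt_date_params(["2023-01-15", "not a date", 42]))
# ('2023-01-15T00:00:00', 'not a date', 42)
```

Values that are not ISO date strings are left untouched, so non-date query parameters can be mixed freely with dates.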

create_connection classmethod

create_connection(connection_str, connection_args=None)

Given a connection string, create a singleton connection to be used across all instances of SQLQueryDataset that need to connect to the same source.

Source code in kedro_datasets/pandas/sql_dataset.py
@classmethod
def create_connection(
    cls, connection_str: str, connection_args: dict | None = None
) -> None:
    """Given a connection string, create singleton connection
    to be used across all instances of `SQLQueryDataset` that
    need to connect to the same source.
    """
    connection_args = connection_args or {}
    try:
        engine = create_engine(connection_str, **connection_args)
    except ImportError as import_error:
        raise _get_missing_module_error(import_error) from import_error
    except NoSuchModuleError as exc:
        raise _get_sql_alchemy_missing_error() from exc

    cls.engines[connection_str] = engine
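The caching pattern here is one engine per connection string, stored in a class-level dictionary. The sketch below illustrates that pattern without SQLAlchemy. EngineRegistry, split_credentials and fake_engine_factory are hypothetical names, and the factory is a placeholder for create_engine. The lookup-before-create step is an assumption about how the engine property uses the dictionary.

```python
from typing import Any, Callable


def split_credentials(credentials: dict) -> tuple:
    """Separate the 'con' connection string from the extra engine arguments."""
    connection_args = {k: v for k, v in credentials.items() if k != "con"}
    return credentials["con"], connection_args


class EngineRegistry:
    """Hypothetical registry: one engine per connection string, shared class-wide."""

    engines: dict = {}

    @classmethod
    def get(cls, connection_str: str, connection_args: dict, factory: Callable[..., Any]) -> Any:
        # Build the engine only the first time a connection string is seen.
        if connection_str not in cls.engines:
            cls.engines[connection_str] = factory(connection_str, **connection_args)
        return cls.engines[connection_str]


def fake_engine_factory(connection_str: str, **kwargs: Any) -> dict:
    """Placeholder for sqlalchemy.create_engine; returns a plain dict."""
    return {"url": connection_str, "options": kwargs}


con, args = split_credentials(
    {"con": "postgresql://scott:tiger@localhost/test", "pool_size": 10}
)
first = EngineRegistry.get(con, args, fake_engine_factory)
second = EngineRegistry.get(con, args, fake_engine_factory)
print(first is second)  # True
```

Because the dictionary lives on the class, two dataset instances that share a connection string would reuse the same engine and its connection pool.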

load

load()
Source code in kedro_datasets/pandas/sql_dataset.py
def load(self) -> pd.DataFrame:
    load_args = copy.deepcopy(self._load_args)

    if self._filepath:
        load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol)
        with self._fs.open(load_path, mode="r", encoding=self.encoding) as fs_file:
            load_args["sql"] = fs_file.read()

    return pd.read_sql_query(
        con=self.engine.execution_options(**self._execution_options), **load_args
    )

save

save(data)
Source code in kedro_datasets/pandas/sql_dataset.py
def save(self, data: None) -> NoReturn:
    raise DatasetError("'save' is not supported on SQLQueryDataset")