kedro_datasets.pandas.SQLQueryDataset

class kedro_datasets.pandas.SQLQueryDataset(sql=None, credentials=None, load_args=None, fs_args=None, filepath=None, execution_options=None, metadata=None)[source]

SQLQueryDataset loads data from a provided SQL query. It uses pandas.DataFrame internally, so it supports all allowed pandas options on read_sql_query. Since pandas uses SQLAlchemy behind the scenes, when instantiating SQLQueryDataset you need to pass a compatible connection string either in credentials (see the example code snippet below) or in load_args. Connection string formats supported by SQLAlchemy can be found here: https://docs.sqlalchemy.org/core/engines.html#database-urls

It does not support the save method, so it is a read-only dataset. To save data to a SQL database, use SQLTableDataset.

Example usage for the YAML API:

shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
  credentials: db_credentials

Advanced example using the stream_results and chunksize options to reduce memory usage:

shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
  credentials: db_credentials
  execution_options:
    stream_results: true
  load_args:
    chunksize: 1000
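
With stream_results and chunksize set as above, pandas read_sql_query returns an iterator of DataFrames rather than a single frame, so the loaded value can be consumed chunk by chunk. A minimal sketch of a downstream node (the function and its name are illustrative):

 def count_shuttles(chunks) -> int:
     # Each chunk is a pandas.DataFrame of up to 1000 rows (the chunksize above),
     # so the full result set is never held in memory at once.
     return sum(len(chunk) for chunk in chunks)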

Sample database credentials entry in credentials.yml:

db_credentials:
  con: postgresql://scott:tiger@localhost/test
  pool_size: 10 # additional parameters
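
Assuming the keys other than con (such as pool_size above) are forwarded to SQLAlchemy, as the create_connection signature below suggests, the entry is roughly equivalent to:

 from sqlalchemy import create_engine

 # Illustrative only: pool_size is handed to create_engine as a keyword argument.
 engine = create_engine("postgresql://scott:tiger@localhost/test", pool_size=10)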

Example usage for the Python API:

 import sqlite3

 import pandas as pd

 from kedro_datasets.pandas import SQLQueryDataset

 # tmp_path is any writable pathlib.Path (a pytest fixture in the original doctest).
 data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
 sql = "SELECT * FROM table_a"
 credentials = {"con": f"sqlite:///{tmp_path / 'test.db'}"}
 dataset = SQLQueryDataset(sql=sql, credentials=credentials)

 # Populate the database that the dataset will query.
 con = sqlite3.connect(tmp_path / "test.db")
 cur = con.cursor()
 cur.execute("CREATE TABLE table_a(col1, col2, col3)")
 cur.execute("INSERT INTO table_a VALUES (1, 4, 5), (2, 5, 6)")
 con.commit()

 reloaded = dataset.load()
 assert data.equals(reloaded)
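
The query can also be read from a file by passing filepath instead of sql (the path below is hypothetical):

shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  filepath: queries/shuttles.sql
  credentials: db_credentials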

Example usage for MSSQL:

 credentials = {
     "server": "localhost",
     "port": "1433",
     "database": "TestDB",
     "user": "SA",
     "password": "StrongPassword",
 }

 def _make_mssql_connection_str(
     server: str, port: str, database: str, user: str, password: str
 ) -> "URL":
     import pyodbc
     from sqlalchemy.engine import URL

     # Picks the last installed ODBC driver; pin an explicit driver name in production.
     driver = pyodbc.drivers()[-1]
     connection_str = (
         f"DRIVER={driver};SERVER={server},{port};DATABASE={database};"
         f"ENCRYPT=yes;UID={user};PWD={password};"
         f"TrustServerCertificate=yes;"
     )
     return URL.create("mssql+pyodbc", query={"odbc_connect": connection_str})

 connection_str = _make_mssql_connection_str(**credentials)
 dataset = SQLQueryDataset(
     credentials={"con": connection_str}, sql="SELECT TOP 5 * FROM TestTable;"
 )
 df = dataset.load()
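
Passing the raw ODBC string through the odbc_connect query parameter lets SQLAlchemy hand it to pyodbc unchanged, which is useful for driver options such as TrustServerCertificate that have no direct URL equivalent. Note that pyodbc.drivers()[-1] simply picks the last installed driver; in production you would typically pin an explicit driver name.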

In addition, here is an example of a catalog entry with date parsing:

mssql_dataset:
  type: kedro_datasets.pandas.SQLQueryDataset
  credentials: mssql_credentials
  sql: >
    SELECT *
    FROM DateTable
    WHERE date >= ? AND date <= ?
    ORDER BY date
  load_args:
    params:
      - ${begin}
      - ${end}
    index_col: date
    parse_dates:
      date: "%Y-%m-%d %H:%M:%S.%f0 %z"

Attributes

engine

The Engine object for the dataset's connection string.

engines

Class-level cache of engines, keyed by connection string and shared across dataset instances.

Methods

adapt_mssql_date_params()

Changes the format of datetime parameters to the form MSSQL expects.

create_connection(connection_str[, ...])

Given a connection string, creates a singleton connection shared across all instances of SQLQueryDataset that connect to the same source.

exists()

Checks whether a dataset's output already exists by calling the provided _exists() method.

from_config(name, config[, load_version, ...])

Create a dataset instance using the configuration provided.

load()

Loads data by delegation to the provided load method.

release()

Release any cached data.

save(data)

Saves data by delegation to the provided save method.

to_config()

Converts the dataset instance into a dictionary-based configuration for serialization.

__init__(sql=None, credentials=None, load_args=None, fs_args=None, filepath=None, execution_options=None, metadata=None)[source]

Creates a new SQLQueryDataset.

Parameters:
  • sql (Optional[str]) – The SQL query statement. Either sql or filepath must be provided, but not both.

  • credentials (Optional[dict[str, Any]]) – A dictionary holding a SQLAlchemy-compatible connection string under the con key. Additional keys (such as pool_size) are passed on when the engine is created.

  • load_args (Optional[dict[str, Any]]) – Provided to the underlying pandas read_sql_query function along with the connection string.

  • fs_args (Optional[dict[str, Any]]) – Extra arguments passed to the underlying filesystem class constructor when the query is read from filepath.

  • filepath (Optional[str]) – A path to a file containing the SQL query statement, used as an alternative to sql.

  • execution_options (Optional[dict[str, Any]]) – SQLAlchemy execution options applied to the connection, such as stream_results.

  • metadata (Optional[dict[str, Any]]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:

DatasetError – When either the sql or the con parameter is empty.

adapt_mssql_date_params()[source]

Changes the format of datetime parameters. MSSQL expects datetimes in the exact format %Y-%m-%dT%H:%M:%S, and plain dates are also accepted here. Since pyodbc does not accept named parameters, they must be provided as a list.

Return type:

None
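
A sketch of the conversion this describes, assuming plain ISO dates are passed as params (values are illustrative):

 import datetime as dt

 params = ["2023-01-01", "2023-06-30"]  # hypothetical plain-date params
 adapted = [
     dt.datetime.combine(dt.date.fromisoformat(p), dt.time.min).strftime(
         "%Y-%m-%dT%H:%M:%S"
     )
     for p in params
 ]
 # adapted == ["2023-01-01T00:00:00", "2023-06-30T00:00:00"]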

classmethod create_connection(connection_str, connection_args=None)[source]

Given a connection string, creates a singleton connection shared across all instances of SQLQueryDataset that connect to the same source.

Return type:

None
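
A minimal sketch of the sharing this implies (the sqlite path is illustrative):

 from kedro_datasets.pandas import SQLQueryDataset

 ds1 = SQLQueryDataset(sql="SELECT 1", credentials={"con": "sqlite:///shared.db"})
 ds2 = SQLQueryDataset(sql="SELECT 2", credentials={"con": "sqlite:///shared.db"})

 # Both instances connect to the same source, so the cached engine is reused.
 assert ds1.engine is ds2.engine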

property engine

The Engine object for the dataset’s connection string.

engines: dict[str, Any] = {}
exists()[source]

Checks whether a dataset’s output already exists by calling the provided _exists() method.

Return type:

bool

Returns:

Flag indicating whether the output already exists.

Raises:

DatasetError – when the underlying exists method raises an error.

classmethod from_config(name, config, load_version=None, save_version=None)[source]

Create a dataset instance using the configuration provided.

Parameters:
  • name (str) – Dataset name.

  • config (dict[str, Any]) – Dataset config dictionary.

  • load_version (Optional[str]) – Version string to be used for load operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.

  • save_version (Optional[str]) – Version string to be used for save operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.

Return type:

AbstractDataset

Returns:

An instance of an AbstractDataset subclass.

Raises:

DatasetError – When the function fails to create the dataset from its config.
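
A minimal sketch (the connection string is illustrative; in a project the data catalog resolves credentials before calling this):

 from kedro_datasets.pandas import SQLQueryDataset

 dataset = SQLQueryDataset.from_config(
     name="shuttle_id_dataset",
     config={
         "type": "pandas.SQLQueryDataset",
         "sql": "select shuttle, shuttle_id from spaceflights.shuttles;",
         "credentials": {"con": "sqlite:///test.db"},
     },
 )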

load()[source]

Loads data by delegation to the provided load method.

Return type:

DataFrame

Returns:

Data returned by the provided load method.

Raises:

DatasetError – when the underlying load method raises an error.

release()[source]

Release any cached data.

Raises:

DatasetError – when the underlying release method raises an error.

Return type:

None

save(data)[source]

Saves data by delegation to the provided save method.

Parameters:

data (None) – the value to be saved by the provided save method.

Raises:
  • DatasetError – when the underlying save method raises an error.

  • FileNotFoundError – when the save method receives a file instead of a directory, on Windows.

  • NotADirectoryError – when the save method receives a file instead of a directory, on Unix.

Return type:

NoReturn
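
Since SQLQueryDataset is read-only, calling save always raises. Reusing dataset and data from the sqlite example above:

 # Expected to raise a DatasetError: the dataset does not support saving.
 dataset.save(data)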

to_config()[source]

Converts the dataset instance into a dictionary-based configuration for serialization. Ensures that any subclass-specific details are handled, with additional logic for versioning and caching implemented for CachedDataset.

Adds a key for the dataset’s type using its module and class name and includes the initialization arguments.

For CachedDataset it extracts the underlying dataset’s configuration, handles the versioned flag and removes unnecessary metadata. It also ensures the embedded dataset’s configuration is appropriately flattened or transformed.

If the dataset has a version key, it sets the versioned flag in the configuration.

Removes the metadata key from the configuration if present.

Return type:

dict[str, Any]

Returns:

A dictionary containing the dataset’s type and initialization arguments.
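
Reusing the dataset from the sqlite example above, a minimal sketch of the output shape (the exact keys depend on the initialization arguments):

 config = dataset.to_config()
 # e.g. {"type": "pandas.SQLQueryDataset", "sql": "SELECT * FROM table_a", ...}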