kedro_datasets.pandas.SQLQueryDataset

class kedro_datasets.pandas.SQLQueryDataset(sql=None, credentials=None, load_args=None, fs_args=None, filepath=None, execution_options=None, metadata=None)[source]

SQLQueryDataset loads data from a provided SQL query. It calls pandas.read_sql_query internally and returns a pandas.DataFrame, so it supports every option that read_sql_query accepts. Because pandas uses SQLAlchemy behind the scenes, you must pass a compatible connection string when instantiating SQLQueryDataset, either in credentials (see the example code snippet below) or in load_args. Connection string formats supported by SQLAlchemy can be found here: https://docs.sqlalchemy.org/core/engines.html#database-urls

It does not support the save method, so it is a read-only data set. To save data to a SQL server, use SQLTableDataset.

Example usage for the YAML API:

shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
  credentials: db_credentials

Advanced example using the stream_results and chunksize options to reduce memory usage:

shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
  credentials: db_credentials
  execution_options:
    stream_results: true
  load_args:
    chunksize: 1000
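
To make the memory benefit concrete, the following sketch reads a result set in fixed-size chunks. It deliberately uses only the standard-library sqlite3 module rather than Kedro or pandas, so the table, the helper name iter_chunks, and the row count are illustrative assumptions. With chunksize set in load_args, pandas.read_sql_query returns an iterator of DataFrames instead of a single DataFrame, and the consumer loops over it in much the same way.

```python
import sqlite3
from typing import Iterator


def iter_chunks(cursor: sqlite3.Cursor, chunksize: int) -> Iterator[list[tuple]]:
    """Yield rows from an executed cursor in lists of at most `chunksize` rows."""
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            break
        yield rows


# Hypothetical in-memory table standing in for spaceflights.shuttles.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE shuttles (shuttle TEXT, shuttle_id INTEGER)")
con.executemany(
    "INSERT INTO shuttles VALUES (?, ?)",
    [(f"shuttle_{i}", i) for i in range(2500)],
)

# Only one chunk of rows is held in memory at a time.
cursor = con.execute("SELECT shuttle, shuttle_id FROM shuttles")
chunk_sizes = [len(chunk) for chunk in iter_chunks(cursor, chunksize=1000)]
con.close()
```

Here 2500 rows are consumed as two full chunks and one partial chunk, so peak memory depends on the chunk size rather than on the size of the full result.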

Sample database credentials entry in credentials.yml:

db_credentials:
  con: postgresql://scott:tiger@localhost/test
  pool_size: 10 # additional parameters
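
One way to read this entry: con is the SQLAlchemy connection string, and every other key is an additional argument for the connection. The sketch below shows that split on a plain dictionary. The helper name split_credentials and the ValueError it raises are hypothetical and are not part of kedro_datasets; treat this as an illustration of the layout, not the library's implementation.

```python
from typing import Any


def split_credentials(credentials: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Separate the connection string from the remaining connection arguments."""
    if not credentials.get("con"):
        # Hypothetical check: an empty or missing `con` cannot be used to connect.
        raise ValueError("'con' must be a non-empty connection string")
    connection_args = {k: v for k, v in credentials.items() if k != "con"}
    return credentials["con"], connection_args


# The same entry as in credentials.yml, written as a Python dictionary.
db_credentials = {"con": "postgresql://scott:tiger@localhost/test", "pool_size": 10}
connection_str, connection_args = split_credentials(db_credentials)
```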

Example usage for the Python API:

import sqlite3
import tempfile
from pathlib import Path

import pandas as pd
from kedro_datasets.pandas import SQLQueryDataset

# tmp_path stands in for a temporary directory (for example, pytest's tmp_path fixture).
tmp_path = Path(tempfile.mkdtemp())

data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
sql = "SELECT * FROM table_a"
credentials = {"con": f"sqlite:///{tmp_path / 'test.db'}"}
dataset = SQLQueryDataset(sql=sql, credentials=credentials)

# Create and populate the table that the query reads from.
con = sqlite3.connect(tmp_path / "test.db")
cur = con.cursor()
cur.execute("CREATE TABLE table_a(col1, col2, col3)")
cur.execute("INSERT INTO table_a VALUES (1, 4, 5), (2, 5, 6)")
con.commit()
reloaded = dataset.load()

assert data.equals(reloaded)

Example of usage for MSSQL:

credentials = {
    "server": "localhost",
    "port": "1433",
    "database": "TestDB",
    "user": "SA",
    "password": "StrongPassword",
}


def _make_mssql_connection_str(
    server: str, port: str, database: str, user: str, password: str
) -> str:
    import pyodbc
    from sqlalchemy.engine import URL

    driver = pyodbc.drivers()[-1]
    connection_str = (
        f"DRIVER={driver};SERVER={server},{port};DATABASE={database};"
        f"ENCRYPT=yes;UID={user};PWD={password};"
        f"TrustServerCertificate=yes;"
    )
    return URL.create("mssql+pyodbc", query={"odbc_connect": connection_str})


connection_str = _make_mssql_connection_str(**credentials)
dataset = SQLQueryDataset(
    credentials={"con": connection_str}, sql="SELECT TOP 5 * FROM TestTable;"
)
df = dataset.load()

In addition, here is an example of a catalog entry with date parsing:

mssql_dataset:
  type: kedro_datasets.pandas.SQLQueryDataset
  credentials: mssql_credentials
  sql: >
    SELECT *
    FROM  DateTable
    WHERE date >= ? AND date <= ?
    ORDER BY date
  load_args:
    params:
      - ${begin}
      - ${end}
    index_col: date
    parse_dates:
      date: "%Y-%m-%d %H:%M:%S.%f0 %z"
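
The parse_dates format above can be checked against a sample value before it goes into the catalog. This sketch uses the standard-library datetime.strptime, which understands the directives used here. The sample string is an assumed rendering of a timestamp with seven fractional digits and a UTC offset, and pandas may differ from the standard library in edge cases.

```python
from datetime import datetime

DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f0 %z"

# Assumed sample value: seven fractional digits, so %f consumes six of them
# and the literal "0" in the format absorbs the seventh.
sample = "2023-05-01 12:30:45.1234560 +02:00"

parsed = datetime.strptime(sample, DATE_FORMAT)
offset_hours = parsed.utcoffset().total_seconds() / 3600
```

The literal 0 after %f is what lets a seven-digit fraction parse, because %f accepts at most six digits.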

Attributes

engine

The Engine object for the dataset's connection string.

engines

Methods

adapt_mssql_date_params()

Changes the format of datetime parameters to the one MSSQL expects.

create_connection(connection_str[, ...])

Given a connection string, creates a singleton connection to be used across all instances of SQLQueryDataset that need to connect to the same source.

exists()

Checks whether a data set's output already exists by calling the provided _exists() method.

from_config(name, config[, load_version, ...])

Create a data set instance using the configuration provided.

load()

Loads data by delegation to the provided load method.

release()

Release any cached data.

save(data)

Saves data by delegation to the provided save method.

__init__(sql=None, credentials=None, load_args=None, fs_args=None, filepath=None, execution_options=None, metadata=None)[source]

Creates a new SQLQueryDataset.

Parameters:
Raises:

DatasetError – When either the sql or the con parameter is empty.

adapt_mssql_date_params()[source]

Changes the format of datetime parameters. MSSQL expects datetimes in the exact format %Y-%m-%dT%H:%M:%S; plain dates are also accepted. Because pyodbc does not accept named parameters, they must be provided as a list.

Return type:

None
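
The conversion described for adapt_mssql_date_params() can be sketched as follows. This is a minimal illustration and not the library's code: the function name adapt_date_params is hypothetical, it returns a new list instead of modifying the dataset's load arguments, and it assumes a four-digit year (%Y) in the output format.

```python
import datetime as dt
from typing import Any


def adapt_date_params(params: list[Any]) -> list[Any]:
    """Rewrite plain ISO date strings as midnight datetimes; pass other values through."""
    adapted = []
    for value in params:
        try:
            as_date = dt.date.fromisoformat(value)
        except (TypeError, ValueError):
            # Not a plain date string: leave the value untouched.
            adapted.append(value)
            continue
        as_datetime = dt.datetime.combine(as_date, dt.time.min)
        adapted.append(as_datetime.strftime("%Y-%m-%dT%H:%M:%S"))
    return adapted


# Positional parameters, as required for "?" placeholders with pyodbc.
adapted = adapt_date_params(["2023-01-31", "2023-02-01T08:30:00", 42])
```

Values that are not plain dates, such as a full datetime string or a number, come back unchanged, so the list can mix date and non-date parameters.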

classmethod create_connection(connection_str, connection_args=None)[source]

Given a connection string, creates a singleton connection to be used across all instances of SQLQueryDataset that need to connect to the same source.

Return type:

None
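
The singleton behaviour can be pictured as a class-level dictionary keyed by connection string. The sketch below is illustrative only: EngineCache, get_engine, and fake_engine_factory are hypothetical names, and a plain object stands in for a real SQLAlchemy engine so the example runs without a database.

```python
from typing import Any, Callable


class EngineCache:
    """Illustrative class-level cache: one engine per connection string."""

    engines: dict[str, Any] = {}

    @classmethod
    def get_engine(cls, connection_str: str, factory: Callable[[str], Any]) -> Any:
        # Build the engine only on first use; later calls reuse the cached one.
        if connection_str not in cls.engines:
            cls.engines[connection_str] = factory(connection_str)
        return cls.engines[connection_str]


calls: list[str] = []


def fake_engine_factory(connection_str: str) -> object:
    """Stand-in for an engine constructor; records each call."""
    calls.append(connection_str)
    return object()


first = EngineCache.get_engine("sqlite:///a.db", fake_engine_factory)
second = EngineCache.get_engine("sqlite:///a.db", fake_engine_factory)
other = EngineCache.get_engine("sqlite:///b.db", fake_engine_factory)
```

Two requests for the same connection string share one object, while a different connection string gets its own, which keeps the number of connection pools bounded by the number of distinct sources.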

property engine

The Engine object for the dataset’s connection string.

engines: dict[str, Any] = {}

exists()

Checks whether a data set’s output already exists by calling the provided _exists() method.

Return type:

bool

Returns:

Flag indicating whether the output already exists.

Raises:

DatasetError – When the underlying exists method raises an error.

classmethod from_config(name, config, load_version=None, save_version=None)

Create a data set instance using the configuration provided.

Parameters:
  • name (str) – Data set name.

  • config (dict[str, Any]) – Data set config dictionary.

  • load_version (str | None) – Version string to be used for load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

  • save_version (str | None) – Version string to be used for save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

Return type:

AbstractDataset

Returns:

An instance of an AbstractDataset subclass.

Raises:

DatasetError – When the function fails to create the data set from its config.

load()

Loads data by delegation to the provided load method.

Return type:

TypeVar(_DO)

Returns:

Data returned by the provided load method.

Raises:

DatasetError – When the underlying load method raises an error.

release()

Release any cached data.

Raises:

DatasetError – When the underlying release method raises an error.

Return type:

None

save(data)

Saves data by delegation to the provided save method.

Parameters:

data (TypeVar(_DI)) – The value to be saved by the provided save method.

Raises:
  • DatasetError – When the underlying save method raises an error.

  • FileNotFoundError – When the save method gets a file instead of a directory, on Windows.

  • NotADirectoryError – When the save method gets a file instead of a directory, on Unix.

Return type:

None