kedro_datasets.pandas.SQLQueryDataset
- class kedro_datasets.pandas.SQLQueryDataset(sql=None, credentials=None, load_args=None, fs_args=None, filepath=None, execution_options=None, metadata=None)
SQLQueryDataset loads data from a provided SQL query. It uses pandas.DataFrame internally, so it supports all allowed pandas options on read_sql_query. Since pandas uses SQLAlchemy behind the scenes, when instantiating SQLQueryDataset one needs to pass a compatible connection string either in credentials (see the example code snippet below) or in load_args. Connection string formats supported by SQLAlchemy can be found here: https://docs.sqlalchemy.org/core/engines.html#database-urls

It does not support the save method, so it is a read-only dataset. To save data to a SQL server, use SQLTableDataset.

Example usage for the YAML API:
shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
  credentials: db_credentials
Advanced example using the stream_results and chunksize options to reduce memory usage:

shuttle_id_dataset:
  type: pandas.SQLQueryDataset
  sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
  credentials: db_credentials
  execution_options:
    stream_results: true
  load_args:
    chunksize: 1000
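The same options can be set through the Python API. A minimal sketch, where the SQLite connection string is an assumption for illustration:

from kedro_datasets.pandas import SQLQueryDataset

# Sketch: stream results server-side and load in 1000-row chunks.
dataset = SQLQueryDataset(
    sql="select shuttle, shuttle_id from spaceflights.shuttles;",
    credentials={"con": "sqlite:///test.db"},
    execution_options={"stream_results": True},
    load_args={"chunksize": 1000},
)

Note that with chunksize set, pandas' read_sql_query returns an iterator of DataFrames rather than a single frame, so the loaded result should be consumed chunk by chunk.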
Sample database credentials entry in credentials.yml:

db_credentials:
  con: postgresql://scott:tiger@localhost/test
  pool_size: 10  # additional parameters
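Conceptually, every key in the entry other than con is forwarded to the SQLAlchemy engine. A hedged sketch of the equivalent call (not the dataset's actual internals):

from sqlalchemy import create_engine

# Conceptual sketch only: extra credential keys such as pool_size become
# keyword arguments to SQLAlchemy's create_engine.
engine = create_engine("postgresql://scott:tiger@localhost/test", pool_size=10)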
Example usage for the Python API:
import sqlite3
import pandas as pd
from kedro_datasets.pandas import SQLQueryDataset

data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
sql = "SELECT * FROM table_a"
# tmp_path is a pathlib.Path to a temporary directory (e.g. pytest's tmp_path fixture)
credentials = {"con": f"sqlite:///{tmp_path / 'test.db'}"}
dataset = SQLQueryDataset(sql=sql, credentials=credentials)

con = sqlite3.connect(tmp_path / "test.db")
cur = con.cursor()
cur.execute("CREATE TABLE table_a(col1, col2, col3)")
cur.execute("INSERT INTO table_a VALUES (1, 4, 5), (2, 5, 6)")
con.commit()

reloaded = dataset.load()
assert data.equals(reloaded)
Example of usage for MSSQL:
credentials = {
    "server": "localhost",
    "port": "1433",
    "database": "TestDB",
    "user": "SA",
    "password": "StrongPassword",
}

def _make_mssql_connection_str(
    server: str, port: str, database: str, user: str, password: str
) -> str:
    import pyodbc
    from sqlalchemy.engine import URL

    driver = pyodbc.drivers()[-1]
    connection_str = (
        f"DRIVER={driver};SERVER={server},{port};DATABASE={database};"
        f"ENCRYPT=yes;UID={user};PWD={password};"
        f"TrustServerCertificate=yes;"
    )
    return URL.create("mssql+pyodbc", query={"odbc_connect": connection_str})

connection_str = _make_mssql_connection_str(**credentials)
dataset = SQLQueryDataset(
    credentials={"con": connection_str}, sql="SELECT TOP 5 * FROM TestTable;"
)
df = dataset.load()
In addition, here is an example of a catalog with dates parsing:

mssql_dataset:
  type: kedro_datasets.pandas.SQLQueryDataset
  credentials: mssql_credentials
  sql: >
    SELECT *
    FROM DateTable
    WHERE date >= ? AND date <= ?
    ORDER BY date
  load_args:
    params:
      - ${begin}
      - ${end}
    index_col: date
    parse_dates:
      date: "%Y-%m-%d %H:%M:%S.%f0 %z"
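Under the hood, these load_args are handed straight to pandas. A minimal sketch of the equivalent call, where the connection string and the parameter values standing in for ${begin}/${end} are assumptions:

import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection; in the dataset this comes from mssql_credentials.
engine = create_engine("sqlite:///test.db")
df = pd.read_sql_query(
    "SELECT * FROM DateTable WHERE date >= ? AND date <= ? ORDER BY date",
    con=engine,
    params=["2023-01-01", "2023-06-30"],
    index_col="date",
    parse_dates={"date": "%Y-%m-%d %H:%M:%S.%f0 %z"},
)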
Attributes

- engine – The Engine object for the dataset's connection string.

Methods

- adapt_mssql_date_params() – Changes the format of datetime parameters to the format MSSQL expects.
- create_connection(connection_str[, ...]) – Given a connection string, create a singleton connection to be used across all instances of SQLQueryDataset that need to connect to the same source.
- exists() – Checks whether a dataset's output already exists by calling the provided _exists() method.
- from_config(name, config[, load_version, ...]) – Create a dataset instance using the configuration provided.
- load() – Loads data by delegation to the provided load method.
- release() – Release any cached data.
- save(data) – Saves data by delegation to the provided save method.
- to_config() – Converts the dataset instance into a dictionary-based configuration for serialization.
- __init__(sql=None, credentials=None, load_args=None, fs_args=None, filepath=None, execution_options=None, metadata=None)
Creates a new SQLQueryDataset.

- Parameters:
  sql (Optional[str]) – The SQL query statement.
  credentials (Optional[dict[str, Any]]) – A dictionary with a SQLAlchemy connection string. Users are supposed to provide the connection string 'con' through credentials. It overwrites the con parameter in load_args and save_args in case it is provided. To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls Additional parameters for the SQLAlchemy engine can be provided alongside the 'con' parameter.
  load_args (Optional[dict[str, Any]]) – Provided to the underlying pandas read_sql_query function along with the connection string. To find all supported arguments, see here: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_query.html To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls
  fs_args (Optional[dict[str, Any]]) – Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem), as well as to pass to the filesystem's open method through nested keys open_args_load and open_args_save. Here you can find all available arguments for open: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open All defaults are preserved, except mode, which is set to r when loading.
  filepath (Optional[str]) – A path to a file with a SQL query statement (see the sketch below).
  execution_options (Optional[dict[str, Any]]) – A dictionary with non-SQL advanced options for the connection to be applied to the underlying engine. To find all supported execution options, see here: https://docs.sqlalchemy.org/core/connections.html#sqlalchemy.engine.Connection.execution_options Note that this is not a standard argument supported by the pandas API, but it can be useful for handling large datasets.
  metadata (Optional[dict[str, Any]]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
- Raises:
  DatasetError – When either the sql or con parameter is empty.
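As an illustration of the filepath alternative to sql, a hedged sketch; the file path and connection string are assumptions:

from kedro_datasets.pandas import SQLQueryDataset

# Sketch: read the query text from a .sql file instead of passing `sql` inline.
dataset = SQLQueryDataset(
    filepath="queries/shuttles.sql",  # illustrative path
    credentials={"con": "sqlite:///test.db"},
)
df = dataset.load()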
- adapt_mssql_date_params()
Changes the format of datetime parameters. MSSQL expects datetimes in the exact format %Y-%m-%dT%H:%M:%S; plain dates are also accepted here. pyodbc does not accept named parameters, so they must be provided as a list.
- Return type: None
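A hedged sketch of the kind of conversion described above (illustrative dates; the method's actual internals may differ):

from datetime import date, datetime, time

# Sketch: plain ISO dates are expanded to the datetime format MSSQL expects.
params = ["2023-01-01", "2023-06-30"]  # stand-ins for ${begin}/${end}
converted = [
    datetime.combine(date.fromisoformat(p), time.min).strftime("%Y-%m-%dT%H:%M:%S")
    for p in params
]
# converted == ["2023-01-01T00:00:00", "2023-06-30T00:00:00"]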
- classmethod create_connection(connection_str, connection_args=None)
Given a connection string, create a singleton connection to be used across all instances of SQLQueryDataset that need to connect to the same source.
- Return type: None
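In practice this means datasets pointing at the same source share one engine. A hedged sketch, assuming a local SQLite file:

from kedro_datasets.pandas import SQLQueryDataset

# Sketch: identical connection strings are expected to share the same engine,
# per the singleton behaviour described above.
ds1 = SQLQueryDataset(sql="SELECT 1", credentials={"con": "sqlite:///test.db"})
ds2 = SQLQueryDataset(sql="SELECT 2", credentials={"con": "sqlite:///test.db"})
assert ds1.engine is ds2.engine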
- property engine
  The Engine object for the dataset's connection string.
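The engine can also be used directly for ad-hoc statements against the same connection. A minimal sketch, reusing a dataset from the examples above:

from sqlalchemy import text

# Sketch: run an ad-hoc statement on the dataset's own engine.
with dataset.engine.connect() as connection:
    result = connection.execute(text("SELECT 1"))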
- exists()
Checks whether a dataset’s output already exists by calling the provided _exists() method.
- Return type: bool
- Returns:
  Flag indicating whether the output already exists.
- Raises:
  DatasetError – When the underlying exists method raises an error.
- classmethod from_config(name, config, load_version=None, save_version=None)
Create a dataset instance using the configuration provided.
- Parameters:
  name (str) – Data set name.
  config (dict[str, Any]) – Data set config dictionary.
  load_version (Optional[str]) – Version string to be used for the load operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.
  save_version (Optional[str]) – Version string to be used for the save operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.
- Return type: AbstractDataset
- Returns:
  An instance of an AbstractDataset subclass.
- Raises:
DatasetError – When the function fails to create the dataset from its config.
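A hedged sketch of building the dataset from a plain config dictionary, mirroring the YAML entries above (the connection string is an assumption):

from kedro_datasets.pandas import SQLQueryDataset

# Sketch: a "type" key plus the init arguments, as a catalog entry would provide.
config = {
    "type": "pandas.SQLQueryDataset",
    "sql": "select shuttle, shuttle_id from spaceflights.shuttles;",
    "credentials": {"con": "sqlite:///test.db"},
}
dataset = SQLQueryDataset.from_config("shuttle_id_dataset", config)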
- load()
Loads data by delegation to the provided load method.
- Return type: DataFrame
- Returns:
Data returned by the provided load method.
- Raises:
DatasetError – When the underlying load method raises an error.
- release()
Release any cached data.
- Raises:
DatasetError – When the underlying release method raises an error.
- Return type: None
- save(data)
Saves data by delegation to the provided save method.
- Parameters:
  data (None) – The value to be saved by the provided save method.
- Raises:
  DatasetError – When the underlying save method raises an error.
  FileNotFoundError – When the save method gets a file instead of a directory, on Windows.
  NotADirectoryError – When the save method gets a file instead of a directory, on Unix.
- Return type: None
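Since SQLQueryDataset is read-only (see the class description), save is expected to raise. A hedged sketch, assuming a Kedro version that exposes DatasetError from kedro.io:

from kedro.io import DatasetError

# Sketch: SQLQueryDataset does not support saving, so save() raises.
try:
    dataset.save(None)
except DatasetError as err:
    print(err)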
- to_config()
Converts the dataset instance into a dictionary-based configuration for serialization. Ensures that any subclass-specific details are handled, with additional logic for versioning and caching implemented for CachedDataset.
Adds a key for the dataset’s type using its module and class name and includes the initialization arguments.
For CachedDataset it extracts the underlying dataset’s configuration, handles the versioned flag and removes unnecessary metadata. It also ensures the embedded dataset’s configuration is appropriately flattened or transformed.
If the dataset has a version key, it sets the versioned flag in the configuration.
Removes the metadata key from the configuration if present.
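A minimal sketch of what a round trip might look like (the connection string and the exact keys in the result are illustrative):

from kedro_datasets.pandas import SQLQueryDataset

dataset = SQLQueryDataset(sql="SELECT 1", credentials={"con": "sqlite:///test.db"})
config = dataset.to_config()
# Expect a "type" key derived from the module and class name,
# plus the initialization arguments, per the description above.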