kedro_datasets.pandas.SQLQueryDataset
- class kedro_datasets.pandas.SQLQueryDataset(sql=None, credentials=None, load_args=None, fs_args=None, filepath=None, execution_options=None, metadata=None)
SQLQueryDataset loads data from a provided SQL query. It uses pandas.DataFrame internally, so it supports all allowed pandas options on read_sql_query. Since pandas uses SQLAlchemy behind the scenes, when instantiating SQLQueryDataset one needs to pass a compatible connection string either in credentials (see the example code snippet below) or in load_args. Connection string formats supported by SQLAlchemy can be found here: https://docs.sqlalchemy.org/core/engines.html#database-urls
It does not support the save method, so it is a read-only data set. To save data to a SQL server, use SQLTableDataset.
Example usage for the YAML API:
    shuttle_id_dataset:
      type: pandas.SQLQueryDataset
      sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
      credentials: db_credentials
Advanced example using the stream_results and chunksize options to reduce memory usage:
    shuttle_id_dataset:
      type: pandas.SQLQueryDataset
      sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
      credentials: db_credentials
      execution_options:
        stream_results: true
      load_args:
        chunksize: 1000
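The effect of chunksize can be illustrated without Kedro or pandas. The sketch below is only an analogy built on the standard-library sqlite3 module: it reads a result set in fixed-size batches instead of materializing every row at once. The table, the row count, and the `iter_batches` helper are hypothetical and exist purely for illustration.

```python
import sqlite3
from typing import Iterator


def iter_batches(cursor: sqlite3.Cursor, size: int) -> Iterator[list]:
    """Yield lists of at most `size` rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(size)
        if not rows:
            return
        yield rows


# Hypothetical in-memory table standing in for spaceflights.shuttles.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE shuttles (shuttle TEXT, shuttle_id INTEGER)")
con.executemany(
    "INSERT INTO shuttles VALUES (?, ?)",
    [(f"shuttle_{i}", i) for i in range(2500)],
)

cur = con.execute("SELECT shuttle, shuttle_id FROM shuttles")
# Only one batch of rows is held in memory at a time.
batch_sizes = [len(batch) for batch in iter_batches(cur, 1000)]
con.close()
```

With pandas, passing chunksize to read_sql_query likewise returns an iterator of DataFrames rather than a single DataFrame, so downstream code has to consume the chunks one at a time.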
Sample database credentials entry in credentials.yml:
    db_credentials:
      con: postgresql://scott:tiger@localhost/test
      pool_size: 10  # additional parameters
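To make the shape of this entry concrete, the sketch below separates the con connection string from the remaining keys, which are treated as additional engine parameters. It is a hedged illustration of the idea, not Kedro's actual code; `split_credentials` is a hypothetical helper.

```python
from typing import Any


def split_credentials(credentials: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Return (connection string, extra engine parameters)."""
    if not credentials.get("con"):
        raise ValueError("'con' must be provided in credentials")
    # Everything except 'con' is passed along as an engine parameter.
    extra = {key: value for key, value in credentials.items() if key != "con"}
    return credentials["con"], extra


db_credentials = {
    "con": "postgresql://scott:tiger@localhost/test",
    "pool_size": 10,
}
connection_str, engine_kwargs = split_credentials(db_credentials)
```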
Example usage for the Python API:
    import sqlite3

    import pandas as pd
    from kedro_datasets.pandas import SQLQueryDataset

    # tmp_path is assumed to be a pathlib.Path pointing at a writable
    # directory, for example pytest's tmp_path fixture.
    data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
    sql = "SELECT * FROM table_a"
    credentials = {"con": f"sqlite:///{tmp_path / 'test.db'}"}
    dataset = SQLQueryDataset(sql=sql, credentials=credentials)

    # Create and populate the table that the query reads from.
    con = sqlite3.connect(tmp_path / "test.db")
    cur = con.cursor()
    cur.execute("CREATE TABLE table_a(col1, col2, col3)")
    cur.execute("INSERT INTO table_a VALUES (1, 4, 5), (2, 5, 6)")
    con.commit()

    reloaded = dataset.load()
    assert data.equals(reloaded)
Example of usage for MSSQL:
    credentials = {
        "server": "localhost",
        "port": "1433",
        "database": "TestDB",
        "user": "SA",
        "password": "StrongPassword",
    }

    def _make_mssql_connection_str(
        server: str, port: str, database: str, user: str, password: str
    ) -> str:
        import pyodbc
        from sqlalchemy.engine import URL

        driver = pyodbc.drivers()[-1]
        connection_str = (
            f"DRIVER={driver};SERVER={server},{port};DATABASE={database};"
            f"ENCRYPT=yes;UID={user};PWD={password};"
            f"TrustServerCertificate=yes;"
        )
        return URL.create("mssql+pyodbc", query={"odbc_connect": connection_str})

    connection_str = _make_mssql_connection_str(**credentials)
    dataset = SQLQueryDataset(
        credentials={"con": connection_str}, sql="SELECT TOP 5 * FROM TestTable;"
    )
    df = dataset.load()
In addition, here is an example of a catalog entry with date parsing:
    mssql_dataset:
      type: kedro_datasets.pandas.SQLQueryDataset
      credentials: mssql_credentials
      sql: >
        SELECT *
        FROM DateTable
        WHERE date >= ? AND date <= ?
        ORDER BY date
      load_args:
        params:
          - ${begin}
          - ${end}
        index_col: date
        parse_dates:
          date: "%Y-%m-%d %H:%M:%S.%f0 %z"
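The ? markers in the query above are positional placeholders, filled in order from the params list. A minimal sketch of the same mechanism using the standard-library sqlite3 driver, which also uses ? placeholders; the table contents and the date bounds are made up for illustration.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE DateTable (date TEXT, value INTEGER)")
con.executemany(
    "INSERT INTO DateTable VALUES (?, ?)",
    [("2023-01-01", 1), ("2023-02-01", 2), ("2023-03-01", 3)],
)

query = "SELECT * FROM DateTable WHERE date >= ? AND date <= ? ORDER BY date"
params = ["2023-01-15", "2023-03-01"]  # stand-ins for ${begin} and ${end}

# The first ? receives params[0], the second ? receives params[1].
rows = con.execute(query, params).fetchall()
con.close()
```

Because the values are matched by position, the order of the params list must follow the order of the placeholders in the query.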
Attributes
engine – The Engine object for the dataset's connection string.

Methods
adapt_mssql_date_params() – We need to change the format of datetime parameters.
create_connection(connection_str[, ...]) – Given a connection string, create singleton connection to be used across all instances of SQLQueryDataset that need to connect to the same source.
exists() – Checks whether a data set's output already exists by calling the provided _exists() method.
from_config(name, config[, load_version, ...]) – Create a data set instance using the configuration provided.
load() – Loads data by delegation to the provided load method.
release() – Release any cached data.
save(data) – Saves data by delegation to the provided save method.
- __init__(sql=None, credentials=None, load_args=None, fs_args=None, filepath=None, execution_options=None, metadata=None)
Creates a new SQLQueryDataset.
- Parameters:
sql (Optional[str]) – The SQL query statement.
credentials (Optional[dict[str, Any]]) – A dictionary with a SQLAlchemy connection string. Users are supposed to provide the connection string ‘con’ through credentials. It overwrites the con parameter in load_args and save_args in case it is provided. To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls Additional parameters for the SQLAlchemy engine can be provided alongside the ‘con’ parameter.
load_args (Optional[dict[str, Any]]) – Provided to the underlying pandas read_sql_query function along with the connection string. To find all supported arguments, see here: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_query.html To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls
fs_args (Optional[dict[str, Any]]) – Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem), as well as to pass to the filesystem’s open method through nested keys open_args_load and open_args_save. Here you can find all available arguments for open: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open All defaults are preserved, except mode, which is set to r when loading.
filepath (Optional[str]) – A path to a file with a SQL query statement.
execution_options (Optional[dict[str, Any]]) – A dictionary with non-SQL advanced options for the connection to be applied to the underlying engine. To find all supported execution options, see here: https://docs.sqlalchemy.org/core/connections.html#sqlalchemy.engine.Connection.execution_options Note that this is not a standard argument supported by the pandas API, but it could be useful for handling large datasets.
metadata (Optional[dict[str, Any]]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
- Raises:
DatasetError – When either the sql or con parameter is empty.
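The sql and filepath parameters are two ways of supplying the same thing: the query text. The sketch below shows one plausible way to resolve them, assuming that exactly one of the two should be given. `resolve_query` is a hypothetical helper, not part of the Kedro API, and it reads from the local filesystem only, whereas the dataset itself goes through an fsspec filesystem configured by fs_args.

```python
import tempfile
from pathlib import Path
from typing import Optional


def resolve_query(sql: Optional[str] = None, filepath: Optional[str] = None) -> str:
    """Return the query text from either an inline string or a file."""
    # Assumption: exactly one of the two sources must be supplied.
    if bool(sql) == bool(filepath):
        raise ValueError("Provide exactly one of 'sql' or 'filepath'")
    if sql:
        return sql
    return Path(filepath).read_text(encoding="utf-8").strip()


with tempfile.TemporaryDirectory() as tmp_dir:
    query_file = Path(tmp_dir) / "shuttles.sql"
    query_file.write_text("SELECT shuttle, shuttle_id FROM shuttles;\n", encoding="utf-8")
    from_file = resolve_query(filepath=str(query_file))

inline = resolve_query(sql="SELECT 1;")
```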
- adapt_mssql_date_params()
We need to change the format of datetime parameters. MSSQL expects datetime in the exact format %y-%m-%dT%H:%M:%S. Here, we also accept plain dates. pyodbc does not accept named parameters, they must be provided as a list.
- Return type:
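A minimal sketch of the conversion described above, assuming ISO-formatted date strings as input and a four-digit year (%Y) in the output; values that are not plain dates pass through unchanged. `adapt_date_params` is a hypothetical stand-alone function written for illustration, not the method itself.

```python
import datetime as dt
from typing import Any


def adapt_date_params(params: list[Any]) -> list[Any]:
    """Rewrite plain ISO dates as 'YYYY-MM-DDTHH:MM:SS'; leave other values alone."""
    adapted = []
    for value in params:
        try:
            as_date = dt.date.fromisoformat(value)
        except (TypeError, ValueError):
            adapted.append(value)  # not a plain date string, keep as-is
        else:
            # A date has no time component, so the time part is 00:00:00.
            adapted.append(as_date.strftime("%Y-%m-%dT%H:%M:%S"))
    return adapted


adapted = adapt_date_params(["2023-01-15", 42, "not a date"])
```

The result stays a list because, as noted above, pyodbc expects positional rather than named parameters.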
- classmethod create_connection(connection_str, connection_args=None)
Given a connection string, create singleton connection to be used across all instances of SQLQueryDataset that need to connect to the same source.
- Return type:
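The singleton behaviour can be pictured as a class-level cache keyed by connection string. The sketch below is an assumption about the general pattern rather than Kedro's implementation: `FakeEngine` stands in for a SQLAlchemy engine, and `QueryDataset` is a hypothetical class.

```python
from typing import Any, Optional


class FakeEngine:
    """Stand-in for a real database engine (hypothetical)."""

    def __init__(self, url: str, **kwargs: Any) -> None:
        self.url = url
        self.kwargs = kwargs


class QueryDataset:
    # Shared across all instances: one engine per connection string.
    engines: dict[str, FakeEngine] = {}

    def __init__(self, connection_str: str) -> None:
        self._connection_str = connection_str

    @classmethod
    def create_connection(
        cls, connection_str: str, connection_args: Optional[dict[str, Any]] = None
    ) -> None:
        """Create and cache an engine for this connection string."""
        cls.engines[connection_str] = FakeEngine(
            connection_str, **(connection_args or {})
        )

    @property
    def engine(self) -> FakeEngine:
        # Create the engine lazily on first access, then reuse it.
        if self._connection_str not in self.engines:
            self.create_connection(self._connection_str)
        return self.engines[self._connection_str]


a = QueryDataset("sqlite:///one.db")
b = QueryDataset("sqlite:///one.db")
c = QueryDataset("sqlite:///two.db")
```

Here `a` and `b` share one engine because they use the same connection string, while `c` gets its own.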
- property engine
The Engine object for the dataset’s connection string.
- exists()
Checks whether a data set’s output already exists by calling the provided _exists() method.
- Return type:
- Returns:
Flag indicating whether the output already exists.
- Raises:
DatasetError – when underlying exists method raises error.
- classmethod from_config(name, config, load_version=None, save_version=None)
Create a data set instance using the configuration provided.
- Parameters:
name (str) – Data set name.
config (dict[str, Any]) – Data set config dictionary.
load_version (Optional[str]) – Version string to be used for the load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
save_version (Optional[str]) – Version string to be used for the save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
- Return type:
- Returns:
An instance of an AbstractDataset subclass.
- Raises:
DatasetError – When the function fails to create the data set from its config.
- load()
Loads data by delegation to the provided load method.
- Return type:
TypeVar(_DO)
- Returns:
Data returned by the provided load method.
- Raises:
DatasetError – When underlying load method raises error.
- release()
Release any cached data.
- Raises:
DatasetError – when underlying release method raises error.
- Return type:
- save(data)
Saves data by delegation to the provided save method.
- Parameters:
data (TypeVar(_DI)) – the value to be saved by provided save method.
- Raises:
DatasetError – when underlying save method raises error.
FileNotFoundError – when save method got file instead of dir, on Windows.
NotADirectoryError – when save method got file instead of dir, on Unix.
- Return type:
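Although save appears in the method list, this data set is described above as read-only. The sketch below shows the general read-only pattern under that assumption: load works and save always raises. The local `DatasetError` and `ReadOnlyQueryDataset` classes are hypothetical stand-ins, and the error message is invented.

```python
class DatasetError(Exception):
    """Local stand-in for Kedro's DatasetError (hypothetical)."""


class ReadOnlyQueryDataset:
    """Hypothetical read-only dataset: load works, save always fails."""

    def __init__(self, rows: list) -> None:
        self._rows = rows

    def load(self) -> list:
        # Return a copy so callers cannot mutate the stored rows.
        return list(self._rows)

    def save(self, data: object) -> None:
        raise DatasetError("'save' is not supported on a read-only dataset")


dataset = ReadOnlyQueryDataset([(1, "a"), (2, "b")])
loaded = dataset.load()

try:
    dataset.save(loaded)
    save_failed = False
except DatasetError:
    save_failed = True
```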