SQLQueryDataset¶
SQLQueryDataset loads data from a provided SQL query using pandas. It is read-only.
kedro_datasets.pandas.SQLQueryDataset ¶
SQLQueryDataset(
sql=None,
credentials=None,
load_args=None,
fs_args=None,
filepath=None,
execution_options=None,
metadata=None,
encoding=None,
)
Bases: AbstractDataset[None, DataFrame]
SQLQueryDataset loads data from a provided SQL query. It
uses pandas.DataFrame internally, so it supports all allowed
pandas options on read_sql_query. Since Pandas uses SQLAlchemy behind
the scenes, when instantiating SQLQueryDataset one needs to pass
a compatible connection string either in credentials (see the example
code snippet below) or in load_args. Connection string formats supported
by SQLAlchemy can be found here:
https://docs.sqlalchemy.org/core/engines.html#database-urls
It does not support save method so it is a read only dataset.
To save data to a SQL server use SQLTableDataset.
Examples:
Using the YAML API:
shuttle_id_dataset:
type: pandas.SQLQueryDataset
sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
credentials: db_credentials
Advanced example using the stream_results and chunksize options to reduce memory usage:
shuttle_id_dataset:
type: pandas.SQLQueryDataset
sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
credentials: db_credentials
execution_options:
stream_results: true
load_args:
chunksize: 1000
Sample database credentials entry in credentials.yml:
db_credentials:
con: postgresql://scott:tiger@localhost/test
pool_size: 10 # additional parameters
Using the Python API:
>>> import sqlite3
>>>
>>> import pandas as pd
>>> from kedro_datasets.pandas import SQLQueryDataset
>>>
>>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> sql = "SELECT * FROM table_a"
>>> credentials = {"con": f"sqlite:///{tmp_path / 'test.db'}"}
>>> dataset = SQLQueryDataset(sql=sql, credentials=credentials)
>>>
>>> con = sqlite3.connect(tmp_path / "test.db")
>>> cur = con.cursor()
>>> cur.execute("CREATE TABLE table_a(col1, col2, col3)")
<sqlite3.Cursor object at 0x...>
>>> cur.execute("INSERT INTO table_a VALUES (1, 4, 5), (2, 5, 6)")
<sqlite3.Cursor object at 0x...>
>>> con.commit()
>>> reloaded = dataset.load()
>>> assert data.equals(reloaded)
Using MSSQL:
>>> credentials = {
... "server": "localhost",
... "port": "1433",
... "database": "TestDB",
... "user": "SA",
... "password": "StrongPassword",
... }
>>>
>>> def _make_mssql_connection_str(
... server: str, port: str, database: str, user: str, password: str
... ) -> str:
... import pyodbc
... from sqlalchemy.engine import URL
... driver = pyodbc.drivers()[-1]
... connection_str = (
... f"DRIVER={driver};SERVER={server},{port};DATABASE={database};"
... f"ENCRYPT=yes;UID={user};PWD={password};"
... f"TrustServerCertificate=yes;"
... )
... return URL.create("mssql+pyodbc", query={"odbc_connect": connection_str})
...
>>> connection_str = _make_mssql_connection_str(**credentials)
>>> dataset = SQLQueryDataset(
... credentials={"con": connection_str}, sql="SELECT TOP 5 * FROM TestTable;"
... )
>>> df = dataset.load()
In addition, here is an example of a catalog with dates parsing:
mssql_dataset:
type: kedro_datasets.pandas.SQLQueryDataset
credentials: mssql_credentials
sql: >
SELECT *
FROM DateTable
WHERE date >= ? AND date <= ?
ORDER BY date
load_args:
params:
- ${begin}
- ${end}
index_col: date
parse_dates:
date: "%Y-%m-%d %H:%M:%S.%f0 %z"
Parameters:
-
sql(str | None, default:None) –The sql query statement.
-
credentials(dict[str, Any] | None, default:None) –A dictionary with a
SQLAlchemyconnection string. Users are supposed to provide the connection string 'con' through credentials. It overwritesconparameter inload_argsandsave_argsin case it is provided. To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls Additional parameters for the sqlalchemy engine can be provided alongside the 'con' parameter. -
load_args(dict[str, Any] | None, default:None) –Provided to underlying pandas
read_sql_queryfunction along with the connection string. To find all supported arguments, see here: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_query.html To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls -
fs_args(dict[str, Any] | None, default:None) –Extra arguments to pass into underlying filesystem class constructor (e.g.
{"project": "my-project"}forGCSFileSystem), as well as to pass to the filesystem'sopenmethod through nested keysopen_args_loadandopen_args_save. Here you can find all available arguments foropen: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open All defaults are preserved, exceptmode, which is set torwhen loading. -
filepath(str | PathLike | None, default:None) –A path to a file with a sql query statement. Can be a string or a PathLike object.
-
execution_options(dict[str, Any] | None, default:None) –A dictionary with non-SQL advanced options for the connection to be applied to the underlying engine. To find all supported execution options, see here: https://docs.sqlalchemy.org/core/connections.html#sqlalchemy.engine.Connection.execution_options Note that this is not a standard argument supported by pandas API, but could be useful for handling large datasets.
-
metadata(dict[str, Any] | None, default:None) –Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
Raises:
-
DatasetError–When either
sqlorconparameters is empty.
Source code in kedro_datasets/pandas/sql_dataset.py
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 | |
_connection_args
instance-attribute
¶
_connection_args = {
k: (credentials[k]) for k in (keys()) if k != "con"
}
_load_args
instance-attribute
¶
_load_args = (
{None: default_load_args, None: load_args}
if load_args is not None
else default_load_args
)
_describe ¶
_describe()
Source code in kedro_datasets/pandas/sql_dataset.py
545 546 547 548 549 550 551 552 | |
adapt_mssql_date_params ¶
adapt_mssql_date_params()
We need to change the format of datetime parameters.
MSSQL expects datetime in the exact format %y-%m-%dT%H:%M:%S.
Here, we also accept plain dates.
pyodbc does not accept named parameters, they must be provided as a list.
Source code in kedro_datasets/pandas/sql_dataset.py
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 | |
create_connection
classmethod
¶
create_connection(connection_str, connection_args=None)
Given a connection string, create singleton connection
to be used across all instances of SQLQueryDataset that
need to connect to the same source.
Source code in kedro_datasets/pandas/sql_dataset.py
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 | |
load ¶
load()
Source code in kedro_datasets/pandas/sql_dataset.py
554 555 556 557 558 559 560 561 562 563 564 | |
save ¶
save(data)
Source code in kedro_datasets/pandas/sql_dataset.py
566 567 | |