Source code for kedro_datasets.pandas.gbq_dataset

"""``GBQTableDataset`` loads and saves data from/to Google BigQuery. It uses pandas-gbq
to read from and write to BigQuery tables.
"""

from __future__ import annotations

import copy
from pathlib import PurePosixPath
from typing import Any, ClassVar, NoReturn

import fsspec
import pandas as pd
import pandas_gbq as pd_gbq
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2.credentials import Credentials
from kedro.io.core import (
    AbstractDataset,
    DatasetError,
    get_filepath_str,
    get_protocol_and_path,
    validate_on_forbidden_chars,
)

from kedro_datasets._utils import ConnectionMixin


class GBQTableDataset(ConnectionMixin, AbstractDataset[None, pd.DataFrame]):
    """``GBQTableDataset`` loads and saves data from/to Google BigQuery.
    It uses pandas-gbq to read from and write to a BigQuery table.

    Example usage for the
    `YAML API <https://docs.kedro.org/en/stable/data/\
    data_catalog_yaml_examples.html>`_:

    .. code-block:: yaml

        vehicles:
          type: pandas.GBQTableDataset
          dataset: big_query_dataset
          table_name: big_query_table
          project: my-project
          credentials: gbq-creds
          load_args:
            reauth: True
          save_args:
            chunk_size: 100

    Example usage for the
    `Python API <https://docs.kedro.org/en/stable/data/\
    advanced_data_catalog_usage.html>`_:

    .. code-block:: pycon

        >>> from kedro_datasets.pandas import GBQTableDataset
        >>> import pandas as pd
        >>>
        >>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
        >>>
        >>> dataset = GBQTableDataset(
        ...     dataset="dataset", table_name="table_name", project="my-project"
        ... )
        >>> dataset.save(data)
        >>> reloaded = dataset.load()
        >>>
        >>> assert data.equals(reloaded)

    """

    # Class-level defaults; per-instance ``load_args`` / ``save_args`` are
    # layered on top of these in ``__init__``.
    DEFAULT_LOAD_ARGS: dict[str, Any] = {}
    DEFAULT_SAVE_ARGS: dict[str, Any] = {"progress_bar": False}

    # NOTE(review): presumably the key ``ConnectionMixin`` uses to group shared
    # connections -- confirm against ``kedro_datasets._utils``.
    _CONNECTION_GROUP: ClassVar[str] = "bigquery"

    def __init__(  # noqa: PLR0913
        self,
        *,
        dataset: str,
        table_name: str,
        project: str | None = None,
        credentials: dict[str, Any] | Credentials | None = None,
        load_args: dict[str, Any] | None = None,
        save_args: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Creates a new instance of ``GBQTableDataset``.

        Args:
            dataset: Google BigQuery dataset.
            table_name: Google BigQuery table name.
            project: Google BigQuery Account project ID.
                Optional when available from the environment.
                https://cloud.google.com/resource-manager/docs/creating-managing-projects
            credentials: Credentials for accessing Google APIs.
                Either ``google.auth.credentials.Credentials`` object or dictionary with
                parameters required to instantiate ``google.oauth2.credentials.Credentials``.
                Here you can find all the arguments:
                https://google-auth.readthedocs.io/en/latest/reference/google.oauth2.credentials.html
            load_args: Options for loading BigQuery table into DataFrame; they are
                forwarded as keyword arguments to ``pandas_gbq.read_gbq``.
                Here you can find all available arguments:
                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_gbq.html
                All defaults are preserved.
            save_args: Options for saving DataFrame to BigQuery table; they are
                forwarded as keyword arguments to ``pandas_gbq.to_gbq``.
                Here you can find all available arguments:
                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_gbq.html
                All defaults are preserved, but "progress_bar", which is set to False.
            metadata: Any arbitrary metadata.
                This is ignored by Kedro, but may be consumed by users or external plugins.

        Raises:
            DatasetError: When ``load_args['location']`` and ``save_args['location']``
                are different.
        """
        # Handle default load and save arguments
        self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
        self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}

        # Validate before storing anything else so an invalid configuration
        # fails fast.
        self._validate_location()
        validate_on_forbidden_chars(dataset=dataset, table_name=table_name)

        self._dataset = dataset
        self._table_name = table_name
        self._project_id = project
        # Consumed by ``_connect``. The save-side location is used; it is
        # guaranteed to equal the load-side one by ``_validate_location``.
        self._connection_config = {
            "project": self._project_id,
            "credentials": credentials,
            "location": self._save_args.get("location"),
        }

        self.metadata = metadata

    def _describe(self) -> dict[str, Any]:
        """Return the dataset/table names and the effective load/save arguments."""
        return {
            "dataset": self._dataset,
            "table_name": self._table_name,
            "load_args": self._load_args,
            "save_args": self._save_args,
        }

    def _connect(self) -> bigquery.Client:
        """Build a ``bigquery.Client`` from ``self._connection_config``."""
        credentials = self._connection_config["credentials"]
        if isinstance(credentials, dict):
            # Only create `Credentials` object once for consistent hash.
            credentials = Credentials(**credentials)

        return bigquery.Client(
            project=self._connection_config["project"],
            credentials=credentials,
            location=self._connection_config["location"],
        )

    def load(self) -> pd.DataFrame:
        """Read the table into a DataFrame via ``pandas_gbq.read_gbq``.

        A ``select *`` over ``<dataset>.<table_name>`` is used unless
        ``load_args`` already provides ``query_or_table``.

        Returns:
            The query result as a ``pandas.DataFrame``.
        """
        sql = f"select * from {self._dataset}.{self._table_name}"  # nosec
        # Build the call arguments in a fresh dict instead of mutating
        # ``self._load_args``: loading must not change what ``_describe``
        # reports. A user-supplied ``query_or_table`` still takes precedence.
        load_args = {"query_or_table": sql, **self._load_args}
        return pd_gbq.read_gbq(
            project_id=self._project_id,
            credentials=self._connection._credentials,
            **load_args,
        )

    def save(self, data: pd.DataFrame) -> None:
        """Write ``data`` to ``<dataset>.<table_name>`` via ``pandas_gbq.to_gbq``.

        Args:
            data: The DataFrame to upload.
        """
        pd_gbq.to_gbq(
            dataframe=data,
            destination_table=f"{self._dataset}.{self._table_name}",
            project_id=self._project_id,
            credentials=self._connection._credentials,
            **self._save_args,
        )

    def _exists(self) -> bool:
        """Return ``True`` if the table can be fetched, ``False`` on ``NotFound``."""
        table_ref = self._connection.dataset(self._dataset).table(self._table_name)
        try:
            self._connection.get_table(table_ref)
            return True
        except NotFound:
            return False

    def _validate_location(self) -> None:
        """Ensure ``load_args`` and ``save_args`` agree on ``location``.

        Raises:
            DatasetError: When the two ``location`` values differ (a value set
                on only one side counts as a difference).
        """
        save_location = self._save_args.get("location")
        load_location = self._load_args.get("location")

        if save_location != load_location:
            raise DatasetError(
                """"load_args['location']" is different from "save_args['location']". """
                "The 'location' defines where BigQuery data is stored, therefore has "
                "to be the same for save and load args. "
                "Details: https://cloud.google.com/bigquery/docs/locations"
            )
class GBQQueryDataset(AbstractDataset[None, pd.DataFrame]):
    """``GBQQueryDataset`` loads data from a provided SQL query from Google
    BigQuery. It uses ``pandas_gbq.read_gbq`` to run the query and read the
    result into a DataFrame. Therefore it supports all allowed options on
    ``read_gbq``.

    Example adding a catalog entry with the ``YAML API``:

    .. code-block:: yaml

        vehicles:
          type: pandas.GBQQueryDataset
          sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
          project: my-project
          credentials: gbq-creds
          load_args:
            reauth: True

    Example using Python API:

    .. code-block:: pycon

        >>> from kedro_datasets.pandas import GBQQueryDataset
        >>>
        >>> sql = "SELECT * FROM dataset_1.table_a"
        >>>
        >>> dataset = GBQQueryDataset(sql, project="my-project")
        >>>
        >>> sql_data = dataset.load()
    """

    # Class-level defaults; per-instance ``load_args`` are layered on top.
    DEFAULT_LOAD_ARGS: dict[str, Any] = {}

    def __init__(  # noqa: PLR0913
        self,
        sql: str | None = None,
        project: str | None = None,
        credentials: dict[str, Any] | Credentials | None = None,
        load_args: dict[str, Any] | None = None,
        fs_args: dict[str, Any] | None = None,
        filepath: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Creates a new instance of ``GBQQueryDataset``.

        Args:
            sql: The sql query statement.
            project: Google BigQuery Account project ID.
                Optional when available from the environment.
                https://cloud.google.com/resource-manager/docs/creating-managing-projects
            credentials: Credentials for accessing Google APIs.
                Either ``google.auth.credentials.Credentials`` object or dictionary with
                parameters required to instantiate ``google.oauth2.credentials.Credentials``.
                Here you can find all the arguments:
                https://google-auth.readthedocs.io/en/latest/reference/google.oauth2.credentials.html
            load_args: Options for loading the query result into DataFrame; they are
                forwarded as keyword arguments to ``pandas_gbq.read_gbq``.
                Here you can find all available arguments:
                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_gbq.html
                All defaults are preserved.
            fs_args: Extra arguments to pass into underlying filesystem class constructor
                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``) used for reading the
                SQL query from filepath. An optional ``credentials`` entry is unpacked
                into the filesystem constructor as keyword arguments.
            filepath: A path to a file with a sql query statement.
            metadata: Any arbitrary metadata.
                This is ignored by Kedro, but may be consumed by users or external plugins.

        Raises:
            DatasetError: When ``sql`` and ``filepath`` parameters are either both empty
                or both provided, as well as when the `save()` method is invoked.
        """
        if sql and filepath:
            raise DatasetError(
                "'sql' and 'filepath' arguments cannot both be provided. "
                "Please only provide one."
            )

        if not (sql or filepath):
            raise DatasetError(
                "'sql' and 'filepath' arguments cannot both be empty. "
                "Please provide a sql query or path to a sql query file."
            )

        # Handle default load arguments
        self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}

        self._project_id = project

        if isinstance(credentials, dict):
            credentials = Credentials(**credentials)

        self._credentials = credentials

        # load sql query from arg or from file
        if sql:
            self._load_args["query_or_table"] = sql
            self._filepath = None
        else:
            # filesystem for loading sql file
            _fs_args = copy.deepcopy(fs_args) or {}
            # ``or {}`` guards against an explicit ``credentials: None`` entry,
            # which cannot be unpacked with ``**``.
            _fs_credentials = _fs_args.pop("credentials", None) or {}
            protocol, path = get_protocol_and_path(str(filepath))

            self._protocol = protocol
            self._fs = fsspec.filesystem(self._protocol, **_fs_credentials, **_fs_args)
            self._filepath = path

        self.metadata = metadata

    def _describe(self) -> dict[str, Any]:
        """Return the query, the query file path and the remaining load args as strings."""
        # Work on a copy so that popping the query does not alter the instance.
        load_args = copy.deepcopy(self._load_args)
        desc = {}
        desc["sql"] = str(load_args.pop("query_or_table", None))
        desc["filepath"] = str(self._filepath)
        desc["load_args"] = str(load_args)

        return desc

    def load(self) -> pd.DataFrame:
        """Run the query via ``pandas_gbq.read_gbq`` and return the result.

        When the dataset was configured with ``filepath``, the query text is
        read from that file on every call.

        Returns:
            The query result as a ``pandas.DataFrame``.
        """
        load_args = copy.deepcopy(self._load_args)

        if self._filepath:
            load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol)
            with self._fs.open(load_path, mode="r") as fs_file:
                load_args["query_or_table"] = fs_file.read()

        return pd_gbq.read_gbq(
            project_id=self._project_id,
            credentials=self._credentials,
            **load_args,
        )

    def save(self, data: None) -> NoReturn:
        """Always raise: this dataset is read-only.

        Raises:
            DatasetError: On every call.
        """
        raise DatasetError("'save' is not supported on GBQQueryDataset")