kedro_datasets_experimental.databricks.ExternalTableDataset

class kedro_datasets_experimental.databricks.ExternalTableDataset(*, table, catalog=None, database='default', format='delta', write_mode=None, location=None, dataframe_type='spark', primary_key=None, version=None, schema=None, partition_columns=None, owner_group=None, metadata=None)[source]

ExternalTableDataset loads and saves data to external tables in Databricks. Data can be loaded and saved as Spark or pandas dataframes, as specified by dataframe_type.

Example usage for the YAML API:

names_and_ages@spark:
  type: databricks.ExternalTableDataset
  format: parquet
  table: names_and_ages

names_and_ages@pandas:
  type: databricks.ExternalTableDataset
  format: parquet
  table: names_and_ages
  dataframe_type: pandas

Example usage for the Python API:

from kedro_datasets_experimental.databricks import ExternalTableDataset
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType
import importlib_metadata

DELTA_VERSION = importlib_metadata.version("delta-spark")
schema = StructType(
    [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
)
data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
spark_df = (
    SparkSession.builder.config(
        "spark.jars.packages", f"io.delta:delta-core_2.12:{DELTA_VERSION}"
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
    .createDataFrame(data, schema)
)
dataset = ExternalTableDataset(
    table="names_and_ages",
    write_mode="overwrite",
    location="abfss://container@storageaccount.dfs.core.windows.net/depts/cust",
)
dataset.save(spark_df)
reloaded = dataset.load()
assert Row(name="Bob", age=12) in reloaded.take(4)

Methods

exists()

Checks whether a dataset's output already exists by calling the provided _exists() method.

from_config(name, config[, load_version, ...])

Create a dataset instance using the configuration provided.

load()

Loads the version of data in the format defined in the init (Spark or pandas dataframe).

release()

Release any cached data.

resolve_load_version()

Compute the version the dataset should be loaded with.

resolve_save_version()

Compute the version the dataset should be saved with.

save(data)

Saves the data based on the write_mode and dataframe_type in the init.

to_config()

Converts the dataset instance into a dictionary-based configuration for serialization.

__init__(*, table, catalog=None, database='default', format='delta', write_mode=None, location=None, dataframe_type='spark', primary_key=None, version=None, schema=None, partition_columns=None, owner_group=None, metadata=None)[source]

Creates a new instance of ExternalTableDataset.

Parameters:
  • table (str) – The name of the table.

  • catalog (Optional[str]) – The name of the catalog in Unity. Defaults to None.

  • database (str) – The name of the database (also referred to as schema). Defaults to “default”.

  • format (str) – The format of the table. Applicable only for external tables. Defaults to “delta”.

  • write_mode (Optional[str]) – The mode used to write data into the table. If not specified, the dataset is read-only. Options are: [“overwrite”, “append”, “upsert”]. “upsert” mode requires the primary_key field to be populated (see the upsert sketch below). Defaults to None.

  • location (Optional[str]) – The location of the table. Applicable only for external tables. Should be a valid path in an external location that has already been created. Defaults to None.

  • dataframe_type (str) – “pandas” or “spark” dataframe. Defaults to “spark”.

  • primary_key (Union[str, list[str], None]) – The primary key of the table. Can be in the form of a list. Defaults to None.

  • version (Optional[Version]) – kedro.io.core.Version instance to load the data. Defaults to None.

  • schema (Optional[dict[str, Any]]) – The schema of the table in JSON form. Dataframes will be truncated to match the schema if provided. Used by the hooks to create the table if the schema is provided. Defaults to None.

  • partition_columns (Optional[list[str]]) – The columns to use for partitioning the table. Used by the hooks. Defaults to None.

  • owner_group (Optional[str]) – If table access control is enabled in your workspace, specifying owner_group will transfer ownership of the table and database to this owner. All databases should have the same owner_group. Defaults to None.

  • metadata (Optional[dict[str, Any]]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:

DatasetError – Invalid configuration supplied (through BaseTable validation).
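The following is a minimal sketch of an upsert-enabled configuration. The table name, storage location, partition columns and schema dictionary are illustrative assumptions, and the schema layout is assumed to follow PySpark’s StructType JSON representation (StructType.jsonValue()):

from kedro_datasets_experimental.databricks import ExternalTableDataset

upsert_dataset = ExternalTableDataset(
    table="names_and_ages",          # illustrative table name
    write_mode="upsert",             # "upsert" requires primary_key
    primary_key=["name"],            # a single string is also accepted
    location="abfss://container@storageaccount.dfs.core.windows.net/depts/cust",
    partition_columns=["age"],
    schema={                         # assumed StructType.jsonValue() layout
        "type": "struct",
        "fields": [
            {"name": "name", "type": "string", "nullable": True, "metadata": {}},
            {"name": "age", "type": "integer", "nullable": True, "metadata": {}},
        ],
    },
)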

exists()[source]

Checks whether a dataset’s output already exists by calling the provided _exists() method.

Return type:

bool

Returns:

Flag indicating whether the output already exists.

Raises:

DatasetError – when underlying exists method raises error.

classmethod from_config(name, config, load_version=None, save_version=None)[source]

Create a dataset instance using the configuration provided.

Parameters:
  • name (str) – Dataset name.

  • config (dict[str, Any]) – Dataset config dictionary.

  • load_version (Optional[str]) – Version string to be used for load operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.

  • save_version (Optional[str]) – Version string to be used for save operation if the dataset is versioned. Has no effect on the dataset if versioning was not enabled.

Return type:

AbstractDataset

Returns:

An instance of an AbstractDataset subclass.

Raises:

DatasetError – When the function fails to create the dataset from its config.
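As a rough illustration, a dataset can be built from a plain dictionary equivalent to a catalog entry. The entry name, fully qualified type string, and table name below are assumptions for the sketch:

from kedro_datasets_experimental.databricks import ExternalTableDataset

dataset = ExternalTableDataset.from_config(
    "names_and_ages",
    {
        "type": "kedro_datasets_experimental.databricks.ExternalTableDataset",
        "table": "names_and_ages",
        "format": "parquet",
        "dataframe_type": "pandas",
    },
)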

load()[source]

Loads the version of data in the format defined in the init (Spark or pandas dataframe).

Raises:

VersionNotFoundError – If the version defined in the init doesn’t exist.

Returns:

A dataframe in the format defined in the init.

Return type:

Union[DataFrame, pd.DataFrame]

release()[source]

Release any cached data.

Raises:

DatasetError – when underlying release method raises error.

Return type:

None

resolve_load_version()[source]

Compute the version the dataset should be loaded with.

Return type:

Optional[str]

resolve_save_version()[source]

Compute the version the dataset should be saved with.

Return type:

Optional[str]

save(data)[source]

Saves the data based on the write_mode and dataframe_type in the init. If dataframe_type is “pandas”, the pandas dataframe is converted to a Spark dataframe before saving. If schema is provided, data is matched to the schema before saving (columns will be sorted and truncated).

Parameters:

data (Any) – Spark or pandas dataframe to save to the table location.

Return type:

None
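A minimal sketch of saving a pandas dataframe: with dataframe_type="pandas" the data is converted to a Spark dataframe before being written. The table name and location are illustrative assumptions.

import pandas as pd

from kedro_datasets_experimental.databricks import ExternalTableDataset

pandas_dataset = ExternalTableDataset(
    table="names_and_ages",
    write_mode="overwrite",
    dataframe_type="pandas",
    location="abfss://container@storageaccount.dfs.core.windows.net/depts/cust",
)
pandas_dataset.save(pd.DataFrame({"name": ["Alex", "Bob"], "age": [31, 12]}))
reloaded = pandas_dataset.load()  # returned as a pandas DataFrame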

to_config()[source]

Converts the dataset instance into a dictionary-based configuration for serialization. Ensures that any subclass-specific details are handled, with additional logic for versioning and caching implemented for CachedDataset.

Adds a key for the dataset’s type using its module and class name and includes the initialization arguments.

For CachedDataset it extracts the underlying dataset’s configuration, handles the versioned flag and removes unnecessary metadata. It also ensures the embedded dataset’s configuration is appropriately flattened or transformed.

If the dataset has a version key, it sets the versioned flag in the configuration.

Removes the metadata key from the configuration if present.

Return type:

dict[str, Any]

Returns:

A dictionary containing the dataset’s type and initialization arguments.
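As a sketch, the returned configuration can be fed back into from_config to recreate an equivalent dataset instance. This reuses the dataset object from the Python API example above; the entry name is an assumption, and the exact keys in the dictionary depend on the arguments used at construction time.

config = dataset.to_config()  # e.g. {"type": "...ExternalTableDataset", "table": "names_and_ages", ...}
recreated = ExternalTableDataset.from_config("names_and_ages", config)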