Skip to content

databricks.ExternalTableDataset

kedro_datasets_experimental.databricks.ExternalTableDataset

ExternalTableDataset(*, table, catalog=None, database='default', format='delta', write_mode=None, location=None, dataframe_type='spark', primary_key=None, version=None, schema=None, partition_columns=None, owner_group=None, metadata=None)

Bases: BaseTableDataset

ExternalTableDataset loads and saves data into external tables in Databricks. Load and save operations can use either Spark or Pandas DataFrames, specified via the dataframe_type argument.

Example usage for the YAML API:
names_and_ages@spark:
    type: databricks.ExternalTableDataset
    format: parquet
    table: names_and_ages

names_and_ages@pandas:
    type: databricks.ExternalTableDataset
    format: parquet
    table: names_and_ages
    dataframe_type: pandas
Example usage for the Python API:
from kedro_datasets.databricks import ExternalTableDataset
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType
import importlib_metadata

DELTA_VERSION = importlib_metadata.version("delta-spark")
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]

spark_df = (
    SparkSession.builder.config(
        "spark.jars.packages", f"io.delta:delta-core_2.12:{DELTA_VERSION}"
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
    .createDataFrame(data, schema)
)

dataset = ExternalTableDataset(
    table="names_and_ages",
    write_mode="overwrite",
    location="abfss://container@storageaccount.dfs.core.windows.net/depts/cust"
)

dataset.save(spark_df)
reloaded = dataset.load()
assert Row(name="Bob", age=12) in reloaded.take(4)
Source code in kedro-datasets/kedro_datasets/databricks/_base_table_dataset.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def __init__(  # noqa: PLR0913
    self,
    *,
    table: str,
    catalog: str | None = None,
    database: str = "default",
    format: str = "delta",
    write_mode: str | None = None,
    location: str | None = None,
    dataframe_type: str = "spark",
    primary_key: str | list[str] | None = None,
    version: Version | None = None,
    # the following parameters are used by project hooks
    # to create or update table properties
    schema: dict[str, Any] | None = None,
    partition_columns: list[str] | None = None,
    owner_group: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``BaseTableDataset``.

    Args:
        table: The name of the table.
        catalog: The name of the catalog in Unity.
            Defaults to None.
        database: The name of the database.
            (also referred to as schema). Defaults to "default".
        format: The format of the table.
            Applicable only for external tables.
            Defaults to "delta".
        write_mode: The mode to write the data into the table. If not
            present, the data set is read-only.
            Options are:["overwrite", "append", "upsert"].
            "upsert" mode requires primary_key field to be populated.
            Defaults to None.
        location: The location of the table.
            Applicable only for external tables.
            Should be a valid path in an external location that has already been created.
            Defaults to None.
        dataframe_type: "pandas" or "spark" dataframe.
            Defaults to "spark".
        primary_key: The primary key of the table.
            Can be in the form of a list. Defaults to None.
        version: kedro.io.core.Version instance to load the data.
            Defaults to None.
        schema: The schema of the table in JSON form.
            Dataframes will be truncated to match the schema if provided.
            Used by the hooks to create the table if the schema is provided.
            Defaults to None.
        partition_columns: The columns to use for partitioning the table.
            Used by the hooks. Defaults to None.
        owner_group: If table access control is enabled in your workspace,
            specifying owner_group will transfer ownership of the table and database to
            this owner. All databases should have the same owner_group. Defaults to None.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    Raises:
        DatasetError: Invalid configuration supplied (through ``BaseTable`` validation).
    """
    self._table = self._create_table(
        table=table,
        catalog=catalog,
        database=database,
        format=format,
        write_mode=write_mode,
        location=location,
        dataframe_type=dataframe_type,
        primary_key=primary_key,
        json_schema=schema,
        partition_columns=partition_columns,
        owner_group=owner_group,
    )

    self.metadata = metadata
    self._version = version

    super().__init__(
        filepath=None,  # type: ignore[arg-type]
        version=version,
        exists_function=self._exists,  # type: ignore[arg-type]
    )

_create_table

_create_table(table, catalog, database, format, write_mode, location, dataframe_type, primary_key, json_schema, partition_columns, owner_group)

Creates a new ExternalTable instance with the provided attributes. Args: table: The name of the table. catalog: The catalog of the table. database: The database of the table. format: The format of the table. write_mode: The write mode for the table. location: The location of the table. dataframe_type: The type of dataframe. primary_key: The primary key of the table. json_schema: The JSON schema of the table. partition_columns: The partition columns of the table. owner_group: The owner group of the table. Returns: ExternalTable: The new ExternalTable instance.

Source code in kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def _create_table(  # noqa: PLR0913
    self,
    table: str,
    catalog: str | None,
    database: str,
    format: str,
    write_mode: str | None,
    location: str | None,
    dataframe_type: str,
    primary_key: str | list[str] | None,
    json_schema: dict[str, Any] | None,
    partition_columns: list[str] | None,
    owner_group: str | None
) -> ExternalTable:
    """Creates a new ``ExternalTable`` instance with the provided attributes.
    Args:
        table: The name of the table.
        catalog: The catalog of the table.
        database: The database of the table.
        format: The format of the table.
        write_mode: The write mode for the table.
        location: The location of the table.
        dataframe_type: The type of dataframe.
        primary_key: The primary key of the table.
        json_schema: The JSON schema of the table.
        partition_columns: The partition columns of the table.
        owner_group: The owner group of the table.
    Returns:
        ``ExternalTable``: The new ``ExternalTable`` instance.
    """
    return ExternalTable(
        table=table,
        catalog=catalog,
        database=database,
        write_mode=write_mode,
        location=location,
        dataframe_type=dataframe_type,
        json_schema=json_schema,
        partition_columns=partition_columns,
        owner_group=owner_group,
        primary_key=primary_key,
        format=format
    )

_save_overwrite

_save_overwrite(data)

Overwrites the data in the table with the data provided. Args: data (DataFrame): The Spark dataframe to overwrite the table with.

Source code in kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def _save_overwrite(self, data: DataFrame) -> None:
    """Overwrites the data in the table with the data provided.
    Args:
        data (DataFrame): The Spark dataframe to overwrite the table with.
    """
    writer = data.write.format(self._table.format).mode("overwrite").option(
        "overwriteSchema", "true"
    )

    if self._table.partition_columns:
        writer.partitionBy(
            *self._table.partition_columns if isinstance(self._table.partition_columns, list) else self._table.partition_columns
        )

    if self._table.format == "delta" or (not self._table.exists()):
        if self._table.location:
            writer.option("path", self._table.location)

        writer.saveAsTable(self._table.full_table_location() or "")

    else:
        writer.save(self._table.location)