Skip to content

databricks.ExternalTableDataset

kedro_datasets_experimental.databricks.ExternalTableDataset

ExternalTableDataset(
    *,
    table,
    catalog=None,
    database="default",
    format="delta",
    write_mode=None,
    location=None,
    dataframe_type="spark",
    primary_key=None,
    version=None,
    schema=None,
    partition_columns=None,
    owner_group=None,
    metadata=None
)

Bases: BaseTableDataset

ExternalTableDataset loads and saves data into external tables in Databricks. Load and save operations can use either Spark or Pandas DataFrames, specified via the dataframe_type argument.

### Example usage for the [YAML API](https://docs.kedro.org/en/stable/catalog-data/data_catalog_yaml_examples/):

```yaml
names_and_ages@spark:
    type: databricks.ExternalTableDataset
    format: parquet
    table: names_and_ages

names_and_ages@pandas:
    type: databricks.ExternalTableDataset
    format: parquet
    table: names_and_ages
    dataframe_type: pandas
```

### Example usage for the [Python API](https://docs.kedro.org/en/stable/catalog-data/advanced_data_catalog_usage/):
from kedro_datasets.databricks import ExternalTableDataset
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType
import importlib.metadata

DELTA_VERSION = importlib.metadata.version("delta-spark")
major_version = int(DELTA_VERSION.split(".")[0])
delta_package = (
    f"io.delta:delta-spark_2.13:{DELTA_VERSION}"
    if major_version >= 4
    else f"io.delta:delta-core_2.12:{DELTA_VERSION}"
)

schema = StructType(
    [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
)
data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]

spark_df = (
    SparkSession.builder.config("spark.jars.packages", delta_package)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
    .createDataFrame(data, schema)
)

dataset = ExternalTableDataset(
    table="names_and_ages",
    write_mode="overwrite",
    location="abfss://container@storageaccount.dfs.core.windows.net/depts/cust",
)

dataset.save(spark_df)
reloaded = dataset.load()
assert Row(name="Bob", age=12) in reloaded.take(4)
Source code in kedro_datasets/databricks/_base_table_dataset.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def __init__(  # noqa: PLR0913
    self,
    *,
    table: str,
    catalog: str | None = None,
    database: str = "default",
    format: str = "delta",
    write_mode: str | None = None,
    location: str | None = None,
    dataframe_type: str = "spark",
    primary_key: str | list[str] | None = None,
    version: Version | None = None,
    # the following parameters are used by project hooks
    # to create or update table properties
    schema: dict[str, Any] | None = None,
    partition_columns: list[str] | None = None,
    owner_group: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``BaseTableDataset``.

    Args:
        table: The name of the table.
        catalog: The name of the catalog in Unity.
            Defaults to None.
        database: The name of the database.
            (also referred to as schema). Defaults to "default".
        format: The format of the table.
            Applicable only for external tables.
            Defaults to "delta".
        write_mode: The mode to write the data into the table. If not
            present, the data set is read-only.
            Options are:["overwrite", "append", "upsert"].
            "upsert" mode requires primary_key field to be populated.
            Defaults to None.
        location: The location of the table.
            Applicable only for external tables.
            Should be a valid path in an external location that has already been created.
            Defaults to None.
        dataframe_type: "pandas" or "spark" dataframe.
            Defaults to "spark".
        primary_key: The primary key of the table.
            Can be in the form of a list. Defaults to None.
        version: kedro.io.core.Version instance to load the data.
            Defaults to None.
        schema: The schema of the table in JSON form.
            Dataframes will be truncated to match the schema if provided.
            Used by the hooks to create the table if the schema is provided.
            Defaults to None.
        partition_columns: The columns to use for partitioning the table.
            Used by the hooks. Defaults to None.
        owner_group: If table access control is enabled in your workspace,
            specifying owner_group will transfer ownership of the table and database to
            this owner. All databases should have the same owner_group. Defaults to None.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    Raises:
        DatasetError: Invalid configuration supplied (through ``BaseTable`` validation).
    """
    self._table = self._create_table(
        table=table,
        catalog=catalog,
        database=database,
        format=format,
        write_mode=write_mode,
        location=location,
        dataframe_type=dataframe_type,
        primary_key=primary_key,
        json_schema=schema,
        partition_columns=partition_columns,
        owner_group=owner_group,
    )

    self.metadata = metadata
    self._version = version

    super().__init__(
        filepath=None,  # type: ignore[arg-type]
        version=version,
        exists_function=self._exists,  # type: ignore[arg-type]
    )

_create_table

_create_table(
    table,
    catalog,
    database,
    format,
    write_mode,
    location,
    dataframe_type,
    primary_key,
    json_schema,
    partition_columns,
    owner_group,
)

Creates a new ExternalTable instance with the provided attributes. Args: table: The name of the table. catalog: The catalog of the table. database: The database of the table. format: The format of the table. write_mode: The write mode for the table. location: The location of the table. dataframe_type: The type of dataframe. primary_key: The primary key of the table. json_schema: The JSON schema of the table. partition_columns: The partition columns of the table. owner_group: The owner group of the table. Returns: ExternalTable: The new ExternalTable instance.

Source code in kedro_datasets_experimental/databricks/external_table_dataset.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def _create_table(  # noqa: PLR0913
    self,
    table: str,
    catalog: str | None,
    database: str,
    format: str,
    write_mode: str | None,
    location: str | None,
    dataframe_type: str,
    primary_key: str | list[str] | None,
    json_schema: dict[str, Any] | None,
    partition_columns: list[str] | None,
    owner_group: str | None
) -> ExternalTable:
    """Creates a new ``ExternalTable`` instance with the provided attributes.
    Args:
        table: The name of the table.
        catalog: The catalog of the table.
        database: The database of the table.
        format: The format of the table.
        write_mode: The write mode for the table.
        location: The location of the table.
        dataframe_type: The type of dataframe.
        primary_key: The primary key of the table.
        json_schema: The JSON schema of the table.
        partition_columns: The partition columns of the table.
        owner_group: The owner group of the table.
    Returns:
        ``ExternalTable``: The new ``ExternalTable`` instance.
    """
    return ExternalTable(
        table=table,
        catalog=catalog,
        database=database,
        write_mode=write_mode,
        location=location,
        dataframe_type=dataframe_type,
        json_schema=json_schema,
        partition_columns=partition_columns,
        owner_group=owner_group,
        primary_key=primary_key,
        format=format
    )

_save_overwrite

_save_overwrite(data)

Overwrites the data in the table with the data provided. Args: data (DataFrame): The Spark dataframe to overwrite the table with.

Source code in kedro_datasets_experimental/databricks/external_table_dataset.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def _save_overwrite(self, data: DataFrame) -> None:
    """Overwrites the data in the table with the data provided.
    Args:
        data (DataFrame): The Spark dataframe to overwrite the table with.
    """
    writer = data.write.format(self._table.format).mode("overwrite").option(
        "overwriteSchema", "true"
    )

    if self._table.partition_columns:
        writer.partitionBy(
            *self._table.partition_columns if isinstance(self._table.partition_columns, list) else self._table.partition_columns
        )

    if self._table.format == "delta" or (not self._table.exists()):
        if self._table.location:
            writer.option("path", self._table.location)

        writer.saveAsTable(self._table.full_table_location() or "")

    else:
        writer.save(self._table.location)