kedro_datasets.databricks.ManagedTableDataSet¶
- class kedro_datasets.databricks.ManagedTableDataSet(table, catalog=None, database='default', write_mode=None, dataframe_type='spark', primary_key=None, version=None, *, schema=None, partition_columns=None, owner_group=None)[source]¶
ManagedTableDataSet loads and saves data into managed Delta tables on Databricks. Data can be loaded and saved as Spark or pandas DataFrames, as specified by dataframe_type. When saving data, you can choose one of three write modes: overwrite, append, or upsert; if write_mode is not set, the dataset is read-only. Upsert requires you to specify the primary_key parameter, which is used as part of the join condition. This dataset works best with the Databricks Kedro starter, which comes with hooks that allow this dataset to function properly. Follow the instructions in that starter to set up your project for this dataset.
Example usage for the YAML API:
names_and_ages@spark:
  type: databricks.ManagedTableDataSet
  table: names_and_ages

names_and_ages@pandas:
  type: databricks.ManagedTableDataSet
  table: names_and_ages
  dataframe_type: pandas
Example usage for the Python API:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

from kedro_datasets.databricks import ManagedTableDataSet

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)

data_set = ManagedTableDataSet(table="names_and_ages")
data_set.save(spark_df)
reloaded = data_set.load()
reloaded.take(4)
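Because upsert mode merges rows on the primary key, a read-write configuration needs both write_mode and primary_key. The following is a minimal sketch reusing spark_df from the example above; the choice of "name" as the key column is an illustrative assumption, not part of the official docs:

from kedro_datasets.databricks import ManagedTableDataSet

# Hedged sketch: "name" as the primary key is an illustrative assumption.
upsert_data_set = ManagedTableDataSet(
    table="names_and_ages",
    write_mode="upsert",   # merge incoming rows into the existing table
    primary_key="name",    # column(s) used in the merge join condition
)
upsert_data_set.save(spark_df)  # rows matching on "name" are updated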
Methods

exists(): Checks whether a data set's output already exists by calling the provided _exists() method.
from_config(name, config[, load_version, save_version]): Create a data set instance using the configuration provided.
load(): Loads data by delegation to the provided load method.
release(): Release any cached data.
resolve_load_version(): Compute the version the dataset should be loaded with.
resolve_save_version(): Compute the version the dataset should be saved with.
save(data): Saves data by delegation to the provided save method.
- __init__(table, catalog=None, database='default', write_mode=None, dataframe_type='spark', primary_key=None, version=None, *, schema=None, partition_columns=None, owner_group=None)[source]¶
Creates a new instance of ManagedTableDataSet.
- Parameters
table (str) – the name of the table.
catalog (Optional[str]) – the name of the catalog in Unity. Defaults to None.
database (str) – the name of the database (also referred to as schema). Defaults to "default".
write_mode (Optional[str]) – the mode to write the data into the table. If not present, the data set is read-only. Options are: ["overwrite", "append", "upsert"]. "upsert" mode requires the primary_key field to be populated. Defaults to None.
dataframe_type (str) – "pandas" or "spark" dataframe. Defaults to "spark".
primary_key (Union[str, List[str], None]) – the primary key of the table. Can be in the form of a list. Defaults to None.
version (Optional[Version]) – kedro.io.core.Version instance to load the data. Defaults to None.
schema (Optional[Dict[str, Any]]) – the schema of the table in JSON form. DataFrames will be truncated to match the schema if provided. Used by the hooks to create the table if the schema is provided. Defaults to None.
partition_columns (Optional[List[str]]) – the columns to use for partitioning the table. Used by the hooks. Defaults to None.
owner_group (Optional[str]) – if table access control is enabled in your workspace, specifying owner_group will transfer ownership of the table and database to this owner. All databases should have the same owner_group. Defaults to None.
- Raises
DataSetError – Invalid configuration supplied (through ManagedTable validation)
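To illustrate the schema and partition_columns parameters, here is a hedged sketch; the schema dict follows the standard Spark StructType JSON form and mirrors the schema from the example above, while the catalog name and partitioning choice are assumptions for illustration only:

from kedro_datasets.databricks import ManagedTableDataSet

# Hedged sketch: catalog name and partition choice are illustrative
# assumptions; the schema dict uses the Spark StructType JSON form.
data_set = ManagedTableDataSet(
    table="names_and_ages",
    catalog="main",                 # assumed Unity catalog name
    write_mode="overwrite",
    schema={
        "type": "struct",
        "fields": [
            {"name": "name", "type": "string", "nullable": True, "metadata": {}},
            {"name": "age", "type": "integer", "nullable": True, "metadata": {}},
        ],
    },
    partition_columns=["age"],      # illustrative partitioning choice
)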
- exists()¶
Checks whether a data set’s output already exists by calling the provided _exists() method.
- Return type
bool
- Returns
Flag indicating whether the output already exists.
- Raises
DatasetError – when underlying exists method raises error.
- classmethod from_config(name, config, load_version=None, save_version=None)¶
Create a data set instance using the configuration provided.
- Parameters
name – Data set name.
config – Data set config dictionary.
load_version – Version string to be used for load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
save_version – Version string to be used for save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
- Returns
An instance of an AbstractDataset subclass.
- Raises
DatasetError – When the function fails to create the data set from its config.
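As a hedged illustration, the pandas catalog entry from the YAML example above could be built programmatically with from_config; the dataset name and config keys below simply mirror that entry:

from kedro_datasets.databricks import ManagedTableDataSet

# Hedged sketch: the config dict mirrors the YAML catalog entry shown earlier.
data_set = ManagedTableDataSet.from_config(
    name="names_and_ages@pandas",
    config={
        "type": "databricks.ManagedTableDataSet",
        "table": "names_and_ages",
        "dataframe_type": "pandas",
    },
)
df = data_set.load()  # returns a pandas DataFrame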
- load()¶
Loads data by delegation to the provided load method.
- Return type
TypeVar(_DO)
- Returns
Data returned by the provided load method.
- Raises
DatasetError – When underlying load method raises error.
- release()¶
Release any cached data.
- Raises
DatasetError – when underlying release method raises error.
- Return type
None
- resolve_load_version()¶
Compute the version the dataset should be loaded with.
- Return type
str | None
- resolve_save_version()¶
Compute the version the dataset should be saved with.
- Return type
str | None
- save(data)¶
Saves data by delegation to the provided save method.
- Parameters
data (TypeVar(_DI)) – the value to be saved by the provided save method.
- Raises
DatasetError – when underlying save method raises error.
FileNotFoundError – when save method got file instead of dir, on Windows.
NotADirectoryError – when save method got file instead of dir, on Unix.
- Return type
None