ManagedTableDataset¶
ManagedTableDataset loads and saves data into managed delta tables in Databricks.
kedro_datasets.databricks.ManagedTableDataset ¶
ManagedTableDataset(
*,
table,
catalog=None,
database="default",
write_mode=None,
dataframe_type="spark",
primary_key=None,
version=None,
schema=None,
partition_columns=None,
owner_group=None,
metadata=None
)
Bases: BaseTableDataset
ManagedTableDataset loads and saves data into managed delta tables in Databricks.
Load and save can be in Spark or Pandas dataframes, specified in dataframe_type.
When saving data, you can specify one of three modes: overwrite, append,
or upsert. Upsert requires you to specify the primary_column parameter which
will be used as part of the join condition. This dataset works best with
the databricks kedro starter. That starter comes with hooks that allow this
dataset to function properly. Follow the instructions in that starter to
setup your project for this dataset.
Examples:
Using the YAML API:
names_and_ages@spark:
type: databricks.ManagedTableDataset
table: names_and_ages
names_and_ages@pandas:
type: databricks.ManagedTableDataset
table: names_and_ages
dataframe_type: pandas
Using the Python API:
>>> import importlib.metadata
>>>
>>> from kedro_datasets.databricks import ManagedTableDataset
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType
>>>
>>> DELTA_VERSION = importlib.metadata.version("delta-spark")
>>> major_version = int(DELTA_VERSION.split('.')[0])
>>> delta_package = (
... f"io.delta:delta-spark_2.13:{DELTA_VERSION}"
... if major_version >= 4
... else f"io.delta:delta-core_2.12:{DELTA_VERSION}"
... )
>>> schema = StructType(
... [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
... )
>>> data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
>>> spark_df = (
... SparkSession.builder.config(
... "spark.jars.packages", delta_package
... )
... .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
... .config(
... "spark.sql.catalog.spark_catalog",
... "org.apache.spark.sql.delta.catalog.DeltaCatalog",
... )
... .getOrCreate()
... .createDataFrame(data, schema)
... )
>>>
>>> dataset = ManagedTableDataset(table="names_and_ages", write_mode="overwrite")
>>> dataset.save(spark_df)
>>> reloaded = dataset.load()
Parameters:
-
table(str) –The name of the table.
-
catalog(str | None, default:None) –The name of the catalog in Unity. Defaults to None.
-
database(str, default:'default') –The name of the database. (also referred to as schema). Defaults to "default".
-
write_mode(str | None, default:None) –the mode to write the data into the table. If not present, the dataset is read-only. Options are:["overwrite", "append", "upsert"]. "upsert" mode requires primary_key field to be populated. Defaults to None.
-
dataframe_type(str, default:'spark') –"pandas" or "spark" dataframe. Defaults to "spark".
-
primary_key(str | list[str] | None, default:None) –The primary key of the table. Can be in the form of a list. Defaults to None.
-
version(Version | None, default:None) –kedro.io.core.Version instance to load the data. Defaults to None.
-
schema(dict[str, Any] | None, default:None) –The schema of the table in JSON form. Dataframes will be truncated to match the schema if provided. Used by the hooks to create the table if the schema is provided. Defaults to None.
-
partition_columns(list[str] | None, default:None) –The columns to use for partitioning the table. Used by the hooks. Defaults to None.
-
owner_group(str | None, default:None) –If table access control is enabled in your workspace, specifying owner_group will transfer ownership of the table and database to this owner. All databases should have the same owner_group. Defaults to None.
-
metadata(dict[str, Any] | None, default:None) –Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
Raises:
DatasetError: Invalid configuration supplied (through ManagedTable validation).
Source code in kedro_datasets/databricks/managed_table_dataset.py
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | |
_create_table ¶
_create_table(
table,
catalog,
database,
format,
write_mode,
location,
dataframe_type,
primary_key,
json_schema,
partition_columns,
owner_group,
)
Creates a new ManagedTable instance with the provided attributes.
Parameters:
-
table(str) –The name of the table.
-
catalog(str | None) –The catalog of the table.
-
database(str) –The database of the table.
-
format(str) –The format of the table.
-
write_mode(str | None) –The write mode for the table.
-
dataframe_type(str) –The type of dataframe.
-
primary_key(str | list[str] | None) –The primary key of the table.
-
json_schema(dict[str, Any] | None) –The JSON schema of the table.
-
partition_columns(list[str] | None) –The partition columns of the table.
-
owner_group(str | None) –The owner group of the table.
Returns:
-
``ManagedTable``–The new
ManagedTableinstance.
Source code in kedro_datasets/databricks/managed_table_dataset.py
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | |
_describe ¶
_describe()
Returns a description of the instance of the dataset.
Returns:
-
Dict[str, str]–Dict with the details of the dataset.
Source code in kedro_datasets/databricks/managed_table_dataset.py
198 199 200 201 202 203 204 205 206 207 208 | |