Skip to content

ManagedTableDataset

ManagedTableDataset loads and saves data into managed delta tables in Databricks.

kedro_datasets.databricks.ManagedTableDataset

ManagedTableDataset(
    *,
    table,
    catalog=None,
    database="default",
    write_mode=None,
    dataframe_type="spark",
    primary_key=None,
    version=None,
    schema=None,
    partition_columns=None,
    owner_group=None,
    metadata=None
)

Bases: BaseTableDataset

ManagedTableDataset loads and saves data into managed delta tables in Databricks. Load and save can be in Spark or Pandas dataframes, specified in dataframe_type. When saving data, you can specify one of three modes: overwrite, append, or upsert. Upsert requires you to specify the primary_column parameter which will be used as part of the join condition. This dataset works best with the databricks kedro starter. That starter comes with hooks that allow this dataset to function properly. Follow the instructions in that starter to setup your project for this dataset.

Examples:

Using the YAML API:

names_and_ages@spark:
  type: databricks.ManagedTableDataset
  table: names_and_ages

names_and_ages@pandas:
  type: databricks.ManagedTableDataset
  table: names_and_ages
  dataframe_type: pandas

Using the Python API:

>>> import importlib.metadata
>>>
>>> from kedro_datasets.databricks import ManagedTableDataset
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType
>>>
>>> DELTA_VERSION = importlib.metadata.version("delta-spark")
>>> major_version = int(DELTA_VERSION.split('.')[0])
>>> delta_package = (
...     f"io.delta:delta-spark_2.13:{DELTA_VERSION}"
...     if major_version >= 4
...     else f"io.delta:delta-core_2.12:{DELTA_VERSION}"
... )
>>> schema = StructType(
...     [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
... )
>>> data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
>>> spark_df = (
...     SparkSession.builder.config(
...         "spark.jars.packages", delta_package
...     )
...     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
...     .config(
...         "spark.sql.catalog.spark_catalog",
...         "org.apache.spark.sql.delta.catalog.DeltaCatalog",
...     )
...     .getOrCreate()
...     .createDataFrame(data, schema)
... )
>>>
>>> dataset = ManagedTableDataset(table="names_and_ages", write_mode="overwrite")
>>> dataset.save(spark_df)
>>> reloaded = dataset.load()

Parameters:

  • table (str) –

    The name of the table.

  • catalog (str | None, default: None ) –

    The name of the catalog in Unity. Defaults to None.

  • database (str, default: 'default' ) –

    The name of the database. (also referred to as schema). Defaults to "default".

  • write_mode (str | None, default: None ) –

    the mode to write the data into the table. If not present, the dataset is read-only. Options are:["overwrite", "append", "upsert"]. "upsert" mode requires primary_key field to be populated. Defaults to None.

  • dataframe_type (str, default: 'spark' ) –

    "pandas" or "spark" dataframe. Defaults to "spark".

  • primary_key (str | list[str] | None, default: None ) –

    The primary key of the table. Can be in the form of a list. Defaults to None.

  • version (Version | None, default: None ) –

    kedro.io.core.Version instance to load the data. Defaults to None.

  • schema (dict[str, Any] | None, default: None ) –

    The schema of the table in JSON form. Dataframes will be truncated to match the schema if provided. Used by the hooks to create the table if the schema is provided. Defaults to None.

  • partition_columns (list[str] | None, default: None ) –

    The columns to use for partitioning the table. Used by the hooks. Defaults to None.

  • owner_group (str | None, default: None ) –

    If table access control is enabled in your workspace, specifying owner_group will transfer ownership of the table and database to this owner. All databases should have the same owner_group. Defaults to None.

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises: DatasetError: Invalid configuration supplied (through ManagedTable validation).

Source code in kedro_datasets/databricks/managed_table_dataset.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def __init__(  # noqa: PLR0913
    self,
    *,
    table: str,
    catalog: str | None = None,
    database: str = "default",
    write_mode: str | None = None,
    dataframe_type: str = "spark",
    primary_key: str | list[str] | None = None,
    version: Version | None = None,
    # the following parameters are used by project hooks
    # to create or update table properties
    schema: dict[str, Any] | None = None,
    partition_columns: list[str] | None = None,
    owner_group: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``ManagedTableDataset``.

    Args:
        table: The name of the table.
        catalog: The name of the catalog in Unity.
            Defaults to None.
        database: The name of the database.
            (also referred to as schema). Defaults to "default".
        write_mode: the mode to write the data into the table. If not
            present, the dataset is read-only.
            Options are:["overwrite", "append", "upsert"].
            "upsert" mode requires primary_key field to be populated.
            Defaults to None.
        dataframe_type: "pandas" or "spark" dataframe.
            Defaults to "spark".
        primary_key: The primary key of the table.
            Can be in the form of a list. Defaults to None.
        version: kedro.io.core.Version instance to load the data.
            Defaults to None.
        schema: The schema of the table in JSON form.
            Dataframes will be truncated to match the schema if provided.
            Used by the hooks to create the table if the schema is provided.
            Defaults to None.
        partition_columns: The columns to use for partitioning the table.
            Used by the hooks. Defaults to None.
        owner_group: If table access control is enabled in your workspace,
            specifying owner_group will transfer ownership of the table and database to
            this owner. All databases should have the same owner_group. Defaults to None.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    Raises:
        DatasetError: Invalid configuration supplied (through ``ManagedTable`` validation).
    """
    super().__init__(
        database=database,
        catalog=catalog,
        table=table,
        write_mode=write_mode,
        dataframe_type=dataframe_type,
        version=version,
        schema=schema,
        partition_columns=partition_columns,
        metadata=metadata,
        primary_key=primary_key,
        owner_group=owner_group,
    )

_create_table

_create_table(
    table,
    catalog,
    database,
    format,
    write_mode,
    location,
    dataframe_type,
    primary_key,
    json_schema,
    partition_columns,
    owner_group,
)

Creates a new ManagedTable instance with the provided attributes.

Parameters:

  • table (str) –

    The name of the table.

  • catalog (str | None) –

    The catalog of the table.

  • database (str) –

    The database of the table.

  • format (str) –

    The format of the table.

  • write_mode (str | None) –

    The write mode for the table.

  • dataframe_type (str) –

    The type of dataframe.

  • primary_key (str | list[str] | None) –

    The primary key of the table.

  • json_schema (dict[str, Any] | None) –

    The JSON schema of the table.

  • partition_columns (list[str] | None) –

    The partition columns of the table.

  • owner_group (str | None) –

    The owner group of the table.

Returns:

  • ``ManagedTable``

    The new ManagedTable instance.

Source code in kedro_datasets/databricks/managed_table_dataset.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def _create_table(  # noqa: PLR0913
    self,
    table: str,
    catalog: str | None,
    database: str,
    format: str,
    write_mode: str | None,
    location: str | None,
    dataframe_type: str,
    primary_key: str | list[str] | None,
    json_schema: dict[str, Any] | None,
    partition_columns: list[str] | None,
    owner_group: str | None,
) -> ManagedTable:
    """Creates a new ``ManagedTable`` instance with the provided attributes.

    Args:
        table: The name of the table.
        catalog: The catalog of the table.
        database: The database of the table.
        format: The format of the table.
        write_mode: The write mode for the table.
        dataframe_type: The type of dataframe.
        primary_key: The primary key of the table.
        json_schema: The JSON schema of the table.
        partition_columns: The partition columns of the table.
        owner_group: The owner group of the table.

    Returns:
        ``ManagedTable``: The new ``ManagedTable`` instance.
    """
    return ManagedTable(
        table=table,
        catalog=catalog,
        database=database,
        write_mode=write_mode,
        location=location,
        dataframe_type=dataframe_type,
        json_schema=json_schema,
        partition_columns=partition_columns,
        owner_group=owner_group,
        primary_key=primary_key,
        format=format,
    )

_describe

_describe()

Returns a description of the instance of the dataset.

Returns:

  • Dict[str, str]

    Dict with the details of the dataset.

Source code in kedro_datasets/databricks/managed_table_dataset.py
198
199
200
201
202
203
204
205
206
207
208
def _describe(self) -> dict[str, str | list | None]:
    """Returns a description of the instance of the dataset.

    Returns:
        Dict[str, str]: Dict with the details of the dataset.
    """
    description = super()._describe()
    del description["format"]
    del description["location"]

    return description