Skip to content

SparkHiveDataset

SparkHiveDataset loads and saves data to Hive tables using Apache Spark.

kedro_datasets.spark.SparkHiveDataset

SparkHiveDataset(
    *,
    database,
    table,
    write_mode="errorifexists",
    table_pk=None,
    save_args=None,
    metadata=None
)

Bases: AbstractDataset[DataFrame, DataFrame]

SparkHiveDataset loads and saves Spark DataFrames stored in Hive tables. It also handles storage formats that do not natively support upserts, such as partitioned Parquet on Hive, where existing data normally cannot be updated without completely replacing the existing file or partition.

This Dataset has some key assumptions:

  • Schemas do not change during the pipeline run (defined PKs must be present for the duration of the pipeline).
  • Tables are not being externally modified during upserts. The upsert method is NOT ATOMIC to external changes to the target table while executing. Upsert methodology works by leveraging Spark DataFrame execution plan checkpointing.

Examples:

Using the YAML API:

hive_dataset:
  type: spark.SparkHiveDataset
  database: hive_database
  table: table_name
  write_mode: overwrite

Using the Python API:

>>> from kedro_datasets.spark import SparkHiveDataset
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import StructField, StringType, IntegerType, StructType
>>>
>>> schema = StructType(
...     [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
... )
>>> data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
>>>
>>> dataset = SparkHiveDataset(
...     database="test_database", table="test_table", write_mode="overwrite"
... )
>>> dataset.save(spark_df)
>>> reloaded = dataset.load()
>>> reloaded.take(4)

Parameters:

  • database (str) –

    The name of the hive database.

  • table (str) –

    The name of the table within the database.

  • write_mode (str, default: 'errorifexists' ) –

    One of append, error, errorifexists, upsert or overwrite.

  • table_pk (list[str] | None, default: None ) –

    If performing an upsert, this identifies the primary key columns used to resolve preexisting data. Is required for write_mode="upsert".

  • save_args (dict[str, Any] | None, default: None ) –

    Optional mapping of any options, passed to the DataFrameWriter.saveAsTable as kwargs. Key example of this is partitionBy which allows data partitioning on a list of column names. Other HiveOptions can be found here: https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#specifying-storage-format-for-hive-tables

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Note

For users leveraging the upsert functionality, a checkpoint directory must be set, e.g. using spark.sparkContext.setCheckpointDir("/path/to/dir") or directly in the Spark conf folder.

Raises:

  • DatasetError –

    Invalid configuration supplied.

Source code in kedro_datasets/spark/spark_hive_dataset.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(  # noqa: PLR0913
    self,
    *,
    database: str,
    table: str,
    write_mode: str = "errorifexists",
    table_pk: list[str] | None = None,
    save_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``SparkHiveDataset``.

    Args:
        database: The name of the hive database.
        table: The name of the table within the database.
        write_mode: ``insert``, ``upsert`` or ``overwrite`` are supported.
        table_pk: If performing an upsert, this identifies the primary key columns used to
            resolve preexisting data. Is required for ``write_mode="upsert"``.
        save_args: Optional mapping of any options,
            passed to the `DataFrameWriter.saveAsTable` as kwargs.
            Key example of this is `partitionBy` which allows data partitioning
            on a list of column names.
            Other `HiveOptions` can be found here:
            https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#specifying-storage-format-for-hive-tables
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Note:
        For users leveraging the `upsert` functionality,
        a `checkpoint` directory must be set, e.g. using
        `spark.sparkContext.setCheckpointDir("/path/to/dir")`
        or directly in the Spark conf folder.

    Raises:
        DatasetError: Invalid configuration supplied
    """
    _write_modes = ["append", "error", "errorifexists", "upsert", "overwrite"]
    if write_mode not in _write_modes:
        valid_modes = ", ".join(_write_modes)
        raise DatasetError(
            f"Invalid 'write_mode' provided: {write_mode}. "
            f"'write_mode' must be one of: {valid_modes}"
        )
    if write_mode == "upsert" and not table_pk:
        raise DatasetError("'table_pk' must be set to utilise 'upsert' read mode")

    self._write_mode = write_mode
    self._table_pk = table_pk or []
    self._database = database
    self._table = table
    self._full_table_address = f"{database}.{table}"
    self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
    if save_args is not None:
        self._save_args.update(save_args)
    self._format = self._save_args.pop("format", None) or "hive"
    self._eager_checkpoint = self._save_args.pop("eager_checkpoint", None) or True

    self.metadata = metadata

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {}

_database instance-attribute

_database = database

_eager_checkpoint instance-attribute

_eager_checkpoint = pop('eager_checkpoint', None) or True

_format instance-attribute

_format = pop('format', None) or 'hive'

_full_table_address instance-attribute

_full_table_address = f'{database}.{table}'

_save_args instance-attribute

_save_args = deepcopy(DEFAULT_SAVE_ARGS)

_table instance-attribute

_table = table

_table_pk instance-attribute

_table_pk = table_pk or []

_write_mode instance-attribute

_write_mode = write_mode

metadata instance-attribute

metadata = metadata

__getstate__

__getstate__()
Source code in kedro_datasets/spark/spark_hive_dataset.py
201
202
203
204
205
def __getstate__(self) -> None:
    """Refuse to be pickled.

    Raises:
        pickle.PicklingError: Always, whenever pickling is attempted.
    """
    reason = (
        "PySpark datasets objects cannot be pickled "
        "or serialised as Python objects."
    )
    raise pickle.PicklingError(reason)

_create_hive_table

_create_hive_table(data, mode=None)
Source code in kedro_datasets/spark/spark_hive_dataset.py
134
135
136
137
138
139
140
141
def _create_hive_table(self, data: DataFrame, mode: str | None = None):
    """Write ``data`` to the configured Hive table via ``saveAsTable``.

    Args:
        data: The frame to persist.
        mode: Write mode for this call only; when omitted (or falsy) the
            dataset's configured write mode is used.
    """
    effective_mode: str = mode if mode else self._write_mode
    writer = data.write
    # Remaining save args are forwarded as keyword options; `mode` and
    # `format` stay explicit keywords so a duplicate key still fails loudly.
    writer.saveAsTable(
        self._full_table_address,
        format=self._format,
        mode=effective_mode,
        **self._save_args,
    )

_describe

_describe()
Source code in kedro_datasets/spark/spark_hive_dataset.py
124
125
126
127
128
129
130
131
132
def _describe(self) -> dict[str, Any]:
    """Return a summary of this dataset's configuration.

    Returns:
        A dict with the database, table, write mode, primary-key columns,
        the ``partitionBy`` save argument (``None`` when unset) and the
        storage format.
    """
    description: dict[str, Any] = {}
    description["database"] = self._database
    description["table"] = self._table
    description["write_mode"] = self._write_mode
    description["table_pk"] = self._table_pk
    description["partition_by"] = self._save_args.get("partitionBy")
    description["format"] = self._format
    return description

_exists

_exists()
Source code in kedro_datasets/spark/spark_hive_dataset.py
194
195
196
197
198
199
def _exists(self) -> bool:
    """Report whether the configured table exists in the configured database.

    Returns:
        The result of the session catalog's ``tableExists`` call.
    """
    # Goes through the JVM-side session object (`_jsparkSession`) to reach
    # the catalog.
    catalog = get_spark()._jsparkSession.catalog()
    return catalog.tableExists(self._database, self._table)

_upsert_save

_upsert_save(data)
Source code in kedro_datasets/spark/spark_hive_dataset.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def _upsert_save(self, data: DataFrame) -> None:
    """Merge ``data`` into the Hive table, preferring incoming rows per primary key.

    When the table is missing or empty, ``data`` is simply written with
    ``overwrite``. Otherwise existing and incoming rows are stacked, one row
    is kept per ``table_pk`` combination (the incoming one when both exist),
    and the result overwrites the table.

    Args:
        data: The incoming rows to upsert.
    """
    # `load.__wrapped__` presumably reaches the undecorated `load` behind a
    # wrapper applied by the base class -- TODO confirm against AbstractDataset.
    if not self._exists() or self.load.__wrapped__(self).rdd.isEmpty():  # type: ignore[attr-defined]
        self._create_hive_table(data=data, mode="overwrite")
        return

    origin_col = "tmp_colname"
    rank_col = "tmp_row"

    # Within each primary key, rows tagged with the higher origin value sort first.
    newest_first = Window.partitionBy(*self._table_pk).orderBy(
        col(origin_col).desc()
    )

    # Tag provenance: 1 = already in the table, 2 = incoming.
    existing = self.load.__wrapped__(self).select("*", lit(1).alias(origin_col))  # type: ignore[attr-defined]
    incoming = data.select("*", lit(2).alias(origin_col))

    ranked = incoming.unionByName(existing).select(
        "*", row_number().over(newest_first).alias(rank_col)
    )
    winners = ranked.filter(col(rank_col) == 1).drop(origin_col, rank_col)

    # Checkpoint before overwriting; presumably this detaches the result from
    # the table it was read from -- verify against Spark checkpoint semantics.
    merged = winners.checkpoint(eager=self._eager_checkpoint)
    self._create_hive_table(data=merged, mode="overwrite")

_validate_save

_validate_save(data)
Source code in kedro_datasets/spark/spark_hive_dataset.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def _validate_save(self, data: DataFrame):
    """Check that ``data`` has the same column names and types as the Hive table.

    The check is skipped when the table does not exist or when the write
    mode is ``overwrite``.

    Args:
        data: The frame about to be saved.

    Raises:
        DatasetError: If the ``(column, dtype)`` pairs of ``data`` and of the
            existing table differ.
    """
    if self._exists() and self._write_mode != "overwrite":
        table_schema = set(self.load.__wrapped__(self).dtypes)  # type: ignore[attr-defined]
        frame_schema = set(data.dtypes)
        if frame_schema == table_schema:
            return

        # Report both directions of the mismatch so the caller sees what to fix.
        only_in_frame = sorted(frame_schema - table_schema)
        only_in_table = sorted(table_schema - frame_schema)
        raise DatasetError(
            "Dataset does not match hive table schema.\n"
            f"Present on insert only: {only_in_frame}\n"
            f"Present on schema only: {only_in_table}"
        )

load

load()
Source code in kedro_datasets/spark/spark_hive_dataset.py
143
144
def load(self) -> DataFrame:
    """Read the configured Hive table.

    Returns:
        The table at ``<database>.<table>`` as a Spark ``DataFrame``.
    """
    reader = get_spark().read
    return reader.table(self._full_table_address)

save

save(data)
Source code in kedro_datasets/spark/spark_hive_dataset.py
146
147
148
149
150
151
152
153
154
155
156
157
def save(self, data: DataFrame) -> None:
    """Validate ``data`` and write it to the Hive table using the configured write mode.

    In ``upsert`` mode the primary-key columns must be present in ``data``;
    the rows are then merged into the table. Every other mode writes
    ``data`` directly with the configured write mode.

    Args:
        data: The Spark ``DataFrame`` to persist.

    Raises:
        DatasetError: In ``upsert`` mode, when a ``table_pk`` column is not
            among the columns of ``data``. Schema validation may raise as well.
    """
    self._validate_save(data)
    if self._write_mode != "upsert":
        self._create_hive_table(data=data)
        return

    # Check the primary key against the incoming frame instead of re-reading
    # the Hive table. When the table exists, validation above has already
    # required both schemas to match, so the outcome is the same; when it
    # does not exist yet (first upsert) there is no table to read.
    if not set(self._table_pk) <= set(data.columns):
        raise DatasetError(
            f"Columns {str(self._table_pk)} selected as primary key(s) not found in "
            f"table {self._full_table_address}"
        )
    self._upsert_save(data=data)