SparkHiveDataset
SparkHiveDataset loads and saves data to Hive tables using Apache Spark.
kedro_datasets.spark.SparkHiveDataset
SparkHiveDataset(
    *,
    database,
    table,
    write_mode="errorifexists",
    table_pk=None,
    save_args=None,
    metadata=None
)
Bases: AbstractDataset[DataFrame, DataFrame]
SparkHiveDataset loads and saves Spark dataframes stored on Hive.
This dataset also handles some otherwise incompatible storage formats, such as partitioned Parquet on
Hive, which does not normally allow upserts to existing data without completely replacing
the existing file or partition.
This Dataset has some key assumptions:
- Schemas do not change during the pipeline run (the defined primary key columns must be present for the duration of the pipeline).
- Tables are not modified externally during upserts. The upsert method is NOT ATOMIC with respect to external changes made to the target table while it executes. The upsert works by checkpointing the Spark DataFrame execution plan.
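To make the upsert assumption concrete, here is a minimal plain-Python sketch of general upsert semantics: incoming rows replace existing rows that share the same primary-key values, and all other rows are kept. It is an illustration only, not the dataset's Spark implementation; `upsert_rows` and the sample rows are hypothetical.

```python
from typing import Any

Row = dict[str, Any]


def upsert_rows(existing: list[Row], incoming: list[Row], table_pk: list[str]) -> list[Row]:
    """Merge incoming rows into existing rows, keyed on the primary-key columns."""
    merged: dict[tuple, Row] = {}
    for row in existing + incoming:
        # Every row must carry all primary-key columns (the schema assumption above).
        missing = [column for column in table_pk if column not in row]
        if missing:
            raise KeyError(f"row is missing primary-key columns: {missing}")
        key = tuple(row[column] for column in table_pk)
        # Incoming rows are processed last, so they replace existing rows with the same key.
        merged[key] = row
    return list(merged.values())


existing = [{"name": "Alex", "age": 31}, {"name": "Bob", "age": 12}]
incoming = [{"name": "Bob", "age": 13}, {"name": "Clarke", "age": 65}]
result = upsert_rows(existing, incoming, table_pk=["name"])
```

Here the row for "Bob" is replaced by the incoming one, "Alex" is left untouched, and "Clarke" is added.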
Examples:
Using the YAML API:
hive_dataset:
  type: spark.SparkHiveDataset
  database: hive_database
  table: table_name
  write_mode: overwrite
Using the Python API:
>>> from kedro_datasets.spark import SparkHiveDataset
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import StructField, StringType, IntegerType, StructType
>>>
>>> schema = StructType(
...     [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
... )
>>> data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
>>>
>>> dataset = SparkHiveDataset(
...     database="test_database", table="test_table", write_mode="overwrite"
... )
>>> dataset.save(spark_df)
>>> reloaded = dataset.load()
>>> reloaded.take(4)
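For the upsert mode, the same constructor can be given a primary key. The sketch below only assembles the keyword arguments and defers the import, so it can be read without a Spark session. The database, table and key values are hypothetical, and `make_upsert_dataset` is an illustrative helper, not part of the library.

```python
from typing import Any

# Hypothetical values; only the keyword names come from the constructor signature.
upsert_config: dict[str, Any] = {
    "database": "test_database",
    "table": "test_table",
    "write_mode": "upsert",
    "table_pk": ["name"],  # columns used to match incoming rows to existing rows
}


def make_upsert_dataset(config: dict[str, Any]):
    """Build a SparkHiveDataset from keyword arguments.

    Calling this needs kedro-datasets installed and a Spark setup with Hive support.
    """
    # Imported lazily so that defining the helper does not require Spark.
    from kedro_datasets.spark import SparkHiveDataset

    return SparkHiveDataset(**config)
```

Calling `make_upsert_dataset(upsert_config).save(spark_df)` would then be expected to merge `spark_df` into the table on the `name` column, provided a checkpoint directory has been set.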
Parameters:
- database (str) – The name of the Hive database.
- table (str) – The name of the table within the database.
- write_mode (str, default: 'errorifexists') – insert, upsert or overwrite are supported.
- table_pk (list[str] | None, default: None) – If performing an upsert, this identifies the primary key columns used to resolve preexisting data. Required for write_mode="upsert".
- save_args (dict[str, Any] | None, default: None) – Optional mapping of any options, passed to DataFrameWriter.saveAsTable as kwargs. A key example is partitionBy, which allows data partitioning on a list of column names. Other HiveOptions can be found here: https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#specifying-storage-format-for-hive-tables
- metadata (dict[str, Any] | None, default: None) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
Note
For users leveraging the upsert functionality,
a checkpoint directory must be set, e.g. using
spark.sparkContext.setCheckpointDir("/path/to/dir")
or directly in the Spark conf folder.
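As a minimal sketch of the programmatic option in the note, the helper below sets the checkpoint directory on an existing session before any upsert is attempted. `configure_checkpoint_dir` is a hypothetical name; the only Spark call it relies on is `sparkContext.setCheckpointDir`. The temporary-directory fallback assumes local mode; on a cluster the path would normally need to be on shared storage such as HDFS.

```python
import tempfile
from typing import Optional


def configure_checkpoint_dir(spark, path: Optional[str] = None) -> str:
    """Set the Spark checkpoint directory and return the path that was used.

    `spark` is expected to be a SparkSession (or any object exposing
    `sparkContext.setCheckpointDir`).
    """
    # Fall back to a fresh local temporary directory when no path is given.
    checkpoint_dir = path or tempfile.mkdtemp(prefix="spark-checkpoints-")
    spark.sparkContext.setCheckpointDir(checkpoint_dir)
    return checkpoint_dir
```

A typical call would be `configure_checkpoint_dir(SparkSession.builder.getOrCreate(), "/path/to/dir")`, run once before saving with `write_mode="upsert"`.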
Raises:
- DatasetError – Invalid configuration supplied.
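One configuration rule stated in the parameters is that `table_pk` is required when `write_mode="upsert"`. The sketch below shows how a caller might check that rule before constructing the dataset. `check_upsert_config` is a hypothetical helper that raises `ValueError`; it is not the library's own validation and does not cover every case that leads to `DatasetError`.

```python
from typing import Optional


def check_upsert_config(write_mode: str, table_pk: Optional[list[str]] = None) -> None:
    """Hypothetical pre-check: upsert needs at least one primary-key column."""
    if write_mode == "upsert" and not table_pk:
        raise ValueError("write_mode='upsert' requires a non-empty table_pk")


# These two configurations pass the check; "upsert" without table_pk would not.
check_upsert_config("overwrite")
check_upsert_config("upsert", ["name"])
```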
Source code in kedro_datasets/spark/spark_hive_dataset.py
__getstate__
__getstate__()
_create_hive_table
_create_hive_table(data, mode=None)
_describe
_describe()
_exists
_exists()
_upsert_save
_upsert_save(data)
_validate_save
_validate_save(data)
load
load()
save
save(data)