SparkJDBCDataset
SparkJDBCDataset loads and saves data to JDBC sources using Apache Spark.
kedro_datasets.spark.SparkJDBCDataset
SparkJDBCDataset(
    *,
    url,
    table,
    credentials=None,
    load_args=None,
    save_args=None,
    metadata=None
)
Bases: AbstractDataset[DataFrame, DataFrame]
SparkJDBCDataset loads data from a database table reachable through
the JDBC URL url and a set of connection properties, and saves the
content of a PySpark DataFrame to an external database table over JDBC.
It uses pyspark.sql.DataFrameReader and pyspark.sql.DataFrameWriter
internally, so it accepts every option that PySpark's jdbc methods accept.
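As a rough sketch of what that delegation amounts to, the two helpers below forward the URL, the table name, and the argument dictionaries to PySpark's jdbc methods. The helper names load_via_jdbc and save_via_jdbc are hypothetical and are not part of kedro_datasets; the real class may differ in detail. The spark and data arguments are assumed to be a SparkSession and a PySpark DataFrame.

```python
from __future__ import annotations

from typing import Any


def load_via_jdbc(
    spark: Any, url: str, table: str, load_args: dict[str, Any] | None = None
) -> Any:
    """Read a table with spark.read.jdbc, i.e. DataFrameReader.jdbc."""
    # load_args can hold any keyword DataFrameReader.jdbc accepts,
    # for example properties, numPartitions, or predicates.
    return spark.read.jdbc(url, table, **(load_args or {}))


def save_via_jdbc(
    data: Any, url: str, table: str, save_args: dict[str, Any] | None = None
) -> None:
    """Write a DataFrame with data.write.jdbc, i.e. DataFrameWriter.jdbc."""
    # save_args can hold the keywords DataFrameWriter.jdbc accepts:
    # mode and properties.
    data.write.jdbc(url, table, **(save_args or {}))
```

Because the dictionaries are unpacked as keyword arguments, a key that the corresponding jdbc method does not accept would fail at call time rather than being silently ignored.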
Examples:
Using the YAML API:
weather:
  type: spark.SparkJDBCDataset
  table: weather_table
  url: jdbc:postgresql://localhost/test
  credentials: db_credentials
  load_args:
    properties:
      driver: org.postgresql.Driver
  save_args:
    properties:
      driver: org.postgresql.Driver
Using the Python API:
>>> import pandas as pd
>>> from kedro_datasets.spark import SparkJDBCDataset
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>> data = spark.createDataFrame(
...     pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
... )
>>>
>>> url = "jdbc:postgresql://localhost/test"
>>> table = "table_a"
>>> connection_properties = {"driver": "org.postgresql.Driver"}
>>> dataset = SparkJDBCDataset(
...     url=url,
...     table=table,
...     credentials={"user": "scott", "password": "tiger"},
...     load_args={"properties": connection_properties},
...     save_args={"properties": connection_properties},
... )
>>>
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.toPandas().equals(reloaded.toPandas())
Parameters:
- url (str) – A JDBC URL of the form jdbc:subprotocol:subname.
- table (str) – The name of the table to load data from or save data to.
- credentials (dict[str, Any] | None, default: None) – A dictionary of JDBC database connection arguments, normally at least the properties user and password with their corresponding values. If provided, it updates the properties parameter in load_args and save_args.
- load_args (dict[str, Any] | None, default: None) – Passed to the underlying PySpark jdbc function along with the JDBC URL and the name of the table. For all supported arguments, see: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.jdbc.html
- save_args (dict[str, Any] | None, default: None) – Passed to the underlying PySpark jdbc function along with the JDBC URL and the name of the table. For all supported arguments, see: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.jdbc.html
- metadata (dict[str, Any] | None, default: None) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
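The interaction between credentials and the properties entry of load_args and save_args can be illustrated with a minimal sketch. The function merge_credentials is hypothetical and not part of kedro_datasets. The sketch assumes that "updates" means credential keys are merged into properties and override any key of the same name, as dict.update would.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any


def merge_credentials(
    args: dict[str, Any] | None, credentials: dict[str, Any] | None
) -> dict[str, Any]:
    """Return a copy of args whose 'properties' also contain the credentials."""
    merged = deepcopy(args) if args else {}
    if credentials:
        # Assumption: credentials win over same-named keys in properties.
        merged["properties"] = {**merged.get("properties", {}), **credentials}
    return merged


load_args = merge_credentials(
    {"properties": {"driver": "org.postgresql.Driver"}},
    {"user": "scott", "password": "tiger"},
)
print(load_args)
# {'properties': {'driver': 'org.postgresql.Driver', 'user': 'scott', 'password': 'tiger'}}
```

Keeping user and password in credentials rather than in load_args or save_args lets the catalog entry refer to them by name, as the YAML example does with db_credentials.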
Raises:
- DatasetError – When either url or table is empty, or when a property is provided with a None value.
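The checks described above could look roughly like the following sketch. It is not the library's code: validate_jdbc_config is a hypothetical helper, the local DatasetError class is a stand-in so the snippet runs without Kedro installed, and it assumes the None check applies to the entries of credentials.

```python
from __future__ import annotations

from typing import Any


class DatasetError(Exception):
    """Stand-in for Kedro's DatasetError; defined here only for the sketch."""


def validate_jdbc_config(
    url: str, table: str, credentials: dict[str, Any] | None = None
) -> None:
    """Raise DatasetError for an empty url or table, or a None credential value."""
    if not url:
        raise DatasetError("'url' must not be empty.")
    if not table:
        raise DatasetError("'table' must not be empty.")
    # Assumption: the "property with a None value" rule targets credentials.
    for key, value in (credentials or {}).items():
        if value is None:
            raise DatasetError(f"Credential property '{key}' must not be None.")


# A complete configuration passes silently.
validate_jdbc_config(
    "jdbc:postgresql://localhost/test",
    "table_a",
    {"user": "scott", "password": "tiger"},
)
```

Failing at construction time surfaces a misconfigured catalog entry before any Spark job or database connection is attempted.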
Source code in kedro_datasets/spark/spark_jdbc_dataset.py
_describe
_describe()
Source code in kedro_datasets/spark/spark_jdbc_dataset.py
load
load()
Source code in kedro_datasets/spark/spark_jdbc_dataset.py
save
save(data)
Source code in kedro_datasets/spark/spark_jdbc_dataset.py