
SnowparkTableDataset

SnowparkTableDataset loads and saves data to Snowflake tables using the Snowpark API.

kedro_datasets.snowflake.SnowparkTableDataset

SnowparkTableDataset(
    *,
    table_name,
    schema=None,
    database=None,
    load_args=None,
    save_args=None,
    credentials=None,
    session=None,
    metadata=None
)

Bases: AbstractDataset

SnowparkTableDataset loads and saves Snowpark DataFrames.

As of October 2024, the Snowpark connector works with Python 3.9, 3.10, and 3.11. Python 3.12 is not supported yet.

Examples:

Using the YAML API:

weather:
  type: kedro_datasets.snowflake.SnowparkTableDataset
  table_name: "weather_data"
  database: "meteorology"
  schema: "observations"
  credentials: db_credentials
  save_args:
    mode: overwrite
    column_order: name
    table_type: ''

You can omit "database" and "schema" from a catalog entry if they are provided via credentials, leaving only "table_name" and "credentials" (credentials are always required). This keeps catalog entries short when all Snowflake tables live in the same database and schema. Values in the dataset definition take priority over those defined in credentials.

The credentials file provides all connection attributes. The "weather" catalog entry reuses the credentials parameters but sets its own database and schema, while the "polygons" entry reuses all credentials parameters except the schema, which it overrides. The second credentials.yml example demonstrates externalbrowser authentication.

catalog.yml:

weather:
  type: kedro_datasets.snowflake.SnowparkTableDataset
  table_name: "weather_data"
  database: "meteorology"
  schema: "observations"
  credentials: snowflake_client
  save_args:
    mode: overwrite
    column_order: name
    table_type: ''

polygons:
  type: kedro_datasets.snowflake.SnowparkTableDataset
  table_name: "geopolygons"
  credentials: snowflake_client
  schema: "geodata"

credentials.yml:

snowflake_client:
  account: 'ab12345.eu-central-1'
  port: 443
  warehouse: "datascience_wh"
  database: "detailed_data"
  schema: "observations"
  user: "service_account_abc"
  password: "supersecret"

credentials.yml (with externalbrowser authentication):

snowflake_client:
  account: 'ab12345.eu-central-1'
  port: 443
  warehouse: "datascience_wh"
  database: "detailed_data"
  schema: "observations"
  user: "john_doe@wdomain.com"
  authenticator: "externalbrowser"
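The precedence rule described above can be checked without a Snowflake connection. The sketch below mirrors the constructor's checks in plain Python; `resolve_location` is a hypothetical helper, not part of kedro_datasets, and it raises `ValueError` where the real dataset raises `DatasetError`. It resolves the "weather" and "polygons" entries against the snowflake_client credentials shown above.

```python
from __future__ import annotations

from typing import Any


def resolve_location(
    table_name: str,
    credentials: dict[str, Any] | None,
    database: str | None = None,
    schema: str | None = None,
) -> str:
    """Hypothetical helper: return 'database.schema.table', letting dataset
    arguments override the values found in ``credentials``."""
    if not table_name:
        raise ValueError("'table_name' argument cannot be empty.")
    if not credentials:
        raise ValueError("'credentials' argument cannot be empty.")
    # Dataset-level values win; fall back to credentials only when missing.
    database = database or credentials.get("database")
    schema = schema or credentials.get("schema")
    if not database:
        raise ValueError("'database' must be provided by credentials or dataset.")
    if not schema:
        raise ValueError("'schema' must be provided by credentials or dataset.")
    return f"{database}.{schema}.{table_name}"


snowflake_client = {
    "account": "ab12345.eu-central-1",
    "warehouse": "datascience_wh",
    "database": "detailed_data",
    "schema": "observations",
}

# "weather" overrides both database and schema.
print(resolve_location("weather_data", snowflake_client, "meteorology", "observations"))
# meteorology.observations.weather_data

# "polygons" overrides only the schema; the database comes from credentials.
print(resolve_location("geopolygons", snowflake_client, schema="geodata"))
# detailed_data.geodata.geopolygons
```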

Parameters:

  • table_name (str) –

    The table name to load or save data to.

  • schema (str | None, default: None ) –

    Name of the schema containing table_name. Optional, since it can be provided in the credentials dictionary. If both are given, the argument value takes priority over the one in credentials.

  • database (str | None, default: None ) –

    Name of the database containing schema. Optional, since it can be provided in the credentials dictionary. If both are given, the argument value takes priority over the one in credentials.

  • load_args (dict[str, Any] | None, default: None ) –

    Currently not used

  • save_args (dict[str, Any] | None, default: None ) –

    Passed to the underlying Snowpark save_as_table call. For all supported arguments, see: https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameWriter.saveAsTable.html

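  • credentials (dict[str, Any] | None, default: None ) –

    A dictionary of Snowflake connection parameters, used to build the Snowpark session. Although it defaults to None, it is required: the constructor raises DatasetError if it is empty. For all supported arguments, see: https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect

  • session (Session | None, default: None ) –

    An existing Snowpark session to use. If not provided, the session property retrieves the active session or creates one when it is first needed.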

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Source code in kedro_datasets/snowflake/snowpark_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    table_name: str,
    schema: str | None = None,
    database: str | None = None,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    credentials: dict[str, Any] | None = None,
    session: Session | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """
    Creates a new instance of ``SnowparkTableDataset``.

    Args:
        table_name: The table name to load or save data to.
        schema: Name of the schema containing ``table_name``.
            Optional, since it can be provided in the ``credentials``
            dictionary. The argument value takes priority over the one
            provided in ``credentials``, if any.
        database: Name of the database containing ``schema``.
            Optional, since it can be provided in the ``credentials``
            dictionary. The argument value takes priority over the one
            provided in ``credentials``, if any.
        load_args: Currently not used.
        save_args: Passed to the underlying Snowpark ``save_as_table``.
            To find all supported arguments, see here:
            https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameWriter.saveAsTable.html
        credentials: A dictionary of Snowflake connection parameters.
            Required, despite the ``None`` default.
            To find all supported arguments, see here:
            https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect
        session: An existing Snowpark ``Session`` to use. If not provided,
            one is retrieved or created when first needed.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    """

    if not table_name:
        raise DatasetError("'table_name' argument cannot be empty.")

    if not credentials:
        raise DatasetError("'credentials' argument cannot be empty.")

    if not database:
        if not ("database" in credentials and credentials["database"]):
            raise DatasetError(
                "'database' must be provided by credentials or dataset."
            )
        database = credentials["database"]

    if not schema:
        if not ("schema" in credentials and credentials["schema"]):
            raise DatasetError(
                "'schema' must be provided by credentials or dataset."
            )
        schema = credentials["schema"]

    # Handle default load and save arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}

    self._table_name = table_name
    self._database = database
    self._schema = schema

    connection_parameters = credentials
    connection_parameters.update(
        {"database": self._database, "schema": self._schema}
    )
    self._connection_parameters = connection_parameters
    self._session = session

    self.metadata = metadata
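The constructor builds its connection parameters by calling `update()` on the `credentials` dictionary itself, so the dictionary that was passed in is modified in place. The plain-Python sketch below (no Snowflake or Kedro imports; both helper functions are hypothetical) contrasts that pattern with merging into a new dictionary, which leaves the caller's credentials unchanged. Whether this matters in practice depends on whether several datasets receive the same credentials object, which this page does not state.

```python
from __future__ import annotations

from typing import Any


def params_in_place(credentials: dict[str, Any], database: str, schema: str) -> dict[str, Any]:
    # Same pattern as the constructor: the caller's dict is modified.
    params = credentials
    params.update({"database": database, "schema": schema})
    return params


def params_copy(credentials: dict[str, Any], database: str, schema: str) -> dict[str, Any]:
    # Builds a new dict; the caller's dict is left untouched.
    return {**credentials, "database": database, "schema": schema}


snowflake_client = {
    "account": "ab12345.eu-central-1",
    "database": "detailed_data",
    "schema": "observations",
}

copied = params_copy(snowflake_client, "meteorology", "observations")
print(snowflake_client["database"], copied["database"])  # detailed_data meteorology

in_place = params_in_place(snowflake_client, "meteorology", "observations")
print(snowflake_client["database"], in_place is snowflake_client)  # meteorology True
```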

DEFAULT_LOAD_ARGS class-attribute instance-attribute

DEFAULT_LOAD_ARGS = {}

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {}

_SINGLE_PROCESS class-attribute instance-attribute

_SINGLE_PROCESS = True

_connection_parameters instance-attribute

_connection_parameters = connection_parameters

_database instance-attribute

_database = database

_load_args instance-attribute

_load_args = {**DEFAULT_LOAD_ARGS, **(load_args or {})}

_save_args instance-attribute

_save_args = {**DEFAULT_SAVE_ARGS, **(save_args or {})}
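Both `_load_args` and `_save_args` are built by unpacking the class defaults first and the user-supplied dictionary second, so a user value replaces a default with the same key, and `None` is treated as an empty dictionary. A minimal sketch of that merge follows; the non-empty `DEFAULT_SAVE_ARGS` here is hypothetical and exists only to show the override (the dataset's own default is `{}`), and `merge_args` is an illustrative helper name.

```python
from __future__ import annotations

from typing import Any

# Hypothetical default, for illustration only; the dataset's own default is {}.
DEFAULT_SAVE_ARGS: dict[str, Any] = {"mode": "errorifexists"}


def merge_args(defaults: dict[str, Any], user_args: dict[str, Any] | None) -> dict[str, Any]:
    # Later unpacking wins, so user-supplied keys override defaults.
    # A new dict is returned; neither input is modified.
    return {**defaults, **(user_args or {})}


print(merge_args(DEFAULT_SAVE_ARGS, None))
# {'mode': 'errorifexists'}
print(merge_args(DEFAULT_SAVE_ARGS, {"mode": "overwrite", "column_order": "name"}))
# {'mode': 'overwrite', 'column_order': 'name'}
```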

_schema instance-attribute

_schema = schema

_session instance-attribute

_session = session

_table_name instance-attribute

_table_name = table_name

metadata instance-attribute

metadata = metadata

session property

session

Retrieve or create a session.

Returns:

  • Session

    The current session associated with the object.
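The property's body is not reproduced on this page. Assuming it returns the session passed to the constructor when there is one, and otherwise obtains a session through `_get_session` on first access and keeps it, the pattern is an ordinary lazily cached property. In the sketch below, `LazySessionHolder` and the `factory` callable are hypothetical stand-ins, not Snowpark objects.

```python
from __future__ import annotations

from typing import Any, Callable


class LazySessionHolder:
    """Hypothetical stand-in showing a lazily created, cached session."""

    def __init__(self, factory: Callable[[], Any], session: Any | None = None) -> None:
        self._factory = factory
        self._session = session

    @property
    def session(self) -> Any:
        # Create the session only on first access, then reuse it.
        if self._session is None:
            self._session = self._factory()
        return self._session


calls: list[int] = []


def make_session() -> object:
    calls.append(1)
    return object()


holder = LazySessionHolder(make_session)
first = holder.session
second = holder.session
print(first is second, len(calls))  # True 1
```

Passing a session to the constructor skips the factory entirely, which is useful when a caller already holds a configured session.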

_describe

_describe()

Source code in kedro_datasets/snowflake/snowpark_dataset.py
def _describe(self) -> dict[str, Any]:
    return {
        "table_name": self._table_name,
        "database": self._database,
        "schema": self._schema,
    }

_exists

_exists()

Check if a specified table exists in the database.

Returns:

  • bool

    True if the table exists, False otherwise.

Source code in kedro_datasets/snowflake/snowpark_dataset.py
def _exists(self) -> bool:
    """
    Check if a specified table exists in the database.

    Returns:
        bool: True if the table exists, False otherwise.
    """
    try:
        self.session.table(
            f"{self._database}.{self._schema}.{self._table_name}"
        ).show()
        return True
    except Exception as e:
        logger.debug(f"Table {self._table_name} does not exist: {e}")
        return False
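`_exists` probes the table and treats any exception as "the table does not exist". The same probe-and-catch pattern can be tried locally with Python's built-in `sqlite3` module standing in for Snowflake; this is a swapped-in backend for illustration, not how the dataset talks to Snowflake, and `table_exists` is a hypothetical helper. The sketch catches `sqlite3.OperationalError` specifically, so unrelated failures are not silently reported as a missing table, and uses `LIMIT 0` so that no rows are fetched.

```python
import sqlite3


def table_exists(conn: sqlite3.Connection, table: str) -> bool:
    """Return True if ``table`` can be queried, False if the query fails."""
    try:
        # LIMIT 0 checks that the table resolves without fetching rows.
        # Only interpolate trusted table names (e.g. from catalog config).
        conn.execute(f"SELECT 1 FROM {table} LIMIT 0")
        return True
    except sqlite3.OperationalError:
        # Raised for "no such table" (and other operational errors).
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weather_data (city TEXT, temp REAL)")
print(table_exists(conn, "weather_data"))  # True
print(table_exists(conn, "geopolygons"))  # False
```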

_get_session staticmethod

_get_session(connection_parameters)

Reuse the active Snowpark session if there is one; otherwise create a new session from connection_parameters. This lets all instances of SnowparkTableDataset that connect to the same source share a single session. connection_parameters is a dictionary of any values supported by the Snowflake Python connector: https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect

Example:

connection_parameters = {
    "account": "",
    "user": "",
    "password": "",  # optional
    "role": "",  # optional
    "warehouse": "",  # optional
    "database": "",  # optional
    "schema": "",  # optional
    "authenticator": "",  # optional
}

Source code in kedro_datasets/snowflake/snowpark_dataset.py
@staticmethod
def _get_session(connection_parameters) -> Session:
    """
    Reuse the active Snowpark session if there is one; otherwise create
    a new session from ``connection_parameters``. This lets all instances
    of `SnowparkTableDataset` that connect to the same source share one
    session. connection_parameters is a dictionary of any values
    supported by the Snowflake Python connector:
    https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect

    Example::

        connection_parameters = {
            "account": "",
            "user": "",
            "password": "",  # optional
            "role": "",  # optional
            "warehouse": "",  # optional
            "database": "",  # optional
            "schema": "",  # optional
            "authenticator": "",  # optional
        }
    """
    try:
        logger.debug("Trying to reuse active snowpark session...")
        session = sp_context.get_active_session()
    except sp_exceptions.SnowparkSessionException:
        logger.debug("No active snowpark session found. Creating...")
        session = Session.builder.configs(connection_parameters).create()
    return session

_validate_and_get_table_name

_validate_and_get_table_name()

Validate that the database, schema, and table name stored on the instance are neither None nor empty, and join them into a single string.

Returns:

  • str

    The joined table name in the format 'database.schema.table'.

Raises:

  • DatasetError

    If any part of the table name is None or empty.

Source code in kedro_datasets/snowflake/snowpark_dataset.py
def _validate_and_get_table_name(self) -> str:
    """
    Validate that the database, schema, and table name are neither None
    nor empty, and join them into a string.

    Returns:
        str: The joined table name in the format 'database.schema.table'.

    Raises:
        DatasetError: If any part of the table name is None or empty.
    """
    parts: list[str | None] = [self._database, self._schema, self._table_name]
    if any(part is None or part == "" for part in parts):
        raise DatasetError("Database, schema or table name cannot be None or empty")
    parts_str = cast(list[str], parts)  # make linting happy
    return ".".join(parts_str)
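The validation and join can be exercised on their own. `qualified_table_name` below is a hypothetical free-function version of the method; it raises `ValueError` where the method raises `DatasetError`, and otherwise applies the same rule: reject `None` and empty strings, then join with dots.

```python
from __future__ import annotations


def qualified_table_name(database: str | None, schema: str | None, table: str | None) -> str:
    parts = [database, schema, table]
    # Reject None and empty strings, as the method does.
    if any(part is None or part == "" for part in parts):
        raise ValueError("Database, schema or table name cannot be None or empty")
    return f"{database}.{schema}.{table}"


print(qualified_table_name("meteorology", "observations", "weather_data"))
# meteorology.observations.weather_data
```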

load

load()

Load data from a specified database table.

Returns:

  • DataFrame

    The loaded data as a Snowpark DataFrame.

Source code in kedro_datasets/snowflake/snowpark_dataset.py
def load(self) -> DataFrame:
    """
    Load data from a specified database table.

    Returns:
        DataFrame: The loaded data as a Snowpark DataFrame.
    """
    return self.session.table(self._validate_and_get_table_name())

save

save(data)

Check whether the data is a Snowpark DataFrame or a pandas DataFrame, convert a pandas DataFrame to a Snowpark DataFrame if needed, and save it to the specified table. Data of any other type raises DatasetError.

Parameters:

  • data (pandas.DataFrame | snowflake.snowpark.DataFrame) –

    The data to save.

Source code in kedro_datasets/snowflake/snowpark_dataset.py
def save(self, data: pd.DataFrame | DataFrame) -> None:
    """
    Check if the data is a Snowpark DataFrame or a Pandas DataFrame,
    convert it to a Snowpark DataFrame if needed, and save it to the specified table.

    Args:
        data (pd.DataFrame | DataFrame): The data to save.
    """
    if isinstance(data, pd.DataFrame):
        snowpark_df = self.session.create_dataframe(data)
    elif isinstance(data, DataFrame):
        snowpark_df = data
    else:
        raise DatasetError(
            f"Data of type {type(data)} is not supported for saving."
        )

    snowpark_df.write.save_as_table(
        self._validate_and_get_table_name(), **self._save_args
    )
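`save` dispatches on the input type and forwards the merged `save_args` as keyword arguments to `save_as_table`. The plain-Python sketch below reproduces that flow without pandas or Snowpark: `FakeWriter`, `FakeSnowparkFrame`, and `LocalFrame` are hypothetical stand-ins (for the Snowpark writer, a Snowpark DataFrame, and a pandas DataFrame respectively), and it raises `TypeError` where the dataset raises `DatasetError`.

```python
from __future__ import annotations

from typing import Any


class FakeWriter:
    """Records the arguments passed to save_as_table."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any]]] = []

    def save_as_table(self, table_name: str, **kwargs: Any) -> None:
        self.calls.append((table_name, kwargs))


class FakeSnowparkFrame:
    """Stand-in for snowflake.snowpark.DataFrame."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self.rows = rows
        self.write = FakeWriter()


class LocalFrame(list):
    """Stand-in for pandas.DataFrame (a list of row dicts)."""


def save(data: Any, table_name: str, save_args: dict[str, Any]) -> FakeSnowparkFrame:
    if isinstance(data, LocalFrame):
        # Mirrors session.create_dataframe(data): convert local data first.
        frame = FakeSnowparkFrame(list(data))
    elif isinstance(data, FakeSnowparkFrame):
        frame = data
    else:
        raise TypeError(f"Data of type {type(data)} is not supported for saving.")
    # save_args are unpacked into keyword arguments, as in the dataset.
    frame.write.save_as_table(table_name, **save_args)
    return frame


frame = save(LocalFrame([{"city": "Oslo"}]), "meteorology.observations.weather_data", {"mode": "overwrite"})
print(frame.write.calls)
# [('meteorology.observations.weather_data', {'mode': 'overwrite'})]
```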