APIDataset

APIDataset loads/saves data from/to HTTP(S) APIs. It uses the python requests library: https://requests.readthedocs.io/en/latest/

kedro_datasets.api.APIDataset

APIDataset(
    *,
    url,
    method="GET",
    load_args=None,
    save_args=None,
    credentials=None,
    metadata=None,
    response_dataset=None
)

Bases: AbstractDataset[None, Response]

Examples:

Using the YAML API:

usda:
  type: api.APIDataset
  url: https://quickstats.nass.usda.gov
  load_args:
    params:
      key: SOME_TOKEN
      format: JSON
      commodity_desc: CORN
      statisticcat_des: YIELD
      agg_level_desc: STATE
      year: 2000

Using the Python API:

>>> from kedro_datasets.api import APIDataset
>>>
>>>
>>> dataset = APIDataset(
...     url="https://api.spaceflightnewsapi.net/v4/articles",
...     load_args={
...         "params": {
...             "news_site": "NASA",
...             "launch": "65896761-b6ca-4df3-9699-e077a360c52a",  # Artemis I
...         }
...     },
... )
>>> data = dataset.load()

APIDataset can also be used to save output on a remote server using HTTP(S) methods:

>>> example_table = '{"col1":["val1", "val2"], "col2":["val3", "val4"]}'
>>>
>>> dataset = APIDataset(
...     method="POST",
...     url="https://dummyjson.com/products/add",
...     save_args={"chunk_size": 1},
... )
>>> dataset.save(example_table)

APIDataset can automatically persist the output of POST and PUT requests via the response_dataset parameter. This is useful for auditing, debugging, or reusing API responses downstream in a pipeline.

When response_dataset is configured, the behavior is:

  • For JSONDataset: stores response.json() (parsed JSON payload)
  • For TextDataset: stores response.text (raw response body)
  • For other datasets (e.g. PickleDataset, MemoryDataset): stores the full requests.Response object

You can later retrieve the persisted response by calling get_last_response() on the APIDataset instance.

Using the YAML API:

api_with_response_storage:
  type: api.APIDataset
  url: https://dummyjson.com/products/add
  method: POST
  response_dataset:
    type: json.JSONDataset
    filepath: data/api_response.json

Or using the Python API:

>>> dataset = APIDataset(
...     url="https://dummyjson.com/products/add",
...     method="POST",
...     response_dataset={"type": "json.JSONDataset", "filepath": "response.json"},
... )
>>> response = dataset.save({"key": "value"})
>>> # The response data is automatically saved to response.json
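
The storage rule described above can be illustrated without a network call. The sketch below is only an approximation: FakeResponse and the three empty dataset classes are hypothetical stand-ins for requests.Response and the real Kedro dataset types, and extract_for_storage is an illustrative name, not part of the library. Only the branching follows the documented behaviour.

```python
import json


class FakeResponse:
    """Hypothetical stand-in for requests.Response (illustration only)."""

    def __init__(self, text: str) -> None:
        self.text = text

    def json(self):
        return json.loads(self.text)


# Hypothetical placeholders for the real Kedro dataset classes.
class JSONDataset:
    pass


class TextDataset:
    pass


class PickleDataset:
    pass


def extract_for_storage(response, response_dataset):
    """Choose what to persist, based on the type of the response dataset."""
    if isinstance(response_dataset, JSONDataset):
        return response.json()  # parsed JSON payload
    if isinstance(response_dataset, TextDataset):
        return response.text  # raw response body
    return response  # any other dataset receives the full response object


response = FakeResponse('{"id": 1, "title": "example"}')
print(extract_for_storage(response, JSONDataset()))
print(extract_for_storage(response, TextDataset()))
```

With a PickleDataset-like target, the same function returns the response object itself, which is why such datasets must be able to store arbitrary Python objects.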

All request parameters for saving are passed in the save_args dictionary at initialisation. Saving requires method to be set to POST or PUT, because the default method, GET, only supports loading. Two parameters deserve particular attention: timeout and chunk_size. timeout defines how long the program waits for a response after sending a request (60 seconds by default). chunk_size is only used when the input to the save method is a list: the list is split into chunks of at most chunk_size items (100 by default), and one request is sent per chunk. For example, with chunk_size set to 1, saving a list of two rows sends two requests, each containing one row.

If the data passed to the save method is not a list, APIDataset first tries to parse it as a JSON string. If parsing succeeds, the parsed content is sent as the JSON body of a single request. If the data is not a string or bytes object (for example, a dict), it is passed to requests unchanged, and requests serialises it to JSON when sending.
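
A minimal sketch of that fallback, using only the standard library. The function name prepare_json_body is hypothetical and chosen for illustration; the try/except structure follows the _execute_save_request source shown further down this page.

```python
import json
from typing import Any


def prepare_json_body(data: Any) -> Any:
    """Return the object that would be passed as the request's JSON body."""
    try:
        # A JSON string is parsed first, so it is not encoded a second time.
        return json.loads(data)
    except TypeError:
        # Not a str/bytes value (for example a dict): hand it over unchanged.
        return data


print(prepare_json_body('{"col1": ["val1", "val2"]}'))
print(prepare_json_body({"col1": ["val1", "val2"]}))
```

Both calls yield the same dictionary. Note that in this sketch a string that is not valid JSON raises json.JSONDecodeError, because only TypeError is caught.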

Parameters:

  • url (str) –

    The API URL endpoint.

  • method (str, default: 'GET' ) –

    The HTTP method of the request. Only GET, POST and PUT are supported.

  • load_args (dict[str, Any] | None, default: None ) –

    Additional parameters to be fed to requests.request. https://requests.readthedocs.io/en/latest/api.html#requests.request

  • save_args (dict[str, Any] | None, default: None ) –

    Options for saving data to the server. Accepts the same parameters as load_args, plus an optional chunk_size, which sets the maximum number of list items sent in each request.

  • credentials (tuple[str, str] | list[str] | AuthBase | None, default: None ) –

    Allows specifying secrets in credentials.yml. Expected format is ('login', 'password') if given as a tuple or list. An AuthBase instance can be provided for more complex cases.

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

  • response_dataset (str | type[AbstractDataset] | dict[str, Any] | None, default: None ) –

    Optional dataset to automatically store API responses. The API response is stored based on the dataset type:

    • JSONDataset: Stores response.json() (parsed JSON data)
    • TextDataset: Stores response.text (response body as string)
    • Other datasets (e.g., PickleDataset, MemoryDataset): Stores the full requests.Response object

    Can be specified as:

    • A string type identifier: "json.JSONDataset"
    • A dict with "type" key: {"type": "json.JSONDataset", "filepath": "..."}
    • A dataset class (advanced usage)

    If None (default), responses are not automatically stored.

Raises:

  • ValueError

    If both auth and credentials are specified, or if an unsupported HTTP method is used.
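
The auth/credentials conflict can be sketched as follows. This is a simplified, standalone approximation of the constructor logic shown in the source below; resolve_auth is a hypothetical helper name, not a library function.

```python
from typing import Any


def resolve_auth(credentials: Any, request_params: dict[str, Any]) -> Any:
    """Return the auth value to use, rejecting a double specification."""
    # "auth" is removed from the request parameters, as in the constructor.
    param_auth = request_params.pop("auth", None)
    if credentials is not None and param_auth is not None:
        raise ValueError("Cannot specify both auth and credentials.")
    auth = credentials or param_auth
    # Catalog YAML yields lists; requests expects a (login, password) tuple.
    return tuple(auth) if isinstance(auth, list) else auth


print(resolve_auth(["login", "password"], {"timeout": 60}))
```

Passing credentials together with an "auth" entry in load_args or save_args raises ValueError in this sketch, which matches the documented behaviour.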

Source code in kedro_datasets/api/api_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    url: str,
    method: str = "GET",
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    credentials: tuple[str, str] | list[str] | AuthBase | None = None,
    metadata: dict[str, Any] | None = None,
    response_dataset: str | type[AbstractDataset] | dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``APIDataset`` to fetch data from an API endpoint.

    Args:
        url: The API URL endpoint.
        method: The method of the request. GET, POST, PUT are the only supported
            methods
        load_args: Additional parameters to be fed to requests.request.
            https://requests.readthedocs.io/en/latest/api.html#requests.request
        save_args: Options for saving data on server. Includes all parameters used
            during load method. Adds an optional parameter, ``chunk_size`` which
            determines the size of the package sent at each request.
        credentials: Allows specifying secrets in credentials.yml.
            Expected format is ``('login', 'password')`` if given as a tuple or
            list. An ``AuthBase`` instance can be provided for more complex cases.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
        response_dataset: Optional dataset to automatically store API responses.
            The API response is stored based on the dataset type:

            - `JSONDataset`: Stores `response.json()` (parsed JSON data)
            - `TextDataset`: Stores `response.text` (response body as string)
            - Other datasets (e.g., `PickleDataset`, `MemoryDataset`): Stores the
              full `requests.Response` object

            Can be specified as:

            - A string type identifier: `"json.JSONDataset"`
            - A dict with `"type"` key: `{"type": "json.JSONDataset", "filepath": "..."}`
            - A dataset class (advanced usage)

            If `None` (default), responses are not automatically stored.

    Raises:
        ValueError: if both ``auth`` and ``credentials`` are specified, or if an
            unsupported HTTP method is used.
    """
    super().__init__()

    if method == "GET":
        self._params = load_args or {}

    elif method in ["PUT", "POST"]:
        self._params = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._params.update(save_args)
        self._chunk_size = self._params.pop("chunk_size", 1)
    else:
        raise ValueError("Only GET, POST and PUT methods are supported")

    self._param_auth = self._params.pop("auth", None)

    if credentials is not None and self._param_auth is not None:
        raise ValueError("Cannot specify both auth and credentials.")

    self._auth = credentials or self._param_auth

    if "cert" in self._params:
        self._params["cert"] = self._convert_type(self._params["cert"])

    if "timeout" in self._params:
        self._params["timeout"] = self._convert_type(self._params["timeout"])

    self._request_args: dict[str, Any] = {
        "url": url,
        "method": method,
        "auth": self._convert_type(self._auth),
        **self._params,
    }

    self.metadata = metadata

    # Initialize response dataset if provided
    self._response_dataset_type: type[AbstractDataset[Any, Any]] | None = None
    self._response_dataset_config: dict[str, Any] | None = None
    self._response_dataset_instance: AbstractDataset[Any, Any] | None = None

    if response_dataset is not None:
        dataset_config = (
            response_dataset
            if isinstance(response_dataset, dict)
            else {"type": response_dataset}
        )
        (
            self._response_dataset_type,
            self._response_dataset_config,
        ) = parse_dataset_definition(dataset_config)

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {
    "params": None,
    "headers": None,
    "auth": None,
    "json": None,
    "timeout": 60,
    "chunk_size": 100,
}

_auth instance-attribute

_auth = credentials or _param_auth

_chunk_size instance-attribute

_chunk_size = _params.pop("chunk_size", 1)

_param_auth instance-attribute

_param_auth = _params.pop("auth", None)

_params instance-attribute

_params = load_args or {}

_request_args instance-attribute

_request_args = {
    "url": url,
    "method": method,
    "auth": _convert_type(_auth),
    **_params,
}

_response_dataset property

_response_dataset

Lazily create and cache the response dataset instance.

_response_dataset_config instance-attribute

_response_dataset_config = None

_response_dataset_instance instance-attribute

_response_dataset_instance = None

_response_dataset_type instance-attribute

_response_dataset_type = None

metadata instance-attribute

metadata = metadata

_convert_type staticmethod

_convert_type(value)

From the Data Catalog, iterables are provided as Lists. However, for some parameters in the Python requests library, only Tuples are allowed.

Source code in kedro_datasets/api/api_dataset.py
@staticmethod
def _convert_type(value: Any):
    """
    From the Data Catalog, iterables are provided as Lists.
    However, for some parameters in the Python requests library,
    only Tuples are allowed.
    """
    if isinstance(value, list):
        return tuple(value)
    return value
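
A short usage sketch, assuming a catalog entry that sets timeout to a two-item list. The function is a standalone copy of the static method above; the (connect, read) timeout tuple is the form requests accepts.

```python
from typing import Any


def convert_type(value: Any) -> Any:
    """Standalone copy of the conversion: lists become tuples."""
    if isinstance(value, list):
        return tuple(value)
    return value


# A catalog entry such as `timeout: [3.05, 27]` is loaded from YAML as a list.
timeout_from_yaml = [3.05, 27]
print(convert_type(timeout_from_yaml))  # (3.05, 27)
print(convert_type(60))  # non-list values pass through unchanged
```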

_describe

_describe()
Source code in kedro_datasets/api/api_dataset.py
def _describe(self) -> dict[str, Any]:
    # prevent auth from logging
    request_args_cp = self._request_args.copy()
    request_args_cp.pop("auth", None)

    result = dict(request_args_cp)
    if self._response_dataset is not None:
        result["response_dataset"] = self._response_dataset._describe()

    return result
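
The effect of removing auth before describing the dataset can be sketched in isolation. The describe function and the example.com URL below are hypothetical and used only for illustration.

```python
from typing import Any


def describe(request_args: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the request arguments without the auth entry."""
    description = request_args.copy()  # shallow copy; the original keeps auth
    description.pop("auth", None)
    return description


request_args = {
    "url": "https://example.com/api",  # hypothetical endpoint
    "method": "GET",
    "auth": ("login", "password"),
    "timeout": 60,
}
print(describe(request_args))
```

The printed description omits the credentials, while request_args itself is left untouched for the actual request.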

_execute_request

_execute_request(session)
Source code in kedro_datasets/api/api_dataset.py
def _execute_request(self, session: Session) -> requests.Response:
    try:
        response = session.request(**self._request_args)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        raise DatasetError("Failed to fetch data", exc) from exc
    except OSError as exc:
        raise DatasetError("Failed to connect to the remote server") from exc

    return response

_execute_save_request

_execute_save_request(json_data)
Source code in kedro_datasets/api/api_dataset.py
def _execute_save_request(self, json_data: Any) -> requests.Response:
    try:
        self._request_args["json"] = json_.loads(json_data)
    except TypeError:
        self._request_args["json"] = json_data
    try:
        response = requests.request(**self._request_args)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        raise DatasetError("Failed to send data", exc) from exc

    except OSError as exc:
        raise DatasetError("Failed to connect to the remote server") from exc
    return response

_execute_save_with_chunks

_execute_save_with_chunks(json_data)
Source code in kedro_datasets/api/api_dataset.py
def _execute_save_with_chunks(
    self,
    json_data: list[dict[str, Any]],
) -> requests.Response:
    chunk_size = self._chunk_size
    n_chunks = math.ceil(len(json_data) / chunk_size)

    for i in range(n_chunks):
        send_data = json_data[i * chunk_size : (i + 1) * chunk_size]
        response = self._execute_save_request(json_data=send_data)

    return response
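
The slicing used above can be tried out on its own. The sketch below extracts it into a hypothetical helper, split_into_chunks, that returns the chunks instead of sending them.

```python
import math
from typing import Any


def split_into_chunks(rows: list[Any], chunk_size: int) -> list[list[Any]]:
    """Split rows into consecutive chunks of at most chunk_size items."""
    n_chunks = math.ceil(len(rows) / chunk_size)
    return [rows[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks)]


rows = [{"col1": "val1"}, {"col1": "val2"}, {"col1": "val3"}]
for chunk in split_into_chunks(rows, 2):
    print(chunk)  # one request would be sent per printed chunk
```

In the method above, one request is sent per chunk and only the response to the last request is returned. An empty list produces no chunks, so, going by the source shown here, no request would be sent and response would never be assigned.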

_exists

_exists()
Source code in kedro_datasets/api/api_dataset.py
def _exists(self) -> bool:
    if self._request_args["method"] != "GET":
        return False

    with sessions.Session() as session:
        response = self._execute_request(session)

    return response.ok

get_last_response

get_last_response()
Source code in kedro_datasets/api/api_dataset.py
def get_last_response(self) -> Any:
    if self._response_dataset is None:
        raise DatasetError(
            "No response_dataset configured; cannot retrieve persisted response."
        )

    return self._response_dataset.load()  # type: ignore[return-value]

load

load()
Source code in kedro_datasets/api/api_dataset.py
def load(self) -> Any:
    if self._request_args["method"] != "GET":
        raise DatasetError(
            "Only GET method is supported for load(). "
            "Use save() to send data or get_last_response() to retrieve "
            "a persisted response."
        )

    with sessions.Session() as session:
        return self._execute_request(session)

save

save(data)
Source code in kedro_datasets/api/api_dataset.py
def save(self, data: Any) -> requests.Response:  # type: ignore[override]
    if self._request_args["method"] in ["PUT", "POST"]:
        if isinstance(data, list):
            response: requests.Response = self._execute_save_with_chunks(
                json_data=data
            )
        else:
            response: requests.Response = self._execute_save_request(json_data=data)

        if self._response_dataset is not None:
            if isinstance(self._response_dataset, JSONDataset):
                extracted_data = response.json()
            elif isinstance(self._response_dataset, TextDataset):
                extracted_data = response.text
            else:
                extracted_data = response

            self._response_dataset.save(extracted_data)

        return response

    raise DatasetError("Use PUT or POST methods for save")