APIDataset

APIDataset loads/saves data from/to HTTP(S) APIs. It uses the python requests library: https://requests.readthedocs.io/en/latest/

kedro_datasets.api.APIDataset

APIDataset(
    *,
    url,
    method="GET",
    load_args=None,
    save_args=None,
    credentials=None,
    metadata=None
)

Bases: AbstractDataset[None, Response]

Examples:

Using the YAML API:

usda:
  type: api.APIDataset
  url: https://quickstats.nass.usda.gov
  load_args:
      params:
        key: SOME_TOKEN
        format: JSON
        commodity_desc: CORN
        statisticcat_desc: YIELD
        agg_level_desc: STATE
        year: 2000

Using the Python API:

>>> from kedro_datasets.api import APIDataset
>>>
>>>
>>> dataset = APIDataset(
...     url="https://api.spaceflightnewsapi.net/v4/articles",
...     load_args={
...         "params": {
...             "news_site": "NASA",
...             "launch": "65896761-b6ca-4df3-9699-e077a360c52a",  # Artemis I
...         }
...     },
... )
>>> data = dataset.load()

APIDataset can also be used to save output on a remote server using HTTP(S) methods:

>>> example_table = '{"col1":["val1", "val2"], "col2":["val3", "val4"]}'
>>>
>>> dataset = APIDataset(
...     method="POST",
...     url="https://dummyjson.com/products/add",
...     save_args={"chunk_size": 1},
... )
>>> dataset.save(example_table)

On initialisation, all the necessary parameters can be specified in the save_args dictionary. Saving requires the method to be set explicitly to POST or PUT, because the default method is GET. Two important parameters to keep in mind are timeout and chunk_size. timeout defines how long the program waits for a response after a request. chunk_size is only used if the input of the save method is a list, in which case the data is split into chunks of chunk_size elements and each chunk is sent as a separate request. In the example above, example_table is a JSON string rather than a list, so it is sent in a single request and chunk_size has no effect.

If the data passed to the save method is not a list, APIDataset first tries to parse it with json.loads. If parsing succeeds, the parsed content is sent in a single request. If json.loads raises a TypeError (for example, because the data is a dict rather than a string), the data is passed unchanged as the json argument of the request, which the requests library serialises to JSON.
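
The non-list branch described above can be sketched in isolation. The helper name `to_json_payload` is hypothetical and not part of kedro_datasets; it only mirrors the try/except around json.loads that appears in the source listing of `_execute_save_request`, under the assumption that no other preprocessing happens.

```python
import json
from typing import Any


def to_json_payload(data: Any) -> Any:
    """Hypothetical helper: decide what would be sent as the JSON body."""
    try:
        # A str/bytes value holding valid JSON is parsed into Python objects.
        return json.loads(data)
    except TypeError:
        # json.loads rejects non-string inputs such as dicts with TypeError;
        # those values are passed through unchanged.
        return data


example_table = '{"col1": ["val1", "val2"], "col2": ["val3", "val4"]}'
parsed = to_json_payload(example_table)
passthrough = to_json_payload({"col1": ["val1", "val2"]})
```

In this sketch, a string that is not valid JSON raises json.JSONDecodeError, which is a subclass of ValueError and is therefore not caught by the `except TypeError` clause.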

Parameters:

  • url (str) –

    The API URL endpoint.

  • method (str, default: 'GET' ) –

    The method of the request. GET, POST and PUT are the only supported methods.

  • load_args (dict[str, Any] | None, default: None ) –

    Additional parameters to be fed to requests.request. https://requests.readthedocs.io/en/latest/api.html#requests.request

  • save_args (dict[str, Any] | None, default: None ) –

    Options for saving data on the server. Includes all parameters used by the load method. Adds an optional parameter, chunk_size, which determines how many list elements are sent in each request.

  • credentials (tuple[str, str] | list[str] | AuthBase | None, default: None ) –

    Allows specifying secrets in credentials.yml. Expected format is ('login', 'password') if given as a tuple or list. An AuthBase instance can be provided for more complex cases.

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:

  • ValueError

    if both auth and credentials are specified, or if an unsupported RESTful API method is used.
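
The auth/credentials conflict can be illustrated without touching the network. This is a minimal sketch that assumes the check works as in the constructor listing on this page; `resolve_auth` is a hypothetical standalone function, not a kedro_datasets API.

```python
from typing import Any


def resolve_auth(credentials: Any, request_params: dict[str, Any]) -> Any:
    """Hypothetical helper mirroring the auth/credentials check."""
    params = dict(request_params)  # work on a copy, leave the caller's dict alone
    param_auth = params.pop("auth", None)
    if credentials is not None and param_auth is not None:
        raise ValueError("Cannot specify both auth and credentials.")
    # Whichever of the two was supplied (if any) becomes the auth value.
    return credentials or param_auth


auth_from_credentials = resolve_auth(("login", "password"), {})
auth_from_args = resolve_auth(None, {"auth": ("login", "password")})

try:
    resolve_auth(("login", "password"), {"auth": ("other", "secret")})
    conflict_message = None
except ValueError as exc:
    conflict_message = str(exc)
```

Supplying the secret through exactly one of the two routes avoids the error.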

Source code in kedro_datasets/api/api_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    url: str,
    method: str = "GET",
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    credentials: tuple[str, str] | list[str] | AuthBase | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``APIDataset`` to fetch data from an API endpoint.

    Args:
        url: The API URL endpoint.
        method: The method of the request. GET, POST, PUT are the only supported
            methods
        load_args: Additional parameters to be fed to requests.request.
            https://requests.readthedocs.io/en/latest/api.html#requests.request
        save_args: Options for saving data on server. Includes all parameters used
            during load method. Adds an optional parameter, ``chunk_size`` which
            determines the size of the package sent at each request.
        credentials: Allows specifying secrets in credentials.yml.
            Expected format is ``('login', 'password')`` if given as a tuple or
            list. An ``AuthBase`` instance can be provided for more complex cases.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        ValueError: if both ``auth`` and ``credentials`` are specified or if an
            unsupported RESTful API method is used.
    """
    super().__init__()

    # GET method means load
    if method == "GET":
        self._params = load_args or {}

    # PUT, POST means save
    elif method in ["PUT", "POST"]:
        self._params = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._params.update(save_args)
        self._chunk_size = self._params.pop("chunk_size", 1)
    else:
        raise ValueError("Only GET, POST and PUT methods are supported")

    self._param_auth = self._params.pop("auth", None)

    if credentials is not None and self._param_auth is not None:
        raise ValueError("Cannot specify both auth and credentials.")

    self._auth = credentials or self._param_auth

    if "cert" in self._params:
        self._params["cert"] = self._convert_type(self._params["cert"])

    if "timeout" in self._params:
        self._params["timeout"] = self._convert_type(self._params["timeout"])

    self._request_args: dict[str, Any] = {
        "url": url,
        "method": method,
        "auth": self._convert_type(self._auth),
        **self._params,
    }

    self.metadata = metadata

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {
    "params": None,
    "headers": None,
    "auth": None,
    "json": None,
    "timeout": 60,
    "chunk_size": 100,
}

_auth instance-attribute

_auth = credentials or _param_auth

_chunk_size instance-attribute

_chunk_size = pop('chunk_size', 1)

_param_auth instance-attribute

_param_auth = pop('auth', None)

_params instance-attribute

_params = load_args or {}

_request_args instance-attribute

_request_args = {
    "url": url,
    "method": method,
    "auth": _convert_type(_auth),
    **_params,
}

metadata instance-attribute

metadata = metadata

_convert_type staticmethod

_convert_type(value)

From the Data Catalog, iterables are provided as Lists. However, for some parameters in the Python requests library, only Tuples are allowed.

Source code in kedro_datasets/api/api_dataset.py
@staticmethod
def _convert_type(value: Any):
    """
    From the Data Catalog, iterables are provided as Lists.
    However, for some parameters in the Python requests library,
    only Tuples are allowed.
    """
    if isinstance(value, list):
        return tuple(value)
    return value
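
As a usage sketch, the conversion can be reproduced as a standalone function. `convert_type` below is a copy made for illustration only, and the input values are assumed examples of what a YAML catalog entry might provide.

```python
from typing import Any


def convert_type(value: Any) -> Any:
    """Standalone copy of the list-to-tuple conversion, for illustration."""
    if isinstance(value, list):
        return tuple(value)
    return value


# Lists as they might arrive from a catalog entry (assumed example values).
timeout = convert_type([3.05, 27])  # (connect, read) timeout pair
cert = convert_type(["client.cert", "client.key"])  # (cert, key) pair
single_timeout = convert_type(60)  # non-list values are returned unchanged
```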

_describe

_describe()
Source code in kedro_datasets/api/api_dataset.py
def _describe(self) -> dict[str, Any]:
    # prevent auth from logging
    request_args_cp = self._request_args.copy()
    request_args_cp.pop("auth", None)
    return request_args_cp
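
The effect of the copy-and-pop above can be seen on a plain dictionary. This is a sketch with assumed placeholder values; it does not construct an APIDataset.

```python
from typing import Any

# Assumed request arguments, shaped like the _request_args attribute.
request_args: dict[str, Any] = {
    "url": "https://example.com/api",  # placeholder URL
    "method": "GET",
    "auth": ("login", "password"),
    "timeout": 60,
}

# Copy first so the original arguments keep their auth entry,
# then drop auth from the copy that is used for description.
described = request_args.copy()
described.pop("auth", None)
```

The description omits the auth entry while the original request arguments still carry it.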

_execute_request

_execute_request(session)
Source code in kedro_datasets/api/api_dataset.py
def _execute_request(self, session: Session) -> requests.Response:
    try:
        response = session.request(**self._request_args)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        raise DatasetError("Failed to fetch data", exc) from exc
    except OSError as exc:
        raise DatasetError("Failed to connect to the remote server") from exc

    return response

_execute_save_request

_execute_save_request(json_data)
Source code in kedro_datasets/api/api_dataset.py
def _execute_save_request(self, json_data: Any) -> requests.Response:
    try:
        self._request_args["json"] = json_.loads(json_data)
    except TypeError:
        self._request_args["json"] = json_data
    try:
        response = requests.request(**self._request_args)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        raise DatasetError("Failed to send data", exc) from exc

    except OSError as exc:
        raise DatasetError("Failed to connect to the remote server") from exc
    return response

_execute_save_with_chunks

_execute_save_with_chunks(json_data)
Source code in kedro_datasets/api/api_dataset.py
def _execute_save_with_chunks(
    self,
    json_data: list[dict[str, Any]],
) -> requests.Response:
    chunk_size = self._chunk_size
    n_chunks = len(json_data) // chunk_size + 1

    for i in range(n_chunks):
        send_data = json_data[i * chunk_size : (i + 1) * chunk_size]
        response = self._execute_save_request(json_data=send_data)

    return response
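
To see which slices the loop above produces, the same arithmetic can be run on a small list without sending any requests. `chunk_slices` is a hypothetical helper written for this sketch and is not part of kedro_datasets.

```python
from typing import Any


def chunk_slices(rows: list[Any], chunk_size: int) -> list[list[Any]]:
    """Hypothetical helper: list the slices the loop above would send."""
    n_chunks = len(rows) // chunk_size + 1
    return [rows[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks)]


rows = [{"col1": "val1"}, {"col1": "val2"}, {"col1": "val3"}]

# Three rows in chunks of two: one full slice and one partial slice.
uneven = chunk_slices(rows, 2)

# Two rows in chunks of one: the length is an exact multiple of chunk_size.
even = chunk_slices(rows[:2], 1)
```

With this arithmetic, a list whose length is an exact multiple of chunk_size yields one extra, empty slice at the end, so `even` holds three slices, the last of which is an empty list.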

_exists

_exists()
Source code in kedro_datasets/api/api_dataset.py
def _exists(self) -> bool:
    with sessions.Session() as session:
        response = self._execute_request(session)
    return response.ok

load

load()
Source code in kedro_datasets/api/api_dataset.py
def load(self) -> requests.Response:
    if self._request_args["method"] == "GET":
        with sessions.Session() as session:
            return self._execute_request(session)

    raise DatasetError("Only GET method is supported for load")

save

save(data)
Source code in kedro_datasets/api/api_dataset.py
def save(self, data: Any) -> requests.Response:  # type: ignore[override]
    if self._request_args["method"] in ["PUT", "POST"]:
        if isinstance(data, list):
            return self._execute_save_with_chunks(json_data=data)

        return self._execute_save_request(json_data=data)

    raise DatasetError("Use PUT or POST methods for save")