
opik.EvaluationDataset

kedro_datasets_experimental.opik.EvaluationDataset

EvaluationDataset(
    dataset_name,
    credentials,
    filepath=None,
    sync_policy="local",
    metadata=None,
)

Bases: AbstractDataset

Kedro dataset for Opik evaluation datasets.

Connects to an Opik evaluation dataset and returns an opik.Dataset on load(), which can be passed to opik.evaluation.evaluate() to run experiments. Supports an optional local JSON/YAML file as the authoring surface for evaluation items.

On load / save behaviour:

  • On load: Creates the remote dataset if it does not exist, synchronises based on sync_policy, and returns an opik.Dataset.
  • On save: Inserts all items into the remote dataset via Opik's upsert-by-ID API. Items with a UUID v7 id update the existing remote row in-place. Items without a UUID v7 id have the id stripped and are deduplicated by content hash, so unchanged content is a no-op while changed content creates a new remote row. In local mode, items are also merged into the local file (new items take precedence). In remote mode, only the remote insert occurs.

Item format:

The local file and save() data must be a list of dicts. Each item accepts the following keys:

  • input (required) — the evaluation input payload.
  • id — identifier used for local deduplication. The upload behaviour depends on whether id is a valid UUID v7:

    • Valid UUID v7: forwarded to Opik. Opik's API upserts by item ID: the first sync creates the remote row, and subsequent syncs update that same row in-place if the content has changed. The remote row keeps the same UUID across all syncs, and no new row is created.
    • All other values (human-readable strings, UUIDs of other versions, None, empty string, or no id key): stripped before upload. Opik auto-generates a new UUID v7. Unchanged content is deduplicated by content hash (no-op), but changed content creates a new remote row while the previous one remains, leading to row accumulation over time.
  • expected_output — ground-truth value for scoring.

  • metadata — arbitrary metadata dict attached to the item.

[
  {
    "id": "q1",
    "input": {"text": "cancel my order"},
    "expected_output": "cancel_order",
    "metadata": {"source": "production"}
  }
]
("q1" is used for local deduplication only, as it is not a UUID v7 and will be stripped on upload)
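The UUID v7 rule above can be checked before syncing. The following is a minimal sketch using only the standard library; `is_uuid_v7` and `split_by_id_stability` are hypothetical helper names, not part of the dataset's API, and the sample UUID is made up for illustration. It mirrors the `uuid.UUID(...).version` check that this page's source listing uses.

```python
import uuid
from typing import Any

REQUIRED_UUID_VERSION = 7


def is_uuid_v7(value: Any) -> bool:
    """Return True if value parses as a UUID whose version field is 7."""
    if not value:
        return False  # missing, None, or empty string
    try:
        return uuid.UUID(str(value)).version == REQUIRED_UUID_VERSION
    except ValueError:
        return False  # not a UUID at all, e.g. "q1"


def split_by_id_stability(items):
    """Partition items into (id forwarded to Opik, no stable remote id)."""
    stable = [item for item in items if is_uuid_v7(item.get("id"))]
    unstable = [item for item in items if not is_uuid_v7(item.get("id"))]
    return stable, unstable


items = [
    {"id": "q1", "input": {"text": "cancel my order"}},
    {"id": "018f4e3a-7b2c-7d4e-8f1a-2b3c4d5e6f70", "input": {"text": "track my order"}},
    {"input": {"text": "refund status"}},
]
stable, unstable = split_by_id_stability(items)
print(len(stable), len(unstable))
```

Running a check like this over the local file shows in advance which items will keep a fixed remote row and which ones may accumulate rows when edited.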

Sync policies:

  • local (default): The local file is the source of truth. On load(), all local items are re-inserted to remote on every sync. Opik's API upserts by item ID, so the outcome depends on whether each item carries a UUID v7 id:

    • Items with a UUID v7 id are updated in-place on the remote — content changes replace the existing row; unchanged items are a no-op.
    • Items without a UUID v7 id (non-UUID values are stripped) are deduplicated by content hash: unchanged content is a no-op, but changed content creates a new remote row (the previous row remains), leading to row accumulation over time.

    save() inserts to remote and merges into the local file (new data takes precedence).
  • remote: The remote Opik dataset is the sole source of truth. load() fetches the remote dataset as-is with no local file interaction. save() inserts all items to remote without writing to any local file. If the remote dataset does not exist yet, it is created empty — no items are pushed from the local file. To seed a new remote dataset, run with sync_policy="local" at least once, or create and populate the dataset directly via the Opik UI.
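To make the difference between the two id cases concrete, here is a toy in-memory model of the behaviour described above. It is an illustration only: `ToyRemote` is a hypothetical class, the hashing scheme is an assumption, and nothing here reflects how Opik is actually implemented.

```python
import hashlib
import json


class ToyRemote:
    """Toy stand-in for a remote dataset: upsert by id, else dedupe by content hash."""

    def __init__(self):
        self.rows = {}        # row key -> item content
        self._hashes = set()  # content hashes of id-less inserts already seen

    @staticmethod
    def _content_hash(content):
        payload = json.dumps(content, sort_keys=True).encode("utf-8")
        return hashlib.sha256(payload).hexdigest()

    def insert(self, item):
        content = {k: v for k, v in item.items() if k != "id"}
        if "id" in item:
            # Stable id: create the row once, then overwrite it in place.
            self.rows[item["id"]] = content
            return
        digest = self._content_hash(content)
        if digest in self._hashes:
            return  # unchanged content: no-op
        self._hashes.add(digest)
        self.rows[f"auto-{len(self.rows)}"] = content  # new auto-keyed row


remote = ToyRemote()
stable_id = "018f4e3a-7b2c-7d4e-8f1a-2b3c4d5e6f70"  # made-up UUID v7

# Item with a stable id: edited once, still a single row.
remote.insert({"id": stable_id, "input": "cancel my order"})
remote.insert({"id": stable_id, "input": "cancel my order please"})
rows_with_id = len(remote.rows)

# Item without an id: re-insert unchanged (no-op), then edit (new row).
remote.insert({"input": "track my order"})
remote.insert({"input": "track my order"})
remote.insert({"input": "track my parcel"})
rows_total = len(remote.rows)

print(rows_with_id, rows_total)
```

In this model the edited id-less item leaves both its old and new versions behind, which is the row accumulation the local policy warns about.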

Examples:

Using catalog YAML configuration:

# Local sync policy — local file seeds and syncs to remote
evaluation_dataset:
  type: kedro_datasets_experimental.opik.EvaluationDataset
  dataset_name: intent-detection-eval
  filepath: data/evaluation/intent_items.json
  sync_policy: local
  credentials: opik_credentials
  metadata:
    project: intent-detection

# Remote sync policy — Opik is the source of truth
production_eval:
  type: kedro_datasets_experimental.opik.EvaluationDataset
  dataset_name: intent-detection-eval
  sync_policy: remote
  credentials: opik_credentials

Using Python API:

from kedro_datasets_experimental.opik import EvaluationDataset

dataset = EvaluationDataset(
    dataset_name="intent-detection-eval",
    credentials={"api_key": "..."},  # pragma: allowlist secret
    filepath="data/evaluation/intent_items.json",
)

# Load returns an opik.Dataset for running experiments
from opik.evaluation import evaluate

eval_dataset = dataset.load()
evaluate(
    dataset=eval_dataset,
    task=my_task,
    scoring_functions=[my_scorer],
    experiment_name="my-experiment",
)

# Save new evaluation items
dataset.save(
    [
        {"id": "q1", "input": {"text": "cancel order"}, "expected_output": "cancel"},
    ]
)

# Same as in the other example, "q1" is not a UUID v7 and will be stripped on upload
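Because only UUID v7 ids survive upload, items that should keep a stable remote row need such an id assigned once and stored in the local file. The sketch below builds one from the RFC 9562 field layout using only the standard library. `make_uuid7` and `ensure_stable_ids` are hypothetical helpers, not part of kedro-datasets or Opik. Python 3.14 and later provide `uuid.uuid7()`, which would be preferable where available.

```python
import os
import time
import uuid


def make_uuid7() -> uuid.UUID:
    """Build a UUID v7: 48-bit Unix ms timestamp, version, variant, random bits."""
    timestamp_ms = (time.time_ns() // 1_000_000) & ((1 << 48) - 1)
    random_bits = int.from_bytes(os.urandom(10), "big")  # 80 random bits
    value = (timestamp_ms << 80) | random_bits
    value = (value & ~(0xF << 76)) | (0x7 << 76)  # version field = 7
    value = (value & ~(0x3 << 62)) | (0x2 << 62)  # variant bits = 0b10
    return uuid.UUID(int=value)


def ensure_stable_ids(items):
    """Return items, replacing any id that is not a UUID v7 with a fresh one."""
    result = []
    for item in items:
        try:
            has_v7 = uuid.UUID(str(item.get("id"))).version == 7
        except ValueError:
            has_v7 = False
        result.append(item if has_v7 else {**item, "id": str(make_uuid7())})
    return result


items = ensure_stable_ids([{"id": "q1", "input": {"text": "cancel order"}}])
print(items[0]["id"])
```

Note that replacing a human-readable id such as "q1" also changes the key used for local deduplication, so the new ids should be written back to the local file once rather than regenerated on every run.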

Parameters:

  • dataset_name (str) –

    Name of the evaluation dataset in Opik.

  • credentials (dict[str, str]) –

    Opik authentication credentials. Required: api_key. Optional: workspace, host, project_name.

  • filepath (str | None, default: None ) –

    Path to a local JSON/YAML file for authoring evaluation items. Supports .json, .yaml, and .yml extensions. When None, no local file interaction occurs.

  • sync_policy (Literal['local', 'remote'], default: 'local' ) –

    Controls the source of truth for reads and whether a local file is involved. "local" (default): all local items are re-inserted to remote on load(); save() inserts to remote and merges into the local file (new data takes precedence). "remote": load() fetches remote as-is; save() inserts to remote without local file interaction.

  • metadata (dict[str, Any] | None, default: None ) –

    Optional metadata dict stored locally and returned by _describe(). Note: Opik's create_dataset() does not accept a metadata argument, so this value is not propagated to the remote dataset.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def __init__(
    self,
    dataset_name: str,
    credentials: dict[str, str],
    filepath: str | None = None,
    sync_policy: Literal["local", "remote"] = "local",
    metadata: dict[str, Any] | None = None,
):
    """Initialise ``EvaluationDataset``.

    Args:
        dataset_name: Name of the evaluation dataset in Opik.
        credentials: Opik authentication credentials.
            Required: ``api_key``.
            Optional: ``workspace``, ``host``, ``project_name``.
        filepath: Path to a local JSON/YAML file for authoring evaluation
            items. Supports ``.json``, ``.yaml``, and ``.yml`` extensions.
            When ``None``, no local file interaction occurs.
        sync_policy: Controls the source of truth for reads and whether
            a local file is involved:
            ``"local"`` (default) — all local items are re-inserted to
            remote on ``load()``; ``save()`` inserts to remote and
            merges into the local file (new data takes precedence).
            ``"remote"`` — ``load()`` fetches remote as-is; ``save()``
            inserts to remote without local file interaction.
        metadata: Optional metadata dict stored locally and returned by
            ``_describe()``. Note: Opik's ``create_dataset()`` does not
            accept a metadata argument, so this value is not propagated
            to the remote dataset.
    """
    self._validate_init_params(credentials, filepath, sync_policy)

    self._dataset_name = dataset_name
    self._filepath = Path(filepath) if filepath else None
    self._sync_policy = sync_policy
    self._metadata = metadata
    self._file_dataset = None

    try:
        self._client = Opik(**credentials)
    except Exception as e:
        raise DatasetError(f"Failed to initialise Opik client: {e}") from e

_client instance-attribute

_client = Opik(**credentials)

_dataset_name instance-attribute

_dataset_name = dataset_name

_file_dataset instance-attribute

_file_dataset = None

_filepath instance-attribute

_filepath = Path(filepath) if filepath else None

_metadata instance-attribute

_metadata = metadata

_sync_policy instance-attribute

_sync_policy = sync_policy

file_dataset property

file_dataset

Return a JSON or YAML file dataset based on the filepath extension.

_describe

_describe()
Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def _describe(self) -> dict[str, Any]:
    return {
        "dataset_name": self._dataset_name,
        "filepath": str(self._filepath) if self._filepath else None,
        "sync_policy": self._sync_policy,
        "metadata": self._metadata,
    }

_exists

_exists()
Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def _exists(self) -> bool:
    try:
        self._client.get_dataset(name=self._dataset_name)
        return True
    except ApiError as e:
        if e.status_code == HTTP_NOT_FOUND:
            return False
        raise DatasetError(
            f"Opik API error while checking dataset '{self._dataset_name}': {e}"
        ) from e
    except Exception as e:
        raise DatasetError(
            f"Failed to connect to Opik while checking dataset "
            f"'{self._dataset_name}': {e}"
        ) from e

_get_or_create_remote_dataset

_get_or_create_remote_dataset()

Ensure the remote Opik dataset exists, creating it if not found.

Returns the latest Dataset object.

Raises:

  • DatasetError

    If the Opik API returns an unexpected error or is unreachable.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def _get_or_create_remote_dataset(self) -> Dataset:
    """Ensure the remote Opik dataset exists, creating it if not found.

    Returns the latest ``Dataset`` object.

    Raises:
        DatasetError: If the Opik API returns an unexpected error or is
            unreachable.
    """
    try:
        return self._client.get_dataset(name=self._dataset_name)
    except ApiError as e:
        if e.status_code != HTTP_NOT_FOUND:
            raise DatasetError(
                f"Opik API error while fetching dataset '{self._dataset_name}': {e}"
            ) from e
    except Exception as e:
        raise DatasetError(
            f"Failed to connect to Opik while fetching dataset "
            f"'{self._dataset_name}': {e}"
        ) from e

    try:
        logger.info(
            "Dataset '%s' not found on Opik, creating it.",
            self._dataset_name,
        )
        return self._client.create_dataset(
            name=self._dataset_name,
            description=f"Created by Kedro (sync_policy={self._sync_policy})",
        )
    except ApiError as e:
        raise DatasetError(
            f"Opik API error while creating dataset '{self._dataset_name}': {e}"
        ) from e
    except Exception as e:
        raise DatasetError(
            f"Failed to connect to Opik while creating dataset "
            f"'{self._dataset_name}': {e}"
        ) from e

_merge_items staticmethod

_merge_items(existing, new)

Merge new items into an existing list, deduplicating by id.

Items without an id key are always appended. For items with an id, new items take precedence — existing entries with the same id are replaced in place.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
@staticmethod
def _merge_items(
    existing: list[dict[str, Any]],
    new: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Merge new items into an existing list, deduplicating by ``id``.

    Items without an ``id`` key are always appended. For items with an
    ``id``, new items take precedence — existing entries with the same
    ``id`` are replaced in place.
    """
    new_by_id: dict[str, dict[str, Any]] = {
        item["id"]: item for item in new if "id" in item
    }

    seen_ids: set[str] = set()
    merged: list[dict[str, Any]] = []

    for item in existing:
        item_id = item.get("id")
        if item_id is not None and item_id in new_by_id:
            merged.append(new_by_id[item_id])
            seen_ids.add(item_id)
        else:
            merged.append(item)
            if item_id is not None:
                seen_ids.add(item_id)

    for item in new:
        item_id = item.get("id")
        if item_id is not None and item_id in seen_ids:
            continue
        if item_id is not None:
            seen_ids.add(item_id)
        merged.append(item)

    return merged
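A worked example of these merge rules, using a standalone copy of the logic so that it runs without Kedro or Opik installed. `merge_items` here is a local stand-in for the static method, and the sample items are made up.

```python
from typing import Any


def merge_items(
    existing: list[dict[str, Any]], new: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Merge new into existing by id; new items win, id-less items are appended."""
    new_by_id = {item["id"]: item for item in new if "id" in item}
    seen_ids: set = set()
    merged: list[dict[str, Any]] = []

    # Pass 1: keep the existing order, swapping in replacements by id.
    for item in existing:
        item_id = item.get("id")
        if item_id is not None and item_id in new_by_id:
            merged.append(new_by_id[item_id])
        else:
            merged.append(item)
        if item_id is not None:
            seen_ids.add(item_id)

    # Pass 2: append new items whose id has not been placed yet.
    for item in new:
        item_id = item.get("id")
        if item_id is not None and item_id in seen_ids:
            continue
        if item_id is not None:
            seen_ids.add(item_id)
        merged.append(item)

    return merged


existing = [{"id": "q1", "input": "a"}, {"id": "q2", "input": "b"}]
new = [{"id": "q2", "input": "B"}, {"id": "q3", "input": "c"}, {"input": "d"}]
merged = merge_items(existing, new)
print([(item.get("id"), item["input"]) for item in merged])
```

The entry for "q2" is replaced at its original position, "q3" is appended, and the item without an id is appended as well. Saving the same id-less item twice would therefore add it to the local file twice.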

_strip_id staticmethod

_strip_id(item)
Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
@staticmethod
def _strip_id(item: dict[str, Any]) -> dict[str, Any]:
    return {k: v for k, v in item.items() if k != "id"}

_sync_local_to_remote

_sync_local_to_remote(dataset)

Insert all local items into the remote dataset.

Reads the local file and inserts all items into the remote dataset. The Opik SDK deduplicates by content hash, so re-inserting unchanged items is a no-op. If an item's id is a valid UUID v7, the same remote row is updated in-place on every sync; otherwise the id is stripped before upload and changed content creates a new remote row. Returns a refreshed Dataset object.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def _sync_local_to_remote(self, dataset: Dataset) -> Dataset:
    """Insert all local items into the remote dataset.

    Reads the local file and inserts all items into the remote dataset.
    The Opik SDK deduplicates by content hash, so re-inserting unchanged
    items is a no-op. If an item's ``id`` is a valid UUID v7, the same
    remote row is updated in-place on every sync; otherwise the ``id`` is
    stripped before upload and changed content creates a new remote row.
    Returns a refreshed ``Dataset`` object.
    """
    if not self._filepath or not self._filepath.exists():
        return dataset

    local_items = self.file_dataset.load()
    self._validate_items(local_items)

    if not local_items:
        return dataset

    items_without_stable_id = [
        item for item in local_items
        if "id" not in item or not item.get("id")
    ]
    if items_without_stable_id:
        logger.warning(
            "Found %d item(s) with a missing, None, or empty 'id' field in '%s'. "
            "These cannot be tracked across syncs and will create new remote "
            "rows on every load.",
            len(items_without_stable_id),
            self._filepath,
        )

    items_with_non_uuid_v7_id = []
    for item in local_items:
        if item.get("id"):  # present and non-empty/non-None
            try:
                parsed = uuid.UUID(str(item["id"]))
                if parsed.version != REQUIRED_UUID_VERSION:
                    items_with_non_uuid_v7_id.append(item)
            except ValueError:
                items_with_non_uuid_v7_id.append(item)
    if items_with_non_uuid_v7_id:
        logger.warning(
            "Found %d item(s) with non-UUID-v7 'id' values in '%s' "
            "(e.g. '%s'). Opik requires UUID v7 for item IDs — these "
            "will be stripped before upload and Opik will auto-generate "
            "UUID v7 values. Remote rows will not have stable identities.",
            len(items_with_non_uuid_v7_id),
            self._filepath,
            items_with_non_uuid_v7_id[0]["id"],
        )

    logger.info(
        "Syncing %d item(s) from '%s' to remote dataset '%s'.",
        len(local_items),
        self._filepath,
        self._dataset_name,
    )
    self._upload_items(dataset, local_items)
    try:
        self._client.flush()
    except Exception as e:
        raise DatasetError(
            f"Failed to flush items to Opik dataset '{self._dataset_name}': {e}"
        ) from e

    try:
        return self._client.get_dataset(name=self._dataset_name)
    except ApiError as e:
        raise DatasetError(
            f"Opik API error while refreshing dataset '{self._dataset_name}' after sync: {e}"
        ) from e
    except Exception as e:
        raise DatasetError(
            f"Failed to refresh dataset '{self._dataset_name}' after sync: {e}"
        ) from e

_upload_items

_upload_items(dataset, items)

Insert items into the remote Opik dataset.

Upload behaviour depends on whether an item carries a UUID v7 id:

  • Valid UUID v7: forwarded to Opik. Opik's REST API calls create_or_update by item ID: the first call creates the remote row, and subsequent calls update that same row in-place if the content has changed. No new row is created.
  • All other values (human-readable strings, UUIDs of other versions, None, empty string, or no id key): stripped before upload. Opik auto-generates a new UUID v7. Unchanged content is deduplicated by content hash (no-op), but changed content creates a new remote row while the previous one remains.

Callers are responsible for validating items before calling this method.

Raises:

  • DatasetError

    If the Opik API returns an error or the server is unreachable during insert.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def _upload_items(self, dataset: Dataset, items: list[dict[str, Any]]) -> None:
    """Insert items into the remote Opik dataset.

    Upload behaviour depends on whether an item carries a UUID v7 ``id``:

    - **Valid UUID v7**: forwarded to Opik. Opik's REST API calls
      ``create_or_update`` by item ID — the first call creates the
      remote row; subsequent calls update that same row in-place if
      the content has changed. Whenever content changes, the existing
      remote row is updated in-place, while no new row is created.
    - **All other values** (human-readable strings, UUIDs of other
      versions, ``None``, empty string, or no ``id`` key): stripped
      before upload. Opik auto-generates a new UUID v7. Unchanged
      content is deduplicated by content hash (no-op), but changed
      content creates a **new remote row** while the previous one
      remains.

    Callers are responsible for validating items before calling this method.

    Raises:
        DatasetError: If the Opik API returns an error or the server is
            unreachable during insert.
    """
    items_to_insert = []
    for item in items:
        if "id" not in item:
            items_to_insert.append(item)
        elif not item["id"]:
            items_to_insert.append(self._strip_id(item))
        else:
            try:
                parsed = uuid.UUID(str(item["id"]))
                if parsed.version == REQUIRED_UUID_VERSION:
                    items_to_insert.append(item)  # valid UUID v7 — preserve id
                else:
                    items_to_insert.append(self._strip_id(item))
            except ValueError:
                items_to_insert.append(self._strip_id(item))
    try:
        dataset.insert(items_to_insert)
    except ApiError as e:
        raise DatasetError(
            f"Opik API error while inserting items into dataset "
            f"'{self._dataset_name}': {e}"
        ) from e
    except Exception as e:
        raise DatasetError(
            f"Failed to insert items into Opik dataset '{self._dataset_name}': {e}"
        ) from e

_validate_init_params staticmethod

_validate_init_params(credentials, filepath, sync_policy)
Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
@staticmethod
def _validate_init_params(
    credentials: dict[str, str],
    filepath: str | None,
    sync_policy: str,
) -> None:
    validate_credentials(credentials, REQUIRED_OPIK_CREDENTIALS, OPTIONAL_OPIK_CREDENTIALS)
    validate_sync_policy(sync_policy, VALID_SYNC_POLICIES)
    if filepath is not None:
        validate_file_extension(filepath)
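The three validation helpers called here are not shown on this page. As a rough sketch of the documented rules only (required api_key, optional workspace, host and project_name, a sync policy of "local" or "remote", and a .json, .yaml or .yml file), a pre-flight check could look like the following. `find_config_problems` and its constants are hypothetical, and whether the real helpers reject unrecognised credential keys is an assumption.

```python
from pathlib import Path

REQUIRED_CREDENTIALS = {"api_key"}
OPTIONAL_CREDENTIALS = {"workspace", "host", "project_name"}
VALID_SYNC_POLICIES = {"local", "remote"}
SUPPORTED_EXTENSIONS = {".json", ".yaml", ".yml"}


def find_config_problems(credentials, filepath=None, sync_policy="local"):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []

    missing = REQUIRED_CREDENTIALS - set(credentials)
    if missing:
        problems.append(f"missing credentials: {sorted(missing)}")

    # Assumption: keys outside the documented set are treated as mistakes.
    unknown = set(credentials) - REQUIRED_CREDENTIALS - OPTIONAL_CREDENTIALS
    if unknown:
        problems.append(f"unrecognised credentials: {sorted(unknown)}")

    if sync_policy not in VALID_SYNC_POLICIES:
        problems.append(f"invalid sync_policy: {sync_policy!r}")

    if filepath is not None:
        suffix = Path(filepath).suffix.lower()
        if suffix not in SUPPORTED_EXTENSIONS:
            problems.append(f"unsupported file extension: {suffix!r}")

    return problems


ok = find_config_problems({"api_key": "..."}, "data/evaluation/intent_items.json")
bad = find_config_problems({}, "items.csv", "sync")
print(ok, bad)
```

Returning a list instead of raising keeps the sketch neutral about which exception type the real helpers use.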

_validate_items staticmethod

_validate_items(items)

Validate that all items contain the required input key.

Raises:

  • DatasetError

    If any item is missing the input key.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
@staticmethod
def _validate_items(items: list[dict[str, Any]]) -> None:
    """Validate that all items contain the required ``input`` key.

    Raises:
        DatasetError: If any item is missing the ``input`` key.
    """
    for i, item in enumerate(items):
        if "input" not in item:
            raise DatasetError(
                f"Dataset item at index {i} is missing required 'input' key."
            )

load

load()

Load the Opik dataset, syncing local items to remote if sync_policy is local.

Creates the remote dataset if it does not exist. In local mode, all local items are re-inserted to remote on every load via Opik's create_or_update API (upsert by item ID). Items with a valid UUID v7 id update the existing remote row in-place, and no new row is created. For items whose id is not a valid UUID v7 (including missing, None, or empty), the id is stripped before upload and Opik auto-generates a new UUID v7: unchanged content is deduplicated (no-op), but changed content creates a new remote row while the previous one remains.

Returns:

  • Dataset

    The Opik dataset ready for use in experiments.

Raises:

  • DatasetError

    If the Opik API returns an unexpected error or the server is unreachable.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def load(self) -> Dataset:
    """Load the Opik dataset, syncing local items to remote if sync_policy is ``local``.

    Creates the remote dataset if it does not exist. In ``local`` mode, all
    local items are re-inserted to remote on every load via Opik's
    ``create_or_update`` API (upsert by item ID). Items with a valid UUID v7
    ``id`` update the existing remote row in-place, and no new row is created.
    For items whose ``id`` is not a valid UUID v7 (including missing, ``None``,
    or empty), the ``id`` is stripped before upload and Opik auto-generates a
    new UUID v7.
    Unchanged content is deduplicated (no-op), but changed content creates a
    new remote row while the previous one remains.

    Returns:
        Dataset: The Opik dataset ready for use in experiments.

    Raises:
        DatasetError: If the Opik API returns an unexpected error or the
            server is unreachable.
    """
    dataset = self._get_or_create_remote_dataset()

    if self._sync_policy == "local":
        dataset = self._sync_local_to_remote(dataset)

    logger.info(
        "Loaded dataset '%s' (sync_policy='%s').",
        self._dataset_name,
        self._sync_policy,
    )
    return dataset

preview

preview()

Generate a JSON-compatible preview of the local evaluation data for Kedro-Viz.

Returns:

  • JSONPreview

    A Kedro-Viz-compatible object containing a serialized JSON string. Returns a descriptive message if filepath is not configured or does not exist.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def preview(self) -> JSONPreview:
    """Generate a JSON-compatible preview of the local evaluation data for Kedro-Viz.

    Returns:
        JSONPreview: A Kedro-Viz-compatible object containing a serialized JSON string.
            Returns a descriptive message if filepath is not configured or does not exist.
    """
    return build_preview(self._filepath, self.file_dataset if self._filepath else None)

save

save(data)

Insert items into the Opik dataset and optionally update the local file.

In remote mode, only the remote upload occurs. In local mode, items are also merged into the local file.

Parameters:

  • data (list[dict[str, Any]]) –

    List of dicts, each containing at least an input key.

Raises:

  • DatasetError

    If the Opik API call fails or any item is missing input.

Source code in kedro_datasets_experimental/opik/evaluation_dataset.py
def save(self, data: list[dict[str, Any]]) -> None:
    """Insert items into the Opik dataset and optionally update the local file.

    In ``remote`` mode, only the remote upload occurs. In ``local`` mode,
    items are also merged into the local file.

    Args:
        data: List of dicts, each containing at least an ``input`` key.

    Raises:
        DatasetError: If the Opik API call fails or any item is missing ``input``.
    """
    if self._sync_policy == "remote":
        logger.warning(
            "sync_policy='remote': save() uploads to remote only, "
            "local file '%s' will not be updated.",
            self._filepath,
        )

    self._validate_items(data)

    dataset = self._get_or_create_remote_dataset()
    self._upload_items(dataset, data)
    try:
        self._client.flush()
    except Exception as e:
        raise DatasetError(
            f"Failed to flush items to Opik dataset '{self._dataset_name}': {e}"
        ) from e

    if self._sync_policy == "local" and self._filepath:
        existing: list[dict] = []
        if self._filepath.exists():
            existing = self.file_dataset.load()
        self.file_dataset.save(self._merge_items(existing, data))