langfuse.EvaluationDataset

kedro_datasets_experimental.langfuse.EvaluationDataset

EvaluationDataset(
    dataset_name,
    credentials,
    filepath=None,
    sync_policy="local",
    metadata=None,
    version=None,
)

Bases: AbstractDataset[list[dict[str, Any]], 'DatasetClient']

Kedro dataset for Langfuse evaluation datasets.

Connects to a Langfuse evaluation dataset and returns a DatasetClient on load(), which can be used to run experiments via dataset.run_experiment(). Supports an optional local JSON/YAML file as the authoring surface for evaluation items.

On load / save behaviour:

  • On load: Creates the remote dataset if it does not exist, synchronises based on sync_policy, and returns a DatasetClient.
  • On save: Upserts all items to the remote dataset — items with an existing id are updated in place, new items are created. In local mode, items are also merged into the local file (new items take precedence). In remote mode, only the remote upsert occurs.

Item format:

Evaluation items, whether stored in the local filepath file or passed as the data argument to save(), must be a list of dicts. Each item accepts the same keys as Langfuse.create_dataset_item():

  • input (required) — the evaluation input payload.
  • id — stable identifier used for deduplication on sync and upload.
  • expected_output — ground-truth value for scoring.
  • metadata — arbitrary metadata dict attached to the item.
  • source_trace_id — Langfuse trace ID to link the item to.
  • source_observation_id — observation ID within the source trace.
  • status – "ACTIVE" (default) or "ARCHIVED".
[
  {
    "id": "q1",
    "input": {"text": "cancel my order"},
    "expected_output": "cancel_order",
    "metadata": {"source": "production"}
  }
]

Items without an id cannot be deduplicated and will be re-uploaded on every load() or save() call.
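
Because items without an id are duplicated on every sync, one option is to derive a deterministic id from each item's input before writing the local file or calling save(). The following is a minimal sketch and not part of EvaluationDataset: the helper name `with_stable_ids`, the `auto-` prefix, and the hashing scheme are assumptions chosen for illustration.

```python
import hashlib
import json
from typing import Any


def with_stable_ids(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return copies of ``items`` in which every item has an ``id``.

    Hypothetical helper: items that already carry an ``id`` are copied as-is;
    the rest get an id derived from a SHA-256 hash of their ``input``.
    """
    result = []
    for item in items:
        if item.get("id") is not None:
            result.append(dict(item))
            continue
        # sort_keys makes the serialisation (and so the id) independent of key order
        payload = json.dumps(item["input"], sort_keys=True, default=str)
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
        result.append({**item, "id": f"auto-{digest}"})
    return result


items = with_stable_ids(
    [
        {"id": "q1", "input": {"text": "cancel my order"}},
        {"input": {"text": "where is my parcel"}},
    ]
)
```

With this scheme, two items that share the same input also share the same id and would collapse into a single remote item, so it only suits datasets where inputs are unique.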

Sync policies:

  • local (default): The local file is the source of truth. On load(), all local items are upserted to remote (creating new items or updating existing ones matched by id). Items without an id field cannot be deduplicated and will create new entries on every load.
  • remote: The remote Langfuse dataset is the sole source of truth. load() fetches the remote dataset as-is with no local file interaction. save() upserts all items to remote but does not write to any local file. An optional version (ISO 8601 timestamp) can pin load() to a historical snapshot (requires langfuse>=3.14.0).
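
The side effects of each policy can be summarised as a small decision function. This is an illustrative model of the behaviour described above, not code from the library; `planned_actions`, its parameters, and the action strings are made up for this sketch.

```python
from typing import Optional


def planned_actions(
    sync_policy: str,
    operation: str,
    has_local_file: bool = True,
    version: Optional[str] = None,
) -> list[str]:
    """Describe what a load() or save() call does under a given sync policy."""
    if sync_policy not in ("local", "remote"):
        raise ValueError(f"unknown sync_policy: {sync_policy!r}")
    if version is not None and sync_policy != "remote":
        raise ValueError("version is only valid with sync_policy='remote'")

    # Both load() and save() first make sure the remote dataset exists.
    actions = ["ensure remote dataset exists"]
    if operation == "load":
        if sync_policy == "local":
            if has_local_file:
                actions.append("upsert local file items to remote")
            actions.append("return DatasetClient")
        elif version is not None:
            actions.append(f"return snapshot as of {version}")
        else:
            actions.append("return latest remote dataset")
    elif operation == "save":
        actions.append("upsert items to remote")
        if sync_policy == "local" and has_local_file:
            actions.append("merge items into local file")
    else:
        raise ValueError(f"unknown operation: {operation!r}")
    return actions


print(planned_actions("local", "save"))
print(planned_actions("remote", "load", version="2026-01-15T00:00:00Z"))
```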

Examples:

Using catalog YAML configuration:

# Local sync policy - local file seeds and syncs to remote
evaluation_dataset:
  type: kedro_datasets_experimental.langfuse.EvaluationDataset
  dataset_name: intent-detection-eval
  filepath: data/evaluation/intent_items.json
  sync_policy: local
  credentials: langfuse_credentials
  metadata:
    project: intent-detection

# Remote sync policy - Langfuse is the source of truth
production_eval:
  type: kedro_datasets_experimental.langfuse.EvaluationDataset
  dataset_name: intent-detection-eval
  sync_policy: remote
  credentials: langfuse_credentials

# Pinned to a historical snapshot for reproducibility
eval_snapshot:
  type: kedro_datasets_experimental.langfuse.EvaluationDataset
  dataset_name: intent-detection-eval
  sync_policy: remote
  version: "2026-01-15T00:00:00Z"
  credentials: langfuse_credentials

Using Python API:

from kedro_datasets_experimental.langfuse import EvaluationDataset

dataset = EvaluationDataset(
    dataset_name="intent-detection-eval",
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
    },
    filepath="data/evaluation/intent_items.json",
)

# Load returns a DatasetClient for running experiments
eval_dataset = dataset.load()
for item in eval_dataset.items:
    print(item.input, item.expected_output)

# Save new evaluation items
dataset.save(
    [
        {"id": "q1", "input": {"text": "cancel order"}, "expected_output": "cancel"},
    ]
)

Parameters:

  • dataset_name (str) –

    Name of the evaluation dataset in Langfuse.

  • credentials (dict[str, str]) –

    Langfuse authentication credentials. Required: public_key, secret_key. Optional: host (defaults to Langfuse cloud).

  • filepath (str | None, default: None ) –

    Path to a local JSON/YAML file for authoring evaluation items. Supports .json, .yaml, and .yml extensions. When None, no local file interaction occurs.

  • sync_policy (Literal['local', 'remote'], default: 'local' ) –

    Controls the source of truth for reads and whether a local file is involved. "local" (default): all local items are upserted to remote on load(); save() upserts to remote and merges into the local file (new data takes precedence). "remote": load() fetches remote as-is; save() upserts to remote without local file interaction.

  • metadata (dict[str, Any] | None, default: None ) –

    Optional metadata dict passed to Langfuse when creating the remote dataset for the first time.

  • version (str | None, default: None ) –

    ISO 8601 timestamp to pin load() to a historical snapshot (e.g. "2026-01-15T00:00:00Z"). Only valid with sync_policy="remote". When omitted, the latest dataset state is returned. Requires langfuse>=3.14.0 (dataset versioning was introduced in the Feb 2026 release).

Raises:

  • DatasetError

    If credentials are missing or empty, sync_policy is invalid, filepath has an unsupported extension, or version is used with sync_policy="local".

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def __init__(  # noqa: PLR0913
    self,
    dataset_name: str,
    credentials: dict[str, str],
    filepath: str | None = None,
    sync_policy: Literal["local", "remote"] = "local",
    metadata: dict[str, Any] | None = None,
    version: str | None = None,
):
    """Initialise ``EvaluationDataset``.

    Args:
        dataset_name: Name of the evaluation dataset in Langfuse.
        credentials: Langfuse authentication credentials.
            Required: ``public_key``, ``secret_key``.
            Optional: ``host`` (defaults to Langfuse cloud).
        filepath: Path to a local JSON/YAML file for authoring evaluation
            items. Supports ``.json``, ``.yaml``, and ``.yml`` extensions.
            When ``None``, no local file interaction occurs.
        sync_policy: Controls the source of truth for reads and whether
            a local file is involved:
            ``"local"`` (default) — all local items are upserted to
            remote on ``load()``; ``save()`` upserts to remote and
            merges into the local file (new data takes precedence).
            ``"remote"`` — ``load()`` fetches remote as-is; ``save()``
            upserts to remote without local file interaction.
        metadata: Optional metadata dict passed to Langfuse when creating
            the remote dataset for the first time.
        version: ISO 8601 timestamp to pin ``load()`` to a historical
            snapshot (e.g. ``"2026-01-15T00:00:00Z"``). Only valid with
            ``sync_policy="remote"``. When omitted, the latest dataset
            state is returned. Requires ``langfuse>=3.14.0`` (dataset
            versioning was introduced in the
            [Feb 2026 release](https://langfuse.com/changelog/2026-02-11-versioned-dataset-experiments)).

    Raises:
        DatasetError: If credentials are missing or empty, sync_policy is
            invalid, filepath has an unsupported extension, or version
            is used with ``sync_policy="local"``.
    """
    self._validate_init_params(credentials, filepath, sync_policy, version)

    self._dataset_name = dataset_name
    self._dataset: DatasetClient | None = None
    self._filepath = Path(filepath) if filepath else None
    self._sync_policy = sync_policy
    self._metadata = metadata
    self._version = self._parse_version(version)
    self._client = Langfuse(
        public_key=credentials["public_key"],
        secret_key=credentials["secret_key"],
        host=credentials.get("host"),
    )
    self._file_dataset = None

_client instance-attribute

_client = Langfuse(
    public_key=credentials["public_key"],
    secret_key=credentials["secret_key"],
    host=credentials.get("host"),
)

_dataset instance-attribute

_dataset = None

_dataset_name instance-attribute

_dataset_name = dataset_name

_file_dataset instance-attribute

_file_dataset = None

_filepath instance-attribute

_filepath = Path(filepath) if filepath else None

_metadata instance-attribute

_metadata = metadata

_sync_policy instance-attribute

_sync_policy = sync_policy

_version instance-attribute

_version = _parse_version(version)

file_dataset property

file_dataset

Return JSON/YAML file dataset based on extension.

_describe

_describe()
Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def _describe(self) -> dict[str, Any]:
    return {
        "dataset_name": self._dataset_name,
        "filepath": str(self._filepath) if self._filepath else None,
        "sync_policy": self._sync_policy,
        "version": self._version.isoformat() if self._version else None,
        "metadata": self._metadata,
    }

_exists

_exists()
Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def _exists(self) -> bool:
    try:
        self._client.get_dataset(name=self._dataset_name)
        return True
    except LangfuseNotFoundError:
        return False
    except LangfuseApiError as exc:
        raise DatasetError(
            f"Langfuse API error while checking dataset '{self._dataset_name}': {exc}"
        ) from exc

_get_or_create_remote_dataset

_get_or_create_remote_dataset()

Ensure the remote Langfuse dataset exists, creating it if not found.

Returns the latest DatasetClient.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def _get_or_create_remote_dataset(self) -> "DatasetClient":
    """Ensure the remote Langfuse dataset exists, creating it if not found.

    Returns the latest ``DatasetClient``.
    """
    try:
        return self._client.get_dataset(name=self._dataset_name)
    except LangfuseNotFoundError:
        pass
    except LangfuseApiError as exc:
        raise DatasetError(
            f"Langfuse API error while fetching dataset '{self._dataset_name}': {exc}"
        ) from exc

    try:
        logger.info(
            "Dataset '%s' not found on Langfuse, creating it.",
            self._dataset_name,
        )
        self._client.create_dataset(
            name=self._dataset_name,
            metadata=self._metadata or {},
        )
        return self._client.get_dataset(name=self._dataset_name)
    except LangfuseApiError as exc:
        raise DatasetError(
            f"Langfuse API error while creating dataset '{self._dataset_name}': {exc}"
        ) from exc
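
The get-or-create flow can be exercised without a network connection by swapping in a stub client. The sketch below is not the library's code: `FakeClient`, `NotFound`, and `get_or_create` are hypothetical stand-ins for `Langfuse`, `LangfuseNotFoundError`, and the method above, and plain dicts stand in for `DatasetClient`.

```python
from typing import Any, Optional


class NotFound(Exception):
    """Stand-in for LangfuseNotFoundError (hypothetical)."""


class FakeClient:
    """In-memory stand-in for the Langfuse client (hypothetical)."""

    def __init__(self) -> None:
        self.datasets: dict[str, dict[str, Any]] = {}
        self.create_calls = 0

    def get_dataset(self, name: str) -> dict[str, Any]:
        if name not in self.datasets:
            raise NotFound(name)
        return self.datasets[name]

    def create_dataset(self, name: str, metadata: dict[str, Any]) -> None:
        self.create_calls += 1
        self.datasets[name] = {"name": name, "metadata": metadata, "items": []}


def get_or_create(
    client: FakeClient,
    name: str,
    metadata: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Fetch the dataset, creating it only if the lookup reports 'not found'."""
    try:
        return client.get_dataset(name)
    except NotFound:
        pass
    client.create_dataset(name, metadata=metadata or {})
    # Fetch again so the caller receives what the server now holds.
    return client.get_dataset(name)


client = FakeClient()
first = get_or_create(client, "intent-detection-eval", {"project": "intent-detection"})
second = get_or_create(client, "intent-detection-eval", {"project": "ignored"})
```

The second call finds the existing dataset and never reaches `create_dataset`, which is why metadata only takes effect the first time the remote dataset is created.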

_load_local_items

_load_local_items()

Load items from the local file, returning an empty list if unavailable.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def _load_local_items(self) -> list[dict[str, Any]]:
    """Load items from the local file, returning an empty list if unavailable."""
    if not self._filepath or not self._filepath.exists():
        return []
    return self.file_dataset.load()

_merge_items staticmethod

_merge_items(existing, new)

Merge new items into existing list, deduplicating by 'id'.

Items without an id key are always appended. For items with an id, new items take precedence — existing entries with the same id are replaced.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
@staticmethod
def _merge_items(
    existing: list[dict[str, Any]],
    new: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Merge new items into existing list, deduplicating by 'id'.

    Items without an ``id`` key are always appended.
    For items with an ``id``, **new items take precedence** — existing
    entries with the same ``id`` are replaced.
    """
    new_by_id: dict[str, dict[str, Any]] = {}
    for item in new:
        item_id = item.get("id")
        if item_id is not None:
            new_by_id[item_id] = item

    seen_ids: set[str] = set()
    merged: list[dict[str, Any]] = []

    for item in existing:
        item_id = item.get("id")
        if item_id is not None and item_id in new_by_id:
            merged.append(new_by_id[item_id])
            seen_ids.add(item_id)
        else:
            merged.append(item)
            if item_id is not None:
                seen_ids.add(item_id)

    for item in new:
        item_id = item.get("id")
        if item_id is not None and item_id in seen_ids:
            continue
        if item_id is not None:
            seen_ids.add(item_id)
        merged.append(item)

    return merged
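
A worked example makes the merge rule concrete. The function below is a compact standalone restatement of the logic above, written so the snippet runs on its own; the name `merge_items` and the sample data are illustrative.

```python
from typing import Any


def merge_items(
    existing: list[dict[str, Any]],
    new: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Standalone restatement of the merge rule: new wins on id clash."""
    new_by_id = {item["id"]: item for item in new if item.get("id") is not None}
    seen_ids: set = set()
    merged: list[dict[str, Any]] = []

    # Pass 1: keep the existing order, swapping in the new version of any shared id.
    for item in existing:
        item_id = item.get("id")
        if item_id is not None:
            seen_ids.add(item_id)
            merged.append(new_by_id.get(item_id, item))
        else:
            merged.append(item)

    # Pass 2: append new items whose id has not been seen; id-less items always append.
    for item in new:
        item_id = item.get("id")
        if item_id is not None:
            if item_id in seen_ids:
                continue
            seen_ids.add(item_id)
        merged.append(item)
    return merged


existing = [
    {"id": "q1", "input": "old"},
    {"id": "q2", "input": "keep"},
    {"input": "no-id"},
]
new = [
    {"id": "q1", "input": "new"},
    {"id": "q3", "input": "added"},
    {"input": "no-id"},
]
merged = merge_items(existing, new)
# merged holds: q1 (new), q2 (keep), no-id, q3 (added), no-id
```

The id-less item appears twice in the result, which illustrates why items without an id accumulate in the local file across repeated saves.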

_parse_version staticmethod

_parse_version(version)

Parse an ISO 8601 version string into a timezone-aware UTC datetime.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
@staticmethod
def _parse_version(version: str | None) -> datetime | None:
    """Parse an ISO 8601 version string into a timezone-aware UTC datetime."""
    if version is None:
        return None
    try:
        # Python 3.10 doesn't support 'Z' suffix in fromisoformat
        normalized = version[:-1] + "+00:00" if version.endswith("Z") else version
        dt = datetime.fromisoformat(normalized)
    except (ValueError, TypeError) as exc:
        raise DatasetError(
            f"Invalid version '{version}'. "
            f"Expected ISO 8601 format (e.g. '2026-01-15T00:00:00Z')."
        ) from exc
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
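
The parsing rules are easy to check in isolation. The snippet below is a standalone sketch of the same logic using only the standard library; it lets `ValueError` propagate where the method above raises `DatasetError`, and the function name `parse_version` is illustrative.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_version(version: Optional[str]) -> Optional[datetime]:
    """Standalone sketch of the parsing rule (ValueError instead of DatasetError)."""
    if version is None:
        return None
    # Python 3.10's fromisoformat() rejects a trailing 'Z', so rewrite it as an offset.
    normalized = version[:-1] + "+00:00" if version.endswith("Z") else version
    dt = datetime.fromisoformat(normalized)
    if dt.tzinfo is None:
        # Naive timestamps are assumed to be UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return dt


zulu = parse_version("2026-01-15T00:00:00Z")
naive = parse_version("2026-01-15T00:00:00")
offset = parse_version("2026-01-15T02:00:00+02:00")
```

All three values denote the same instant and compare equal. Note that a timestamp with an explicit non-UTC offset keeps that offset rather than being converted to UTC.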

_sync_local_to_remote

_sync_local_to_remote(dataset, local_items)

Upsert local items to remote (create new, update existing).

Every item is sent to Langfuse.create_dataset_item(), which performs an upsert: items with an id that already exists on remote are updated in place; new items are created. Items without an id always create new entries and cannot be deduplicated.

Returns the refreshed DatasetClient.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def _sync_local_to_remote(
    self,
    dataset: "DatasetClient",
    local_items: list[dict[str, Any]],
) -> "DatasetClient":
    """Upsert local items to remote (create new, update existing).

    Every item is sent to ``Langfuse.create_dataset_item()``, which
    performs an upsert: items with an ``id`` that already exists on
    remote are updated in place; new items are created. Items without
    an ``id`` always create new entries and cannot be deduplicated.

    Returns the refreshed ``DatasetClient``.
    """
    if not local_items:
        return dataset

    items_without_id = [item for item in local_items if "id" not in item]
    if items_without_id:
        logger.warning(
            "Found %d item(s) without an 'id' field. "
            "Items without 'id' cannot be deduplicated and will create "
            "new entries on every sync. Consider adding unique 'id' fields.",
            len(items_without_id),
        )

    logger.info(
        "Upserting %d item(s) from '%s' to remote dataset '%s'.",
        len(local_items),
        self._filepath,
        self._dataset_name,
    )
    self._upload_items(local_items)
    return self._client.get_dataset(name=self._dataset_name)

_upload_items

_upload_items(items)

Upload items to the remote Langfuse dataset.

Passes through all keys accepted by Langfuse.create_dataset_item(). Callers are responsible for validating items before calling this method.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def _upload_items(self, items: list[dict[str, Any]]) -> None:
    """Upload items to the remote Langfuse dataset.

    Passes through all keys accepted by ``Langfuse.create_dataset_item()``.
    Callers are responsible for validating items before calling this method.
    """
    for item in items:
        self._client.create_dataset_item(
            dataset_name=self._dataset_name,
            id=item.get("id"),
            input=item["input"],
            expected_output=item.get("expected_output"),
            metadata=item.get("metadata"),
            source_trace_id=item.get("source_trace_id"),
            source_observation_id=item.get("source_observation_id"),
            status=item.get("status"),
        )
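
To see which keys actually reach Langfuse, the upload loop can be run against a recording stub. This is a sketch under stated assumptions: `RecordingClient`, `FORWARDED_KEYS`, and `upload_items` are hypothetical names, and the stub only captures keyword arguments rather than talking to a server.

```python
from typing import Any


class RecordingClient:
    """Hypothetical stand-in that records create_dataset_item() calls."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    def create_dataset_item(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)


# Optional keys forwarded by the upload loop shown above.
FORWARDED_KEYS = (
    "id",
    "expected_output",
    "metadata",
    "source_trace_id",
    "source_observation_id",
    "status",
)


def upload_items(client: RecordingClient, dataset_name: str, items: list[dict[str, Any]]) -> None:
    for item in items:
        client.create_dataset_item(
            dataset_name=dataset_name,
            input=item["input"],  # required: raises KeyError if absent
            **{key: item.get(key) for key in FORWARDED_KEYS},
        )


client = RecordingClient()
upload_items(
    client,
    "intent-detection-eval",
    [{"id": "q1", "input": {"text": "cancel order"}, "notes": "not forwarded"}],
)
call = client.calls[0]
```

Keys outside the forwarded set (here `notes`) are silently dropped, absent optional keys are sent as `None`, and a missing `input` surfaces as a bare `KeyError`, which is why callers validate items first.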

_validate_init_params staticmethod

_validate_init_params(
    credentials, filepath, sync_policy, version
)
Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
@staticmethod
def _validate_init_params(
    credentials: dict[str, str],
    filepath: str | None,
    sync_policy: str,
    version: str | None,
) -> None:
    validate_langfuse_credentials(credentials)
    validate_sync_policy(sync_policy, VALID_SYNC_POLICIES)
    if filepath is not None:
        validate_file_extension(filepath)
    if version is not None and sync_policy != "remote":
        raise DatasetError(
            "The 'version' parameter can only be used with "
            "sync_policy='remote'. A versioned load returns a historical "
            "snapshot which is incompatible with local-to-remote sync."
        )

_validate_items staticmethod

_validate_items(items)

Validate that all items contain the required 'input' key.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
@staticmethod
def _validate_items(items: list[dict[str, Any]]) -> None:
    """Validate that all items contain the required 'input' key."""
    for i, item in enumerate(items):
        if "input" not in item:
            raise DatasetError(
                f"Dataset item at index {i} is missing required 'input' key."
            )
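
The check above stops at the first offending item. When authoring a large local file it can be handy to list every invalid index in one pass; the helper below is a hypothetical variant for that purpose (`find_invalid_items` is not part of the library).

```python
from typing import Any


def find_invalid_items(items: list[dict[str, Any]]) -> list[int]:
    """Return the indices of all items lacking the required 'input' key."""
    return [i for i, item in enumerate(items) if "input" not in item]


bad = find_invalid_items(
    [
        {"id": "q1", "input": {"text": "cancel my order"}},
        {"id": "q2", "expected_output": "track_order"},
        {"id": "q3", "input": None},
    ]
)
# bad == [1]
```

As in the original check, only the presence of the key is tested, so an item with `"input": None` passes.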

load

load()

Load the evaluation dataset from Langfuse.

Creates the remote dataset if it does not exist. In local mode, all local items are upserted to remote (creating new items or updating existing ones matched by id). In remote mode with version set, returns items as they existed at that point in time.

Returns:

  • DatasetClient

    Langfuse dataset client that can be used to iterate items or call run_experiment().

Raises:

  • DatasetError

    If the Langfuse API is unreachable or returns an unexpected error.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def load(self) -> "DatasetClient":
    """Load the evaluation dataset from Langfuse.

    Creates the remote dataset if it does not exist. In ``local`` mode,
    all local items are upserted to remote (creating new items or
    updating existing ones matched by ``id``). In ``remote`` mode with
    ``version`` set, returns items as they existed at that point in time.

    Returns:
        DatasetClient: Langfuse dataset client that can be used to
            iterate items or call ``run_experiment()``.

    Raises:
        DatasetError: If the Langfuse API is unreachable or returns
            an unexpected error.
    """
    local_items: list[dict[str, Any]] = []
    if self._sync_policy == "local":
        local_items = self._load_local_items()
        self._validate_items(local_items)

    dataset = self._get_or_create_remote_dataset()

    if self._version is not None:
        logger.info(
            "Loading versioned snapshot of '%s' at %s.",
            self._dataset_name,
            self._version.isoformat(),
        )
        dataset = self._client.get_dataset(
            name=self._dataset_name, version=self._version
        )

    if self._sync_policy == "local":
        dataset = self._sync_local_to_remote(dataset, local_items)

    logger.info(
        "Loaded dataset '%s' with %d item(s) (sync_policy='%s').",
        self._dataset_name,
        len(dataset.items),
        self._sync_policy,
    )
    self._dataset = dataset
    return dataset

preview

preview()

Generate a JSON-compatible preview of the local evaluation data.

Returns:

  • JSONPreview

    Serialised JSON string for Kedro-Viz. Returns a descriptive message if filepath is not configured or the file does not exist.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def preview(self) -> JSONPreview:
    """Generate a JSON-compatible preview of the local evaluation data.

    Returns:
        JSONPreview: Serialised JSON string for Kedro-Viz. Returns a
            descriptive message if ``filepath`` is not configured or
            the file does not exist.
    """
    return build_preview(
        self._filepath,
        self.file_dataset if self._filepath else None,
    )

save

save(data)

Save evaluation items to the remote dataset.

Upserts all items to Langfuse via create_dataset_item() — items with an existing id are updated in place, new items are created. In local mode, items are also merged into the local file (new items take precedence over existing entries with the same id). In remote mode, only the remote upload occurs.

Parameters:

  • data (list[dict[str, Any]]) –

    List of evaluation item dicts. Each item must contain an input key. See class docstring for the full list of accepted keys (mirrors Langfuse.create_dataset_item()).

Raises:

  • DatasetError

    If any item is missing the required input key or the Langfuse API returns an error.

Source code in kedro_datasets_experimental/langfuse/evaluation_dataset.py
def save(self, data: list[dict[str, Any]]) -> None:
    """Save evaluation items to the remote dataset.

    Upserts all items to Langfuse via ``create_dataset_item()`` — items
    with an existing ``id`` are updated in place, new items are created.
    In ``local`` mode, items are also merged into the local file (new
    items take precedence over existing entries with the same ``id``).
    In ``remote`` mode, only the remote upload occurs.

    Args:
        data: List of evaluation item dicts. Each item must contain
            an ``input`` key. See class docstring for the full list of
            accepted keys (mirrors ``Langfuse.create_dataset_item()``).

    Raises:
        DatasetError: If any item is missing the required ``input`` key
            or the Langfuse API returns an error.
    """
    self._validate_items(data)
    self._get_or_create_remote_dataset()

    items_without_id = [item for item in data if "id" not in item]
    if items_without_id:
        logger.warning(
            "Found %d item(s) without an 'id' field. "
            "Items without 'id' cannot be deduplicated and will create "
            "new entries on every save. Consider adding unique 'id' fields.",
            len(items_without_id),
        )

    logger.info(
        "Upserting %d item(s) to remote dataset '%s'.",
        len(data),
        self._dataset_name,
    )
    self._upload_items(data)

    if self._sync_policy == "local" and self._filepath:
        existing = []
        if self._filepath.exists():
            existing = self.file_dataset.load()
        merged = self._merge_items(existing, data)
        self.file_dataset.save(merged)