
langfuse.LangfusePromptDataset

kedro_datasets_experimental.langfuse.LangfusePromptDataset

LangfusePromptDataset(
    filepath,
    prompt_name,
    credentials,
    prompt_type="text",
    sync_policy="local",
    mode="sdk",
    load_args=None,
    save_args=None,
)

Bases: AbstractDataset

Kedro dataset for managing prompts with Langfuse versioning and synchronization.

This dataset integrates local prompt files (JSON/YAML) with Langfuse prompt management, supporting versioning, labelling, and configurable synchronization policies.

On save / load behaviour:

  • On save: Creates a new version of the prompt in Langfuse with the local data.
  • On load: Synchronizes based on sync_policy and returns a raw Langfuse object (SDK mode) or a LangChain ChatPromptTemplate (langchain mode).

Sync policies:

  • local: The local file takes precedence (default). load_args (version/label) are ignored with a warning because the local file is the source of truth; the latest prompt version is fetched from Langfuse, if one exists, only to synchronize it with the local file.
  • remote: Langfuse version takes precedence. load_args are respected.
  • strict: Raises an error if local and remote differ, or if either of them is missing. load_args are respected.
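The three policies reduce to a small decision table. The sketch below models that table as a pure function; `resolve_sync` and its action strings are hypothetical, `local`/`remote` stand in for prompt content, `None` means "missing", and plain string equality replaces the content-hash comparison the dataset actually performs. It is a simplification for illustration, not the dataset's implementation.

```python
from __future__ import annotations

from typing import Literal

Policy = Literal["local", "remote", "strict"]


def resolve_sync(policy: Policy, local: str | None, remote: str | None) -> str:
    """Hypothetical model of what load() does under each sync policy."""
    if policy == "strict":
        # Both sides must exist and match, otherwise fail loudly.
        if local is None or remote is None:
            raise ValueError("strict: both local and remote prompts must exist")
        if local != remote:
            raise ValueError("strict: local and remote prompts differ")
        return "use remote"
    if policy == "remote":
        # Langfuse wins; the local file is rewritten when it is missing or stale.
        if remote is None:
            raise ValueError("remote: no remote prompt exists")
        return "use remote" if local == remote else "overwrite local file, use remote"
    # "local" policy (default): the local file wins.
    if local is not None:
        if remote is None:
            return "create remote from local"
        return "use remote" if local == remote else "push local as new remote version"
    if remote is not None:
        return "write local file from remote"
    raise ValueError("local: no prompt found locally or remotely")
```

For example, `resolve_sync("local", "v2", "v1")` models the case where an edited local file is pushed to Langfuse as a new version.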

Examples:

Using catalog YAML configuration:

# Local sync policy - local files are source of truth
intent_prompt:
  type: kedro_datasets_experimental.langfuse.LangfusePromptDataset
  filepath: data/prompts/intent.json
  prompt_name: "intent-classifier"
  prompt_type: "chat"
  credentials: langfuse_credentials
  sync_policy: local
  mode: langchain
  # load_args are ignored in local mode with warning
  # and latest prompt from langfuse is loaded if available
  save_args:
    labels: ["staging", "v2.1"]

# Remote sync policy - Langfuse versions are source of truth
production_prompt:
  type: kedro_datasets_experimental.langfuse.LangfusePromptDataset
  filepath: data/prompts/production.json
  prompt_name: "intent-classifier"
  sync_policy: remote
  load_args:
    label: "production"  # This is respected in remote mode

Using Python API:

from kedro_datasets_experimental.langfuse import LangfusePromptDataset

# Basic usage (using default Langfuse cloud)
dataset = LangfusePromptDataset(
    filepath="data/prompts/intent.json",
    prompt_name="intent-classifier",
    prompt_type="chat",
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
    },
)

# With custom host
dataset = LangfusePromptDataset(
    filepath="data/prompts/intent.json",
    prompt_name="intent-classifier",
    prompt_type="chat",
    mode="langchain",
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
        "host": "https://custom.langfuse.com",
    },
)

# Load and use prompt
prompt_template = dataset.load()
formatted = prompt_template.format(user_input="Hello world")

# Save new version with labels
chat_prompt = [
    {"type": "chatmessage", "role": "system", "content": "You are helpful."},
    {"type": "chatmessage", "role": "human", "content": "{input}"},
]
dataset.save(chat_prompt)
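For prompt_type="chat", the local file holds the list of messages. Judging from `_adapt_langfuse_chat_format`, messages pulled from Langfuse are written locally without the Langfuse-specific "type" key, so a JSON file shaped like the one below is assumed to be a valid local source. This sketch uses only the standard library, writes to a temporary directory, and the helper `write_local_prompt` is hypothetical; the message contents mirror the save example above.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path

# Same messages as the save example, minus the Langfuse "type" key
# (assumed local-file shape for a chat prompt).
chat_messages = [
    {"role": "system", "content": "You are helpful."},
    {"role": "human", "content": "{input}"},
]


def write_local_prompt(path: Path, messages: list[dict[str, str]]) -> None:
    """Hypothetical helper: write a chat prompt as a JSON array of messages."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(messages, indent=2), encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    prompt_path = Path(tmp) / "data" / "prompts" / "intent.json"
    write_local_prompt(prompt_path, chat_messages)
    # Read it back to check the round trip.
    loaded = json.loads(prompt_path.read_text(encoding="utf-8"))
```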

Parameters:

  • filepath (str) –

    Local file path for storing prompt. Supports .json, .yaml, .yml extensions.

  • prompt_name (str) –

    Unique identifier for the prompt in Langfuse.

  • prompt_type (Literal['chat', 'text'], default: 'text' ) –

    Type of prompt - "chat" for conversation or "text" for single prompts.

  • credentials (dict[str, Any]) –

    Dictionary with Langfuse credentials. Required: {public_key, secret_key}. Optional: {host} (defaults to Langfuse cloud if not provided).

  • sync_policy (Literal['local', 'remote', 'strict'], default: 'local' ) –

    How to handle conflicts between local and remote:
      - "local": Local file takes precedence (default).
      - "remote": Langfuse version takes precedence.
      - "strict": Error if local and remote differ.

  • mode (Literal['langchain', 'sdk'], default: 'sdk' ) –

    Return type of the load() method:
      - "sdk": Returns the raw Langfuse prompt object (default).
      - "langchain": Returns a LangChain ChatPromptTemplate object.

  • load_args (dict[str, Any] | None, default: None ) –

    Dictionary with loading parameters. Only used when sync_policy="remote" or "strict"; ignored with a warning when sync_policy="local". Supported keys:
      - version (int): Specific version number to load.
      - label (str): Specific label to load (e.g., "production", "staging").
    Langfuse raises an error if version and label are passed together, so when both are provided, label takes precedence over version.

  • save_args (dict[str, Any] | None, default: None ) –

    Dictionary with saving parameters. Supported keys:
      - labels (list[str]): Labels to assign to each new prompt version.

Examples:

>>> # Local sync policy (default) - local files are source of truth
>>> creds = {"public_key": "pk_...", "secret_key": "sk_..."}  # pragma: allowlist secret
>>> dataset = LangfusePromptDataset(
...     filepath="prompts/intent.json",
...     prompt_name="intent-classifier",
...     credentials=creds,
... )
>>> # Remote sync policy - load specific version from Langfuse
>>> dataset = LangfusePromptDataset(
...     filepath="prompts/intent.yaml",
...     prompt_name="intent-classifier",
...     credentials=creds,
...     sync_policy="remote",
...     load_args={"version": 3}  # This is respected in remote mode
... )
>>> # Remote sync policy - load specific label from Langfuse
>>> dataset = LangfusePromptDataset(
...     filepath="prompts/production.json",
...     prompt_name="intent-classifier",
...     credentials=creds,
...     sync_policy="remote",
...     load_args={"label": "production"}  # This is respected in remote mode
... )
>>> # With custom host
>>> dataset = LangfusePromptDataset(
...     filepath="prompts/intent.json",
...     prompt_name="intent-classifier",
...     credentials={"public_key": "pk_...", "secret_key": "sk_...", "host": "https://custom.langfuse.com"}  # pragma: allowlist secret
... )
>>> # Auto-label new versions when saving (works with any sync policy)
>>> dataset = LangfusePromptDataset(
...     filepath="prompts/intent.json",
...     prompt_name="intent-classifier",
...     credentials=creds,
...     save_args={"labels": ["staging", "v2.1"]}
... )

Raises:

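  • DatasetError

    If credentials are missing or empty, or if prompt_type, sync_policy, mode, load_args or save_args are invalid.

  • NotImplementedError

    If filepath has an unsupported extension.

  • ImportError

    If mode="langchain" and the langchain package is not installed.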

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def __init__(   # noqa: PLR0913
    self,
    filepath: str,
    prompt_name: str,
    credentials: dict[str, Any],
    prompt_type: Literal["chat", "text"] = "text",
    sync_policy: Literal["local", "remote", "strict"] = "local",
    mode: Literal["langchain", "sdk"] = "sdk",
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
) -> None:
    """
    Initialize LangfusePromptDataset for managing prompts with Langfuse versioning.

    Args:
        filepath: Local file path for storing prompt. Supports .json, .yaml, .yml extensions.
        prompt_name: Unique identifier for the prompt in Langfuse.
        prompt_type: Type of prompt - "chat" for conversation or "text" for single prompts.
        credentials: Dictionary with Langfuse credentials. Required: {public_key, secret_key}.
            Optional: {host} (defaults to Langfuse cloud if not provided).
        sync_policy: How to handle conflicts between local and remote:
            - "local": Local file takes precedence (default)
            - "remote": Langfuse version takes precedence
            - "strict": Error if local and remote differ
        mode: Return type for load() method:
            - "sdk": Returns raw Langfuse prompt object (default)
            - "langchain": Returns ChatPromptTemplate object
        load_args: Dictionary with loading parameters. Only used when sync_policy="remote" or "strict".
            Ignored with warning when sync_policy="local". Supported keys:
            - version (int): Specific version number to load
            - label (str): Specific label to load (e.g., "production", "staging")
            Note: Langfuse will throw an error if both version and label are used together.
            So label is preferred over version if provided.
        save_args: Dictionary with saving parameters. Supported keys:
            - labels (list[str]): List of labels to assign to new prompt versions

    Examples:
        >>> # Local sync policy (default) - local files are source of truth
        >>> dataset = LangfusePromptDataset(
        ...     filepath="prompts/intent.json",
        ...     prompt_name="intent-classifier",
        ...     credentials={"public_key": "pk_...", "secret_key": "sk_..."}  # pragma: allowlist secret
        ... )

        >>> # Remote sync policy - load specific version from Langfuse
        >>> dataset = LangfusePromptDataset(
        ...     filepath="prompts/intent.yaml",
        ...     prompt_name="intent-classifier",
        ...     credentials=creds,
        ...     sync_policy="remote",
        ...     load_args={"version": 3}  # This is respected in remote mode
        ... )

        >>> # Remote sync policy - load specific label from Langfuse
        >>> dataset = LangfusePromptDataset(
        ...     filepath="prompts/production.json",
        ...     prompt_name="intent-classifier",
        ...     credentials=creds,
        ...     sync_policy="remote",
        ...     load_args={"label": "production"}  # This is respected in remote mode
        ... )

        >>> # With custom host
        >>> dataset = LangfusePromptDataset(
        ...     filepath="prompts/intent.json",
        ...     prompt_name="intent-classifier",
        ...     credentials={"public_key": "pk_...", "secret_key": "sk_...", "host": "https://custom.langfuse.com"}  # pragma: allowlist secret
        ... )

        >>> # Auto-label new versions when saving (works with any sync policy)
        >>> dataset = LangfusePromptDataset(
        ...     filepath="prompts/intent.json",
        ...     prompt_name="intent-classifier",
        ...     credentials=creds,
        ...     save_args={"labels": ["staging", "v2.1"]}
        ... )

    Raises:
        DatasetError: If credentials are missing required keys.
        NotImplementedError: If filepath has unsupported extension.
    """
    # Validate all parameters before assignment
    self._validate_init_params(filepath, credentials, prompt_type, sync_policy, mode, load_args, save_args)

    self._filepath = Path(filepath)
    self._prompt_name = prompt_name
    self._prompt_type: Literal["chat", "text"] = prompt_type or "text"
    self._langfuse = Langfuse(
        public_key=credentials["public_key"],
        secret_key=credentials["secret_key"],
        host=credentials.get("host"),
    )
    self._sync_policy = sync_policy or "local"
    self._mode = mode or "sdk"
    self._load_args = load_args or {}
    self._save_args = save_args or {}
    self._file_dataset = None
    self._cached_build_args = None

_cached_build_args instance-attribute

_cached_build_args = None

_file_dataset instance-attribute

_file_dataset = None

_filepath instance-attribute

_filepath = Path(filepath)

_get_build_args property

_get_build_args

Build kwargs for fetching prompt from Langfuse based on load_args and sync_policy.

This property computes the arguments once and caches them, since they are needed on every load and when building error messages.

When sync_policy="local", load_args (version/label) are ignored because local files are the source of truth. A warning is emitted, and the latest version is fetched for synchronization purposes only.

When sync_policy="remote" or "strict", load_args are respected since remote versions matter for these policies.

Returns:

  • dict[str, Any]

    Cached kwargs dictionary for langfuse.get_prompt() with name, type, and optional version or label parameters.
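The rules above can be sketched as a plain function. `build_prompt_args` is hypothetical: it uses `warnings.warn` where the real property may log instead, and it omits the caching. It only illustrates the documented behaviour: name and type are always present, load_args are dropped under the local policy, and label wins over version otherwise.

```python
from __future__ import annotations

import warnings
from typing import Any


def build_prompt_args(
    prompt_name: str,
    prompt_type: str,
    sync_policy: str,
    load_args: dict[str, Any] | None,
) -> dict[str, Any]:
    """Hypothetical re-creation of the kwargs passed to langfuse.get_prompt()."""
    args: dict[str, Any] = {"name": prompt_name, "type": prompt_type}
    load_args = load_args or {}
    if sync_policy == "local":
        # Local files are the source of truth: ignore version/label, fetch latest.
        if load_args.get("version") is not None or load_args.get("label") is not None:
            warnings.warn("load_args are ignored when sync_policy='local'")
        return args
    # Langfuse rejects version and label together, so label takes precedence.
    if load_args.get("label") is not None:
        args["label"] = load_args["label"]
    elif load_args.get("version") is not None:
        args["version"] = load_args["version"]
    return args
```

For instance, `build_prompt_args("intent-classifier", "chat", "remote", {"label": "production", "version": 3})` keeps only the label.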

_langfuse instance-attribute

_langfuse = Langfuse(
    public_key=credentials["public_key"],
    secret_key=credentials["secret_key"],
    host=credentials.get("host"),
)

_load_args instance-attribute

_load_args = load_args or {}

_mode instance-attribute

_mode = mode or 'sdk'

_prompt_name instance-attribute

_prompt_name = prompt_name

_prompt_type instance-attribute

_prompt_type = prompt_type or 'text'

_save_args instance-attribute

_save_args = save_args or {}

_sync_policy instance-attribute

_sync_policy = sync_policy or 'local'

file_dataset property

file_dataset

Get appropriate Kedro dataset based on file extension (cached).

Returns:

  • AbstractDataset

    Kedro dataset matching the local file's extension (.json, .yaml or .yml).

Raises:

  • NotImplementedError

    If file extension is not supported.

_adapt_langfuse_chat_format

_adapt_langfuse_chat_format(prompt_data)

Remove Langfuse-specific 'type' key from chat messages for local file compatibility.

Parameters:

  • prompt_data (str | list) –

    The prompt data from Langfuse (string or list of messages).

Returns:

  • str | list

    New prompt data with the 'type' key removed from each message, if present. String prompts are returned unchanged.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _adapt_langfuse_chat_format(self, prompt_data: str | list) -> str | list:
    """Remove Langfuse-specific 'type' key from chat messages for local file compatibility.

    Args:
        prompt_data: The prompt data from Langfuse (string or list of messages).

    Returns:
        New prompt data with 'type' key removed from messages if present.
        For string prompts, returns the input unchanged.
    """
    if isinstance(prompt_data, list):
        # Return new list instead of mutating input
        adapted_messages = []
        for msg in prompt_data:
            if isinstance(msg, dict) and "type" in msg:
                # Create new dict without the type key
                adapted_msg = {k: v for k, v in msg.items() if k != "type"}
                adapted_messages.append(adapted_msg)
            else:
                adapted_messages.append(msg)
        return adapted_messages
    return prompt_data

_describe

_describe()

Return a description of the dataset for Kedro's internal use.

Returns:

  • dict[str, Any]

    Dictionary containing dataset description with filepath and Langfuse prompt details.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _describe(self) -> dict[str, Any]:
    """Return a description of the dataset for Kedro's internal use.

    Returns:
        Dictionary containing dataset description with filepath and Langfuse prompt details.
    """
    return {
        "filepath": self._filepath,
        "prompt_name": self._prompt_name,
        "langfuse_prompt_args": self._get_build_args
    }

_get_prompt_description

_get_prompt_description()

Get consistent prompt description for error messages.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _get_prompt_description(self) -> str:
    """Get consistent prompt description for error messages."""
    return f"'{self._prompt_name}' with args {self._get_build_args}"

_sync_local_policy

_sync_local_policy(local_data, langfuse_prompt)

Handle local sync policy - local file takes precedence.

Local files are the source of truth. If the remote prompt does not exist, it is created from the local file. When local content differs from remote, the local content is pushed to Langfuse as a new version. If the local file is missing but the remote prompt exists, the remote content is saved locally.

Parameters:

  • local_data (str | None) –

    Content from local file, None if file doesn't exist

  • langfuse_prompt (Any | None) –

    Langfuse prompt object, None if not found remotely

Returns:

  • Any

    Langfuse prompt object after syncing

Raises:

  • DatasetError

    If neither local nor remote prompt exists

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _sync_local_policy(
    self, local_data: str | None, langfuse_prompt: Any | None
) -> Any:
    """
    Handle local sync policy - local file takes precedence.

    Local files are the source of truth. When local content differs from remote,
    the local content is pushed to Langfuse as a new version. If local file is missing
    but remote exists, the remote content is saved locally.

    Args:
        local_data: Content from local file, None if file doesn't exist
        langfuse_prompt: Langfuse prompt object, None if not found remotely

    Returns:
        Any: Langfuse prompt object after syncing

    Raises:
        DatasetError: If neither local nor remote prompt exists
    """
    if local_data is not None:
        if langfuse_prompt is None:
            # Push local to Langfuse
            logger.info(f"Creating '{self._prompt_name}' prompt in Langfuse from local file '{self._filepath}' as remote prompt does not exist (local sync policy)")
            self.save(local_data)
            return self._langfuse.get_prompt(**self._get_build_args)

        # If mismatch → update Langfuse with local
        if _hash(_get_content(local_data)) != _hash(
            _get_content(langfuse_prompt.prompt)
        ):
            logger.warning(f"Creating a new version of '{self._prompt_name}' prompt in Langfuse from local file '{self._filepath}' as local file prompt content does not match with remote prompt (local sync policy)")
            # Push local to Langfuse
            self.save(local_data)
            return self._langfuse.get_prompt(**self._get_build_args)
        return langfuse_prompt

    # If local missing but Langfuse exists → persist locally
    if langfuse_prompt:
        normalized_prompt = self._adapt_langfuse_chat_format(langfuse_prompt.prompt)
        logger.warning(f"Creating local file '{self._filepath}' from remote prompt '{self._prompt_name}' from Langfuse as local file is missing (local sync policy)")
        self.file_dataset.save(normalized_prompt)
        return langfuse_prompt

    raise DatasetError(
        f"No prompt found locally at {self._filepath} or in Langfuse for {self._get_prompt_description()}"
    )
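The comparison above relies on the module's `_hash` and `_get_content` helpers, whose definitions are not shown on this page. The following is a minimal sketch of one way to compare prompt contents by hash, assuming content is normalized by dropping the Langfuse "type" key and serialized as canonical JSON before hashing with SHA-256. `normalize` and `content_hash` are hypothetical and may differ from the real helpers.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any


def normalize(prompt: str | list[Any]) -> str | list[Any]:
    # Drop the Langfuse-specific "type" key from chat messages (mirroring
    # _adapt_langfuse_chat_format); string prompts pass through unchanged.
    if isinstance(prompt, list):
        return [
            {k: v for k, v in msg.items() if k != "type"} if isinstance(msg, dict) else msg
            for msg in prompt
        ]
    return prompt


def content_hash(prompt: str | list[Any]) -> str:
    # Canonical JSON (sorted keys) so dictionary key order cannot change the hash.
    canonical = json.dumps(normalize(prompt), sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


local = [{"role": "system", "content": "You are helpful."}]
remote = [{"type": "chatmessage", "content": "You are helpful.", "role": "system"}]
```

Under these assumptions `local` and `remote` hash identically, so no new version would be pushed.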

_sync_remote_policy

_sync_remote_policy(local_data, langfuse_prompt)

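Handle remote sync policy - Langfuse version takes precedence.

If the local file is missing or its content differs from the remote prompt, the local file is overwritten with the remote content.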

Parameters:

  • local_data (str | None) –

    Content from local file, None if file doesn't exist

  • langfuse_prompt (Any | None) –

    Langfuse prompt object, None if not found remotely

Returns:

  • Any

    Langfuse prompt object after updating local file if needed

Raises:

  • DatasetError

    If remote prompt doesn't exist

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _sync_remote_policy(
    self, local_data: str | None, langfuse_prompt: Any | None
) -> Any:
    """
    Handle remote sync policy - Langfuse version takes precedence.

    Args:
        local_data: Content from local file, None if file doesn't exist
        langfuse_prompt: Langfuse prompt object, None if not found remotely

    Returns:
        Any: Langfuse prompt object after updating local file if needed

    Raises:
        DatasetError: If remote prompt doesn't exist
    """
    if not langfuse_prompt:
        raise DatasetError(
            f"Remote sync policy specified for {self._get_prompt_description()} "
            "but no remote prompt exists in Langfuse. Confirm that you've configured "
            f"the correct Langfuse host and create the prompt in Langfuse first or use 'local' sync policy."
        )
    if not local_data or _hash(_get_content(local_data)) != _hash(_get_content(langfuse_prompt.prompt)):
        normalized_prompt = self._adapt_langfuse_chat_format(langfuse_prompt.prompt)
        logger.warning(f"Creating/Overwriting local file '{self._filepath}' with remote prompt '{self._prompt_name}' from Langfuse (remote sync policy)")
        self.file_dataset.save(normalized_prompt)
    return langfuse_prompt

_sync_strict_policy

_sync_strict_policy(local_data, langfuse_prompt)

Handle strict sync policy - error if local and remote differ.

Parameters:

  • local_data (str | list | None) –

    Content from local file, None if file doesn't exist.

  • langfuse_prompt (Any | None) –

    Langfuse prompt object, None if not found remotely.

Returns:

  • Any

    Langfuse prompt object if sync is successful.

Raises:

  • DatasetError

    If either local_data or langfuse_prompt is missing, or if they differ.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _sync_strict_policy(
    self, local_data: str | list | None, langfuse_prompt: Any | None
) -> Any:
    """Handle strict sync policy - error if local and remote differ.

    Args:
        local_data: Content from local file, None if file doesn't exist.
        langfuse_prompt: Langfuse prompt object, None if not found remotely.

    Returns:
        Langfuse prompt object if sync is successful.

    Raises:
        DatasetError: If either local_data or langfuse_prompt is missing, or if they differ.
    """
    if not local_data or not langfuse_prompt:
        missing_parts = []
        if not local_data:
            missing_parts.append("local file")
        if not langfuse_prompt:
            missing_parts.append("remote prompt")

        raise DatasetError(
            f"Strict sync policy specified for {self._get_prompt_description()}. "
            f"Both local and remote prompts must exist in strict mode. "
            f"Missing: {' and '.join(missing_parts)}."
        )

    local_hash = _hash(_get_content(local_data))
    remote_hash = _hash(_get_content(langfuse_prompt.prompt))
    if local_hash != remote_hash:
        raise DatasetError(
            f"Strict sync failed for {self._get_prompt_description()}: "
            f"local and remote prompts differ. Use 'local' or 'remote' policy to resolve."
        )
    return langfuse_prompt

_sync_with_langfuse

_sync_with_langfuse(local_data, langfuse_prompt)

Synchronize local file and Langfuse prompt based on configured sync policy.

This method delegates to specialized sync policy handlers based on the configured sync_policy setting.

Parameters:

  • local_data (str | None) –

    Content from local file, None if file doesn't exist

  • langfuse_prompt (Any | None) –

    Langfuse prompt object, None if not found remotely

Returns:

  • Any

    Langfuse prompt object after synchronization

Raises:

  • DatasetError

    On sync_policy conflicts (see the individual policy methods), or if no prompt is found locally or in Langfuse.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _sync_with_langfuse(
    self, local_data: str | None, langfuse_prompt: Any | None
) -> Any:
    """
    Synchronize local file and Langfuse prompt based on configured sync policy.

    This method delegates to specialized sync policy handlers based on the
    configured sync_policy setting.

    Args:
        local_data: Content from local file, None if file doesn't exist
        langfuse_prompt: Langfuse prompt object, None if not found remotely

    Returns:
        Any: Langfuse prompt object after synchronization

    Raises:
        DatasetError: Based on sync_policy conflicts (see individual policy methods)
        DatasetError: If no prompt found locally or in Langfuse
    """
    if self._sync_policy == "strict":
        return self._sync_strict_policy(local_data, langfuse_prompt)
    elif self._sync_policy == "remote":
        return self._sync_remote_policy(local_data, langfuse_prompt)
    else:  # local policy (default)
        return self._sync_local_policy(local_data, langfuse_prompt)

_validate_args

_validate_args(load_args, save_args)

Validate load_args and save_args.

Parameters:

  • load_args (dict[str, Any] | None) –

    Load arguments to validate.

  • save_args (dict[str, Any] | None) –

    Save arguments to validate.

Raises:

  • DatasetError

    If argument types are invalid.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _validate_args(self, load_args: dict[str, Any] | None, save_args: dict[str, Any] | None) -> None:
    """Validate load_args and save_args.

    Args:
        load_args: Load arguments to validate.
        save_args: Save arguments to validate.

    Raises:
        DatasetError: If argument types are invalid.
    """
    if load_args is not None:
        if "version" in load_args and load_args["version"] is not None:
            if not isinstance(load_args["version"], int):
                raise DatasetError(
                    f"load_args['version'] must be an integer, got {type(load_args['version']).__name__}: {load_args['version']}"
                )
        if "label" in load_args and load_args["label"] is not None:
            if not isinstance(load_args["label"], str):
                raise DatasetError(
                    f"load_args['label'] must be a string, got {type(load_args['label']).__name__}: {load_args['label']}"
                )

    if save_args is not None:
        if "labels" in save_args and save_args["labels"] is not None:
            if not isinstance(save_args["labels"], list):
                raise DatasetError(
                    f"save_args['labels'] must be a list, got {type(save_args['labels']).__name__}: {save_args['labels']}"
                )
            for i, label in enumerate(save_args["labels"]):
                if not isinstance(label, str):
                    raise DatasetError(
                        f"save_args['labels'][{i}] must be a string, got {type(label).__name__}: {label}"
                    )

_validate_credentials

_validate_credentials(credentials)

Validate Langfuse credentials.

Parameters:

  • credentials (dict[str, Any]) –

    Credentials dictionary to validate.

Raises:

  • DatasetError

    If required credentials are missing or empty.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _validate_credentials(self, credentials: dict[str, Any]) -> None:
    """Validate Langfuse credentials.

    Args:
        credentials: Credentials dictionary to validate.

    Raises:
        DatasetError: If required credentials are missing or empty.
    """
    # Validate required keys
    for key in REQUIRED_LANGFUSE_CREDENTIALS:
        if key not in credentials:
            raise DatasetError(f"Missing required Langfuse credential: '{key}'")
        if not credentials[key] or not str(credentials[key]).strip():
            raise DatasetError(f"Langfuse credential '{key}' cannot be empty")

    # Validate optional keys
    for key in OPTIONAL_LANGFUSE_CREDENTIALS:
        if key in credentials:
            if not credentials[key] or not str(credentials[key]).strip():
                raise DatasetError(f"Langfuse credential '{key}' cannot be empty if provided")

_validate_enum_params

_validate_enum_params(prompt_type, sync_policy, mode)

Validate enum parameters.

Parameters:

  • prompt_type (str) –

    Prompt type to validate.

  • sync_policy (str) –

    Sync policy to validate.

  • mode (str) –

    Mode to validate.

Raises:

  • DatasetError

    If parameter values are invalid.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _validate_enum_params(self, prompt_type: str, sync_policy: str, mode: str) -> None:
    """Validate enum parameters.

    Args:
        prompt_type: Prompt type to validate.
        sync_policy: Sync policy to validate.
        mode: Mode to validate.

    Raises:
        DatasetError: If parameter values are invalid.
    """
    if prompt_type and prompt_type not in VALID_PROMPT_TYPES:
        raise DatasetError(
            f"Invalid prompt_type '{prompt_type}'. Must be one of: {', '.join(sorted(VALID_PROMPT_TYPES))}"
        )

    if sync_policy and sync_policy not in VALID_SYNC_POLICIES:
        raise DatasetError(
            f"Invalid sync_policy '{sync_policy}'. Must be one of: {', '.join(sorted(VALID_SYNC_POLICIES))}"
        )

    if mode and mode not in VALID_MODES:
        raise DatasetError(
            f"Invalid mode '{mode}'. Must be one of: {', '.join(sorted(VALID_MODES))}"
        )

_validate_init_params

_validate_init_params(
    filepath,
    credentials,
    prompt_type,
    sync_policy,
    mode,
    load_args=None,
    save_args=None,
)

Validate initialization parameters.

Parameters:

  • filepath (str) –

    File path to validate for supported extensions.

  • credentials (dict[str, Any]) –

    Credentials dictionary to validate.

  • prompt_type (str) –

    Prompt type to validate.

  • sync_policy (str) –

    Sync policy to validate.

  • mode (str) –

    Mode to validate.

  • load_args (dict[str, Any] | None, default: None ) –

    Load arguments to validate.

  • save_args (dict[str, Any] | None, default: None ) –

    Save arguments to validate.

Raises:

  • DatasetError

    If parameters are invalid.

  • NotImplementedError

    If filepath has unsupported extension.

  • ImportError

    If langchain package is required but not available.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def _validate_init_params(  # noqa: PLR0913
    self,
    filepath: str,
    credentials: dict[str, Any],
    prompt_type: str,
    sync_policy: str,
    mode: str,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
) -> None:
    """Validate initialization parameters.

    Args:
        filepath: File path to validate for supported extensions.
        credentials: Credentials dictionary to validate.
        prompt_type: Prompt type to validate.
        sync_policy: Sync policy to validate.
        mode: Mode to validate.
        load_args: Load arguments to validate.
        save_args: Save arguments to validate.

    Raises:
        DatasetError: If parameters are invalid.
        NotImplementedError: If filepath has unsupported extension.
        ImportError: If langchain package is required but not available.
    """
    # Validate file extension
    file_path = Path(filepath)
    if file_path.suffix.lower() not in SUPPORTED_FILE_EXTENSIONS:
        raise NotImplementedError(
            f"Unsupported file extension '{file_path.suffix}'. "
            f"Supported formats: {', '.join(sorted(SUPPORTED_FILE_EXTENSIONS))}"
        )

    # Validate mode-specific requirements
    if mode == "langchain":
        try:
            from langchain.prompts import ChatPromptTemplate  # noqa: PLC0415
        except ImportError as exc:
            raise ImportError(
                "The 'langchain' package is required when using mode='langchain'. "
                "Install it with: pip install 'kedro-datasets[langfuse]'"
            ) from exc

    # Delegate to specialized validation methods
    self._validate_credentials(credentials)
    self._validate_enum_params(prompt_type, sync_policy, mode)
    self._validate_args(load_args, save_args)
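The validation above can be sketched as standalone functions, which is handy for checking catalog entries before instantiating the dataset. This is a minimal sketch under stated assumptions: the contents of `SUPPORTED_FILE_EXTENSIONS` are assumed to be `.json`, `.yaml` and `.yml` (the class docstring only says JSON/YAML), the allowed enum values are taken from the documented defaults and examples, `check_enums` is a hypothetical stand-in for `_validate_enum_params` (whose body is not shown here), and `DatasetError` is a local stand-in so the code runs without Kedro.

```python
from pathlib import Path

# Assumption: illustrative values; the real constant is defined in langfuse_prompt_dataset.py.
SUPPORTED_FILE_EXTENSIONS = {".json", ".yaml", ".yml"}
VALID_PROMPT_TYPES = {"text", "chat"}
VALID_SYNC_POLICIES = {"local", "remote", "strict"}
VALID_MODES = {"sdk", "langchain"}


class DatasetError(Exception):
    """Stand-in for kedro.io.core.DatasetError so the sketch runs without Kedro."""


def check_extension(filepath: str) -> None:
    """Mirror of the extension check: the suffix is compared case-insensitively."""
    suffix = Path(filepath).suffix
    if suffix.lower() not in SUPPORTED_FILE_EXTENSIONS:
        raise NotImplementedError(
            f"Unsupported file extension '{suffix}'. "
            f"Supported formats: {', '.join(sorted(SUPPORTED_FILE_EXTENSIONS))}"
        )


def check_enums(prompt_type: str, sync_policy: str, mode: str) -> None:
    """Hypothetical equivalent of _validate_enum_params."""
    for name, value, allowed in (
        ("prompt_type", prompt_type, VALID_PROMPT_TYPES),
        ("sync_policy", sync_policy, VALID_SYNC_POLICIES),
        ("mode", mode, VALID_MODES),
    ):
        if value not in allowed:
            raise DatasetError(f"Invalid {name} '{value}'. Must be one of: {sorted(allowed)}")


check_extension("data/prompts/intent.JSON")  # passes: the suffix is lower-cased first
check_enums("chat", "local", "langchain")    # passes
```

Note that an unsupported extension raises `NotImplementedError` rather than `DatasetError`, matching the Raises section above.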

load

load()

Loads the prompt from Langfuse and from the local file (if present), synchronizes the two according to sync_policy, and returns the prompt in the format specified by mode.

Returns:

  • ChatPromptTemplate

    If mode="langchain", a ready-to-use LangChain template.

  • Any

    If mode="sdk", the raw Langfuse prompt object.

Raises:

  • DatasetError

    If the sync policy detects a conflict (for example, under strict) or no prompt is found.
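The three sync policies described in the class docstring can be illustrated with a small decision function that takes the local and remote prompt contents and decides which one to use. This is a hypothetical sketch, not the dataset's `_sync_with_langfuse` (which is not shown on this page): the name `resolve_prompt`, the `push_local_to_langfuse` flag, and the handling of a missing side (for example, seeding Langfuse from the local file under `remote`) are all assumptions made for illustration. `DatasetError` is a local stand-in.

```python
from typing import Any, Optional, Tuple


class DatasetError(Exception):
    """Stand-in for kedro.io.core.DatasetError so the sketch runs without Kedro."""


def resolve_prompt(
    sync_policy: str, local: Optional[Any], remote: Optional[Any]
) -> Tuple[Any, bool]:
    """Return (content_to_use, push_local_to_langfuse). Hypothetical helper."""
    if local is None and remote is None:
        raise DatasetError("No prompt found locally or in Langfuse.")
    if sync_policy == "local":
        if local is None:
            return remote, False  # nothing local to enforce, so use the remote copy
        return local, local != remote  # local wins; push it if Langfuse is missing or differs
    if sync_policy == "remote":
        if remote is None:
            return local, True  # assumption: seed Langfuse from the local file
        return remote, False  # Langfuse wins
    if sync_policy == "strict":
        if local is not None and remote is not None and local != remote:
            raise DatasetError("Local and remote prompts differ (sync_policy='strict').")
        # Assumption: if only one side exists, use it and push local content if needed.
        return (local, remote is None) if local is not None else (remote, False)
    raise DatasetError(f"Unknown sync_policy: {sync_policy}")
```

For example, `resolve_prompt("local", "a", "b")` keeps the local content and flags it for upload, while `resolve_prompt("remote", "a", "b")` returns the Langfuse content unchanged.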

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def load(self) -> Union["ChatPromptTemplate", Any]:
    """Load the prompt from Langfuse and from the local file (if present),
    synchronize the two according to sync_policy, and return the prompt in
    the format specified by mode.

    Returns:
        ChatPromptTemplate: If mode="langchain", a ready-to-use LangChain template.
        Any: If mode="sdk", the raw Langfuse prompt object.

    Raises:
        DatasetError: If the sync policy detects a conflict or no prompt is found.
    """
    # Temporarily suppress Langfuse logger to prevent Langfuse ERROR logs for 404s
    langfuse_logger = logging.getLogger('langfuse')
    original_level = langfuse_logger.level
    langfuse_logger.setLevel(logging.CRITICAL)

    try:
        langfuse_prompt = self._langfuse.get_prompt(**self._get_build_args)
    except (ConnectionError, TimeoutError) as e:
        logger.warning(f"Network error when fetching prompt '{self._prompt_name}' from Langfuse: {e}")
        langfuse_prompt = None
    except Exception as e:
        logger.warning(
            f"Error when fetching prompt '{self._prompt_name}' from Langfuse: {type(e).__name__}: {e}"
        )
        langfuse_prompt = None
    finally:
        # Restore original logging level
        langfuse_logger.setLevel(original_level)

    # Load local file if it exists
    local_data = None
    if self._filepath.exists():
        local_data = self.file_dataset.load()

    # Synchronize local and remote
    langfuse_prompt = self._sync_with_langfuse(local_data, langfuse_prompt)

    if self._mode == "sdk":
        return langfuse_prompt
    elif self._mode == "langchain":
        from langchain.prompts import ChatPromptTemplate  # noqa: PLC0415
        return ChatPromptTemplate.from_messages(langfuse_prompt.get_langchain_prompt())
    else:
        raise DatasetError(f"Unsupported mode: {self._mode}. Must be 'sdk' or 'langchain'.")
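The suppress-and-restore handling of the `langfuse` logger in `load()` can be packaged as a reusable context manager. This is a sketch of one possible refactoring, not code from the library: `suppressed_logger` and `fetch_remote` are hypothetical names, and `fetch_remote` collapses the two `except` branches of `load()` into a single broad handler for brevity.

```python
import logging
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Optional


@contextmanager
def suppressed_logger(name: str, level: int = logging.CRITICAL) -> Iterator[logging.Logger]:
    """Temporarily raise a logger's threshold, restoring the original level on exit."""
    target = logging.getLogger(name)
    original_level = target.level
    target.setLevel(level)
    try:
        yield target
    finally:
        # Runs on success and on exceptions, like the finally clause in load().
        target.setLevel(original_level)


def fetch_remote(fetch: Callable[[], Any]) -> Optional[Any]:
    """Call fetch() with the langfuse logger silenced; return None on any error."""
    with suppressed_logger("langfuse"):
        try:
            return fetch()
        except Exception as exc:  # load() also falls back to None on broad exceptions
            logging.getLogger(__name__).warning(
                "Error when fetching prompt: %s: %s", type(exc).__name__, exc
            )
            return None
```

Because the restore happens in `finally`, the original level comes back even if the fetch raises, which is the same guarantee the `try`/`finally` in `load()` provides.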

preview

preview()

Generate a JSON-compatible preview of the underlying prompt data for Kedro-Viz.

Automatically wraps string content in a JSON object to ensure compatibility with Kedro-Viz's JSON preview requirements. This prevents "src property must be a valid json object" errors when the local file contains plain text.

Returns:

  • JSONPreview

    A Kedro-Viz-compatible object containing a serialized JSON string. String content is wrapped in {"content": <string>} format so the result is a JSON object. If the local file does not exist, the message "Local prompt does not exist." is returned instead.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def preview(self) -> JSONPreview:
    """
    Generate a JSON-compatible preview of the underlying prompt data for Kedro-Viz.

    Automatically wraps string content in a JSON object to ensure compatibility
    with Kedro-Viz's JSON preview requirements. This prevents "src property must
    be a valid json object" errors when the local file contains plain text.

    Returns:
        JSONPreview: A Kedro-Viz-compatible object containing a serialized JSON string.
            String content is wrapped in {"content": <string>} format for proper
            JSON object structure. Returns a placeholder message if the local file doesn't exist.
    """
    if self._filepath.exists():
        local_data = self.file_dataset.load()

        # If local_data is just a string, wrap it in a JSON object
        if isinstance(local_data, str):
            local_data = {"content": local_data}

        return JSONPreview(json.dumps(local_data))

    return JSONPreview("Local prompt does not exist.")
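The wrapping rule in `preview()` can be reproduced without Kedro to see what Kedro-Viz would receive. This minimal sketch returns a plain `str` instead of `JSONPreview` and reads only JSON files; both are simplifications, since the dataset delegates loading to its underlying `file_dataset` (JSON or YAML). The function names are hypothetical. As in the code above, only bare strings are wrapped: a list of chat messages is serialized as a JSON array.

```python
import json
from pathlib import Path
from typing import Any


def preview_payload(local_data: Any) -> str:
    """Serialize prompt data for a JSON preview, wrapping bare strings in an object."""
    if isinstance(local_data, str):
        local_data = {"content": local_data}
    return json.dumps(local_data)


def preview_from_file(path: Path) -> str:
    """JSON-only stand-in for preview(); returns a message when the file is missing."""
    if not path.exists():
        return "Local prompt does not exist."
    return preview_payload(json.loads(path.read_text()))
```

For a text prompt stored as `"Classify: {{text}}"`, the preview becomes `{"content": "Classify: {{text}}"}`.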

save

save(data)

Creates a new version of the prompt in Langfuse from the provided data.

Parameters:

  • data (str | list) –

    The prompt content to save: a string for text prompts or a list of message dictionaries for chat prompts.

Raises:

  • DatasetError

    If the Langfuse API call fails or the data format is invalid.

Source code in kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_prompt_dataset.py
def save(self, data: str | list) -> None:
    """Create a new version of the prompt in Langfuse from the provided data.

    Args:
        data: The prompt content to save: a string for text prompts
            or a list of message dictionaries for chat prompts.

    Raises:
        DatasetError: If the Langfuse API call fails or the data format is invalid.
    """
    create_kwargs = {
        "name": self._prompt_name,
        "prompt": data,
        "type": self._prompt_type,
    }

    # Add labels from save_args if specified
    if "labels" in self._save_args:
        create_kwargs["labels"] = self._save_args["labels"]

    self._langfuse.create_prompt(**create_kwargs)
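The argument assembly in `save()` can be exercised against a fake client to check which keyword arguments reach `create_prompt`. In this sketch `RecordingClient` and `save_prompt` are hypothetical stand-ins (no Langfuse connection is made), and the prompt name and labels mirror the catalog example at the top of this page.

```python
from typing import Any, Dict, List, Optional


class RecordingClient:
    """Hypothetical stand-in for a Langfuse client that records create_prompt calls."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def create_prompt(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)


def save_prompt(
    client: RecordingClient,
    prompt_name: str,
    prompt_type: str,
    data: Any,
    save_args: Optional[Dict[str, Any]] = None,
) -> None:
    """Mirror of save(): labels are forwarded only when present in save_args."""
    create_kwargs: Dict[str, Any] = {"name": prompt_name, "prompt": data, "type": prompt_type}
    save_args = save_args or {}
    if "labels" in save_args:
        create_kwargs["labels"] = save_args["labels"]
    client.create_prompt(**create_kwargs)


client = RecordingClient()
save_prompt(
    client,
    "intent-classifier",
    "chat",
    [{"role": "system", "content": "Classify the user's intent."}],
    {"labels": ["staging", "v2.1"]},
)
```

Any other keys in `save_args` are not forwarded by this code path; only `labels` is read.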