Skip to content

langfuse.LangfuseTraceDataset

kedro_datasets_experimental.langfuse.LangfuseTraceDataset

LangfuseTraceDataset(
    credentials, mode="sdk", **trace_kwargs
)

Bases: AbstractDataset

Kedro dataset for managing Langfuse tracing clients and callbacks.

This dataset provides appropriate tracing objects based on mode configuration, enabling seamless integration with different AI frameworks and direct SDK usage. Environment variables are automatically configured during initialization.

Modes:

  • langchain: Returns a CallbackHandler for LangChain integration.
  • openai: Returns a wrapped OpenAI client with automatic tracing.
  • sdk: Returns a raw Langfuse client for manual tracing.

Examples:

Using catalog YAML configuration:

langfuse_trace:
  type: kedro_datasets_experimental.langfuse.LangfuseTraceDataset
  credentials: langfuse_credentials
  mode: openai

Using Python API:

from kedro_datasets_experimental.langfuse import LangfuseTraceDataset

# Basic usage (using default Langfuse cloud)
dataset = LangfuseTraceDataset(
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
        "openai": {"openai_api_key": "sk-..."},  # pragma: allowlist secret
    },
    mode="openai",
)

# With custom host
dataset = LangfuseTraceDataset(
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
        "host": "https://custom.langfuse.com",
        "openai": {"openai_api_key": "sk-..."},  # pragma: allowlist secret
    },
    mode="openai",
)

# Load tracing client
client = dataset.load()
response = client.chat.completions.create(...)  # Automatically traced

Validates credentials and sets up appropriate environment variables for Langfuse tracing integration. Environment variables are set immediately during initialization for use by all tracing modes.

Parameters:

  • credentials (dict[str, Any]) –

    Dictionary with Langfuse credentials. Required keys: {public_key, secret_key}. Optional keys: {host} (defaults to Langfuse cloud if not provided). For OpenAI mode, may also include openai section with {openai_api_key, openai_api_base}.

  • mode (Literal['langchain', 'openai', 'sdk'], default: 'sdk' ) –

    Tracing mode - "langchain", "openai", or "sdk" (default).

  • **trace_kwargs (Any, default: {} ) –

    Additional kwargs passed to the tracing client.

Raises:

  • DatasetError

    If required Langfuse credentials are missing or empty.

Examples:

Basic SDK mode (using default Langfuse cloud)

dataset = LangfuseTraceDataset(

... credentials={"public_key": "pk_...", "secret_key": "sk_..."} # pragma: allowlist secret ... )

With custom host

dataset = LangfuseTraceDataset(

... credentials={ ... "public_key": "pk_...", "secret_key": "sk_...", # pragma: allowlist secret ... "host": "https://custom.langfuse.com" ... } ... )

OpenAI mode with API key

dataset = LangfuseTraceDataset(

... credentials={ ... "public_key": "pk_...", "secret_key": "sk_...", # pragma: allowlist secret ... "openai": {"openai_api_key": "sk-...", "openai_api_base": "..."} # pragma: allowlist secret ... }, ... mode="openai" ... )

Note

Sets LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, and LANGFUSE_HOST environment variables from the provided credentials. Also sets OPENAI_API_KEY if provided for OpenAI mode compatibility.

Source code in kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def __init__(
    self,
    credentials: dict[str, Any],
    mode: Literal["langchain", "openai", "sdk"] = "sdk",
    **trace_kwargs: Any
):
    """Initialize LangfuseTraceDataset and configure environment variables.

    Validates credentials and sets up appropriate environment variables for
    Langfuse tracing integration. Environment variables are set immediately
    during initialization for use by all tracing modes.

    Args:
        credentials: Dictionary with Langfuse credentials. Required keys:
            {public_key, secret_key}. Optional keys: {host} (defaults to
            Langfuse cloud if not provided). For OpenAI mode, may also include
            openai section with {openai_api_key, openai_api_base}.
        mode: Tracing mode - "langchain", "openai", or "sdk" (default).
        **trace_kwargs: Additional kwargs passed to the tracing client.

    Raises:
        DatasetError: If required Langfuse credentials are missing or empty.

    Examples:
        # Basic SDK mode (using default Langfuse cloud)
            dataset = LangfuseTraceDataset(
        ...     credentials={"public_key": "pk_...", "secret_key": "sk_..."}  # pragma: allowlist secret
        ... )

        # With custom host
            dataset = LangfuseTraceDataset(
        ...     credentials={
        ...         "public_key": "pk_...", "secret_key": "sk_...",  # pragma: allowlist secret
        ...         "host": "https://custom.langfuse.com"
        ...     }
        ... )

        # OpenAI mode with API key
            dataset = LangfuseTraceDataset(
        ...     credentials={
        ...         "public_key": "pk_...", "secret_key": "sk_...",  # pragma: allowlist secret
        ...         "openai": {"openai_api_key": "sk-...", "openai_api_base": "..."} # pragma: allowlist secret
        ...     },
        ...     mode="openai"
        ... )

    Note:
        Sets LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, and LANGFUSE_HOST
        environment variables from the provided credentials. Also sets
        OPENAI_API_KEY if provided for OpenAI mode compatibility.
    """
    self._credentials = credentials
    self._mode = mode
    self._trace_kwargs = trace_kwargs
    self._cached_client = None

    # Validate Langfuse credentials before setting environment variables
    self._validate_langfuse_credentials()

    # Set Langfuse environment variables from credentials
    os.environ["LANGFUSE_SECRET_KEY"] = self._credentials["secret_key"]
    os.environ["LANGFUSE_PUBLIC_KEY"] = self._credentials["public_key"]

    if "host" in self._credentials:
        os.environ["LANGFUSE_HOST"] = self._credentials["host"]

_cached_client instance-attribute

_cached_client = None

_credentials instance-attribute

_credentials = credentials

_mode instance-attribute

_mode = mode

_trace_kwargs instance-attribute

_trace_kwargs = trace_kwargs

_build_openai_client_params

_build_openai_client_params()

Validate and build OpenAI client parameters from credentials.

Validates the presence and content of required OpenAI credentials, then constructs parameters dictionary for OpenAI client initialization.

Returns:

  • dict[str, str]

    Dictionary with validated OpenAI client parameters. Always includes

  • dict[str, str]

    'api_key', optionally includes 'base_url' if provided.

Raises:

  • DatasetError

    If OpenAI credentials are missing or invalid.

Examples:

With API key only

params = self._build_openai_client_params()
# Returns: {"api_key": "sk-..."}  # pragma: allowlist secret

With API key and custom base URL

params = self._build_openai_client_params()
# Returns: {"api_key": "sk-...", "base_url": "https://api.custom.com"}  # pragma: allowlist secret
Source code in kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def _build_openai_client_params(self) -> dict[str, str]:
    """Validate and build OpenAI client parameters from credentials.

    Validates the presence and content of required OpenAI credentials,
    then constructs parameters dictionary for OpenAI client initialization.

    Returns:
        Dictionary with validated OpenAI client parameters. Always includes
        'api_key', optionally includes 'base_url' if provided.

    Raises:
        DatasetError: If OpenAI credentials are missing or invalid.

    Examples:
        # With API key only
            params = self._build_openai_client_params()
            # Returns: {"api_key": "sk-..."}  # pragma: allowlist secret

        # With API key and custom base URL
            params = self._build_openai_client_params()
            # Returns: {"api_key": "sk-...", "base_url": "https://api.custom.com"}  # pragma: allowlist secret
    """
    # Check if openai section exists
    if "openai" not in self._credentials:
        raise DatasetError("OpenAI mode requires 'openai' section in credentials")

    openai_creds = self._credentials["openai"]

    # Check for required API key
    if "openai_api_key" not in openai_creds:
        raise DatasetError("Missing required OpenAI credential: 'openai_api_key'")

    # Validate that API key is not empty
    if not openai_creds["openai_api_key"] or not openai_creds["openai_api_key"].strip():
        raise DatasetError("OpenAI API key cannot be empty")

    # Build validated client parameters
    client_params = {"api_key": openai_creds["openai_api_key"]}

    # Add base_url if provided (optional)
    if "openai_api_base" in openai_creds and openai_creds["openai_api_base"]:
        client_params["base_url"] = openai_creds["openai_api_base"]

    return client_params

_describe

_describe()

Return a description of the dataset for Kedro's internal use.

Returns:

  • dict[str, Any]

    Dictionary containing dataset description with mode and masked credentials.

Source code in kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py
153
154
155
156
157
158
159
def _describe(self) -> dict[str, Any]:
    """Return a description of the dataset for Kedro's internal use.

    Returns:
        Dictionary containing dataset description with mode and masked credentials.
    """
    return {"mode": self._mode, "credentials": "***"}

_validate_langfuse_credentials

_validate_langfuse_credentials()

Validate Langfuse credentials before setting environment variables.

Raises:

  • DatasetError

    If Langfuse credentials are missing or invalid.

Source code in kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def _validate_langfuse_credentials(self) -> None:
    """Validate Langfuse credentials before setting environment variables.

    Raises:
        DatasetError: If Langfuse credentials are missing or invalid.
    """
    # Validate required keys
    for key in REQUIRED_LANGFUSE_CREDENTIALS:
        if key not in self._credentials:
            raise DatasetError(f"Missing required Langfuse credential: '{key}'")

        # Validate that credential is not empty
        if not self._credentials[key] or not str(self._credentials[key]).strip():
            raise DatasetError(f"Langfuse credential '{key}' cannot be empty")

    # Validate optional keys if present
    for key in OPTIONAL_LANGFUSE_CREDENTIALS:
        if key in self._credentials:
            # If host is provided, it cannot be empty
            if not self._credentials[key] or not str(self._credentials[key]).strip():
                raise DatasetError(f"Langfuse credential '{key}' cannot be empty if provided")

load

load()

Load appropriate tracing client based on configured mode.

Creates and returns the appropriate tracing client for the specified mode. The client is cached after first load to avoid repeated initialisation. All clients use environment variables set during initialisation for authentication.

Returns:

  • Tracing client object based on mode
  • - langchain mode

    CallbackHandler for LangChain integration

  • - openai mode

    Wrapped OpenAI client with automatic tracing

  • - sdk mode

    Raw Langfuse client for manual tracing

Raises:

  • DatasetError

    If OpenAI mode is used but OpenAI credentials are missing or invalid.

Examples:

LangChain mode

dataset = LangfuseTraceDataset(credentials=creds, mode="langchain")
callback = dataset.load()
chain.invoke(input, config={"callbacks": [callback]})

OpenAI mode

dataset = LangfuseTraceDataset(credentials=creds, mode="openai")
client = dataset.load()
response = client.chat.completions.create(model="gpt-4", messages=[...])

SDK mode

dataset = LangfuseTraceDataset(credentials=creds, mode="sdk")
langfuse = dataset.load()
trace = langfuse.trace(name="my-trace")
Source code in kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def load(self) -> Any:
    """Load appropriate tracing client based on configured mode.

    Creates and returns the appropriate tracing client for the specified mode.
    The client is cached after first load to avoid repeated initialisation.
    All clients use environment variables set during initialisation for authentication.

    Returns:
        Tracing client object based on mode:
        - langchain mode: CallbackHandler for LangChain integration
        - openai mode: Wrapped OpenAI client with automatic tracing
        - sdk mode: Raw Langfuse client for manual tracing

    Raises:
        DatasetError: If OpenAI mode is used but OpenAI credentials are missing or invalid.

    Examples:
        # LangChain mode
            dataset = LangfuseTraceDataset(credentials=creds, mode="langchain")
            callback = dataset.load()
            chain.invoke(input, config={"callbacks": [callback]})

        # OpenAI mode
            dataset = LangfuseTraceDataset(credentials=creds, mode="openai")
            client = dataset.load()
            response = client.chat.completions.create(model="gpt-4", messages=[...])

        # SDK mode
            dataset = LangfuseTraceDataset(credentials=creds, mode="sdk")
            langfuse = dataset.load()
            trace = langfuse.trace(name="my-trace")
    """
    # Return cached client if available
    if self._cached_client is not None:
        return self._cached_client

    # Create and cache the appropriate client
    if self._mode == "langchain":
        from langfuse.langchain import CallbackHandler  # noqa PLC0415
        self._cached_client = CallbackHandler()
    elif self._mode == "openai":
        from langfuse.openai import OpenAI  # noqa PLC0415
        client_params = self._build_openai_client_params()
        self._cached_client = OpenAI(**client_params)
    else:
        try:
            from langfuse import get_client  # noqa PLC0415
            self._cached_client = get_client()
        except ImportError:
            from langfuse import Langfuse  # noqa PLC0415
            self._cached_client = Langfuse()

    return self._cached_client

save

save(data)

Save operation is not supported for tracing datasets.

Parameters:

  • data (Any) –

    Data to save (not used).

Raises:

  • NotImplementedError

    Always raised as tracing datasets are read-only.

Note

LangfuseTraceDataset is designed for providing tracing clients, not for data storage. Use the returned tracing clients to automatically log traces, spans, and generations to Langfuse.

Source code in kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def save(self, data: Any) -> None:
    """Save operation is not supported for tracing datasets.

    Args:
        data: Data to save (not used).

    Raises:
        NotImplementedError: Always raised as tracing datasets are read-only.

    Note:
        LangfuseTraceDataset is designed for providing tracing clients,
        not for data storage. Use the returned tracing clients to automatically
        log traces, spans, and generations to Langfuse.
    """
    raise NotImplementedError("LangfuseTraceDataset is read-only - it provides tracing clients, not data storage")