
langfuse.TraceDataset

kedro_datasets_experimental.langfuse.TraceDataset

TraceDataset(credentials, mode='sdk', **trace_kwargs)

Bases: AbstractDataset

Kedro dataset for managing Langfuse tracing clients and callbacks.

This dataset provides appropriate tracing objects based on mode configuration, enabling seamless integration with different AI frameworks and direct SDK usage. Environment variables are automatically configured during initialization.

Modes:

  • langchain: Returns a CallbackHandler for LangChain integration.
  • openai: Returns a wrapped OpenAI client with automatic tracing.
  • autogen: Returns a configured Tracer for AutoGen integration via OTLP. Note: Langfuse's graph visualisation is in beta and may not render complex multi-agent workflows correctly.
  • sdk: Returns a raw Langfuse client for manual tracing.

Examples:

Using catalog YAML configuration:

langfuse_trace:
  type: kedro_datasets_experimental.langfuse.TraceDataset
  credentials: langfuse_credentials
  mode: openai

Using Python API:

from kedro_datasets_experimental.langfuse import TraceDataset

# Basic usage (using default Langfuse cloud)
dataset = TraceDataset(
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
        "openai": {"api_key": "sk-..."},  # pragma: allowlist secret
    },
    mode="openai",
)

# With custom host
dataset = TraceDataset(
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
        "host": "https://custom.langfuse.com",
        "openai": {"api_key": "sk-..."},  # pragma: allowlist secret
    },
    mode="openai",
)

# Load tracing client
client = dataset.load()
response = client.chat.completions.create(...)  # Automatically traced

# AutoGen mode Langfuse cloud
dataset = TraceDataset(
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
        "endpoint": "https://cloud.langfuse.com/api/public/otel/v1/traces",
    },
    mode="autogen",
)
tracer = dataset.load()

# AutoGen mode self-hosted
dataset = TraceDataset(
    credentials={
        "public_key": "pk_...",
        "secret_key": "sk_...",  # pragma: allowlist secret
        "host": "http://localhost:3000",
        "endpoint": "http://localhost:3000/api/public/otel/v1/traces",
    },
    mode="autogen",
)
tracer = dataset.load()
# Use with AutoGen's runtime logging

Validates credentials and sets up appropriate environment variables for Langfuse tracing integration. Environment variables are set immediately during initialization for use by all tracing modes.

Parameters:

  • credentials (dict[str, Any]) –

    Dictionary with Langfuse credentials. Required: {public_key, secret_key}. Optional: {host} (defaults to Langfuse cloud if not provided). For autogen mode, {endpoint} is also required: the full OTLP endpoint URL (e.g. https://cloud.langfuse.com/api/public/otel/v1/traces). For openai mode, include an openai section with api_key (required) and, optionally, base_url.

  • mode (Literal['langchain', 'openai', 'autogen', 'sdk'], default: 'sdk' ) –

    Tracing mode - "langchain", "openai", "autogen", or "sdk" (default).

  • **trace_kwargs (Any, default: {} ) –

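    Additional keyword arguments passed to the tracing client. In the load() source on this page they are forwarded to the LangChain CallbackHandler and to the fallback Langfuse constructor in sdk mode.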

Raises:

  • DatasetError

    If required Langfuse credentials are missing or empty.

Examples:

>>> # Basic SDK mode (using default Langfuse cloud)
>>> dataset = TraceDataset(
...     credentials={"public_key": "pk_...", "secret_key": "sk_..."}  # pragma: allowlist secret
... )
>>> # With custom host
>>> dataset = TraceDataset(
...     credentials={
...         "public_key": "pk_...",
...         "secret_key": "sk_...",  # pragma: allowlist secret
...         "host": "https://custom.langfuse.com"
...     }
... )
>>> # OpenAI mode with API key
>>> dataset = TraceDataset(
...     credentials={
...         "public_key": "pk_...",
...         "secret_key": "sk_...",  # pragma: allowlist secret
...         "openai": {"api_key": "sk-...", "base_url": "..."}  # pragma: allowlist secret
...     },
...     mode="openai"
... )
>>> # AutoGen mode cloud
>>> dataset = TraceDataset(
...     credentials={
...         "public_key": "pk_...",
...         "secret_key": "sk_...",  # pragma: allowlist secret
...         "endpoint": "https://cloud.langfuse.com/api/public/otel/v1/traces",
...     },
...     mode="autogen"
... )
>>> # AutoGen mode self-hosted
>>> dataset = TraceDataset(
...     credentials={
...         "public_key": "pk_...",
...         "secret_key": "sk_...",  # pragma: allowlist secret
...         "host": "http://localhost:3000",
...         "endpoint": "http://localhost:3000/api/public/otel/v1/traces",
...     },
...     mode="autogen"
... )
Note

Sets the LANGFUSE_SECRET_KEY and LANGFUSE_PUBLIC_KEY environment variables from the provided credentials. LANGFUSE_HOST is set only when host is included in the credentials.
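
The environment setup can be illustrated with a minimal standalone sketch, based on the __init__ source shown on this page. The helper name export_langfuse_env and the placeholder key values are hypothetical and not part of the library; the sketch only mirrors how the three variables are written.

```python
import os


def export_langfuse_env(credentials: dict[str, str]) -> None:
    """Hypothetical helper mirroring the environment setup done in __init__."""
    os.environ["LANGFUSE_SECRET_KEY"] = credentials["secret_key"]
    os.environ["LANGFUSE_PUBLIC_KEY"] = credentials["public_key"]
    # LANGFUSE_HOST is written only when a host is supplied.
    if "host" in credentials:
        os.environ["LANGFUSE_HOST"] = credentials["host"]


# Start from a clean slate so the effect of "host" is visible.
os.environ.pop("LANGFUSE_HOST", None)

export_langfuse_env({"public_key": "pk_example", "secret_key": "sk_example"})
host_set_without_host = "LANGFUSE_HOST" in os.environ

export_langfuse_env(
    {
        "public_key": "pk_example",
        "secret_key": "sk_example",
        "host": "http://localhost:3000",
    }
)
host_after = os.environ["LANGFUSE_HOST"]
```

Because these are process-wide variables, a second dataset created with different credentials in the same process would overwrite the values set by the first.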

Source code in kedro_datasets_experimental/langfuse/trace_dataset.py
def __init__(
    self,
    credentials: dict[str, Any],
    mode: Literal["langchain", "openai", "autogen", "sdk"] = "sdk",
    **trace_kwargs: Any
):
    """Initialize TraceDataset and configure environment variables.

    Validates credentials and sets up appropriate environment variables for
    Langfuse tracing integration. Environment variables are set immediately
    during initialization for use by all tracing modes.

    Args:
        credentials: Dictionary with Langfuse credentials. Required: {public_key, secret_key}.
            Optional: {host} (defaults to Langfuse cloud if not provided).
            For autogen mode, {endpoint} is required — the full OTLP endpoint URL
            (e.g. https://cloud.langfuse.com/api/public/otel/v1/traces).
            For OpenAI mode, include openai section with {api_key, base_url}.
        mode: Tracing mode - "langchain", "openai", "autogen", or "sdk" (default).
        **trace_kwargs: Additional kwargs passed to the tracing client.

    Raises:
        DatasetError: If required Langfuse credentials are missing or empty.

    Examples:
        >>> # Basic SDK mode (using default Langfuse cloud)
        >>> dataset = TraceDataset(
        ...     credentials={"public_key": "pk_...", "secret_key": "sk_..."}  # pragma: allowlist secret
        ... )

        >>> # With custom host
        >>> dataset = TraceDataset(
        ...     credentials={
        ...         "public_key": "pk_...",
        ...         "secret_key": "sk_...",  # pragma: allowlist secret
        ...         "host": "https://custom.langfuse.com"
        ...     }
        ... )

        >>> # OpenAI mode with API key
        >>> dataset = TraceDataset(
        ...     credentials={
        ...         "public_key": "pk_...",
        ...         "secret_key": "sk_...",  # pragma: allowlist secret
        ...         "openai": {"api_key": "sk-...", "base_url": "..."}  # pragma: allowlist secret
        ...     },
        ...     mode="openai"
        ... )

        >>> # AutoGen mode cloud
        >>> dataset = TraceDataset(
        ...     credentials={
        ...         "public_key": "pk_...",
        ...         "secret_key": "sk_...",  # pragma: allowlist secret
        ...         "endpoint": "https://cloud.langfuse.com/api/public/otel/v1/traces",
        ...     },
        ...     mode="autogen"
        ... )

        >>> # AutoGen mode self-hosted
        >>> dataset = TraceDataset(
        ...     credentials={
        ...         "public_key": "pk_...",
        ...         "secret_key": "sk_...",  # pragma: allowlist secret
        ...         "host": "http://localhost:3000",
        ...         "endpoint": "http://localhost:3000/api/public/otel/v1/traces",
        ...     },
        ...     mode="autogen"
        ... )

    Note:
        Sets LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, and LANGFUSE_HOST
        environment variables from the provided credentials.
    """
    self._credentials = credentials
    self._mode = mode
    self._trace_kwargs = trace_kwargs
    self._cached_client = None

    # Validate Langfuse credentials before setting environment variables
    self._validate_langfuse_credentials()

    # Set Langfuse environment variables from credentials
    os.environ["LANGFUSE_SECRET_KEY"] = self._credentials["secret_key"]
    os.environ["LANGFUSE_PUBLIC_KEY"] = self._credentials["public_key"]

    if "host" in self._credentials:
        os.environ["LANGFUSE_HOST"] = self._credentials["host"]

_cached_client instance-attribute

_cached_client = None

_credentials instance-attribute

_credentials = credentials

_mode instance-attribute

_mode = mode

_trace_kwargs instance-attribute

_trace_kwargs = trace_kwargs

_build_autogen_tracer

_build_autogen_tracer()

Build and return a configured Tracer for AutoGen integration with Langfuse.

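Builds an OTLP span exporter that sends traces to the configured Langfuse endpoint, authenticated with the public and secret keys. If the current global TracerProvider supports add_span_processor, the exporter is attached to it; otherwise a new TracerProvider is created and registered as the global provider. Returns a ready-to-use Tracer.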

Returns:

  • Any

    Tracer configured to export traces to Langfuse.

Raises:

  • DatasetError

    If required OpenTelemetry dependencies are not installed.
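
As a rough illustration of the authentication step, the sketch below builds the HTTP Basic Authorization header the same way the source on this page does (base64 of "public_key:secret_key"). The function name basic_auth_header and the placeholder keys are hypothetical; only the standard library is used.

```python
import base64


def basic_auth_header(public_key: str, secret_key: str) -> dict[str, str]:
    """Hypothetical helper: encode 'public:secret' as an HTTP Basic auth header."""
    token = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
    return {"Authorization": f"Basic {token}"}


headers = basic_auth_header("pk_example", "sk_example")

# Decoding the token recovers the original "public:secret" pair.
decoded = base64.b64decode(headers["Authorization"].split(" ", 1)[1]).decode()
```

In the dataset itself this header is passed to the OTLP exporter together with the endpoint from the credentials.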

Source code in kedro_datasets_experimental/langfuse/trace_dataset.py
def _build_autogen_tracer(self) -> Any:
    """Build and return a configured Tracer for AutoGen integration with Langfuse.

    Sets up OpenTelemetry TracerProvider with OTLP exporter to Langfuse,
    configures it as the global provider, and returns a ready-to-use Tracer.

    Returns:
        Tracer configured to export traces to Langfuse.

    Raises:
        DatasetError: If required OpenTelemetry dependencies are not installed.
    """
    try:
        from opentelemetry import trace  # noqa: PLC0415
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (  # noqa: PLC0415
            OTLPSpanExporter,
        )
        from opentelemetry.sdk.trace import TracerProvider  # noqa: PLC0415
        from opentelemetry.sdk.trace.export import (  # noqa: PLC0415
            BatchSpanProcessor,
        )
    except ImportError as exc:
        raise DatasetError(
            "AutoGen mode requires OpenTelemetry. "
            "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
        ) from exc

    import base64  # noqa: PLC0415

    auth = base64.b64encode(
        f"{self._credentials['public_key']}:{self._credentials['secret_key']}".encode()
    ).decode()

    # Endpoint is provided by user and validated in _validate_langfuse_credentials
    endpoint = self._credentials["endpoint"]

    exporter = OTLPSpanExporter(
        endpoint=endpoint,
        headers={"Authorization": f"Basic {auth}"}
    )

    processor = BatchSpanProcessor(exporter)

    # Use existing provider if already set, otherwise create a new one.
    existing_provider = trace.get_tracer_provider()
    if hasattr(existing_provider, "add_span_processor"):
        existing_provider.add_span_processor(processor)
    else:
        provider = TracerProvider()
        provider.add_span_processor(processor)
        trace.set_tracer_provider(provider)

    return trace.get_tracer("langfuse.autogen")

_describe

_describe()

Return a description of the dataset for Kedro's internal use.

Returns:

  • dict[str, Any]

    Dictionary containing dataset description with mode and masked credentials.

Source code in kedro_datasets_experimental/langfuse/trace_dataset.py
def _describe(self) -> dict[str, Any]:
    """Return a description of the dataset for Kedro's internal use.

    Returns:
        Dictionary containing dataset description with mode and masked credentials.
    """
    return {"mode": self._mode, "credentials": "***"}

_validate_langfuse_credentials

_validate_langfuse_credentials()

Validate Langfuse credentials before setting environment variables.

Raises:

  • DatasetError

    If Langfuse credentials are missing or invalid.
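
The validation rules can be sketched as follows. This is an approximation under stated assumptions: the shared validate_langfuse_credentials helper and the contents of REQUIRED_LANGFUSE_CREDENTIALS_AUTOGEN are not shown on this page, so the key tuples below are inferred from the documented requirements, and ValueError stands in for DatasetError.

```python
# Assumed from the documented requirements, not copied from the library.
REQUIRED_KEYS = ("public_key", "secret_key")
AUTOGEN_KEYS = ("endpoint",)


def check_credentials(credentials: dict, mode: str = "sdk") -> None:
    """Hypothetical check: every required key must be present and non-empty."""
    required = REQUIRED_KEYS + (AUTOGEN_KEYS if mode == "autogen" else ())
    for key in required:
        if not credentials.get(key):
            # ValueError is a stand-in for kedro's DatasetError.
            raise ValueError(f"Missing or empty credential: '{key}'")


def is_valid(credentials: dict, mode: str = "sdk") -> bool:
    """Return True when check_credentials raises nothing."""
    try:
        check_credentials(credentials, mode)
    except ValueError:
        return False
    return True


base = {"public_key": "pk_example", "secret_key": "sk_example"}
with_endpoint = {**base, "endpoint": "http://localhost:3000/api/public/otel/v1/traces"}
```

With these definitions, the base credentials are enough for sdk mode but not for autogen mode, which also needs a non-empty endpoint.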

Source code in kedro_datasets_experimental/langfuse/trace_dataset.py
def _validate_langfuse_credentials(self) -> None:
    """Validate Langfuse credentials before setting environment variables.

    Raises:
        DatasetError: If Langfuse credentials are missing or invalid.
    """
    validate_langfuse_credentials(self._credentials)

    # AutoGen mode has additional required credentials
    if self._mode == "autogen":
        for key in REQUIRED_LANGFUSE_CREDENTIALS_AUTOGEN:
            if not self._credentials.get(key):
                raise DatasetError(
                    f"AutoGen mode requires '{key}' in credentials "
                    f"(e.g. 'https://cloud.langfuse.com/api/public/otel/v1/traces'). "
                    f"Provide the full OTLP endpoint URL for trace export."
                )

_validate_openai_client_params

_validate_openai_client_params()

Validate OpenAI credentials in the 'openai' section.

Raises:

  • DatasetError

    If OpenAI credentials are missing or invalid.

Source code in kedro_datasets_experimental/langfuse/trace_dataset.py
def _validate_openai_client_params(self) -> None:
    """Validate OpenAI credentials in the 'openai' section.

    Raises:
        DatasetError: If OpenAI credentials are missing or invalid.
    """
    # Check if openai section exists
    if "openai" not in self._credentials:
        raise DatasetError("OpenAI mode requires 'openai' section in credentials")

    openai_creds = self._credentials["openai"]

    # Check for required API key
    if "api_key" not in openai_creds:
        raise DatasetError("Missing required OpenAI credential: 'api_key'")

    # Validate that API key is not empty
    if not openai_creds["api_key"] or not openai_creds["api_key"].strip():
        raise DatasetError("OpenAI API key cannot be empty")

    # Validate base_url is not empty if provided
    if "base_url" in openai_creds and not str(openai_creds["base_url"]).strip():
        raise DatasetError("OpenAI credential 'base_url' cannot be empty if provided")

load

load()

Load appropriate tracing client based on configured mode.

Creates and returns the appropriate tracing client for the specified mode. The client is cached after the first load to avoid repeated initialisation. The Langfuse clients authenticate through the environment variables set during initialisation; in autogen mode the OTLP exporter builds its Authorization header directly from the credentials.
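
The caching behaviour can be sketched in isolation. CachedLoader and make_client are hypothetical names used only for illustration; the real dataset picks the factory according to its mode.

```python
from typing import Any, Callable


class CachedLoader:
    """Hypothetical stand-in showing load-once caching; not the real TraceDataset."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._cached_client: Any = None

    def load(self) -> Any:
        # Build the client on the first call, then reuse the same object.
        if self._cached_client is None:
            self._cached_client = self._factory()
        return self._cached_client


calls: list[int] = []


def make_client() -> object:
    """Record each construction so repeated builds would be visible."""
    calls.append(1)
    return object()


loader = CachedLoader(make_client)
first = loader.load()
second = loader.load()
```

Both calls return the same object and the factory runs only once, so changes to credentials after the first load do not affect the cached client.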

Returns:

  • Any

    Tracing client object based on mode:

      • langchain mode: CallbackHandler for LangChain integration
      • openai mode: Wrapped OpenAI client with automatic tracing
      • autogen mode: Configured Tracer for OpenTelemetry integration
      • sdk mode: Raw Langfuse client for manual tracing

Raises:

  • DatasetError

    If mode-specific dependencies are missing or credentials are invalid.

Examples:

LangChain mode

dataset = TraceDataset(credentials=creds, mode="langchain")
callback = dataset.load()
chain.invoke(input, config={"callbacks": [callback]})

OpenAI mode

dataset = TraceDataset(credentials=creds, mode="openai")
client = dataset.load()
response = client.chat.completions.create(model="gpt-4", messages=[...])

AutoGen mode

dataset = TraceDataset(credentials=creds, mode="autogen")
tracer = dataset.load()  # Returns configured Tracer

# Option 1: Automatic tracing (LLM calls traced automatically)
agent.invoke(context)  # Traces sent to Langfuse

# Option 2: Add custom spans with context
with tracer.start_as_current_span("response_generation") as span:
    span.set_attribute("intent", "claim_new")
    agent.invoke(context)  # Child spans nested under parent

SDK mode

dataset = TraceDataset(credentials=creds, mode="sdk")
langfuse = dataset.load()
trace = langfuse.trace(name="my-trace")
Source code in kedro_datasets_experimental/langfuse/trace_dataset.py
def load(self) -> Any:
    """Load appropriate tracing client based on configured mode.

    Creates and returns the appropriate tracing client for the specified mode.
    The client is cached after first load to avoid repeated initialisation.
    All clients use environment variables set during initialisation for authentication.

    Returns:
        Tracing client object based on mode:
        - langchain mode: CallbackHandler for LangChain integration
        - openai mode: Wrapped OpenAI client with automatic tracing
        - autogen mode: Configured Tracer for OpenTelemetry integration
        - sdk mode: Raw Langfuse client for manual tracing

    Raises:
        DatasetError: If mode-specific dependencies are missing or credentials are invalid.

    Examples:
        # LangChain mode
            dataset = TraceDataset(credentials=creds, mode="langchain")
            callback = dataset.load()
            chain.invoke(input, config={"callbacks": [callback]})

        # OpenAI mode
            dataset = TraceDataset(credentials=creds, mode="openai")
            client = dataset.load()
            response = client.chat.completions.create(model="gpt-4", messages=[...])

        # AutoGen mode
            dataset = TraceDataset(credentials=creds, mode="autogen")
            tracer = dataset.load()  # Returns configured Tracer

            # Option 1: Automatic tracing (LLM calls traced automatically)
            agent.invoke(context)  # Traces sent to Langfuse

            # Option 2: Add custom spans with context
            with tracer.start_as_current_span("response_generation") as span:
                span.set_attribute("intent", "claim_new")
                agent.invoke(context)  # Child spans nested under parent

        # SDK mode
            dataset = TraceDataset(credentials=creds, mode="sdk")
            langfuse = dataset.load()
            trace = langfuse.trace(name="my-trace")
    """
    # Return cached client if available
    if self._cached_client is not None:
        return self._cached_client

    # Create and cache the appropriate client
    if self._mode == "langchain":
        from langfuse.langchain import CallbackHandler  # noqa: PLC0415
        self._cached_client = CallbackHandler(**self._trace_kwargs)
    elif self._mode == "openai":
        from langfuse.openai import OpenAI  # noqa: PLC0415
        self._validate_openai_client_params()
        self._cached_client = OpenAI(**self._credentials["openai"])
    elif self._mode == "autogen":
        self._cached_client = self._build_autogen_tracer()
    else:
        try:
            from langfuse import get_client  # noqa: PLC0415
            self._cached_client = get_client()
        except ImportError:
            from langfuse import Langfuse  # noqa: PLC0415
            self._cached_client = Langfuse(**self._trace_kwargs)

    return self._cached_client

save

save(data)

Save operation is not supported for tracing datasets.

Parameters:

  • data (Any) –

    Data to save (not used).

Raises:

  • NotImplementedError

    Always raised as tracing datasets are read-only.

Note

TraceDataset is designed for providing tracing clients, not for data storage. Use the returned tracing clients to automatically log traces, spans, and generations to Langfuse.
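
A minimal sketch of the load-only contract, using a hypothetical ReadOnlyProvider class rather than the real TraceDataset, shows how calling code might guard against an accidental save:

```python
from typing import Any


class ReadOnlyProvider:
    """Hypothetical load-only dataset; a stand-in for the pattern described above."""

    def load(self) -> str:
        return "tracing-client"

    def save(self, data: Any) -> None:
        raise NotImplementedError(
            "read-only: provides tracing clients, not data storage"
        )


provider = ReadOnlyProvider()

try:
    provider.save({"any": "data"})
    save_rejected = False
except NotImplementedError:
    save_rejected = True
```

In a Kedro pipeline this means the dataset should appear only as a node input, never as a node output.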

Source code in kedro_datasets_experimental/langfuse/trace_dataset.py
def save(self, data: Any) -> None:
    """Save operation is not supported for tracing datasets.

    Args:
        data: Data to save (not used).

    Raises:
        NotImplementedError: Always raised as tracing datasets are read-only.

    Note:
        TraceDataset is designed for providing tracing clients,
        not for data storage. Use the returned tracing clients to automatically
        log traces, spans, and generations to Langfuse.
    """
    raise NotImplementedError("TraceDataset is read-only - it provides tracing clients, not data storage")