Skip to content

opik.TraceDataset

kedro_datasets_experimental.opik.TraceDataset

TraceDataset(credentials, mode='sdk', **trace_kwargs)

Bases: AbstractDataset

Kedro dataset for managing Opik tracing clients and callbacks.

This dataset provides Opik tracing integrations for various AI frameworks or direct SDK usage. During initialization, the dataset automatically configures the Opik environment and credentials to ensure that subsequent traces are correctly logged to the specified workspace and project.

Modes:

  • sdk: Returns a simple namespace-like client exposing the track decorator for manual tracing.
  • openai: Returns an OpenAI client automatically wrapped for Opik tracing.
  • langchain: Returns an OpikTracer callback handler for LangChain integration.
  • autogen: Returns a configured Tracer for AutoGen integration via OTLP (OpenTelemetry Protocol).

Examples

Using catalog YAML configuration:

opik_trace:
  type: kedro_datasets_experimental.opik.TraceDataset
  credentials: opik_credentials
  mode: openai

Using Python API:

from kedro_datasets_experimental.opik import TraceDataset

# Example: OpenAI mode (traced completions)
dataset = TraceDataset(
    credentials={
        "api_key": "opik_api_key",  # pragma: allowlist secret
        "workspace": "my-workspace",
        "project_name": "kedro-demo",
        "openai": {
            "api_key": "sk-...",  # pragma: allowlist secret
            "base_url": "https://api.openai.com/v1",
        },
    },
    mode="openai",
)
client = dataset.load()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Summarize Kedro in one sentence."},
    ],
)

# Example: SDK mode (manual tracing via decorator)
dataset = TraceDataset(
    credentials={
        "api_key": "opik_api_key",  # pragma: allowlist secret
        "workspace": "my-workspace",
        "project_name": "kedro-sdk-demo",
    },
    mode="sdk",
)
client = dataset.load()


@client.track(name="demo_workflow")
def multiply(x: int, y: int) -> int:
    return x * y


print(multiply(3, 4))

# Example: LangChain mode
dataset = TraceDataset(
    credentials={
        "api_key": "opik_api_key",  # pragma: allowlist secret
        "workspace": "my-workspace",
    },
    mode="langchain",
)
tracer = dataset.load()
# Use tracer in your LangChain Runnable or chain.run(callbacks=[tracer])

# Example: AutoGen mode (Opik Cloud)
dataset = TraceDataset(
    credentials={
        "api_key": "opik_api_key",  # pragma: allowlist secret
        "workspace": "my-workspace",
        "project_name": "autogen-demo",
        "endpoint": "https://www.comet.com/opik/api/v1/private/otel/v1/traces",
    },
    mode="autogen",
)
tracer = dataset.load()  # Returns configured Tracer, ready to use

# Option 1: Automatic tracing (LLM calls traced automatically)
agent.invoke(context)  # Traces sent to Opik

# Option 2: Add custom spans with business context (recommended)
with tracer.start_as_current_span("response_generation") as span:
    span.set_attribute("intent", "claim_new")
    span.set_attribute("user_id", "123")
    agent.invoke(context)  # Child spans nested under "response_generation"

# Example: AutoGen mode (self-hosted Opik)
dataset = TraceDataset(
    credentials={
        "api_key": "opik_api_key",  # pragma: allowlist secret
        "workspace": "my-workspace",
        "project_name": "autogen-demo",
        "url_override": "http://localhost:5173",
        "endpoint": "http://localhost:5173/opik/api/v1/private/otel/v1/traces",
    },
    mode="autogen",
)
tracer = dataset.load()

Notes

  • Opik configuration is global within the Python process. Using multiple TraceDataset instances with different projects in the same session may cause all traces to log to the first configured project.
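  • To switch projects, restart the Python process or reload the Opik module. The warning logged on a project mismatch additionally advises unsetting the OPIK_PROJECT_NAME environment variable before running the pipeline or interactive session.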
Source code in kedro_datasets_experimental/opik/trace_dataset.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def __init__(
    self,
    credentials: dict[str, Any],
    mode: Literal["sdk", "openai", "langchain", "autogen"] = "sdk",
    **trace_kwargs: Any,
) -> None:
    """Store the dataset settings, validate credentials and set up Opik.

    Args:
        credentials: Opik credentials, optionally with mode-specific entries
            such as an ``openai`` section or an ``endpoint``.
        mode: Which tracing integration ``load()`` should return.
        **trace_kwargs: Extra keyword arguments kept for the integration
            loaders.
    """
    self._cached_client = None
    self._trace_kwargs = trace_kwargs
    self._mode = mode
    self._credentials = credentials

    self._validate_opik_credentials()

    # AutoGen mode ships spans over OTLP directly, so the global Opik SDK
    # configuration step is skipped for it.
    if self._mode == "autogen":
        return
    self._configure_opik()

_cached_client instance-attribute

_cached_client = None

_credentials instance-attribute

_credentials = credentials

_mode instance-attribute

_mode = mode

_trace_kwargs instance-attribute

_trace_kwargs = trace_kwargs

_build_autogen_tracer

_build_autogen_tracer()

Build and return a configured Tracer for AutoGen integration with Opik.

Sets up OpenTelemetry TracerProvider with OTLP exporter to Opik, configures it as the global provider, and returns a ready-to-use Tracer.

Returns:

  • Any

    Tracer configured to export traces to Opik.

Raises:

  • DatasetError

    If required OpenTelemetry dependencies are not installed.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def _build_autogen_tracer(self) -> Any:
    """Create the OpenTelemetry tracer used by AutoGen mode.

    An OTLP/HTTP span exporter is built from the dataset credentials, wrapped
    in a ``BatchSpanProcessor`` and attached to the global tracer provider.
    If the current global provider has no ``add_span_processor`` method, a
    new ``TracerProvider`` is created, given the processor and installed as
    the global provider instead.

    Returns:
        The tracer obtained from ``trace.get_tracer("opik.autogen")``.

    Raises:
        DatasetError: If the OpenTelemetry packages cannot be imported.
    """
    try:
        from opentelemetry import trace  # noqa: PLC0415
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (  # noqa: PLC0415
            OTLPSpanExporter,
        )
        from opentelemetry.sdk.trace import TracerProvider  # noqa: PLC0415
        from opentelemetry.sdk.trace.export import (  # noqa: PLC0415
            BatchSpanProcessor,
        )
    except ImportError as exc:
        raise DatasetError(
            "AutoGen mode requires OpenTelemetry. "
            "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
        ) from exc

    creds = self._credentials

    # Headers sent with every export request: API key and target workspace.
    otlp_headers = {
        "Authorization": creds["api_key"],
        "Comet-Workspace": creds["workspace"],
    }

    # The project header is only sent when a project name was supplied.
    project = creds.get("project_name")
    if project:
        otlp_headers["projectName"] = project

    # The endpoint is user-supplied; its presence is checked in
    # _validate_opik_credentials for AutoGen mode.
    span_exporter = OTLPSpanExporter(endpoint=creds["endpoint"], headers=otlp_headers)
    span_processor = BatchSpanProcessor(span_exporter)

    # Reuse the global provider when it accepts span processors; otherwise
    # build one, attach the processor and register it globally.
    active_provider = trace.get_tracer_provider()
    if not hasattr(active_provider, "add_span_processor"):
        active_provider = TracerProvider()
        active_provider.add_span_processor(span_processor)
        trace.set_tracer_provider(active_provider)
    else:
        active_provider.add_span_processor(span_processor)

    return trace.get_tracer("opik.autogen")

_configure_opik

_configure_opik()

Initialize Opik global configuration with awareness of project switching.

This function ensures that the Opik SDK is configured using the provided credentials. If an existing configuration is detected (from a prior dataset instance), a warning is emitted since the active project cannot be changed dynamically.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def _configure_opik(self) -> None:
    """Apply the dataset credentials to the Opik SDK configuration.

    The requested project name is compared with the ``OPIK_PROJECT_NAME``
    environment variable. When both are set and differ, a warning is logged
    and the environment variable is left untouched; otherwise a requested
    project name is written to the environment variable. ``configure`` is
    then called with the API key, workspace and optional URL override.
    """
    requested_project = self._credentials.get("project_name")
    env_project = os.getenv("OPIK_PROJECT_NAME")

    if requested_project:
        if env_project and requested_project != env_project:
            # A different project is already active: keep it and tell the user.
            logger.warning(
                f"Opik is already configured for project '{env_project}', "
                f"as defined by the environment variable OPIK_PROJECT_NAME. "
                f"The active project cannot be changed dynamically — the new project "
                f"'{requested_project}' will be ignored, and all traces will continue "
                f"to be logged under '{env_project}'.\n"
                f"To log traces to a different project, unset the environment variable "
                f"`OPIK_PROJECT_NAME` before running your pipeline or in the interactive session."
            )
        else:
            # No conflicting project: publish the name through the environment.
            os.environ["OPIK_PROJECT_NAME"] = requested_project

    # Runs on every call, including after a mismatch warning.
    opik_settings = {
        "api_key": self._credentials["api_key"],
        "workspace": self._credentials["workspace"],
        "url": self._credentials.get("url_override"),
        "force": True,
    }
    configure(**opik_settings)

_describe

_describe()

Describe dataset configuration with credentials redacted.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
283
284
285
286
287
288
289
290
291
292
def _describe(self) -> dict[str, Any]:
    """Describe dataset configuration with credentials redacted.

    Only the top-level credential keys are reported, and every value is
    replaced by ``"***"``. A nested section such as ``openai`` therefore
    appears as a single redacted entry, and its shape is never inspected,
    so a malformed section cannot make the description fail.

    Returns:
        A dictionary with the configured ``mode`` and a ``credentials``
        mapping of each top-level credential key to ``"***"``.
    """
    return {
        "mode": self._mode,
        # Values are never copied into the description, only key names.
        "credentials": {key: "***" for key in self._credentials},
    }

_load_langchain_tracer

_load_langchain_tracer()

Return an OpikTracer callback for LangChain integration.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
352
353
354
355
356
357
358
359
def _load_langchain_tracer(self) -> Any:
    """Build the Opik callback handler used for LangChain tracing.

    Returns:
        An ``OpikTracer`` constructed with the dataset's ``trace_kwargs``.

    Raises:
        DatasetError: If ``opik.integrations.langchain`` cannot be imported.
    """
    try:
        from opik.integrations.langchain import OpikTracer  # noqa: PLC0415
    except ImportError as import_error:
        raise DatasetError("Opik LangChain integration not available.") from import_error
    else:
        tracer_options = self._trace_kwargs
        return OpikTracer(**tracer_options)

_load_openai_client

_load_openai_client()

Return an OpenAI client wrapped with Opik tracing integration.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def _load_openai_client(self) -> Any:
    """Create an OpenAI client and wrap it with Opik's ``track_openai``.

    The ``openai`` credentials section is validated first and then passed as
    keyword arguments to ``openai.OpenAI``. If ``trace_kwargs`` carries a
    ``project_name``, it is forwarded to ``track_openai``; a warning is logged
    when it differs from the ``OPIK_PROJECT_NAME`` environment variable.

    Returns:
        The object returned by ``track_openai`` for the new client.

    Raises:
        DatasetError: If ``openai`` or the Opik OpenAI integration cannot be
            imported, or if the OpenAI credentials fail validation.
    """
    try:
        import openai  # noqa: PLC0415
        from opik.integrations.openai import track_openai  # noqa: PLC0415
    except ImportError as import_error:
        raise DatasetError(
            "OpenAI or Opik OpenAI integration not available. "
            "Ensure you have installed the required dependencies: "
            "pip install openai opik"
        ) from import_error

    self._validate_openai_client_params()
    openai_client = openai.OpenAI(**self._credentials["openai"])

    requested_project = self._trace_kwargs.get("project_name")
    if not requested_project:
        # No explicit project: let track_openai use its own default.
        return track_openai(openai_client)

    active_project = os.getenv("OPIK_PROJECT_NAME")
    if active_project and requested_project != active_project:
        logger.warning(
            f"Project name mismatch detected: trace_kwargs specifies '{requested_project}', "
            f"but environment variable OPIK_PROJECT_NAME is set to '{active_project}'. "
            f"The environment value will take precedence."
        )

    return track_openai(openai_client, project_name=requested_project)

_load_sdk_client

_load_sdk_client()

Return a simple SDK client exposing the track decorator.

The Opik SDK does not provide a formal client object for direct usage; instead, the track decorator is imported at the module level. This wrapper mimics a client interface for consistency across modes.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
312
313
314
315
316
317
318
319
320
321
322
323
324
def _load_sdk_client(self) -> Any:
    """Return a simple SDK client exposing the `track` decorator.

    The Opik SDK does not provide a formal client object for direct usage;
    instead, the `track` decorator is imported at the module level.
    This wrapper mimics a client interface for consistency across modes.

    Returns:
        A new instance of a locally defined ``SDKClient`` class whose only
        member is ``track``.
    """

    # Simple namespace-like wrapper to mimic a "client"
    class SDKClient:
        # staticmethod prevents binding, so `client.track(...)` receives
        # exactly the arguments the caller passes (no implicit instance).
        # The right-hand `track` is not a local of this method, so it
        # resolves to the name available in the surrounding module.
        track = staticmethod(track)

    return SDKClient()

_validate_openai_client_params

_validate_openai_client_params()

Validate OpenAI credentials in the 'openai' section.

Raises:

  • DatasetError

    If OpenAI credentials are missing or invalid.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def _validate_openai_client_params(self) -> None:
    """Validate OpenAI credentials in the 'openai' section.

    Checks that the section exists, that it carries a non-blank ``api_key``
    (a null value counts as missing) and that ``base_url``, when present,
    is not blank.

    Raises:
        DatasetError: If OpenAI credentials are missing or invalid.
    """
    if "openai" not in self._credentials:
        raise DatasetError(
            "Missing 'openai' section in TraceDataset credentials. "
            "For OpenAI mode, include an 'openai' block inside your credentials."
        )

    openai_creds = self._credentials["openai"]

    # A key that is present but null (for example an empty YAML value) must be
    # treated as missing: str(None) would yield the non-empty text "None" and
    # slip through a plain blank-string check.
    api_key = openai_creds.get("api_key")
    if api_key is None or not str(api_key).strip():
        raise DatasetError("Missing or empty OpenAI API key")

    # Validate base_url is not empty if provided. A null base_url is still
    # accepted here, exactly as before.
    if "base_url" in openai_creds and not str(openai_creds["base_url"]).strip():
        raise DatasetError("OpenAI credential 'base_url' cannot be empty if provided")

_validate_opik_credentials

_validate_opik_credentials()

Validate Opik credentials before configuring the environment.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
156
157
158
159
160
161
162
163
164
165
166
167
def _validate_opik_credentials(self) -> None:
    """Check the Opik credentials, including AutoGen-only requirements.

    The generic check is delegated to ``validate_credentials``. In AutoGen
    mode, every key in ``REQUIRED_OPIK_CREDENTIALS_AUTOGEN`` must also be
    present with a truthy value.

    Raises:
        DatasetError: In AutoGen mode, when a required key is missing or
            falsy. Errors raised by ``validate_credentials`` propagate too.
    """
    validate_credentials(self._credentials, REQUIRED_OPIK_CREDENTIALS, OPTIONAL_OPIK_CREDENTIALS)

    # The extra requirements below only apply to AutoGen mode.
    if self._mode != "autogen":
        return

    for required_key in REQUIRED_OPIK_CREDENTIALS_AUTOGEN:
        if self._credentials.get(required_key):
            continue
        # Fail on the first required key that is absent or empty.
        raise DatasetError(
            f"AutoGen mode requires '{required_key}' in credentials "
            f"(e.g. 'https://www.comet.com/opik/api/v1/private/otel/v1/traces'). "
            f"Provide the full OTLP endpoint URL for trace export."
        )

load

load()

Load the appropriate tracing client based on the configured mode.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def load(self) -> Any:
    """Return the tracing client for the configured mode, building it once.

    The first call delegates to the loader method matching ``self._mode`` and
    stores its result; while a non-``None`` result is stored, later calls
    return it without invoking the loader again.

    Returns:
        The object produced by the loader for the configured mode.

    Raises:
        DatasetError: If the configured mode has no matching loader.
    """
    cached = self._cached_client
    if cached is not None:
        return cached

    # Mode -> loader method name. The method is resolved only for the
    # matching mode, so the other loaders are never touched.
    loaders_by_mode = (
        ("sdk", "_load_sdk_client"),
        ("openai", "_load_openai_client"),
        ("langchain", "_load_langchain_tracer"),
        ("autogen", "_build_autogen_tracer"),
    )
    for mode_name, loader_name in loaders_by_mode:
        if self._mode == mode_name:
            self._cached_client = getattr(self, loader_name)()
            return self._cached_client

    raise DatasetError(f"Unsupported mode '{self._mode}' for TraceDataset")

save

save(data)

Saving traces manually is not supported; TraceDataset is read-only.

Source code in kedro_datasets_experimental/opik/trace_dataset.py
361
362
363
def save(self, data: Any) -> None:
    """Reject any attempt to save; this dataset only supports loading.

    Args:
        data: Ignored.

    Raises:
        NotImplementedError: Always.
    """
    read_only_message = "TraceDataset is read-only."
    raise NotImplementedError(read_only_message)