Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,19 +226,32 @@ def __call__(
args,
kwargs,
) -> None:
"""
Inject tracing headers for the current run into OpenAI request kwargs and suppress language-model instrumentation while calling the wrapped function.

If kwargs contains a `run_manager`, this looks up a span for `run_manager.run_id` from the callback manager; if a span is found, tracing headers are injected into `kwargs["extra_headers"]`. If no span is found, a debug message is logged and no headers are injected. The function also sets the context key SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY to True as a best-effort fallback; failures to set the context are ignored. Finally, the original wrapped callable is invoked with the (possibly modified) args and kwargs.

Parameters:
kwargs (dict): May contain `run_manager` (used to find a span via `run_manager.run_id`) and `extra_headers` (a dict that will be updated with injected tracing headers if a span is present).

Returns:
The value returned by calling the original `wrapped` callable with the provided args and kwargs.
"""
run_manager = kwargs.get("run_manager")
if run_manager:
run_id = run_manager.run_id
span_holder = self._callback_manager.spans[run_id]

extra_headers = kwargs.get("extra_headers", {})

# Inject tracing context into the extra headers
ctx = set_span_in_context(span_holder.span)
TraceContextTextMapPropagator().inject(extra_headers, context=ctx)

# Update kwargs to include the modified headers
kwargs["extra_headers"] = extra_headers
span_holder = self._callback_manager.spans.get(run_id)

if span_holder:
extra_headers = kwargs.get("extra_headers", {})
ctx = set_span_in_context(span_holder.span)
TraceContextTextMapPropagator().inject(extra_headers, context=ctx)
kwargs["extra_headers"] = extra_headers
else:
logger.debug(
"No span found for run_id %s, skipping header injection",
run_id
)

# In legacy chains like LLMChain, suppressing model instrumentations
# within create_llm_span doesn't work, so this should helps as a fallback
Expand All @@ -251,4 +264,4 @@ def __call__(
# This is not critical for core functionality
pass

return wrapped(*args, **kwargs)
return wrapped(*args, **kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Tests for handling external run_ids from systems like LangSmith."""

import logging
from unittest.mock import MagicMock, Mock, patch
from uuid import uuid4

import pytest
from opentelemetry.instrumentation.langchain.callback_handler import (
TraceloopCallbackHandler,
)


@pytest.fixture
def callback_handler(tracer_provider):
"""
Create a TraceloopCallbackHandler bound to the provided tracer provider.

Parameters:
tracer_provider: The OpenTelemetry TracerProvider used to initialize the callback handler.

Returns:
TraceloopCallbackHandler: An initialized callback handler associated with the given tracer_provider.
"""
return TraceloopCallbackHandler(tracer_provider=tracer_provider)


@pytest.fixture
def mock_run_manager():
"""
Create a mock run manager with a UUID `run_id` attribute.

Returns:
manager (Mock): A unittest.mock.Mock instance with a `run_id` attribute set to a generated UUID.
"""
manager = Mock()
manager.run_id = uuid4()
return manager


def test_external_run_id_no_keyerror(
callback_handler, mock_run_manager, instrument_legacy, caplog
):
"""Test that external run_ids (e.g., from LangSmith) don't cause KeyError."""
from opentelemetry.instrumentation.langchain import _OpenAITracingWrapper

# Create the wrapper
wrapper = _OpenAITracingWrapper(callback_handler)

# Mock the wrapped function
mock_wrapped = Mock(return_value="test_result")

# Ensure the run_id is NOT in the spans dictionary
# (simulating an external system like LangSmith creating the run_id)
assert mock_run_manager.run_id not in callback_handler.spans

# Create kwargs with the external run_manager
kwargs = {
"run_manager": mock_run_manager,
"extra_headers": {},
}

# Capture debug logs
with caplog.at_level(logging.DEBUG):
# Call the wrapper - should NOT raise KeyError
result = wrapper(mock_wrapped, None, [], kwargs)

# Verify the function was called successfully
assert result == "test_result"
mock_wrapped.assert_called_once()

# Verify debug log was generated
assert any(
"No span found for run_id" in record.message
and "skipping header injection" in record.message
for record in caplog.records
)

# Verify extra_headers were not modified since there's no span
# (the original empty dict should still be there)
assert "traceparent" not in kwargs["extra_headers"]


def test_internal_run_id_injects_headers(
callback_handler, mock_run_manager, instrument_legacy
):
"""Test that internal run_ids (created by OTEL) get headers injected."""
from opentelemetry.instrumentation.langchain import _OpenAITracingWrapper
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace import SpanKind

# Create a real span and add it to the callback handler's spans
tracer = TracerProvider().get_tracer(__name__)
span = tracer.start_span("test_span", kind=SpanKind.CLIENT)

# Create a span holder (mimicking what the callback handler does)
span_holder = Mock()
span_holder.span = span

# Add the span to the callback handler's spans dictionary
callback_handler.spans[mock_run_manager.run_id] = span_holder

# Create the wrapper
wrapper = _OpenAITracingWrapper(callback_handler)

# Mock the wrapped function
mock_wrapped = Mock(return_value="test_result")

# Create kwargs with the internal run_manager
kwargs = {
"run_manager": mock_run_manager,
"extra_headers": {},
}

# Call the wrapper
result = wrapper(mock_wrapped, None, [], kwargs)

# Verify the function was called successfully
assert result == "test_result"
mock_wrapped.assert_called_once()

# Verify headers were injected (traceparent should exist)
assert "traceparent" in kwargs["extra_headers"]
assert kwargs["extra_headers"]["traceparent"] # Should have a value

# Clean up
span.end()


def test_no_run_manager_continues_normally(callback_handler, instrument_legacy):
"""
Ensure the tracing wrapper executes the wrapped function and does not inject trace headers when no `run_manager` is provided in `kwargs`.

Verifies that the wrapped callable is invoked and that `extra_headers` remains without a `traceparent` entry when `kwargs` does not contain a `run_manager`.
"""
from opentelemetry.instrumentation.langchain import _OpenAITracingWrapper

# Create the wrapper
wrapper = _OpenAITracingWrapper(callback_handler)

# Mock the wrapped function
mock_wrapped = Mock(return_value="test_result")

# Create kwargs without run_manager
kwargs = {"extra_headers": {}}

# Call the wrapper - should work fine
result = wrapper(mock_wrapped, None, [], kwargs)

# Verify the function was called successfully
assert result == "test_result"
mock_wrapped.assert_called_once()

# Verify no headers were injected
assert "traceparent" not in kwargs["extra_headers"]
Loading