Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ dependencies = [
# dependencies. Plan to lift into a sibling ``openarmature-otel``
# package at v1.0 launch alongside ``openarmature-eval``.
otel = [
# Upper bound guards against the SDK 2.x release that removes
# ``opentelemetry.sdk._logs.LoggingHandler`` (currently emits a
# DeprecationWarning). Migration to
# ``opentelemetry-instrumentation-logging`` lands in Phase 6.1
# before bumping the upper bound.
"opentelemetry-api>=1.27,<2",
"opentelemetry-sdk>=1.27,<2",
"opentelemetry-api>=1.27,<3",
"opentelemetry-sdk>=1.27,<3",
# Provides ``LoggingHandler`` after the SDK's deprecation
# of ``opentelemetry.sdk._logs.LoggingHandler``. No upper
# bound — the contrib repo cycles fast on minor releases
# below 1.0; revisit when 1.0 lands.
"opentelemetry-instrumentation-logging>=0.62.0b1",
]

[project.urls]
Expand Down
99 changes: 63 additions & 36 deletions src/openarmature/observability/otel/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,67 @@
Opt-in by design: users may have their own logging configuration we
shouldn't override silently. Calling ``install_log_bridge(provider)``
explicitly attaches an OTel ``LoggingHandler`` to the root logger
and registers a filter that injects the correlation_id from the
ContextVar.
and installs a process-global ``LogRecord`` factory that injects
the correlation_id from the ContextVar.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from opentelemetry.sdk._logs import LoggerProvider


class _CorrelationIdFilter(logging.Filter):
"""Logging filter that reads the openarmature correlation_id
ContextVar and attaches it to every log record as the
``openarmature.correlation_id`` attribute. Per spec §7 the
attribute MUST appear on every log record emitted during an
invocation."""
# Marker attribute used to detect "this is the OA-installed
# LogRecord factory" so re-calling ``install_log_bridge`` doesn't
# stack a second wrapper on top of the already-installed one.
_FACTORY_MARKER = "_openarmature_correlation_factory"


def _install_correlation_id_factory() -> None:
"""Install a process-global :class:`logging.LogRecord` factory
that reads the openarmature correlation_id ContextVar and
attaches it to every constructed record as the
``openarmature.correlation_id`` attribute.

Why a factory instead of a logger filter: filters added to the
ROOT logger only fire for records originating directly on the
root logger — Python's logging propagation walks ancestors'
HANDLERS but not their filters. A filter on root therefore
misses every record from a child logger (the normal case;
every reasonable user does ``logger = logging.getLogger("module")``).
Spec §7 mandates the attribute appear on records emitted from
"anywhere within an invocation" — the factory hooks at record
construction, fires uniformly for every emit regardless of
which logger originated the record, and chains over any
user-installed factory rather than replacing it.

Idempotent: re-calling skips installation if the current
factory is already the OA-installed one.
"""
from openarmature.observability.correlation import current_correlation_id

current_factory = logging.getLogRecordFactory()
if getattr(current_factory, _FACTORY_MARKER, False):
# Already installed — re-calling is a no-op.
return

def filter(self, record: logging.LogRecord) -> bool:
from openarmature.observability.correlation import current_correlation_id
prior_factory = current_factory

def _correlation_id_factory(*args: Any, **kwargs: Any) -> logging.LogRecord:
record = prior_factory(*args, **kwargs)
cid = current_correlation_id()
if cid is not None:
# Stored on the log record so any formatter/handler that
# reads ``record.__dict__`` (including the OTel
# LoggingHandler) sees it.
setattr(record, "openarmature.correlation_id", cid)
return True
return record

setattr(_correlation_id_factory, _FACTORY_MARKER, True)
logging.setLogRecordFactory(_correlation_id_factory)


def install_log_bridge(
Expand All @@ -47,32 +78,31 @@ def install_log_bridge(
) -> None:
"""Wire the stdlib root logger to the supplied OTel
:class:`LoggerProvider`. Adds a
:class:`opentelemetry.sdk._logs.LoggingHandler` for OTel-native
``trace_id`` / ``span_id`` bridging, AND attaches an
:class:`_CorrelationIdFilter` directly to the ROOT LOGGER (not
the handler) so the ``openarmature.correlation_id`` attribute
lands on every log record emitted during an invocation —
including records consumed by pre-existing stdout / file /
third-party handlers the user already had configured.

Filter-on-the-root-logger placement matters per spec §7:
"log records emitted from anywhere within an invocation MUST
carry ``openarmature.correlation_id``." A handler-level filter
would only modify records flowing through THAT handler, so a
user's existing stdout handler would see records without the
attribute. The root-logger filter applies to every record,
regardless of which handler eventually processes it.
:class:`opentelemetry.instrumentation.logging.handler.LoggingHandler`
for OTel-native ``trace_id`` / ``span_id`` bridging, AND
installs a process-global :class:`logging.LogRecord` factory
that injects ``openarmature.correlation_id`` on every record.

The factory placement matters per spec §7: "log records
emitted from anywhere within an invocation MUST carry
``openarmature.correlation_id``." Filters added to the root
logger fire only for records originating on root — Python's
propagation walks ancestor handlers but not ancestor filters
— so a root-logger filter misses every child-logger record.
The factory hook fires at record construction time, before any
logger or handler dispatch, so every record gets the
attribute regardless of which logger originated it.

Idempotent: re-calling is a no-op (we check for the existing
OA-tagged handler AND for an existing filter instance on the
root logger).
OA-tagged handler on the root logger AND for the OA-installed
factory marker on the current global factory).

The user retains responsibility for providing the
:class:`LoggerProvider` (typically built with their preferred
exporter — :class:`InMemoryLogExporter` for tests,
exporter — :class:`InMemoryLogRecordExporter` for tests,
:class:`OTLPLogExporter` for production).
"""
from opentelemetry.sdk._logs import LoggingHandler # type: ignore[attr-defined]
from opentelemetry.instrumentation.logging.handler import LoggingHandler

root = logging.getLogger()
# Idempotency #1: don't double-add the OTel LoggingHandler.
Expand All @@ -87,11 +117,8 @@ def install_log_bridge(
# marker behavior.
object.__setattr__(handler, "_openarmature_installed", True)
root.addHandler(handler)
# Idempotency #2: don't double-add the correlation_id filter to
# the root logger.
filter_already_installed = any(isinstance(f, _CorrelationIdFilter) for f in root.filters)
if not filter_already_installed:
root.addFilter(_CorrelationIdFilter())
# Idempotency #2: don't stack the LogRecord factory.
_install_correlation_id_factory()


__all__ = [
Expand Down
190 changes: 147 additions & 43 deletions tests/unit/test_observability_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,70 +303,174 @@ async def test_disable_llm_spans_skips_llm_provider_span() -> None:
# ---------------------------------------------------------------------------


def test_log_bridge_filter_injects_correlation_id() -> None:
def test_log_record_factory_injects_correlation_id() -> None:
"""Spec §7: every log record emitted during an invocation MUST
carry ``openarmature.correlation_id``. The bridge's filter reads
the ContextVar and attaches it to the LogRecord."""
carry ``openarmature.correlation_id``. The bridge installs a
process-global :class:`logging.LogRecord` factory (rather than
a logger-level filter) so the attribute lands on every record
regardless of which logger originated it — Python's logging
propagates records up the logger tree's HANDLERS but skips
ancestor FILTERS, so a filter on root would miss any
child-logger emit.

Tests both null-cid (outside invocation) and live-cid paths."""
from openarmature.observability.correlation import (
_reset_correlation_id,
_set_correlation_id,
)
from openarmature.observability.otel.logs import _CorrelationIdFilter

flt = _CorrelationIdFilter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="",
lineno=0,
msg="hello",
args=None,
exc_info=None,
from openarmature.observability.otel.logs import (
_install_correlation_id_factory,
)
# Outside an invocation: no correlation_id attribute set.
assert flt.filter(record) is True
assert not hasattr(record, "openarmature.correlation_id")

# Inside an invocation: filter attaches the ContextVar value.
record2 = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="",
lineno=0,
msg="hello",
args=None,
exc_info=None,
)
token = _set_correlation_id("my-cid-42")

prior_factory = logging.getLogRecordFactory()
try:
flt.filter(record2)
_install_correlation_id_factory()
factory = logging.getLogRecordFactory()

# Outside an invocation: no correlation_id attribute set.
record = factory(
"any.child.logger",
logging.INFO,
"",
0,
"hello",
None,
None,
)
assert not hasattr(record, "openarmature.correlation_id")

# Inside an invocation: factory attaches the ContextVar
# value to every newly constructed record.
token = _set_correlation_id("my-cid-42")
try:
record2 = factory(
"any.child.logger",
logging.INFO,
"",
0,
"hello",
None,
None,
)
finally:
_reset_correlation_id(token)
assert getattr(record2, "openarmature.correlation_id") == "my-cid-42"
finally:
_reset_correlation_id(token)
assert getattr(record2, "openarmature.correlation_id") == "my-cid-42"
# Restore the prior factory — process-global state.
logging.setLogRecordFactory(prior_factory)


def test_install_log_bridge_is_idempotent() -> None:
"""Re-calling :func:`install_log_bridge` MUST NOT register a
duplicate handler — the bridge owns the only OA-flagged
LoggingHandler on the root logger."""
duplicate handler on the root logger AND MUST NOT stack a
second LogRecord factory wrapper on top of the
already-installed one.

Wrapped in ``warnings.catch_warnings("error")`` to lock in the
Phase 6.1 PR-B migration: this is the canonical surface where
the deprecated ``opentelemetry.sdk._logs.LoggingHandler`` used
to emit a ``DeprecationWarning``. Any future regression that
re-introduces the deprecated path fires here immediately."""
import warnings

from opentelemetry.sdk._logs import LoggerProvider

root = logging.getLogger()
prior_handlers = list(root.handlers)
prior_filters = list(root.filters)
prior_factory = logging.getLogRecordFactory()
try:
with warnings.catch_warnings():
warnings.simplefilter("error")
provider = LoggerProvider()
install_log_bridge(provider)
handler_count_before = len(root.handlers)
factory_after_first = logging.getLogRecordFactory()
install_log_bridge(provider)
handler_count_after = len(root.handlers)
factory_after_second = logging.getLogRecordFactory()
assert handler_count_before == handler_count_after
# Factory identity is preserved across re-calls — no
# second wrapper stacked on top of the first.
assert factory_after_first is factory_after_second
finally:
# install_log_bridge mutates process-wide state; restore so
# this test does not leak into others.
root.handlers[:] = prior_handlers
root.filters[:] = prior_filters
logging.setLogRecordFactory(prior_factory)


def test_log_bridge_exports_records_with_correlation_id() -> None:
"""Spec §7 end-to-end: a log record emitted on a CHILD logger
under ``current_correlation_id`` flows through the bridge to
the OTel ``LoggerProvider``'s exporter with
``openarmature.correlation_id`` populated. Child-logger emit
is the load-bearing case — Python's logging propagates child
records up to root's handlers but skips root's filters, so a
filter-on-root placement (the prior implementation) misses
every reasonable user's logger.

Wrapped in ``warnings.catch_warnings("error")`` so the PR-B
migration's "no more deprecation warning" guarantee is
asserted on the affirmative export path too."""
import warnings

from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import (
InMemoryLogRecordExporter,
SimpleLogRecordProcessor,
)

from openarmature.observability.correlation import (
_reset_correlation_id,
_set_correlation_id,
)

root = logging.getLogger()
prior_handlers = list(root.handlers)
prior_filters = list(root.filters)
prior_factory = logging.getLogRecordFactory()
try:
provider = LoggerProvider()
install_log_bridge(provider)
handler_count_before = len(root.handlers)
install_log_bridge(provider)
handler_count_after = len(root.handlers)
assert handler_count_before == handler_count_after
with warnings.catch_warnings():
warnings.simplefilter("error")
exporter = InMemoryLogRecordExporter()
provider = LoggerProvider()
provider.add_log_record_processor(SimpleLogRecordProcessor(exporter))
install_log_bridge(provider)

# Emit on a CHILD logger to verify the factory
# placement (which fires uniformly at record
# construction) actually delivers — a filter-on-root
# placement would not.
child_logger = logging.getLogger("openarmature.test_log_bridge.child")
token = _set_correlation_id("test-cid-export-1")
try:
child_logger.warning("hello from %s", "test")
finally:
_reset_correlation_id(token)

# SimpleLogRecordProcessor flushes synchronously, but
# force-flush as a belt-and-suspenders guard so any
# buffered emit lands in the exporter before assertions.
provider.force_flush()
records = exporter.get_finished_logs()
# Filter to the record(s) emitted on our test logger — the
# root may receive other records from concurrent test setup.
ours = [r for r in records if r.log_record.body == "hello from test"]
assert len(ours) == 1, (
f"expected exactly one exported record for our test logger; "
f"got {len(ours)} (full set: {[r.log_record.body for r in records]})"
)
attrs = dict(ours[0].log_record.attributes or {})
assert attrs.get("openarmature.correlation_id") == "test-cid-export-1", (
f"correlation_id MUST appear on the exported OTel LogRecord attributes; "
f"got {attrs.get('openarmature.correlation_id')!r}"
)
finally:
# install_log_bridge mutates the process-wide root logger;
# restore the prior handler + filter set so this test does
# not leak state into others.
root.handlers[:] = prior_handlers
root.filters[:] = prior_filters
logging.setLogRecordFactory(prior_factory)


# ---------------------------------------------------------------------------
Expand Down
Loading
Loading