Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ddtrace/_trace/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class _Config(En):

enabled = En.v(bool, "enabled", default=True)
global_tags = En.v(dict, "global_tags", parser=parse_tags_str, default={})
runtime_coverage_enabled = En.v(bool, "runtime_coverage_enabled", default=False)


_config = _Config()
Expand All @@ -47,6 +48,27 @@ def start():

_install_trace_methods(config._trace_methods)

# Initialize runtime request coverage if enabled
if _config.runtime_coverage_enabled:
try:
from ddtrace.internal.ci_visibility.runtime_coverage import initialize_runtime_coverage
from ddtrace.internal.ci_visibility.runtime_coverage import is_runtime_coverage_supported
from ddtrace.internal.ci_visibility.runtime_coverage_writer import initialize_runtime_coverage_writer

if is_runtime_coverage_supported():
log.info("Runtime request coverage enabled, initializing...")

# Initialize the coverage collector
if not initialize_runtime_coverage():
log.warning("Runtime coverage collector initialization failed")
# Initialize and start the coverage writer
elif not initialize_runtime_coverage_writer():
log.warning("Runtime coverage writer initialization failed")
else:
log.info("Runtime request coverage initialized successfully (collector + writer)")
except Exception:
log.debug("Failed to initialize runtime request coverage", exc_info=True)

if _config.global_tags:
from ddtrace.trace import tracer

Expand Down
71 changes: 71 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,71 @@ def _on_app_exception(ctx):
req_span.finish()


def _on_wsgi_runtime_coverage_start(ctx: core.ExecutionContext) -> None:
"""Handler for starting runtime request coverage collection on WSGI request start."""
from ddtrace.internal.ci_visibility.runtime_coverage import is_runtime_coverage_supported

if not is_runtime_coverage_supported():
return

try:
from ddtrace.internal.coverage.code import ModuleCodeCollector

collector = ModuleCodeCollector._instance
if collector is not None:
coverage_ctx = collector.CollectInContext()
coverage_ctx.__enter__()
# Store coverage context in the execution context for later cleanup
ctx.set_item("runtime_coverage_ctx", coverage_ctx)
except Exception:
log.debug("Failed to start runtime request coverage", exc_info=True)


def _on_wsgi_runtime_coverage_complete(ctx: core.ExecutionContext, closing_iterable, app_is_iterator) -> None:
"""
Handler for processing runtime request coverage data on WSGI request complete.

This extracts the collected coverage data and sends it to the citestcov intake via
the RuntimeCoverageWriter, which handles batching, encoding, and delivery using the
same infrastructure as test coverage.
"""
coverage_ctx = ctx.get_item("runtime_coverage_ctx")
if coverage_ctx is None:
return

try:
span = ctx.get_item("req_span")
if span is not None:
import os
from pathlib import Path

from ddtrace.internal.ci_visibility.runtime_coverage import build_runtime_coverage_payload
from ddtrace.internal.ci_visibility.runtime_coverage import send_runtime_coverage

# Get root directory for path resolution
root_dir = Path(os.getcwd())

# Build coverage payload
coverage_payload = build_runtime_coverage_payload(
root_dir=root_dir,
trace_id=span.trace_id,
span_id=span.span_id,
)

if coverage_payload:
# Enqueue coverage data to the RuntimeCoverageWriter
# The writer will batch and send it to citestcov intake
send_runtime_coverage(span, coverage_payload["files"])
except Exception:
log.debug("Failed to send runtime request coverage", exc_info=True)
finally:
# Clean up coverage context
try:
coverage_ctx.__exit__(None, None, None)
except Exception:
log.debug("Failed to cleanup coverage context", exc_info=True)


def _on_request_complete(ctx, closing_iterable, app_is_iterator):
middleware = ctx.get_item("middleware")
req_span = ctx.get_item("req_span")
Expand All @@ -394,6 +459,9 @@ def _on_request_complete(ctx, closing_iterable, app_is_iterator):
)
modifier(resp_span, closing_iterable)

# Handle runtime request coverage collection and sending
_on_wsgi_runtime_coverage_complete(ctx, closing_iterable, app_is_iterator)

return _TracedIterable(closing_iterable, resp_span, req_span, wrapped_is_iterator=app_is_iterator)


Expand Down Expand Up @@ -1262,6 +1330,9 @@ def listen():
"psycopg.patched_connect",
):
core.on(f"context.started.{context_name}", _start_span)
# Add runtime coverage handler for WSGI context start
if context_name == "wsgi.__call__":
core.on(f"context.started.{context_name}", _on_wsgi_runtime_coverage_start)

for name in (
"asgi.request",
Expand Down
136 changes: 136 additions & 0 deletions ddtrace/internal/ci_visibility/runtime_coverage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
Runtime Request Coverage

This module provides functionality for collecting and sending coverage data for individual
runtime requests (e.g., HTTP requests to WSGI/ASGI applications) to the CI Visibility
coverage intake endpoint.

This is based on, but separate from test coverage.
"""

import os
from pathlib import Path
from typing import Dict # noqa:F401
from typing import List # noqa:F401
from typing import Optional # noqa:F401

import ddtrace
from ddtrace.internal.ci_visibility.constants import COVERAGE_TAG_NAME
from ddtrace.internal.ci_visibility.runtime_coverage_writer import get_runtime_coverage_writer
from ddtrace.internal.compat import PYTHON_VERSION_INFO
from ddtrace.internal.coverage.code import ModuleCodeCollector
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


def is_runtime_coverage_supported() -> bool:
"""Check if runtime request coverage is supported (Python 3.12+)."""
return PYTHON_VERSION_INFO >= (3, 12)


def initialize_runtime_coverage() -> bool:
"""
Initialize runtime request coverage collection.

This should be called at startup if DD_TRACE_RUNTIME_COVERAGE_ENABLED is set.
Returns True if initialization was successful, False otherwise.
"""
if not is_runtime_coverage_supported():
log.warning(
"Runtime request coverage requires Python 3.12+, "
"but Python %d.%d is being used. Coverage collection will be disabled.",
PYTHON_VERSION_INFO[0],
PYTHON_VERSION_INFO[1],
)
return False

try:
from ddtrace.internal.coverage.installer import install

# Determine root directory for coverage collection
root_dir = Path(os.getcwd())

# Install the coverage collector
install(include_paths=[root_dir], collect_import_time_coverage=True)

# Verify instance was created
if ModuleCodeCollector._instance is None:
log.warning("Failed to initialize coverage collector instance")
return False

log.info("Runtime request coverage initialized successfully")
return True

except Exception as e:
log.warning("Failed to initialize runtime request coverage: %s", e, exc_info=True)
return False


def build_runtime_coverage_payload(root_dir: Path, trace_id: int, span_id: int) -> Optional[Dict]:
"""
Build a coverage payload from ModuleCodeCollector for runtime request coverage.

Args:
root_dir: Root directory for relative path resolution
trace_id: Trace ID for correlation
span_id: Span ID for correlation

Returns:
Dictionary with coverage files, trace_id, and span_id, or None if no coverage data
"""
try:
# Get coverage files from ModuleCodeCollector
files = ModuleCodeCollector.report_seen_lines(root_dir, include_imported=True)

if not files:
return None

# Return payload dict with trace/span IDs for correlation
return {
"trace_id": trace_id,
"span_id": span_id,
"files": files,
}

except Exception as e:
log.debug("Failed to build runtime coverage payload: %s", e, exc_info=True)
return None


def send_runtime_coverage(span: ddtrace.trace.Span, files: List[Dict]) -> bool:
"""
Send runtime coverage data to citestcov intake using the RuntimeCoverageWriter.

This follows the natural flow used by test coverage: set coverage data as a struct tag
on the span, then write it to the dedicated coverage writer which handles batching,
encoding, retries, and sending to the correct endpoint.

Args:
span: The request span to attach coverage data to
files: List of file coverage data in CI Visibility format

Returns:
True if enqueued successfully, False otherwise
"""
try:
# Get the global runtime coverage writer
writer = get_runtime_coverage_writer()
if writer is None:
log.debug("Runtime coverage writer not initialized")
return False

# Set coverage data as struct tag on the span (matches TestVisibilityItemBase pattern)
# The coverage encoder will extract this tag when encoding
span._set_struct_tag(COVERAGE_TAG_NAME, {"files": files})

# Write the span to the coverage writer
writer.write([span])

log.debug("Runtime coverage span enqueued for trace_id=%s, span_id=%s", span.trace_id, span.span_id)
return True

except Exception as e:
log.debug("Failed to send runtime coverage: %s", e, exc_info=True)
return False
Loading
Loading