Skip to content

Commit 1aa8979

Browse files
authored
test: enable telemetry tests in server mode (#3927)
# What does this PR do? - added a server-based test OLTP collector ## Test Plan CI
1 parent 1f9d48c commit 1aa8979

File tree

8 files changed

+505
-96
lines changed

8 files changed

+505
-96
lines changed

scripts/integration-tests.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,15 @@ if [[ "$STACK_CONFIG" == *"docker:"* && "$COLLECT_ONLY" == false ]]; then
284284
docker stop "$container_name" 2>/dev/null || true
285285
docker rm "$container_name" 2>/dev/null || true
286286

287+
# Configure telemetry collector port shared between host and container
288+
COLLECTOR_PORT=4317
289+
export LLAMA_STACK_TEST_COLLECTOR_PORT="${COLLECTOR_PORT}"
290+
287291
# Build environment variables for docker run
288292
DOCKER_ENV_VARS=""
289293
DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e LLAMA_STACK_TEST_INFERENCE_MODE=$INFERENCE_MODE"
290294
DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e LLAMA_STACK_TEST_STACK_CONFIG_TYPE=server"
295+
DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:${COLLECTOR_PORT}"
291296

292297
# Pass through API keys if they exist
293298
[ -n "${TOGETHER_API_KEY:-}" ] && DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e TOGETHER_API_KEY=$TOGETHER_API_KEY"

tests/integration/fixtures/common.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,35 @@ def wait_for_server_ready(base_url: str, timeout: int = 30, process: subprocess.
8888
return False
8989

9090

91+
def stop_server_on_port(port: int, timeout: float = 10.0) -> None:
92+
"""Terminate any server processes bound to the given port."""
93+
94+
try:
95+
output = subprocess.check_output(["lsof", "-ti", f":{port}"], text=True)
96+
except (subprocess.CalledProcessError, FileNotFoundError):
97+
return
98+
99+
pids = {int(line) for line in output.splitlines() if line.strip()}
100+
if not pids:
101+
return
102+
103+
deadline = time.time() + timeout
104+
for sig in (signal.SIGTERM, signal.SIGKILL):
105+
for pid in list(pids):
106+
try:
107+
os.kill(pid, sig)
108+
except ProcessLookupError:
109+
pids.discard(pid)
110+
111+
while not is_port_available(port) and time.time() < deadline:
112+
time.sleep(0.1)
113+
114+
if is_port_available(port):
115+
return
116+
117+
raise RuntimeError(f"Unable to free port {port} for test server restart")
118+
119+
91120
def get_provider_data():
92121
# TODO: this needs to be generalized so each provider can have a sample provider data just
93122
# like sample run config on which we can do replace_env_vars()
@@ -199,6 +228,10 @@ def instantiate_llama_stack_client(session):
199228
port = int(parts[2]) if len(parts) > 2 else int(os.environ.get("LLAMA_STACK_PORT", DEFAULT_PORT))
200229
base_url = f"http://localhost:{port}"
201230

231+
force_restart = os.environ.get("LLAMA_STACK_TEST_FORCE_SERVER_RESTART") == "1"
232+
if force_restart:
233+
stop_server_on_port(port)
234+
202235
# Check if port is available
203236
if is_port_available(port):
204237
print(f"Starting llama stack server with config '{config_name}' on port {port}...")
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the terms described in the LICENSE file in
5+
# the root directory of this source tree.
6+
7+
"""Telemetry collector helpers for integration tests."""
8+
9+
from .base import BaseTelemetryCollector, SpanStub
10+
from .in_memory import InMemoryTelemetryCollector, InMemoryTelemetryManager
11+
from .otlp import OtlpHttpTestCollector
12+
13+
__all__ = [
14+
"BaseTelemetryCollector",
15+
"SpanStub",
16+
"InMemoryTelemetryCollector",
17+
"InMemoryTelemetryManager",
18+
"OtlpHttpTestCollector",
19+
]
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the terms described in the LICENSE file in
5+
# the root directory of this source tree.
6+
7+
"""Shared helpers for telemetry test collectors."""
8+
9+
from collections.abc import Iterable
10+
from dataclasses import dataclass
11+
from typing import Any
12+
13+
14+
@dataclass
15+
class SpanStub:
16+
name: str
17+
attributes: dict[str, Any]
18+
resource_attributes: dict[str, Any] | None = None
19+
events: list[dict[str, Any]] | None = None
20+
trace_id: str | None = None
21+
span_id: str | None = None
22+
23+
24+
def _value_to_python(value: Any) -> Any:
25+
kind = value.WhichOneof("value")
26+
if kind == "string_value":
27+
return value.string_value
28+
if kind == "int_value":
29+
return value.int_value
30+
if kind == "double_value":
31+
return value.double_value
32+
if kind == "bool_value":
33+
return value.bool_value
34+
if kind == "bytes_value":
35+
return value.bytes_value
36+
if kind == "array_value":
37+
return [_value_to_python(item) for item in value.array_value.values]
38+
if kind == "kvlist_value":
39+
return {kv.key: _value_to_python(kv.value) for kv in value.kvlist_value.values}
40+
return None
41+
42+
43+
def attributes_to_dict(key_values: Iterable[Any]) -> dict[str, Any]:
44+
return {key_value.key: _value_to_python(key_value.value) for key_value in key_values}
45+
46+
47+
def events_to_list(events: Iterable[Any]) -> list[dict[str, Any]]:
48+
return [
49+
{
50+
"name": event.name,
51+
"timestamp": event.time_unix_nano,
52+
"attributes": attributes_to_dict(event.attributes),
53+
}
54+
for event in events
55+
]
56+
57+
58+
class BaseTelemetryCollector:
59+
def get_spans(
60+
self,
61+
expected_count: int | None = None,
62+
timeout: float = 5.0,
63+
poll_interval: float = 0.05,
64+
) -> tuple[Any, ...]:
65+
import time
66+
67+
deadline = time.time() + timeout
68+
min_count = expected_count if expected_count is not None else 1
69+
last_len: int | None = None
70+
stable_iterations = 0
71+
72+
while True:
73+
spans = tuple(self._snapshot_spans())
74+
75+
if len(spans) >= min_count:
76+
if expected_count is not None and len(spans) >= expected_count:
77+
return spans
78+
79+
if last_len == len(spans):
80+
stable_iterations += 1
81+
if stable_iterations >= 2:
82+
return spans
83+
else:
84+
stable_iterations = 1
85+
else:
86+
stable_iterations = 0
87+
88+
if time.time() >= deadline:
89+
return spans
90+
91+
last_len = len(spans)
92+
time.sleep(poll_interval)
93+
94+
def get_metrics(self) -> Any | None:
95+
return self._snapshot_metrics()
96+
97+
def clear(self) -> None:
98+
self._clear_impl()
99+
100+
def _snapshot_spans(self) -> tuple[Any, ...]: # pragma: no cover - interface hook
101+
raise NotImplementedError
102+
103+
def _snapshot_metrics(self) -> Any | None: # pragma: no cover - interface hook
104+
raise NotImplementedError
105+
106+
def _clear_impl(self) -> None: # pragma: no cover - interface hook
107+
raise NotImplementedError
108+
109+
def shutdown(self) -> None:
110+
"""Optional hook for subclasses with background workers."""
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the terms described in the LICENSE file in
5+
# the root directory of this source tree.
6+
7+
"""In-memory telemetry collector for library-client tests."""
8+
9+
from typing import Any
10+
11+
import opentelemetry.metrics as otel_metrics
12+
import opentelemetry.trace as otel_trace
13+
from opentelemetry import metrics, trace
14+
from opentelemetry.sdk.metrics import MeterProvider
15+
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
16+
from opentelemetry.sdk.trace import TracerProvider
17+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
18+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
19+
20+
import llama_stack.core.telemetry.telemetry as telemetry_module
21+
22+
from .base import BaseTelemetryCollector, SpanStub
23+
24+
25+
class InMemoryTelemetryCollector(BaseTelemetryCollector):
26+
def __init__(self, span_exporter: InMemorySpanExporter, metric_reader: InMemoryMetricReader) -> None:
27+
self._span_exporter = span_exporter
28+
self._metric_reader = metric_reader
29+
30+
def _snapshot_spans(self) -> tuple[Any, ...]:
31+
spans = []
32+
for span in self._span_exporter.get_finished_spans():
33+
trace_id = None
34+
span_id = None
35+
context = getattr(span, "context", None)
36+
if context:
37+
trace_id = f"{context.trace_id:032x}"
38+
span_id = f"{context.span_id:016x}"
39+
else:
40+
trace_id = getattr(span, "trace_id", None)
41+
span_id = getattr(span, "span_id", None)
42+
43+
stub = SpanStub(
44+
span.name,
45+
span.attributes,
46+
getattr(span, "resource", None),
47+
getattr(span, "events", None),
48+
trace_id,
49+
span_id,
50+
)
51+
spans.append(stub)
52+
53+
return tuple(spans)
54+
55+
def _snapshot_metrics(self) -> Any | None:
56+
data = self._metric_reader.get_metrics_data()
57+
if data and data.resource_metrics:
58+
resource_metric = data.resource_metrics[0]
59+
if resource_metric.scope_metrics:
60+
return resource_metric.scope_metrics[0].metrics
61+
return None
62+
63+
def _clear_impl(self) -> None:
64+
self._span_exporter.clear()
65+
self._metric_reader.get_metrics_data()
66+
67+
68+
class InMemoryTelemetryManager:
69+
def __init__(self) -> None:
70+
if hasattr(otel_trace, "_TRACER_PROVIDER_SET_ONCE"):
71+
otel_trace._TRACER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined]
72+
if hasattr(otel_metrics, "_METER_PROVIDER_SET_ONCE"):
73+
otel_metrics._METER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined]
74+
75+
span_exporter = InMemorySpanExporter()
76+
tracer_provider = TracerProvider()
77+
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))
78+
trace.set_tracer_provider(tracer_provider)
79+
80+
metric_reader = InMemoryMetricReader()
81+
meter_provider = MeterProvider(metric_readers=[metric_reader])
82+
metrics.set_meter_provider(meter_provider)
83+
84+
telemetry_module._TRACER_PROVIDER = tracer_provider
85+
86+
self.collector = InMemoryTelemetryCollector(span_exporter, metric_reader)
87+
self._tracer_provider = tracer_provider
88+
self._meter_provider = meter_provider
89+
90+
def shutdown(self) -> None:
91+
telemetry_module._TRACER_PROVIDER = None
92+
self._tracer_provider.shutdown()
93+
self._meter_provider.shutdown()

0 commit comments

Comments
 (0)