Skip to content

Commit 628221c

Browse files
RKest authored and copybara-github committed
feat(telemetry): emit entrypoint invoke_workflow span under schema v2
When ADK_TELEMETRY_SCHEMA_VERSION_OPT_IN resolves to 2, replace the legacy top-level 'invocation' span with an entrypoint 'invoke_workflow {entrypoint}' span (entrypoint = root agent or root node name), set is_entrypoint, and emit the gen_ai.invoke_workflow.duration metric alongside it. Entrypoint status is tracked via a contextvar so nested/agent-as-tool workflows report is_entrypoint=false and the span is not double-emitted when the ADK entrypoint is itself a workflow. Schema v1 is unchanged. Co-authored-by: Max Ind <maxind@google.com> PiperOrigin-RevId: 939850650
1 parent 17d5f38 commit 628221c

8 files changed

Lines changed: 1056 additions & 1072 deletions

File tree

src/google/adk/runners.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,22 @@
5656
from .sessions.base_session_service import BaseSessionService
5757
from .sessions.base_session_service import GetSessionConfig
5858
from .sessions.session import Session
59+
from .telemetry import _instrumentation
5960
from .telemetry.tracing import tracer
6061
from .tools.base_toolset import BaseToolset
6162
from .utils._debug_output import print_event
6263

6364
if TYPE_CHECKING:
6465
from .apps.app import App
6566
from .apps.app import ResumabilityConfig
67+
from .workflow._base_node import BaseNode
6668

6769
logger = logging.getLogger('google_adk.' + __name__)
6870

71+
# Silence unused warning.
72+
# tracer is imported for backwards compatibility, to avoid breaking change in the API.
73+
_ = tracer
74+
6975

7076
def _find_active_task_isolation_scope(session) -> Optional[str]:
7177
"""Walk session backwards; find the active paused task agent's scope.
@@ -454,7 +460,9 @@ async def _run_node_async(
454460
Events flow through ic._event_queue via NodeRunner.
455461
"""
456462

457-
with tracer.start_as_current_span('invocation'):
463+
with _instrumentation.record_invocation(
464+
entrypoint_node=node or self.agent, conversation_id=session_id
465+
):
458466
# 1. Setup
459467
session = await self._get_or_create_session(
460468
user_id=user_id, session_id=session_id
@@ -1040,7 +1048,9 @@ async def _run_with_trace(
10401048
new_message: Optional[types.Content] = None,
10411049
invocation_id: Optional[str] = None,
10421050
) -> AsyncGenerator[Event, None]:
1043-
with tracer.start_as_current_span('invocation'):
1051+
with _instrumentation.record_invocation(
1052+
entrypoint_node=self.agent, conversation_id=session_id
1053+
):
10441054
session = await self._get_or_create_session(
10451055
user_id=user_id,
10461056
session_id=session_id,

src/google/adk/telemetry/_instrumentation.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import sys
2121
import time
2222
from typing import AsyncIterator
23+
from typing import Iterator
2324
from typing import TYPE_CHECKING
2425

2526
from opentelemetry import trace
@@ -28,13 +29,16 @@
2829
from . import _metrics
2930
from . import tracing
3031
from ..events import event as event_lib
32+
from ._schema_version import resolve_schema_version
33+
from ._schema_version import SCHEMA_VERSION_SEMCONV_ALIGNED
3134

3235
if TYPE_CHECKING:
3336
from ..agents.base_agent import BaseAgent
3437
from ..agents.invocation_context import InvocationContext
3538
from ..models.llm_request import LlmRequest
3639
from ..models.llm_response import LlmResponse
3740
from ..tools.base_tool import BaseTool
41+
from ..workflow._base_node import BaseNode
3842

3943
logger = logging.getLogger("google_adk." + __name__)
4044

@@ -69,6 +73,46 @@ def _get_elapsed_s(
6973
return time.monotonic() - fallback_start
7074

7175

76+
@contextlib.contextmanager
def record_invocation(
    entrypoint_node: BaseNode | None,
    conversation_id: str,
) -> Iterator[None]:
  """Wraps one runner invocation in its top-level telemetry span.

  Which span is opened depends on the resolved telemetry schema version:

  * Below ``SCHEMA_VERSION_SEMCONV_ALIGNED`` the legacy ``invocation`` span is
    emitted.
  * Otherwise an ``invoke_workflow {entrypoint}`` span, named after
    ``entrypoint_node.name``, is opened through
    ``node_tracing._use_invoke_workflow_span``.
  * If the entrypoint is itself a ``Workflow``, nothing is opened here: the
    workflow's own node span is the entrypoint ``invoke_workflow`` span, and
    opening another one would emit it twice.

  When ``entrypoint_node`` is ``None`` there is no name to build the
  ``invoke_workflow`` span from, so the legacy ``invocation`` span is used as a
  fallback instead of failing with ``AttributeError`` inside telemetry code.

  Args:
    entrypoint_node: The runner's root agent/node, or ``None`` if unknown.
    conversation_id: Session/conversation id passed on to the
      ``invoke_workflow`` span.

  Yields:
    Nothing; the span (if any) is active for the duration of the block.
  """
  legacy_schema = resolve_schema_version() < SCHEMA_VERSION_SEMCONV_ALIGNED
  if legacy_schema or entrypoint_node is None:
    with tracing.tracer.start_as_current_span("invocation"):
      yield
    return

  # Function-scope imports, as in the original -- presumably to avoid an import
  # cycle between the telemetry and workflow packages (TODO confirm).
  from . import node_tracing
  from ..workflow._workflow import Workflow

  if isinstance(entrypoint_node, Workflow):
    # The workflow's own node span is the entrypoint `invoke_workflow` span.
    yield
    return

  with node_tracing._use_invoke_workflow_span(
      entrypoint_node.name, conversation_id
  ):
    yield
114+
115+
72116
@dataclasses.dataclass
73117
class TelemetryContext:
74118
"""Stores all telemetry related state."""

src/google/adk/telemetry/_metrics.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@
6161
409.6,
6262
],
6363
)
64+
_workflow_invocation_duration = meter.create_histogram(
65+
"gen_ai.invoke_workflow.duration",
66+
unit="s",
67+
description="Duration of workflow invocations.",
68+
)
6469
_tool_execution_duration = meter.create_histogram(
6570
"gen_ai.tool.execution.duration",
6671
unit="s",
@@ -160,6 +165,23 @@ def record_agent_invocation_duration(
160165
_agent_invocation_duration.record(elapsed_s, attributes=attrs)
161166

162167

168+
def record_workflow_invocation_duration(
    workflow_name: str,
    elapsed_s: float,
    is_entrypoint: bool,
    error: Exception | None = None,
):
  """Records one sample on the ``gen_ai.invoke_workflow.duration`` histogram.

  Args:
    workflow_name: Value stored under ``gen_ai.workflow.name``.
    elapsed_s: Duration of the invocation, in seconds.
    is_entrypoint: Value stored under ``gen_ai.workflow.is_entrypoint``.
    error: Exception that ended the invocation, if any. Its class name is
      recorded as the error-type attribute; omitted when ``None``.
  """
  base_attributes = {
      gen_ai_attributes.GEN_AI_OPERATION_NAME: "invoke_workflow",
      "gen_ai.workflow.name": workflow_name,
      "gen_ai.workflow.is_entrypoint": is_entrypoint,
  }
  if error is None:
    _workflow_invocation_duration.record(elapsed_s, attributes=base_attributes)
    return

  # Failed invocation: same attributes plus the exception's class name.
  _workflow_invocation_duration.record(
      elapsed_s,
      attributes={
          **base_attributes,
          error_attributes.ERROR_TYPE: type(error).__name__,
      },
  )
183+
184+
163185
def record_agent_request_size(
164186
agent_name: str, user_content: types.Content | None
165187
):

src/google/adk/telemetry/node_tracing.py

Lines changed: 115 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,41 @@
1515
from __future__ import annotations
1616

1717
from collections.abc import AsyncIterator
18+
from collections.abc import Iterator
1819
from contextlib import asynccontextmanager
20+
from contextlib import contextmanager
1921
from dataclasses import dataclass
2022
from dataclasses import field
23+
import time
2124
from typing import TYPE_CHECKING
2225

2326
from opentelemetry import context as context_api
2427
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_CONVERSATION_ID
2528
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_OPERATION_NAME
26-
from opentelemetry.util.types import Attributes
29+
from opentelemetry.trace import Span
2730

2831
from ..agents.context import Context
2932
from ..workflow._base_node import BaseNode
3033
from .tracing import tracer
3134

3235
if TYPE_CHECKING:
36+
from ..agents.base_agent import BaseAgent
3337
from ..events.event import Event
3438
from ..workflow._workflow import Workflow
3539

40+
# Span/metric attribute flagging whether an `invoke_workflow` span is the
41+
# entrypoint (first workflow) of its invocation.
42+
GEN_AI_WORKFLOW_IS_ENTRYPOINT = "gen_ai.workflow.is_entrypoint"
43+
44+
# OTel-context key recording that an entrypoint workflow is already active. It
45+
# rides along the otel_context propagated to child nodes, so only the first
46+
# workflow invoked within an invocation is marked as the entrypoint -- nested
47+
# workflows (incl. agents-as-tool that spin up their own runner) see the key
48+
# already set and report is_entrypoint=false.
49+
_ENTRYPOINT_WORKFLOW_KEY = context_api.create_key(
50+
"adk-entrypoint-workflow-active"
51+
)
52+
3653

3754
@dataclass(frozen=True)
3855
class TelemetryContext:
@@ -49,12 +66,6 @@ def add_event(self, event: Event) -> None:
4966
self._associated_event_ids.append(event.id)
5067

5168

52-
@dataclass
53-
class _SpanMetadata:
54-
name: str
55-
attributes: Attributes
56-
57-
5869
@asynccontextmanager
5970
async def start_as_current_node_span(
6071
context: Context, node: BaseNode
@@ -83,64 +94,117 @@ async def start_as_current_node_span(
8394
Context with the started span.
8495
"""
8596

86-
span_metadata = _span_metadata(context, node)
87-
if span_metadata is None:
88-
token = context_api.attach(context.telemetry_context.otel_context)
89-
try:
90-
yield TelemetryContext(
91-
otel_context=context.telemetry_context.otel_context
92-
)
93-
finally:
94-
context_api.detach(token)
95-
return
96-
97-
with tracer.start_as_current_span(
98-
span_metadata.name,
99-
attributes=span_metadata.attributes,
100-
context=context.telemetry_context.otel_context,
101-
) as span:
102-
telemetry_context = TelemetryContext(otel_context=context_api.get_current())
103-
yield telemetry_context
104-
105-
if span.is_recording() and len(telemetry_context._associated_event_ids) > 0:
106-
span.set_attribute(
107-
"gcp.vertex.agent.associated_event_ids",
108-
telemetry_context._associated_event_ids,
109-
)
110-
111-
112-
def _span_metadata(context: Context, node: BaseNode) -> _SpanMetadata | None:
11397
from ..agents.base_agent import BaseAgent
11498
from ..workflow._workflow import Workflow
11599

116100
if isinstance(node, BaseAgent):
117-
return None
101+
with _invoke_agent_span(context, node) as tel_ctx:
102+
yield tel_ctx
118103
elif isinstance(node, Workflow):
119-
return _workflow_span_metadata(context, node)
104+
with _invoke_workflow_span(context, node) as tel_ctx:
105+
yield tel_ctx
120106
else:
121-
return _default_node_span_metadata(context, node)
107+
with _invoke_node_span(context, node) as tel_ctx:
108+
yield tel_ctx
109+
110+
111+
@contextmanager
def _invoke_agent_span(
    context: Context, agent: BaseAgent
) -> Iterator[TelemetryContext]:
  """Pass-through for agent nodes: no span is opened here.

  Agents emit their own `invoke_agent` span, so this only makes the parent's
  OTel context current for the duration of the block and hands that same
  context back to the caller.

  Args:
    context: Node context whose ``telemetry_context.otel_context`` is attached.
    agent: The agent node; unused.

  Yields:
    A ``TelemetryContext`` carrying the parent's OTel context unchanged.
  """
  del agent  # Unused; kept so all node-span helpers share one signature.
  parent_otel_context = context.telemetry_context.otel_context
  attach_token = context_api.attach(parent_otel_context)
  try:
    yield TelemetryContext(otel_context=parent_otel_context)
  finally:
    # Always restore the previously current context, even if the body raised.
    context_api.detach(attach_token)
122122

123123

124-
def _workflow_span_metadata(
124+
@contextmanager
def _invoke_workflow_span(
    context: Context, workflow: Workflow
) -> Iterator[TelemetryContext]:
  """Runs a workflow node inside its own `invoke_workflow` span.

  Span creation is delegated to ``_use_invoke_workflow_span``, parented on the
  OTel context carried by ``context``; that helper also records the workflow
  duration metric.

  Args:
    context: Node context supplying the session id and parent OTel context.
    workflow: The workflow node being invoked; its name labels the span.

  Yields:
    A ``TelemetryContext`` bound to the OTel context that is current while the
    span is active.
  """
  span_manager = _use_invoke_workflow_span(
      workflow.name,
      context.session.id,
      otel_context=context.telemetry_context.otel_context,
  )
  with span_manager as workflow_span:
    node_telemetry = TelemetryContext(otel_context=context_api.get_current())
    yield node_telemetry
    # Only reached when the wrapped body finishes without raising.
    _maybe_set_associated_events(workflow_span, node_telemetry)
135137

136138

137-
def _default_node_span_metadata(
139+
@contextmanager
def _invoke_node_span(
    context: Context, node: BaseNode
) -> Iterator[TelemetryContext]:
  """Runs a plain (non-agent, non-workflow) node inside an `invoke_node` span.

  Args:
    context: Node context supplying the session id and parent OTel context.
    node: The node being invoked; its name labels the span.

  Yields:
    A ``TelemetryContext`` bound to the OTel context that is current while the
    span is active.
  """
  span_name = f"invoke_node {node.name}"
  span_attributes = {
      GEN_AI_OPERATION_NAME: "invoke_node",
      GEN_AI_CONVERSATION_ID: context.session.id,
  }
  with tracer.start_as_current_span(
      span_name,
      attributes=span_attributes,
      context=context.telemetry_context.otel_context,
  ) as node_span:
    node_telemetry = TelemetryContext(otel_context=context_api.get_current())
    yield node_telemetry
    # Only reached when the wrapped body finishes without raising.
    _maybe_set_associated_events(node_span, node_telemetry)
155+
156+
157+
def _maybe_set_associated_events(
    span: Span, telemetry_context: TelemetryContext
) -> None:
  """Copies the collected event IDs onto ``span``, when there is anything to copy.

  Nothing is written if the span is not recording or if no event IDs were
  collected on ``telemetry_context``.

  Args:
    span: Span that receives the ``gcp.vertex.agent.associated_event_ids``
      attribute.
    telemetry_context: Holder of the event IDs gathered while the span was
      active.
  """
  if not span.is_recording():
    return

  event_ids = telemetry_context._associated_event_ids
  if len(event_ids) == 0:
    return

  span.set_attribute("gcp.vertex.agent.associated_event_ids", event_ids)
166+
167+
168+
@contextmanager
def _use_invoke_workflow_span(
    workflow_name: str,
    conversation_id: str,
    *,
    otel_context: context_api.Context | None = None,
) -> Iterator[Span]:
  """Opens an `invoke_workflow {workflow_name}` span and times it.

  The span is flagged as the entrypoint when the parent OTel context does not
  yet carry ``_ENTRYPOINT_WORKFLOW_KEY``; in that case the key is set on the
  context the span is started in. On exit -- normal or via an exception -- the
  elapsed time is reported through
  ``_metrics.record_workflow_invocation_duration``.

  Args:
    workflow_name: Name used in the span name and the workflow-name attribute.
    conversation_id: Value of the conversation-id span attribute.
    otel_context: Parent OTel context; defaults to the current context.

  Yields:
    The started span, current for the duration of the block.
  """
  from . import _metrics

  parent_context = (
      context_api.get_current() if otel_context is None else otel_context
  )

  # The first workflow span of an invocation finds the key unset and claims
  # the entrypoint role; spans started under it see the key already present.
  entrypoint_already_active = context_api.get_value(
      _ENTRYPOINT_WORKFLOW_KEY, parent_context
  )
  is_entrypoint = not entrypoint_already_active
  if is_entrypoint:
    parent_context = context_api.set_value(
        _ENTRYPOINT_WORKFLOW_KEY, True, parent_context
    )

  span_attributes = {
      GEN_AI_OPERATION_NAME: "invoke_workflow",
      "gen_ai.workflow.name": workflow_name,
      GEN_AI_CONVERSATION_ID: conversation_id,
      GEN_AI_WORKFLOW_IS_ENTRYPOINT: is_entrypoint,
  }
  started_at = time.monotonic()
  failure: Exception | None = None
  try:
    with tracer.start_as_current_span(
        f"invoke_workflow {workflow_name}",
        attributes=span_attributes,
        context=parent_context,
    ) as workflow_span:
      yield workflow_span
  except Exception as exc:  # pylint: disable=broad-exception-caught
    # Remember the failure for the metric, then let it propagate unchanged.
    failure = exc
    raise
  finally:
    _metrics.record_workflow_invocation_duration(
        workflow_name, time.monotonic() - started_at, is_entrypoint, failure
    )

0 commit comments

Comments
 (0)