@@ -74,12 +74,22 @@ class should return the workflow interceptor subclass from
7474 def __init__ (
7575 self ,
7676 tracer : Optional [opentelemetry .trace .Tracer ] = None ,
77+ * ,
78+ always_create_workflow_spans : bool = False ,
7779 ) -> None :
7880 """Initialize a OpenTelemetry tracing interceptor.
7981
8082 Args:
8183 tracer: The tracer to use. Defaults to
8284 :py:func:`opentelemetry.trace.get_tracer`.
85+ always_create_workflow_spans: When false, the default, spans are
86+ only created in workflows when an overarching span from the
87+ client is present. In cases of starting a workflow elsewhere,
88+ e.g. CLI or schedules, a client-created span is not present and
89+ workflow spans will not be created. Setting this to true will
90+ create spans in workflows no matter what, but there is a risk of
91+ them being orphans since they may not have a parent span after
92+ replaying.
8393 """
8494 self .tracer = tracer or opentelemetry .trace .get_tracer (__name__ )
8595 # To customize any of this, users must subclass. We intentionally don't
@@ -90,6 +100,7 @@ def __init__(
90100 self .text_map_propagator : opentelemetry .propagators .textmap .TextMapPropagator = default_text_map_propagator
91101 # TODO(cretz): Should I be using the configured one at the client and activity level?
92102 self .payload_converter = temporalio .converter .PayloadConverter .default
103+ self ._always_create_workflow_spans = always_create_workflow_spans
93104
94105 def intercept_client (
95106 self , next : temporalio .client .OutboundInterceptor
@@ -165,10 +176,15 @@ def _start_as_current_span(
165176
166177 def _completed_workflow_span (
167178 self , params : _CompletedWorkflowSpanParams
168- ) -> _CarrierDict :
179+ ) -> Optional [ _CarrierDict ] :
169180 # Carrier to context, start span, set span as current on context,
170181 # context back to carrier
171182
183+ # If the parent is missing and user hasn't said to always create, do not
184+ # create
185+ if params .parent_missing and not self ._always_create_workflow_spans :
186+ return None
187+
172188 # Extract the context
173189 context = self .text_map_propagator .extract (params .context )
174190 # Create link if there is a span present
@@ -286,7 +302,7 @@ class _InputWithHeaders(Protocol):
286302
287303class _WorkflowExternFunctions (TypedDict ):
288304 __temporal_opentelemetry_completed_span : Callable [
289- [_CompletedWorkflowSpanParams ], _CarrierDict
305+ [_CompletedWorkflowSpanParams ], Optional [ _CarrierDict ]
290306 ]
291307
292308
@@ -299,6 +315,7 @@ class _CompletedWorkflowSpanParams:
299315 link_context : Optional [_CarrierDict ]
300316 exception : Optional [Exception ]
301317 kind : opentelemetry .trace .SpanKind
318+ parent_missing : bool
302319
303320
304321_interceptor_context_key = opentelemetry .context .create_key (
@@ -529,17 +546,13 @@ def _completed_span(
529546 exception : Optional [Exception ] = None ,
530547 kind : opentelemetry .trace .SpanKind = opentelemetry .trace .SpanKind .INTERNAL ,
531548 ) -> None :
532- # If there is no span on the context, we do not create a span
533- if opentelemetry .trace .get_current_span () is opentelemetry .trace .INVALID_SPAN :
534- return None
535-
536549 # If we are replaying and they don't want a span on replay, no span
537550 if temporalio .workflow .unsafe .is_replaying () and not new_span_even_on_replay :
538551 return None
539552
540553 # Create the span. First serialize current context to carrier.
541- context_carrier : _CarrierDict = {}
542- self .text_map_propagator .inject (context_carrier )
554+ new_context_carrier : _CarrierDict = {}
555+ self .text_map_propagator .inject (new_context_carrier )
543556 # Invoke
544557 info = temporalio .workflow .info ()
545558 attributes : Dict [str , opentelemetry .util .types .AttributeValue ] = {
@@ -548,25 +561,27 @@ def _completed_span(
548561 }
549562 if additional_attributes :
550563 attributes .update (additional_attributes )
551- context_carrier = self ._extern_functions [
564+ updated_context_carrier = self ._extern_functions [
552565 "__temporal_opentelemetry_completed_span"
553566 ](
554567 _CompletedWorkflowSpanParams (
555- context = context_carrier ,
568+ context = new_context_carrier ,
556569 name = span_name ,
557570 # Always set span attributes as workflow ID and run ID
558571 attributes = attributes ,
559572 time_ns = temporalio .workflow .time_ns (),
560573 link_context = link_context_carrier ,
561574 exception = exception ,
562575 kind = kind ,
576+ parent_missing = opentelemetry .trace .get_current_span ()
577+ is opentelemetry .trace .INVALID_SPAN ,
563578 )
564579 )
565580
566581 # Add to outbound if needed
567- if add_to_outbound :
582+ if add_to_outbound and updated_context_carrier :
568583 add_to_outbound .headers = self ._context_carrier_to_headers (
569- context_carrier , add_to_outbound .headers
584+ updated_context_carrier , add_to_outbound .headers
570585 )
571586
572587 def _set_on_context (
0 commit comments