Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/ai/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func NewEvaluator(name string, opts *EvaluatorOptions, fn EvaluatorFunc) Evaluat
Type: "evaluator",
Subtype: "evaluator",
}
_, err := tracing.RunInNewSpan(ctx, spanMetadata, datapoint,
_, err := tracing.RunInNewSpan(ctx, spanMetadata, datapoint, nil,
func(ctx context.Context, input *Example) (*EvaluatorCallbackResponse, error) {
traceId := trace.SpanContextFromContext(ctx).TraceID().String()
spanId := trace.SpanContextFromContext(ctx).SpanID().String()
Expand Down
2 changes: 1 addition & 1 deletion go/ai/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func GenerateWithRequest(ctx context.Context, r api.Registry, opts *GenerateActi
Subtype: "util",
}

return tracing.RunInNewSpan(ctx, spanMetadata, req, func(ctx context.Context, req *ModelRequest) (*ModelResponse, error) {
return tracing.RunInNewSpan(ctx, spanMetadata, req, nil, func(ctx context.Context, req *ModelRequest) (*ModelResponse, error) {
var wrappedCb ModelStreamCallback
currentRole := RoleModel
currentIndex := messageIndex
Expand Down
10 changes: 5 additions & 5 deletions go/core/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ func (a *ActionDef[In, Out, Stream]) Name() string { return a.desc.Name }

// Run executes the Action's function in a new trace span.
func (a *ActionDef[In, Out, Stream]) Run(ctx context.Context, input In, cb StreamCallback[Stream]) (output Out, err error) {
r, err := a.runWithTelemetry(ctx, input, cb)
r, err := a.runWithTelemetry(ctx, input, cb, nil)
if err != nil {
return base.Zero[Out](), err
}
return r.Result, nil
}

// Run executes the Action's function in a new trace span.
func (a *ActionDef[In, Out, Stream]) runWithTelemetry(ctx context.Context, input In, cb StreamCallback[Stream]) (output api.ActionRunResult[Out], err error) {
func (a *ActionDef[In, Out, Stream]) runWithTelemetry(ctx context.Context, input In, cb StreamCallback[Stream], telemetryCb func(traceID, spanID string)) (output api.ActionRunResult[Out], err error) {
inputBytes, _ := json.Marshal(input)
logger.FromContext(ctx).Debug("Action.Run",
"name", a.Name(),
Expand Down Expand Up @@ -215,7 +215,7 @@ func (a *ActionDef[In, Out, Stream]) runWithTelemetry(ctx context.Context, input

var traceID string
var spanID string
o, err := tracing.RunInNewSpan(ctx, spanMetadata, input,
o, err := tracing.RunInNewSpan(ctx, spanMetadata, input, telemetryCb,
func(ctx context.Context, input In) (Out, error) {
traceInfo := tracing.SpanTraceInfo(ctx)
traceID = traceInfo.TraceID
Expand Down Expand Up @@ -260,7 +260,7 @@ func (a *ActionDef[In, Out, Stream]) RunJSON(ctx context.Context, input json.Raw
return r.Result, nil
}

// RunJSON runs the action with a JSON input, and returns a JSON result along with telemetry info.
// RunJSONWithTelemetry runs the action with a JSON input, and returns a JSON result along with telemetry info.
func (a *ActionDef[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (*api.ActionRunResult[json.RawMessage], error) {
i, err := base.UnmarshalAndNormalize[In](input, a.desc.InputSchema)
if err != nil {
Expand All @@ -278,7 +278,7 @@ func (a *ActionDef[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, i
}
}

r, err := a.runWithTelemetry(ctx, i, scb)
r, err := a.runWithTelemetry(ctx, i, scb, tracing.TelemetryCb(ctx))
if err != nil {
return &api.ActionRunResult[json.RawMessage]{
TraceId: r.TraceId,
Expand Down
4 changes: 2 additions & 2 deletions go/core/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Run[Out any](ctx context.Context, name string, fn func() (Out, error)) (Out
Type: "flowStep",
Subtype: "flowStep",
}
return tracing.RunInNewSpan(ctx, spanMetadata, nil, func(ctx context.Context, _ any) (Out, error) {
return tracing.RunInNewSpan(ctx, spanMetadata, nil, nil, func(ctx context.Context, _ any) (Out, error) {
o, err := fn()
if err != nil {
return base.Zero[Out](), err
Expand All @@ -112,7 +112,7 @@ func (f *Flow[In, Out, Stream]) RunJSON(ctx context.Context, input json.RawMessa
return (*ActionDef[In, Out, Stream])(f).RunJSON(ctx, input, cb)
}

// RunJSON runs the flow with JSON input and streaming callback and returns the output as JSON.
// RunJSONWithTelemetry runs the flow with JSON input and streaming callback and returns the output as JSON along with telemetry info.
func (f *Flow[In, Out, Stream]) RunJSONWithTelemetry(ctx context.Context, input json.RawMessage, cb StreamCallback[json.RawMessage]) (*api.ActionRunResult[json.RawMessage], error) {
return (*ActionDef[In, Out, Stream])(f).RunJSONWithTelemetry(ctx, input, cb)
}
Expand Down
3 changes: 3 additions & 0 deletions go/core/schemas.config
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ SpanStatus omit
TimeEvent omit
TimeEventAnnotation omit
TraceData omit
SpanStartEvent omit
SpanEndEvent omit
TraceEvent omit

GenerationCommonConfig.maxOutputTokens type int
GenerationCommonConfig.topK type int
Expand Down
22 changes: 22 additions & 0 deletions go/core/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,12 @@ type SpanMetadata struct {

// RunInNewSpan runs f on input in a new span with the provided metadata.
// The metadata contains all span configuration including name, type, labels, etc.
// If telemetryCb is provided, it will be called with the trace ID and span ID as soon as the span is created.
func RunInNewSpan[I, O any](
ctx context.Context,
metadata *SpanMetadata,
input I,
telemetryCb func(traceID, spanID string),
f func(context.Context, I) (O, error),
) (O, error) {
// TODO: support span links.
Expand Down Expand Up @@ -239,6 +241,12 @@ func RunInNewSpan[I, O any](
TraceID: span.SpanContext().TraceID().String(),
SpanID: span.SpanContext().SpanID().String(),
}

// Fire telemetry callback immediately if provided
if telemetryCb != nil {
telemetryCb(sm.TraceInfo.TraceID, sm.TraceInfo.SpanID)
}

defer span.End()
defer func() { span.SetAttributes(sm.attributes()...) }()
ctx = spanMetaKey.NewContext(ctx, sm)
Expand Down Expand Up @@ -371,6 +379,20 @@ func (sm *spanMetadata) attributes() []attribute.KeyValue {
// spanMetaKey is for storing spanMetadatas in a context.
var spanMetaKey = base.NewContextKey[*spanMetadata]()

// telemetryCbKey is the context key for telemetry callbacks.
var telemetryCbKey = base.NewContextKey[func(traceID, spanID string)]()

// WithTelemetryCb returns a context with the telemetry callback attached.
// Used by the reflection server to pass callbacks to actions.
func WithTelemetryCb(ctx context.Context, cb func(traceID, spanID string)) context.Context {
return telemetryCbKey.NewContext(ctx, cb)
}

// TelemetryCb retrieves the telemetry callback from context, or nil if not set.
func TelemetryCb(ctx context.Context) func(traceID, spanID string) {
return telemetryCbKey.FromContext(ctx)
}

// SpanPath returns the path as recorded in the current span metadata.
func SpanPath(ctx context.Context) string {
return spanMetaKey.FromContext(ctx).Path
Expand Down
18 changes: 9 additions & 9 deletions go/core/tracing/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func TestRunInNewSpanWithMetadata(t *testing.T) {
ctx := context.Background()
input := "test input"

output, err := RunInNewSpan(ctx, tc.metadata, input,
output, err := RunInNewSpan(ctx, tc.metadata, input, nil,
func(ctx context.Context, input string) (string, error) {
// Verify that span metadata is available in context
sm := spanMetaKey.FromContext(ctx)
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestRunInNewSpanWithTypeConvenience(t *testing.T) {
Subtype: "tool",
}

output, err := RunInNewSpan(ctx, metadata, "input",
output, err := RunInNewSpan(ctx, metadata, "input", nil,
func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
Expand Down Expand Up @@ -390,10 +390,10 @@ func TestNestedSpanPaths(t *testing.T) {
ctx := context.Background()

// Test nested spans to verify path building
_, err := RunInNewSpan(ctx, &SpanMetadata{Name: "chatFlow", IsRoot: true, Type: "action", Subtype: "flow"}, "input",
_, err := RunInNewSpan(ctx, &SpanMetadata{Name: "chatFlow", IsRoot: true, Type: "action", Subtype: "flow"}, "input", nil,
func(ctx context.Context, input string) (string, error) {
// Nested action span
return RunInNewSpan(ctx, &SpanMetadata{Name: "myTool", IsRoot: false, Type: "action", Subtype: "tool"}, input,
return RunInNewSpan(ctx, &SpanMetadata{Name: "myTool", IsRoot: false, Type: "action", Subtype: "tool"}, input, nil,
func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestIsFailureSourceOnError(t *testing.T) {
_, err := RunInNewSpan(ctx, &SpanMetadata{
Name: "failing-action",
Type: "action",
}, "input", func(ctx context.Context, input string) (string, error) {
}, "input", nil, func(ctx context.Context, input string) (string, error) {
return "", testErr
})

Expand All @@ -446,7 +446,7 @@ func TestRootSpanAutoDetection(t *testing.T) {
Type: "action",
Subtype: "flow",
IsRoot: false, // Even when explicitly set to false, should be overridden
}, "input", func(ctx context.Context, input string) (string, error) {
}, "input", nil, func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
t.Fatal("Expected span metadata in context")
Expand All @@ -469,7 +469,7 @@ func TestRootSpanAutoDetection(t *testing.T) {
Name: "explicitRootFlow",
Type: "action",
IsRoot: true, // Explicitly set to true
}, "input", func(ctx context.Context, input string) (string, error) {
}, "input", nil, func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
t.Fatal("Expected span metadata in context")
Expand All @@ -492,13 +492,13 @@ func TestRootSpanAutoDetection(t *testing.T) {
Name: "parentFlow",
Type: "action",
IsRoot: true,
}, "input", func(ctx context.Context, input string) (string, error) {
}, "input", nil, func(ctx context.Context, input string) (string, error) {
// This is a nested span - should NOT be root
_, err := RunInNewSpan(ctx, &SpanMetadata{
Name: "childAction",
Type: "action",
IsRoot: false,
}, input, func(ctx context.Context, input string) (string, error) {
}, input, nil, func(ctx context.Context, input string) (string, error) {
sm := spanMetaKey.FromContext(ctx)
if sm == nil {
t.Fatal("Expected span metadata in context")
Expand Down
Loading
Loading