diff --git a/client/subscribe.go b/client/subscribe.go index 4bfa48d9..aa17ad22 100644 --- a/client/subscribe.go +++ b/client/subscribe.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/go-sdk/service/common" @@ -195,6 +196,8 @@ func (s *Subscription) Receive() (*SubscriptionMessage, error) { RawData: event.GetData(), Topic: event.GetTopic(), PubsubName: event.GetPubsubName(), + TraceID: getStringValueFromExtension(event.GetExtensions(), "traceid"), + TraceParent: getStringValueFromExtension(event.GetExtensions(), "traceparent"), } return &SubscriptionMessage{ @@ -297,3 +300,18 @@ func (s *Subscription) closeStreamOnly() error { } return nil } + +func getStringValueFromExtension(extension *structpb.Struct, key string) string { + if extension == nil { + return "" + } + value, ok := extension.GetFields()[key] + if !ok { + return "" + } + typed, ok := value.GetKind().(*structpb.Value_StringValue) + if !ok { + return "" + } + return typed.StringValue +} diff --git a/service/grpc/topic.go b/service/grpc/topic.go index b3ef4dcc..3ed08673 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/structpb" runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/go-sdk/service/common" @@ -139,6 +140,8 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq Topic: in.GetTopic(), PubsubName: in.GetPubsubName(), Metadata: getCustomMetadataFromContext(ctx), + TraceID: getStringValueFromExtension(in.GetExtensions(), "traceid"), + TraceParent: getStringValueFromExtension(in.GetExtensions(), "traceparent"), } h := sub.DefaultHandler if in.GetPath() != "" { @@ -183,3 +186,18 @@ func getCustomMetadataFromContext(ctx context.Context) map[string]string { func (s *Server) OnBulkTopicEventAlpha1(ctx context.Context, in *runtimev1pb.TopicEventBulkRequest) (*runtimev1pb.TopicEventBulkResponse, error) { panic("This API callback is not supported.") } + +func getStringValueFromExtension(extension *structpb.Struct, key string) string { + if extension == nil { + return "" + } + value, ok := extension.GetFields()[key] + if !ok { + return "" + } + typed, ok := value.GetKind().(*structpb.Value_StringValue) + if !ok { + return "" + } + return typed.StringValue +} diff --git a/service/grpc/topic_test.go b/service/grpc/topic_test.go index 854362ec..94c4c0c0 100644 --- a/service/grpc/topic_test.go +++ b/service/grpc/topic_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/structpb" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/emptypb" @@ -163,6 +164,42 @@ func TestTopic(t *testing.T) { require.NoError(t, err) }) + t.Run("topic event with traceid and traceparent", func(t *testing.T) { + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test2", + } + + err := server.AddTopicEventHandler(sub2, + func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + assert.Equal(t, "expected-traceid-value", e.TraceID) + assert.Equal(t, "expected-traceparent-value", e.TraceParent) + return false, nil + }) + require.NoError(t, err) + + extensions, err := structpb.NewStruct(map[string]any{ + "traceid": "expected-traceid-value", + "traceparent": "expected-traceparent-value", + }) + require.NoError(t, err) + + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub2.Topic, + PubsubName: sub2.PubsubName, + Extensions: extensions, + } + ctx := context.Background() + _, err = server.OnTopicEvent(ctx, in) + require.NoError(t, err) + }) + stopTestServer(t, server) }