diff --git a/cmd/relayproxy/config/exporter.go b/cmd/relayproxy/config/exporter.go index 82f0666ad65..f8aaaaad62e 100644 --- a/cmd/relayproxy/config/exporter.go +++ b/cmd/relayproxy/config/exporter.go @@ -4,30 +4,34 @@ import ( "fmt" "github.com/thomaspoignant/go-feature-flag/exporter/kafkaexporter" + "github.com/thomaspoignant/go-feature-flag/exporter/opentelemetryexporter" "github.com/xitongsys/parquet-go/parquet" ) // ExporterConf contains all the field to configure an exporter +// The parquet compression line is too long. +// nolint:lll type ExporterConf struct { - Kind ExporterKind `mapstructure:"kind" koanf:"kind"` - OutputDir string `mapstructure:"outputDir" koanf:"outputdir"` - Format string `mapstructure:"format" koanf:"format"` - Filename string `mapstructure:"filename" koanf:"filename"` - CsvTemplate string `mapstructure:"csvTemplate" koanf:"csvtemplate"` - Bucket string `mapstructure:"bucket" koanf:"bucket"` - Path string `mapstructure:"path" koanf:"path"` - EndpointURL string `mapstructure:"endpointUrl" koanf:"endpointurl"` - Secret string `mapstructure:"secret" koanf:"secret"` - Meta map[string]string `mapstructure:"meta" koanf:"meta"` - LogFormat string `mapstructure:"logFormat" koanf:"logformat"` - FlushInterval int64 `mapstructure:"flushInterval" koanf:"flushinterval"` - MaxEventInMemory int64 `mapstructure:"maxEventInMemory" koanf:"maxeventinmemory"` - ParquetCompressionCodec string `mapstructure:"parquetCompressionCodec" koanf:"parquetcompressioncodec"` - Headers map[string][]string `mapstructure:"headers" koanf:"headers"` - QueueURL string `mapstructure:"queueUrl" koanf:"queueurl"` - Kafka kafkaexporter.Settings `mapstructure:"kafka" koanf:"kafka"` + Kind ExporterKind `mapstructure:"kind" koanf:"kind"` + OutputDir string `mapstructure:"outputDir" koanf:"outputdir"` + Format string `mapstructure:"format" koanf:"format"` + Filename string `mapstructure:"filename" koanf:"filename"` + CsvTemplate string `mapstructure:"csvTemplate" koanf:"csvtemplate"` + Bucket string `mapstructure:"bucket" koanf:"bucket"` + Path string `mapstructure:"path" koanf:"path"` + EndpointURL string `mapstructure:"endpointUrl" koanf:"endpointurl"` + Secret string `mapstructure:"secret" koanf:"secret"` + Meta map[string]string `mapstructure:"meta" koanf:"meta"` + LogFormat string `mapstructure:"logFormat" koanf:"logformat"` + FlushInterval int64 `mapstructure:"flushInterval" koanf:"flushinterval"` + MaxEventInMemory int64 `mapstructure:"maxEventInMemory" koanf:"maxeventinmemory"` + ParquetCompressionCodec string `mapstructure:"parquetCompressionCodec" koanf:"parquetcompressioncodec"` + Headers map[string][]string `mapstructure:"headers" koanf:"headers"` + QueueURL string `mapstructure:"queueUrl" koanf:"queueurl"` + Kafka kafkaexporter.Settings `mapstructure:"kafka" koanf:"kafka"` ProjectID string `mapstructure:"projectID" koanf:"projectid"` Topic string `mapstructure:"topic" koanf:"topic"` + OpenTel opentelemetryexporter.Settings `mapstructure:"opentel" koanf:"opentel"` } func (c *ExporterConf) IsValid() error { @@ -60,6 +64,10 @@ func (c *ExporterConf) IsValid() error { return fmt.Errorf("invalid exporter: \"projectID\" and \"topic\" are required for kind \"%s\"", c.Kind) } + if c.Kind == OpenTelExporter && (c.OpenTel.OpentelSettings.URI == "") { + return fmt.Errorf("invalid exporter: \"opentel.uri\" is required for kind \"%s\"", c.Kind) + } + return nil } @@ -74,13 +82,14 @@ const ( SQSExporter ExporterKind = "sqs" KafkaExporter ExporterKind = "kafka" PubSubExporter ExporterKind = "pubsub" + OpenTelExporter ExporterKind = "opentel" ) // IsValid is checking if the value is part of the enum func (r ExporterKind) IsValid() error { switch r { case FileExporter, WebhookExporter, LogExporter, S3Exporter, GoogleStorageExporter, SQSExporter, KafkaExporter, - PubSubExporter: + OpenTelExporter, PubSubExporter: return nil } return fmt.Errorf("invalid exporter: kind \"%s\" is not supported", r) diff --git a/cmd/relayproxy/config/exporter_test.go b/cmd/relayproxy/config/exporter_test.go index a47295c4505..40adb016a47 100644 --- a/cmd/relayproxy/config/exporter_test.go +++ b/cmd/relayproxy/config/exporter_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config" + "github.com/thomaspoignant/go-feature-flag/exporter/opentelemetryexporter" ) func TestExporterConf_IsValid(t *testing.T) { @@ -23,6 +24,7 @@ func TestExporterConf_IsValid(t *testing.T) { QueueURL string ProjectID string Topic string + OpenTel opentelemetryexporter.Settings } tests := []struct { name string @@ -186,6 +188,35 @@ func TestExporterConf_IsValid(t *testing.T) { wantErr: true, errValue: "invalid exporter: \"projectID\" and \"topic\" are required for kind \"pubsub\"", }, + { + name: "kind OpenTel with no creds", + fields: fields{ + Kind: "opentel", + OpenTel: opentelemetryexporter.Settings{OpentelSettings: opentelemetryexporter.OpenTelSettings{URI: "localhost"}}, + }, + wantErr: false, + }, + { + name: "kind OpenTel with creds", + fields: fields{ + Kind: "opentel", + OpenTel: opentelemetryexporter.Settings{ + OpentelSettings: opentelemetryexporter.OpenTelSettings{URI: "localhost", CACertPath: "/tmp/does-not-exist"}, + }, + }, + wantErr: false, + }, + { + name: "kind OpenTel with no uri", + fields: fields{ + Kind: "opentel", + OpenTel: opentelemetryexporter.Settings{ + OpentelSettings: opentelemetryexporter.OpenTelSettings{CACertPath: "/tmp/does-not-exist"}, + }, + }, + wantErr: true, + errValue: "invalid exporter: \"opentel.uri\" is required for kind \"opentel\"", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -204,6 +235,7 @@ func TestExporterConf_IsValid(t *testing.T) { QueueURL: tt.fields.QueueURL, ProjectID: tt.fields.ProjectID, Topic: tt.fields.Topic, + OpenTel: tt.fields.OpenTel, } err := c.IsValid() assert.Equal(t, tt.wantErr, err != nil) diff --git a/exporter/opentelemetryexporter/exporter.go b/exporter/opentelemetryexporter/exporter.go new file mode 100644 index 00000000000..999f1a1b073 --- /dev/null +++ b/exporter/opentelemetryexporter/exporter.go @@ -0,0 +1,349 @@ +package opentelemetryexporter + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "os" + "reflect" + "strings" + "time" + + "github.com/thomaspoignant/go-feature-flag/exporter" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + + "log" +) + +const ( + serviceName = "go-feature-flag" + ServiceVersion = "0.0.1" + instrumentationName = "github.com/thomaspoignant/go-feature-flag" + instrumentationVersion = "0.0.1" +) + +type Exporter struct { + resource *resource.Resource + processors []*sdktrace.SpanProcessor +} + +func (*Exporter) IsBulk() bool { + return true +} + +type OpenTelSettings struct { + URI string `json:"uri"` + CACertPath string `json:"cacertpath"` +} + +type Settings struct { + OpentelSettings OpenTelSettings `json:"opentelsettings"` + Resource map[string]string `json:"resource"` +} + +type ExporterOption func(*Exporter) error + +func getTracer() trace.Tracer { + return otel.GetTracerProvider().Tracer( + instrumentationName, + trace.WithInstrumentationVersion(instrumentationVersion), + trace.WithSchemaURL(semconv.SchemaURL)) +} + +func getTransportCredentials(caCertPath string) (credentials.TransportCredentials, error) { + if caCertPath != "" { + _, err := os.Stat(caCertPath) + if err != nil { + return nil, err + } + + pemServerCA, err := os.ReadFile(caCertPath) + if err != nil { + return nil, err + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(pemServerCA) { + return nil, fmt.Errorf("failed to add server CA's certificate") + } + + // I don't like setting the minversion, but gosec will complain otherwise + config := &tls.Config{ + RootCAs: certPool, + MinVersion: tls.VersionTLS12, + } + + return credentials.NewTLS(config), nil + } + return insecure.NewCredentials(), nil +} + +func NewExporter(settings Settings) (*Exporter, error) { + processorOptions := make([]grpc.DialOption, 0) + exporterOptions := make([]ExporterOption, 0) + + connectParams := grpc.ConnectParams{ + Backoff: backoff.Config{BaseDelay: time.Second * 2, + Multiplier: 2.0, + MaxDelay: time.Second * 16}} + + credentials, err := getTransportCredentials(settings.OpentelSettings.CACertPath) + if err != nil { + return nil, err + } + + processorOptions = append(processorOptions, grpc.WithConnectParams(connectParams)) + processorOptions = append(processorOptions, grpc.WithTransportCredentials(credentials)) + + exporterOptions = append(exporterOptions, WithResource(defaultResource())) + if len(settings.Resource) > 0 { + exporterOptions = append(exporterOptions, WithResource(mapToResource(settings.Resource))) + } + + otelProcessor, err := OtelCollectorBatchSpanProcessor(settings.OpentelSettings.URI, + processorOptions..., + ) + + exporterOptions = append(exporterOptions, WithBatchSpanProcessors(&otelProcessor)) + + if err != nil { + return nil, err + } + + exp, err := NewExporterFromOpts(exporterOptions...) + + if err != nil { + return nil, err + } + + return exp, nil +} + +func NewExporterFromOpts(opts ...ExporterOption) (*Exporter, error) { + exporter := Exporter{} + for _, opt := range opts { + if err := opt(&exporter); err != nil { + return nil, err + } + } + return &exporter, nil +} + +func WithResource(customResource *resource.Resource) ExporterOption { + return func(exp *Exporter) error { + mergedResource, err := resource.Merge(customResource, defaultResource()) + if err != nil { + return errors.New("unable to merge resources") + } + exp.resource = mergedResource + return nil + } +} + +func WithBatchSpanProcessors(processors ...*sdktrace.SpanProcessor) ExporterOption { + return func(exp *Exporter) error { + exp.processors = processors + return nil + } +} + +func defaultResource() *resource.Resource { + return resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(serviceName), + semconv.ServiceVersion(ServiceVersion), + ) +} + +func mapToResource(attrs map[string]string) *resource.Resource { + customKeyValues := make([]attribute.KeyValue, 0) + for ck, cv := range attrs { + customKeyValues = append(customKeyValues, + attribute.KeyValue{Key: attribute.Key(ck), Value: attribute.StringValue(cv)}) + } + return resource.NewWithAttributes( + semconv.SchemaURL, customKeyValues..., + ) +} + +func otelExporter(uri string, opts ...grpc.DialOption) (*otlptrace.Exporter, error) { + // TODO creds + + if len(opts) == 0 { + return nil, errors.New("need credentials option") + } + + conn, err := grpc.NewClient(uri, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) + } + + // Set up a trace exporter + traceExporter, err := otlptracegrpc.New(context.Background(), otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + return traceExporter, nil +} + +func OtelCollectorBatchSpanProcessor(uri string, opts ...grpc.DialOption) (sdktrace.SpanProcessor, error) { + otelExporter, err := otelExporter(uri, opts...) + if err != nil { + return nil, err + } + + return sdktrace.NewBatchSpanProcessor(otelExporter), nil +} + +func newstdoutExporter(options ...stdouttrace.Option) (*stdouttrace.Exporter, error) { + exp, err := stdouttrace.New(options...) + if err != nil { + return nil, fmt.Errorf("failed to initialize stdouttrace exporter: %w", err) + } + return exp, nil +} + +func stdoutBatchSpanProcessor(options ...stdouttrace.Option) (sdktrace.SpanProcessor, error) { + inMemoryExporter, err := newstdoutExporter(options...) + if err != nil { + return nil, err + } + + return sdktrace.NewBatchSpanProcessor(inMemoryExporter), nil +} + +func valueToAttributes(data interface{}, parentName string, maxDepth int, recursionDepth int) []attribute.KeyValue { + parentName = strings.ToLower(parentName) + reflectedAttributes := make([]attribute.KeyValue, 0) + + if recursionDepth > maxDepth { + return reflectedAttributes + } + + targetType := reflect.TypeOf(data) + targetValue := reflect.ValueOf(data) + kind := targetValue.Kind() + + switch kind { + case reflect.Float32, reflect.Float64: + reflectedAttributes = append(reflectedAttributes, attribute.Float64(parentName, targetValue.Float())) + case reflect.Int, reflect.Int8, reflect.Int16, + reflect.Int32, reflect.Int64: + reflectedAttributes = append(reflectedAttributes, attribute.Int64(parentName, targetValue.Int())) + case reflect.Bool: + reflectedAttributes = append(reflectedAttributes, attribute.Bool(parentName, targetValue.Bool())) + case reflect.String: + reflectedAttributes = append(reflectedAttributes, attribute.String(parentName, targetValue.String())) + + case reflect.Struct: + for i := 0; i < targetType.NumField(); i++ { + name := targetType.Field(i).Name + fv := targetValue.Field(i) + + if !fv.CanInterface() { + continue + } + + subAttributes := valueToAttributes(fv.Interface(), parentName+"."+name, maxDepth, recursionDepth+1) + reflectedAttributes = append(reflectedAttributes, subAttributes...) + } + default: + } + + return reflectedAttributes +} + +func featureEventToAttributes(featureEvent exporter.FeatureEvent) []attribute.KeyValue { + // https://opentelemetry.io/docs/specs/semconv/feature-flags/feature-flags-spans/ + + attributes := make([]attribute.KeyValue, 0) + attributes = append(attributes, attribute.String("kind", featureEvent.Kind), + attribute.String("contextKind", featureEvent.ContextKind), + attribute.String("userKey", featureEvent.UserKey), + attribute.Int64("creationDate", featureEvent.CreationDate), + attribute.String("key", featureEvent.Key), + attribute.String("variation", featureEvent.Variation), + attribute.Bool("default", featureEvent.Default), + attribute.String("version", featureEvent.Version), + attribute.String("source", featureEvent.Source)) + + valueAttrs := valueToAttributes(featureEvent.Value, "value", 2, 0) + attributes = append(attributes, valueAttrs...) + + return attributes +} + +func initProvider(exp *Exporter) (func(context.Context) error, error) { + // The default resource will win on merge + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithResource(exp.resource), + ) + + if len(exp.processors) == 0 { + return nil, errors.New("no processors provided") + } + + for _, spanProcessor := range exp.processors { + tracerProvider.RegisterSpanProcessor(*spanProcessor) + } + + otel.SetTracerProvider(tracerProvider) + + // set global propagator to tracecontext (the default is no-op). + otel.SetTextMapPropagator(propagation.TraceContext{}) + + // Shutdown will flush any remaining spans and shut down the exporter. + return func(ctx context.Context) error { + return tracerProvider.ForceFlush(ctx) + }, nil +} + +func eventToSpan(ctx context.Context, featureEvent exporter.FeatureEvent) { + attributes := featureEventToAttributes(featureEvent) + _, span := getTracer().Start(ctx, featureEvent.Kind) + defer span.End() + span.SetAttributes(attributes...) + // How can we detect feature-flag evaluation failure? + span.SetStatus(codes.Ok, "n/a") +} +func eventsToSpans(ctx context.Context, featureEvents []exporter.FeatureEvent) { + for _, featureEvent := range featureEvents { + eventToSpan(ctx, featureEvent) + } +} + +func (exporter *Exporter) Export(ctx context.Context, _ *log.Logger, featureEvents []exporter.FeatureEvent) error { + shutdown, err := initProvider(exporter) + if err != nil { + log.Fatal(err) + } + defer func() { + if err := shutdown(ctx); err != nil { + log.Fatal("failed to shutdown TracerProvider: %w", err) + } + }() + + ctx, span := getTracer().Start(ctx, "feature-flag-evaluation") + defer span.End() + eventsToSpans(ctx, featureEvents) + + return nil +} diff --git a/exporter/opentelemetryexporter/exporter_test.go b/exporter/opentelemetryexporter/exporter_test.go new file mode 100644 index 00000000000..4cfd65099a9 --- /dev/null +++ b/exporter/opentelemetryexporter/exporter_test.go @@ -0,0 +1,464 @@ +package opentelemetryexporter + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "strings" + "sync" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials/insecure" + + "github.com/docker/go-connections/nat" + "github.com/stretchr/testify/assert" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + "github.com/thomaspoignant/go-feature-flag/exporter" +) + +var _ sdktrace.SpanExporter = (*PersistentInMemoryExporter)(nil) + +func TestCI(t *testing.T) { + t.Setenv("GITHUB_RUN_ID", "does-not-matter") + t.Setenv("CI", "true") + assert.True(t, checkIfGithubActionCI()) +} + +func TestValueReflection(t *testing.T) { + v := valueToAttributes("foo", "value", 2, 0) + assert.Len(t, v, 1) + assert.Len(t, valueToAttributes(1, "value", 2, 0), 1) + assert.Len(t, valueToAttributes(true, "value", 2, 0), 1) + assert.Len(t, valueToAttributes(3.2, "value", 2, 0), 1) + + testStruct := testStruct{ + Timestamp: 192929922, Condition: true, Content: "hello", notExported: false, Value: 1.0, AnotherValue: 3.3, + Substruct: testSubStruct{SubCondition: false, SubContent: "world", SubValue: 3.0, SubAnotherValue: 44.4, subNotExported: true}, + } + + prefix := "value" + + event := exporter.FeatureEvent{Value: testStruct} + structAttrs := valueToAttributes(event.Value, prefix, 2, 0) + assert.Len(t, structAttrs, 10) + for _, attr := range structAttrs { + assert.True(t, strings.HasPrefix(string(attr.Key), prefix+".")) + } + + substruct := testSubStruct{SubCondition: false, SubContent: "world", SubValue: 3.0, SubAnotherValue: 44.4, subNotExported: true} + slice := make([]testSubStruct, 0) + slice = append(slice, substruct) + + // we don't handle slices or arrays + assert.Len(t, valueToAttributes(slice, "value", 2, 0), 0) +} + +func TestFeatureEventsToAttributes(t *testing.T) { + // TODO: Build Various kinds of events + featureEvents := buildFeatureEvents() + + for _, featureEvent := range featureEvents { + attributes := featureEventToAttributes(featureEvent) + assert.True(t, len(attributes) == 10 || len(attributes) == 19) + } +} + +func TestResource(t *testing.T) { + resource := defaultResource() + assert.NotNil(t, resource) + assert.NotNil(t, resource.SchemaURL()) + + attributes := resource.Attributes() + assert.Len(t, attributes, 2) +} + +func TestExporterBuildsWithOptions(t *testing.T) { + userCustomResource := resource.NewWithAttributes( + semconv.SchemaURL, attribute.KeyValue{Key: "hello", Value: attribute.StringValue("World")}) + + inMemoryExporter := PersistentInMemoryExporter{} + inMemoryProcessor := sdktrace.NewBatchSpanProcessor(&inMemoryExporter) + exporter, err := NewExporterFromOpts( + + WithResource(userCustomResource), + WithBatchSpanProcessors(&inMemoryProcessor), + ) + assert.NoError(t, err) + assert.NotNil(t, exporter) + assert.NotNil(t, exporter.resource) + assert.Len(t, exporter.resource.Attributes(), 3) + // Check that our default resource wins the merge + assertResource(t, *defaultResource(), *exporter.resource) + // Check we didn't step on the users resource + assertResource(t, *userCustomResource, *exporter.resource) + assert.Len(t, exporter.processors, 1) +} + +func TestInterfaceCompliance(t *testing.T) { + exporter := Exporter{} + _ = acceptExporter(&exporter) + assert.True(t, exporter.IsBulk(), "Exporter exporter is a bulk exporter") +} +func TestExporterOptionErrorPath(t *testing.T) { + exp, err := NewExporterFromOpts( + + func(*Exporter) error { + return errors.New("test error") + }, + ) + assert.Error(t, err) + assert.Nil(t, exp) + + // This will cause a failure because of the schema mismatch + userCustomResource := resource.NewWithAttributes( + "https://opentelemetry.io/schemas/1.18.0", attribute.KeyValue{Key: "hello", Value: attribute.StringValue("World")}) + + exp, err = NewExporterFromOpts( + + WithResource(userCustomResource), + ) + assert.Error(t, err) + assert.Nil(t, exp) +} + +func TestInitProviderRequiresProcessor(t *testing.T) { + _, err := initProvider(&Exporter{}) + assert.NotNil(t, err) +} + +func TestPersistentInMemoryExporter(t *testing.T) { + ctx := context.Background() + + inMemorySpanExporter := PersistentInMemoryExporter{} + + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(&inMemorySpanExporter))) + tracer := tp.Tracer("tracer") + _, span := tracer.Start(ctx, "span") + span.End() + + err := tp.ForceFlush(ctx) + assert.NoError(t, err) + + assert.Len(t, inMemorySpanExporter.GetSpans(), 1) + err = inMemorySpanExporter.Shutdown(ctx) + assert.NoError(t, err) + assert.Len(t, inMemorySpanExporter.GetSpans(), 1) + inMemorySpanExporter.Reset() + assert.Len(t, inMemorySpanExporter.GetSpans(), 0) +} + +func TestExportWithMultipleProcessors(t *testing.T) { + featureEvents := buildFeatureEvents() + + ctx := context.Background() + logger := log.New(os.Stdout, "", 0) + + inMemoryExporter := PersistentInMemoryExporter{} + inMemoryProcessor := sdktrace.NewBatchSpanProcessor(&inMemoryExporter) + // TODO wire up the stdout processor only if !CI + stdoutProcessor, err := stdoutBatchSpanProcessor(stdouttrace.WithPrettyPrint()) + assert.NoError(t, err) + resource := defaultResource() + + exp, err := NewExporterFromOpts( + + WithResource(resource), + WithBatchSpanProcessors(&inMemoryProcessor, &stdoutProcessor), + ) + assert.NoError(t, err) + err = exp.Export(ctx, logger, featureEvents) + assert.NoError(t, err) + // We sent three spans, the parents and three child spans corresponding to events + assert.Len(t, inMemoryExporter.GetSpans(), 4) + assertSpanReferentialIntegrity(t, &inMemoryExporter) + + // Test we can send again after the first cycle + inMemoryExporter.Reset() + err = exp.Export(ctx, logger, featureEvents) + assert.NoError(t, err) + assert.Len(t, inMemoryExporter.GetSpans(), 4) +} + +func TestOtelBSPNeedsOptions(t *testing.T) { + _, err := OtelCollectorBatchSpanProcessor("localhost") + assert.Error(t, err) + connectParams := grpc.ConnectParams{ + Backoff: backoff.Config{BaseDelay: time.Second * 2, + Multiplier: 2.0, + MaxDelay: time.Second * 16}} + + // Fails because it needs credentials + _, err = OtelCollectorBatchSpanProcessor("localhost", + grpc.WithConnectParams(connectParams)) + assert.Error(t, err) +} + +func TestOtelExporterDirectly(t *testing.T) { + ctx := context.Background() + + consumer := AppendingLogConsumer{} + otelC, err := setupOtelCollectorContainer(ctx, &consumer) + assert.NoError(t, err) + + connectParams := grpc.ConnectParams{ + Backoff: backoff.Config{BaseDelay: time.Second * 2, + Multiplier: 2.0, + MaxDelay: time.Second * 16}} + otelExporter, err := otelExporter(otelC.URI, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithConnectParams(connectParams)) + + assert.NoError(t, err) + + target := "test-go-feature-flag-export" + + spans := getSpanStubs(target) + err = otelExporter.ExportSpans(ctx, spans.Snapshots()) + assert.NoError(t, err) + + time.Sleep(5 * time.Second) + assert.True(t, consumer.Exists(target)) +} + +func TestExportToOtelCollector(t *testing.T) { + containerWaitTime := time.Second * 5 + + if checkIfGithubActionCI() { + log.Println("Setting timeout for CI") + containerWaitTime = time.Second * 5 + } + + featureEvents := buildFeatureEvents() + + ctx := context.Background() + logger := log.New(os.Stdout, "", 0) + + consumer := AppendingLogConsumer{} + otelC, err := setupOtelCollectorContainer(ctx, &consumer) + assert.NoError(t, err) + + settings := Settings{OpentelSettings: OpenTelSettings{URI: otelC.URI}} + exp, err := NewExporter(settings) + + assert.NoError(t, err) + err = exp.Export(ctx, logger, featureEvents) + assert.NoError(t, err) + // Sleep to give the container time to process the spans + time.Sleep(containerWaitTime) + assert.GreaterOrEqual(t, consumer.Size(), 1) + assert.True(t, consumer.Exists(instrumentationName)) + + // Clean up the container after the test is complete + t.Cleanup(func() { + fmt.Println("Terminating container") + if err := otelC.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate container: %s", err) + } + }) +} + +type testSubStruct struct { + SubContent string + SubTimeStamp int64 + SubCondition bool + SubValue float32 + SubAnotherValue float64 + subNotExported bool +} + +type testStruct struct { + Substruct testSubStruct + Content string + Timestamp int64 + Condition bool + Value float32 + AnotherValue float64 + notExported bool +} + +func acceptExporter(expInterface exporter.Exporter) exporter.Exporter { + return expInterface +} + +func checkIfGithubActionCI() bool { + // https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables + _, ok1 := os.LookupEnv("CI") + _, ok2 := os.LookupEnv("GITHUB_RUN_ID") + return ok1 && ok2 +} + +func buildFeatureEvents() []exporter.FeatureEvent { + return []exporter.FeatureEvent{ + { + Kind: "feature", ContextKind: "anonymousUser", UserKey: "ABCD", CreationDate: 1617970547, Key: "random-key", + Variation: "Default", Value: "YO", Default: false, + }, + { + Kind: "feature", ContextKind: "anonymousUser", UserKey: "ABCDEF", CreationDate: 1617970547, Key: "random-key", + Variation: "Default", Value: "YO", Default: false, + }, + { + Kind: "feature", ContextKind: "anonymousUser", UserKey: "ABCDEF", CreationDate: 1617970547, Key: "random-key", + Variation: "Default", Value: testStruct{ + Timestamp: 192929922, Condition: true, Content: "hello", notExported: false, Value: 1.0, AnotherValue: 3.3, + Substruct: testSubStruct{SubCondition: false, SubContent: "world", SubValue: 3.0, SubAnotherValue: 44.4, subNotExported: true}, + }, Default: false, + }, + } +} + +func getSpanStubs(target string) tracetest.SpanStubs { + s := make(tracetest.SpanStubs, 0) + s = append(s, tracetest.SpanStub{Name: target, StartTime: time.Now()}) + return s +} + +func assertResource(t *testing.T, expected resource.Resource, actual resource.Resource) { + var found bool + for _, target := range expected.Attributes() { + for _, attr := range actual.Attributes() { + if target.Key == attr.Key && target.Value == attr.Value { + found = true + } + } + assert.True(t, found) + } +} + +func assertSpanReferentialIntegrity(t *testing.T, inMemoryExporter *PersistentInMemoryExporter) { + for _, span := range inMemoryExporter.GetSpans() { + assert.NotNil(t, span) + } + + for _, span := range inMemoryExporter.GetSpans() { + if span.Parent.HasTraceID() { + assert.Equal(t, span.Parent.TraceID(), span.SpanContext.TraceID()) + assert.NotEqual(t, span.Parent.SpanID(), span.SpanContext.SpanID()) + assert.Equal(t, span.ChildSpanCount, 0) + } else { + assert.Equal(t, span.ChildSpanCount, 3) + } + assert.NotNil(t, span.Resource) + + if span.Parent.HasTraceID() { + assert.NotNil(t, span.Attributes) + // Different spans have different attributes + assert.GreaterOrEqual(t, len(span.Attributes), 1) + } + } +} + +// NewPersistentInMemoryExporter returns a new PersistentInMemoryExporter. +func NewPersistentInMemoryExporter() *PersistentInMemoryExporter { + return new(PersistentInMemoryExporter) +} + +// PersistentInMemoryExporter is an exporter that stores all received spans in-memory. +type PersistentInMemoryExporter struct { + tracetest.InMemoryExporter +} + +func (imsb *PersistentInMemoryExporter) Shutdown(context.Context) error { + return nil +} + +// AppendingLogConsumer buffers log content into a slice +type AppendingLogConsumer struct { + logs []string + lock sync.Mutex +} + +func (lc *AppendingLogConsumer) Size() int { + lc.lock.Lock() + defer lc.lock.Unlock() + return len(lc.logs) +} + +// Accept prints the log to stdout +func (lc *AppendingLogConsumer) Accept(l testcontainers.Log) { + lc.lock.Lock() + defer lc.lock.Unlock() + lc.logs = append(lc.logs, string(l.Content)) +} + +// Exists checks if the target exists anywhere in the log output +func (lc *AppendingLogConsumer) Exists(target string) bool { + lc.lock.Lock() + defer lc.lock.Unlock() + for _, s := range lc.logs { + if strings.Contains(s, target) { + return true + } + } + return false +} + +func (lc *AppendingLogConsumer) Display() { + lc.lock.Lock() + defer lc.lock.Unlock() + for _, s := range lc.logs { + fmt.Println(s) + } +} + +// opentelCollectorContainer struct for the test container and URI +type opentelCollectorContainer struct { + testcontainers.Container + URI string +} + +// setupOtelCollectorContainer sets up an otel container with a log consumer +func setupOtelCollectorContainer(ctx context.Context, + consumer testcontainers.LogConsumer) (*opentelCollectorContainer, error) { + // TODO ForListeningPort won't accept the variable as string + grpcPort := "4317/tcp" + req := testcontainers.ContainerRequest{ + Image: "otel/opentelemetry-collector:0.98.0", + ExposedPorts: []string{grpcPort, "55679/tcp"}, + WaitingFor: wait.ForAll( + wait.ForLog("Everything is ready. Begin running and processing data"), + wait.ForListeningPort(nat.Port(grpcPort)), + ), + } + + logConsumerConfig := testcontainers.LogConsumerConfig{ + Opts: []testcontainers.LogProductionOption{testcontainers.WithLogProductionTimeout(10 * time.Second)}, + Consumers: []testcontainers.LogConsumer{consumer}, + } + + request := testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + } + request.LogConsumerCfg = &logConsumerConfig + container, err := testcontainers.GenericContainer(ctx, request) + if err != nil { + return nil, err + } + + ip, err := container.Host(ctx) + if err != nil { + return nil, err + } + + mappedPort, err := container.MappedPort(ctx, "4317") + if err != nil { + return nil, err + } + + uri := fmt.Sprintf("%s:%s", ip, mappedPort.Port()) + + return &opentelCollectorContainer{Container: container, URI: uri}, nil +} diff --git a/go.mod b/go.mod index d0583e324fd..ec6ca6f1bc3 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 github.com/aws/aws-sdk-go-v2/service/sqs v1.31.4 github.com/awslabs/aws-lambda-go-api-proxy v0.16.2 + github.com/docker/go-connections v0.5.0 github.com/fsouza/fake-gcs-server v1.48.0 github.com/gdexlab/go-render v1.0.1 github.com/golang/mock v1.6.0 @@ -50,8 +51,11 @@ require ( go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.51.0 go.opentelemetry.io/otel v1.26.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.26.0 go.opentelemetry.io/otel/sdk v1.26.0 + go.opentelemetry.io/otel/trace v1.26.0 go.uber.org/zap v1.27.0 golang.org/x/net v0.24.0 golang.org/x/oauth2 v0.19.0 @@ -105,7 +109,6 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/docker v25.0.5+incompatible // indirect - github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/eapache/go-resiliency v1.6.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect @@ -200,7 +203,6 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel/metric v1.26.0 // indirect - go.opentelemetry.io/otel/trace v1.26.0 // indirect go.opentelemetry.io/proto/otlp v1.2.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/crypto v0.22.0 // indirect diff --git a/go.sum b/go.sum index bb7ff0a50f3..d07ab0b7ba0 100644 --- a/go.sum +++ b/go.sum @@ -886,8 +886,12 @@ go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 h1:1u/AyyOqAWzy+SkPxDpahCNZParHV8Vid1RnI2clyDE= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0/go.mod h1:z46paqbJ9l7c9fIPCXTqTGwhQZ5XoTIsfeFYWboizjs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0 h1:Waw9Wfpo/IXzOI8bCB7DIk+0JZcqqsyn1JFnAc+iam8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0/go.mod h1:wnJIG4fOqyynOnnQF/eQb4/16VlX2EJAHhHgqIqWfAo= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 h1:1wp/gyxsuYtuE/JFxsQRtcCDtMrO2qMvlfXALU5wkzI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0/go.mod h1:gbTHmghkGgqxMomVQQMur1Nba4M0MQ8AYThXDUjsJ38= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.26.0 h1:0W5o9SzoR15ocYHEQfvfipzcNog1lBxOLfnex91Hk6s= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.26.0/go.mod h1:zVZ8nz+VSggWmnh6tTsJqXQ7rU4xLwRtna1M4x5jq58= go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30= go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4= go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8=