From 78e6f39433ad720dbfee3968ef11be6e2e4ef4cd Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Mon, 9 Dec 2024 14:06:55 +0100 Subject: [PATCH] Fix telemetry payload schema --- ddtrace/tracer/telemetry.go | 5 +- ddtrace/tracer/telemetry_test.go | 6 +- .../utils/telemetry/telemetry_distribution.go | 32 +++--- internal/telemetry/client.go | 106 +++++++++++++----- internal/telemetry/client_test.go | 12 +- internal/telemetry/telemetry.go | 2 +- .../telemetry/telemetrytest/telemetrytest.go | 30 ++++- 7 files changed, 133 insertions(+), 60 deletions(-) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index 9fa60de2dc..4c3814aab0 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -129,10 +129,9 @@ func startTelemetry(c *config) { } // Submit the initial metric tick - telemetry.GlobalClient.Record( + telemetry.GlobalClient.Gauge( telemetry.NamespaceTracers, - telemetry.MetricKindGauge, - orchestrionEnabledMetric, orchestrionEnabledValue, + orchestrionEnabledMetric, telemetry.GlobalClient.HeartbeatInterval(), orchestrionEnabledValue, orchestrionEnabledTags, false, // Go-specific ) diff --git a/ddtrace/tracer/telemetry_test.go b/ddtrace/tracer/telemetry_test.go index a4ad105ac4..f3769a248c 100644 --- a/ddtrace/tracer/telemetry_test.go +++ b/ddtrace/tracer/telemetry_test.go @@ -8,6 +8,7 @@ package tracer import ( "fmt" "testing" + "time" "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" @@ -160,10 +161,9 @@ func TestTelemetryEnabled(t *testing.T) { telemetry.Check(t, telemetryClient.Configuration, "orchestrion_enabled", true) telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k1", "v1") telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k2", "v2") - telemetryClient.AssertCalled(t, "Record", + telemetryClient.AssertCalled(t, "Gauge", telemetry.NamespaceTracers, - telemetry.MetricKindGauge, - "orchestrion.enabled", 1.0, + "orchestrion.enabled", time.Second, 1.0, []string{"k1:v1", "k2:v2"}, false, ) diff --git a/internal/civisibility/utils/telemetry/telemetry_distribution.go b/internal/civisibility/utils/telemetry/telemetry_distribution.go index 3b4d8d54fe..bd02707263 100644 --- a/internal/civisibility/utils/telemetry/telemetry_distribution.go +++ b/internal/civisibility/utils/telemetry/telemetry_distribution.go @@ -9,96 +9,96 @@ import "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" // EndpointPayloadBytes records the size in bytes of the serialized payload by CI Visibility. func EndpointPayloadBytes(endpointType EndpointType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.bytes", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.bytes", value, removeEmptyStrings([]string{ (string)(endpointType), }), true) } // EndpointPayloadRequestsMs records the time it takes to send the payload sent to the endpoint in ms by CI Visibility. func EndpointPayloadRequestsMs(endpointType EndpointType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.requests_ms", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.requests_ms", value, removeEmptyStrings([]string{ (string)(endpointType), }), true) } // EndpointPayloadEventsCount records the number of events in the payload sent to the endpoint by CI Visibility. func EndpointPayloadEventsCount(endpointType EndpointType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.events_count", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.events_count", value, removeEmptyStrings([]string{ (string)(endpointType), }), true) } // EndpointEventsSerializationMs records the time it takes to serialize the events in the payload sent to the endpoint in ms by CI Visibility. func EndpointEventsSerializationMs(endpointType EndpointType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.events_serialization_ms", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.events_serialization_ms", value, removeEmptyStrings([]string{ (string)(endpointType), }), true) } // GitCommandMs records the time it takes to execute a git command in ms by CI Visibility. func GitCommandMs(commandType CommandType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git.command_ms", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git.command_ms", value, removeEmptyStrings([]string{ (string)(commandType), }), true) } // GitRequestsSearchCommitsMs records the time it takes to get the response of the search commit quest in ms by CI Visibility. func GitRequestsSearchCommitsMs(responseCompressedType ResponseCompressedType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.search_commits_ms", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.search_commits_ms", value, removeEmptyStrings([]string{ (string)(responseCompressedType), }), true) } // GitRequestsObjectsPackMs records the time it takes to get the response of the objects pack request in ms by CI Visibility. func GitRequestsObjectsPackMs(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_ms", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_ms", value, nil, true) } // GitRequestsObjectsPackBytes records the sum of the sizes of the object pack files inside a single payload by CI Visibility func GitRequestsObjectsPackBytes(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_bytes", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_bytes", value, nil, true) } // GitRequestsObjectsPackFiles records the number of files sent in the object pack payload by CI Visibility. func GitRequestsObjectsPackFiles(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_files", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_files", value, nil, true) } // GitRequestsSettingsMs records the time it takes to get the response of the settings endpoint request in ms by CI Visibility. func GitRequestsSettingsMs(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.settings_ms", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.settings_ms", value, nil, true) } // ITRSkippableTestsRequestMs records the time it takes to get the response of the itr skippable tests endpoint request in ms by CI Visibility. func ITRSkippableTestsRequestMs(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "itr_skippable_tests.request_ms", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "itr_skippable_tests.request_ms", value, nil, true) } // ITRSkippableTestsResponseBytes records the number of bytes received by the endpoint. Tagged with a boolean flag set to true if response body is compressed. func ITRSkippableTestsResponseBytes(responseCompressedType ResponseCompressedType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "itr_skippable_tests.response_bytes", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "itr_skippable_tests.response_bytes", value, removeEmptyStrings([]string{ (string)(responseCompressedType), }), true) } // CodeCoverageFiles records the number of files in the code coverage report by CI Visibility. func CodeCoverageFiles(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "code_coverage.files", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "code_coverage.files", value, nil, true) } // EarlyFlakeDetectionRequestMs records the time it takes to get the response of the early flake detection endpoint request in ms by CI Visibility. func EarlyFlakeDetectionRequestMs(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.request_ms", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.request_ms", value, nil, true) } // EarlyFlakeDetectionResponseBytes records the number of bytes received by the endpoint. Tagged with a boolean flag set to true if response body is compressed. func EarlyFlakeDetectionResponseBytes(responseCompressedType ResponseCompressedType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.response_bytes", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.response_bytes", value, removeEmptyStrings([]string{ (string)(responseCompressedType), }), true) } // EarlyFlakeDetectionResponseTests records the number of tests in the response of the early flake detection endpoint by CI Visibility. func EarlyFlakeDetectionResponseTests(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.response_tests", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.response_tests", value, nil, true) } diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go index 9495620c06..b6adae24f2 100644 --- a/internal/telemetry/client.go +++ b/internal/telemetry/client.go @@ -33,10 +33,15 @@ type Client interface { RegisterAppConfig(name string, val interface{}, origin Origin) ProductChange(namespace Namespace, enabled bool, configuration []Configuration) ConfigChange(configuration []Configuration) - Record(namespace Namespace, metric MetricKind, name string, value float64, tags []string, common bool) + Count(namespace Namespace, name string, value float64, tags []string, common bool) + Distribution(namespace Namespace, name string, value float64, tags []string, common bool) + Gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool) + ApplyOps(opts ...Option) Stop() + + HeartbeatInterval() time.Duration } var ( @@ -273,6 +278,10 @@ func (c *client) start(configuration []Configuration, namespace Namespace, flush c.heartbeatT = time.AfterFunc(c.heartbeatInterval, c.backgroundHeartbeat) } +func (c *client) HeartbeatInterval() time.Duration { + return c.heartbeatInterval +} + func heartbeatInterval() time.Duration { heartbeat := internal.FloatEnv("DD_TELEMETRY_HEARTBEAT_INTERVAL", defaultHeartbeatInterval) if heartbeat <= 0 || heartbeat > 3600 { @@ -326,9 +335,10 @@ var ( ) type metric struct { - name string - kind MetricKind - value float64 + name string + kind MetricKind + value float64 + interval int // Unix timestamp ts float64 tags []string @@ -351,29 +361,54 @@ func metricKey(name string, tags []string, kind MetricKind) string { return name + string(kind) + strings.Join(tags, "-") } -// Record sets the value for a gauge or distribution metric type with the given +// Count adds the value to a count with the given name and tags. If the metric +// is not language-specific, common should be set to true +func (c *client) Count(namespace Namespace, name string, value float64, tags []string, common bool) { + c.mu.Lock() + defer c.mu.Unlock() + if !c.started { + return + } + c.count(namespace, name, value, tags, common) +} + +// count implements Count, must be called with c.mu locked. +func (c *client) count(namespace Namespace, name string, value float64, tags []string, common bool) { + if _, ok := c.metrics[namespace]; !ok { + c.metrics[namespace] = map[string]*metric{} + } + key := metricKey(name, tags, MetricKindCount) + m, ok := c.metrics[namespace][key] + if !ok { + m = newMetric(name, MetricKindCount, tags, common) + c.metrics[namespace][key] = m + } + m.value += value + m.ts = float64(time.Now().Unix()) + c.newMetrics = true +} + +// Distribution sets the value for a distribution metric type with the given // name and tags. If the metric is not language-specific, common should be set -// to true -func (c *client) Record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) { +// to true. +func (c *client) Distribution(namespace Namespace, name string, value float64, tags []string, common bool) { c.mu.Lock() defer c.mu.Unlock() if !c.started { return } - c.record(namespace, kind, name, value, tags, common) + c.distribution(namespace, name, value, tags, common) } -// record sets the value for a gauge or distribution metric type with the given -// name and tags. If the metric is not language-soecific, common should be set -// to true. Must be called with c.mu locked. -func (c *client) record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) { +// distribution implements Distribution, must be called with c.mu locked. +func (c *client) distribution(namespace Namespace, name string, value float64, tags []string, common bool) { if _, ok := c.metrics[namespace]; !ok { c.metrics[namespace] = map[string]*metric{} } - key := metricKey(name, tags, kind) + key := metricKey(name, tags, MetricKindDist) m, ok := c.metrics[namespace][key] if !ok { - m = newMetric(name, kind, tags, common) + m = newMetric(name, MetricKindDist, tags, common) c.metrics[namespace][key] = m } m.value = value @@ -381,24 +416,30 @@ func (c *client) record(namespace Namespace, kind MetricKind, name string, value c.newMetrics = true } -// Count adds the value to a count with the given name and tags. If the metric -// is not language-specific, common should be set to true -func (c *client) Count(namespace Namespace, name string, value float64, tags []string, common bool) { +// Gauge sets the value for a gauge metric type with the given name and tags. If +// the metric is not language-specific, common should be set to true. +func (c *client) Gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool) { c.mu.Lock() defer c.mu.Unlock() if !c.started { return } + c.gauge(namespace, name, interval, value, tags, common) +} + +// gauge implements Gauge, must be called with c.mu locked. +func (c *client) gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool) { if _, ok := c.metrics[namespace]; !ok { c.metrics[namespace] = map[string]*metric{} } - key := metricKey(name, tags, MetricKindCount) + key := metricKey(name, tags, MetricKindGauge) m, ok := c.metrics[namespace][key] if !ok { - m = newMetric(name, MetricKindCount, tags, common) + m = newMetric(name, MetricKindGauge, tags, common) + m.interval = int(interval.Seconds()) c.metrics[namespace][key] = m } - m.value += value + m.value = value m.ts = float64(time.Now().Unix()) c.newMetrics = true } @@ -430,20 +471,22 @@ func (c *client) flush(sync bool) { Namespace: namespace, } for _, m := range c.metrics[namespace] { - if m.kind == MetricKindDist { + switch m.kind { + case MetricKindDist: dPayload.Series = append(dPayload.Series, DistributionSeries{ Metric: m.name, Tags: m.tags, Common: m.common, Points: []float64{m.value}, }) - } else { + default: gPayload.Series = append(gPayload.Series, Series{ - Metric: m.name, - Type: string(m.kind), - Tags: m.tags, - Common: m.common, - Points: [][2]float64{{m.ts, m.value}}, + Metric: m.name, + Type: string(m.kind), + Interval: m.interval, + Tags: m.tags, + Common: m.common, + Points: [][2]float64{{m.ts, m.value}}, }) } } @@ -643,6 +686,13 @@ func (c *client) backgroundHeartbeat() { // Must be called with c.mu locked. func (c *client) emitHeartbeatMetrics() { for _, m := range c.heartbeatMetrics { - c.record(m.namespace, m.kind, m.name, m.value(), m.tags, m.common) + switch m.kind { + case MetricKindCount: + c.count(m.namespace, m.name, m.value(), m.tags, m.common) + case MetricKindDist: + c.distribution(m.namespace, m.name, m.value(), m.tags, m.common) + case MetricKindGauge: + c.gauge(m.namespace, m.name, c.HeartbeatInterval(), m.value(), m.tags, m.common) + } } } diff --git a/internal/telemetry/client_test.go b/internal/telemetry/client_test.go index a75e2ec3a7..19a6cec268 100644 --- a/internal/telemetry/client_test.go +++ b/internal/telemetry/client_test.go @@ -169,8 +169,8 @@ func TestMetrics(t *testing.T) { client.start(nil, NamespaceTracers, true) // Records should have the most recent value - client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false) - client.Record(NamespaceTracers, MetricKindGauge, "foobar", 2, nil, false) + client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false) + client.Gauge(NamespaceTracers, "foobar", 1, 2, nil, false) // Counts should be aggregated client.Count(NamespaceTracers, "baz", 3, nil, true) client.Count(NamespaceTracers, "baz", 1, nil, true) @@ -244,8 +244,8 @@ func TestDistributionMetrics(t *testing.T) { } client.start(nil, NamespaceTracers, true) // Records should have the most recent value - client.Record(NamespaceTracers, MetricKindDist, "soobar", 1, nil, false) - client.Record(NamespaceTracers, MetricKindDist, "soobar", 3, nil, false) + client.distribution(NamespaceTracers, "soobar", 1, nil, false) + client.distribution(NamespaceTracers, "soobar", 3, nil, false) client.mu.Lock() client.flush(false) client.mu.Unlock() @@ -275,7 +275,7 @@ func TestDisabledClient(t *testing.T) { URL: server.URL, } client.start(nil, NamespaceTracers, true) - client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false) + client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false) client.Count(NamespaceTracers, "bonk", 4, []string{"org:1"}, false) client.Stop() } @@ -290,7 +290,7 @@ func TestNonStartedClient(t *testing.T) { client := &client{ URL: server.URL, } - client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false) + client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false) client.Count(NamespaceTracers, "bonk", 4, []string{"org:1"}, false) client.Stop() } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 994dee89a8..cdb4f408df 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -121,6 +121,6 @@ func Time(namespace Namespace, name string, tags []string, common bool) (finish start := time.Now() return func() { elapsed := time.Since(start) - GlobalClient.Record(namespace, MetricKindDist, name, float64(elapsed.Milliseconds()), tags, common) + GlobalClient.Distribution(namespace, name, float64(elapsed.Milliseconds()), tags, common) } } diff --git a/internal/telemetry/telemetrytest/telemetrytest.go b/internal/telemetry/telemetrytest/telemetrytest.go index 8861c966d6..331f619497 100644 --- a/internal/telemetry/telemetrytest/telemetrytest.go +++ b/internal/telemetry/telemetrytest/telemetrytest.go @@ -9,6 +9,7 @@ package telemetrytest import ( "slices" "sync" + "time" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" @@ -44,6 +45,12 @@ func (c *MockClient) ProductChange(namespace telemetry.Namespace, enabled bool, c.productChange(namespace, enabled) } +func (c *MockClient) HeartbeatInterval() time.Duration { + c.On("HeartbeatInterval").Return(time.Second) + _ = c.Called() + return time.Second +} + // ProductStop signals a product has stopped and disables that product in the mock client. // ProductStop is NOOP for the tracer namespace, since the tracer is not considered a product. func (c *MockClient) ProductStop(namespace telemetry.Namespace) { @@ -69,13 +76,30 @@ func (c *MockClient) productChange(namespace telemetry.Namespace, enabled bool) } } +// Distribution stores the value for the given metric. +func (c *MockClient) Distribution(ns telemetry.Namespace, name string, val float64, tags []string, common bool) { + // Ensure consistent ordering through expectations + slices.Sort(tags) + + c.On("Distribution", ns, name, val, tags, common).Return() + _ = c.Called(ns, name, val, tags, common) + // record the val for tests that assert based on the value + if _, ok := c.Metrics[ns]; !ok { + if c.Metrics == nil { + c.Metrics = make(map[telemetry.Namespace]map[string]float64) + } + c.Metrics[ns] = map[string]float64{} + } + c.Metrics[ns][name] = val +} + // Record stores the value for the given metric. It is currently mocked for `Gauge` and `Distribution` metric types. -func (c *MockClient) Record(ns telemetry.Namespace, kind telemetry.MetricKind, name string, val float64, tags []string, common bool) { +func (c *MockClient) Gauge(ns telemetry.Namespace, name string, interval time.Duration, val float64, tags []string, common bool) { // Ensure consistent ordering through expectations slices.Sort(tags) - c.On("Record", ns, kind, name, val, tags, common).Return() - _ = c.Called(ns, kind, name, val, tags, common) + c.On("Gauge", ns, name, interval, val, tags, common).Return() + _ = c.Called(ns, name, interval, val, tags, common) // record the val for tests that assert based on the value if _, ok := c.Metrics[ns]; !ok { if c.Metrics == nil {