Skip to content

Commit

Permalink
Fix telemetry payload schema
Browse files Browse the repository at this point in the history
  • Loading branch information
RomainMuller committed Dec 9, 2024
1 parent cb224d5 commit 78e6f39
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 60 deletions.
5 changes: 2 additions & 3 deletions ddtrace/tracer/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,9 @@ func startTelemetry(c *config) {
}

// Submit the initial metric tick
telemetry.GlobalClient.Record(
telemetry.GlobalClient.Gauge(
telemetry.NamespaceTracers,
telemetry.MetricKindGauge,
orchestrionEnabledMetric, orchestrionEnabledValue,
orchestrionEnabledMetric, telemetry.GlobalClient.HeartbeatInterval(), orchestrionEnabledValue,
orchestrionEnabledTags,
false, // Go-specific
)
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package tracer
import (
"fmt"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
Expand Down Expand Up @@ -160,10 +161,9 @@ func TestTelemetryEnabled(t *testing.T) {
telemetry.Check(t, telemetryClient.Configuration, "orchestrion_enabled", true)
telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k1", "v1")
telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k2", "v2")
telemetryClient.AssertCalled(t, "Record",
telemetryClient.AssertCalled(t, "Gauge",
telemetry.NamespaceTracers,
telemetry.MetricKindGauge,
"orchestrion.enabled", 1.0,
"orchestrion.enabled", time.Second, 1.0,
[]string{"k1:v1", "k2:v2"},
false,
)
Expand Down
32 changes: 16 additions & 16 deletions internal/civisibility/utils/telemetry/telemetry_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,96 +9,96 @@ import "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"

// EndpointPayloadBytes records the size in bytes of the serialized payload by CI Visibility.
func EndpointPayloadBytes(endpointType EndpointType, value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.bytes", value, removeEmptyStrings([]string{
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.bytes", value, removeEmptyStrings([]string{
(string)(endpointType),
}), true)
}

// EndpointPayloadRequestsMs records the time it takes to send the payload sent to the endpoint in ms by CI Visibility.
func EndpointPayloadRequestsMs(endpointType EndpointType, value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.requests_ms", value, removeEmptyStrings([]string{
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.requests_ms", value, removeEmptyStrings([]string{
(string)(endpointType),
}), true)
}

// EndpointPayloadEventsCount records the number of events in the payload sent to the endpoint by CI Visibility.
func EndpointPayloadEventsCount(endpointType EndpointType, value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.events_count", value, removeEmptyStrings([]string{
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.events_count", value, removeEmptyStrings([]string{
(string)(endpointType),
}), true)
}

// EndpointEventsSerializationMs records the time it takes to serialize the events in the payload sent to the endpoint in ms by CI Visibility.
func EndpointEventsSerializationMs(endpointType EndpointType, value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.events_serialization_ms", value, removeEmptyStrings([]string{
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.events_serialization_ms", value, removeEmptyStrings([]string{
(string)(endpointType),
}), true)
}

// GitCommandMs records the time it takes to execute a git command in ms by CI Visibility.
func GitCommandMs(commandType CommandType, value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git.command_ms", value, removeEmptyStrings([]string{
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git.command_ms", value, removeEmptyStrings([]string{
(string)(commandType),
}), true)
}

// GitRequestsSearchCommitsMs records the time it takes to get the response of the search commit quest in ms by CI Visibility.
func GitRequestsSearchCommitsMs(responseCompressedType ResponseCompressedType, value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.search_commits_ms", value, removeEmptyStrings([]string{
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.search_commits_ms", value, removeEmptyStrings([]string{
(string)(responseCompressedType),
}), true)
}

// GitRequestsObjectsPackMs records the time it takes to get the response of the objects pack request in ms by CI Visibility.
func GitRequestsObjectsPackMs(value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_ms", value, nil, true)
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_ms", value, nil, true)
}

// GitRequestsObjectsPackBytes records the sum of the sizes of the object pack files inside a single payload by CI Visibility
func GitRequestsObjectsPackBytes(value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_bytes", value, nil, true)
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_bytes", value, nil, true)
}

// GitRequestsObjectsPackFiles records the number of files sent in the object pack payload by CI Visibility.
func GitRequestsObjectsPackFiles(value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_files", value, nil, true)
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_files", value, nil, true)
}

// GitRequestsSettingsMs records the time it takes to get the response of the settings endpoint request in ms by CI Visibility.
func GitRequestsSettingsMs(value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.settings_ms", value, nil, true)
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.settings_ms", value, nil, true)
}

// ITRSkippableTestsRequestMs records the time it takes to get the response of the itr skippable tests endpoint request in ms by CI Visibility.
func ITRSkippableTestsRequestMs(value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "itr_skippable_tests.request_ms", value, nil, true)
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "itr_skippable_tests.request_ms", value, nil, true)
}

// ITRSkippableTestsResponseBytes records the number of bytes received by the endpoint. Tagged with a boolean flag set to true if response body is compressed.
func ITRSkippableTestsResponseBytes(responseCompressedType ResponseCompressedType, value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "itr_skippable_tests.response_bytes", value, removeEmptyStrings([]string{
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "itr_skippable_tests.response_bytes", value, removeEmptyStrings([]string{
(string)(responseCompressedType),
}), true)
}

// CodeCoverageFiles records the number of files in the code coverage report by CI Visibility.
func CodeCoverageFiles(value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "code_coverage.files", value, nil, true)
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "code_coverage.files", value, nil, true)
}

// EarlyFlakeDetectionRequestMs records the time it takes to get the response of the early flake detection endpoint request in ms by CI Visibility.
func EarlyFlakeDetectionRequestMs(value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.request_ms", value, nil, true)
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.request_ms", value, nil, true)
}

// EarlyFlakeDetectionResponseBytes records the number of bytes received by the endpoint. Tagged with a boolean flag set to true if response body is compressed.
func EarlyFlakeDetectionResponseBytes(responseCompressedType ResponseCompressedType, value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.response_bytes", value, removeEmptyStrings([]string{
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.response_bytes", value, removeEmptyStrings([]string{
(string)(responseCompressedType),
}), true)
}

// EarlyFlakeDetectionResponseTests records the number of tests in the response of the early flake detection endpoint by CI Visibility.
func EarlyFlakeDetectionResponseTests(value float64) {
telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.response_tests", value, nil, true)
telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.response_tests", value, nil, true)
}
106 changes: 78 additions & 28 deletions internal/telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ type Client interface {
RegisterAppConfig(name string, val interface{}, origin Origin)
ProductChange(namespace Namespace, enabled bool, configuration []Configuration)
ConfigChange(configuration []Configuration)
Record(namespace Namespace, metric MetricKind, name string, value float64, tags []string, common bool)

Count(namespace Namespace, name string, value float64, tags []string, common bool)
Distribution(namespace Namespace, name string, value float64, tags []string, common bool)
Gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool)

ApplyOps(opts ...Option)
Stop()

HeartbeatInterval() time.Duration
}

var (
Expand Down Expand Up @@ -273,6 +278,10 @@ func (c *client) start(configuration []Configuration, namespace Namespace, flush
c.heartbeatT = time.AfterFunc(c.heartbeatInterval, c.backgroundHeartbeat)
}

func (c *client) HeartbeatInterval() time.Duration {
return c.heartbeatInterval
}

func heartbeatInterval() time.Duration {
heartbeat := internal.FloatEnv("DD_TELEMETRY_HEARTBEAT_INTERVAL", defaultHeartbeatInterval)
if heartbeat <= 0 || heartbeat > 3600 {
Expand Down Expand Up @@ -326,9 +335,10 @@ var (
)

type metric struct {
name string
kind MetricKind
value float64
name string
kind MetricKind
value float64
interval int
// Unix timestamp
ts float64
tags []string
Expand All @@ -351,54 +361,85 @@ func metricKey(name string, tags []string, kind MetricKind) string {
return name + string(kind) + strings.Join(tags, "-")
}

// Record sets the value for a gauge or distribution metric type with the given
// Count adds the value to a count with the given name and tags. If the metric
// is not language-specific, common should be set to true
func (c *client) Count(namespace Namespace, name string, value float64, tags []string, common bool) {
c.mu.Lock()
defer c.mu.Unlock()
if !c.started {
return
}
c.count(namespace, name, value, tags, common)
}

// count implements Count, must be called with c.mu locked.
func (c *client) count(namespace Namespace, name string, value float64, tags []string, common bool) {
if _, ok := c.metrics[namespace]; !ok {
c.metrics[namespace] = map[string]*metric{}
}
key := metricKey(name, tags, MetricKindCount)
m, ok := c.metrics[namespace][key]
if !ok {
m = newMetric(name, MetricKindCount, tags, common)
c.metrics[namespace][key] = m
}
m.value += value
m.ts = float64(time.Now().Unix())
c.newMetrics = true
}

// Distribution sets the value for a distribution metric type with the given
// name and tags. If the metric is not language-specific, common should be set
// to true
func (c *client) Record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) {
// to true.
func (c *client) Distribution(namespace Namespace, name string, value float64, tags []string, common bool) {
c.mu.Lock()
defer c.mu.Unlock()
if !c.started {
return
}
c.record(namespace, kind, name, value, tags, common)
c.distribution(namespace, name, value, tags, common)
}

// record sets the value for a gauge or distribution metric type with the given
// name and tags. If the metric is not language-soecific, common should be set
// to true. Must be called with c.mu locked.
func (c *client) record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) {
// distribution implements Distribution, must be called with c.mu locked.
func (c *client) distribution(namespace Namespace, name string, value float64, tags []string, common bool) {
if _, ok := c.metrics[namespace]; !ok {
c.metrics[namespace] = map[string]*metric{}
}
key := metricKey(name, tags, kind)
key := metricKey(name, tags, MetricKindDist)
m, ok := c.metrics[namespace][key]
if !ok {
m = newMetric(name, kind, tags, common)
m = newMetric(name, MetricKindDist, tags, common)
c.metrics[namespace][key] = m
}
m.value = value
m.ts = float64(time.Now().Unix())
c.newMetrics = true
}

// Count adds the value to a count with the given name and tags. If the metric
// is not language-specific, common should be set to true
func (c *client) Count(namespace Namespace, name string, value float64, tags []string, common bool) {
// Gauge sets the value for a gauge metric type with the given name and tags. If
// the metric is not language-specific, common should be set to true.
func (c *client) Gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool) {
c.mu.Lock()
defer c.mu.Unlock()
if !c.started {
return
}
c.gauge(namespace, name, interval, value, tags, common)
}

// gauge implements Gauge, must be called with c.mu locked.
func (c *client) gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool) {
if _, ok := c.metrics[namespace]; !ok {
c.metrics[namespace] = map[string]*metric{}
}
key := metricKey(name, tags, MetricKindCount)
key := metricKey(name, tags, MetricKindGauge)
m, ok := c.metrics[namespace][key]
if !ok {
m = newMetric(name, MetricKindCount, tags, common)
m = newMetric(name, MetricKindGauge, tags, common)
m.interval = int(interval.Seconds())
c.metrics[namespace][key] = m
}
m.value += value
m.value = value
m.ts = float64(time.Now().Unix())
c.newMetrics = true
}
Expand Down Expand Up @@ -430,20 +471,22 @@ func (c *client) flush(sync bool) {
Namespace: namespace,
}
for _, m := range c.metrics[namespace] {
if m.kind == MetricKindDist {
switch m.kind {
case MetricKindDist:
dPayload.Series = append(dPayload.Series, DistributionSeries{
Metric: m.name,
Tags: m.tags,
Common: m.common,
Points: []float64{m.value},
})
} else {
default:
gPayload.Series = append(gPayload.Series, Series{
Metric: m.name,
Type: string(m.kind),
Tags: m.tags,
Common: m.common,
Points: [][2]float64{{m.ts, m.value}},
Metric: m.name,
Type: string(m.kind),
Interval: m.interval,
Tags: m.tags,
Common: m.common,
Points: [][2]float64{{m.ts, m.value}},
})
}
}
Expand Down Expand Up @@ -643,6 +686,13 @@ func (c *client) backgroundHeartbeat() {
// Must be called with c.mu locked.
func (c *client) emitHeartbeatMetrics() {
for _, m := range c.heartbeatMetrics {
c.record(m.namespace, m.kind, m.name, m.value(), m.tags, m.common)
switch m.kind {
case MetricKindCount:
c.count(m.namespace, m.name, m.value(), m.tags, m.common)
case MetricKindDist:
c.distribution(m.namespace, m.name, m.value(), m.tags, m.common)
case MetricKindGauge:
c.gauge(m.namespace, m.name, c.HeartbeatInterval(), m.value(), m.tags, m.common)
}
}
}
12 changes: 6 additions & 6 deletions internal/telemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func TestMetrics(t *testing.T) {
client.start(nil, NamespaceTracers, true)

// Records should have the most recent value
client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false)
client.Record(NamespaceTracers, MetricKindGauge, "foobar", 2, nil, false)
client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false)
client.Gauge(NamespaceTracers, "foobar", 1, 2, nil, false)
// Counts should be aggregated
client.Count(NamespaceTracers, "baz", 3, nil, true)
client.Count(NamespaceTracers, "baz", 1, nil, true)
Expand Down Expand Up @@ -244,8 +244,8 @@ func TestDistributionMetrics(t *testing.T) {
}
client.start(nil, NamespaceTracers, true)
// Records should have the most recent value
client.Record(NamespaceTracers, MetricKindDist, "soobar", 1, nil, false)
client.Record(NamespaceTracers, MetricKindDist, "soobar", 3, nil, false)
client.distribution(NamespaceTracers, "soobar", 1, nil, false)
client.distribution(NamespaceTracers, "soobar", 3, nil, false)
client.mu.Lock()
client.flush(false)
client.mu.Unlock()
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestDisabledClient(t *testing.T) {
URL: server.URL,
}
client.start(nil, NamespaceTracers, true)
client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false)
client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false)
client.Count(NamespaceTracers, "bonk", 4, []string{"org:1"}, false)
client.Stop()
}
Expand All @@ -290,7 +290,7 @@ func TestNonStartedClient(t *testing.T) {
client := &client{
URL: server.URL,
}
client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false)
client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false)
client.Count(NamespaceTracers, "bonk", 4, []string{"org:1"}, false)
client.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ func Time(namespace Namespace, name string, tags []string, common bool) (finish
start := time.Now()
return func() {
elapsed := time.Since(start)
GlobalClient.Record(namespace, MetricKindDist, name, float64(elapsed.Milliseconds()), tags, common)
GlobalClient.Distribution(namespace, name, float64(elapsed.Milliseconds()), tags, common)
}
}
Loading

0 comments on commit 78e6f39

Please sign in to comment.