Skip to content

Commit 8d37bda

Browse files
committed
Allow primitive OTEL delta ingestion to be enabled
1 parent d52663f commit 8d37bda

File tree

4 files changed

+231
-1
lines changed

4 files changed

+231
-1
lines changed

pkg/distributor/otel.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type OTLPHandlerLimits interface {
5959
PromoteOTelResourceAttributes(id string) []string
6060
OTelKeepIdentifyingResourceAttributes(id string) bool
6161
OTelConvertHistogramsToNHCB(id string) bool
62+
OTelNativeDeltaIngestion(id string) bool
6263
}
6364

6465
// OTLPHandler is an http.Handler accepting OTLP write requests.
@@ -277,11 +278,12 @@ func newOTLPParser(
277278
promoteResourceAttributes := resourceAttributePromotionConfig.PromoteOTelResourceAttributes(tenantID)
278279
keepIdentifyingResourceAttributes := limits.OTelKeepIdentifyingResourceAttributes(tenantID)
279280
convertHistogramsToNHCB := limits.OTelConvertHistogramsToNHCB(tenantID)
281+
allowDeltaTemporality := limits.OTelNativeDeltaIngestion(tenantID)
280282

281283
pushMetrics.IncOTLPRequest(tenantID)
282284
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))
283285

284-
metrics, metricsDropped, err := otelMetricsToTimeseries(ctx, otlpConverter, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero, promoteResourceAttributes, keepIdentifyingResourceAttributes, convertHistogramsToNHCB, otlpReq.Metrics(), spanLogger)
286+
metrics, metricsDropped, err := otelMetricsToTimeseries(ctx, otlpConverter, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero, promoteResourceAttributes, keepIdentifyingResourceAttributes, convertHistogramsToNHCB, allowDeltaTemporality, otlpReq.Metrics(), spanLogger)
285287
if metricsDropped > 0 {
286288
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(metricsDropped)) // "group" label is empty here as metrics couldn't be parsed
287289
}
@@ -515,6 +517,7 @@ func otelMetricsToTimeseries(
515517
promoteResourceAttributes []string,
516518
keepIdentifyingResourceAttributes bool,
517519
convertHistogramsToNHCB bool,
520+
allowDeltaTemporality bool,
518521
md pmetric.Metrics,
519522
logger log.Logger,
520523
) ([]mimirpb.PreallocTimeseries, int, error) {
@@ -525,6 +528,7 @@ func otelMetricsToTimeseries(
525528
PromoteResourceAttributes: otlp.NewPromoteResourceAttributes(config.OTLPConfig{PromoteResourceAttributes: promoteResourceAttributes}),
526529
KeepIdentifyingResourceAttributes: keepIdentifyingResourceAttributes,
527530
ConvertHistogramsToNHCB: convertHistogramsToNHCB,
531+
AllowDeltaTemporality: allowDeltaTemporality,
528532
}
529533
mimirTS := converter.ToTimeseries(ctx, md, settings, logger)
530534

pkg/distributor/otel_test.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ func TestOTelMetricsToTimeSeries(t *testing.T) {
287287
tc.promoteResourceAttributes,
288288
tc.keepIdentifyingResourceAttributes,
289289
false,
290+
false,
290291
md,
291292
log.NewNopLogger(),
292293
)
@@ -362,6 +363,7 @@ func TestConvertOTelHistograms(t *testing.T) {
362363
[]string{},
363364
false,
364365
convertHistogramsToNHCB,
366+
false,
365367
md,
366368
log.NewNopLogger(),
367369
)
@@ -394,6 +396,222 @@ func TestConvertOTelHistograms(t *testing.T) {
394396
}
395397
}
396398

399+
func TestOTelDeltaIngestion(t *testing.T) {
400+
ts := time.Unix(100, 0)
401+
402+
testCases := []struct {
403+
name string
404+
allowDelta bool
405+
input pmetric.Metrics
406+
expected prompb.TimeSeries
407+
expectedErr string
408+
}{
409+
{
410+
name: "delta counter not allowed",
411+
allowDelta: false,
412+
input: func() pmetric.Metrics {
413+
md := pmetric.NewMetrics()
414+
rm := md.ResourceMetrics().AppendEmpty()
415+
il := rm.ScopeMetrics().AppendEmpty()
416+
m := il.Metrics().AppendEmpty()
417+
m.SetName("test_metric")
418+
sum := m.SetEmptySum()
419+
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
420+
dp := sum.DataPoints().AppendEmpty()
421+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
422+
dp.Attributes().PutStr("metric-attr", "metric value")
423+
return md
424+
}(),
425+
expectedErr: `otlp parse error: invalid temporality and type combination for metric "test_metric"`,
426+
},
427+
{
428+
name: "delta counter allowed",
429+
allowDelta: true,
430+
input: func() pmetric.Metrics {
431+
md := pmetric.NewMetrics()
432+
rm := md.ResourceMetrics().AppendEmpty()
433+
il := rm.ScopeMetrics().AppendEmpty()
434+
m := il.Metrics().AppendEmpty()
435+
m.SetName("test_metric")
436+
sum := m.SetEmptySum()
437+
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
438+
dp := sum.DataPoints().AppendEmpty()
439+
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
440+
dp.SetIntValue(5)
441+
dp.Attributes().PutStr("metric-attr", "metric value")
442+
return md
443+
}(),
444+
expected: prompb.TimeSeries{
445+
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}, {Name: "metric-attr", Value: "metric value"}},
446+
Samples: []prompb.Sample{{Timestamp: ts.UnixMilli(), Value: 5}},
447+
},
448+
},
449+
{
450+
name: "delta exponential histogram not allowed",
451+
allowDelta: false,
452+
input: func() pmetric.Metrics {
453+
md := pmetric.NewMetrics()
454+
rm := md.ResourceMetrics().AppendEmpty()
455+
il := rm.ScopeMetrics().AppendEmpty()
456+
m := il.Metrics().AppendEmpty()
457+
m.SetName("test_metric")
458+
sum := m.SetEmptyExponentialHistogram()
459+
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
460+
dp := sum.DataPoints().AppendEmpty()
461+
dp.SetCount(1)
462+
dp.SetSum(5)
463+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
464+
dp.Attributes().PutStr("metric-attr", "metric value")
465+
return md
466+
}(),
467+
expectedErr: `otlp parse error: invalid temporality and type combination for metric "test_metric"`,
468+
},
469+
{
470+
name: "delta exponential histogram allowed",
471+
allowDelta: true,
472+
input: func() pmetric.Metrics {
473+
md := pmetric.NewMetrics()
474+
rm := md.ResourceMetrics().AppendEmpty()
475+
il := rm.ScopeMetrics().AppendEmpty()
476+
m := il.Metrics().AppendEmpty()
477+
m.SetName("test_metric")
478+
sum := m.SetEmptyExponentialHistogram()
479+
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
480+
dp := sum.DataPoints().AppendEmpty()
481+
dp.SetCount(1)
482+
dp.SetSum(5)
483+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
484+
dp.Attributes().PutStr("metric-attr", "metric value")
485+
return md
486+
}(),
487+
expected: prompb.TimeSeries{
488+
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}, {Name: "metric-attr", Value: "metric value"}},
489+
Histograms: []prompb.Histogram{
490+
{
491+
Count: &prompb.Histogram_CountInt{CountInt: 1},
492+
Sum: 5,
493+
Schema: 0,
494+
ZeroThreshold: 1e-128,
495+
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0},
496+
Timestamp: ts.UnixMilli(),
497+
ResetHint: prompb.Histogram_UNKNOWN,
498+
},
499+
},
500+
},
501+
},
502+
{
503+
name: "delta histogram as nhcb not allowed",
504+
allowDelta: false,
505+
input: func() pmetric.Metrics {
506+
md := pmetric.NewMetrics()
507+
rm := md.ResourceMetrics().AppendEmpty()
508+
il := rm.ScopeMetrics().AppendEmpty()
509+
m := il.Metrics().AppendEmpty()
510+
m.SetName("test_metric")
511+
sum := m.SetEmptyHistogram()
512+
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
513+
dp := sum.DataPoints().AppendEmpty()
514+
dp.SetCount(20)
515+
dp.SetSum(30)
516+
dp.BucketCounts().FromRaw([]uint64{10, 10, 0})
517+
dp.ExplicitBounds().FromRaw([]float64{1, 2})
518+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
519+
dp.Attributes().PutStr("metric-attr", "metric value")
520+
return md
521+
}(),
522+
expectedErr: `otlp parse error: invalid temporality and type combination for metric "test_metric"`,
523+
},
524+
{
525+
name: "delta histogram as nhcb allowed",
526+
allowDelta: true,
527+
input: func() pmetric.Metrics {
528+
md := pmetric.NewMetrics()
529+
rm := md.ResourceMetrics().AppendEmpty()
530+
il := rm.ScopeMetrics().AppendEmpty()
531+
m := il.Metrics().AppendEmpty()
532+
m.SetName("test_metric")
533+
sum := m.SetEmptyHistogram()
534+
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
535+
dp := sum.DataPoints().AppendEmpty()
536+
dp.SetCount(20)
537+
dp.SetSum(30)
538+
dp.BucketCounts().FromRaw([]uint64{10, 10, 0})
539+
dp.ExplicitBounds().FromRaw([]float64{1, 2})
540+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
541+
dp.Attributes().PutStr("metric-attr", "metric value")
542+
return md
543+
}(),
544+
expected: prompb.TimeSeries{
545+
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}, {Name: "metric-attr", Value: "metric value"}},
546+
Histograms: []prompb.Histogram{
547+
{
548+
Count: &prompb.Histogram_CountInt{CountInt: 20},
549+
Sum: 30,
550+
Schema: -53,
551+
ZeroThreshold: 0,
552+
ZeroCount: nil,
553+
PositiveSpans: []prompb.BucketSpan{
554+
{
555+
Length: 3,
556+
},
557+
},
558+
PositiveDeltas: []int64{10, 0, -10},
559+
CustomValues: []float64{1, 2},
560+
Timestamp: ts.UnixMilli(),
561+
ResetHint: prompb.Histogram_UNKNOWN,
562+
},
563+
},
564+
},
565+
},
566+
}
567+
568+
for _, tc := range testCases {
569+
t.Run(tc.name, func(t *testing.T) {
570+
converter := newOTLPMimirConverter()
571+
mimirTS, dropped, err := otelMetricsToTimeseries(
572+
context.Background(),
573+
converter,
574+
true,
575+
false,
576+
false,
577+
[]string{},
578+
false,
579+
true,
580+
tc.allowDelta,
581+
tc.input,
582+
log.NewNopLogger(),
583+
)
584+
if tc.expectedErr != "" {
585+
require.EqualError(t, err, tc.expectedErr)
586+
require.Len(t, mimirTS, 0)
587+
require.Equal(t, 1, dropped)
588+
589+
} else {
590+
require.NoError(t, err)
591+
require.Len(t, mimirTS, 1)
592+
require.Equal(t, 0, dropped)
593+
}
594+
595+
/*var ts mimirpb.PreallocTimeseries
596+
for i := range mimirTS {
597+
for _, lbl := range mimirTS[i].Labels {
598+
if lbl.Name != labels.MetricName {
599+
continue
600+
}
601+
602+
if lbl.Value == "target_info" {
603+
continue
604+
} else {
605+
ts = mimirTS[i]
606+
break
607+
}
608+
}
609+
}*/
610+
611+
})
612+
}
613+
}
614+
397615
func BenchmarkOTLPHandler(b *testing.B) {
398616
var samples []prompb.Sample
399617
for i := 0; i < 1000; i++ {

pkg/distributor/push_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1541,6 +1541,8 @@ func (o otlpLimitsMock) OTelKeepIdentifyingResourceAttributes(string) bool {
15411541

15421542
func (o otlpLimitsMock) OTelConvertHistogramsToNHCB(string) bool { return false }
15431543

1544+
func (o otlpLimitsMock) OTelNativeDeltaIngestion(string) bool { return false }
1545+
15441546
func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
15451547
pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
15461548
for _, span := range h.PositiveSpans {

pkg/util/validation/limits.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ type Limits struct {
274274
PromoteOTelResourceAttributes flagext.StringSliceCSV `yaml:"promote_otel_resource_attributes" json:"promote_otel_resource_attributes" category:"experimental"`
275275
OTelKeepIdentifyingResourceAttributes bool `yaml:"otel_keep_identifying_resource_attributes" json:"otel_keep_identifying_resource_attributes" category:"experimental"`
276276
OTelConvertHistogramsToNHCB bool `yaml:"otel_convert_histograms_to_nhcb" json:"otel_convert_histograms_to_nhcb" category:"experimental"`
277+
OTelNativeDeltaIngestion bool `yaml:"otel_native_delta_ingestion" json:"otel_native_delta_ingestion" category:"experimental"`
277278

278279
// Ingest storage.
279280
IngestStorageReadConsistency string `yaml:"ingest_storage_read_consistency" json:"ingest_storage_read_consistency" category:"experimental"`
@@ -314,6 +315,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
314315
f.Var(&l.PromoteOTelResourceAttributes, "distributor.otel-promote-resource-attributes", "Optionally specify OTel resource attributes to promote to labels.")
315316
f.BoolVar(&l.OTelKeepIdentifyingResourceAttributes, "distributor.otel-keep-identifying-resource-attributes", false, "Whether to keep identifying OTel resource attributes in the target_info metric on top of converting to job and instance labels.")
316317
f.BoolVar(&l.OTelConvertHistogramsToNHCB, "distributor.otel-convert-histograms-to-nhcb", false, "Whether to convert OTel explicit histograms into native histograms with custom buckets.")
318+
f.BoolVar(&l.OTelNativeDeltaIngestion, "distributor.otel-native-delta-ingestion", false, "Whether to enable native ingestion of delta OTLP metrics, which will store the raw delta sample values without conversion. If disabled, delta metrics will be rejected. Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.")
317319

318320
f.Var(&l.IngestionArtificialDelay, "distributor.ingestion-artificial-delay", "Target ingestion delay to apply to all tenants. If set to a non-zero value, the distributor will artificially delay ingestion time-frame by the specified duration by computing the difference between actual ingestion and the target. There is no delay on actual ingestion of samples, it is only the response back to the client.")
319321
f.IntVar(&l.IngestionArtificialDelayConditionForTenantsWithLessThanMaxSeries, "distributor.ingestion-artificial-delay-condition-for-tenants-with-less-than-max-series", 0, "Condition to select tenants for which -distributor.ingestion-artificial-delay-duration-for-tenants-with-less-than-max-series should be applied.")
@@ -1257,6 +1259,10 @@ func (o *Overrides) OTelConvertHistogramsToNHCB(tenantID string) bool {
12571259
return o.getOverridesForUser(tenantID).OTelConvertHistogramsToNHCB
12581260
}
12591261

1262+
func (o *Overrides) OTelNativeDeltaIngestion(tenantID string) bool {
1263+
return o.getOverridesForUser(tenantID).OTelNativeDeltaIngestion
1264+
}
1265+
12601266
// DistributorIngestionArtificialDelay returns the artificial ingestion latency for a given user.
12611267
func (o *Overrides) DistributorIngestionArtificialDelay(tenantID string) time.Duration {
12621268
overrides := o.getOverridesForUser(tenantID)

0 commit comments

Comments
 (0)