Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* [FEATURE] Query-frontend: Allow use of Mimir Query Engine (MQE) via the experimental CLI flags `-query-frontend.query-engine` or `-query-frontend.enable-query-engine-fallback` or corresponding YAML. #11417 #11775
* [FEATURE] Querier, query-frontend, ruler: Enable experimental support for duration expressions in PromQL, which are simple arithmetic on numbers in offset and range specifications. #11344
* [FEATURE] You can configure Mimir to export traces in OTLP exposition format through the standard `OTEL_` environment variables. #11618
* [FEATURE] Distributor: Add experimental `-distributor.otel-native-delta-ingestion` option to allow primitive delta metrics ingestion via the OTLP endpoint. #11631
* [FEATURE] Distributor: Allow configuring tenant-specific HA tracker failover timeouts. #11774
* [FEATURE] OTLP: Add experimental support for promoting OTel scope metadata (name, version, schema URL, attributes) to metric labels, prefixed with `otel_scope_`. Enable via the `-distributor.otel-promote-scope-metadata` flag. #11795
* [ENHANCEMENT] Dashboards: Add "Influx write requests" row to Writes Dashboard. #11731
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -5807,6 +5807,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "otel_native_delta_ingestion",
"required": false,
"desc": "Whether to enable native ingestion of delta OTLP metrics, which will store the raw delta sample values without conversion. If disabled, delta metrics will be rejected. Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "distributor.otel-native-delta-ingestion",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "ingest_storage_read_consistency",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Whether to keep identifying OTel resource attributes in the target_info metric on top of converting to job and instance labels.
-distributor.otel-metric-suffixes-enabled
Whether to enable automatic suffixes to names of metrics ingested through OTLP.
-distributor.otel-native-delta-ingestion
[experimental] Whether to enable native ingestion of delta OTLP metrics, which will store the raw delta sample values without conversion. If disabled, delta metrics will be rejected. Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.
-distributor.otel-promote-resource-attributes comma-separated-list-of-strings
[experimental] Optionally specify OTel resource attributes to promote to labels.
-distributor.otel-promote-scope-metadata
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ The following features are currently experimental:
- `-distributor.otel-convert-histograms-to-nhcb`
- Enable promotion of OTel scope metadata to metric labels
- `-distributor.otel-promote-scope-metadata`
- Enable native ingestion of delta OTLP metrics. This means storing the raw delta sample values without converting them to cumulative values, and setting the metric type to "Unknown". Delta support is in an early stage of development. The ingestion and querying process is likely to change over time. You can find considerations around querying and gotchas in the [corresponding Prometheus documentation](https://prometheus.io/docs/prometheus/3.4/feature_flags/#otlp-native-delta-support).
- `-distributor.otel-native-delta-ingestion`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4186,6 +4186,13 @@ ruler_alertmanager_client_config:
# CLI flag: -distributor.otel-promote-scope-metadata
[otel_promote_scope_metadata: <boolean> | default = false]

# (experimental) Whether to enable native ingestion of delta OTLP metrics, which
# will store the raw delta sample values without conversion. If disabled, delta
# metrics will be rejected. Delta support is in an early stage of development.
# The ingestion and querying process is likely to change over time.
# CLI flag: -distributor.otel-native-delta-ingestion
[otel_native_delta_ingestion: <boolean> | default = false]

# (experimental) The default consistency level to enforce for queries when using
# the ingest storage. Supports values: strong, eventual.
# CLI flag: -ingest-storage.read-consistency
Expand Down
5 changes: 5 additions & 0 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type OTLPHandlerLimits interface {
OTelKeepIdentifyingResourceAttributes(id string) bool
OTelConvertHistogramsToNHCB(id string) bool
OTelPromoteScopeMetadata(id string) bool
OTelNativeDeltaIngestion(id string) bool
}

// OTLPHandler is an http.Handler accepting OTLP write requests.
Expand Down Expand Up @@ -283,6 +284,7 @@ func newOTLPParser(
keepIdentifyingResourceAttributes := limits.OTelKeepIdentifyingResourceAttributes(tenantID)
convertHistogramsToNHCB := limits.OTelConvertHistogramsToNHCB(tenantID)
promoteScopeMetadata := limits.OTelPromoteScopeMetadata(tenantID)
allowDeltaTemporality := limits.OTelNativeDeltaIngestion(tenantID)

pushMetrics.IncOTLPRequest(tenantID)
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))
Expand All @@ -299,6 +301,7 @@ func newOTLPParser(
convertHistogramsToNHCB: convertHistogramsToNHCB,
promoteScopeMetadata: promoteScopeMetadata,
promoteResourceAttributes: promoteResourceAttributes,
allowDeltaTemporality: allowDeltaTemporality,
},
spanLogger,
)
Expand Down Expand Up @@ -509,6 +512,7 @@ type conversionOptions struct {
convertHistogramsToNHCB bool
promoteScopeMetadata bool
promoteResourceAttributes []string
allowDeltaTemporality bool
}

func otelMetricsToTimeseries(
Expand All @@ -526,6 +530,7 @@ func otelMetricsToTimeseries(
KeepIdentifyingResourceAttributes: opts.keepIdentifyingResourceAttributes,
ConvertHistogramsToNHCB: opts.convertHistogramsToNHCB,
PromoteScopeMetadata: opts.promoteScopeMetadata,
AllowDeltaTemporality: opts.allowDeltaTemporality,
}
mimirTS := converter.ToTimeseries(ctx, md, settings, logger)

Expand Down
196 changes: 196 additions & 0 deletions pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,202 @@ func TestConvertOTelHistograms(t *testing.T) {
}
}

// TestOTelDeltaIngestion exercises otelMetricsToTimeseries with metrics that use
// delta aggregation temporality. With allowDeltaTemporality unset the conversion
// must fail, return no series and report one dropped item; with it set, exactly
// one series carrying the raw delta values must be produced.
func TestOTelDeltaIngestion(t *testing.T) {
	// Error expected for every case in which delta temporality is not allowed.
	const rejectedErr = `otlp parse error: invalid temporality and type combination for metric "test_metric"`

	// Fixed timestamp used by the accepted cases so expected values are deterministic.
	fixedTime := time.Unix(100, 0)

	// newMetric returns a payload holding one still-untyped metric named
	// "test_metric", plus that metric so the caller can give it a type and data.
	newMetric := func() (pmetric.Metrics, pmetric.Metric) {
		payload := pmetric.NewMetrics()
		scope := payload.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
		metric := scope.Metrics().AppendEmpty()
		metric.SetName("test_metric")
		return payload, metric
	}

	// wantLabels builds a fresh copy of the labels every accepted case expects.
	wantLabels := func() []mimirpb.LabelAdapter {
		return []mimirpb.LabelAdapter{
			{Name: "__name__", Value: "test_metric"},
			{Name: "metric_attr", Value: "metric value"},
		}
	}

	scenarios := []struct {
		name        string
		allowDelta  bool
		input       pmetric.Metrics
		expected    mimirpb.TimeSeries
		expectedErr string
	}{
		{
			name:       "delta counter not allowed",
			allowDelta: false,
			input: func() pmetric.Metrics {
				payload, metric := newMetric()
				counter := metric.SetEmptySum()
				counter.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
				point := counter.DataPoints().AppendEmpty()
				point.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
				point.Attributes().PutStr("metric-attr", "metric value")
				return payload
			}(),
			expectedErr: rejectedErr,
		},
		{
			name:       "delta counter allowed",
			allowDelta: true,
			input: func() pmetric.Metrics {
				payload, metric := newMetric()
				counter := metric.SetEmptySum()
				counter.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
				point := counter.DataPoints().AppendEmpty()
				point.SetTimestamp(pcommon.NewTimestampFromTime(fixedTime))
				point.SetIntValue(5)
				point.Attributes().PutStr("metric-attr", "metric value")
				return payload
			}(),
			expected: mimirpb.TimeSeries{
				Labels:  wantLabels(),
				Samples: []mimirpb.Sample{{TimestampMs: fixedTime.UnixMilli(), Value: 5}},
			},
		},
		{
			name:       "delta exponential histogram not allowed",
			allowDelta: false,
			input: func() pmetric.Metrics {
				payload, metric := newMetric()
				expHist := metric.SetEmptyExponentialHistogram()
				expHist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
				point := expHist.DataPoints().AppendEmpty()
				point.SetCount(1)
				point.SetSum(5)
				point.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
				point.Attributes().PutStr("metric-attr", "metric value")
				return payload
			}(),
			expectedErr: rejectedErr,
		},
		{
			name:       "delta exponential histogram allowed",
			allowDelta: true,
			input: func() pmetric.Metrics {
				payload, metric := newMetric()
				expHist := metric.SetEmptyExponentialHistogram()
				expHist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
				point := expHist.DataPoints().AppendEmpty()
				point.SetCount(1)
				point.SetSum(5)
				point.SetTimestamp(pcommon.NewTimestampFromTime(fixedTime))
				point.Attributes().PutStr("metric-attr", "metric value")
				return payload
			}(),
			expected: mimirpb.TimeSeries{
				Labels: wantLabels(),
				Histograms: []mimirpb.Histogram{
					{
						Count:         &mimirpb.Histogram_CountInt{CountInt: 1},
						Sum:           5,
						Schema:        0,
						ZeroThreshold: 1e-128,
						ZeroCount:     &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: 0},
						Timestamp:     fixedTime.UnixMilli(),
						ResetHint:     mimirpb.Histogram_GAUGE,
					},
				},
			},
		},
		{
			name:       "delta histogram as nhcb not allowed",
			allowDelta: false,
			input: func() pmetric.Metrics {
				payload, metric := newMetric()
				hist := metric.SetEmptyHistogram()
				hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
				point := hist.DataPoints().AppendEmpty()
				point.SetCount(20)
				point.SetSum(30)
				point.BucketCounts().FromRaw([]uint64{10, 10, 0})
				point.ExplicitBounds().FromRaw([]float64{1, 2})
				point.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
				point.Attributes().PutStr("metric-attr", "metric value")
				return payload
			}(),
			expectedErr: rejectedErr,
		},
		{
			name:       "delta histogram as nhcb allowed",
			allowDelta: true,
			input: func() pmetric.Metrics {
				payload, metric := newMetric()
				hist := metric.SetEmptyHistogram()
				hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
				point := hist.DataPoints().AppendEmpty()
				point.SetCount(20)
				point.SetSum(30)
				point.BucketCounts().FromRaw([]uint64{10, 10, 0})
				point.ExplicitBounds().FromRaw([]float64{1, 2})
				point.SetTimestamp(pcommon.NewTimestampFromTime(fixedTime))
				point.Attributes().PutStr("metric-attr", "metric value")
				return payload
			}(),
			expected: mimirpb.TimeSeries{
				Labels: wantLabels(),
				Histograms: []mimirpb.Histogram{
					{
						Count:         &mimirpb.Histogram_CountInt{CountInt: 20},
						Sum:           30,
						Schema:        -53,
						ZeroThreshold: 0,
						ZeroCount:     nil,
						PositiveSpans: []mimirpb.BucketSpan{
							{
								Length: 3,
							},
						},
						PositiveDeltas: []int64{10, 0, -10},
						CustomValues:   []float64{1, 2},
						Timestamp:      fixedTime.UnixMilli(),
						ResetHint:      mimirpb.Histogram_GAUGE,
					},
				},
			},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Explicit histograms are always converted to NHCB here; only the
			// delta switch varies between scenarios.
			opts := conversionOptions{
				convertHistogramsToNHCB: true,
				allowDeltaTemporality:   sc.allowDelta,
			}

			series, dropped, err := otelMetricsToTimeseries(
				context.Background(),
				newOTLPMimirConverter(),
				sc.input,
				opts,
				log.NewNopLogger(),
			)

			if sc.expectedErr != "" {
				require.EqualError(t, err, sc.expectedErr)
				require.Len(t, series, 0)
				require.Equal(t, 1, dropped)
				return
			}

			require.NoError(t, err)
			require.Len(t, series, 1)
			require.Equal(t, 0, dropped)
			require.Equal(t, sc.expected, *series[0].TimeSeries)
		})
	}
}

func BenchmarkOTLPHandler(b *testing.B) {
var samples []prompb.Sample
for i := 0; i < 1000; i++ {
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,8 @@ func (o otlpLimitsMock) OTelPromoteScopeMetadata(string) bool {
return false
}

// OTelNativeDeltaIngestion mirrors the OTLPHandlerLimits method of the same name.
// The mock ignores the tenant ID and always reports the feature as disabled.
func (o otlpLimitsMock) OTelNativeDeltaIngestion(string) bool {
	return false
}

func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
for _, span := range h.PositiveSpans {
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ type Limits struct {
OTelKeepIdentifyingResourceAttributes bool `yaml:"otel_keep_identifying_resource_attributes" json:"otel_keep_identifying_resource_attributes" category:"experimental"`
OTelConvertHistogramsToNHCB bool `yaml:"otel_convert_histograms_to_nhcb" json:"otel_convert_histograms_to_nhcb" category:"experimental"`
OTelPromoteScopeMetadata bool `yaml:"otel_promote_scope_metadata" json:"otel_promote_scope_metadata" category:"experimental"`
OTelNativeDeltaIngestion bool `yaml:"otel_native_delta_ingestion" json:"otel_native_delta_ingestion" category:"experimental"`

// Ingest storage.
IngestStorageReadConsistency string `yaml:"ingest_storage_read_consistency" json:"ingest_storage_read_consistency" category:"experimental"`
Expand Down Expand Up @@ -337,6 +338,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&l.OTelKeepIdentifyingResourceAttributes, "distributor.otel-keep-identifying-resource-attributes", false, "Whether to keep identifying OTel resource attributes in the target_info metric on top of converting to job and instance labels.")
f.BoolVar(&l.OTelConvertHistogramsToNHCB, "distributor.otel-convert-histograms-to-nhcb", false, "Whether to convert OTel explicit histograms into native histograms with custom buckets.")
f.BoolVar(&l.OTelPromoteScopeMetadata, "distributor.otel-promote-scope-metadata", false, "Whether to promote OTel scope metadata (scope name, version, schema URL, attributes) to corresponding metric labels, prefixed with otel_scope_.")
f.BoolVar(&l.OTelNativeDeltaIngestion, "distributor.otel-native-delta-ingestion", false, "Whether to enable native ingestion of delta OTLP metrics, which will store the raw delta sample values without conversion. If disabled, delta metrics will be rejected. Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.")

f.Var(&l.IngestionArtificialDelay, "distributor.ingestion-artificial-delay", "Target ingestion delay to apply to all tenants. If set to a non-zero value, the distributor will artificially delay ingestion time-frame by the specified duration by computing the difference between actual ingestion and the target. There is no delay on actual ingestion of samples, it is only the response back to the client.")
f.IntVar(&l.IngestionArtificialDelayConditionForTenantsWithLessThanMaxSeries, "distributor.ingestion-artificial-delay-condition-for-tenants-with-less-than-max-series", 0, "Condition to select tenants for which -distributor.ingestion-artificial-delay-duration-for-tenants-with-less-than-max-series should be applied.")
Expand Down Expand Up @@ -1329,6 +1331,10 @@ func (o *Overrides) OTelPromoteScopeMetadata(tenantID string) bool {
return o.getOverridesForUser(tenantID).OTelPromoteScopeMetadata
}

// OTelNativeDeltaIngestion returns the value of the OTelNativeDeltaIngestion
// limit that applies to the given tenant.
func (o *Overrides) OTelNativeDeltaIngestion(tenantID string) bool {
	tenantLimits := o.getOverridesForUser(tenantID)
	return tenantLimits.OTelNativeDeltaIngestion
}

// DistributorIngestionArtificialDelay returns the artificial ingestion latency for a given user.
func (o *Overrides) DistributorIngestionArtificialDelay(tenantID string) time.Duration {
overrides := o.getOverridesForUser(tenantID)
Expand Down