Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [FEATURE] Query-frontend: Allow use of Mimir Query Engine (MQE) via the experimental CLI flags `-query-frontend.query-engine` or `-query-frontend.enable-query-engine-fallback` or corresponding YAML. #11417
* [FEATURE] Querier, query-frontend, ruler: Enable experimental support for duration expressions in PromQL, which are simple arithmetics on numbers in offset and range specification. #11344
* [FEATURE] You can configure Mimir to export traces in OTLP exposition format through the standard `OTEL_` environment variables. #11618
* [FEATURE] Distributor: Add experimental `-distributor.otel-native-delta-ingestion` option to allow primitive delta metrics ingestion via the OTLP endpoint. #11631
* [ENHANCEMENT] Querier: Make the maximum series limit for cardinality API requests configurable on a per-tenant basis with the `cardinality_analysis_max_results` option. #11456
* [ENHANCEMENT] Dashboards: Add "Queries / sec by read path" to Queries Dashboard. #11640
* [ENHANCEMENT] Dashboards: Add "Added Latency" row to Writes Dashboard. #11579
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -5755,6 +5755,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "otel_native_delta_ingestion",
"required": false,
"desc": "Whether to enable native ingestion of delta OTLP metrics, which will store the raw delta sample values without conversion. If disabled, delta metrics will be rejected. Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "distributor.otel-native-delta-ingestion",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "ingest_storage_read_consistency",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Whether to keep identifying OTel resource attributes in the target_info metric on top of converting to job and instance labels.
-distributor.otel-metric-suffixes-enabled
Whether to enable automatic suffixes to names of metrics ingested through OTLP.
-distributor.otel-native-delta-ingestion
[experimental] Whether to enable native ingestion of delta OTLP metrics, which will store the raw delta sample values without conversion. If disabled, delta metrics will be rejected. Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.
-distributor.otel-promote-resource-attributes comma-separated-list-of-strings
[experimental] Optionally specify OTel resource attributes to promote to labels.
-distributor.remote-timeout duration
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ The following features are currently experimental:
- `-distributor.otel-keep-identifying-resource-attributes`
- Enable conversion of OTel explicit bucket histograms into native histograms with custom buckets.
- `-distributor.otel-convert-histograms-to-nhcb`
- Enable native ingestion of delta OTLP metrics. Currently, this means storing the raw delta sample values without converting them to cumulative and having the metric type set to "Unknown". Delta support is in an early stage of development. The ingestion and querying process is likely to change over time. Considerations around querying and gotchas can be found in the [corresponding Prometheus documentation](https://prometheus.io/docs/prometheus/3.4/feature_flags/#otlp-native-delta-support).
- `-distributor.otel-native-delta-ingestion`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4147,6 +4147,13 @@ ruler_alertmanager_client_config:
# CLI flag: -distributor.otel-convert-histograms-to-nhcb
[otel_convert_histograms_to_nhcb: <boolean> | default = false]

# (experimental) Whether to enable native ingestion of delta OTLP metrics, which
# will store the raw delta sample values without conversion. If disabled, delta
# metrics will be rejected. Delta support is in an early stage of development.
# The ingestion and querying process is likely to change over time.
# CLI flag: -distributor.otel-native-delta-ingestion
[otel_native_delta_ingestion: <boolean> | default = false]

# (experimental) The default consistency level to enforce for queries when using
# the ingest storage. Supports values: strong, eventual.
# CLI flag: -ingest-storage.read-consistency
Expand Down
6 changes: 5 additions & 1 deletion pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type OTLPHandlerLimits interface {
PromoteOTelResourceAttributes(id string) []string
OTelKeepIdentifyingResourceAttributes(id string) bool
OTelConvertHistogramsToNHCB(id string) bool
OTelNativeDeltaIngestion(id string) bool
}

// OTLPHandler is an http.Handler accepting OTLP write requests.
Expand Down Expand Up @@ -277,11 +278,12 @@ func newOTLPParser(
promoteResourceAttributes := resourceAttributePromotionConfig.PromoteOTelResourceAttributes(tenantID)
keepIdentifyingResourceAttributes := limits.OTelKeepIdentifyingResourceAttributes(tenantID)
convertHistogramsToNHCB := limits.OTelConvertHistogramsToNHCB(tenantID)
allowDeltaTemporality := limits.OTelNativeDeltaIngestion(tenantID)

pushMetrics.IncOTLPRequest(tenantID)
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

metrics, metricsDropped, err := otelMetricsToTimeseries(ctx, otlpConverter, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero, promoteResourceAttributes, keepIdentifyingResourceAttributes, convertHistogramsToNHCB, otlpReq.Metrics(), spanLogger)
metrics, metricsDropped, err := otelMetricsToTimeseries(ctx, otlpConverter, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero, promoteResourceAttributes, keepIdentifyingResourceAttributes, convertHistogramsToNHCB, allowDeltaTemporality, otlpReq.Metrics(), spanLogger)
if metricsDropped > 0 {
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(metricsDropped)) // "group" label is empty here as metrics couldn't be parsed
}
Expand Down Expand Up @@ -515,6 +517,7 @@ func otelMetricsToTimeseries(
promoteResourceAttributes []string,
keepIdentifyingResourceAttributes bool,
convertHistogramsToNHCB bool,
allowDeltaTemporality bool,
md pmetric.Metrics,
logger log.Logger,
) ([]mimirpb.PreallocTimeseries, int, error) {
Expand All @@ -525,6 +528,7 @@ func otelMetricsToTimeseries(
PromoteResourceAttributes: otlp.NewPromoteResourceAttributes(config.OTLPConfig{PromoteResourceAttributes: promoteResourceAttributes}),
KeepIdentifyingResourceAttributes: keepIdentifyingResourceAttributes,
ConvertHistogramsToNHCB: convertHistogramsToNHCB,
AllowDeltaTemporality: allowDeltaTemporality,
}
mimirTS := converter.ToTimeseries(ctx, md, settings, logger)

Expand Down
201 changes: 201 additions & 0 deletions pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func TestOTelMetricsToTimeSeries(t *testing.T) {
tc.promoteResourceAttributes,
tc.keepIdentifyingResourceAttributes,
false,
false,
md,
log.NewNopLogger(),
)
Expand Down Expand Up @@ -362,6 +363,7 @@ func TestConvertOTelHistograms(t *testing.T) {
[]string{},
false,
convertHistogramsToNHCB,
false,
md,
log.NewNopLogger(),
)
Expand Down Expand Up @@ -394,6 +396,205 @@ func TestConvertOTelHistograms(t *testing.T) {
}
}

// TestOTelDeltaIngestion exercises otelMetricsToTimeseries with OTLP metrics
// that use delta aggregation temporality. Each metric type (sum, exponential
// histogram, explicit-bucket histogram) is converted twice: once with delta
// temporality disallowed, where a parse error and one dropped metric are
// expected, and once with it allowed, where exactly one series is expected.
func TestOTelDeltaIngestion(t *testing.T) {
	const (
		metricName  = "test_metric"
		rejectedErr = `otlp parse error: invalid temporality and type combination for metric "test_metric"`
	)
	fixedTime := time.Unix(100, 0)

	// newSingleMetric returns a metrics payload holding one empty metric named metricName.
	newSingleMetric := func() (pmetric.Metrics, pmetric.Metric) {
		payload := pmetric.NewMetrics()
		resource := payload.ResourceMetrics().AppendEmpty()
		scope := resource.ScopeMetrics().AppendEmpty()
		metric := scope.Metrics().AppendEmpty()
		metric.SetName(metricName)
		return payload, metric
	}

	// deltaSum builds a delta sum with one data point; setValue, when non-nil,
	// assigns the point's value (otherwise the value is left unset).
	deltaSum := func(at time.Time, setValue func(pmetric.NumberDataPoint)) pmetric.Metrics {
		payload, metric := newSingleMetric()
		counter := metric.SetEmptySum()
		counter.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
		point := counter.DataPoints().AppendEmpty()
		point.SetTimestamp(pcommon.NewTimestampFromTime(at))
		if setValue != nil {
			setValue(point)
		}
		point.Attributes().PutStr("metric-attr", "metric value")
		return payload
	}

	// deltaExponentialHistogram builds a delta exponential histogram with one data point.
	deltaExponentialHistogram := func(at time.Time) pmetric.Metrics {
		payload, metric := newSingleMetric()
		expHist := metric.SetEmptyExponentialHistogram()
		expHist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
		point := expHist.DataPoints().AppendEmpty()
		point.SetCount(1)
		point.SetSum(5)
		point.SetTimestamp(pcommon.NewTimestampFromTime(at))
		point.Attributes().PutStr("metric-attr", "metric value")
		return payload
	}

	// deltaExplicitHistogram builds a delta explicit-bucket histogram with one data point.
	deltaExplicitHistogram := func(at time.Time) pmetric.Metrics {
		payload, metric := newSingleMetric()
		hist := metric.SetEmptyHistogram()
		hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
		point := hist.DataPoints().AppendEmpty()
		point.SetCount(20)
		point.SetSum(30)
		point.BucketCounts().FromRaw([]uint64{10, 10, 0})
		point.ExplicitBounds().FromRaw([]float64{1, 2})
		point.SetTimestamp(pcommon.NewTimestampFromTime(at))
		point.Attributes().PutStr("metric-attr", "metric value")
		return payload
	}

	// wantLabels returns a fresh copy of the label set expected on every accepted series.
	wantLabels := func() []mimirpb.LabelAdapter {
		return []mimirpb.LabelAdapter{
			{Name: "__name__", Value: metricName},
			{Name: "metric_attr", Value: "metric value"},
		}
	}

	testCases := []struct {
		name       string
		allowDelta bool
		metrics    pmetric.Metrics
		want       mimirpb.TimeSeries
		wantErr    string
	}{
		{
			name:       "delta counter not allowed",
			allowDelta: false,
			metrics:    deltaSum(time.Now(), nil),
			wantErr:    rejectedErr,
		},
		{
			name:       "delta counter allowed",
			allowDelta: true,
			metrics: deltaSum(fixedTime, func(point pmetric.NumberDataPoint) {
				point.SetIntValue(5)
			}),
			want: mimirpb.TimeSeries{
				Labels:  wantLabels(),
				Samples: []mimirpb.Sample{{TimestampMs: fixedTime.UnixMilli(), Value: 5}},
			},
		},
		{
			name:       "delta exponential histogram not allowed",
			allowDelta: false,
			metrics:    deltaExponentialHistogram(time.Now()),
			wantErr:    rejectedErr,
		},
		{
			name:       "delta exponential histogram allowed",
			allowDelta: true,
			metrics:    deltaExponentialHistogram(fixedTime),
			want: mimirpb.TimeSeries{
				Labels: wantLabels(),
				Histograms: []mimirpb.Histogram{
					{
						Count:         &mimirpb.Histogram_CountInt{CountInt: 1},
						Sum:           5,
						Schema:        0,
						ZeroThreshold: 1e-128,
						ZeroCount:     &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: 0},
						Timestamp:     fixedTime.UnixMilli(),
						ResetHint:     mimirpb.Histogram_GAUGE,
					},
				},
			},
		},
		{
			name:       "delta histogram as nhcb not allowed",
			allowDelta: false,
			metrics:    deltaExplicitHistogram(time.Now()),
			wantErr:    rejectedErr,
		},
		{
			name:       "delta histogram as nhcb allowed",
			allowDelta: true,
			metrics:    deltaExplicitHistogram(fixedTime),
			want: mimirpb.TimeSeries{
				Labels: wantLabels(),
				Histograms: []mimirpb.Histogram{
					{
						Count:          &mimirpb.Histogram_CountInt{CountInt: 20},
						Sum:            30,
						Schema:         -53,
						ZeroThreshold:  0,
						ZeroCount:      nil,
						PositiveSpans:  []mimirpb.BucketSpan{{Length: 3}},
						PositiveDeltas: []int64{10, 0, -10},
						CustomValues:   []float64{1, 2},
						Timestamp:      fixedTime.UnixMilli(),
						ResetHint:      mimirpb.Histogram_GAUGE,
					},
				},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			series, droppedCount, err := otelMetricsToTimeseries(
				context.Background(),
				newOTLPMimirConverter(),
				true,          // addSuffixes
				false,         // enableCTZeroIngestion
				false,         // enableStartTimeQuietZero
				[]string{},    // promoteResourceAttributes
				false,         // keepIdentifyingResourceAttributes
				true,          // convertHistogramsToNHCB
				tc.allowDelta, // allowDeltaTemporality
				tc.metrics,
				log.NewNopLogger(),
			)

			if tc.wantErr != "" {
				require.EqualError(t, err, tc.wantErr)
				require.Len(t, series, 0)
				require.Equal(t, 1, droppedCount)
				return
			}

			require.NoError(t, err)
			require.Len(t, series, 1)
			require.Equal(t, 0, droppedCount)
			require.Equal(t, tc.want, *series[0].TimeSeries)
		})
	}
}

func BenchmarkOTLPHandler(b *testing.B) {
var samples []prompb.Sample
for i := 0; i < 1000; i++ {
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,8 @@ func (o otlpLimitsMock) OTelKeepIdentifyingResourceAttributes(string) bool {

// OTelConvertHistogramsToNHCB always returns false, regardless of the tenant ID passed in.
func (otlpLimitsMock) OTelConvertHistogramsToNHCB(_ string) bool {
	return false
}

// OTelNativeDeltaIngestion always returns false, regardless of the tenant ID passed in.
func (otlpLimitsMock) OTelNativeDeltaIngestion(_ string) bool {
	return false
}

func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
for _, span := range h.PositiveSpans {
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ type Limits struct {
PromoteOTelResourceAttributes flagext.StringSliceCSV `yaml:"promote_otel_resource_attributes" json:"promote_otel_resource_attributes" category:"experimental"`
OTelKeepIdentifyingResourceAttributes bool `yaml:"otel_keep_identifying_resource_attributes" json:"otel_keep_identifying_resource_attributes" category:"experimental"`
OTelConvertHistogramsToNHCB bool `yaml:"otel_convert_histograms_to_nhcb" json:"otel_convert_histograms_to_nhcb" category:"experimental"`
OTelNativeDeltaIngestion bool `yaml:"otel_native_delta_ingestion" json:"otel_native_delta_ingestion" category:"experimental"`

// Ingest storage.
IngestStorageReadConsistency string `yaml:"ingest_storage_read_consistency" json:"ingest_storage_read_consistency" category:"experimental"`
Expand Down Expand Up @@ -316,6 +317,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.PromoteOTelResourceAttributes, "distributor.otel-promote-resource-attributes", "Optionally specify OTel resource attributes to promote to labels.")
f.BoolVar(&l.OTelKeepIdentifyingResourceAttributes, "distributor.otel-keep-identifying-resource-attributes", false, "Whether to keep identifying OTel resource attributes in the target_info metric on top of converting to job and instance labels.")
f.BoolVar(&l.OTelConvertHistogramsToNHCB, "distributor.otel-convert-histograms-to-nhcb", false, "Whether to convert OTel explicit histograms into native histograms with custom buckets.")
f.BoolVar(&l.OTelNativeDeltaIngestion, "distributor.otel-native-delta-ingestion", false, "Whether to enable native ingestion of delta OTLP metrics, which will store the raw delta sample values without conversion. If disabled, delta metrics will be rejected. Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.")

f.Var(&l.IngestionArtificialDelay, "distributor.ingestion-artificial-delay", "Target ingestion delay to apply to all tenants. If set to a non-zero value, the distributor will artificially delay ingestion time-frame by the specified duration by computing the difference between actual ingestion and the target. There is no delay on actual ingestion of samples, it is only the response back to the client.")
f.IntVar(&l.IngestionArtificialDelayConditionForTenantsWithLessThanMaxSeries, "distributor.ingestion-artificial-delay-condition-for-tenants-with-less-than-max-series", 0, "Condition to select tenants for which -distributor.ingestion-artificial-delay-duration-for-tenants-with-less-than-max-series should be applied.")
Expand Down Expand Up @@ -1267,6 +1269,10 @@ func (o *Overrides) OTelConvertHistogramsToNHCB(tenantID string) bool {
return o.getOverridesForUser(tenantID).OTelConvertHistogramsToNHCB
}

// OTelNativeDeltaIngestion returns the OTelNativeDeltaIngestion limit that applies to the given tenant.
func (o *Overrides) OTelNativeDeltaIngestion(tenantID string) bool {
	overrides := o.getOverridesForUser(tenantID)
	return overrides.OTelNativeDeltaIngestion
}

// DistributorIngestionArtificialDelay returns the artificial ingestion latency for a given user.
func (o *Overrides) DistributorIngestionArtificialDelay(tenantID string) time.Duration {
overrides := o.getOverridesForUser(tenantID)
Expand Down