Skip to content

Add nativeHistogram IngestionRate limit #6794

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
Expand Down
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3427,6 +3427,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ingestion-rate-limit
[ingestion_rate: <float> | default = 25000]

# Per-user native histograms ingestion rate limit in samples per second.
# Disabled by default
# CLI flag: -distributor.native-histograms-ingestion-rate-limit
[native_histograms_ingestion_rate: <float> | default = 1.7976931348623157e+308]

# Whether the ingestion rate limit should be applied individually to each
# distributor instance (local), or evenly shared across the cluster (global).
# CLI flag: -distributor.ingestion-rate-limit-strategy
Expand All @@ -3436,6 +3441,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ingestion-burst-size
[ingestion_burst_size: <int> | default = 50000]

# Per-user allowed native histograms ingestion burst size (in number of samples)
# CLI flag: -distributor.native-histograms-ingestion-burst-size
[native_histograms_ingestion_burst_size: <int> | default = 0]

# Flag to enable, for all users, handling of samples with external labels
# identifying replicas in an HA Prometheus setup.
# CLI flag: -distributor.ha-tracker.enable-for-all-users
Expand Down
60 changes: 43 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
"math"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -95,7 +96,8 @@ type Distributor struct {
HATracker *ha.HATracker

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
ingestionRateLimiter *limiter.RateLimiter
nativeHistogramsIngestionRateLimiter *limiter.RateLimiter

// Manager for subservices (HA Tracker, distributor ring and client pool)
subservices *services.Manager
Expand Down Expand Up @@ -267,11 +269,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy limiter.RateLimiterStrategy
var nativeHistogramsIngestionRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.Lifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
nativeHistogramsIngestionRateStrategy = newInfiniteIngestionRateStrategy()
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
Expand All @@ -285,21 +289,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler)
nativeHistogramsIngestionRateStrategy = newGlobalNativeHistogramsIngestionRateStrategy(limits, distributorsLifeCycler)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(limits)
nativeHistogramsIngestionRateStrategy = newLocalNativeHistogramsIngestionRateStrategy(limits)
}

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
nativeHistogramsIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramsIngestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Expand Down Expand Up @@ -754,7 +761,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

// A WriteRequest can only contain series or metadata but not both. This might change in the future.
seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
if err != nil {
return nil, err
}
Expand All @@ -765,6 +772,18 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

nhRateLimited := false
if limits.NativeHistogramsIngestionRate != math.MaxFloat64 {
nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(time.Now(), userID, validatedHistogramSamples)
}

if nhRateLimited {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedHistogramSamples))
} else {
seriesKeys = append(seriesKeys, nhSeriesKeys...)
validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...)
}

if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
// Ensure the request slice is reused if there's no series or metadata passing the validation.
cortexpb.ReuseSlice(req.Timeseries)
Expand Down Expand Up @@ -936,14 +955,16 @@ type samplesLabelSetEntry struct {
labels labels.Labels
}

func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
defer pSpan.Finish()

// For each timeseries or samples, we compute a hash to distribute across ingesters;
// check each sample/metadata and discard if outside limits.
validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
nhValidatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
seriesKeys := make([]uint32, 0, len(req.Timeseries))
nhSeriesKeys := make([]uint32, 0, len(req.Timeseries))
validatedFloatSamples := 0
validatedHistogramSamples := 0
validatedExemplars := 0
Expand Down Expand Up @@ -1051,7 +1072,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// label and dropped labels (if any)
key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return nil, nil, 0, 0, 0, nil, err
return nil, nil, nil, nil, 0, 0, 0, nil, err
}
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)

Expand Down Expand Up @@ -1086,8 +1107,13 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
}
}

seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
if len(ts.Histograms) > 0 {
nhSeriesKeys = append(nhSeriesKeys, key)
nhValidatedTimeseries = append(nhValidatedTimeseries, validatedSeries)
} else {
seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
}
validatedFloatSamples += len(ts.Samples)
validatedHistogramSamples += len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
Expand All @@ -1103,7 +1129,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
}
}

return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
}

func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
Expand Down
91 changes: 91 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,97 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
t.Parallel()
type testPush struct {
samples int
metadata int
expectedError error
expectedNHDiscardedSampleMetricValue int
}

ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
distributors int
ingestionRateStrategy string
ingestionRate float64
ingestionBurstSize int
nativeHistogramsIngestionRate float64
nativeHistogramsIngestionBurstSize int
pushes []testPush
}{
"local strategy: limit should be set to each distributor: histograms": {
distributors: 2,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRate: 13,
ingestionBurstSize: 13,
nativeHistogramsIngestionRate: 5,
nativeHistogramsIngestionBurstSize: 5,
pushes: []testPush{
{samples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 0},
{samples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
{samples: 2, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
{samples: 2, metadata: 8, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (13) exceeded while adding 2 samples and 8 metadata"), expectedNHDiscardedSampleMetricValue: 6},
},
},
"global strategy: burst should set to each distributor: histograms": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 20,
ingestionBurstSize: 20,
nativeHistogramsIngestionRate: 6,
nativeHistogramsIngestionBurstSize: 3,
pushes: []testPush{
{samples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 0},
{samples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
{samples: 1, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
{samples: 2, metadata: 15, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 2 samples and 15 metadata"), expectedNHDiscardedSampleMetricValue: 6},
},
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
t.Parallel()
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRateStrategy = testData.ingestionRateStrategy
limits.IngestionRate = testData.ingestionRate
limits.IngestionBurstSize = testData.ingestionBurstSize
limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate
limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize

// Start all expected distributors
distributors, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: testData.distributors,
shardByAllLabels: true,
limits: limits,
})

// Push samples in multiple requests to the first distributor
for _, push := range testData.pushes {
var request = makeWriteRequest(0, 0, push.metadata, push.samples)

response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
assert.Equal(t, emptyResponse, response)
assert.Nil(t, err)
} else {
assert.Nil(t, response)
assert.Equal(t, push.expectedError, err)
}
assert.Equal(t, float64(push.expectedNHDiscardedSampleMetricValue), testutil.ToFloat64(distributors[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, "user")))
}
})
}

}

func TestPush_QuorumError(t *testing.T) {
t.Parallel()

Expand Down
46 changes: 46 additions & 0 deletions pkg/distributor/ingestion_rate_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,49 @@ func (s *infiniteStrategy) Burst(tenantID string) int {
// Burst is ignored when limit = rate.Inf
return 0
}

type localStrategyNativeHistograms struct {
limits *validation.Overrides
}

func newLocalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy {
return &localStrategyNativeHistograms{
limits: limits,
}
}

func (s *localStrategyNativeHistograms) Limit(tenantID string) float64 {
return s.limits.NativeHistogramsIngestionRate(tenantID)
}

func (s *localStrategyNativeHistograms) Burst(tenantID string) int {
return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
}

type globalStrategyNativeHistograms struct {
limits *validation.Overrides
ring ReadLifecycler
}

func newGlobalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
return &globalStrategyNativeHistograms{
limits: limits,
ring: ring,
}
}

func (s *globalStrategyNativeHistograms) Limit(tenantID string) float64 {
numDistributors := s.ring.HealthyInstancesCount()

if numDistributors == 0 {
return s.limits.NativeHistogramsIngestionRate(tenantID)
}

return s.limits.NativeHistogramsIngestionRate(tenantID) / float64(numDistributors)
}

func (s *globalStrategyNativeHistograms) Burst(tenantID string) int {
// The meaning of burst doesn't change for the global strategy, in order
// to keep it easier to understand for users / operators.
return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
}
Loading
Loading