Skip to content

Commit a077234

Browse files
feat: add health check/warmup to segmentwriter to reduce first-write latency (#4453)
* feat: add health check/warmup to segmentwriter to reduce first-write latency
* Ignore error more explicitly
* Linter fix
1 parent c99bfe5 commit a077234

File tree

1 file changed

+52
-13
lines changed

1 file changed

+52
-13
lines changed

pkg/segmentwriter/service.go

Lines changed: 52 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -41,19 +41,21 @@ const (
4141
)
4242

4343
// Config holds the settings of the segment writer service: the gRPC client,
// the ring lifecycler, segment flushing, upload retry/hedging, metadata
// updates, and the startup bucket health check.
//
// NOTE(review): in this diff hunk every pre-existing field is listed twice,
// first as removed (-) and then as re-added (+) with identical visible text;
// presumably only column alignment changed. The two BucketHealthCheck* fields
// at the end are the only fields that appear solely on the added side.
type Config struct {
44-
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the segment writer."`
45-
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
46-
SegmentDuration time.Duration `yaml:"segment_duration,omitempty" category:"advanced"`
47-
FlushConcurrency uint `yaml:"flush_concurrency,omitempty" category:"advanced"`
48-
UploadTimeout time.Duration `yaml:"upload-timeout,omitempty" category:"advanced"`
49-
UploadMaxRetries int `yaml:"upload-retry_max_retries,omitempty" category:"advanced"`
50-
UploadMinBackoff time.Duration `yaml:"upload-retry_min_period,omitempty" category:"advanced"`
51-
UploadMaxBackoff time.Duration `yaml:"upload-retry_max_period,omitempty" category:"advanced"`
52-
UploadHedgeAfter time.Duration `yaml:"upload-hedge_upload_after,omitempty" category:"advanced"`
53-
UploadHedgeRateMax float64 `yaml:"upload-hedge_rate_max,omitempty" category:"advanced"`
54-
UploadHedgeRateBurst uint `yaml:"upload-hedge_rate_burst,omitempty" category:"advanced"`
55-
MetadataDLQEnabled bool `yaml:"metadata_dlq_enabled,omitempty" category:"advanced"`
56-
MetadataUpdateTimeout time.Duration `yaml:"metadata_update_timeout,omitempty" category:"advanced"`
44+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the segment writer."`
45+
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
46+
SegmentDuration time.Duration `yaml:"segment_duration,omitempty" category:"advanced"`
47+
FlushConcurrency uint `yaml:"flush_concurrency,omitempty" category:"advanced"`
48+
UploadTimeout time.Duration `yaml:"upload-timeout,omitempty" category:"advanced"`
49+
UploadMaxRetries int `yaml:"upload-retry_max_retries,omitempty" category:"advanced"`
50+
UploadMinBackoff time.Duration `yaml:"upload-retry_min_period,omitempty" category:"advanced"`
51+
UploadMaxBackoff time.Duration `yaml:"upload-retry_max_period,omitempty" category:"advanced"`
52+
UploadHedgeAfter time.Duration `yaml:"upload-hedge_upload_after,omitempty" category:"advanced"`
53+
UploadHedgeRateMax float64 `yaml:"upload-hedge_rate_max,omitempty" category:"advanced"`
54+
// UploadHedgeRateBurst is the maximum number of hedged requests in a burst.
UploadHedgeRateBurst uint `yaml:"upload-hedge_rate_burst,omitempty" category:"advanced"`
55+
// MetadataDLQEnabled enables the dead letter queue for metadata: a failed
// metadata update is stored and applied asynchronously. The flag default is true.
MetadataDLQEnabled bool `yaml:"metadata_dlq_enabled,omitempty" category:"advanced"`
56+
// MetadataUpdateTimeout is the timeout for metadata update requests
// (flag default: 2s).
MetadataUpdateTimeout time.Duration `yaml:"metadata_update_timeout,omitempty" category:"advanced"`
57+
// BucketHealthCheckEnabled turns on the bucket health check performed on
// startup, which validates credentials and warms up the connection to reduce
// latency for the first write. The flag default is true.
BucketHealthCheckEnabled bool `yaml:"bucket_health_check_enabled,omitempty" category:"advanced"`
58+
// BucketHealthCheckTimeout bounds the bucket health check operations
// (flag default: 10s).
BucketHealthCheckTimeout time.Duration `yaml:"bucket_health_check_timeout,omitempty" category:"advanced"`
5759
}
5860

5961
func (cfg *Config) Validate() error {
@@ -79,6 +81,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
7981
f.UintVar(&cfg.UploadHedgeRateBurst, prefix+".upload-hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
8082
f.BoolVar(&cfg.MetadataDLQEnabled, prefix+".metadata-dlq-enabled", true, "Enables dead letter queue (DLQ) for metadata. If the metadata update fails, it will be stored and updated asynchronously.")
8183
f.DurationVar(&cfg.MetadataUpdateTimeout, prefix+".metadata-update-timeout", 2*time.Second, "Timeout for metadata update requests.")
84+
f.BoolVar(&cfg.BucketHealthCheckEnabled, prefix+".bucket-health-check-enabled", true, "Enables bucket health check on startup. This both validates credentials and warms up the connection to reduce latency for the first write.")
85+
f.DurationVar(&cfg.BucketHealthCheckTimeout, prefix+".bucket-health-check-timeout", 10*time.Second, "Timeout for bucket health check operations.")
8286
}
8387

8488
type Limits interface {
@@ -155,7 +159,42 @@ func New(
155159
return i, nil
156160
}
157161

162+
// performBucketHealthCheck performs a lightweight bucket operation to warm up the connection
163+
// and detect any object storage issues early. This serves the dual purpose of validating
164+
// bucket accessibility and reducing latency for the first actual write operation.
165+
func (i *SegmentWriterService) performBucketHealthCheck(ctx context.Context) error {
166+
if !i.config.BucketHealthCheckEnabled {
167+
return nil
168+
}
169+
170+
level.Debug(i.logger).Log("msg", "starting bucket health check", "timeout", i.config.BucketHealthCheckTimeout.String())
171+
172+
healthCheckCtx, cancel := context.WithTimeout(ctx, i.config.BucketHealthCheckTimeout)
173+
defer cancel()
174+
175+
err := i.storageBucket.Iter(healthCheckCtx, "", func(string) error {
176+
// We only care about connectivity, not the actual contents
177+
// Return an error to stop iteration after first item (if any)
178+
return errors.New("stop iteration")
179+
})
180+
181+
// Ignore the "stop iteration" error we intentionally return
182+
// and any "object not found" type errors as they indicate the bucket is accessible
183+
if err == nil || i.storageBucket.IsObjNotFoundErr(err) || err.Error() == "stop iteration" {
184+
level.Debug(i.logger).Log("msg", "bucket health check succeeded")
185+
return nil
186+
}
187+
188+
level.Warn(i.logger).Log("msg", "bucket health check failed", "err", err)
189+
return nil // Don't fail startup, just warn
190+
}
191+
158192
func (i *SegmentWriterService) starting(ctx context.Context) error {
193+
// Perform bucket health check before ring registration to warm up the connection
194+
// and avoid slow first requests affecting p99 latency
195+
// On error, will emit a warning but continue startup
196+
_ = i.performBucketHealthCheck(ctx)
197+
159198
if err := services.StartManagerAndAwaitHealthy(ctx, i.subservices); err != nil {
160199
return err
161200
}

0 commit comments

Comments
 (0)