Commit 0729af4

fix: Retry on failures writing into MongoDB (#670)

Signed-off-by: Suket Sharma <sukets@nvidia.com>

1 parent: 71eb791

6 files changed: 191 additions & 10 deletions

distros/kubernetes/nvsentinel/templates/configmap.yaml
Lines changed: 1 addition & 0 deletions

```diff
@@ -25,6 +25,7 @@ data:
     "K8sConnectorQps": {{ printf "%.2f" .Values.platformConnector.k8sConnector.qps }},
     "K8sConnectorBurst": {{ .Values.platformConnector.k8sConnector.burst }},
     "MaxNodeConditionMessageLength": {{ .Values.platformConnector.k8sConnector.maxNodeConditionMessageLength }},
+    "StoreConnectorMaxRetries": {{ .Values.platformConnector.mongodbStore.maxRetries }},
     "enableMongoDBStorePlatformConnector": "{{ .Values.global.mongodbStore.enabled }}",
     "enablePostgresDBStorePlatformConnector": {{ if and .Values.global.datastore .Values.global.datastore.provider }}{{ eq .Values.global.datastore.provider "postgresql" | quote }}{{ else }}"false"{{ end }}
   {{- with .Values.platformConnector.pipeline }}
```
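With the chart default below, this template line renders as `"StoreConnectorMaxRetries": 3` in the platform-connector ConfigMap JSON, which main.go reads at startup (see the conversion sketch after the main.go diff).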

distros/kubernetes/nvsentinel/values.yaml
Lines changed: 1 addition & 0 deletions

```diff
@@ -114,6 +114,7 @@ platformConnector:
   mongodbStore:
     enabled: false
     clientCertMountPath: "/etc/ssl/mongo-client"
+    maxRetries: 3

   postgresqlStore:
     clientCertMountPath: "/etc/ssl/client-certs"
```
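Because this is an ordinary chart value, the retry budget can be tuned per deployment without touching the template, e.g. `--set platformConnector.mongodbStore.maxRetries=5` on `helm install` or `helm upgrade`.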

platform-connectors/main.go
Lines changed: 10 additions & 2 deletions

```diff
@@ -135,12 +135,20 @@ func initializeK8sConnector(

 func initializeDatabaseStoreConnector(
     ctx context.Context,
+    config map[string]interface{},
     databaseClientCertMountPath string,
 ) (*store.DatabaseStoreConnector, error) {
     ringBuffer := ringbuffer.NewRingBuffer("databaseStore", ctx)
     server.InitializeAndAttachRingBufferForConnectors(ringBuffer)

-    storeConnector, err := store.InitializeDatabaseStoreConnector(ctx, ringBuffer, databaseClientCertMountPath)
+    maxRetriesInt64, ok := config["StoreConnectorMaxRetries"].(int64)
+    if !ok {
+        return nil, fmt.Errorf("failed to convert StoreConnectorMaxRetries to int: %v", config["StoreConnectorMaxRetries"])
+    }
+
+    maxRetries := int(maxRetriesInt64)
+
+    storeConnector, err := store.InitializeDatabaseStoreConnector(ctx, ringBuffer, databaseClientCertMountPath, maxRetries)
     if err != nil {
         return nil, fmt.Errorf("failed to initialize database store connector: %w", err)
     }
@@ -257,7 +265,7 @@ func initializeConnectors(

     // Keep the legacy config key name for backward compatibility with existing ConfigMaps
     if config["enableMongoDBStorePlatformConnector"] == True || config["enablePostgresDBStorePlatformConnector"] == True {
-        storeConnector, err = initializeDatabaseStoreConnector(ctx, databaseClientCertMountPath)
+        storeConnector, err = initializeDatabaseStoreConnector(ctx, config, databaseClientCertMountPath)
         if err != nil {
             return nil, nil, fmt.Errorf("failed to initialize database store connector: %w", err)
         }
```
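One subtlety in the first hunk: the `.(int64)` assertion only succeeds if the config loader decodes JSON numbers as int64. Go's standard `encoding/json` decoder produces float64 (or `json.Number` when `UseNumber` is enabled), so a loader-agnostic lookup is the safer pattern if that ever changes. A minimal sketch; `configInt` is hypothetical and not part of this commit:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// configInt tolerates the numeric types common config loaders produce.
func configInt(config map[string]interface{}, key string) (int, error) {
	switch v := config[key].(type) {
	case int64:
		return int(v), nil
	case float64:
		return int(v), nil
	case json.Number:
		n, err := v.Int64()
		return int(n), err
	default:
		return 0, fmt.Errorf("%s has unsupported type %T", key, config[key])
	}
}

func main() {
	cfg := map[string]interface{}{"StoreConnectorMaxRetries": int64(3)}
	n, _ := configInt(cfg, "StoreConnectorMaxRetries")
	fmt.Println(n) // 3
}
```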

platform-connectors/pkg/connectors/store/store_connector.go
Lines changed: 25 additions & 5 deletions

```diff
@@ -38,22 +38,25 @@ type DatabaseStoreConnector struct {
     // resourceSinkClients are client for pushing data to the resource count sink
     ringBuffer *ringbuffer.RingBuffer
     nodeName   string
+    maxRetries int
 }

 func new(
     databaseClient client.DatabaseClient,
     ringBuffer *ringbuffer.RingBuffer,
     nodeName string,
+    maxRetries int,
 ) *DatabaseStoreConnector {
     return &DatabaseStoreConnector{
         databaseClient: databaseClient,
         ringBuffer:     ringBuffer,
         nodeName:       nodeName,
+        maxRetries:     maxRetries,
     }
 }

 func InitializeDatabaseStoreConnector(ctx context.Context, ringbuffer *ringbuffer.RingBuffer,
-    clientCertMountPath string) (*DatabaseStoreConnector, error) {
+    clientCertMountPath string, maxRetries int) (*DatabaseStoreConnector, error) {
     nodeName := os.Getenv("NODE_NAME")
     if nodeName == "" {
         return nil, fmt.Errorf("NODE_NAME is not set")
@@ -71,9 +74,9 @@ func InitializeDatabaseStoreConnector(ctx context.Context, ringbuffer *ringbuffe
         return nil, fmt.Errorf("failed to create database client: %w", err)
     }

-    slog.Info("Successfully initialized database store connector")
+    slog.Info("Successfully initialized database store connector", "maxRetries", maxRetries)

-    return new(databaseClient, ringbuffer, nodeName), nil
+    return new(databaseClient, ringbuffer, nodeName, maxRetries), nil
 }

 func createClientFactory(databaseClientCertMountPath string) (*factory.ClientFactory, error) {
@@ -105,8 +108,25 @@ func (r *DatabaseStoreConnector) FetchAndProcessHealthMetric(ctx context.Context

     err := r.insertHealthEvents(ctx, healthEvents)
     if err != nil {
-        slog.Error("Error inserting health events", "error", err)
-        r.ringBuffer.HealthMetricEleProcessingFailed(healthEvents)
+        retryCount := r.ringBuffer.NumRequeues(healthEvents)
+        if retryCount < r.maxRetries {
+            slog.Warn("Error inserting health events, will retry with exponential backoff",
+                "error", err,
+                "retryCount", retryCount,
+                "maxRetries", r.maxRetries,
+                "eventCount", len(healthEvents.GetEvents()))
+
+            r.ringBuffer.AddRateLimited(healthEvents)
+        } else {
+            slog.Error("Max retries exceeded, dropping health events permanently",
+                "error", err,
+                "retryCount", retryCount,
+                "maxRetries", r.maxRetries,
+                "eventCount", len(healthEvents.GetEvents()),
+                "firstEventNodeName", healthEvents.GetEvents()[0].GetNodeName(),
+                "firstEventCheckName", healthEvents.GetEvents()[0].GetCheckName())
+            r.ringBuffer.HealthMetricEleProcessingCompleted(healthEvents)
+        }
     } else {
         r.ringBuffer.HealthMetricEleProcessingCompleted(healthEvents)
     }
```
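The decision in the last hunk hinges on NumRequeues, which reports how many times this item has already been requeued: 0 on the first failure, 1 on the second, and so on. With the default maxRetries of 3 an event is therefore attempted at most 4 times (the initial write plus 3 retries) before being dropped, which is exactly what the tests below assert. Note that both branches end up calling Done on the queue: AddRateLimited re-enqueues after the backoff delay, while the drop path reuses HealthMetricEleProcessingCompleted so the item's failure history is also Forgotten (see the ring-buffer diff below). A hypothetical distillation of the accounting, not code from the commit:

```go
// shouldRetry isolates the comparison used above. numRequeues counts prior
// requeues, so maxRetries = 3 permits 4 total attempts: the initial write
// plus 3 rate-limited retries.
func shouldRetry(numRequeues, maxRetries int) bool {
	return numRequeues < maxRetries
}
```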

platform-connectors/pkg/connectors/store/store_connector_test.go
Lines changed: 108 additions & 1 deletion

```diff
@@ -295,10 +295,117 @@ func TestInitializeDatabaseStoreConnector(t *testing.T) {
         }()

         ringBuffer := ringbuffer.NewRingBuffer("test", context.Background())
-        connector, err := InitializeDatabaseStoreConnector(context.Background(), ringBuffer, "")
+        connector, err := InitializeDatabaseStoreConnector(context.Background(), ringBuffer, "", 3)

         require.Error(t, err)
         require.Nil(t, connector)
         require.Contains(t, err.Error(), "NODE_NAME is not set")
     })
 }
+
+// TestMessageRetriedOnMongoDBFailure verifies that
+// messages are retried with exponential backoff when MongoDB write fails.
+func TestMessageRetriedOnMongoDBFailure(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    ringBuffer := ringbuffer.NewRingBuffer("testRetryBehavior", ctx,
+        ringbuffer.WithRetryConfig(10*time.Millisecond, 50*time.Millisecond))
+    nodeName := "testNode"
+    mockClient := &mockDatabaseClient{}
+
+    // First 2 calls fail, 3rd call succeeds
+    mockClient.On("InsertMany", mock.Anything, mock.Anything).
+        Return(nil, errors.New("MongoDB temporarily unavailable")).Times(2)
+    mockClient.On("InsertMany", mock.Anything, mock.Anything).
+        Return(&client.InsertManyResult{InsertedIDs: []interface{}{"id1"}}, nil).Once()
+
+    connector := &DatabaseStoreConnector{
+        databaseClient: mockClient,
+        ringBuffer:     ringBuffer,
+        nodeName:       nodeName,
+        maxRetries:     3,
+    }
+
+    healthEvent := &protos.HealthEvent{
+        NodeName:           "gpu-node-1",
+        GeneratedTimestamp: timestamppb.New(time.Now()),
+        CheckName:          "GpuXidError",
+        ErrorCode:          []string{"79"}, // GPU fell off the bus
+        IsFatal:            true,
+        IsHealthy:          false,
+    }
+
+    healthEvents := &protos.HealthEvents{
+        Events: []*protos.HealthEvent{healthEvent},
+    }
+
+    ringBuffer.Enqueue(healthEvents)
+    require.Equal(t, 1, ringBuffer.CurrentLength(), "Event should be in queue")
+    go connector.FetchAndProcessHealthMetric(ctx)
+
+    require.Eventually(t, func() bool {
+        return ringBuffer.CurrentLength() == 0
+    }, 500*time.Millisecond, 10*time.Millisecond, "Queue should be empty after successful retry")
+
+    // Give a bit more time for all async operations to complete
+    time.Sleep(100 * time.Millisecond)
+
+    // Verify correct number of retry attempts
+    mockClient.AssertNumberOfCalls(t, "InsertMany", 3)
+    cancel()
+}
+
+// TestMessageDroppedAfterMaxRetries verifies that messages are eventually dropped
+// after exceeding the maximum retry count to prevent unbounded memory growth.
+func TestMessageDroppedAfterMaxRetries(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    ringBuffer := ringbuffer.NewRingBuffer("testMaxRetries", ctx,
+        ringbuffer.WithRetryConfig(10*time.Millisecond, 50*time.Millisecond))
+    nodeName := "testNode"
+    mockClient := &mockDatabaseClient{}
+
+    // Always fail to simulate persistent MongoDB outage
+    mockClient.On("InsertMany", mock.Anything, mock.Anything).Return(
+        (*client.InsertManyResult)(nil),
+        errors.New("MongoDB permanently down"),
+    )
+
+    connector := &DatabaseStoreConnector{
+        databaseClient: mockClient,
+        ringBuffer:     ringBuffer,
+        nodeName:       nodeName,
+        maxRetries:     3,
+    }
+
+    healthEvent := &protos.HealthEvent{
+        NodeName:           "gpu-node-1",
+        GeneratedTimestamp: timestamppb.New(time.Now()),
+        CheckName:          "GpuXidError",
+        ErrorCode:          []string{"79"},
+        IsFatal:            true,
+        IsHealthy:          false,
+    }
+
+    healthEvents := &protos.HealthEvents{
+        Events: []*protos.HealthEvent{healthEvent},
+    }
+
+    ringBuffer.Enqueue(healthEvents)
+    require.Equal(t, 1, ringBuffer.CurrentLength())
+
+    go connector.FetchAndProcessHealthMetric(ctx)
+
+    require.Eventually(t, func() bool {
+        return ringBuffer.CurrentLength() == 0
+    }, 500*time.Millisecond, 10*time.Millisecond, "Event should be dropped after max retries")
+
+    // Give enough time for the final retry attempt to complete
+    time.Sleep(100 * time.Millisecond)
+
+    // Verify we attempted initial call plus 3 retries (4 total)
+    mockClient.AssertNumberOfCalls(t, "InsertMany", 4)
+    cancel()
+}
```
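Both tests lean on testify/mock consuming expectations in declaration order: the `.Times(2)` expectation is exhausted before the `.Once()` success is matched. A standalone sketch of that ordering, using a hypothetical `flakyStore` mock rather than the repo's `mockDatabaseClient`:

```go
package store_test

import (
	"errors"
	"fmt"

	"github.com/stretchr/testify/mock"
)

// flakyStore is a hypothetical mock, used only to illustrate expectation order.
type flakyStore struct{ mock.Mock }

func (f *flakyStore) Insert(v string) error {
	return f.Called(v).Error(0)
}

func Example_expectationOrder() {
	f := &flakyStore{}
	f.On("Insert", "x").Return(errors.New("down")).Times(2) // first two calls fail
	f.On("Insert", "x").Return(nil).Once()                  // third call succeeds
	fmt.Println(f.Insert("x"), f.Insert("x"), f.Insert("x"))
	// Output: down down <nil>
}
```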

platform-connectors/pkg/ringbuffer/ring_buffer.go
Lines changed: 46 additions & 2 deletions

```diff
@@ -18,23 +18,57 @@ import (
     "context"
     "errors"
     "log/slog"
+    "time"

     "k8s.io/client-go/util/workqueue"

     "github.com/nvidia/nvsentinel/data-models/pkg/protos"
 )

+const (
+    // Default retry configuration for production use
+    // Retry 1: 500ms, Retry 2: 1.5s, Retry 3: 3s (total ~5s to 4th attempt)
+    DefaultBaseDelay = 500 * time.Millisecond
+    DefaultMaxDelay  = 3 * time.Second
+)
+
 type RingBuffer struct {
     ringBufferIdentifier string
     healthMetricQueue    workqueue.TypedRateLimitingInterface[*protos.HealthEvents]
     ctx                  context.Context
 }

-func NewRingBuffer(ringBufferName string, ctx context.Context) *RingBuffer {
+type Option func(*config)
+
+type config struct {
+    baseDelay time.Duration
+    maxDelay  time.Duration
+}
+
+func WithRetryConfig(baseDelay, maxDelay time.Duration) Option {
+    return func(c *config) {
+        c.baseDelay = baseDelay
+        c.maxDelay = maxDelay
+    }
+}
+
+func NewRingBuffer(ringBufferName string, ctx context.Context, opts ...Option) *RingBuffer {
+    cfg := &config{
+        baseDelay: DefaultBaseDelay,
+        maxDelay:  DefaultMaxDelay,
+    }
+    for _, opt := range opts {
+        opt(cfg)
+    }
+
     workqueue.SetProvider(prometheusMetricsProvider{})
+    rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[*protos.HealthEvents](
+        cfg.baseDelay,
+        cfg.maxDelay,
+    )

     queue := workqueue.NewTypedRateLimitingQueueWithConfig(
-        workqueue.DefaultTypedControllerRateLimiter[*protos.HealthEvents](),
+        rateLimiter,
         workqueue.TypedRateLimitingQueueConfig[*protos.HealthEvents]{
             Name: ringBufferName,
         },
@@ -72,6 +106,7 @@ func (rb *RingBuffer) Dequeue() (*protos.HealthEvents, bool) {
 }

 func (rb *RingBuffer) HealthMetricEleProcessingCompleted(data *protos.HealthEvents) {
+    rb.healthMetricQueue.Forget(data)
     rb.healthMetricQueue.Done(data)
 }

@@ -80,6 +115,15 @@ func (rb *RingBuffer) HealthMetricEleProcessingFailed(data *protos.HealthEvents)
     rb.healthMetricQueue.Done(data)
 }

+func (rb *RingBuffer) AddRateLimited(data *protos.HealthEvents) {
+    rb.healthMetricQueue.AddRateLimited(data)
+    rb.healthMetricQueue.Done(data)
+}
+
+func (rb *RingBuffer) NumRequeues(data *protos.HealthEvents) int {
+    return rb.healthMetricQueue.NumRequeues(data)
+}
+
 func (rb *RingBuffer) ShutDownHealthMetricQueue() {
     rb.healthMetricQueue.ShutDownWithDrain()
 }
```
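Two details worth calling out: client-go's item exponential failure rate limiter doubles the per-item delay on each failure (baseDelay * 2^n, capped at maxDelay), and the new Forget call in HealthMetricEleProcessingCompleted resets that per-item counter once a write finally succeeds, so the next failure starts again at baseDelay. A standalone sketch of the schedule the defaults produce (the item key is arbitrary):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	rl := workqueue.NewTypedItemExponentialFailureRateLimiter[string](
		500*time.Millisecond, // DefaultBaseDelay
		3*time.Second,        // DefaultMaxDelay
	)
	for i := 0; i < 5; i++ {
		// When returns the delay before the item's next requeue and bumps
		// its failure count: 500ms, 1s, 2s, then capped at 3s thereafter.
		fmt.Printf("failure %d -> wait %v\n", i+1, rl.When("mongo-write"))
	}
	rl.Forget("mongo-write") // success: the next failure starts at 500ms again
}
```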
