diff --git a/distros/kubernetes/nvsentinel/templates/configmap.yaml b/distros/kubernetes/nvsentinel/templates/configmap.yaml
index 38da2cc59..b2003fa55 100644
--- a/distros/kubernetes/nvsentinel/templates/configmap.yaml
+++ b/distros/kubernetes/nvsentinel/templates/configmap.yaml
@@ -25,6 +25,7 @@ data:
         "K8sConnectorQps": {{ printf "%.2f" .Values.platformConnector.k8sConnector.qps }},
         "K8sConnectorBurst": {{ .Values.platformConnector.k8sConnector.burst }},
         "MaxNodeConditionMessageLength": {{ .Values.platformConnector.k8sConnector.maxNodeConditionMessageLength }},
+        "StoreConnectorMaxRetries": {{ .Values.platformConnector.mongodbStore.maxRetries }},
         "enableMongoDBStorePlatformConnector": "{{ .Values.global.mongodbStore.enabled }}",
         "enablePostgresDBStorePlatformConnector": {{ if and .Values.global.datastore .Values.global.datastore.provider }}{{ eq .Values.global.datastore.provider "postgresql" | quote }}{{ else }}"false"{{ end }}
       {{- with .Values.platformConnector.pipeline }}
diff --git a/distros/kubernetes/nvsentinel/values.yaml b/distros/kubernetes/nvsentinel/values.yaml
index 57a26581d..83146672d 100644
--- a/distros/kubernetes/nvsentinel/values.yaml
+++ b/distros/kubernetes/nvsentinel/values.yaml
@@ -112,6 +112,7 @@ platformConnector:
   mongodbStore:
     enabled: false
     clientCertMountPath: "/etc/ssl/mongo-client"
+    maxRetries: 3

   postgresqlStore:
     clientCertMountPath: "/etc/ssl/client-certs"
diff --git a/platform-connectors/main.go b/platform-connectors/main.go
index 9620c6b95..c253a6b54 100755
--- a/platform-connectors/main.go
+++ b/platform-connectors/main.go
@@ -135,12 +135,20 @@ func initializeK8sConnector(
 func initializeDatabaseStoreConnector(
     ctx context.Context,
+    config map[string]interface{},
     databaseClientCertMountPath string,
 ) (*store.DatabaseStoreConnector, error) {
     ringBuffer := ringbuffer.NewRingBuffer("databaseStore", ctx)
     server.InitializeAndAttachRingBufferForConnectors(ringBuffer)

-    storeConnector, err := store.InitializeDatabaseStoreConnector(ctx, ringBuffer, databaseClientCertMountPath)
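+    // The ConfigMap JSON is decoded into map[string]interface{}, and
+    // encoding/json represents every JSON number as float64, so the retry
+    // count must be asserted as float64 before converting to int.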
+    maxRetriesFloat, ok := config["StoreConnectorMaxRetries"].(float64)
+    if !ok {
+        return nil, fmt.Errorf("failed to convert StoreConnectorMaxRetries to float64: %v", config["StoreConnectorMaxRetries"])
+    }
+
+    maxRetries := int(maxRetriesFloat)
+
+    storeConnector, err := store.InitializeDatabaseStoreConnector(ctx, ringBuffer, databaseClientCertMountPath, maxRetries)
     if err != nil {
         return nil, fmt.Errorf("failed to initialize database store connector: %w", err)
     }
@@ -257,7 +265,7 @@ func initializeConnectors(
     // Keep the legacy config key name for backward compatibility with existing ConfigMaps
     if config["enableMongoDBStorePlatformConnector"] == True ||
         config["enablePostgresDBStorePlatformConnector"] == True {
-        storeConnector, err = initializeDatabaseStoreConnector(ctx, databaseClientCertMountPath)
+        storeConnector, err = initializeDatabaseStoreConnector(ctx, config, databaseClientCertMountPath)
         if err != nil {
             return nil, nil, fmt.Errorf("failed to initialize database store connector: %w", err)
         }
diff --git a/platform-connectors/pkg/connectors/store/store_connector.go b/platform-connectors/pkg/connectors/store/store_connector.go
index b26cf0a19..1d0ff16d1 100644
--- a/platform-connectors/pkg/connectors/store/store_connector.go
+++ b/platform-connectors/pkg/connectors/store/store_connector.go
@@ -38,22 +38,25 @@ type DatabaseStoreConnector struct {
     // resourceSinkClients are client for pushing data to the resource count sink
     ringBuffer *ringbuffer.RingBuffer
     nodeName   string
+    maxRetries int
 }

 func new(
     databaseClient client.DatabaseClient,
     ringBuffer *ringbuffer.RingBuffer,
     nodeName string,
+    maxRetries int,
 ) *DatabaseStoreConnector {
     return &DatabaseStoreConnector{
         databaseClient: databaseClient,
         ringBuffer:     ringBuffer,
         nodeName:       nodeName,
+        maxRetries:     maxRetries,
     }
 }

 func InitializeDatabaseStoreConnector(ctx context.Context, ringbuffer *ringbuffer.RingBuffer,
-    clientCertMountPath string) (*DatabaseStoreConnector, error) {
+    clientCertMountPath string, maxRetries int) (*DatabaseStoreConnector, error) {
     nodeName := os.Getenv("NODE_NAME")
     if nodeName == "" {
         return nil, fmt.Errorf("NODE_NAME is not set")
@@ -71,9 +74,9 @@ func InitializeDatabaseStoreConnector(ctx context.Context, ringbuffer *ringbuffe
         return nil, fmt.Errorf("failed to create database client: %w", err)
     }

-    slog.Info("Successfully initialized database store connector")
+    slog.Info("Successfully initialized database store connector", "maxRetries", maxRetries)

-    return new(databaseClient, ringbuffer, nodeName), nil
+    return new(databaseClient, ringbuffer, nodeName, maxRetries), nil
 }

 func createClientFactory(databaseClientCertMountPath string) (*factory.ClientFactory, error) {
@@ -105,8 +108,25 @@ func (r *DatabaseStoreConnector) FetchAndProcessHealthMetric(ctx context.Context

         err := r.insertHealthEvents(ctx, healthEvents)
         if err != nil {
-            slog.Error("Error inserting health events", "error", err)
-            r.ringBuffer.HealthMetricEleProcessingFailed(healthEvents)
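+            // NumRequeues reflects how many times this batch has already been
+            // requeued via AddRateLimited, so each batch is retried at most
+            // maxRetries times before being dropped.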
+            retryCount := r.ringBuffer.NumRequeues(healthEvents)
+            if retryCount < r.maxRetries {
+                slog.Warn("Error inserting health events, will retry with exponential backoff",
+                    "error", err,
+                    "retryCount", retryCount,
+                    "maxRetries", r.maxRetries,
+                    "eventCount", len(healthEvents.GetEvents()))
+
+                r.ringBuffer.AddRateLimited(healthEvents)
+            } else {
+                slog.Error("Max retries exceeded, dropping health events permanently",
+                    "error", err,
+                    "retryCount", retryCount,
+                    "maxRetries", r.maxRetries,
+                    "eventCount", len(healthEvents.GetEvents()),
+                    "firstEventNodeName", healthEvents.GetEvents()[0].GetNodeName(),
+                    "firstEventCheckName", healthEvents.GetEvents()[0].GetCheckName())
+                r.ringBuffer.HealthMetricEleProcessingCompleted(healthEvents)
+            }
         } else {
             r.ringBuffer.HealthMetricEleProcessingCompleted(healthEvents)
         }
diff --git a/platform-connectors/pkg/connectors/store/store_connector_test.go b/platform-connectors/pkg/connectors/store/store_connector_test.go
index f296c7530..2b38d496c 100644
--- a/platform-connectors/pkg/connectors/store/store_connector_test.go
+++ b/platform-connectors/pkg/connectors/store/store_connector_test.go
@@ -295,10 +295,117 @@ func TestInitializeDatabaseStoreConnector(t *testing.T) {
         }()

         ringBuffer := ringbuffer.NewRingBuffer("test", context.Background())
-        connector, err := InitializeDatabaseStoreConnector(context.Background(), ringBuffer, "")
+        connector, err := InitializeDatabaseStoreConnector(context.Background(), ringBuffer, "", 3)

         require.Error(t, err)
         require.Nil(t, connector)
         require.Contains(t, err.Error(), "NODE_NAME is not set")
     })
 }
+
+// TestMessageRetriedOnMongoDBFailure verifies that messages are retried with
+// exponential backoff when a MongoDB write fails.
+func TestMessageRetriedOnMongoDBFailure(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    ringBuffer := ringbuffer.NewRingBuffer("testRetryBehavior", ctx,
+        ringbuffer.WithRetryConfig(10*time.Millisecond, 50*time.Millisecond))
+    nodeName := "testNode"
+    mockClient := &mockDatabaseClient{}
+
+    // First 2 calls fail, 3rd call succeeds
+    mockClient.On("InsertMany", mock.Anything, mock.Anything).
+        Return(nil, errors.New("MongoDB temporarily unavailable")).Times(2)
+    mockClient.On("InsertMany", mock.Anything, mock.Anything).
+        Return(&client.InsertManyResult{InsertedIDs: []interface{}{"id1"}}, nil).Once()
+
+    connector := &DatabaseStoreConnector{
+        databaseClient: mockClient,
+        ringBuffer:     ringBuffer,
+        nodeName:       nodeName,
+        maxRetries:     3,
+    }
+
+    healthEvent := &protos.HealthEvent{
+        NodeName:           "gpu-node-1",
+        GeneratedTimestamp: timestamppb.New(time.Now()),
+        CheckName:          "GpuXidError",
+        ErrorCode:          []string{"79"}, // GPU fell off the bus
+        IsFatal:            true,
+        IsHealthy:          false,
+    }
+
+    healthEvents := &protos.HealthEvents{
+        Events: []*protos.HealthEvent{healthEvent},
+    }
+
+    ringBuffer.Enqueue(healthEvents)
+    require.Equal(t, 1, ringBuffer.CurrentLength(), "Event should be in queue")
+    go connector.FetchAndProcessHealthMetric(ctx)
+
+    require.Eventually(t, func() bool {
+        return ringBuffer.CurrentLength() == 0
+    }, 500*time.Millisecond, 10*time.Millisecond, "Queue should be empty after successful retry")
+
+    // Give a bit more time for all async operations to complete
+    time.Sleep(100 * time.Millisecond)
+
+    // Verify the total number of insert attempts (1 initial + 2 retries)
+    mockClient.AssertNumberOfCalls(t, "InsertMany", 3)
+    cancel()
+}
+
+// TestMessageDroppedAfterMaxRetries verifies that messages are eventually dropped
+// after exceeding the maximum retry count to prevent unbounded memory growth.
+func TestMessageDroppedAfterMaxRetries(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    ringBuffer := ringbuffer.NewRingBuffer("testMaxRetries", ctx,
+        ringbuffer.WithRetryConfig(10*time.Millisecond, 50*time.Millisecond))
+    nodeName := "testNode"
+    mockClient := &mockDatabaseClient{}
+
+    // Always fail to simulate persistent MongoDB outage
+    mockClient.On("InsertMany", mock.Anything, mock.Anything).Return(
+        (*client.InsertManyResult)(nil),
+        errors.New("MongoDB permanently down"),
+    )
+
+    connector := &DatabaseStoreConnector{
+        databaseClient: mockClient,
+        ringBuffer:     ringBuffer,
+        nodeName:       nodeName,
+        maxRetries:     3,
+    }
+
+    healthEvent := &protos.HealthEvent{
+        NodeName:           "gpu-node-1",
+        GeneratedTimestamp: timestamppb.New(time.Now()),
+        CheckName:          "GpuXidError",
+        ErrorCode:          []string{"79"},
+        IsFatal:            true,
+        IsHealthy:          false,
+    }
+
+    healthEvents := &protos.HealthEvents{
+        Events: []*protos.HealthEvent{healthEvent},
+    }
+
+    ringBuffer.Enqueue(healthEvents)
+    require.Equal(t, 1, ringBuffer.CurrentLength())
+
+    go connector.FetchAndProcessHealthMetric(ctx)
+
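+    // After the third requeue the connector calls Forget and Done instead of
+    // re-adding the batch, so the queue drains even though every insert fails.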
+    require.Eventually(t, func() bool {
+        return ringBuffer.CurrentLength() == 0
+    }, 500*time.Millisecond, 10*time.Millisecond, "Event should be dropped after max retries")
+
+    // Give enough time for the final retry attempt to complete
+    time.Sleep(100 * time.Millisecond)
+
+    // Verify we attempted the initial call plus 3 retries (4 total)
+    mockClient.AssertNumberOfCalls(t, "InsertMany", 4)
+    cancel()
+}
diff --git a/platform-connectors/pkg/ringbuffer/ring_buffer.go b/platform-connectors/pkg/ringbuffer/ring_buffer.go
index 61272095b..c9bffb5f9 100644
--- a/platform-connectors/pkg/ringbuffer/ring_buffer.go
+++ b/platform-connectors/pkg/ringbuffer/ring_buffer.go
@@ -18,23 +18,57 @@ import (
     "context"
     "errors"
     "log/slog"
+    "time"

     "k8s.io/client-go/util/workqueue"

     "github.com/nvidia/nvsentinel/data-models/pkg/protos"
 )

+const (
+    // Default retry configuration for production use. The backoff doubles per
+    // retry: 500ms, then 1s, then 2s (~3.5s in total before the 4th attempt).
+    DefaultBaseDelay = 500 * time.Millisecond
+    DefaultMaxDelay  = 3 * time.Second
+)
+
 type RingBuffer struct {
     ringBufferIdentifier string
     healthMetricQueue    workqueue.TypedRateLimitingInterface[*protos.HealthEvents]
     ctx                  context.Context
 }

-func NewRingBuffer(ringBufferName string, ctx context.Context) *RingBuffer {
+type Option func(*config)
+
+type config struct {
+    baseDelay time.Duration
+    maxDelay  time.Duration
+}
+
+func WithRetryConfig(baseDelay, maxDelay time.Duration) Option {
+    return func(c *config) {
+        c.baseDelay = baseDelay
+        c.maxDelay = maxDelay
+    }
+}
+
+func NewRingBuffer(ringBufferName string, ctx context.Context, opts ...Option) *RingBuffer {
+    cfg := &config{
+        baseDelay: DefaultBaseDelay,
+        maxDelay:  DefaultMaxDelay,
+    }
+    for _, opt := range opts {
+        opt(cfg)
+    }
+
     workqueue.SetProvider(prometheusMetricsProvider{})
+
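+    // ItemExponentialFailureRateLimiter delays the Nth requeue of an item by
+    // baseDelay * 2^(N-1), capped at maxDelay; Forget resets the failure count.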
+    rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[*protos.HealthEvents](
+        cfg.baseDelay,
+        cfg.maxDelay,
+    )
     queue := workqueue.NewTypedRateLimitingQueueWithConfig(
-        workqueue.DefaultTypedControllerRateLimiter[*protos.HealthEvents](),
+        rateLimiter,
         workqueue.TypedRateLimitingQueueConfig[*protos.HealthEvents]{
             Name: ringBufferName,
         },
@@ -72,6 +106,7 @@ func (rb *RingBuffer) Dequeue() (*protos.HealthEvents, bool) {
 }

 func (rb *RingBuffer) HealthMetricEleProcessingCompleted(data *protos.HealthEvents) {
+    rb.healthMetricQueue.Forget(data)
     rb.healthMetricQueue.Done(data)
 }

@@ -80,6 +115,15 @@ func (rb *RingBuffer) HealthMetricEleProcessingFailed(data *protos.HealthEvents)
     rb.healthMetricQueue.Done(data)
 }

+func (rb *RingBuffer) AddRateLimited(data *protos.HealthEvents) {
+    // AddRateLimited schedules the re-add after the backoff delay; Done must
+    // still be called to release the item's in-flight slot in the workqueue.
+    rb.healthMetricQueue.AddRateLimited(data)
+    rb.healthMetricQueue.Done(data)
+}
+
+func (rb *RingBuffer) NumRequeues(data *protos.HealthEvents) int {
+    return rb.healthMetricQueue.NumRequeues(data)
+}
+
 func (rb *RingBuffer) ShutDownHealthMetricQueue() {
     rb.healthMetricQueue.ShutDownWithDrain()
 }