1 change: 1 addition & 0 deletions distros/kubernetes/nvsentinel/templates/configmap.yaml
@@ -25,6 +25,7 @@ data:
"K8sConnectorQps": {{ printf "%.2f" .Values.platformConnector.k8sConnector.qps }},
"K8sConnectorBurst": {{ .Values.platformConnector.k8sConnector.burst }},
"MaxNodeConditionMessageLength": {{ .Values.platformConnector.k8sConnector.maxNodeConditionMessageLength }},
"StoreConnectorMaxRetries": {{ .Values.platformConnector.mongodbStore.maxRetries }},
"enableMongoDBStorePlatformConnector": "{{ .Values.global.mongodbStore.enabled }}",
"enablePostgresDBStorePlatformConnector": {{ if and .Values.global.datastore .Values.global.datastore.provider }}{{ eq .Values.global.datastore.provider "postgresql" | quote }}{{ else }}"false"{{ end }}
{{- with .Values.platformConnector.pipeline }}
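For context, a sketch of what this hunk renders to with the chart defaults (assuming `global.mongodbStore.enabled` is left false; surrounding keys omitted). Note that `StoreConnectorMaxRetries` is emitted unquoted, so the connector receives it as a JSON number rather than a string:

```json
"StoreConnectorMaxRetries": 3,
"enableMongoDBStorePlatformConnector": "false",
"enablePostgresDBStorePlatformConnector": "false"
```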
1 change: 1 addition & 0 deletions distros/kubernetes/nvsentinel/values.yaml
@@ -112,6 +112,7 @@ platformConnector:
mongodbStore:
enabled: false
clientCertMountPath: "/etc/ssl/mongo-client"
maxRetries: 3
🛠️ Refactor suggestion | 🟠 Major

Add inline documentation for the maxRetries field.

The field lacks an inline comment explaining its purpose and behavior.

As per coding guidelines for Helm values.yaml files, all configuration values must be documented with inline comments.

📝 Suggested documentation
  mongodbStore:
    enabled: false
    clientCertMountPath: "/etc/ssl/mongo-client"
-   maxRetries: 3
+   # Maximum number of retry attempts for MongoDB write operations (default: 3)
+   # With maxRetries: 3, each event will be attempted up to 4 times total (1 initial + 3 retries)
+   # before being dropped to prevent unbounded memory growth during outages
+   maxRetries: 3

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In @distros/kubernetes/nvsentinel/values.yaml at line 115, add an inline comment
for the maxRetries field explaining what it controls, its default value,
acceptable range/units, and its behavior (e.g., whether it applies per request
or to overall retry attempts, and how backoff is handled); update the maxRetries
line to include a brief one-line comment above or to the right that documents
purpose, default (3), and any limits or interaction with backoff/retry logic.


postgresqlStore:
clientCertMountPath: "/etc/ssl/client-certs"
12 changes: 10 additions & 2 deletions platform-connectors/main.go
@@ -135,12 +135,20 @@ func initializeK8sConnector(

func initializeDatabaseStoreConnector(
ctx context.Context,
config map[string]interface{},
databaseClientCertMountPath string,
) (*store.DatabaseStoreConnector, error) {
ringBuffer := ringbuffer.NewRingBuffer("databaseStore", ctx)
server.InitializeAndAttachRingBufferForConnectors(ringBuffer)

maxRetriesFloat, ok := config["StoreConnectorMaxRetries"].(float64)
if !ok {
return nil, fmt.Errorf("failed to convert StoreConnectorMaxRetries to float: %v", config["StoreConnectorMaxRetries"])
}

maxRetries := int(maxRetriesFloat)

storeConnector, err := store.InitializeDatabaseStoreConnector(ctx, ringBuffer, databaseClientCertMountPath, maxRetries)
if err != nil {
return nil, fmt.Errorf("failed to initialize database store connector: %w", err)
}
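A note on the float64 assertion: assuming the ConfigMap's JSON is decoded into a `map[string]interface{}` with `encoding/json` (the decoding code is outside this diff), every JSON number arrives as a `float64`, so asserting directly to `int` would fail. A self-contained sketch:

```go
// Hypothetical, minimal reproduction of the config decoding assumed above:
// encoding/json stores every JSON number as float64 when decoding into
// map[string]interface{}, so the value must be asserted as float64 first.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	raw := []byte(`{"StoreConnectorMaxRetries": 3}`)

	var config map[string]interface{}
	if err := json.Unmarshal(raw, &config); err != nil {
		panic(err)
	}

	// Succeeds: JSON numbers decode as float64.
	maxRetriesFloat, ok := config["StoreConnectorMaxRetries"].(float64)
	fmt.Println(maxRetriesFloat, ok) // 3 true

	// Fails: the value was never stored as an int.
	_, ok = config["StoreConnectorMaxRetries"].(int)
	fmt.Println(ok) // false

	maxRetries := int(maxRetriesFloat)
	fmt.Println(maxRetries) // 3
}
```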
@@ -257,7 +265,7 @@ func initializeConnectors(

// Keep the legacy config key name for backward compatibility with existing ConfigMaps
if config["enableMongoDBStorePlatformConnector"] == True || config["enablePostgresDBStorePlatformConnector"] == True {
storeConnector, err = initializeDatabaseStoreConnector(ctx, config, databaseClientCertMountPath)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize database store connector: %w", err)
}
30 changes: 25 additions & 5 deletions platform-connectors/pkg/connectors/store/store_connector.go
@@ -38,22 +38,25 @@ type DatabaseStoreConnector struct {
// resourceSinkClients are clients for pushing data to the resource count sink
ringBuffer *ringbuffer.RingBuffer
nodeName string
maxRetries int
}

func new(
databaseClient client.DatabaseClient,
ringBuffer *ringbuffer.RingBuffer,
nodeName string,
maxRetries int,
) *DatabaseStoreConnector {
return &DatabaseStoreConnector{
databaseClient: databaseClient,
ringBuffer: ringBuffer,
nodeName: nodeName,
maxRetries: maxRetries,
}
}

func InitializeDatabaseStoreConnector(ctx context.Context, ringbuffer *ringbuffer.RingBuffer,
clientCertMountPath string, maxRetries int) (*DatabaseStoreConnector, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return nil, fmt.Errorf("NODE_NAME is not set")
@@ -71,9 +74,9 @@ func InitializeDatabaseStoreConnector(ctx context.Context, ringbuffer *ringbuffe
return nil, fmt.Errorf("failed to create database client: %w", err)
}

slog.Info("Successfully initialized database store connector")
slog.Info("Successfully initialized database store connector", "maxRetries", maxRetries)

return new(databaseClient, ringbuffer, nodeName, maxRetries), nil
}

func createClientFactory(databaseClientCertMountPath string) (*factory.ClientFactory, error) {
@@ -105,8 +108,25 @@ func (r *DatabaseStoreConnector) FetchAndProcessHealthMetric(ctx context.Context

err := r.insertHealthEvents(ctx, healthEvents)
if err != nil {
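// Decide between a rate-limited requeue and a permanent drop: the requeue
// count is tracked by the workqueue's rate limiter and is reset via Forget
// once the batch is finally completed (or dropped).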
slog.Error("Error inserting health events", "error", err)
r.ringBuffer.HealthMetricEleProcessingFailed(healthEvents)
retryCount := r.ringBuffer.NumRequeues(healthEvents)
if retryCount < r.maxRetries {
slog.Warn("Error inserting health events, will retry with exponential backoff",
"error", err,
"retryCount", retryCount,
"maxRetries", r.maxRetries,
"eventCount", len(healthEvents.GetEvents()))

r.ringBuffer.AddRateLimited(healthEvents)
} else {
slog.Error("Max retries exceeded, dropping health events permanently",
"error", err,
"retryCount", retryCount,
"maxRetries", r.maxRetries,
"eventCount", len(healthEvents.GetEvents()),
"firstEventNodeName", healthEvents.GetEvents()[0].GetNodeName(),
"firstEventCheckName", healthEvents.GetEvents()[0].GetCheckName())
r.ringBuffer.HealthMetricEleProcessingCompleted(healthEvents)
}
} else {
r.ringBuffer.HealthMetricEleProcessingCompleted(healthEvents)
}
109 changes: 108 additions & 1 deletion platform-connectors/pkg/connectors/store/store_connector_test.go
@@ -295,10 +295,117 @@ func TestInitializeDatabaseStoreConnector(t *testing.T) {
}()

ringBuffer := ringbuffer.NewRingBuffer("test", context.Background())
connector, err := InitializeDatabaseStoreConnector(context.Background(), ringBuffer, "")
connector, err := InitializeDatabaseStoreConnector(context.Background(), ringBuffer, "", 3)

require.Error(t, err)
require.Nil(t, connector)
require.Contains(t, err.Error(), "NODE_NAME is not set")
})
}

// TestMessageRetriedOnMongoDBFailure verifies that
// messages are retried with exponential backoff when MongoDB write fails.
func TestMessageRetriedOnMongoDBFailure(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ringBuffer := ringbuffer.NewRingBuffer("testRetryBehavior", ctx,
ringbuffer.WithRetryConfig(10*time.Millisecond, 50*time.Millisecond))
nodeName := "testNode"
mockClient := &mockDatabaseClient{}

// First 2 calls fail, 3rd call succeeds
mockClient.On("InsertMany", mock.Anything, mock.Anything).
Return(nil, errors.New("MongoDB temporarily unavailable")).Times(2)
mockClient.On("InsertMany", mock.Anything, mock.Anything).
Return(&client.InsertManyResult{InsertedIDs: []interface{}{"id1"}}, nil).Once()

connector := &DatabaseStoreConnector{
databaseClient: mockClient,
ringBuffer: ringBuffer,
nodeName: nodeName,
maxRetries: 3,
}

healthEvent := &protos.HealthEvent{
NodeName: "gpu-node-1",
GeneratedTimestamp: timestamppb.New(time.Now()),
CheckName: "GpuXidError",
ErrorCode: []string{"79"}, // GPU fell off the bus
IsFatal: true,
IsHealthy: false,
}

healthEvents := &protos.HealthEvents{
Events: []*protos.HealthEvent{healthEvent},
}

ringBuffer.Enqueue(healthEvents)
require.Equal(t, 1, ringBuffer.CurrentLength(), "Event should be in queue")
go connector.FetchAndProcessHealthMetric(ctx)

require.Eventually(t, func() bool {
return ringBuffer.CurrentLength() == 0
}, 500*time.Millisecond, 10*time.Millisecond, "Queue should be empty after successful retry")

// Give a bit more time for all async operations to complete
time.Sleep(100 * time.Millisecond)

// Verify correct number of retry attempts
mockClient.AssertNumberOfCalls(t, "InsertMany", 3)
cancel()
}

// TestMessageDroppedAfterMaxRetries verifies that messages are eventually dropped
// after exceeding the maximum retry count to prevent unbounded memory growth.
func TestMessageDroppedAfterMaxRetries(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ringBuffer := ringbuffer.NewRingBuffer("testMaxRetries", ctx,
ringbuffer.WithRetryConfig(10*time.Millisecond, 50*time.Millisecond))
nodeName := "testNode"
mockClient := &mockDatabaseClient{}

// Always fail to simulate persistent MongoDB outage
mockClient.On("InsertMany", mock.Anything, mock.Anything).Return(
(*client.InsertManyResult)(nil),
errors.New("MongoDB permanently down"),
)

connector := &DatabaseStoreConnector{
databaseClient: mockClient,
ringBuffer: ringBuffer,
nodeName: nodeName,
maxRetries: 3,
}

healthEvent := &protos.HealthEvent{
NodeName: "gpu-node-1",
GeneratedTimestamp: timestamppb.New(time.Now()),
CheckName: "GpuXidError",
ErrorCode: []string{"79"},
IsFatal: true,
IsHealthy: false,
}

healthEvents := &protos.HealthEvents{
Events: []*protos.HealthEvent{healthEvent},
}

ringBuffer.Enqueue(healthEvents)
require.Equal(t, 1, ringBuffer.CurrentLength())

go connector.FetchAndProcessHealthMetric(ctx)

require.Eventually(t, func() bool {
return ringBuffer.CurrentLength() == 0
}, 500*time.Millisecond, 10*time.Millisecond, "Event should be dropped after max retries")

// Give enough time for the final retry attempt to complete
time.Sleep(100 * time.Millisecond)

// Verify we attempted initial call plus 3 retries (4 total)
mockClient.AssertNumberOfCalls(t, "InsertMany", 4)
cancel()
}
48 changes: 46 additions & 2 deletions platform-connectors/pkg/ringbuffer/ring_buffer.go
@@ -18,23 +18,57 @@ import (
"context"
"errors"
"log/slog"
"time"

"k8s.io/client-go/util/workqueue"

"github.com/nvidia/nvsentinel/data-models/pkg/protos"
)

const (
// Default retry configuration for production use.
// With exponential doubling from DefaultBaseDelay, retry 1 waits 500ms,
// retry 2 waits 1s, and retry 3 waits 2s (~3.5s of backoff before the 4th attempt).
DefaultBaseDelay = 500 * time.Millisecond
DefaultMaxDelay = 3 * time.Second
)
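To make the schedule in the comment above concrete — assuming client-go's `ItemExponentialFailureRateLimiter` semantics (delay = baseDelay · 2^numRequeues, capped at maxDelay) — here is a small sketch that prints the per-retry waits:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	baseDelay := 500 * time.Millisecond
	maxDelay := 3 * time.Second

	for requeues := 0; requeues < 4; requeues++ {
		// The exponential failure rate limiter doubles the delay per requeue.
		delay := baseDelay * time.Duration(1<<requeues)
		if delay > maxDelay {
			delay = maxDelay
		}
		fmt.Printf("retry %d waits %v\n", requeues+1, delay)
	}
	// Output: 500ms, 1s, 2s, then capped at 3s for any further retries.
}
```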

type RingBuffer struct {
ringBufferIdentifier string
healthMetricQueue workqueue.TypedRateLimitingInterface[*protos.HealthEvents]
ctx context.Context
}

type Option func(*config)

type config struct {
baseDelay time.Duration
maxDelay time.Duration
}

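// WithRetryConfig overrides the default exponential-backoff bounds used by
// the queue's per-item rate limiter (tests pass short delays to keep runtime low).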
func WithRetryConfig(baseDelay, maxDelay time.Duration) Option {
return func(c *config) {
c.baseDelay = baseDelay
c.maxDelay = maxDelay
}
}

func NewRingBuffer(ringBufferName string, ctx context.Context, opts ...Option) *RingBuffer {
cfg := &config{
baseDelay: DefaultBaseDelay,
maxDelay: DefaultMaxDelay,
}
for _, opt := range opts {
opt(cfg)
}

workqueue.SetProvider(prometheusMetricsProvider{})
rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[*protos.HealthEvents](
cfg.baseDelay,
cfg.maxDelay,
)

queue := workqueue.NewTypedRateLimitingQueueWithConfig(
rateLimiter,
workqueue.TypedRateLimitingQueueConfig[*protos.HealthEvents]{
Name: ringBufferName,
},
@@ -72,6 +106,7 @@ func (rb *RingBuffer) Dequeue() (*protos.HealthEvents, bool) {
}

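// HealthMetricEleProcessingCompleted marks an element as successfully handled:
// Forget resets its failure count in the rate limiter (so a future failure
// backs off from baseDelay again) and Done releases it from the queue.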
func (rb *RingBuffer) HealthMetricEleProcessingCompleted(data *protos.HealthEvents) {
rb.healthMetricQueue.Forget(data)
rb.healthMetricQueue.Done(data)
}

@@ -80,6 +115,15 @@ func (rb *RingBuffer) HealthMetricEleProcessingFailed(data *protos.HealthEvents)
rb.healthMetricQueue.Done(data)
}

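// AddRateLimited re-enqueues the element after the backoff computed by the
// exponential rate limiter and marks the current processing attempt as done.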
func (rb *RingBuffer) AddRateLimited(data *protos.HealthEvents) {
rb.healthMetricQueue.AddRateLimited(data)
rb.healthMetricQueue.Done(data)
}

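// NumRequeues reports how many times the element has been re-enqueued via
// AddRateLimited since it was last Forgotten.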
func (rb *RingBuffer) NumRequeues(data *protos.HealthEvents) int {
return rb.healthMetricQueue.NumRequeues(data)
}

func (rb *RingBuffer) ShutDownHealthMetricQueue() {
rb.healthMetricQueue.ShutDownWithDrain()
}