1 change: 1 addition & 0 deletions distros/kubernetes/nvsentinel/templates/configmap.yaml
@@ -25,6 +25,7 @@ data:
"K8sConnectorQps": {{ printf "%.2f" .Values.platformConnector.k8sConnector.qps }},
"K8sConnectorBurst": {{ .Values.platformConnector.k8sConnector.burst }},
"MaxNodeConditionMessageLength": {{ .Values.platformConnector.k8sConnector.maxNodeConditionMessageLength }},
"StoreConnectorMaxRetries": {{ .Values.platformConnector.mongodbStore.maxRetries }},
"enableMongoDBStorePlatformConnector": "{{ .Values.global.mongodbStore.enabled }}",
"enablePostgresDBStorePlatformConnector": {{ if and .Values.global.datastore .Values.global.datastore.provider }}{{ eq .Values.global.datastore.provider "postgresql" | quote }}{{ else }}"false"{{ end }}
{{- with .Values.platformConnector.pipeline }}
1 change: 1 addition & 0 deletions distros/kubernetes/nvsentinel/values.yaml
@@ -114,6 +114,7 @@ platformConnector:
mongodbStore:
enabled: false
clientCertMountPath: "/etc/ssl/mongo-client"
maxRetries: 3

postgresqlStore:
clientCertMountPath: "/etc/ssl/client-certs"
12 changes: 10 additions & 2 deletions platform-connectors/main.go
@@ -135,12 +135,20 @@ func initializeK8sConnector(

func initializeDatabaseStoreConnector(
ctx context.Context,
config map[string]interface{},
databaseClientCertMountPath string,
) (*store.DatabaseStoreConnector, error) {
ringBuffer := ringbuffer.NewRingBuffer("databaseStore", ctx)
server.InitializeAndAttachRingBufferForConnectors(ringBuffer)

storeConnector, err := store.InitializeDatabaseStoreConnector(ctx, ringBuffer, databaseClientCertMountPath)
maxRetriesFloat, ok := config["StoreConnectorMaxRetries"].(float64)
if !ok {
return nil, fmt.Errorf("failed to convert StoreConnectorMaxRetries to float: %v", config["StoreConnectorMaxRetries"])
}

maxRetries := int(maxRetriesFloat)

storeConnector, err := store.InitializeDatabaseStoreConnector(ctx, ringBuffer, databaseClientCertMountPath, maxRetries)
if err != nil {
return nil, fmt.Errorf("failed to initialize database store connector: %w", err)
}
@@ -257,7 +265,7 @@ func initializeConnectors(

// Keep the legacy config key name for backward compatibility with existing ConfigMaps
if config["enableMongoDBStorePlatformConnector"] == True || config["enablePostgresDBStorePlatformConnector"] == True {
storeConnector, err = initializeDatabaseStoreConnector(ctx, databaseClientCertMountPath)
storeConnector, err = initializeDatabaseStoreConnector(ctx, config, databaseClientCertMountPath)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize database store connector: %w", err)
}
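For context on the float64 assertion in initializeDatabaseStoreConnector above: encoding/json decodes JSON numbers into float64 when unmarshalling into a map[string]interface{}, so the integer written in the ConfigMap arrives as a float and has to be converted back to int. A minimal standalone sketch of that behaviour, assuming the config is decoded with encoding/json (the JSON literal here is illustrative, not the actual ConfigMap contents):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// JSON numbers land as float64 when decoded into map[string]interface{},
	// which is why main.go asserts float64 before converting to int.
	raw := []byte(`{"StoreConnectorMaxRetries": 3}`)

	var config map[string]interface{}
	if err := json.Unmarshal(raw, &config); err != nil {
		panic(err)
	}

	v, ok := config["StoreConnectorMaxRetries"].(float64)
	fmt.Println(ok, int(v)) // prints: true 3
}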
30 changes: 25 additions & 5 deletions platform-connectors/pkg/connectors/store/store_connector.go
@@ -38,22 +38,25 @@ type DatabaseStoreConnector struct {
// resourceSinkClients are clients for pushing data to the resource count sink
ringBuffer *ringbuffer.RingBuffer
nodeName string
maxRetries int
}

func new(
databaseClient client.DatabaseClient,
ringBuffer *ringbuffer.RingBuffer,
nodeName string,
maxRetries int,
) *DatabaseStoreConnector {
return &DatabaseStoreConnector{
databaseClient: databaseClient,
ringBuffer: ringBuffer,
nodeName: nodeName,
maxRetries: maxRetries,
}
}

func InitializeDatabaseStoreConnector(ctx context.Context, ringbuffer *ringbuffer.RingBuffer,
clientCertMountPath string) (*DatabaseStoreConnector, error) {
clientCertMountPath string, maxRetries int) (*DatabaseStoreConnector, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return nil, fmt.Errorf("NODE_NAME is not set")
@@ -71,9 +74,9 @@ func InitializeDatabaseStoreConnector(ctx context.Context, ringbuffer *ringbuffe
return nil, fmt.Errorf("failed to create database client: %w", err)
}

slog.Info("Successfully initialized database store connector")
slog.Info("Successfully initialized database store connector", "maxRetries", maxRetries)

return new(databaseClient, ringbuffer, nodeName), nil
return new(databaseClient, ringbuffer, nodeName, maxRetries), nil
}

func createClientFactory(databaseClientCertMountPath string) (*factory.ClientFactory, error) {
@@ -105,8 +108,25 @@ func (r *DatabaseStoreConnector) FetchAndProcessHealthMetric(ctx context.Context

err := r.insertHealthEvents(ctx, healthEvents)
if err != nil {
slog.Error("Error inserting health events", "error", err)
r.ringBuffer.HealthMetricEleProcessingFailed(healthEvents)
retryCount := r.ringBuffer.NumRequeues(healthEvents)
if retryCount < r.maxRetries {
slog.Warn("Error inserting health events, will retry with exponential backoff",
"error", err,
"retryCount", retryCount,
"maxRetries", r.maxRetries,
"eventCount", len(healthEvents.GetEvents()))

r.ringBuffer.AddRateLimited(healthEvents)
} else {
slog.Error("Max retries exceeded, dropping health events permanently",
"error", err,
"retryCount", retryCount,
"maxRetries", r.maxRetries,
"eventCount", len(healthEvents.GetEvents()),
"firstEventNodeName", healthEvents.GetEvents()[0].GetNodeName(),
"firstEventCheckName", healthEvents.GetEvents()[0].GetCheckName())
r.ringBuffer.HealthMetricEleProcessingCompleted(healthEvents)
}
} else {
r.ringBuffer.HealthMetricEleProcessingCompleted(healthEvents)
}
109 changes: 108 additions & 1 deletion platform-connectors/pkg/connectors/store/store_connector_test.go
@@ -295,10 +295,117 @@ func TestInitializeDatabaseStoreConnector(t *testing.T) {
}()

ringBuffer := ringbuffer.NewRingBuffer("test", context.Background())
connector, err := InitializeDatabaseStoreConnector(context.Background(), ringBuffer, "")
connector, err := InitializeDatabaseStoreConnector(context.Background(), ringBuffer, "", 3)

require.Error(t, err)
require.Nil(t, connector)
require.Contains(t, err.Error(), "NODE_NAME is not set")
})
}

// TestMessageRetriedOnMongoDBFailure verifies that
// messages are retried with exponential backoff when MongoDB write fails.
func TestMessageRetriedOnMongoDBFailure(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ringBuffer := ringbuffer.NewRingBuffer("testRetryBehavior", ctx,
ringbuffer.WithRetryConfig(10*time.Millisecond, 50*time.Millisecond))
nodeName := "testNode"
mockClient := &mockDatabaseClient{}

// First 2 calls fail, 3rd call succeeds
mockClient.On("InsertMany", mock.Anything, mock.Anything).
Return(nil, errors.New("MongoDB temporarily unavailable")).Times(2)
mockClient.On("InsertMany", mock.Anything, mock.Anything).
Return(&client.InsertManyResult{InsertedIDs: []interface{}{"id1"}}, nil).Once()

connector := &DatabaseStoreConnector{
databaseClient: mockClient,
ringBuffer: ringBuffer,
nodeName: nodeName,
maxRetries: 3,
}

healthEvent := &protos.HealthEvent{
NodeName: "gpu-node-1",
GeneratedTimestamp: timestamppb.New(time.Now()),
CheckName: "GpuXidError",
ErrorCode: []string{"79"}, // GPU fell off the bus
IsFatal: true,
IsHealthy: false,
}

healthEvents := &protos.HealthEvents{
Events: []*protos.HealthEvent{healthEvent},
}

ringBuffer.Enqueue(healthEvents)
require.Equal(t, 1, ringBuffer.CurrentLength(), "Event should be in queue")
go connector.FetchAndProcessHealthMetric(ctx)

require.Eventually(t, func() bool {
return ringBuffer.CurrentLength() == 0
}, 500*time.Millisecond, 10*time.Millisecond, "Queue should be empty after successful retry")

// Give a bit more time for all async operations to complete
time.Sleep(100 * time.Millisecond)

// Verify correct number of retry attempts
mockClient.AssertNumberOfCalls(t, "InsertMany", 3)
cancel()
}

// TestMessageDroppedAfterMaxRetries verifies that messages are eventually dropped
// after exceeding the maximum retry count to prevent unbounded memory growth.
func TestMessageDroppedAfterMaxRetries(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ringBuffer := ringbuffer.NewRingBuffer("testMaxRetries", ctx,
ringbuffer.WithRetryConfig(10*time.Millisecond, 50*time.Millisecond))
nodeName := "testNode"
mockClient := &mockDatabaseClient{}

// Always fail to simulate persistent MongoDB outage
mockClient.On("InsertMany", mock.Anything, mock.Anything).Return(
(*client.InsertManyResult)(nil),
errors.New("MongoDB permanently down"),
)

connector := &DatabaseStoreConnector{
databaseClient: mockClient,
ringBuffer: ringBuffer,
nodeName: nodeName,
maxRetries: 3,
}

healthEvent := &protos.HealthEvent{
NodeName: "gpu-node-1",
GeneratedTimestamp: timestamppb.New(time.Now()),
CheckName: "GpuXidError",
ErrorCode: []string{"79"},
IsFatal: true,
IsHealthy: false,
}

healthEvents := &protos.HealthEvents{
Events: []*protos.HealthEvent{healthEvent},
}

ringBuffer.Enqueue(healthEvents)
require.Equal(t, 1, ringBuffer.CurrentLength())

go connector.FetchAndProcessHealthMetric(ctx)

require.Eventually(t, func() bool {
return ringBuffer.CurrentLength() == 0
}, 500*time.Millisecond, 10*time.Millisecond, "Event should be dropped after max retries")

// Give enough time for the final retry attempt to complete
time.Sleep(100 * time.Millisecond)

// Verify we attempted initial call plus 3 retries (4 total)
mockClient.AssertNumberOfCalls(t, "InsertMany", 4)
cancel()
}
48 changes: 46 additions & 2 deletions platform-connectors/pkg/ringbuffer/ring_buffer.go
@@ -18,23 +18,57 @@ import (
"context"
"errors"
"log/slog"
"time"

"k8s.io/client-go/util/workqueue"

"github.com/nvidia/nvsentinel/data-models/pkg/protos"
)

const (
// Default retry configuration for production use
// Retry 1: 500ms, Retry 2: 1.5s, Retry 3: 3s (total ~5s to 4th attempt)
DefaultBaseDelay = 500 * time.Millisecond
DefaultMaxDelay = 3 * time.Second
)

type RingBuffer struct {
ringBufferIdentifier string
healthMetricQueue workqueue.TypedRateLimitingInterface[*protos.HealthEvents]
ctx context.Context
}

func NewRingBuffer(ringBufferName string, ctx context.Context) *RingBuffer {
type Option func(*config)

type config struct {
baseDelay time.Duration
maxDelay time.Duration
}

func WithRetryConfig(baseDelay, maxDelay time.Duration) Option {
return func(c *config) {
c.baseDelay = baseDelay
c.maxDelay = maxDelay
}
}

func NewRingBuffer(ringBufferName string, ctx context.Context, opts ...Option) *RingBuffer {
cfg := &config{
baseDelay: DefaultBaseDelay,
maxDelay: DefaultMaxDelay,
}
for _, opt := range opts {
opt(cfg)
}

workqueue.SetProvider(prometheusMetricsProvider{})
rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[*protos.HealthEvents](
cfg.baseDelay,
cfg.maxDelay,
)

queue := workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[*protos.HealthEvents](),
rateLimiter,
workqueue.TypedRateLimitingQueueConfig[*protos.HealthEvents]{
Name: ringBufferName,
},
@@ -72,6 +106,7 @@ func (rb *RingBuffer) Dequeue() (*protos.HealthEvents, bool) {
}

func (rb *RingBuffer) HealthMetricEleProcessingCompleted(data *protos.HealthEvents) {
rb.healthMetricQueue.Forget(data)
rb.healthMetricQueue.Done(data)
}

@@ -80,6 +115,15 @@ func (rb *RingBuffer) HealthMetricEleProcessingFailed(data *protos.HealthEvents)
rb.healthMetricQueue.Done(data)
}

func (rb *RingBuffer) AddRateLimited(data *protos.HealthEvents) {
rb.healthMetricQueue.AddRateLimited(data)
rb.healthMetricQueue.Done(data)
}

func (rb *RingBuffer) NumRequeues(data *protos.HealthEvents) int {
return rb.healthMetricQueue.NumRequeues(data)
}

func (rb *RingBuffer) ShutDownHealthMetricQueue() {
rb.healthMetricQueue.ShutDownWithDrain()
}
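For reference on the DefaultBaseDelay/DefaultMaxDelay constants above: client-go's exponential item rate limiter delays the n-th requeue by baseDelay * 2^(n-1), capped at maxDelay. A standalone sketch of the resulting schedule for the defaults (my own arithmetic illustration, not part of the change):

package main

import (
	"fmt"
	"time"
)

func main() {
	baseDelay, maxDelay := 500*time.Millisecond, 3*time.Second

	var elapsed time.Duration
	for requeues := 0; requeues < 3; requeues++ {
		delay := baseDelay * (1 << requeues) // baseDelay * 2^requeues
		if delay > maxDelay {
			delay = maxDelay
		}
		elapsed += delay
		fmt.Printf("retry %d: delay %v, elapsed %v\n", requeues+1, delay, elapsed)
	}
	// retry 1: delay 500ms, elapsed 500ms
	// retry 2: delay 1s,    elapsed 1.5s
	// retry 3: delay 2s,    elapsed 3.5s
}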