@@ -158,6 +158,7 @@ spec:
- "--database-client-cert-mount-path={{ .Values.clientCertMountPath }}"
- "--uds-path=/run/nvsentinel/nvsentinel.sock"
- "--metrics-port=2113"
- "--processing-strategy={{ .Values.processingStrategy }}"
resources:
{{- toYaml .Values.quarantineTriggerEngine.resources | default .Values.resources | nindent 12 }}
ports:
@@ -49,6 +49,13 @@ tolerations: []

podAnnotations: {}

# Processing strategy for health events
# valid values: EXECUTE_REMEDIATION, STORE_ONLY
# default: EXECUTE_REMEDIATION
# EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state.
# STORE_ONLY: observability-only behavior; the event is persisted and exported, but downstream modules must not modify cluster resources (no node conditions, no quarantine, no drain, no remediation).
processingStrategy: EXECUTE_REMEDIATION

# Log verbosity level for the main CSP health monitor container (e.g. "debug", "info", "warn", "error")
logLevel: info

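The STORE_ONLY contract documented above (persist and export, but never touch cluster state) is ultimately enforced by whichever module consumes the health event. A minimal sketch of such a guard, using the `pb.HealthEvent` and `pb.ProcessingStrategy` types that appear in this diff; the `persistAndExport` and `remediate` helpers are hypothetical and only illustrate the split:

```go
// Sketch of a downstream consumer honouring the processing strategy.
// persistAndExport and remediate are hypothetical placeholders.
func handleHealthEvent(ctx context.Context, ev *pb.HealthEvent) error {
	// Persist/export unconditionally so the event remains observable.
	if err := persistAndExport(ctx, ev); err != nil {
		return err
	}
	// Mutate cluster state (node conditions, cordon, drain, remediation)
	// only when the event explicitly asks for remediation.
	if ev.GetProcessingStrategy() != pb.ProcessingStrategy_EXECUTE_REMEDIATION {
		return nil // STORE_ONLY: record the event and stop here.
	}
	return remediate(ctx, ev)
}
```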
@@ -58,6 +58,7 @@ type appConfig struct {
udsPath string
databaseClientCertMountPath string
metricsPort string
processingStrategy string
}

func parseFlags() *appConfig {
@@ -71,6 +72,8 @@ func parseFlags() *appConfig {
"Directory where database client tls.crt, tls.key, and ca.crt are mounted.",
)
flag.StringVar(&cfg.metricsPort, "metrics-port", defaultMetricsPortSidecar, "Port for the sidecar Prometheus metrics.")
flag.StringVar(&cfg.processingStrategy, "processing-strategy", "EXECUTE_REMEDIATION",
"Event processing strategy: EXECUTE_REMEDIATION or STORE_ONLY")

// Parse flags after initialising klog
flag.Parse()
@@ -213,7 +216,15 @@ func run() error {
return fmt.Errorf("kubernetes client setup failed: %w", err)
}

engine := trigger.NewEngine(cfg, store, platformConnectorClient, k8sClient)
value, ok := pb.ProcessingStrategy_value[appCfg.processingStrategy]
if !ok {
return fmt.Errorf("invalid processingStrategy %q (expected EXECUTE_REMEDIATION or STORE_ONLY)",
appCfg.processingStrategy)
}

slog.Info("Event handling strategy configured", "processingStrategy", appCfg.processingStrategy)

engine := trigger.NewEngine(cfg, store, platformConnectorClient, k8sClient, pb.ProcessingStrategy(value))

slog.Info("Trigger engine starting...")
engine.Start(gCtx)
health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go (24 additions, 23 deletions)
@@ -56,13 +56,14 @@ const (
// Engine polls the datastore for maintenance events and forwards the
// corresponding health signals to NVSentinel through the UDS connector.
type Engine struct {
store datastore.Store
udsClient pb.PlatformConnectorClient
config *config.Config
pollInterval time.Duration
k8sClient kubernetes.Interface
monitoredNodes sync.Map // Track which nodes are currently being monitored
monitorInterval time.Duration
store datastore.Store
udsClient pb.PlatformConnectorClient
config *config.Config
pollInterval time.Duration
k8sClient kubernetes.Interface
monitoredNodes sync.Map // Track which nodes are currently being monitored
monitorInterval time.Duration
processingStrategy pb.ProcessingStrategy
}

// NewEngine constructs a ready-to-run Engine instance.
@@ -71,14 +72,16 @@ func NewEngine(
store datastore.Store,
udsClient pb.PlatformConnectorClient,
k8sClient kubernetes.Interface,
processingStrategy pb.ProcessingStrategy,
) *Engine {
return &Engine{
config: cfg,
store: store,
udsClient: udsClient,
pollInterval: time.Duration(cfg.MaintenanceEventPollIntervalSeconds) * time.Second,
k8sClient: k8sClient,
monitorInterval: defaultMonitorInterval,
config: cfg,
store: store,
udsClient: udsClient,
pollInterval: time.Duration(cfg.MaintenanceEventPollIntervalSeconds) * time.Second,
k8sClient: k8sClient,
monitorInterval: defaultMonitorInterval,
processingStrategy: processingStrategy,
}
}

@@ -343,13 +346,14 @@ func (e *Engine) mapMaintenanceEventToHealthEvent(
}

healthEvent := &pb.HealthEvent{
Agent: "csp-health-monitor", // Consistent agent name
ComponentClass: event.ResourceType, // e.g., "EC2", "gce_instance"
CheckName: "CSPMaintenance", // Consistent check name
IsFatal: isFatal,
IsHealthy: isHealthy,
Message: message,
RecommendedAction: pb.RecommendedAction(actionEnum),
Agent: "csp-health-monitor", // Consistent agent name
ComponentClass: event.ResourceType, // e.g., "EC2", "gce_instance"
CheckName: "CSPMaintenance", // Consistent check name
IsFatal: isFatal,
IsHealthy: isHealthy,
ProcessingStrategy: e.processingStrategy,
Message: message,
RecommendedAction: pb.RecommendedAction(actionEnum),
EntitiesImpacted: []*pb.Entity{
{
EntityType: event.ResourceType,
@@ -359,9 +363,6 @@
Metadata: event.Metadata, // Pass along metadata
NodeName: event.NodeName, // K8s node name
GeneratedTimestamp: timestamppb.New(time.Now()),
// TODO: Remove hardcoded processing strategy and make it configurable via the config file.
// PR: https://github.com/NVIDIA/NVSentinel/pull/641
ProcessingStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION,
}

return healthEvent, nil
@@ -190,7 +190,7 @@ func TestNewEngine(t *testing.T) {
mUDSClient := new(MockUDSClient)
mockClient := createMockClientWithReadyNodes()

engine := NewEngine(cfg, mStore, mUDSClient, mockClient)
engine := NewEngine(cfg, mStore, mUDSClient, mockClient, pb.ProcessingStrategy_EXECUTE_REMEDIATION)

assert.NotNil(t, engine)
assert.Equal(t, cfg, engine.config)
@@ -204,7 +204,7 @@ func TestMapMaintenanceEventToHealthEvent(t *testing.T) {
cfg := newTestConfig()
mStore := new(MockDatastore) // Not strictly needed for this func, but engine needs it
mUDSClient := new(MockUDSClient) // Not strictly needed for this func, but engine needs it
engine := NewEngine(cfg, mStore, mUDSClient, nil)
engine := NewEngine(cfg, mStore, mUDSClient, nil, pb.ProcessingStrategy_EXECUTE_REMEDIATION)

tests := []struct {
name string
@@ -612,7 +612,7 @@ func TestProcessAndSendTrigger(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
mStore := new(MockDatastore)
mUDSClient := new(MockUDSClient)
engine := NewEngine(cfg, mStore, mUDSClient, nil)
engine := NewEngine(cfg, mStore, mUDSClient, nil, pb.ProcessingStrategy_EXECUTE_REMEDIATION)

tc.setupMocks(mStore, mUDSClient, tc.event, tc.targetDBStatus)

@@ -793,7 +793,7 @@ func TestCheckAndTriggerEvents(t *testing.T) {
mStore := new(MockDatastore)
mUDSClient := new(MockUDSClient)
mockClient := createMockClientWithReadyNodes("node-q1", "node-h1", "q-no-node")
engine := NewEngine(cfg, mStore, mUDSClient, mockClient)
engine := NewEngine(cfg, mStore, mUDSClient, mockClient, pb.ProcessingStrategy_EXECUTE_REMEDIATION)

if tc.setupMocks != nil {
tc.setupMocks(mStore, mUDSClient)
@@ -839,7 +839,7 @@ func TestHealthyTriggerWaitsForNodeReady(t *testing.T) {
mUDSClient.On("HealthEventOccurredV1", mock.Anything, mock.Anything, mock.Anything).Return(&emptypb.Empty{}, nil).Once()
mStore.On("UpdateEventStatus", mock.AnythingOfType("*context.timerCtx"), healthyEvent.EventID, model.StatusHealthyTriggered).Return(nil).Once()

engine := NewEngine(cfg, mStore, mUDSClient, mockClient)
engine := NewEngine(cfg, mStore, mUDSClient, mockClient, pb.ProcessingStrategy_EXECUTE_REMEDIATION)
engine.monitorInterval = 3 * time.Second

err := engine.checkAndTriggerEvents(ctx)
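With the hardcoded strategy gone, a small companion test could pin down that a non-default strategy survives construction. This is only a sketch reusing the helpers already visible above (`newTestConfig`, `MockDatastore`, `MockUDSClient`, `createMockClientWithReadyNodes`), and it assumes the enum also exposes a `pb.ProcessingStrategy_STORE_ONLY` constant mirroring `EXECUTE_REMEDIATION`; it is not part of this change:

```go
func TestNewEngineStoreOnlyStrategy(t *testing.T) {
	cfg := newTestConfig()
	mStore := new(MockDatastore)
	mUDSClient := new(MockUDSClient)
	mockClient := createMockClientWithReadyNodes()

	engine := NewEngine(cfg, mStore, mUDSClient, mockClient, pb.ProcessingStrategy_STORE_ONLY)

	assert.NotNil(t, engine)
	// The configured strategy is stored on the engine and later stamped onto
	// every HealthEvent built by mapMaintenanceEventToHealthEvent.
	assert.Equal(t, pb.ProcessingStrategy_STORE_ONLY, engine.processingStrategy)
}
```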
tests/csp_health_monitor_test.go (104 additions, 0 deletions)
@@ -413,3 +413,107 @@ func TestCSPHealthMonitorQuarantineThreshold(t *testing.T) {

testEnv.Test(t, feature.Feature())
}

// TestCSPHealthMonitorStoreOnlyProcessingStrategy verifies the STORE_ONLY processing strategy:
// The event is stored in the database and exported as a CloudEvent, but does not trigger any cordoning or draining.
func TestCSPHealthMonitorStoreOnlyProcessingStrategy(t *testing.T) {
feature := features.New("Processing Strategy").
WithLabel("suite", "csp-health-monitor")

var testCtx *helpers.CSPHealthMonitorTestContext
var injectedEventID string
var testInstanceID string

feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
client, err := c.NewClient()
require.NoError(t, err)

err = helpers.SetDeploymentArgs(ctx, t, client, "csp-health-monitor", helpers.NVSentinelNamespace, "maintenance-notifier", map[string]string{
"--processing-strategy": "STORE_ONLY",
})
require.NoError(t, err)

helpers.WaitForDeploymentRollout(ctx, t, client, "csp-health-monitor", helpers.NVSentinelNamespace)

var newCtx context.Context
newCtx, testCtx = helpers.SetupCSPHealthMonitorTest(ctx, t, c, helpers.CSPGCP)

t.Log("Clearing any existing GCP events from mock API")
require.NoError(t, testCtx.CSPClient.ClearEvents(helpers.CSPGCP), "failed to clear GCP events")

testInstanceID = fmt.Sprintf("%d", time.Now().UnixNano())

t.Logf("Adding GCP instance annotation to node %s (instance_id=%s, zone=us-central1-a)", testCtx.NodeName, testInstanceID)
require.NoError(t, helpers.AddGCPInstanceIDAnnotation(ctx, client, testCtx.NodeName, testInstanceID, "us-central1-a"))

node, err := helpers.GetNodeByName(ctx, client, testCtx.NodeName)
require.NoError(t, err)
require.Equal(t, testInstanceID, node.Annotations["container.googleapis.com/instance_id"], "GCP instance_id annotation not set")
require.Equal(t, "us-central1-a", node.Labels["topology.kubernetes.io/zone"], "zone label not set")
t.Log("Verified: node annotations and labels are set correctly")

// Wait for the monitor to complete at least one poll cycle
helpers.WaitForCSPHealthMonitorPoll(t, testCtx.CSPClient, helpers.CSPGCP)

return newCtx
})

feature.Assess("Injecting PENDING maintenance event and verifying node was not cordoned when processing STORE_ONLY strategy", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
t.Log("Injecting GCP maintenance event with PENDING status into mock Cloud Logging API")

scheduledStart := time.Now().Add(15 * time.Minute)
scheduledEnd := time.Now().Add(75 * time.Minute)
event := helpers.CSPMaintenanceEvent{
CSP: helpers.CSPGCP,
InstanceID: testInstanceID,
NodeName: testCtx.NodeName,
Zone: "us-central1-a",
ProjectID: "test-project",
Status: "PENDING",
EventTypeCode: "compute.instances.upcomingMaintenance",
MaintenanceType: "SCHEDULED",
ScheduledStart: &scheduledStart,
ScheduledEnd: &scheduledEnd,
Description: "Scheduled maintenance for GCP instance - e2e test",
}

var err error
injectedEventID, _, err = testCtx.CSPClient.InjectEvent(event)
require.NoError(t, err)
t.Logf("Event injected: ID=%s, instanceID=%s, scheduledStart=%s", injectedEventID, testInstanceID, scheduledStart.Format(time.RFC3339))

// Verify event was stored in mock
eventCount, err := testCtx.CSPClient.GetEventCount(helpers.CSPGCP)
require.NoError(t, err, "failed to get event count from mock")
require.Equal(t, 1, eventCount, "expected 1 event in mock store after injection")
t.Logf("Verified: mock store has %d GCP event(s)", eventCount)

client, err := c.NewClient()
require.NoError(t, err)

helpers.EnsureNodeConditionNotPresent(ctx, t, client, testCtx.NodeName, "CSPMaintenance")
t.Log("Verifying node was not cordoned when processing STORE_ONLY strategy")
helpers.AssertQuarantineState(ctx, t, client, testCtx.NodeName, helpers.QuarantineAssertion{
ExpectCordoned: false,
ExpectAnnotation: false,
})

return ctx
})

feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
client, err := c.NewClient()
require.NoError(t, err)

helpers.RemoveDeploymentArgs(ctx, client, "csp-health-monitor", helpers.NVSentinelNamespace, "maintenance-notifier", map[string]string{
"--processing-strategy": "STORE_ONLY",
})

helpers.WaitForDeploymentRollout(ctx, t, client, "csp-health-monitor", helpers.NVSentinelNamespace)

helpers.TeardownCSPHealthMonitorTest(ctx, t, c, testCtx)

return ctx
})
testEnv.Test(t, feature.Feature())
}
tests/event_exporter_test.go (3 additions, 2 deletions)
@@ -22,14 +22,15 @@ import (
"testing"
"time"

"tests/helpers"

"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/e2e-framework/klient/wait"
"sigs.k8s.io/e2e-framework/klient/wait/conditions"
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"
"tests/helpers"
)

func TestEventExporterChangeStream(t *testing.T) {
@@ -81,7 +82,7 @@

t.Log("Validating received CloudEvent")
require.NotNil(t, receivedEvent)
helpers.ValidateCloudEvent(t, receivedEvent, nodeName, testMessage, "GpuXidError", "79")
helpers.ValidateCloudEvent(t, receivedEvent, nodeName, testMessage, "GpuXidError", "79", "EXECUTE_REMEDIATION")

return ctx
})
tests/helpers/event_exporter.go (2 additions, 0 deletions)
@@ -222,6 +222,7 @@ func ValidateCloudEvent(
t *testing.T,
event map[string]any,
expectedNodeName, expectedMessage, expectedCheckName, expectedErrorCode string,
expectedProcessingStrategy string,
) {
t.Helper()
t.Logf("Validating CloudEvent: %+v", event)
@@ -241,5 +242,6 @@
require.Equal(t, expectedCheckName, healthEvent["checkName"])
require.Equal(t, expectedNodeName, healthEvent["nodeName"])
require.Equal(t, expectedMessage, healthEvent["message"])
require.Equal(t, expectedProcessingStrategy, healthEvent["processingStrategy"])
require.Contains(t, healthEvent["errorCode"], expectedErrorCode)
}
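The string comparison for `processingStrategy` works because the exported payload carries the enum by name rather than by number. A rough illustration of that encoding, assuming a protojson-style marshalling on the exporter side (the exporter itself is not part of this diff):

```go
import (
	"fmt"

	"google.golang.org/protobuf/encoding/protojson"
)

func exampleEncoding() {
	ev := &pb.HealthEvent{
		CheckName:          "GpuXidError",
		ProcessingStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION,
	}
	// protojson writes enum fields by name, so the JSON payload contains
	// "processingStrategy":"EXECUTE_REMEDIATION" rather than a numeric value.
	out, _ := protojson.Marshal(ev)
	fmt.Println(string(out))
}
```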