diff --git a/distros/kubernetes/nvsentinel/charts/syslog-health-monitor/templates/_helpers.tpl b/distros/kubernetes/nvsentinel/charts/syslog-health-monitor/templates/_helpers.tpl index 1ecd5656a..7e21a06a9 100644 --- a/distros/kubernetes/nvsentinel/charts/syslog-health-monitor/templates/_helpers.tpl +++ b/distros/kubernetes/nvsentinel/charts/syslog-health-monitor/templates/_helpers.tpl @@ -100,6 +100,8 @@ spec: - "{{ join "," $root.Values.enabledChecks }}" - "--metadata-path" - "{{ $root.Values.global.metadataPath }}" + - "--processing-strategy" + - {{ $root.Values.processingStrategy }} resources: {{- toYaml $root.Values.resources | nindent 12 }} ports: diff --git a/distros/kubernetes/nvsentinel/charts/syslog-health-monitor/values.yaml b/distros/kubernetes/nvsentinel/charts/syslog-health-monitor/values.yaml index f7e0f1a2c..8624bc138 100644 --- a/distros/kubernetes/nvsentinel/charts/syslog-health-monitor/values.yaml +++ b/distros/kubernetes/nvsentinel/charts/syslog-health-monitor/values.yaml @@ -50,3 +50,10 @@ tolerations: [] journalHostPath: /var/log logLevel: info + +# Processing strategy for health events +# valid values: EXECUTE_REMEDIATION, STORE_ONLY +# default: EXECUTE_REMEDIATION +# EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state. +# STORE_ONLY: observability-only behavior; event should be persisted/exported but should not modify cluster resources (i.e., no node conditions, no quarantine, no drain, no remediation). +processingStrategy: EXECUTE_REMEDIATION \ No newline at end of file diff --git a/health-monitors/syslog-health-monitor/main.go b/health-monitors/syslog-health-monitor/main.go index fc8f248be..24c061aea 100644 --- a/health-monitors/syslog-health-monitor/main.go +++ b/health-monitors/syslog-health-monitor/main.go @@ -67,6 +67,8 @@ var ( "Indicates if this monitor is running in Kata Containers mode (set by DaemonSet variant).") metadataPath = flag.String("metadata-path", "/var/lib/nvsentinel/gpu_metadata.json", "Path to GPU metadata JSON file.") + processingStrategyFlag = flag.String("processing-strategy", "EXECUTE_REMEDIATION", + "Event processing strategy: EXECUTE_REMEDIATION or STORE_ONLY") ) var checks []fd.CheckDefinition @@ -159,6 +161,15 @@ func run() error { slog.Info("Creating syslog monitor", "checksCount", len(checks)) + value, ok := pb.ProcessingStrategy_value[*processingStrategyFlag] + if !ok { + return fmt.Errorf("unexpected processingStrategy value: %q", *processingStrategyFlag) + } + + slog.Info("Event handling strategy configured", "processingStrategy", *processingStrategyFlag) + + processingStrategy := pb.ProcessingStrategy(value) + fdHealthMonitor, err := fd.NewSyslogMonitor( nodeName, checks, @@ -169,6 +180,7 @@ func run() error { *stateFileFlag, *xidAnalyserEndpoint, *metadataPath, + processingStrategy, ) if err != nil { return fmt.Errorf("error creating syslog health monitor: %w", err) diff --git a/health-monitors/syslog-health-monitor/pkg/gpufallen/gpufallen_handler.go b/health-monitors/syslog-health-monitor/pkg/gpufallen/gpufallen_handler.go index 033b523cd..2ef3af182 100644 --- a/health-monitors/syslog-health-monitor/pkg/gpufallen/gpufallen_handler.go +++ b/health-monitors/syslog-health-monitor/pkg/gpufallen/gpufallen_handler.go @@ -28,7 +28,9 @@ import ( // NewGPUFallenHandler creates a new GPUFallenHandler instance. 
func NewGPUFallenHandler(nodeName, defaultAgentName, - defaultComponentClass, checkName string) (*GPUFallenHandler, error) { + defaultComponentClass, checkName string, + processingStrategy pb.ProcessingStrategy, +) (*GPUFallenHandler, error) { ctx, cancel := context.WithCancel(context.Background()) h := &GPUFallenHandler{ @@ -36,6 +38,7 @@ func NewGPUFallenHandler(nodeName, defaultAgentName, defaultAgentName: defaultAgentName, defaultComponentClass: defaultComponentClass, checkName: checkName, + processingStrategy: processingStrategy, recentXIDs: make(map[string]xidRecord), xidWindow: 5 * time.Minute, // Remember XIDs for 5 minutes cancelCleanup: cancel, @@ -232,6 +235,7 @@ func (h *GPUFallenHandler) createHealthEventFromError(event *gpuFallenErrorEvent NodeName: h.nodeName, RecommendedAction: pb.RecommendedAction_RESTART_BM, ErrorCode: []string{"GPU_FALLEN_OFF_BUS"}, + ProcessingStrategy: h.processingStrategy, } return &pb.HealthEvents{ diff --git a/health-monitors/syslog-health-monitor/pkg/gpufallen/gpufallen_handler_test.go b/health-monitors/syslog-health-monitor/pkg/gpufallen/gpufallen_handler_test.go index 25f06f42a..501e19c55 100644 --- a/health-monitors/syslog-health-monitor/pkg/gpufallen/gpufallen_handler_test.go +++ b/health-monitors/syslog-health-monitor/pkg/gpufallen/gpufallen_handler_test.go @@ -169,6 +169,7 @@ func TestProcessLine(t *testing.T) { "test-agent", "GPU", "test-check", + pb.ProcessingStrategy_STORE_ONLY, ) require.NoError(t, err) defer handler.Close() @@ -181,6 +182,7 @@ func TestProcessLine(t *testing.T) { if tc.validateEvent != nil { tc.validateEvent(t, events, tc.message) } + assert.Equal(t, pb.ProcessingStrategy_STORE_ONLY, events.Events[0].ProcessingStrategy) } else { assert.Nil(t, events, "Expected no event to be generated") } @@ -196,6 +198,7 @@ func TestXIDTracking(t *testing.T) { "test-agent", "GPU", "test-check", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) require.NoError(t, err) defer handler.Close() // Cleanup goroutine to prevent leaks @@ -222,6 +225,7 @@ func TestXIDTracking(t *testing.T) { "test-agent", "GPU", "test-check", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) require.NoError(t, err) defer handler2.Close() @@ -234,6 +238,7 @@ func TestXIDTracking(t *testing.T) { require.NoError(t, err) require.NotNil(t, events, "Should generate event when no recent XID") require.Len(t, events.Events, 1) + assert.Equal(t, handler2.processingStrategy, events.Events[0].ProcessingStrategy) }) t.Run("XID expires after time window", func(t *testing.T) { @@ -243,6 +248,7 @@ func TestXIDTracking(t *testing.T) { "test-agent", "GPU", "test-check", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) require.NoError(t, err) defer handler3.Close() @@ -270,6 +276,7 @@ func TestXIDTracking(t *testing.T) { "test-agent", "GPU", "test-check", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) require.NoError(t, err) defer handler4.Close() @@ -298,6 +305,7 @@ func TestXIDTracking(t *testing.T) { "test-agent", "GPU", "test-check", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) require.NoError(t, err) defer handler5.Close() @@ -315,6 +323,7 @@ func TestXIDTracking(t *testing.T) { "test-agent", "GPU", "test-check", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) require.NoError(t, err) defer handler6.Close() @@ -340,6 +349,7 @@ func TestXIDTracking(t *testing.T) { "test-agent", "GPU", "test-check", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) require.NoError(t, err) defer handler7.Close() @@ -379,6 +389,7 @@ func TestXIDTracking(t *testing.T) { "test-agent", "GPU", "test-check", + 
pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) require.NoError(t, err) defer handler8.Close() diff --git a/health-monitors/syslog-health-monitor/pkg/gpufallen/types.go b/health-monitors/syslog-health-monitor/pkg/gpufallen/types.go index 85e99e6e6..31cc2c20e 100644 --- a/health-monitors/syslog-health-monitor/pkg/gpufallen/types.go +++ b/health-monitors/syslog-health-monitor/pkg/gpufallen/types.go @@ -19,6 +19,8 @@ import ( "regexp" "sync" "time" + + pb "github.com/nvidia/nvsentinel/data-models/pkg/protos" ) var ( @@ -47,6 +49,7 @@ type GPUFallenHandler struct { defaultAgentName string defaultComponentClass string checkName string + processingStrategy pb.ProcessingStrategy mu sync.RWMutex recentXIDs map[string]xidRecord // pciAddr -> XID record xidWindow time.Duration // how long to remember XID errors diff --git a/health-monitors/syslog-health-monitor/pkg/sxid/sxid_handler.go b/health-monitors/syslog-health-monitor/pkg/sxid/sxid_handler.go index e9b8bc812..2d2b23a20 100644 --- a/health-monitors/syslog-health-monitor/pkg/sxid/sxid_handler.go +++ b/health-monitors/syslog-health-monitor/pkg/sxid/sxid_handler.go @@ -28,12 +28,15 @@ import ( ) func NewSXIDHandler(nodeName, defaultAgentName, - defaultComponentClass, checkName, metadataPath string) (*SXIDHandler, error) { + defaultComponentClass, checkName, metadataPath string, + processingStrategy pb.ProcessingStrategy, +) (*SXIDHandler, error) { return &SXIDHandler{ nodeName: nodeName, defaultAgentName: defaultAgentName, defaultComponentClass: defaultComponentClass, checkName: checkName, + processingStrategy: processingStrategy, metadataReader: metadata.NewReader(metadataPath), }, nil } @@ -103,6 +106,7 @@ func (sxidHandler *SXIDHandler) ProcessLine(message string) (*pb.HealthEvents, e RecommendedAction: errRes, ErrorCode: []string{fmt.Sprint(sxidErrorEvent.ErrorNum)}, Metadata: metadata, + ProcessingStrategy: sxidHandler.processingStrategy, } return &pb.HealthEvents{ diff --git a/health-monitors/syslog-health-monitor/pkg/sxid/sxid_handler_test.go b/health-monitors/syslog-health-monitor/pkg/sxid/sxid_handler_test.go index 08d94a028..a2e3e857a 100644 --- a/health-monitors/syslog-health-monitor/pkg/sxid/sxid_handler_test.go +++ b/health-monitors/syslog-health-monitor/pkg/sxid/sxid_handler_test.go @@ -15,8 +15,11 @@ package sxid import ( + "os" + "path/filepath" "testing" + pb "github.com/nvidia/nvsentinel/data-models/pkg/protos" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -40,7 +43,7 @@ func TestNewSXIDHandler(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - handler, err := NewSXIDHandler(tc.nodeName, tc.agentName, tc.componentClass, tc.checkName, "/tmp/metadata.json") + handler, err := NewSXIDHandler(tc.nodeName, tc.agentName, tc.componentClass, tc.checkName, "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) require.NoError(t, err) require.NotNil(t, handler) @@ -137,6 +140,61 @@ func TestExtractInfoFromNVSwitchErrorMsg(t *testing.T) { } } +func TestProcessLineWithValidTopologyFatalSXID(t *testing.T) { + // Create temp metadata file with valid NVSwitch topology + metadataJSON := `{ + "version": "1.0", + "timestamp": "2025-01-01T00:00:00Z", + "node_name": "test-node", + "gpus": [ + { + "gpu_id": 1, + "uuid": "GPU-aaaabbbb-cccc-dddd-eeee-ffffffffffff", + "pci_address": "0000:18:00.0", + "serial_number": "GPU-SN-002", + "device_name": "NVIDIA H100", + "nvlinks": [ + { + "link_id": 2, + "remote_pci_address": "0000:c3:00.0", + "remote_link_id": 28 + } + ] + } + ], + 
"nvswitches": ["0000:c3:00.0"] + }` + + tmpDir := t.TempDir() + metadataPath := filepath.Join(tmpDir, "gpu_metadata.json") + err := os.WriteFile(metadataPath, []byte(metadataJSON), 0o644) + require.NoError(t, err) + + handler, err := NewSXIDHandler("test-node", "test-agent", "NVSWITCH", "sxid-check", metadataPath, pb.ProcessingStrategy_STORE_ONLY) + require.NoError(t, err) + + message := "[ 1108.858286] nvidia-nvswitch0: SXid (PCI:0000:c3:00.0): 24007, Fatal, Link 28 sourcetrack timeout error (First)" + events, err := handler.ProcessLine(message) + + require.NoError(t, err) + require.NotNil(t, events) + require.Len(t, events.Events, 1) + + event := events.Events[0] + assert.Equal(t, message, event.Message) + assert.Equal(t, []string{"24007"}, event.ErrorCode) + assert.True(t, event.IsFatal) + assert.False(t, event.IsHealthy) + assert.Equal(t, pb.RecommendedAction_CONTACT_SUPPORT, event.RecommendedAction) + assert.Equal(t, pb.ProcessingStrategy_STORE_ONLY, event.ProcessingStrategy) + + // Verify GPU entity + assert.Equal(t, "GPU", event.EntitiesImpacted[3].EntityType) + assert.Equal(t, "1", event.EntitiesImpacted[3].EntityValue) + assert.Equal(t, "GPU_UUID", event.EntitiesImpacted[4].EntityType) + assert.Equal(t, "GPU-aaaabbbb-cccc-dddd-eeee-ffffffffffff", event.EntitiesImpacted[4].EntityValue) +} + func TestProcessLine(t *testing.T) { testCases := []struct { name string @@ -166,7 +224,7 @@ func TestProcessLine(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - handler, err := NewSXIDHandler("test-node", "test-agent", "NVSWITCH", "sxid-check", "/tmp/metadata.json") + handler, err := NewSXIDHandler("test-node", "test-agent", "NVSWITCH", "sxid-check", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) require.NoError(t, err) events, err := handler.ProcessLine(tc.message) @@ -182,6 +240,7 @@ func TestProcessLine(t *testing.T) { event := events.Events[0] assert.Equal(t, tc.message, event.Message) assert.NotEmpty(t, event.Metadata) + assert.Equal(t, pb.ProcessingStrategy_EXECUTE_REMEDIATION, event.ProcessingStrategy) } else { assert.Nil(t, events) } diff --git a/health-monitors/syslog-health-monitor/pkg/sxid/types.go b/health-monitors/syslog-health-monitor/pkg/sxid/types.go index 796b3eea0..ee2726f25 100644 --- a/health-monitors/syslog-health-monitor/pkg/sxid/types.go +++ b/health-monitors/syslog-health-monitor/pkg/sxid/types.go @@ -17,6 +17,7 @@ package sxid import ( "regexp" + pb "github.com/nvidia/nvsentinel/data-models/pkg/protos" "github.com/nvidia/nvsentinel/health-monitors/syslog-health-monitor/pkg/metadata" ) @@ -30,6 +31,7 @@ type SXIDHandler struct { defaultAgentName string defaultComponentClass string checkName string + processingStrategy pb.ProcessingStrategy metadataReader *metadata.Reader } diff --git a/health-monitors/syslog-health-monitor/pkg/syslog-monitor/syslogmonitor.go b/health-monitors/syslog-health-monitor/pkg/syslog-monitor/syslogmonitor.go index c66802b84..7e0abe5ec 100644 --- a/health-monitors/syslog-health-monitor/pkg/syslog-monitor/syslogmonitor.go +++ b/health-monitors/syslog-health-monitor/pkg/syslog-monitor/syslogmonitor.go @@ -48,10 +48,12 @@ func NewSyslogMonitor( stateFilePath string, xidAnalyserEndpoint string, metadataPath string, + processingStrategy pb.ProcessingStrategy, ) (*SyslogMonitor, error) { return NewSyslogMonitorWithFactory(nodeName, checks, pcClient, defaultAgentName, defaultComponentClass, pollingInterval, stateFilePath, GetDefaultJournalFactory(), xidAnalyserEndpoint, metadataPath, + 
processingStrategy, ) } @@ -69,6 +71,7 @@ func NewSyslogMonitorWithFactory( journalFactory JournalFactory, xidAnalyserEndpoint string, metadataPath string, + processingStrategy pb.ProcessingStrategy, ) (*SyslogMonitor, error) { // Load state from file state, err := loadState(stateFilePath) @@ -90,6 +93,7 @@ func NewSyslogMonitorWithFactory( pcClient: pcClient, defaultAgentName: defaultAgentName, defaultComponentClass: defaultComponentClass, + processingStrategy: processingStrategy, pollingInterval: pollingInterval, checkLastCursors: state.CheckLastCursors, journalFactory: journalFactory, @@ -103,7 +107,7 @@ func NewSyslogMonitorWithFactory( switch check.Name { case XIDErrorCheck: xidHandler, err := xid.NewXIDHandler(nodeName, - defaultAgentName, defaultComponentClass, check.Name, xidAnalyserEndpoint, metadataPath) + defaultAgentName, defaultComponentClass, check.Name, xidAnalyserEndpoint, metadataPath, processingStrategy) if err != nil { slog.Error("Error initializing XID handler", "error", err.Error()) return nil, fmt.Errorf("failed to initialize XID handler: %w", err) @@ -113,7 +117,7 @@ func NewSyslogMonitorWithFactory( case SXIDErrorCheck: sxidHandler, err := sxid.NewSXIDHandler( - nodeName, defaultAgentName, defaultComponentClass, check.Name, metadataPath) + nodeName, defaultAgentName, defaultComponentClass, check.Name, metadataPath, processingStrategy) if err != nil { slog.Error("Error initializing SXID handler", "error", err.Error()) return nil, fmt.Errorf("failed to initialize SXID handler: %w", err) @@ -123,7 +127,7 @@ func NewSyslogMonitorWithFactory( case GPUFallenOffCheck: gpuFallenHandler, err := gpufallen.NewGPUFallenHandler( - nodeName, defaultAgentName, defaultComponentClass, check.Name) + nodeName, defaultAgentName, defaultComponentClass, check.Name, processingStrategy) if err != nil { slog.Error("Error initializing GPU Fallen Off handler", "error", err.Error()) return nil, fmt.Errorf("failed to initialize GPU Fallen Off handler: %w", err) @@ -806,6 +810,7 @@ func (sm *SyslogMonitor) prepareHealthEventWithAction( IsHealthy: isHealthy, NodeName: sm.nodeName, RecommendedAction: errRes.RecommendedAction, + ProcessingStrategy: sm.processingStrategy, } return &pb.HealthEvents{ diff --git a/health-monitors/syslog-health-monitor/pkg/syslog-monitor/syslogmonitor_test.go b/health-monitors/syslog-health-monitor/pkg/syslog-monitor/syslogmonitor_test.go index d7a5ed2f4..49bf07e5b 100644 --- a/health-monitors/syslog-health-monitor/pkg/syslog-monitor/syslogmonitor_test.go +++ b/health-monitors/syslog-health-monitor/pkg/syslog-monitor/syslogmonitor_test.go @@ -307,13 +307,14 @@ func TestNewSyslogMonitor(t *testing.T) { filePath := testStateFile monitor, err := NewSyslogMonitor(args.NodeName, - args.Checks, args.PcClient, args.DefaultAgentName, args.DefaultComponentClass, args.PollingInterval, filePath, "http://localhost:8080", "/tmp/metadata.json") + args.Checks, args.PcClient, args.DefaultAgentName, args.DefaultComponentClass, args.PollingInterval, filePath, "http://localhost:8080", "/tmp/metadata.json", pb.ProcessingStrategy_STORE_ONLY) assert.NoError(t, err) assert.NotNil(t, monitor) assert.Equal(t, args.NodeName, monitor.nodeName) assert.Equal(t, args.Checks, monitor.checks) assert.NotNil(t, monitor.journalFactory, "Journal factory should not be nil") assert.Equal(t, testStateFile, monitor.stateFilePath) + assert.Equal(t, pb.ProcessingStrategy_STORE_ONLY, monitor.processingStrategy) // Test case 2: With specific fake factory fakeJournalFactory := NewFakeJournalFactory() @@ -326,7 
+327,7 @@ func TestNewSyslogMonitor(t *testing.T) { filePath = testStateFile2 monitor, err = NewSyslogMonitorWithFactory(args.NodeName, - args.Checks, args.PcClient, args.DefaultAgentName, args.DefaultComponentClass, args.PollingInterval, filePath, fakeJournalFactory, "http://localhost:8080", "/tmp/metadata.json") + args.Checks, args.PcClient, args.DefaultAgentName, args.DefaultComponentClass, args.PollingInterval, filePath, fakeJournalFactory, "http://localhost:8080", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) assert.NoError(t, err) assert.NotNil(t, monitor) assert.Equal(t, fakeJournalFactory, monitor.journalFactory) @@ -398,6 +399,7 @@ func TestJournalProcessingLogic(t *testing.T) { fakeJournalFactory, "http://localhost:8080", "/tmp/metadata.json", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) assert.NoError(t, err) @@ -500,6 +502,7 @@ func TestJournalStateManagement(t *testing.T) { mockFactory, "http://localhost:8080", "/tmp/metadata.json", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) assert.NoError(t, err) @@ -531,6 +534,7 @@ func TestJournalStateManagement(t *testing.T) { mockFactory, "http://localhost:8080", "/tmp/metadata.json", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) assert.NoError(t, err) @@ -578,6 +582,7 @@ func TestBootIDChangeHandling(t *testing.T) { mockFactory, "http://localhost:8080", "/tmp/metadata.json", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) assert.NoError(t, err) @@ -627,6 +632,7 @@ func TestRunMultipleChecks(t *testing.T) { mockFactory, "http://localhost:8080", "/tmp/metadata.json", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) assert.NoError(t, err) @@ -667,6 +673,7 @@ func TestGPUFallenOffHandlerInitialization(t *testing.T) { mockFactory, "http://localhost:8080", "/tmp/metadata.json", + pb.ProcessingStrategy_EXECUTE_REMEDIATION, ) assert.NoError(t, err) assert.NotNil(t, sm.checkToHandlerMap[GPUFallenOffCheck], "GPU Fallen Off handler should be initialized") diff --git a/health-monitors/syslog-health-monitor/pkg/syslog-monitor/types.go b/health-monitors/syslog-health-monitor/pkg/syslog-monitor/types.go index 184c593b6..187381e65 100644 --- a/health-monitors/syslog-health-monitor/pkg/syslog-monitor/types.go +++ b/health-monitors/syslog-health-monitor/pkg/syslog-monitor/types.go @@ -50,6 +50,7 @@ type SyslogMonitor struct { pcClient pb.PlatformConnectorClient defaultAgentName string defaultComponentClass string + processingStrategy pb.ProcessingStrategy pollingInterval string // Map of check name to last processed cursor checkLastCursors map[string]string diff --git a/health-monitors/syslog-health-monitor/pkg/xid/types.go b/health-monitors/syslog-health-monitor/pkg/xid/types.go index 44cab65f8..c12640165 100644 --- a/health-monitors/syslog-health-monitor/pkg/xid/types.go +++ b/health-monitors/syslog-health-monitor/pkg/xid/types.go @@ -17,6 +17,7 @@ package xid import ( "regexp" + pb "github.com/nvidia/nvsentinel/data-models/pkg/protos" "github.com/nvidia/nvsentinel/health-monitors/syslog-health-monitor/pkg/metadata" "github.com/nvidia/nvsentinel/health-monitors/syslog-health-monitor/pkg/xid/parser" ) @@ -30,6 +31,7 @@ type XIDHandler struct { defaultAgentName string defaultComponentClass string checkName string + processingStrategy pb.ProcessingStrategy pciToGPUUUID map[string]string parser parser.Parser diff --git a/health-monitors/syslog-health-monitor/pkg/xid/xid_handler.go b/health-monitors/syslog-health-monitor/pkg/xid/xid_handler.go index d8615df5f..b7d45bcc2 100644 --- 
a/health-monitors/syslog-health-monitor/pkg/xid/xid_handler.go +++ b/health-monitors/syslog-health-monitor/pkg/xid/xid_handler.go @@ -31,7 +31,9 @@ import ( ) func NewXIDHandler(nodeName, defaultAgentName, - defaultComponentClass, checkName, xidAnalyserEndpoint, metadataPath string) (*XIDHandler, error) { + defaultComponentClass, checkName, xidAnalyserEndpoint, metadataPath string, + processingStrategy pb.ProcessingStrategy, +) (*XIDHandler, error) { config := parser.ParserConfig{ NodeName: nodeName, XidAnalyserEndpoint: xidAnalyserEndpoint, @@ -48,6 +50,7 @@ func NewXIDHandler(nodeName, defaultAgentName, defaultAgentName: defaultAgentName, defaultComponentClass: defaultComponentClass, checkName: checkName, + processingStrategy: processingStrategy, pciToGPUUUID: make(map[string]string), parser: xidParser, metadataReader: metadata.NewReader(metadataPath), @@ -183,6 +186,7 @@ func (xidHandler *XIDHandler) createHealthEventFromResponse( RecommendedAction: recommendedAction, ErrorCode: []string{xidResp.Result.DecodedXIDStr}, Metadata: metadata, + ProcessingStrategy: xidHandler.processingStrategy, } return &pb.HealthEvents{ diff --git a/health-monitors/syslog-health-monitor/pkg/xid/xid_handler_test.go b/health-monitors/syslog-health-monitor/pkg/xid/xid_handler_test.go index e80275e22..d0fa54f0e 100644 --- a/health-monitors/syslog-health-monitor/pkg/xid/xid_handler_test.go +++ b/health-monitors/syslog-health-monitor/pkg/xid/xid_handler_test.go @@ -87,7 +87,7 @@ func TestNormalizePCI(t *testing.T) { func TestDetermineFatality(t *testing.T) { xidHandler, err := NewXIDHandler("test-node", - "test-agent", "test-component", "test-check", "http://localhost:8080", "/tmp/metadata.json") + "test-agent", "test-component", "test-check", "http://localhost:8080", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) assert.Nil(t, err) testCases := []struct { @@ -139,7 +139,7 @@ func TestProcessLine(t *testing.T) { name: "NVRM GPU Map Line", message: "NVRM: GPU at PCI:0000:00:08.0: GPU-12345678-1234-1234-1234-123456789012", setupHandler: func() *XIDHandler { - h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json") + h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) h.parser = &mockParser{ parseFunc: func(msg string) (*parser.Response, error) { return nil, nil @@ -154,7 +154,7 @@ func TestProcessLine(t *testing.T) { name: "Valid XID Message", message: "NVRM: Xid (PCI:0000:00:08.0): 79, pid=12345, name=test-process", setupHandler: func() *XIDHandler { - h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json") + h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json", pb.ProcessingStrategy_STORE_ONLY) h.parser = &mockParser{ parseFunc: func(msg string) (*parser.Response, error) { return &parser.Response{ @@ -191,13 +191,14 @@ func TestProcessLine(t *testing.T) { assert.Equal(t, "0000:00:08.0", event.EntitiesImpacted[0].EntityValue) // Issue #197: Message field stores full journal, no Metadata duplication assert.Empty(t, event.Metadata) + assert.Equal(t, pb.ProcessingStrategy_STORE_ONLY, event.ProcessingStrategy) }, }, { name: "Valid XID with GPU UUID", message: "NVRM: Xid (PCI:0000:00:08.0): 79, pid=12345, name=test-process", setupHandler: func() *XIDHandler { - h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json") + h, _ := NewXIDHandler("test-node", 
"test-agent", "GPU", "xid-check", "", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) h.parser = &mockParser{ parseFunc: func(msg string) (*parser.Response, error) { return &parser.Response{ @@ -227,13 +228,14 @@ func TestProcessLine(t *testing.T) { assert.Equal(t, "GPU-12345678-1234-1234-1234-123456789012", event.EntitiesImpacted[1].EntityValue) assert.Equal(t, "NVRM: Xid (PCI:0000:00:08.0): 79, pid=12345, name=test-process", event.Message) assert.Empty(t, event.Metadata) + assert.Equal(t, pb.ProcessingStrategy_EXECUTE_REMEDIATION, event.ProcessingStrategy) }, }, { name: "Parser Returns Error", message: "Some random message", setupHandler: func() *XIDHandler { - h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json") + h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) h.parser = &mockParser{ parseFunc: func(msg string) (*parser.Response, error) { return nil, errors.New("parse error") @@ -248,7 +250,7 @@ func TestProcessLine(t *testing.T) { name: "Parser Returns Nil Response", message: "Some random message", setupHandler: func() *XIDHandler { - h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json") + h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) h.parser = &mockParser{ parseFunc: func(msg string) (*parser.Response, error) { return nil, nil @@ -263,7 +265,7 @@ func TestProcessLine(t *testing.T) { name: "Parser Returns Success=false", message: "Some random message", setupHandler: func() *XIDHandler { - h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json") + h, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) h.parser = &mockParser{ parseFunc: func(msg string) (*parser.Response, error) { return &parser.Response{ @@ -302,7 +304,7 @@ func TestProcessLine(t *testing.T) { } func TestCreateHealthEventFromResponse(t *testing.T) { - handler, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json") + handler, _ := NewXIDHandler("test-node", "test-agent", "GPU", "xid-check", "", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) testCases := []struct { name string @@ -343,6 +345,7 @@ func TestCreateHealthEventFromResponse(t *testing.T) { assert.Equal(t, "test-node", event.NodeName) assert.NotNil(t, event.GeneratedTimestamp) assert.Empty(t, event.Metadata) + assert.Equal(t, pb.ProcessingStrategy_EXECUTE_REMEDIATION, event.ProcessingStrategy) }, }, { @@ -372,6 +375,7 @@ func TestCreateHealthEventFromResponse(t *testing.T) { assert.Equal(t, "GPU-ABCDEF12-3456-7890-ABCD-EF1234567890", event.EntitiesImpacted[1].EntityValue) assert.Equal(t, "Test XID message", event.Message) assert.Empty(t, event.Metadata) + assert.Equal(t, pb.ProcessingStrategy_EXECUTE_REMEDIATION, event.ProcessingStrategy) }, }, } @@ -422,7 +426,7 @@ func TestNewXIDHandler(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - handler, err := NewXIDHandler(tc.nodeName, tc.agentName, tc.componentClass, tc.checkName, tc.xidAnalyserEndpoint, "/tmp/metadata.json") + handler, err := NewXIDHandler(tc.nodeName, tc.agentName, tc.componentClass, tc.checkName, tc.xidAnalyserEndpoint, "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION) if tc.expectError { 
assert.Error(t, err) diff --git a/platform-connectors/pkg/connectors/kubernetes/k8s_platform_connector_test.go b/platform-connectors/pkg/connectors/kubernetes/k8s_platform_connector_test.go index 43d8216a5..08d7141d8 100644 --- a/platform-connectors/pkg/connectors/kubernetes/k8s_platform_connector_test.go +++ b/platform-connectors/pkg/connectors/kubernetes/k8s_platform_connector_test.go @@ -1604,10 +1604,12 @@ func TestProcessHealthEvents_StoreOnlyStrategy(t *testing.T) { } if tc.expectNodeConditions { + require.NotEmpty(t, nvsentinelConditions, + "Expected at least one NVSentinel node condition, but got none") assert.Equal(t, tc.expectedConditionType, string(nvsentinelConditions[0].Type), "Expected condition type %s, got %s", tc.expectedConditionType, nvsentinelConditions[0].Type) } else { - assert.Equal(t, 0, len(nvsentinelConditions), + assert.Empty(t, nvsentinelConditions, "Expected no NVSentinel node conditions for STORE_ONLY events, got %d", len(nvsentinelConditions)) } @@ -1618,10 +1620,12 @@ func TestProcessHealthEvents_StoreOnlyStrategy(t *testing.T) { require.NoError(t, err, "Failed to list events") if tc.expectKubernetesEvents { + require.NotEmpty(t, events.Items, + "Expected at least one Kubernetes event, but got none") assert.Equal(t, tc.expectedEventType, events.Items[0].Type, "Expected event type %s, got %s", tc.expectedEventType, events.Items[0].Type) } else { - assert.Equal(t, 0, len(events.Items), + assert.Empty(t, events.Items, "Expected no Kubernetes events for STORE_ONLY events, got %d", len(events.Items)) } diff --git a/tests/event_exporter_test.go b/tests/event_exporter_test.go index a11295da1..260a61c60 100644 --- a/tests/event_exporter_test.go +++ b/tests/event_exporter_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "tests/helpers" + "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,7 +31,6 @@ import ( "sigs.k8s.io/e2e-framework/klient/wait/conditions" "sigs.k8s.io/e2e-framework/pkg/envconf" "sigs.k8s.io/e2e-framework/pkg/features" - "tests/helpers" ) func TestEventExporterChangeStream(t *testing.T) { @@ -81,7 +82,7 @@ func TestEventExporterChangeStream(t *testing.T) { t.Log("Validating received CloudEvent") require.NotNil(t, receivedEvent) - helpers.ValidateCloudEvent(t, receivedEvent, nodeName, testMessage, "GpuXidError", "79") + helpers.ValidateCloudEvent(t, receivedEvent, nodeName, testMessage, "GpuXidError", "79", "EXECUTE_REMEDIATION") return ctx }) diff --git a/tests/helpers/event_exporter.go b/tests/helpers/event_exporter.go index 523ecd2a9..af8629f74 100644 --- a/tests/helpers/event_exporter.go +++ b/tests/helpers/event_exporter.go @@ -222,6 +222,7 @@ func ValidateCloudEvent( t *testing.T, event map[string]any, expectedNodeName, expectedMessage, expectedCheckName, expectedErrorCode string, + expectedProcessingStrategy string, ) { t.Helper() t.Logf("Validating CloudEvent: %+v", event) @@ -241,5 +242,6 @@ func ValidateCloudEvent( require.Equal(t, expectedCheckName, healthEvent["checkName"]) require.Equal(t, expectedNodeName, healthEvent["nodeName"]) require.Equal(t, expectedMessage, healthEvent["message"]) + require.Equal(t, expectedProcessingStrategy, healthEvent["processingStrategy"]) require.Contains(t, healthEvent["errorCode"], expectedErrorCode) } diff --git a/tests/helpers/kube.go b/tests/helpers/kube.go index fe9fea7ee..4966a7ad1 100755 --- a/tests/helpers/kube.go +++ b/tests/helpers/kube.go @@ -2228,3 +2228,261 @@ func VerifyLogFilesUploaded(ctx context.Context, t *testing.T, 
c klient.Client, t.Logf("✓ Log files verified for node %s", nodeName) } + +// waitForDaemonSetRollout waits for a DaemonSet to complete its rollout. +// It checks that all pods are updated and ready. +func waitForDaemonSetRollout(ctx context.Context, t *testing.T, client klient.Client, name string) { + t.Helper() + + t.Logf("Waiting for daemonset %s/%s rollout to complete", NVSentinelNamespace, name) + + require.Eventually(t, func() bool { + daemonSet := &appsv1.DaemonSet{} + if err := client.Resources().Get(ctx, name, NVSentinelNamespace, daemonSet); err != nil { + t.Logf("Failed to get daemonset: %v", err) + return false + } + + // Check if all desired pods are scheduled, updated, and ready + if daemonSet.Status.DesiredNumberScheduled == 0 { + t.Logf("DaemonSet has no desired pods scheduled yet") + return false + } + + if daemonSet.Status.UpdatedNumberScheduled != daemonSet.Status.DesiredNumberScheduled { + t.Logf("DaemonSet rollout in progress: %d/%d pods updated", + daemonSet.Status.UpdatedNumberScheduled, daemonSet.Status.DesiredNumberScheduled) + + return false + } + + if daemonSet.Status.NumberReady != daemonSet.Status.DesiredNumberScheduled { + t.Logf("DaemonSet rollout in progress: %d/%d pods ready", + daemonSet.Status.NumberReady, daemonSet.Status.DesiredNumberScheduled) + + return false + } + + t.Logf("DaemonSet %s/%s rollout complete: %d/%d pods ready and updated", + NVSentinelNamespace, name, daemonSet.Status.NumberReady, daemonSet.Status.DesiredNumberScheduled) + + return true + }, EventuallyWaitTimeout, WaitInterval, "daemonset %s/%s rollout should complete", NVSentinelNamespace, name) + + t.Logf("DaemonSet %s/%s rollout completed successfully", NVSentinelNamespace, name) +} + +// UpdateDaemonSetArgs updates the daemonset with the specified arguments and waits for the rollout to complete.
+func UpdateDaemonSetArgs(ctx context.Context, t *testing.T, + client klient.Client, daemonsetName string, containerName string, + args map[string]string) error { + t.Helper() + + t.Logf("Updating daemonset %s/%s with args %v", NVSentinelNamespace, daemonsetName, args) + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + daemonSet := &appsv1.DaemonSet{} + if err := client.Resources().Get(ctx, daemonsetName, NVSentinelNamespace, daemonSet); err != nil { + return err + } + + containers := daemonSet.Spec.Template.Spec.Containers + containerFound := false + + for i := range containers { + if containers[i].Name == containerName { + setArgsOnContainer(t, &containers[i], args) + + containerFound = true + + break + } + } + + if !containerFound { + return fmt.Errorf("container %q not found in daemonset %s/%s", containerName, NVSentinelNamespace, daemonsetName) + } + + return client.Resources().Update(ctx, daemonSet) + }) + if err != nil { + return err + } + + t.Logf("Waiting for daemonset %s/%s rollout to complete", NVSentinelNamespace, daemonsetName) + waitForDaemonSetRollout(ctx, t, client, daemonsetName) + + t.Logf("Waiting 10 seconds for daemonset pods to start") + time.Sleep(10 * time.Second) + + return nil +} + +func RemoveDaemonSetArgs(ctx context.Context, t *testing.T, client klient.Client, + daemonsetName string, + containerName string, args map[string]string, +) { + t.Helper() + + t.Logf("Removing args %v from daemonset %s/%s", args, NVSentinelNamespace, daemonsetName) + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + daemonSet := &appsv1.DaemonSet{} + if err := client.Resources().Get(ctx, daemonsetName, NVSentinelNamespace, daemonSet); err != nil { + return err + } + + containers := daemonSet.Spec.Template.Spec.Containers + containerFound := false + + for i := range containers { + if containers[i].Name == containerName { + removeArgsFromContainer(&containers[i], args) + + containerFound = true + + break + } + } + + if !containerFound { + return fmt.Errorf("container %q not found in daemonset %s/%s", containerName, NVSentinelNamespace, daemonsetName) + } + + return client.Resources().Update(ctx, daemonSet) + }) + require.NoError(t, err, "failed to remove args from daemonset %s/%s", NVSentinelNamespace, daemonsetName) + + t.Logf("Waiting for daemonset %s/%s rollout to complete after restoration", NVSentinelNamespace, daemonsetName) + waitForDaemonSetRollout(ctx, t, client, daemonsetName) + + t.Log("DaemonSet restored successfully") +} + +// tryUpdateExistingArg attempts to update an existing argument at position j. +// Returns true if the argument was found and updated. +func tryUpdateExistingArg(container *v1.Container, j int, flag, value string) bool { + existingArg := container.Args[j] + + // Match --flag=value style + if strings.HasPrefix(existingArg, flag+"=") { + if value != "" { + container.Args[j] = flag + "=" + value + } else { + container.Args[j] = flag + } + + return true + } + + // Match --flag or --flag value style + if existingArg == flag { + if value != "" { + if j+1 < len(container.Args) && !strings.HasPrefix(container.Args[j+1], "-") { + container.Args[j+1] = value + } else { + container.Args = append(container.Args[:j+1], append([]string{value}, container.Args[j+1:]...)...) 
+ } + } + + return true + } + + return false +} + +func setArgsOnContainer(t *testing.T, container *v1.Container, args map[string]string) { + t.Helper() + t.Logf("Setting args %v on container %s", args, container.Name) + + for flag, value := range args { + found := false + + for j := 0; j < len(container.Args); j++ { + if tryUpdateExistingArg(container, j, flag, value) { + found = true + break + } + } + + if !found { + if value != "" { + container.Args = append(container.Args, flag+"="+value) + } else { + container.Args = append(container.Args, flag) + } + } + } +} + +func removeArgsFromContainer(container *v1.Container, args map[string]string) { + for flag := range args { + for j := 0; j < len(container.Args); j++ { + existingArg := container.Args[j] + + // Match --flag=value style + if strings.HasPrefix(existingArg, flag+"=") { + container.Args = append(container.Args[:j], container.Args[j+1:]...) + break + } + + // Match --flag or --flag value style + + if existingArg == flag { + if j+1 < len(container.Args) && !strings.HasPrefix(container.Args[j+1], "-") { + container.Args = append(container.Args[:j], container.Args[j+2:]...) + } else { + container.Args = append(container.Args[:j], container.Args[j+1:]...) + } + + break + } + } + } +} + +func GetDaemonSetPodOnWorkerNode(ctx context.Context, t *testing.T, client klient.Client, + daemonsetName string, podNamePattern string) (*v1.Pod, error) { + t.Helper() + + var resultPod *v1.Pod + + require.Eventually(t, func() bool { + // Get the pod + pod, err := GetPodOnWorkerNode(ctx, t, client, NVSentinelNamespace, podNamePattern) + if err != nil { + t.Logf("Failed to get pod: %v", err) + return false + } + + // Verify pod is not being deleted + if pod.DeletionTimestamp != nil { + t.Logf("Pod %s is being deleted, waiting for replacement", pod.Name) + return false + } + + // Verify pod is running and ready + if pod.Status.Phase != v1.PodRunning { + t.Logf("Pod %s is not in Running phase: %s", pod.Name, pod.Status.Phase) + return false + } + + // Check all containers are ready + for _, cond := range pod.Status.Conditions { + if cond.Type == v1.PodReady && cond.Status != v1.ConditionTrue { + t.Logf("Pod %s is not ready yet", pod.Name) + return false + } + } + + resultPod = pod + + return true + }, EventuallyWaitTimeout, WaitInterval, "daemonset pod from current rollout should be running and ready") + + if resultPod == nil { + return nil, fmt.Errorf("failed to get ready pod for daemonset %s", daemonsetName) + } + + return resultPod, nil +} diff --git a/tests/helpers/syslog-health-monitor.go b/tests/helpers/syslog-health-monitor.go new file mode 100644 index 000000000..5e26fad50 --- /dev/null +++ b/tests/helpers/syslog-health-monitor.go @@ -0,0 +1,112 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package helpers + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/e2e-framework/klient" +) + +const ( + StubJournalHTTPPort = 9091 + SyslogContainerName = "syslog-health-monitor" + SyslogDaemonSetName = "syslog-health-monitor-regular" +) + +// SetUpSyslogHealthMonitor sets up the syslog health monitor (optionally overriding container args) and the port forward to its pod, returning the test node name, the pod, and the port-forward stop channel. +func SetUpSyslogHealthMonitor(ctx context.Context, t *testing.T, + client klient.Client, args map[string]string) (string, *v1.Pod, chan struct{}) { + var err error + + var syslogPod *v1.Pod + + if args != nil { + err = UpdateDaemonSetArgs(ctx, t, client, SyslogDaemonSetName, SyslogContainerName, args) + require.NoError(t, err, "failed to update syslog health monitor daemonset args") + } + + syslogPod, err = GetDaemonSetPodOnWorkerNode(ctx, t, client, SyslogDaemonSetName, "syslog-health-monitor-regular") + require.NoError(t, err, "failed to get syslog health monitor pod on worker node") + require.NotNil(t, syslogPod, "syslog health monitor pod should exist on worker node") + + testNodeName := syslogPod.Spec.NodeName + t.Logf("Using syslog health monitor pod: %s on node: %s", syslogPod.Name, testNodeName) + + metadata := CreateTestMetadata(testNodeName) + InjectMetadata(t, ctx, client, NVSentinelNamespace, testNodeName, metadata) + + t.Logf("Setting up port-forward to pod %s on port %d", syslogPod.Name, StubJournalHTTPPort) + stopChan, readyChan := PortForwardPod( + ctx, + client.RESTConfig(), + syslogPod.Namespace, + syslogPod.Name, + StubJournalHTTPPort, + StubJournalHTTPPort, + ) + <-readyChan + t.Log("Port-forward ready") + + t.Logf("Setting ManagedByNVSentinel=false on node %s", testNodeName) + err = SetNodeManagedByNVSentinel(ctx, client, testNodeName, false) + require.NoError(t, err, "failed to set ManagedByNVSentinel label") + + return testNodeName, syslogPod, stopChan +} + +// TearDownSyslogHealthMonitor stops the port forward, rolls back the syslog health monitor daemonset, and cleans up test state. +func TearDownSyslogHealthMonitor(ctx context.Context, t *testing.T, client klient.Client, + nodeName string, stopChan chan struct{}, + args map[string]string, podName string) { + t.Log("Stopping port-forward") + close(stopChan) + + if args != nil { + RemoveDaemonSetArgs(ctx, t, client, SyslogDaemonSetName, SyslogContainerName, args) + } + + t.Logf("Restarting syslog-health-monitor pod %s to clear conditions", podName) + + err := DeletePod(ctx, client, NVSentinelNamespace, podName) + if err != nil { + t.Logf("Warning: failed to delete pod: %v", err) + } else { + t.Logf("Waiting for SysLogsXIDError condition to be cleared from node %s", nodeName) + require.Eventually(t, func() bool { + condition, err := CheckNodeConditionExists(ctx, client, nodeName, + "SysLogsXIDError", "SysLogsXIDErrorIsHealthy") + if err != nil { + t.Logf("Failed to check node condition: %v", err) + return false + } + + return condition != nil && condition.Status == v1.ConditionFalse + }, EventuallyWaitTimeout, WaitInterval, "SysLogsXIDError condition should be cleared") + } + + t.Logf("Cleaning up metadata from node %s", nodeName) + DeleteMetadata(t, ctx, client, NVSentinelNamespace, nodeName) + + t.Logf("Removing ManagedByNVSentinel label from node %s", nodeName) + + err = RemoveNodeManagedByNVSentinelLabel(ctx, client, nodeName) + if err != nil { + t.Logf("Warning: failed to remove label: %v", err) + } +} diff --git a/tests/syslog_health_monitor_test.go b/tests/syslog_health_monitor_test.go index c0b717823..755556292 100644 --- a/tests/syslog_health_monitor_test.go
+++ b/tests/syslog_health_monitor_test.go @@ -22,20 +22,20 @@ import ( "strings" "testing" + "tests/helpers" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" "sigs.k8s.io/e2e-framework/pkg/envconf" "sigs.k8s.io/e2e-framework/pkg/features" - "tests/helpers" ) type contextKey string const ( - stubJournalHTTPPort = 9091 - keySyslogNodeName contextKey = "nodeName" - keyStopChan contextKey = "stopChan" - keySyslogPodName contextKey = "syslogPodName" + keySyslogNodeName contextKey = "nodeName" + keyStopChan contextKey = "stopChan" + keySyslogPodName contextKey = "syslogPodName" ) // TestSyslogHealthMonitorXIDDetection tests burst XID injection and aggregation @@ -44,42 +44,16 @@ func TestSyslogHealthMonitorXIDDetection(t *testing.T) { WithLabel("suite", "syslog-health-monitor"). WithLabel("component", "xid-detection") - var testNodeName string - var syslogPod *v1.Pod - feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { client, err := c.NewClient() require.NoError(t, err, "failed to create kubernetes client") - syslogPod, err = helpers.GetPodOnWorkerNode(ctx, t, client, helpers.NVSentinelNamespace, "syslog-health-monitor") - require.NoError(t, err, "failed to find syslog health monitor pod") - require.NotNil(t, syslogPod, "syslog health monitor pod should exist") - - testNodeName = syslogPod.Spec.NodeName - t.Logf("Using syslog health monitor pod: %s on node: %s", syslogPod.Name, testNodeName) - - metadata := helpers.CreateTestMetadata(testNodeName) - helpers.InjectMetadata(t, ctx, client, syslogPod.Namespace, testNodeName, metadata) - - t.Logf("Setting up port-forward to pod %s on port %d", syslogPod.Name, stubJournalHTTPPort) - stopChan, readyChan := helpers.PortForwardPod( - ctx, - client.RESTConfig(), - syslogPod.Namespace, - syslogPod.Name, - stubJournalHTTPPort, - stubJournalHTTPPort, - ) - <-readyChan - t.Log("Port-forward ready") - - t.Logf("Setting ManagedByNVSentinel=false on node %s", testNodeName) - err = helpers.SetNodeManagedByNVSentinel(ctx, client, testNodeName, false) - require.NoError(t, err, "failed to set ManagedByNVSentinel label") + testNodeName, syslogPod, stopChan := helpers.SetUpSyslogHealthMonitor(ctx, t, client, nil) ctx = context.WithValue(ctx, keySyslogNodeName, testNodeName) ctx = context.WithValue(ctx, keySyslogPodName, syslogPod.Name) ctx = context.WithValue(ctx, keyStopChan, stopChan) + return ctx }) @@ -111,7 +85,7 @@ func TestSyslogHealthMonitorXIDDetection(t *testing.T) { `ErrorCode:31 PCI:0001:00:00 GPU_UUID:GPU-[0-9a-fA-F-]+ kernel:.*?NVRM: Xid \(PCI:0001:00:00\): 31.*?Recommended Action=NONE`, } - helpers.InjectSyslogMessages(t, stubJournalHTTPPort, xidMessages) + helpers.InjectSyslogMessages(t, helpers.StubJournalHTTPPort, xidMessages) t.Log("Verifying node condition contains XID sequence with GPU UUIDs using regex patterns") require.Eventually(t, func() bool { @@ -145,7 +119,7 @@ func TestSyslogHealthMonitorXIDDetection(t *testing.T) { } t.Logf("Injecting %d additional XID messages to exceed 1KB limit", len(additionalXidMessages)) - helpers.InjectSyslogMessages(t, stubJournalHTTPPort, additionalXidMessages) + helpers.InjectSyslogMessages(t, helpers.StubJournalHTTPPort, additionalXidMessages) t.Log("Verifying node condition message is truncated to 1KB limit with exactly one truncation suffix") require.Eventually(t, func() bool { @@ -183,52 +157,14 @@ func TestSyslogHealthMonitorXIDDetection(t *testing.T) { }) feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { - 
if stopChanVal := ctx.Value(keyStopChan); stopChanVal != nil { - t.Log("Stopping port-forward") - close(stopChanVal.(chan struct{})) - } - client, err := c.NewClient() - if err != nil { - t.Logf("Warning: failed to create client for teardown: %v", err) - return ctx - } - - nodeNameVal := ctx.Value(keySyslogNodeName) - if nodeNameVal == nil { - t.Log("Skipping teardown: nodeName not set (setup likely failed early)") - return ctx - } - nodeName := nodeNameVal.(string) + require.NoError(t, err, "failed to create kubernetes client") - podNameVal := ctx.Value(keySyslogPodName) - if podNameVal != nil { - podName := podNameVal.(string) - t.Logf("Restarting syslog-health-monitor pod %s to clear conditions", podName) - err = helpers.DeletePod(ctx, client, helpers.NVSentinelNamespace, podName) - if err != nil { - t.Logf("Warning: failed to delete pod: %v", err) - } else { - t.Logf("Waiting for SysLogsXIDError condition to be cleared from node %s", nodeName) - require.Eventually(t, func() bool { - condition, err := helpers.CheckNodeConditionExists(ctx, client, nodeName, - "SysLogsXIDError", "SysLogsXIDErrorIsHealthy") - if err != nil { - return false - } - return condition != nil && condition.Status == v1.ConditionFalse - }, helpers.EventuallyWaitTimeout, helpers.WaitInterval, "SysLogsXIDError condition should be cleared") - } - } + nodeName := ctx.Value(keySyslogNodeName).(string) + syslogPod := ctx.Value(keySyslogPodName).(string) + stopChan := ctx.Value(keyStopChan).(chan struct{}) - t.Logf("Cleaning up metadata from node %s", nodeName) - helpers.DeleteMetadata(t, ctx, client, helpers.NVSentinelNamespace, nodeName) - - t.Logf("Removing ManagedByNVSentinel label from node %s", nodeName) - err = helpers.RemoveNodeManagedByNVSentinelLabel(ctx, client, nodeName) - if err != nil { - t.Logf("Warning: failed to remove ManagedByNVSentinel label: %v", err) - } + helpers.TearDownSyslogHealthMonitor(ctx, t, client, nodeName, stopChan, nil, syslogPod) return ctx }) @@ -256,14 +192,14 @@ func TestSyslogHealthMonitorXIDWithoutMetadata(t *testing.T) { testNodeName = syslogPod.Spec.NodeName t.Logf("Using syslog health monitor pod: %s on node: %s", syslogPod.Name, testNodeName) - t.Logf("Setting up port-forward to pod %s on port %d", syslogPod.Name, stubJournalHTTPPort) + t.Logf("Setting up port-forward to pod %s on port %d", syslogPod.Name, helpers.StubJournalHTTPPort) stopChan, readyChan := helpers.PortForwardPod( ctx, client.RESTConfig(), syslogPod.Namespace, syslogPod.Name, - stubJournalHTTPPort, - stubJournalHTTPPort, + helpers.StubJournalHTTPPort, + helpers.StubJournalHTTPPort, ) <-readyChan t.Log("Port-forward ready") @@ -286,7 +222,7 @@ func TestSyslogHealthMonitorXIDWithoutMetadata(t *testing.T) { xidMessage := "kernel: [16450076.435595] NVRM: Xid (PCI:0000:17:00): 79, pid=123456, name=test, GPU has fallen off the bus." - helpers.InjectSyslogMessages(t, stubJournalHTTPPort, []string{xidMessage}) + helpers.InjectSyslogMessages(t, helpers.StubJournalHTTPPort, []string{xidMessage}) t.Log("Verifying node condition is created without GPU UUID (metadata not available)") require.Eventually(t, func() bool { @@ -356,38 +292,11 @@ func TestSyslogHealthMonitorSXIDDetection(t *testing.T) { WithLabel("suite", "syslog-health-monitor"). 
WithLabel("component", "sxid-detection") - var testNodeName string - var syslogPod *v1.Pod - feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { client, err := c.NewClient() require.NoError(t, err, "failed to create kubernetes client") - syslogPod, err = helpers.GetPodOnWorkerNode(ctx, t, client, helpers.NVSentinelNamespace, "syslog-health-monitor") - require.NoError(t, err, "failed to find syslog health monitor pod") - require.NotNil(t, syslogPod, "syslog health monitor pod should exist") - - testNodeName = syslogPod.Spec.NodeName - t.Logf("Using syslog health monitor pod: %s on node: %s", syslogPod.Name, testNodeName) - - metadata := helpers.CreateTestMetadata(testNodeName) - helpers.InjectMetadata(t, ctx, client, syslogPod.Namespace, testNodeName, metadata) - - t.Logf("Setting up port-forward to pod %s on port %d", syslogPod.Name, stubJournalHTTPPort) - stopChan, readyChan := helpers.PortForwardPod( - ctx, - client.RESTConfig(), - syslogPod.Namespace, - syslogPod.Name, - stubJournalHTTPPort, - stubJournalHTTPPort, - ) - <-readyChan - t.Log("Port-forward ready") - - t.Logf("Setting ManagedByNVSentinel=false on node %s", testNodeName) - err = helpers.SetNodeManagedByNVSentinel(ctx, client, testNodeName, false) - require.NoError(t, err, "failed to set ManagedByNVSentinel label") + testNodeName, syslogPod, stopChan := helpers.SetUpSyslogHealthMonitor(ctx, t, client, nil) ctx = context.WithValue(ctx, keySyslogNodeName, testNodeName) ctx = context.WithValue(ctx, keySyslogPodName, syslogPod.Name) @@ -416,7 +325,7 @@ func TestSyslogHealthMonitorSXIDDetection(t *testing.T) { `ErrorCode:24007 NVSWITCH:0 PCI:0000:c3:00\.0 NVLINK:30 GPU:1 GPU_UUID:GPU-[0-9a-fA-F-]+ kernel:.*?nvidia-nvswitch0: SXid \(PCI:0000:c3:00\.0\): 24007, Fatal, Link 30.*?Recommended Action=CONTACT_SUPPORT`, } - helpers.InjectSyslogMessages(t, stubJournalHTTPPort, sxidMessages) + helpers.InjectSyslogMessages(t, helpers.StubJournalHTTPPort, sxidMessages) t.Log("Verifying we got 2 non-fatal SXID Kubernetes Events with GPU UUIDs using regex patterns") require.Eventually(t, func() bool { @@ -434,52 +343,77 @@ func TestSyslogHealthMonitorSXIDDetection(t *testing.T) { }) feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { - if stopChanVal := ctx.Value(keyStopChan); stopChanVal != nil { - t.Log("Stopping port-forward") - close(stopChanVal.(chan struct{})) - } + client, err := c.NewClient() + require.NoError(t, err, "failed to create kubernetes client") + nodeName := ctx.Value(keySyslogNodeName).(string) + stopChan := ctx.Value(keyStopChan).(chan struct{}) + syslogPod := ctx.Value(keySyslogPodName).(string) + + helpers.TearDownSyslogHealthMonitor(ctx, t, client, nodeName, stopChan, nil, syslogPod) + + return ctx + }) + + testEnv.Test(t, feature.Feature()) +} + +// TestSyslogHealthMonitorStoreOnlyStrategy tests the STORE_ONLY event handling strategy +func TestSyslogHealthMonitorStoreOnlyStrategy(t *testing.T) { + feature := features.New("Syslog Health Monitor - Store Only Strategy"). + WithLabel("suite", "syslog-health-monitor"). 
+ WithLabel("component", "store-only-strategy") + + feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { client, err := c.NewClient() - if err != nil { - t.Logf("Warning: failed to create client for teardown: %v", err) - return ctx - } + require.NoError(t, err, "failed to create kubernetes client") - nodeNameVal := ctx.Value(keySyslogNodeName) - if nodeNameVal == nil { - t.Log("Skipping teardown: nodeName not set") - return ctx - } - nodeName := nodeNameVal.(string) + testNodeName, syslogPod, stopChan := helpers.SetUpSyslogHealthMonitor(ctx, t, client, map[string]string{ + "--processing-strategy": "STORE_ONLY", + }) - podNameVal := ctx.Value(keySyslogPodName) - if podNameVal != nil { - podName := podNameVal.(string) - t.Logf("Restarting syslog-health-monitor pod %s", podName) - err = helpers.DeletePod(ctx, client, helpers.NVSentinelNamespace, podName) - if err != nil { - t.Logf("Warning: failed to delete pod: %v", err) - } else { - require.Eventually(t, func() bool { - condition, err := helpers.CheckNodeConditionExists(ctx, client, nodeName, - "SysLogsSXIDError", "SysLogsSXIDErrorIsHealthy") - if err != nil { - return false - } - return condition != nil && condition.Status == v1.ConditionFalse - }, helpers.EventuallyWaitTimeout, helpers.WaitInterval, "Condition should be cleared") - } - } + ctx = context.WithValue(ctx, keySyslogNodeName, testNodeName) + ctx = context.WithValue(ctx, keyStopChan, stopChan) + ctx = context.WithValue(ctx, keySyslogPodName, syslogPod.Name) + return ctx + }) - t.Logf("Cleaning up metadata from node %s", nodeName) - helpers.DeleteMetadata(t, ctx, client, helpers.NVSentinelNamespace, nodeName) + feature.Assess("Inject XID errors and verify no node condition is created when running in STORE_ONLY strategy", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + client, err := c.NewClient() + require.NoError(t, err, "failed to create kubernetes client") - t.Logf("Removing ManagedByNVSentinel label from node %s", nodeName) - err = helpers.RemoveNodeManagedByNVSentinelLabel(ctx, client, nodeName) - if err != nil { - t.Logf("Warning: failed to remove label: %v", err) + nodeName := ctx.Value(keySyslogNodeName).(string) + + xidMessages := []string{ + "kernel: [123.456789] NVRM: Xid (PCI:0000:17:00): 79, pid=123456, name=test, GPU has fallen off the bus.", } + helpers.InjectSyslogMessages(t, helpers.StubJournalHTTPPort, xidMessages) + + t.Log("Verifying no node condition is created") + helpers.EnsureNodeConditionNotPresent(ctx, t, client, nodeName, "SysLogsXIDError") + + t.Log("Verifying node was not cordoned") + helpers.AssertQuarantineState(ctx, t, client, nodeName, helpers.QuarantineAssertion{ + ExpectCordoned: false, + ExpectAnnotation: false, + }) + + return ctx + }) + + feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + client, err := c.NewClient() + require.NoError(t, err, "failed to create kubernetes client") + + nodeName := ctx.Value(keySyslogNodeName).(string) + stopChan := ctx.Value(keyStopChan).(chan struct{}) + syslogPod := ctx.Value(keySyslogPodName).(string) + + helpers.TearDownSyslogHealthMonitor(ctx, t, client, nodeName, stopChan, map[string]string{ + "--processing-strategy": "STORE_ONLY", + }, syslogPod) + return ctx })