Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ spec:
- "{{ join "," $root.Values.enabledChecks }}"
- "--metadata-path"
- "{{ $root.Values.global.metadataPath }}"
- "--processing-strategy"
- "{{ $root.Values.processingStrategy }}"
resources:
{{- toYaml $root.Values.resources | nindent 12 }}
ports:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ tolerations: []
journalHostPath: /var/log

logLevel: info

# Processing strategy for health events
# valid values: EXECUTE_REMEDIATION, STORE_ONLY
# default: EXECUTE_REMEDIATION
# EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state.
# STORE_ONLY: observability-only behavior; event should be persisted/exported but should not modify cluster resources (i.e., no node conditions, no quarantine, no drain, no remediation).
processingStrategy: EXECUTE_REMEDIATION
12 changes: 12 additions & 0 deletions health-monitors/syslog-health-monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ var (
"Indicates if this monitor is running in Kata Containers mode (set by DaemonSet variant).")
metadataPath = flag.String("metadata-path", "/var/lib/nvsentinel/gpu_metadata.json",
"Path to GPU metadata JSON file.")
processingStrategyFlag = flag.String("processing-strategy", "EXECUTE_REMEDIATION",
"Event processing strategy: EXECUTE_REMEDIATION or STORE_ONLY")
)

var checks []fd.CheckDefinition
Expand Down Expand Up @@ -159,6 +161,15 @@ func run() error {

slog.Info("Creating syslog monitor", "checksCount", len(checks))

value, ok := pb.ProcessingStrategy_value[*processingStrategyFlag]
if !ok {
return fmt.Errorf("unexpected processingStrategy value: %q", *processingStrategyFlag)
}

slog.Info("Event handling strategy configured", "processingStrategy", *processingStrategyFlag)

processingStrategy := pb.ProcessingStrategy(value)

fdHealthMonitor, err := fd.NewSyslogMonitor(
nodeName,
checks,
Expand All @@ -169,6 +180,7 @@ func run() error {
*stateFileFlag,
*xidAnalyserEndpoint,
*metadataPath,
processingStrategy,
)
if err != nil {
return fmt.Errorf("error creating syslog health monitor: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ import (

// NewGPUFallenHandler creates a new GPUFallenHandler instance.
func NewGPUFallenHandler(nodeName, defaultAgentName,
defaultComponentClass, checkName string) (*GPUFallenHandler, error) {
defaultComponentClass, checkName string,
processingStrategy pb.ProcessingStrategy,
) (*GPUFallenHandler, error) {
ctx, cancel := context.WithCancel(context.Background())

h := &GPUFallenHandler{
nodeName: nodeName,
defaultAgentName: defaultAgentName,
defaultComponentClass: defaultComponentClass,
checkName: checkName,
processingStrategy: processingStrategy,
recentXIDs: make(map[string]xidRecord),
xidWindow: 5 * time.Minute, // Remember XIDs for 5 minutes
cancelCleanup: cancel,
Expand Down Expand Up @@ -232,6 +235,7 @@ func (h *GPUFallenHandler) createHealthEventFromError(event *gpuFallenErrorEvent
NodeName: h.nodeName,
RecommendedAction: pb.RecommendedAction_RESTART_BM,
ErrorCode: []string{"GPU_FALLEN_OFF_BUS"},
ProcessingStrategy: h.processingStrategy,
}

return &pb.HealthEvents{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func TestProcessLine(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_STORE_ONLY,
)
require.NoError(t, err)
defer handler.Close()
Expand All @@ -181,6 +182,7 @@ func TestProcessLine(t *testing.T) {
if tc.validateEvent != nil {
tc.validateEvent(t, events, tc.message)
}
assert.Equal(t, pb.ProcessingStrategy_STORE_ONLY, events.Events[0].ProcessingStrategy)
} else {
assert.Nil(t, events, "Expected no event to be generated")
}
Expand All @@ -196,6 +198,7 @@ func TestXIDTracking(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_EXECUTE_REMEDIATION,
)
require.NoError(t, err)
defer handler.Close() // Cleanup goroutine to prevent leaks
Expand All @@ -222,6 +225,7 @@ func TestXIDTracking(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_EXECUTE_REMEDIATION,
)
require.NoError(t, err)
defer handler2.Close()
Expand All @@ -234,6 +238,7 @@ func TestXIDTracking(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, events, "Should generate event when no recent XID")
require.Len(t, events.Events, 1)
assert.Equal(t, handler2.processingStrategy, events.Events[0].ProcessingStrategy)
})

t.Run("XID expires after time window", func(t *testing.T) {
Expand All @@ -243,6 +248,7 @@ func TestXIDTracking(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_EXECUTE_REMEDIATION,
)
require.NoError(t, err)
defer handler3.Close()
Expand Down Expand Up @@ -270,6 +276,7 @@ func TestXIDTracking(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_EXECUTE_REMEDIATION,
)
require.NoError(t, err)
defer handler4.Close()
Expand Down Expand Up @@ -298,6 +305,7 @@ func TestXIDTracking(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_EXECUTE_REMEDIATION,
)
require.NoError(t, err)
defer handler5.Close()
Expand All @@ -315,6 +323,7 @@ func TestXIDTracking(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_EXECUTE_REMEDIATION,
)
require.NoError(t, err)
defer handler6.Close()
Expand All @@ -340,6 +349,7 @@ func TestXIDTracking(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_EXECUTE_REMEDIATION,
)
require.NoError(t, err)
defer handler7.Close()
Expand Down Expand Up @@ -379,6 +389,7 @@ func TestXIDTracking(t *testing.T) {
"test-agent",
"GPU",
"test-check",
pb.ProcessingStrategy_EXECUTE_REMEDIATION,
)
require.NoError(t, err)
defer handler8.Close()
Expand Down
3 changes: 3 additions & 0 deletions health-monitors/syslog-health-monitor/pkg/gpufallen/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"regexp"
"sync"
"time"

pb "github.com/nvidia/nvsentinel/data-models/pkg/protos"
)

var (
Expand Down Expand Up @@ -47,6 +49,7 @@ type GPUFallenHandler struct {
defaultAgentName string
defaultComponentClass string
checkName string
processingStrategy pb.ProcessingStrategy
mu sync.RWMutex
recentXIDs map[string]xidRecord // pciAddr -> XID record
xidWindow time.Duration // how long to remember XID errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import (
)

func NewSXIDHandler(nodeName, defaultAgentName,
defaultComponentClass, checkName, metadataPath string) (*SXIDHandler, error) {
defaultComponentClass, checkName, metadataPath string,
processingStrategy pb.ProcessingStrategy,
) (*SXIDHandler, error) {
return &SXIDHandler{
nodeName: nodeName,
defaultAgentName: defaultAgentName,
defaultComponentClass: defaultComponentClass,
checkName: checkName,
processingStrategy: processingStrategy,
metadataReader: metadata.NewReader(metadataPath),
}, nil
}
Expand Down Expand Up @@ -103,6 +106,7 @@ func (sxidHandler *SXIDHandler) ProcessLine(message string) (*pb.HealthEvents, e
RecommendedAction: errRes,
ErrorCode: []string{fmt.Sprint(sxidErrorEvent.ErrorNum)},
Metadata: metadata,
ProcessingStrategy: sxidHandler.processingStrategy,
}

return &pb.HealthEvents{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package sxid

import (
"os"
"path/filepath"
"testing"

pb "github.com/nvidia/nvsentinel/data-models/pkg/protos"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -40,7 +43,7 @@ func TestNewSXIDHandler(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
handler, err := NewSXIDHandler(tc.nodeName, tc.agentName, tc.componentClass, tc.checkName, "/tmp/metadata.json")
handler, err := NewSXIDHandler(tc.nodeName, tc.agentName, tc.componentClass, tc.checkName, "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION)

require.NoError(t, err)
require.NotNil(t, handler)
Expand Down Expand Up @@ -137,6 +140,61 @@ func TestExtractInfoFromNVSwitchErrorMsg(t *testing.T) {
}
}

func TestProcessLineWithValidTopologyFatalSXID(t *testing.T) {
// Create temp metadata file with valid NVSwitch topology
metadataJSON := `{
"version": "1.0",
"timestamp": "2025-01-01T00:00:00Z",
"node_name": "test-node",
"gpus": [
{
"gpu_id": 1,
"uuid": "GPU-aaaabbbb-cccc-dddd-eeee-ffffffffffff",
"pci_address": "0000:18:00.0",
"serial_number": "GPU-SN-002",
"device_name": "NVIDIA H100",
"nvlinks": [
{
"link_id": 2,
"remote_pci_address": "0000:c3:00.0",
"remote_link_id": 28
}
]
}
],
"nvswitches": ["0000:c3:00.0"]
}`

tmpDir := t.TempDir()
metadataPath := filepath.Join(tmpDir, "gpu_metadata.json")
err := os.WriteFile(metadataPath, []byte(metadataJSON), 0o644)
require.NoError(t, err)

handler, err := NewSXIDHandler("test-node", "test-agent", "NVSWITCH", "sxid-check", metadataPath, pb.ProcessingStrategy_STORE_ONLY)
require.NoError(t, err)

message := "[ 1108.858286] nvidia-nvswitch0: SXid (PCI:0000:c3:00.0): 24007, Fatal, Link 28 sourcetrack timeout error (First)"
events, err := handler.ProcessLine(message)

require.NoError(t, err)
require.NotNil(t, events)
require.Len(t, events.Events, 1)

event := events.Events[0]
assert.Equal(t, message, event.Message)
assert.Equal(t, []string{"24007"}, event.ErrorCode)
assert.True(t, event.IsFatal)
assert.False(t, event.IsHealthy)
assert.Equal(t, pb.RecommendedAction_CONTACT_SUPPORT, event.RecommendedAction)
assert.Equal(t, pb.ProcessingStrategy_STORE_ONLY, event.ProcessingStrategy)

// Verify GPU entity
assert.Equal(t, "GPU", event.EntitiesImpacted[3].EntityType)
assert.Equal(t, "1", event.EntitiesImpacted[3].EntityValue)
assert.Equal(t, "GPU_UUID", event.EntitiesImpacted[4].EntityType)
assert.Equal(t, "GPU-aaaabbbb-cccc-dddd-eeee-ffffffffffff", event.EntitiesImpacted[4].EntityValue)
}

func TestProcessLine(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -166,7 +224,7 @@ func TestProcessLine(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
handler, err := NewSXIDHandler("test-node", "test-agent", "NVSWITCH", "sxid-check", "/tmp/metadata.json")
handler, err := NewSXIDHandler("test-node", "test-agent", "NVSWITCH", "sxid-check", "/tmp/metadata.json", pb.ProcessingStrategy_EXECUTE_REMEDIATION)
require.NoError(t, err)

events, err := handler.ProcessLine(tc.message)
Expand All @@ -182,6 +240,7 @@ func TestProcessLine(t *testing.T) {
event := events.Events[0]
assert.Equal(t, tc.message, event.Message)
assert.NotEmpty(t, event.Metadata)
assert.Equal(t, pb.ProcessingStrategy_EXECUTE_REMEDIATION, event.ProcessingStrategy)
} else {
assert.Nil(t, events)
}
Expand Down
2 changes: 2 additions & 0 deletions health-monitors/syslog-health-monitor/pkg/sxid/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sxid
import (
"regexp"

pb "github.com/nvidia/nvsentinel/data-models/pkg/protos"
"github.com/nvidia/nvsentinel/health-monitors/syslog-health-monitor/pkg/metadata"
)

Expand All @@ -30,6 +31,7 @@ type SXIDHandler struct {
defaultAgentName string
defaultComponentClass string
checkName string
processingStrategy pb.ProcessingStrategy
metadataReader *metadata.Reader
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func NewSyslogMonitor(
stateFilePath string,
xidAnalyserEndpoint string,
metadataPath string,
processingStrategy pb.ProcessingStrategy,
) (*SyslogMonitor, error) {
return NewSyslogMonitorWithFactory(nodeName, checks, pcClient, defaultAgentName,
defaultComponentClass, pollingInterval, stateFilePath, GetDefaultJournalFactory(),
xidAnalyserEndpoint, metadataPath,
processingStrategy,
)
}

Expand All @@ -69,6 +71,7 @@ func NewSyslogMonitorWithFactory(
journalFactory JournalFactory,
xidAnalyserEndpoint string,
metadataPath string,
processingStrategy pb.ProcessingStrategy,
) (*SyslogMonitor, error) {
// Load state from file
state, err := loadState(stateFilePath)
Expand All @@ -90,6 +93,7 @@ func NewSyslogMonitorWithFactory(
pcClient: pcClient,
defaultAgentName: defaultAgentName,
defaultComponentClass: defaultComponentClass,
processingStrategy: processingStrategy,
pollingInterval: pollingInterval,
checkLastCursors: state.CheckLastCursors,
journalFactory: journalFactory,
Expand All @@ -103,7 +107,7 @@ func NewSyslogMonitorWithFactory(
switch check.Name {
case XIDErrorCheck:
xidHandler, err := xid.NewXIDHandler(nodeName,
defaultAgentName, defaultComponentClass, check.Name, xidAnalyserEndpoint, metadataPath)
defaultAgentName, defaultComponentClass, check.Name, xidAnalyserEndpoint, metadataPath, processingStrategy)
if err != nil {
slog.Error("Error initializing XID handler", "error", err.Error())
return nil, fmt.Errorf("failed to initialize XID handler: %w", err)
Expand All @@ -113,7 +117,7 @@ func NewSyslogMonitorWithFactory(

case SXIDErrorCheck:
sxidHandler, err := sxid.NewSXIDHandler(
nodeName, defaultAgentName, defaultComponentClass, check.Name, metadataPath)
nodeName, defaultAgentName, defaultComponentClass, check.Name, metadataPath, processingStrategy)
if err != nil {
slog.Error("Error initializing SXID handler", "error", err.Error())
return nil, fmt.Errorf("failed to initialize SXID handler: %w", err)
Expand All @@ -123,7 +127,7 @@ func NewSyslogMonitorWithFactory(

case GPUFallenOffCheck:
gpuFallenHandler, err := gpufallen.NewGPUFallenHandler(
nodeName, defaultAgentName, defaultComponentClass, check.Name)
nodeName, defaultAgentName, defaultComponentClass, check.Name, processingStrategy)
if err != nil {
slog.Error("Error initializing GPU Fallen Off handler", "error", err.Error())
return nil, fmt.Errorf("failed to initialize GPU Fallen Off handler: %w", err)
Expand Down Expand Up @@ -806,6 +810,7 @@ func (sm *SyslogMonitor) prepareHealthEventWithAction(
IsHealthy: isHealthy,
NodeName: sm.nodeName,
RecommendedAction: errRes.RecommendedAction,
ProcessingStrategy: sm.processingStrategy,
}

return &pb.HealthEvents{
Expand Down
Loading
Loading