diff --git a/data-models/pkg/protos/health_event.pb.go b/data-models/pkg/protos/health_event.pb.go index 9dec31f00..518125d81 100644 --- a/data-models/pkg/protos/health_event.pb.go +++ b/data-models/pkg/protos/health_event.pb.go @@ -37,6 +37,59 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +// ProcessingStrategy defines how downstream modules should handle the event. +// UNSPECIFIED: events without an explicit strategy use this default, which platform-connector normalizes to EXECUTE_REMEDIATION. +// EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state. +// STORE_ONLY: observability-only behavior; event should be persisted/exported but should not modify cluster resources. +type ProcessingStrategy int32 + +const ( + ProcessingStrategy_UNSPECIFIED ProcessingStrategy = 0 + ProcessingStrategy_EXECUTE_REMEDIATION ProcessingStrategy = 1 + ProcessingStrategy_STORE_ONLY ProcessingStrategy = 2 +) + +// Enum value maps for ProcessingStrategy. +var ( + ProcessingStrategy_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "EXECUTE_REMEDIATION", + 2: "STORE_ONLY", + } + ProcessingStrategy_value = map[string]int32{ + "UNSPECIFIED": 0, + "EXECUTE_REMEDIATION": 1, + "STORE_ONLY": 2, + } +) + +func (x ProcessingStrategy) Enum() *ProcessingStrategy { + p := new(ProcessingStrategy) + *p = x + return p +} + +func (x ProcessingStrategy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ProcessingStrategy) Descriptor() protoreflect.EnumDescriptor { + return file_health_event_proto_enumTypes[0].Descriptor() +} + +func (ProcessingStrategy) Type() protoreflect.EnumType { + return &file_health_event_proto_enumTypes[0] +} + +func (x ProcessingStrategy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ProcessingStrategy.Descriptor instead. +func (ProcessingStrategy) EnumDescriptor() ([]byte, []int) { + return file_health_event_proto_rawDescGZIP(), []int{0} +} + type RecommendedAction int32 const ( @@ -88,11 +141,11 @@ func (x RecommendedAction) String() string { } func (RecommendedAction) Descriptor() protoreflect.EnumDescriptor { - return file_health_event_proto_enumTypes[0].Descriptor() + return file_health_event_proto_enumTypes[1].Descriptor() } func (RecommendedAction) Type() protoreflect.EnumType { - return &file_health_event_proto_enumTypes[0] + return &file_health_event_proto_enumTypes[1] } func (x RecommendedAction) Number() protoreflect.EnumNumber { @@ -101,7 +154,7 @@ func (x RecommendedAction) Number() protoreflect.EnumNumber { // Deprecated: Use RecommendedAction.Descriptor instead. 
func (RecommendedAction) EnumDescriptor() ([]byte, []int) { - return file_health_event_proto_rawDescGZIP(), []int{0} + return file_health_event_proto_rawDescGZIP(), []int{1} } type HealthEvents struct { @@ -225,6 +278,7 @@ type HealthEvent struct { NodeName string `protobuf:"bytes,13,opt,name=nodeName,proto3" json:"nodeName,omitempty"` QuarantineOverrides *BehaviourOverrides `protobuf:"bytes,14,opt,name=quarantineOverrides,proto3" json:"quarantineOverrides,omitempty"` DrainOverrides *BehaviourOverrides `protobuf:"bytes,15,opt,name=drainOverrides,proto3" json:"drainOverrides,omitempty"` + ProcessingStrategy ProcessingStrategy `protobuf:"varint,16,opt,name=processingStrategy,proto3,enum=datamodels.ProcessingStrategy" json:"processingStrategy,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -364,6 +418,13 @@ func (x *HealthEvent) GetDrainOverrides() *BehaviourOverrides { return nil } +func (x *HealthEvent) GetProcessingStrategy() ProcessingStrategy { + if x != nil { + return x.ProcessingStrategy + } + return ProcessingStrategy_UNSPECIFIED +} + type BehaviourOverrides struct { state protoimpl.MessageState `protogen:"open.v1"` Force bool `protobuf:"varint,1,opt,name=force,proto3" json:"force,omitempty"` @@ -429,7 +490,7 @@ const file_health_event_proto_rawDesc = "" + "\n" + "entityType\x18\x01 \x01(\tR\n" + "entityType\x12 \n" + - "\ventityValue\x18\x02 \x01(\tR\ventityValue\"\x82\x06\n" + + "\ventityValue\x18\x02 \x01(\tR\ventityValue\"\xd2\x06\n" + "\vHealthEvent\x12\x18\n" + "\aversion\x18\x01 \x01(\rR\aversion\x12\x14\n" + "\x05agent\x18\x02 \x01(\tR\x05agent\x12&\n" + @@ -446,13 +507,19 @@ const file_health_event_proto_rawDesc = "" + "\x12generatedTimestamp\x18\f \x01(\v2\x1a.google.protobuf.TimestampR\x12generatedTimestamp\x12\x1a\n" + "\bnodeName\x18\r \x01(\tR\bnodeName\x12P\n" + "\x13quarantineOverrides\x18\x0e \x01(\v2\x1e.datamodels.BehaviourOverridesR\x13quarantineOverrides\x12F\n" + - "\x0edrainOverrides\x18\x0f \x01(\v2\x1e.datamodels.BehaviourOverridesR\x0edrainOverrides\x1a;\n" + + "\x0edrainOverrides\x18\x0f \x01(\v2\x1e.datamodels.BehaviourOverridesR\x0edrainOverrides\x12N\n" + + "\x12processingStrategy\x18\x10 \x01(\x0e2\x1e.datamodels.ProcessingStrategyR\x12processingStrategy\x1a;\n" + "\rMetadataEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\">\n" + "\x12BehaviourOverrides\x12\x14\n" + "\x05force\x18\x01 \x01(\bR\x05force\x12\x12\n" + - "\x04skip\x18\x02 \x01(\bR\x04skip*\xa8\x01\n" + + "\x04skip\x18\x02 \x01(\bR\x04skip*N\n" + + "\x12ProcessingStrategy\x12\x0f\n" + + "\vUNSPECIFIED\x10\x00\x12\x17\n" + + "\x13EXECUTE_REMEDIATION\x10\x01\x12\x0e\n" + + "\n" + + "STORE_ONLY\x10\x02*\xa8\x01\n" + "\x11RecommendedAction\x12\b\n" + "\x04NONE\x10\x00\x12\x13\n" + "\x0fCOMPONENT_RESET\x10\x02\x12\x13\n" + @@ -481,33 +548,35 @@ func file_health_event_proto_rawDescGZIP() []byte { return file_health_event_proto_rawDescData } -var file_health_event_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_health_event_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_health_event_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_health_event_proto_goTypes = []any{ - (RecommendedAction)(0), // 0: datamodels.RecommendedAction - (*HealthEvents)(nil), // 1: datamodels.HealthEvents - (*Entity)(nil), // 2: datamodels.Entity - (*HealthEvent)(nil), // 3: datamodels.HealthEvent - (*BehaviourOverrides)(nil), // 4: datamodels.BehaviourOverrides - nil, // 5: 
datamodels.HealthEvent.MetadataEntry - (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 7: google.protobuf.Empty + (ProcessingStrategy)(0), // 0: datamodels.ProcessingStrategy + (RecommendedAction)(0), // 1: datamodels.RecommendedAction + (*HealthEvents)(nil), // 2: datamodels.HealthEvents + (*Entity)(nil), // 3: datamodels.Entity + (*HealthEvent)(nil), // 4: datamodels.HealthEvent + (*BehaviourOverrides)(nil), // 5: datamodels.BehaviourOverrides + nil, // 6: datamodels.HealthEvent.MetadataEntry + (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 8: google.protobuf.Empty } var file_health_event_proto_depIdxs = []int32{ - 3, // 0: datamodels.HealthEvents.events:type_name -> datamodels.HealthEvent - 0, // 1: datamodels.HealthEvent.recommendedAction:type_name -> datamodels.RecommendedAction - 2, // 2: datamodels.HealthEvent.entitiesImpacted:type_name -> datamodels.Entity - 5, // 3: datamodels.HealthEvent.metadata:type_name -> datamodels.HealthEvent.MetadataEntry - 6, // 4: datamodels.HealthEvent.generatedTimestamp:type_name -> google.protobuf.Timestamp - 4, // 5: datamodels.HealthEvent.quarantineOverrides:type_name -> datamodels.BehaviourOverrides - 4, // 6: datamodels.HealthEvent.drainOverrides:type_name -> datamodels.BehaviourOverrides - 1, // 7: datamodels.PlatformConnector.HealthEventOccurredV1:input_type -> datamodels.HealthEvents - 7, // 8: datamodels.PlatformConnector.HealthEventOccurredV1:output_type -> google.protobuf.Empty - 8, // [8:9] is the sub-list for method output_type - 7, // [7:8] is the sub-list for method input_type - 7, // [7:7] is the sub-list for extension type_name - 7, // [7:7] is the sub-list for extension extendee - 0, // [0:7] is the sub-list for field type_name + 4, // 0: datamodels.HealthEvents.events:type_name -> datamodels.HealthEvent + 1, // 1: datamodels.HealthEvent.recommendedAction:type_name -> datamodels.RecommendedAction + 3, // 2: datamodels.HealthEvent.entitiesImpacted:type_name -> datamodels.Entity + 6, // 3: datamodels.HealthEvent.metadata:type_name -> datamodels.HealthEvent.MetadataEntry + 7, // 4: datamodels.HealthEvent.generatedTimestamp:type_name -> google.protobuf.Timestamp + 5, // 5: datamodels.HealthEvent.quarantineOverrides:type_name -> datamodels.BehaviourOverrides + 5, // 6: datamodels.HealthEvent.drainOverrides:type_name -> datamodels.BehaviourOverrides + 0, // 7: datamodels.HealthEvent.processingStrategy:type_name -> datamodels.ProcessingStrategy + 2, // 8: datamodels.PlatformConnector.HealthEventOccurredV1:input_type -> datamodels.HealthEvents + 8, // 9: datamodels.PlatformConnector.HealthEventOccurredV1:output_type -> google.protobuf.Empty + 9, // [9:10] is the sub-list for method output_type + 8, // [8:9] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_health_event_proto_init() } @@ -520,7 +589,7 @@ func file_health_event_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_health_event_proto_rawDesc), len(file_health_event_proto_rawDesc)), - NumEnums: 1, + NumEnums: 2, NumMessages: 5, NumExtensions: 0, NumServices: 1, diff --git a/data-models/protobufs/health_event.proto b/data-models/protobufs/health_event.proto index 3bce219c0..39cbbe670 100644 --- a/data-models/protobufs/health_event.proto +++ 
b/data-models/protobufs/health_event.proto @@ -29,6 +29,16 @@ message HealthEvents { repeated HealthEvent events = 2; } +// ProcessingStrategy defines how downstream modules should handle the event. +// UNSPECIFIED: events without an explicit strategy use this default, which platform-connector normalizes to EXECUTE_REMEDIATION. +// EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state. +// STORE_ONLY: observability-only behavior; event should be persisted/exported but should not modify cluster resources. +enum ProcessingStrategy { + UNSPECIFIED = 0; + EXECUTE_REMEDIATION = 1; + STORE_ONLY = 2; +} + enum RecommendedAction { NONE = 0; COMPONENT_RESET = 2; @@ -66,6 +76,7 @@ message HealthEvent { string nodeName = 13; BehaviourOverrides quarantineOverrides = 14; BehaviourOverrides drainOverrides = 15; + ProcessingStrategy processingStrategy = 16; } message BehaviourOverrides { diff --git a/docs/designs/025-processing-strategy-for-health-checks.md b/docs/designs/025-processing-strategy-for-health-checks.md index 3b450e6d1..61b04d517 100644 --- a/docs/designs/025-processing-strategy-for-health-checks.md +++ b/docs/designs/025-processing-strategy-for-health-checks.md @@ -99,8 +99,9 @@ When a health monitor wants observability-only behavior, it publishes health eve ```protobuf enum ProcessingStrategy { - EXECUTE_REMEDIATION = 0; - STORE_ONLY = 1; + UNSPECIFIED = 0; + EXECUTE_REMEDIATION = 1; + STORE_ONLY = 2; } message HealthEvent { @@ -338,18 +339,39 @@ For `processingStrategy=STORE_ONLY` events, the platform connector should: 2. Not create node conditions 3. Not create Kubernetes events -File: `platform-connectors/pkg/connectors/kubernetes/process_node_events.go` +**Normalization:** Platform connector normalizes `processingStrategy` in the gRPC server handler before any processing or enqueuing. If the field is missing or set to `UNSPECIFIED` (e.g., from custom monitors), it defaults to `EXECUTE_REMEDIATION`. This ensures all events in the database have an explicit strategy, providing consistent behavior across all modules. + +File: `platform-connectors/pkg/server/platform_connector_server.go` -Add skip logic in `processHealthEvents()` method to skip adding node condition/event: +Add normalization in `HealthEventOccurredV1()` method before enqueuing to ring buffers: ```go -func (r *K8sConnector) processHealthEvents(ctx context.Context, healthEvents *protos.HealthEvents) error { - var nodeConditions []corev1.NodeCondition +func (p *PlatformConnectorServer) HealthEventOccurredV1(ctx context.Context, + he *pb.HealthEvents) (*empty.Empty, error) { + slog.Info("Health events received", "events", he) + + healthEventsReceived.Add(float64(len(he.Events))) + + // Custom monitors that don't set processingStrategy will default to EXECUTE_REMEDIATION. + for _, event := range he.Events { + if event.ProcessingStrategy == pb.ProcessingStrategy_UNSPECIFIED { + event.ProcessingStrategy = pb.ProcessingStrategy_EXECUTE_REMEDIATION + } + } + + // ... 
pipeline processing and enqueuing to ring buffers +} +``` + +File: `platform-connectors/pkg/connectors/kubernetes/process_node_events.go` + +Add skip logic for `STORE_ONLY` events in `filterProcessableEvents()` method: - // NEW: Filter out STORE_ONLY events - they should not modify node conditions or create K8s events +```go +func filterProcessableEvents(healthEvents *protos.HealthEvents) []*protos.HealthEvent { var processableEvents []*protos.HealthEvent for _, healthEvent := range healthEvents.Events { - if healthEvent.ProcessingStrategy == protos.STORE_ONLY { + if healthEvent.ProcessingStrategy == protos.ProcessingStrategy_STORE_ONLY { slog.Info("Skipping STORE_ONLY event - no node conditions or K8s events will be created", "node", healthEvent.NodeName, "checkName", healthEvent.CheckName) @@ -358,7 +380,7 @@ func (r *K8sConnector) processHealthEvents(ctx context.Context, healthEvents *pr processableEvents = append(processableEvents, healthEvent) } - // ... existing code + return processableEvents } ``` --- @@ -404,7 +426,12 @@ We need new methods in mongodb and postgres pipeline builder: File: `store-client/pkg/client/mongodb_pipeline_builder.go` ```go -// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for all processable health event inserts. +// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for +// all EXECUTE_REMEDIATION health event inserts. +// +// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either: +// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors) +// - Missing/null (old events created before update, custom monitors, or circuit breaker backlog) func (b *MongoDBPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline { return datastore.ToPipeline( datastore.D( @@ -412,14 +439,20 @@ func (b *MongoDBPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() da datastore.E("operationType", datastore.D( datastore.E("$in", datastore.A("insert")), )), - datastore.E("fullDocument.healthevent.processingstrategy", "EXECUTE_REMEDIATION"), + // Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field + datastore.E("$or", datastore.A( + datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION))), + datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", datastore.D(datastore.E("$exists", false)))), + )), )), ), ) } -// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal unhealthy events -// excluding STORE_ONLY events. +// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts +// excluding STORE_ONLY events. This is used by health-events-analyzer for pattern analysis. +// +// Backward Compatibility: Uses $or to include EXECUTE_REMEDIATION and missing field. 
func (b *MongoDBPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline { return datastore.ToPipeline( datastore.D( @@ -427,12 +460,15 @@ func (b *MongoDBPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipelin datastore.E("operationType", "insert"), datastore.E("fullDocument.healthevent.agent", datastore.D(datastore.E("$ne", "health-events-analyzer"))), datastore.E("fullDocument.healthevent.ishealthy", false), - datastore.E("fullDocument.healthevent.processingstrategy", "EXECUTE_REMEDIATION"), + // Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field + datastore.E("$or", datastore.A( + datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION))), + datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", datastore.D(datastore.E("$exists", false)))), + )), )), ), ) } - ``` Same method required in postgres builder @@ -441,7 +477,9 @@ File: `store-client/pkg/client/postgresql_pipeline_builder.go` ```go // BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for health event inserts -// excluding STORE_ONLY events. +// with processingStrategy=EXECUTE_REMEDIATION +// +// Backward Compatibility: Uses $or to include EXECUTE_REMEDIATION and missing field. func (b *PostgreSQLPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline { return datastore.ToPipeline( datastore.D( @@ -449,14 +487,20 @@ func (b *PostgreSQLPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.E("operationType", datastore.D( datastore.E("$in", datastore.A("insert")), )), - datastore.E("fullDocument.healthevent.processingstrategy", "EXECUTE_REMEDIATION"), + // Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field + datastore.E("$or", datastore.A( + datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION))), + datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", datastore.D(datastore.E("$exists", false)))), + )), )), ), ) } -// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal unhealthy events -// excluding STORE_ONLY events. +// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts +// excluding STORE_ONLY events. For PostgreSQL, handles both INSERT and UPDATE operations. +// +// Backward Compatibility: Uses $or to include EXECUTE_REMEDIATION and missing field. 
func (b *PostgreSQLPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline { return datastore.ToPipeline( datastore.D( @@ -464,12 +508,15 @@ func (b *PostgreSQLPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipe datastore.E("operationType", datastore.D(datastore.E("$in", datastore.A("insert", "update")))), datastore.E("fullDocument.healthevent.agent", datastore.D(datastore.E("$ne", "health-events-analyzer"))), datastore.E("fullDocument.healthevent.ishealthy", false), - datastore.E("fullDocument.healthevent.processingstrategy", "EXECUTE_REMEDIATION"), + // Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field + datastore.E("$or", datastore.A( + datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION))), + datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", datastore.D(datastore.E("$exists", false)))), + )), )), ), ) } - ``` --- @@ -532,6 +579,8 @@ func createPipeline() interface{} { Update the default pipeline query to exclude `processingStrategy=STORE_ONLY` events. We need this condition for every rule that's why we are adding it at code level instead of keeping it at config file level. +**Backward Compatibility Note:** Historical events in the database (created before this feature) won't have the `processingstrategy` field. These old events should be treated as `EXECUTE_REMEDIATION` (they were meant to be processed). We use `$or` to explicitly match both `EXECUTE_REMEDIATION` and a missing field to ensure backward compatibility. Other modules and health monitors do not require changes since they only act on newly inserted events, which will already have the `processingStrategy` field set (all health monitors run in `EXECUTE_REMEDIATION` mode by default). + File: `health-events-analyzer/pkg/reconciler/reconciler.go` ```go @@ -542,11 +591,18 @@ func (r *Reconciler) getPipelineStages( // CRITICAL: Always start with agent filter to exclude events from health-events-analyzer itself // This prevents the analyzer from matching its own generated events, which would cause // infinite loops and incorrect rule evaluations + // + // Backward Compatibility: Use $or to include events where processingstrategy is either + // EXECUTE_REMEDIATION or missing (old events created before this feature was added). + // Old events without this field should be treated as EXECUTE_REMEDIATION. 
pipeline := []map[string]interface{}{ { "$match": map[string]interface{}{ - "healthevent.agent": map[string]interface{}{"$ne": "health-events-analyzer"}, - "healthevent.processingstrategy": map[string]interface{}{"$eq": "EXECUTE_REMEDIATION"}, // Exclude STORE_ONLY by default + "healthevent.agent": map[string]interface{}{"$ne": "health-events-analyzer"}, + "$or": []map[string]interface{}{ + {"healthevent.processingstrategy": "EXECUTE_REMEDIATION"}, + {"healthevent.processingstrategy": map[string]interface{}{"$exists": false}}, + }, }, }, } diff --git a/event-exporter/pkg/transformer/cloudevents.go b/event-exporter/pkg/transformer/cloudevents.go index d9f72ed1c..a00d9052d 100644 --- a/event-exporter/pkg/transformer/cloudevents.go +++ b/event-exporter/pkg/transformer/cloudevents.go @@ -63,6 +63,7 @@ func ToCloudEvent(event *pb.HealthEvent, metadata map[string]string) (*CloudEven "entitiesImpacted": entities, "generatedTimestamp": timestamp, "nodeName": event.NodeName, + "processingStrategy": event.ProcessingStrategy.String(), } if len(event.Metadata) > 0 { diff --git a/event-exporter/pkg/transformer/cloudevents_test.go b/event-exporter/pkg/transformer/cloudevents_test.go index e4590d260..c951dad28 100644 --- a/event-exporter/pkg/transformer/cloudevents_test.go +++ b/event-exporter/pkg/transformer/cloudevents_test.go @@ -66,6 +66,7 @@ func TestToCloudEvent(t *testing.T) { Force: false, Skip: true, }, + ProcessingStrategy: pb.ProcessingStrategy_STORE_ONLY, }, metadata: map[string]string{ "cluster": "prod-cluster-1", @@ -102,6 +103,9 @@ func TestToCloudEvent(t *testing.T) { if healthEvent["recommendedAction"] != "RESTART_VM" { t.Errorf("recommendedAction = %v, want %v", healthEvent["recommendedAction"], "RESTART_VM") } + if healthEvent["processingStrategy"] != "STORE_ONLY" { + t.Errorf("processingStrategy = %v, want STORE_ONLY", healthEvent["processingStrategy"]) + } entities := healthEvent["entitiesImpacted"].([]map[string]any) if len(entities) != 2 { diff --git a/fault-quarantine/pkg/evaluator/rule_evaluator_test.go b/fault-quarantine/pkg/evaluator/rule_evaluator_test.go index 05a91e015..b1beb3e43 100644 --- a/fault-quarantine/pkg/evaluator/rule_evaluator_test.go +++ b/fault-quarantine/pkg/evaluator/rule_evaluator_test.go @@ -260,6 +260,7 @@ func TestRoundTrip(t *testing.T) { "nanos": float64(eventTime.GetNanos()), }, "nodeName": "test-node", + "processingStrategy": float64(0), "quarantineOverrides": nil, "drainOverrides": nil, } diff --git a/fault-quarantine/pkg/initializer/init.go b/fault-quarantine/pkg/initializer/init.go index 05d92ac9e..9b1ef8442 100644 --- a/fault-quarantine/pkg/initializer/init.go +++ b/fault-quarantine/pkg/initializer/init.go @@ -63,7 +63,7 @@ func InitializeAll(ctx context.Context, params InitializationParams) (*Component } builder := client.GetPipelineBuilder() - pipeline := builder.BuildAllHealthEventInsertsPipeline() + pipeline := builder.BuildProcessableHealthEventInsertsPipeline() var tomlCfg config.TomlConfig if err := configmanager.LoadTOMLConfig(params.TomlConfigPath, &tomlCfg); err != nil { diff --git a/health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go b/health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go index dd4caad0e..da3f93192 100644 --- a/health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go +++ b/health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go @@ -359,6 +359,9 @@ func (e *Engine) mapMaintenanceEventToHealthEvent( Metadata: event.Metadata, // Pass along metadata NodeName: event.NodeName, // K8s node 
name GeneratedTimestamp: timestamppb.New(time.Now()), + // TODO: Remove hardcoded processing strategy and make it configurable via the config file. + // PR: https://github.com/NVIDIA/NVSentinel/pull/641 + ProcessingStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION, } return healthEvent, nil diff --git a/health-monitors/csp-health-monitor/pkg/triggerengine/trigger_test.go b/health-monitors/csp-health-monitor/pkg/triggerengine/trigger_test.go index 9b90df94c..812fd0a25 100644 --- a/health-monitors/csp-health-monitor/pkg/triggerengine/trigger_test.go +++ b/health-monitors/csp-health-monitor/pkg/triggerengine/trigger_test.go @@ -240,8 +240,9 @@ func TestMapMaintenanceEventToHealthEvent(t *testing.T) { EntitiesImpacted: []*pb.Entity{ {EntityType: "gce_instance", EntityValue: "instance-123"}, }, - Metadata: map[string]string{"key": "value"}, - NodeName: "node-a", + Metadata: map[string]string{"key": "value"}, + NodeName: "node-a", + ProcessingStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION, }, }, { @@ -268,7 +269,8 @@ func TestMapMaintenanceEventToHealthEvent(t *testing.T) { EntitiesImpacted: []*pb.Entity{ {EntityType: "EC2", EntityValue: "i-abcdef"}, }, - NodeName: "node-b", + NodeName: "node-b", + ProcessingStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION, }, }, { @@ -331,7 +333,8 @@ func TestMapMaintenanceEventToHealthEvent(t *testing.T) { EntitiesImpacted: []*pb.Entity{ {EntityType: "gce_instance", EntityValue: "instance-789"}, }, - NodeName: "node-e", + NodeName: "node-e", + ProcessingStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION, }, }, } diff --git a/health-monitors/gpu-health-monitor/gpu_health_monitor/protos/health_event_pb2.py b/health-monitors/gpu-health-monitor/gpu_health_monitor/protos/health_event_pb2.py index 4877c9d8b..d7637c597 100644 --- a/health-monitors/gpu-health-monitor/gpu_health_monitor/protos/health_event_pb2.py +++ b/health-monitors/gpu-health-monitor/gpu_health_monitor/protos/health_event_pb2.py @@ -21,7 +21,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x12health_event.proto\x12\ndatamodels\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto"H\n\x0cHealthEvents\x12\x0f\n\x07version\x18\x01 \x01(\r\x12\'\n\x06\x65vents\x18\x02 \x03(\x0b\x32\x17.datamodels.HealthEvent"1\n\x06\x45ntity\x12\x12\n\nentityType\x18\x01 \x01(\t\x12\x13\n\x0b\x65ntityValue\x18\x02 \x01(\t"\xb1\x04\n\x0bHealthEvent\x12\x0f\n\x07version\x18\x01 \x01(\r\x12\r\n\x05\x61gent\x18\x02 \x01(\t\x12\x16\n\x0e\x63omponentClass\x18\x03 \x01(\t\x12\x11\n\tcheckName\x18\x04 \x01(\t\x12\x0f\n\x07isFatal\x18\x05 \x01(\x08\x12\x11\n\tisHealthy\x18\x06 \x01(\x08\x12\x0f\n\x07message\x18\x07 \x01(\t\x12\x38\n\x11recommendedAction\x18\x08 \x01(\x0e\x32\x1d.datamodels.RecommendedAction\x12\x11\n\terrorCode\x18\t \x03(\t\x12,\n\x10\x65ntitiesImpacted\x18\n \x03(\x0b\x32\x12.datamodels.Entity\x12\x37\n\x08metadata\x18\x0b \x03(\x0b\x32%.datamodels.HealthEvent.MetadataEntry\x12\x36\n\x12generatedTimestamp\x18\x0c \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x10\n\x08nodeName\x18\r \x01(\t\x12;\n\x13quarantineOverrides\x18\x0e \x01(\x0b\x32\x1e.datamodels.BehaviourOverrides\x12\x36\n\x0e\x64rainOverrides\x18\x0f \x01(\x0b\x32\x1e.datamodels.BehaviourOverrides\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01"1\n\x12\x42\x65haviourOverrides\x12\r\n\x05\x66orce\x18\x01 \x01(\x08\x12\x0c\n\x04skip\x18\x02 
\x01(\x08*\xa8\x01\n\x11RecommendedAction\x12\x08\n\x04NONE\x10\x00\x12\x13\n\x0f\x43OMPONENT_RESET\x10\x02\x12\x13\n\x0f\x43ONTACT_SUPPORT\x10\x05\x12\x11\n\rRUN_FIELDDIAG\x10\x06\x12\x0e\n\nRESTART_VM\x10\x0f\x12\x0e\n\nRESTART_BM\x10\x18\x12\x0e\n\nREPLACE_VM\x10\x19\x12\x0f\n\x0bRUN_DCGMEUD\x10\x1a\x12\x0b\n\x07UNKNOWN\x10\x63\x32`\n\x11PlatformConnector\x12K\n\x15HealthEventOccurredV1\x12\x18.datamodels.HealthEvents\x1a\x16.google.protobuf.Empty"\x00\x42\x35Z3github.com/nvidia/nvsentinel/data-models/pkg/protosb\x06proto3' + b'\n\x12health_event.proto\x12\ndatamodels\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto"H\n\x0cHealthEvents\x12\x0f\n\x07version\x18\x01 \x01(\r\x12\'\n\x06\x65vents\x18\x02 \x03(\x0b\x32\x17.datamodels.HealthEvent"1\n\x06\x45ntity\x12\x12\n\nentityType\x18\x01 \x01(\t\x12\x13\n\x0b\x65ntityValue\x18\x02 \x01(\t"\xed\x04\n\x0bHealthEvent\x12\x0f\n\x07version\x18\x01 \x01(\r\x12\r\n\x05\x61gent\x18\x02 \x01(\t\x12\x16\n\x0e\x63omponentClass\x18\x03 \x01(\t\x12\x11\n\tcheckName\x18\x04 \x01(\t\x12\x0f\n\x07isFatal\x18\x05 \x01(\x08\x12\x11\n\tisHealthy\x18\x06 \x01(\x08\x12\x0f\n\x07message\x18\x07 \x01(\t\x12\x38\n\x11recommendedAction\x18\x08 \x01(\x0e\x32\x1d.datamodels.RecommendedAction\x12\x11\n\terrorCode\x18\t \x03(\t\x12,\n\x10\x65ntitiesImpacted\x18\n \x03(\x0b\x32\x12.datamodels.Entity\x12\x37\n\x08metadata\x18\x0b \x03(\x0b\x32%.datamodels.HealthEvent.MetadataEntry\x12\x36\n\x12generatedTimestamp\x18\x0c \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x10\n\x08nodeName\x18\r \x01(\t\x12;\n\x13quarantineOverrides\x18\x0e \x01(\x0b\x32\x1e.datamodels.BehaviourOverrides\x12\x36\n\x0e\x64rainOverrides\x18\x0f \x01(\x0b\x32\x1e.datamodels.BehaviourOverrides\x12:\n\x12processingStrategy\x18\x10 \x01(\x0e\x32\x1e.datamodels.ProcessingStrategy\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01"1\n\x12\x42\x65haviourOverrides\x12\r\n\x05\x66orce\x18\x01 \x01(\x08\x12\x0c\n\x04skip\x18\x02 \x01(\x08*N\n\x12ProcessingStrategy\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\x17\n\x13\x45XECUTE_REMEDIATION\x10\x01\x12\x0e\n\nSTORE_ONLY\x10\x02*\xa8\x01\n\x11RecommendedAction\x12\x08\n\x04NONE\x10\x00\x12\x13\n\x0f\x43OMPONENT_RESET\x10\x02\x12\x13\n\x0f\x43ONTACT_SUPPORT\x10\x05\x12\x11\n\rRUN_FIELDDIAG\x10\x06\x12\x0e\n\nRESTART_VM\x10\x0f\x12\x0e\n\nRESTART_BM\x10\x18\x12\x0e\n\nREPLACE_VM\x10\x19\x12\x0f\n\x0bRUN_DCGMEUD\x10\x1a\x12\x0b\n\x07UNKNOWN\x10\x63\x32`\n\x11PlatformConnector\x12K\n\x15HealthEventOccurredV1\x12\x18.datamodels.HealthEvents\x1a\x16.google.protobuf.Empty"\x00\x42\x35Z3github.com/nvidia/nvsentinel/data-models/pkg/protosb\x06proto3' ) _globals = globals() @@ -32,18 +32,20 @@ _globals["DESCRIPTOR"]._serialized_options = b"Z3github.com/nvidia/nvsentinel/data-models/pkg/protos" _globals["_HEALTHEVENT_METADATAENTRY"]._loaded_options = None _globals["_HEALTHEVENT_METADATAENTRY"]._serialized_options = b"8\001" - _globals["_RECOMMENDEDACTION"]._serialized_start = 837 - _globals["_RECOMMENDEDACTION"]._serialized_end = 1005 + _globals["_PROCESSINGSTRATEGY"]._serialized_start = 896 + _globals["_PROCESSINGSTRATEGY"]._serialized_end = 974 + _globals["_RECOMMENDEDACTION"]._serialized_start = 977 + _globals["_RECOMMENDEDACTION"]._serialized_end = 1145 _globals["_HEALTHEVENTS"]._serialized_start = 96 _globals["_HEALTHEVENTS"]._serialized_end = 168 _globals["_ENTITY"]._serialized_start = 170 _globals["_ENTITY"]._serialized_end = 219 _globals["_HEALTHEVENT"]._serialized_start = 222 - 
_globals["_HEALTHEVENT"]._serialized_end = 783 - _globals["_HEALTHEVENT_METADATAENTRY"]._serialized_start = 736 - _globals["_HEALTHEVENT_METADATAENTRY"]._serialized_end = 783 - _globals["_BEHAVIOUROVERRIDES"]._serialized_start = 785 - _globals["_BEHAVIOUROVERRIDES"]._serialized_end = 834 - _globals["_PLATFORMCONNECTOR"]._serialized_start = 1007 - _globals["_PLATFORMCONNECTOR"]._serialized_end = 1103 + _globals["_HEALTHEVENT"]._serialized_end = 843 + _globals["_HEALTHEVENT_METADATAENTRY"]._serialized_start = 796 + _globals["_HEALTHEVENT_METADATAENTRY"]._serialized_end = 843 + _globals["_BEHAVIOUROVERRIDES"]._serialized_start = 845 + _globals["_BEHAVIOUROVERRIDES"]._serialized_end = 894 + _globals["_PLATFORMCONNECTOR"]._serialized_start = 1147 + _globals["_PLATFORMCONNECTOR"]._serialized_end = 1243 # @@protoc_insertion_point(module_scope) diff --git a/health-monitors/gpu-health-monitor/gpu_health_monitor/protos/health_event_pb2.pyi b/health-monitors/gpu-health-monitor/gpu_health_monitor/protos/health_event_pb2.pyi index 809215f42..beb88c0c8 100644 --- a/health-monitors/gpu-health-monitor/gpu_health_monitor/protos/health_event_pb2.pyi +++ b/health-monitors/gpu-health-monitor/gpu_health_monitor/protos/health_event_pb2.pyi @@ -11,6 +11,12 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor +class ProcessingStrategy(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + UNSPECIFIED: _ClassVar[ProcessingStrategy] + EXECUTE_REMEDIATION: _ClassVar[ProcessingStrategy] + STORE_ONLY: _ClassVar[ProcessingStrategy] + class RecommendedAction(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () NONE: _ClassVar[RecommendedAction] @@ -23,6 +29,9 @@ class RecommendedAction(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): RUN_DCGMEUD: _ClassVar[RecommendedAction] UNKNOWN: _ClassVar[RecommendedAction] +UNSPECIFIED: ProcessingStrategy +EXECUTE_REMEDIATION: ProcessingStrategy +STORE_ONLY: ProcessingStrategy NONE: RecommendedAction COMPONENT_RESET: RecommendedAction CONTACT_SUPPORT: RecommendedAction @@ -68,6 +77,7 @@ class HealthEvent(_message.Message): "nodeName", "quarantineOverrides", "drainOverrides", + "processingStrategy", ) class MetadataEntry(_message.Message): @@ -93,6 +103,7 @@ class HealthEvent(_message.Message): NODENAME_FIELD_NUMBER: _ClassVar[int] QUARANTINEOVERRIDES_FIELD_NUMBER: _ClassVar[int] DRAINOVERRIDES_FIELD_NUMBER: _ClassVar[int] + PROCESSINGSTRATEGY_FIELD_NUMBER: _ClassVar[int] version: int agent: str componentClass: str @@ -108,6 +119,7 @@ class HealthEvent(_message.Message): nodeName: str quarantineOverrides: BehaviourOverrides drainOverrides: BehaviourOverrides + processingStrategy: ProcessingStrategy def __init__( self, version: _Optional[int] = ..., @@ -125,6 +137,7 @@ class HealthEvent(_message.Message): nodeName: _Optional[str] = ..., quarantineOverrides: _Optional[_Union[BehaviourOverrides, _Mapping]] = ..., drainOverrides: _Optional[_Union[BehaviourOverrides, _Mapping]] = ..., + processingStrategy: _Optional[_Union[ProcessingStrategy, str]] = ..., ) -> None: ... 
class BehaviourOverrides(_message.Message): diff --git a/platform-connectors/pkg/connectors/kubernetes/k8s_platform_connector_test.go b/platform-connectors/pkg/connectors/kubernetes/k8s_platform_connector_test.go index 491bc7164..43d8216a5 100644 --- a/platform-connectors/pkg/connectors/kubernetes/k8s_platform_connector_test.go +++ b/platform-connectors/pkg/connectors/kubernetes/k8s_platform_connector_test.go @@ -1388,6 +1388,248 @@ func TestUpdateNodeConditions_ErrorHandling(t *testing.T) { }) } } +func TestProcessHealthEvents_StoreOnlyStrategy(t *testing.T) { + testCases := []struct { + name string + healthEvents []*protos.HealthEvent + expectNodeConditions bool + expectKubernetesEvents bool + description string + expectedConditionType string + expectedEventType string + }{ + { + name: "STORE_ONLY event should not create node condition", + healthEvents: []*protos.HealthEvent{ + { + CheckName: "GpuXidError", + IsHealthy: false, + EntitiesImpacted: []*protos.Entity{{EntityType: "GPU", EntityValue: "0"}}, + ErrorCode: []string{"79"}, + IsFatal: true, + GeneratedTimestamp: timestamppb.New(time.Now()), + ComponentClass: "GPU", + RecommendedAction: protos.RecommendedAction_CONTACT_SUPPORT, + Message: "XID 79: GPU has fallen off the bus", + NodeName: "store-only-test-node", + ProcessingStrategy: protos.ProcessingStrategy_STORE_ONLY, + }, + }, + expectNodeConditions: false, + expectKubernetesEvents: false, + description: "STORE_ONLY fatal event should not create node condition", + }, + { + name: "STORE_ONLY non-fatal event should not create Kubernetes event", + healthEvents: []*protos.HealthEvent{ + { + CheckName: "GpuPcieWatch", + IsHealthy: false, + EntitiesImpacted: []*protos.Entity{{EntityType: "GPU", EntityValue: "0"}}, + ErrorCode: []string{"DCGM_FR_PCI_REPLAY_RATE"}, + IsFatal: false, // Non-fatal creates K8s events, not conditions + GeneratedTimestamp: timestamppb.New(time.Now()), + ComponentClass: "GPU", + RecommendedAction: protos.RecommendedAction_NONE, + Message: "PCI replay rate warning on GPU 0", + NodeName: "store-only-test-node", + ProcessingStrategy: protos.ProcessingStrategy_STORE_ONLY, + }, + }, + expectNodeConditions: false, + expectKubernetesEvents: false, + description: "STORE_ONLY non-fatal event should not create Kubernetes event", + }, + { + name: "EXECUTE_REMEDIATION event should create node condition", + healthEvents: []*protos.HealthEvent{ + { + CheckName: "GpuXidError", + IsHealthy: false, + EntitiesImpacted: []*protos.Entity{{EntityType: "GPU", EntityValue: "0"}}, + ErrorCode: []string{"79"}, + IsFatal: true, + GeneratedTimestamp: timestamppb.New(time.Now()), + ComponentClass: "GPU", + RecommendedAction: protos.RecommendedAction_CONTACT_SUPPORT, + Message: "XID 79: GPU has fallen off the bus", + NodeName: "store-only-test-node", + ProcessingStrategy: protos.ProcessingStrategy_EXECUTE_REMEDIATION, + }, + }, + expectNodeConditions: true, + expectKubernetesEvents: false, + description: "EXECUTE_REMEDIATION fatal event should create node condition", + expectedConditionType: "GpuXidError", + expectedEventType: "", + }, + { + name: "Mixed strategies - only EXECUTE_REMEDIATION should be processed", + healthEvents: []*protos.HealthEvent{ + { + CheckName: "GpuXidError", + IsHealthy: false, + EntitiesImpacted: []*protos.Entity{{EntityType: "GPU", EntityValue: "0"}}, + ErrorCode: []string{"79"}, + IsFatal: true, + GeneratedTimestamp: timestamppb.New(time.Now()), + ComponentClass: "GPU", + RecommendedAction: protos.RecommendedAction_CONTACT_SUPPORT, + Message: "XID 79 on GPU 
0", + NodeName: "store-only-test-node", + ProcessingStrategy: protos.ProcessingStrategy_STORE_ONLY, + }, + { + CheckName: "GpuThermalWatch", + IsHealthy: false, + EntitiesImpacted: []*protos.Entity{{EntityType: "GPU", EntityValue: "1"}}, + ErrorCode: []string{"DCGM_FR_CLOCK_THROTTLE_THERMAL"}, + IsFatal: true, + GeneratedTimestamp: timestamppb.New(time.Now()), + ComponentClass: "GPU", + RecommendedAction: protos.RecommendedAction_CONTACT_SUPPORT, + Message: "Thermal throttle on GPU 1", + NodeName: "store-only-test-node", + ProcessingStrategy: protos.ProcessingStrategy_EXECUTE_REMEDIATION, + }, + }, + expectNodeConditions: true, + expectKubernetesEvents: false, + description: "Only EXECUTE_REMEDIATION events should create conditions", + expectedConditionType: "GpuThermalWatch", + expectedEventType: "", + }, + { + name: "STORE_ONLY non fatal event should not create Kubernetes event", + healthEvents: []*protos.HealthEvent{ + { + CheckName: "GpuPowerWatch", + IsHealthy: false, + EntitiesImpacted: []*protos.Entity{{EntityType: "GPU", EntityValue: "0"}}, + ErrorCode: []string{""}, + IsFatal: false, + ProcessingStrategy: protos.ProcessingStrategy_STORE_ONLY, + GeneratedTimestamp: timestamppb.New(time.Now()), + ComponentClass: "GPU", + RecommendedAction: protos.RecommendedAction_CONTACT_SUPPORT, + Message: "Power warning on GPU 0", + NodeName: "store-only-test-node", + }, + }, + expectNodeConditions: false, + expectKubernetesEvents: false, + description: "STORE_ONLY non fatal event should not create Kubernetes event", + expectedConditionType: "", + expectedEventType: "", + }, + { + name: "EXECUTE_REMEDIATION non fatal event should create Kubernetes event", + healthEvents: []*protos.HealthEvent{ + { + CheckName: "GpuPowerWatch", + IsHealthy: false, + EntitiesImpacted: []*protos.Entity{{EntityType: "GPU", EntityValue: "0"}}, + ErrorCode: []string{""}, + IsFatal: false, + ProcessingStrategy: protos.ProcessingStrategy_EXECUTE_REMEDIATION, + GeneratedTimestamp: timestamppb.New(time.Now()), + ComponentClass: "GPU", + RecommendedAction: protos.RecommendedAction_CONTACT_SUPPORT, + Message: "Power warning on GPU 0", + NodeName: "store-only-test-node", + }, + }, + expectNodeConditions: false, + expectKubernetesEvents: true, + description: "EXECUTE_REMEDIATION non fatal event should create Kubernetes event", + expectedConditionType: "", + expectedEventType: "GpuPowerWatch", + }, + } + + for i, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + localCtx := context.Background() + localClientSet := fake.NewSimpleClientset() + stopCh := make(chan struct{}) + defer close(stopCh) + + ringBuffer := ringbuffer.NewRingBuffer(fmt.Sprintf("storeOnlyTestBuffer-%d", i), localCtx) + maxNodeConditionMessageLength := int64(1024) + connector := NewK8sConnector(localClientSet, ringBuffer, stopCh, localCtx, maxNodeConditionMessageLength) + + nodeName := "store-only-test-node" + fakeNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + LastHeartbeatTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: "KubeletReady", + Message: "kubelet is posting ready status", + }, + }, + }, + } + _, err := localClientSet.CoreV1().Nodes().Create(localCtx, fakeNode, metav1.CreateOptions{}) + require.NoError(t, err, "Failed to create test node") + + healthEvents := &protos.HealthEvents{ + Version: 1, + Events: tc.healthEvents, + } + err = 
connector.processHealthEvents(localCtx, healthEvents) + require.NoError(t, err, "processHealthEvents should not return error") + + node, err := localClientSet.CoreV1().Nodes().Get(localCtx, nodeName, metav1.GetOptions{}) + require.NoError(t, err, "Failed to get test node") + + // Count NVSentinel-related conditions (excluding standard K8s conditions like NodeReady) + var nvsentinelConditions []corev1.NodeCondition + for _, condition := range node.Status.Conditions { + condType := string(condition.Type) + if condType != string(corev1.NodeReady) && + condType != string(corev1.NodeMemoryPressure) && + condType != string(corev1.NodeDiskPressure) && + condType != string(corev1.NodePIDPressure) && + condType != string(corev1.NodeNetworkUnavailable) { + nvsentinelConditions = append(nvsentinelConditions, condition) + t.Logf("Found NVSentinel condition: %s", condType) + } + } + + if tc.expectNodeConditions { + assert.Equal(t, tc.expectedConditionType, string(nvsentinelConditions[0].Type), + "Expected condition type %s, got %s", tc.expectedConditionType, nvsentinelConditions[0].Type) + } else { + assert.Equal(t, 0, len(nvsentinelConditions), + "Expected no NVSentinel node conditions for STORE_ONLY events, got %d", len(nvsentinelConditions)) + } + + // Verify Kubernetes events + events, err := localClientSet.CoreV1().Events("").List(localCtx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("involvedObject.kind=Node,involvedObject.name=%s", nodeName), + }) + require.NoError(t, err, "Failed to list events") + + if tc.expectKubernetesEvents { + assert.Equal(t, tc.expectedEventType, events.Items[0].Type, + "Expected event type %s, got %s", tc.expectedEventType, events.Items[0].Type) + } else { + assert.Equal(t, 0, len(events.Items), + "Expected no Kubernetes events for STORE_ONLY events, got %d", len(events.Items)) + } + + t.Logf("Test passed: %s", tc.description) + }) + } +} + func TestTruncateConditionMessage(t *testing.T) { // Generate test messages that would exceed 1KB when combined generateLongMessages := func(count int) []string { diff --git a/platform-connectors/pkg/connectors/kubernetes/process_node_events.go b/platform-connectors/pkg/connectors/kubernetes/process_node_events.go index 34f97f112..f0308a442 100644 --- a/platform-connectors/pkg/connectors/kubernetes/process_node_events.go +++ b/platform-connectors/pkg/connectors/kubernetes/process_node_events.go @@ -324,31 +324,74 @@ func (r *K8sConnector) constructHealthEventMessage(healthEvent *protos.HealthEve return message } -func (r *K8sConnector) processHealthEvents(ctx context.Context, healthEvents *protos.HealthEvents) error { - var nodeConditions []corev1.NodeCondition +// filterProcessableEvents filters out STORE_ONLY events that should not create node conditions or K8s events. +func filterProcessableEvents(healthEvents *protos.HealthEvents) []*protos.HealthEvent { + var processableEvents []*protos.HealthEvent for _, healthEvent := range healthEvents.Events { - conditionType := corev1.NodeConditionType(string(healthEvent.CheckName)) - message := r.fetchHealthEventMessage(healthEvent) + if healthEvent.ProcessingStrategy == protos.ProcessingStrategy_STORE_ONLY { + slog.Info("Skipping STORE_ONLY health event (no node conditions / node events)", + "node", healthEvent.NodeName, + "checkName", healthEvent.CheckName, + "agent", healthEvent.Agent) + + continue + } + + processableEvents = append(processableEvents, healthEvent) + } + + return processableEvents +} + +// createK8sEvent creates a Kubernetes event from a health event. 
+func (r *K8sConnector) createK8sEvent(healthEvent *protos.HealthEvent) *corev1.Event { + return &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s.%x", healthEvent.NodeName, metav1.Now().UnixNano()), + Namespace: DefaultNamespace, + }, + InvolvedObject: corev1.ObjectReference{ + Kind: "Node", + Name: healthEvent.NodeName, + UID: types.UID(healthEvent.NodeName), + }, + Reason: r.updateHealthEventReason(healthEvent.CheckName, healthEvent.IsHealthy), + ReportingController: healthEvent.Agent, + ReportingInstance: healthEvent.NodeName, + Message: r.fetchHealthEventMessage(healthEvent), + Count: 1, + Source: corev1.EventSource{ + Component: healthEvent.Agent, + Host: healthEvent.NodeName, + }, + FirstTimestamp: metav1.NewTime(healthEvent.GeneratedTimestamp.AsTime()), + LastTimestamp: metav1.NewTime(healthEvent.GeneratedTimestamp.AsTime()), + Type: healthEvent.CheckName, + } +} + +func (r *K8sConnector) processHealthEvents(ctx context.Context, healthEvents *protos.HealthEvents) error { + processableEvents := filterProcessableEvents(healthEvents) + + var nodeConditions []corev1.NodeCondition + for _, healthEvent := range processableEvents { if healthEvent.IsHealthy || healthEvent.IsFatal { - newCondition := corev1.NodeCondition{ - Type: conditionType, + nodeConditions = append(nodeConditions, corev1.NodeCondition{ + Type: corev1.NodeConditionType(healthEvent.CheckName), LastHeartbeatTime: metav1.NewTime(healthEvent.GeneratedTimestamp.AsTime()), LastTransitionTime: metav1.NewTime(healthEvent.GeneratedTimestamp.AsTime()), - Message: message, - } - - nodeConditions = append(nodeConditions, newCondition) + Message: r.fetchHealthEventMessage(healthEvent), + }) } } if len(nodeConditions) > 0 { start := time.Now() - err := r.updateNodeConditions(ctx, healthEvents.Events) + err := r.updateNodeConditions(ctx, processableEvents) - duration := float64(time.Since(start).Milliseconds()) - nodeConditionUpdateDuration.Observe(duration) + nodeConditionUpdateDuration.Observe(float64(time.Since(start).Milliseconds())) if err != nil { nodeConditionUpdateCounter.WithLabelValues(StatusFailed).Inc() @@ -358,36 +401,12 @@ func (r *K8sConnector) processHealthEvents(ctx context.Context, healthEvents *pr nodeConditionUpdateCounter.WithLabelValues(StatusSuccess).Inc() } - for _, healthEvent := range healthEvents.Events { + for _, healthEvent := range processableEvents { if !healthEvent.IsHealthy && !healthEvent.IsFatal { - event := &corev1.Event{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s.%x", healthEvent.NodeName, metav1.Now().UnixNano()), - Namespace: DefaultNamespace, - }, - InvolvedObject: corev1.ObjectReference{ - Kind: "Node", - Name: healthEvent.NodeName, - UID: types.UID(healthEvent.NodeName), - }, - Reason: r.updateHealthEventReason(healthEvent.CheckName, healthEvent.IsHealthy), - ReportingController: healthEvent.Agent, - ReportingInstance: healthEvent.NodeName, - Message: r.fetchHealthEventMessage(healthEvent), - Count: 1, - Source: corev1.EventSource{ - Component: healthEvent.Agent, - Host: healthEvent.NodeName, - }, - FirstTimestamp: metav1.NewTime(healthEvent.GeneratedTimestamp.AsTime()), - LastTimestamp: metav1.NewTime(healthEvent.GeneratedTimestamp.AsTime()), - Type: healthEvent.CheckName, - } start := time.Now() + err := r.writeNodeEvent(ctx, r.createK8sEvent(healthEvent), healthEvent.NodeName) - err := r.writeNodeEvent(ctx, event, healthEvent.NodeName) - duration := float64(time.Since(start).Milliseconds()) - nodeEventUpdateCreateDuration.Observe(duration) + 
nodeEventUpdateCreateDuration.Observe(float64(time.Since(start).Milliseconds())) if err != nil { return fmt.Errorf("failed to write node event for %s: %w", healthEvent.NodeName, err) diff --git a/platform-connectors/pkg/server/platform_connector_server.go b/platform-connectors/pkg/server/platform_connector_server.go index 52a7c5844..efc1127d8 100644 --- a/platform-connectors/pkg/server/platform_connector_server.go +++ b/platform-connectors/pkg/server/platform_connector_server.go @@ -54,6 +54,13 @@ func (p *PlatformConnectorServer) HealthEventOccurredV1(ctx context.Context, healthEventsReceived.Add(float64(len(he.Events))) + // Custom monitors that don't set processingStrategy will default to EXECUTE_REMEDIATION. + for _, event := range he.Events { + if event.ProcessingStrategy == pb.ProcessingStrategy_UNSPECIFIED { + event.ProcessingStrategy = pb.ProcessingStrategy_EXECUTE_REMEDIATION + } + } + if p.Pipeline != nil { for i := range he.Events { p.Pipeline.Process(ctx, he.Events[i]) diff --git a/platform-connectors/pkg/server/platform_connector_server_test.go b/platform-connectors/pkg/server/platform_connector_server_test.go new file mode 100644 index 000000000..8802fc7b8 --- /dev/null +++ b/platform-connectors/pkg/server/platform_connector_server_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package server + +import ( + "context" + "testing" + + pb "github.com/nvidia/nvsentinel/data-models/pkg/protos" + "github.com/stretchr/testify/assert" +) + +func TestHealthEventOccurredV1_ProcessingStrategyNormalization(t *testing.T) { + tests := []struct { + name string + inputStrategy pb.ProcessingStrategy + expectedStrategy pb.ProcessingStrategy + }{ + { + name: "UNSPECIFIED is normalized to EXECUTE_REMEDIATION", + inputStrategy: pb.ProcessingStrategy_UNSPECIFIED, + expectedStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION, + }, + { + name: "EXECUTE_REMEDIATION remains unchanged", + inputStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION, + expectedStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION, + }, + { + name: "STORE_ONLY remains unchanged", + inputStrategy: pb.ProcessingStrategy_STORE_ONLY, + expectedStrategy: pb.ProcessingStrategy_STORE_ONLY, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := &PlatformConnectorServer{} + + healthEvents := &pb.HealthEvents{ + Events: []*pb.HealthEvent{ + { + NodeName: "test-node", + CheckName: "test-check", + ProcessingStrategy: tt.inputStrategy, + }, + }, + } + + _, err := server.HealthEventOccurredV1(context.Background(), healthEvents) + + assert.NoError(t, err) + assert.Equal(t, tt.expectedStrategy, healthEvents.Events[0].ProcessingStrategy) + }) + } +} diff --git a/store-client/pkg/client/mongodb_pipeline_builder.go b/store-client/pkg/client/mongodb_pipeline_builder.go index 48299822e..7b312f05e 100644 --- a/store-client/pkg/client/mongodb_pipeline_builder.go +++ b/store-client/pkg/client/mongodb_pipeline_builder.go @@ -16,6 +16,7 @@ package client import ( "github.com/nvidia/nvsentinel/data-models/pkg/model" + "github.com/nvidia/nvsentinel/data-models/pkg/protos" "github.com/nvidia/nvsentinel/store-client/pkg/datastore" ) @@ -83,6 +84,34 @@ func (b *MongoDBPipelineBuilder) BuildAllHealthEventInsertsPipeline() datastore. ) } +// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for +// all EXECUTE_REMEDIATION health event inserts. +// +// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either: +// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors) +// - Missing/null (old events created before upgrade, custom monitors, or circuit breaker backlog) +func (b *MongoDBPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline { + return datastore.ToPipeline( + datastore.D( + datastore.E("$match", datastore.D( + datastore.E("operationType", datastore.D( + datastore.E("$in", datastore.A("insert")), + )), + datastore.E("$or", datastore.A( + datastore.D(datastore.E( + "fullDocument.healthevent.processingstrategy", + int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION), + )), + datastore.D(datastore.E( + "fullDocument.healthevent.processingstrategy", + datastore.D(datastore.E("$exists", false)), + )), + )), + )), + ), + ) +} + // BuildNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts // This is used by health-events-analyzer to detect warning-level health events for pattern analysis. func (b *MongoDBPipelineBuilder) BuildNonFatalUnhealthyInsertsPipeline() datastore.Pipeline { @@ -97,6 +126,35 @@ func (b *MongoDBPipelineBuilder) BuildNonFatalUnhealthyInsertsPipeline() datasto ) } +// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts +// with processingStrategy=EXECUTE_REMEDIATION. 
This is used by health-events-analyzer for pattern analysis. +// +// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either: +// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors) +// - Missing/null (old events created before upgrade, custom monitors) +func (b *MongoDBPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline { + return datastore.ToPipeline( + datastore.D( + datastore.E("$match", datastore.D( + datastore.E("operationType", "insert"), + datastore.E("fullDocument.healthevent.agent", datastore.D(datastore.E("$ne", "health-events-analyzer"))), + datastore.E("fullDocument.healthevent.ishealthy", false), + // Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field (backward compat) + datastore.E("$or", datastore.A( + datastore.D(datastore.E( + "fullDocument.healthevent.processingstrategy", + int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION), + )), + datastore.D(datastore.E( + "fullDocument.healthevent.processingstrategy", + datastore.D(datastore.E("$exists", false)), + )), + )), + )), + ), + ) +} + // BuildQuarantinedAndDrainedNodesPipeline creates a pipeline for remediation-ready nodes // This watches for insert/update events where both quarantine and eviction status indicate the // node is ready for reboot, or where the node has been unquarantined and needs cleanup, or where diff --git a/store-client/pkg/client/pipeline_builder.go b/store-client/pkg/client/pipeline_builder.go index b622da2c9..134a4250c 100644 --- a/store-client/pkg/client/pipeline_builder.go +++ b/store-client/pkg/client/pipeline_builder.go @@ -32,10 +32,19 @@ type PipelineBuilder interface { // Used by: fault-quarantine to detect new health events BuildAllHealthEventInsertsPipeline() datastore.Pipeline + // BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for "processable" health event inserts, + // i.e. events with processingStrategy=EXECUTE_REMEDIATION. + // Used by: fault-quarantine to ignore observability-only events (processingStrategy=STORE_ONLY) + BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline + // BuildNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal unhealthy events // Used by: health-events-analyzer for pattern analysis BuildNonFatalUnhealthyInsertsPipeline() datastore.Pipeline + // BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts + // with processingStrategy=EXECUTE_REMEDIATION. This is used by health-events-analyzer for pattern analysis. 
+ BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline + // BuildQuarantinedAndDrainedNodesPipeline creates a pipeline for remediation-ready nodes // Used by: fault-remediation to detect when nodes are ready for reboot BuildQuarantinedAndDrainedNodesPipeline() datastore.Pipeline diff --git a/store-client/pkg/client/pipeline_builder_test.go b/store-client/pkg/client/pipeline_builder_test.go index 683c11b16..0e43d554f 100644 --- a/store-client/pkg/client/pipeline_builder_test.go +++ b/store-client/pkg/client/pipeline_builder_test.go @@ -66,6 +66,25 @@ func TestAllHealthEventInsertsPipeline(t *testing.T) { } } +func TestProcessableHealthEventInsertsPipeline(t *testing.T) { + testCases := []struct { + name string + builder PipelineBuilder + }{ + {"MongoDB", NewMongoDBPipelineBuilder()}, + {"PostgreSQL", NewPostgreSQLPipelineBuilder()}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pipeline := tc.builder.BuildProcessableHealthEventInsertsPipeline() + require.NotNil(t, pipeline) + require.NotEmpty(t, pipeline) + assert.Len(t, pipeline, 1, "Pipeline should have 1 stage") + }) + } +} + func TestNonFatalUnhealthyInsertsPipeline(t *testing.T) { testCases := []struct { name string diff --git a/store-client/pkg/client/postgresql_pipeline_builder.go b/store-client/pkg/client/postgresql_pipeline_builder.go index 4e4cafb7b..9af9b33b7 100644 --- a/store-client/pkg/client/postgresql_pipeline_builder.go +++ b/store-client/pkg/client/postgresql_pipeline_builder.go @@ -16,6 +16,7 @@ package client import ( "github.com/nvidia/nvsentinel/data-models/pkg/model" + "github.com/nvidia/nvsentinel/data-models/pkg/protos" "github.com/nvidia/nvsentinel/store-client/pkg/datastore" ) @@ -115,6 +116,35 @@ func (b *PostgreSQLPipelineBuilder) BuildAllHealthEventInsertsPipeline() datasto ) } +// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for health event inserts +// with processingStrategy=EXECUTE_REMEDIATION +// +// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either: +// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors) +// - Missing/null (old events created before upgrade, custom monitors, or circuit breaker backlog) +func (b *PostgreSQLPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline { + return datastore.ToPipeline( + datastore.D( + datastore.E("$match", datastore.D( + datastore.E("operationType", datastore.D( + datastore.E("$in", datastore.A("insert")), + )), + // Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field (backward compat) + datastore.E("$or", datastore.A( + datastore.D(datastore.E( + "fullDocument.healthevent.processingstrategy", + int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION), + )), + datastore.D(datastore.E( + "fullDocument.healthevent.processingstrategy", + datastore.D(datastore.E("$exists", false)), + )), + )), + )), + ), + ) +} + // BuildNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts // For PostgreSQL, we need to handle both INSERT and UPDATE operations because platform-connectors // may insert a record and then immediately update it, causing the trigger to fire UPDATE events. @@ -130,6 +160,37 @@ func (b *PostgreSQLPipelineBuilder) BuildNonFatalUnhealthyInsertsPipeline() data ) } +// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts +// with processingStrategy=EXECUTE_REMEDIATION. 
diff --git a/store-client/pkg/client/postgresql_pipeline_builder.go b/store-client/pkg/client/postgresql_pipeline_builder.go
index 4e4cafb7b..9af9b33b7 100644
--- a/store-client/pkg/client/postgresql_pipeline_builder.go
+++ b/store-client/pkg/client/postgresql_pipeline_builder.go
@@ -16,6 +16,7 @@ package client
 
 import (
 	"github.com/nvidia/nvsentinel/data-models/pkg/model"
+	"github.com/nvidia/nvsentinel/data-models/pkg/protos"
 	"github.com/nvidia/nvsentinel/store-client/pkg/datastore"
 )
 
@@ -115,6 +116,35 @@ func (b *PostgreSQLPipelineBuilder) BuildAllHealthEventInsertsPipeline() datasto
 	)
 }
 
+// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for health event inserts
+// with processingStrategy=EXECUTE_REMEDIATION
+//
+// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either:
+// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors)
+// - Missing/null (old events created before upgrade, custom monitors, or circuit breaker backlog)
+func (b *PostgreSQLPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline {
+	return datastore.ToPipeline(
+		datastore.D(
+			datastore.E("$match", datastore.D(
+				datastore.E("operationType", datastore.D(
+					datastore.E("$in", datastore.A("insert")),
+				)),
+				// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field (backward compat)
+				datastore.E("$or", datastore.A(
+					datastore.D(datastore.E(
+						"fullDocument.healthevent.processingstrategy",
+						int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION),
+					)),
+					datastore.D(datastore.E(
+						"fullDocument.healthevent.processingstrategy",
+						datastore.D(datastore.E("$exists", false)),
+					)),
+				)),
+			)),
+		),
+	)
+}
+
 // BuildNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts
 // For PostgreSQL, we need to handle both INSERT and UPDATE operations because platform-connectors
 // may insert a record and then immediately update it, causing the trigger to fire UPDATE events.
@@ -130,6 +160,37 @@ func (b *PostgreSQLPipelineBuilder) BuildNonFatalUnhealthyInsertsPipeline() data
 	)
 }
 
+// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts
+// with processingStrategy=EXECUTE_REMEDIATION. This is used by health-events-analyzer for pattern analysis.
+// For PostgreSQL, we need to handle both INSERT and UPDATE operations because platform-connectors
+// may insert a record and then immediately update it, causing the trigger to fire UPDATE events.
+//
+// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either:
+// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors)
+// - Missing/null (old events created before upgrade, custom monitors)
+func (b *PostgreSQLPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline {
+	return datastore.ToPipeline(
+		datastore.D(
+			datastore.E("$match", datastore.D(
+				datastore.E("operationType", datastore.D(datastore.E("$in", datastore.A("insert", "update")))),
+				datastore.E("fullDocument.healthevent.agent", datastore.D(datastore.E("$ne", "health-events-analyzer"))),
+				datastore.E("fullDocument.healthevent.ishealthy", false),
+				// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field (backward compat)
+				datastore.E("$or", datastore.A(
+					datastore.D(datastore.E(
+						"fullDocument.healthevent.processingstrategy",
+						int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION),
+					)),
+					datastore.D(datastore.E(
+						"fullDocument.healthevent.processingstrategy",
+						datastore.D(datastore.E("$exists", false)),
+					)),
+				)),
+			)),
+		),
+	)
+}
+
 // BuildQuarantinedAndDrainedNodesPipeline creates a pipeline for remediation-ready nodes
 // Similar to BuildNodeQuarantineStatusPipeline, this supports both UPDATE and INSERT operations
 // to be defensive against PostgreSQL trigger edge cases.
diff --git a/store-client/pkg/datastore/providers/postgresql/sql_filter_builder.go b/store-client/pkg/datastore/providers/postgresql/sql_filter_builder.go
index bca3007de..ce94897fb 100644
--- a/store-client/pkg/datastore/providers/postgresql/sql_filter_builder.go
+++ b/store-client/pkg/datastore/providers/postgresql/sql_filter_builder.go
@@ -401,6 +401,7 @@ var fieldNameMapping = map[string]string{
 	"faultremediated": "faultremediated",
 	"status": "status",
 	"message": "message",
+	"processingstrategy": "processingStrategy",
 }
 
 // isOperationTypeField checks if the field is the operationType field.
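
Both the MongoDB and PostgreSQL builders encode the same matching rule. Expressed as a plain predicate over the decoded protobuf, it is roughly the following; a sketch for illustration, not code from this change:

package example

import "github.com/nvidia/nvsentinel/data-models/pkg/protos"

// isProcessable roughly mirrors the $or filter above: STORE_ONLY is skipped,
// while EXECUTE_REMEDIATION and events that never set the field (stored with
// the field missing) keep the pre-existing behaviour.
func isProcessable(ev *protos.HealthEvent) bool {
	switch ev.GetProcessingStrategy() {
	case protos.ProcessingStrategy_STORE_ONLY:
		return false // observability-only: persist/export, never remediate
	default:
		return true
	}
}
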
diff --git a/tests/fault_quarantine_test.go b/tests/fault_quarantine_test.go
index 3e1d7398e..cae80dffc 100644
--- a/tests/fault_quarantine_test.go
+++ b/tests/fault_quarantine_test.go
@@ -23,9 +23,11 @@ import (
 
 	"tests/helpers"
 
+	"github.com/nvidia/nvsentinel/data-models/pkg/protos"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	v1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/e2e-framework/pkg/envconf"
 	"sigs.k8s.io/e2e-framework/pkg/features"
 )
 
@@ -228,3 +230,110 @@ func TestPreCordonedNodeHandling(t *testing.T) {
 
 	testEnv.Test(t, feature.Feature())
 }
+
+func TestFaultQuarantineWithProcessingStrategy(t *testing.T) {
+	feature := features.New("TestFaultQuarantineWithProcessingStrategy").
+		WithLabel("suite", "fault-quarantine-with-processing-strategy")
+
+	var testCtx *helpers.QuarantineTestContext
+
+	feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
+		var newCtx context.Context
+		newCtx, testCtx = helpers.SetupQuarantineTest(ctx, t, c, "")
+		return newCtx
+	})
+
+	feature.Assess("Check that node is not quarantined for STORE_ONLY events", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
+		client, err := c.NewClient()
+		require.NoError(t, err)
+
+		event := helpers.NewHealthEvent(testCtx.NodeName).
+			WithErrorCode("79").
+			WithMessage("XID error occurred").
+			WithAgent(helpers.SYSLOG_HEALTH_MONITOR_AGENT).
+			WithCheckName("SysLogsXIDError").
+			WithProcessingStrategy(int(protos.ProcessingStrategy_STORE_ONLY))
+		helpers.SendHealthEvent(ctx, t, event)
+
+		t.Logf("Node %s should not have condition SysLogsXIDError", testCtx.NodeName)
+		helpers.EnsureNodeConditionNotPresent(ctx, t, client, testCtx.NodeName, "SysLogsXIDError")
+
+		helpers.AssertQuarantineState(ctx, t, client, testCtx.NodeName, helpers.QuarantineAssertion{
+			ExpectCordoned:   false,
+			ExpectAnnotation: false,
+		})
+
+		event = helpers.NewHealthEvent(testCtx.NodeName).
+			WithErrorCode("DCGM_FR_CLOCK_THROTTLE_POWER").
+			WithCheckName("GpuPowerWatch").
+			WithFatal(false).
+			WithProcessingStrategy(int(protos.ProcessingStrategy_STORE_ONLY))
+		helpers.SendHealthEvent(ctx, t, event)
+
+		t.Logf("Node %s should not have GpuPowerWatch node event", testCtx.NodeName)
+		helpers.EnsureNodeEventNotPresent(ctx, t, client, testCtx.NodeName, "GpuPowerWatch", "GpuPowerWatchIsNotHealthy")
+
+		helpers.AssertQuarantineState(ctx, t, client, testCtx.NodeName, helpers.QuarantineAssertion{
+			ExpectCordoned:   false,
+			ExpectAnnotation: false,
+		})
+
+		return ctx
+	})
+
+	feature.Assess("Check that node is quarantined for EXECUTE_REMEDIATION events", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
+		client, err := c.NewClient()
+		require.NoError(t, err)
+
+		event := helpers.NewHealthEvent(testCtx.NodeName).
+			WithErrorCode("79").
+			WithMessage("XID error occurred").
+			WithAgent(helpers.SYSLOG_HEALTH_MONITOR_AGENT).
+			WithCheckName("SysLogsXIDError").
+			WithProcessingStrategy(int(protos.ProcessingStrategy_EXECUTE_REMEDIATION))
+		helpers.SendHealthEvent(ctx, t, event)
+
+		t.Logf("Node %s should have condition SysLogsXIDError", testCtx.NodeName)
+		helpers.WaitForNodeConditionWithCheckName(ctx, t, client, testCtx.NodeName, "SysLogsXIDError", "", "SysLogsXIDErrorIsNotHealthy", v1.ConditionTrue)
+
+		helpers.AssertQuarantineState(ctx, t, client, testCtx.NodeName, helpers.QuarantineAssertion{
+			ExpectCordoned:   true,
+			ExpectAnnotation: true,
+		})
+
+		event = helpers.NewHealthEvent(testCtx.NodeName).
+			WithErrorCode("DCGM_FR_CLOCK_THROTTLE_POWER").
+			WithCheckName("GpuPowerWatch").
+			WithFatal(false).
+			WithProcessingStrategy(int(protos.ProcessingStrategy_EXECUTE_REMEDIATION))
+		helpers.SendHealthEvent(ctx, t, event)
+
+		t.Logf("Node %s should have node event GpuPowerWatch", testCtx.NodeName)
+		expectedEvent := v1.Event{
+			Type:    "GpuPowerWatch",
+			Reason:  "GpuPowerWatchIsNotHealthy",
+			Message: "ErrorCode:DCGM_FR_CLOCK_THROTTLE_POWER GPU:0 Recommended Action=NONE;",
+		}
+		helpers.WaitForNodeEvent(ctx, t, client, testCtx.NodeName, expectedEvent)
+
+		helpers.AssertQuarantineState(ctx, t, client, testCtx.NodeName, helpers.QuarantineAssertion{
+			ExpectCordoned:   true,
+			ExpectAnnotation: true,
+		})
+
+		return ctx
+	})
+
+	feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
+		event := helpers.NewHealthEvent(testCtx.NodeName).
+			WithErrorCode("79").
+			WithHealthy(true).
+			WithAgent(helpers.SYSLOG_HEALTH_MONITOR_AGENT).
+			WithCheckName("SysLogsXIDError")
+		helpers.SendHealthEvent(ctx, t, event)
+
+		return helpers.TeardownQuarantineTest(ctx, t, c)
+	})
+
+	testEnv.Test(t, feature.Feature())
+}
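
The new e2e test exercises STORE_ONLY and EXECUTE_REMEDIATION explicitly. The backward-compatibility path, an event that never sets the field, could be exercised in the same way; the fragment below is a sketch that assumes the same Assess step's ctx, t, client and testCtx, and is not an assertion added by this change:

		// Sketch: an event with no processing strategy should behave like EXECUTE_REMEDIATION.
		event := helpers.NewHealthEvent(testCtx.NodeName).
			WithErrorCode("79").
			WithMessage("XID error occurred").
			WithAgent(helpers.SYSLOG_HEALTH_MONITOR_AGENT).
			WithCheckName("SysLogsXIDError")
		helpers.SendHealthEvent(ctx, t, event)

		helpers.AssertQuarantineState(ctx, t, client, testCtx.NodeName, helpers.QuarantineAssertion{
			ExpectCordoned:   true,
			ExpectAnnotation: true,
		})
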
diff --git a/tests/helpers/fault_quarantine.go b/tests/helpers/fault_quarantine.go
index 0019eb3bc..e7e1726bd 100644
--- a/tests/helpers/fault_quarantine.go
+++ b/tests/helpers/fault_quarantine.go
@@ -138,9 +138,11 @@ func SetupQuarantineTestWithOptions(ctx context.Context, t *testing.T, c *envcon
 
 	testCtx.ConfigMapBackup = backupData
 
-	t.Logf("Applying test configmap: %s", configMapPath)
-	err = createConfigMapFromFilePath(ctx, client, configMapPath, "fault-quarantine", NVSentinelNamespace)
-	require.NoError(t, err)
+	if configMapPath != "" {
+		t.Logf("Applying test configmap: %s", configMapPath)
+		err = createConfigMapFromFilePath(ctx, client, configMapPath, "fault-quarantine", NVSentinelNamespace)
+		require.NoError(t, err)
+	}
 
 	if opts != nil {
 		if opts.CircuitBreakerPercentage > 0 {
diff --git a/tests/helpers/healthevent.go b/tests/helpers/healthevent.go
index 246df5a17..f70177f12 100644
--- a/tests/helpers/healthevent.go
+++ b/tests/helpers/healthevent.go
@@ -45,6 +45,7 @@ type HealthEventTemplate struct {
 	Metadata            map[string]string    `json:"metadata,omitempty"`
 	QuarantineOverrides *QuarantineOverrides `json:"quarantineOverrides,omitempty"`
 	NodeName            string               `json:"nodeName"`
+	ProcessingStrategy  int                  `json:"processingStrategy,omitempty"`
 }
 
 type EntityImpacted struct {
@@ -149,6 +150,11 @@ func (h *HealthEventTemplate) WithRecommendedAction(action int) *HealthEventTemp
 	return h
 }
 
+func (h *HealthEventTemplate) WithProcessingStrategy(strategy int) *HealthEventTemplate {
+	h.ProcessingStrategy = strategy
+	return h
+}
+
 func (h *HealthEventTemplate) WriteToTempFile() (string, error) {
 	tempFile, err := os.CreateTemp("", "health-event-*.json")
 	if err != nil {
diff --git a/tests/helpers/kube.go b/tests/helpers/kube.go
index c132b3f48..fe9fea7ee 100755
--- a/tests/helpers/kube.go
+++ b/tests/helpers/kube.go
@@ -384,6 +384,30 @@ func WaitForNodeEvent(ctx context.Context, t *testing.T, c klient.Client, nodeNa
 	}, EventuallyWaitTimeout, WaitInterval, "node %s should have event %v", nodeName, expectedEvent)
 }
 
+func EnsureNodeEventNotPresent(ctx context.Context, t *testing.T,
+	c klient.Client, nodeName string, eventType, eventReason string) {
+	t.Helper()
+
+	require.Never(t, func() bool {
+		events, err := GetNodeEvents(ctx, c, nodeName, eventType)
+		if err != nil {
+			t.Logf("failed to get events for node %s: %v", nodeName, err)
+			return false
+		}
+
+		for _, event := range events.Items {
+			if event.Type == eventType && event.Reason == eventReason {
+				t.Logf("node %s has event %v", nodeName, event)
+				return true
+			}
+		}
+
+		t.Logf("node %s does not have event %v", nodeName, eventType)
+
+		return false
+	}, NeverWaitTimeout, WaitInterval, "node %s should not have event %v", nodeName, eventType)
+}
+
 // SelectTestNodeFromUnusedPool selects an available test node from the cluster.
 // Prefers uncordoned nodes but will fall back to the first node if none are available.
 func SelectTestNodeFromUnusedPool(ctx context.Context, t *testing.T, client klient.Client) string {
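
A note on the helper's new JSON field: because of omitempty, a zero strategy is dropped from the payload entirely, which is exactly the missing-field case the $exists:false branch of the pipelines matches, while EXECUTE_REMEDIATION (1) and STORE_ONLY (2) are serialized as numbers. A minimal stand-alone illustration with local stand-in types, not the helper structs:

package main

import (
	"encoding/json"
	"fmt"
)

// payload is a stand-in for the relevant part of HealthEventTemplate's JSON shape.
type payload struct {
	NodeName           string `json:"nodeName"`
	ProcessingStrategy int    `json:"processingStrategy,omitempty"`
}

func main() {
	a, _ := json.Marshal(payload{NodeName: "node-1"})                        // {"nodeName":"node-1"}
	b, _ := json.Marshal(payload{NodeName: "node-1", ProcessingStrategy: 2}) // adds "processingStrategy":2
	fmt.Println(string(a))
	fmt.Println(string(b))
}
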