diff --git a/backends/fireflyb/comms.go b/backends/fireflyb/comms.go index cfa176a..773be11 100644 --- a/backends/fireflyb/comms.go +++ b/backends/fireflyb/comms.go @@ -37,6 +37,29 @@ func (b *FireflyBackend) sendFirefly(flowID glowdTypes.FlowID, payload []byte) e return errors.Join(sendErrors...) } +func validateCollectorFlow(flowID glowdTypes.FlowID) error { + if flowID.Src.Port() == 0 || flowID.Dst.Port() == 0 { + return fmt.Errorf("ports must be non-zero") + } + + if flowID.Src.Addr().IsUnspecified() || flowID.Dst.Addr().IsUnspecified() { + return fmt.Errorf("source and destination addresses must be specific") + } + + switch flowID.State { + case glowdTypes.START: + if flowID.StartTs.IsZero() { + return fmt.Errorf("start flow is missing start timestamp") + } + case glowdTypes.END: + if flowID.EndTs.IsZero() { + return fmt.Errorf("end flow is missing end timestamp") + } + } + + return nil +} + func (b *FireflyBackend) sendToCollector(payload []byte) error { slog.Debug("sending firefly to the collector") diff --git a/backends/fireflyb/comms_test.go b/backends/fireflyb/comms_test.go new file mode 100644 index 0000000..30690ad --- /dev/null +++ b/backends/fireflyb/comms_test.go @@ -0,0 +1,89 @@ +package fireflyb + +import ( + "net/netip" + "testing" + "time" + + glowdTypes "github.com/scitags/flowd-go/types" +) + +func TestValidateCollectorFlow(t *testing.T) { + now := time.Now().UTC() + + validStart := glowdTypes.FlowID{ + State: glowdTypes.START, + Family: glowdTypes.IPv6, + Protocol: glowdTypes.TCP, + Src: netip.MustParseAddrPort("2001:db8::1:1234"), + Dst: netip.MustParseAddrPort("2001:db8::2:5678"), + StartTs: now, + } + + tests := []struct { + name string + flowID glowdTypes.FlowID + wantErr bool + }{ + {name: "valid start", flowID: validStart, wantErr: false}, + { + name: "unspecified source", + flowID: glowdTypes.FlowID{ + State: glowdTypes.START, + Family: glowdTypes.IPv6, + Protocol: glowdTypes.TCP, + Src: netip.MustParseAddrPort("[::]:1234"), + Dst: netip.MustParseAddrPort("2001:db8::2:5678"), + StartTs: now, + }, + wantErr: true, + }, + { + name: "zero destination port", + flowID: glowdTypes.FlowID{ + State: glowdTypes.START, + Family: glowdTypes.IPv6, + Protocol: glowdTypes.TCP, + Src: netip.MustParseAddrPort("2001:db8::1:1234"), + Dst: netip.MustParseAddrPort("[2001:db8::2]:0"), + StartTs: now, + }, + wantErr: true, + }, + { + name: "start without timestamp", + flowID: glowdTypes.FlowID{ + State: glowdTypes.START, + Family: glowdTypes.IPv6, + Protocol: glowdTypes.TCP, + Src: netip.MustParseAddrPort("2001:db8::1:1234"), + Dst: netip.MustParseAddrPort("2001:db8::2:5678"), + }, + wantErr: true, + }, + { + name: "end without timestamp", + flowID: glowdTypes.FlowID{ + State: glowdTypes.END, + Family: glowdTypes.IPv6, + Protocol: glowdTypes.TCP, + Src: netip.MustParseAddrPort("2001:db8::1:1234"), + Dst: netip.MustParseAddrPort("2001:db8::2:5678"), + StartTs: now, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateCollectorFlow(tt.flowID) + if tt.wantErr && err == nil { + t.Fatalf("expected validation error, got nil") + } + if !tt.wantErr && err != nil { + t.Fatalf("unexpected validation error: %v", err) + } + }) + } +} diff --git a/backends/marker/conf.go b/backends/marker/conf.go index ca02071..e1abfb5 100644 --- a/backends/marker/conf.go +++ b/backends/marker/conf.go @@ -19,6 +19,10 @@ type Config struct { RawMarkingStrategy string `yaml:"markingStrategy"` MarkingStrategy Strategy `yaml:"-"` // Parsed strategy + // Fixed identifiers leveraged when marking all traffic + FixedExperimentId int `yaml:"-"` + FixedActivityId int `yaml:"-"` + DebugMode bool `yaml:"debugMode"` MatchAll bool `yaml:"matchAll"` } @@ -34,6 +38,9 @@ func (c *Config) UnmarshalYAML(b []byte) error { RemoveQdisc: true, ProgramPath: "", + FixedExperimentId: -1, + FixedActivityId: -1, + RawMarkingStrategy: "label", DebugMode: false, MatchAll: false, diff --git a/backends/marker/marker.go b/backends/marker/marker.go index 46f223b..c51f62d 100644 --- a/backends/marker/marker.go +++ b/backends/marker/marker.go @@ -5,12 +5,14 @@ package marker import ( "fmt" "log/slog" + "net/netip" "os" "time" "math/rand" "github.com/cilium/ebpf" + "github.com/scitags/flowd-go/types" glowdTypes "github.com/scitags/flowd-go/types" ) @@ -103,6 +105,35 @@ func NewMarkerBackend(c *Config) (*MarkerBackend, error) { func (b *MarkerBackend) Run(done <-chan struct{}, inChan <-chan glowdTypes.FlowID) { slog.Debug("running the marker backend") + if b.Config.FixedExperimentId != -1 || b.Config.FixedActivityId != -1 { + slog.Debug("triggering marking of all datagrams", "experimentId", b.Config.FixedExperimentId, "activityId", b.Config.FixedActivityId) + + markAllFlowdId := types.FlowID{ + State: types.START, + Family: types.IPv6, + Src: netip.AddrPortFrom(netip.IPv6Unspecified(), 0), + Dst: netip.AddrPortFrom(netip.IPv6Unspecified(), 0), + Experiment: uint32(b.Config.FixedExperimentId), + Activity: uint32(b.Config.FixedActivityId), + Application: types.SYSLOG_APP_NAME, + } + + rawDstIPHi, rawDstIPLo := extractHalves(markAllFlowdId.Dst.Addr()) + flowHash := FlowFourTuple{ + IPv6Hi: rawDstIPHi, + IPv6Lo: rawDstIPLo, + DstPort: uint32(markAllFlowdId.Dst.Port()), + SrcPort: uint32(markAllFlowdId.Src.Port()), + } + + flowTag := b.genFlowTag(markAllFlowdId.Experiment, markAllFlowdId.Activity) + + if err := b.coll.Maps[MAP_NAME].Update(flowHash, flowTag, ebpf.UpdateAny); err != nil { + slog.Error("error inserting map value", "err", err, "flowHash", flowHash, "flowTag", flowTag) + } + slog.Debug("inserted map value", "flowHash", flowHash, "flowTag", flowTag) + } + for { select { case flowID, ok := <-inChan: diff --git a/cmd/conf.go b/cmd/conf.go index 1ddc67e..760bd9c 100644 --- a/cmd/conf.go +++ b/cmd/conf.go @@ -106,6 +106,10 @@ func pluginBackendDependencies(c *Config) { if c.Backends.Marker != nil { slog.Warn("overriding marking criteria to match all for the marker backend") c.Backends.Marker.MatchAll = true + + slog.Debug("configuring fixed ids", "experimentId", c.Plugins.Perfsonar.ExperimentId, "activityId", c.Plugins.Perfsonar.ActivityId) + c.Backends.Marker.FixedExperimentId = c.Plugins.Perfsonar.ExperimentId + c.Backends.Marker.FixedActivityId = c.Plugins.Perfsonar.ActivityId } } } diff --git a/plugins/perfsonar/perfsonar.go b/plugins/perfsonar/perfsonar.go index 1cbe86c..055737c 100644 --- a/plugins/perfsonar/perfsonar.go +++ b/plugins/perfsonar/perfsonar.go @@ -2,7 +2,6 @@ package perfsonar import ( "log/slog" - "net/netip" "github.com/scitags/flowd-go/types" ) @@ -30,15 +29,11 @@ func (p *PerfsonarPlugin) Run(done <-chan struct{}, outChan chan<- types.FlowID) * 0 disables checks within the eBPF program. */ slog.Debug("kicking off packet marking") - outChan <- types.FlowID{ - State: types.START, - Family: types.IPv6, - Src: netip.AddrPortFrom(netip.IPv6Unspecified(), 0), - Dst: netip.AddrPortFrom(netip.IPv6Unspecified(), 0), - Experiment: uint32(p.ExperimentId), - Activity: uint32(p.ActivityId), - Application: types.SYSLOG_APP_NAME, - } + + // The perfSONAR plugin is simply a no-op to maintain backwards compatibility with + // previous configurations. Configuration parsing and plugin bootstrapping take + // care of setting things up so that every IPv6 datagram is marked with the + // experiment and activity IDs specified in the configuration. // Simply block until the done channel is closed so that we can exit <-done