Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions backends/fireflyb/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,29 @@ func (b *FireflyBackend) sendFirefly(flowID glowdTypes.FlowID, payload []byte) e
return errors.Join(sendErrors...)
}

func validateCollectorFlow(flowID glowdTypes.FlowID) error {
if flowID.Src.Port() == 0 || flowID.Dst.Port() == 0 {
return fmt.Errorf("ports must be non-zero")
}

if flowID.Src.Addr().IsUnspecified() || flowID.Dst.Addr().IsUnspecified() {
return fmt.Errorf("source and destination addresses must be specific")
}

switch flowID.State {
case glowdTypes.START:
if flowID.StartTs.IsZero() {
return fmt.Errorf("start flow is missing start timestamp")
}
case glowdTypes.END:
if flowID.EndTs.IsZero() {
return fmt.Errorf("end flow is missing end timestamp")
}
}

return nil
}

func (b *FireflyBackend) sendToCollector(payload []byte) error {
slog.Debug("sending firefly to the collector")

Expand Down
89 changes: 89 additions & 0 deletions backends/fireflyb/comms_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package fireflyb

import (
"net/netip"
"testing"
"time"

glowdTypes "github.com/scitags/flowd-go/types"
)

func TestValidateCollectorFlow(t *testing.T) {
now := time.Now().UTC()

validStart := glowdTypes.FlowID{
State: glowdTypes.START,
Family: glowdTypes.IPv6,
Protocol: glowdTypes.TCP,
Src: netip.MustParseAddrPort("2001:db8::1:1234"),
Dst: netip.MustParseAddrPort("2001:db8::2:5678"),
StartTs: now,
}

tests := []struct {
name string
flowID glowdTypes.FlowID
wantErr bool
}{
{name: "valid start", flowID: validStart, wantErr: false},
{
name: "unspecified source",
flowID: glowdTypes.FlowID{
State: glowdTypes.START,
Family: glowdTypes.IPv6,
Protocol: glowdTypes.TCP,
Src: netip.MustParseAddrPort("[::]:1234"),
Dst: netip.MustParseAddrPort("2001:db8::2:5678"),
StartTs: now,
},
wantErr: true,
},
{
name: "zero destination port",
flowID: glowdTypes.FlowID{
State: glowdTypes.START,
Family: glowdTypes.IPv6,
Protocol: glowdTypes.TCP,
Src: netip.MustParseAddrPort("2001:db8::1:1234"),
Dst: netip.MustParseAddrPort("[2001:db8::2]:0"),
StartTs: now,
},
wantErr: true,
},
{
name: "start without timestamp",
flowID: glowdTypes.FlowID{
State: glowdTypes.START,
Family: glowdTypes.IPv6,
Protocol: glowdTypes.TCP,
Src: netip.MustParseAddrPort("2001:db8::1:1234"),
Dst: netip.MustParseAddrPort("2001:db8::2:5678"),
},
wantErr: true,
},
{
name: "end without timestamp",
flowID: glowdTypes.FlowID{
State: glowdTypes.END,
Family: glowdTypes.IPv6,
Protocol: glowdTypes.TCP,
Src: netip.MustParseAddrPort("2001:db8::1:1234"),
Dst: netip.MustParseAddrPort("2001:db8::2:5678"),
StartTs: now,
},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateCollectorFlow(tt.flowID)
if tt.wantErr && err == nil {
t.Fatalf("expected validation error, got nil")
}
if !tt.wantErr && err != nil {
t.Fatalf("unexpected validation error: %v", err)
}
})
}
}
7 changes: 7 additions & 0 deletions backends/marker/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type Config struct {
RawMarkingStrategy string `yaml:"markingStrategy"`
MarkingStrategy Strategy `yaml:"-"` // Parsed strategy

// Fixed identifiers leveraged when marking all traffic
FixedExperimentId int `yaml:"-"`
FixedActivityId int `yaml:"-"`

DebugMode bool `yaml:"debugMode"`
MatchAll bool `yaml:"matchAll"`
}
Expand All @@ -34,6 +38,9 @@ func (c *Config) UnmarshalYAML(b []byte) error {
RemoveQdisc: true,
ProgramPath: "",

FixedExperimentId: -1,
FixedActivityId: -1,

RawMarkingStrategy: "label",
DebugMode: false,
MatchAll: false,
Expand Down
31 changes: 31 additions & 0 deletions backends/marker/marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package marker
import (
"fmt"
"log/slog"
"net/netip"
"os"
"time"

"math/rand"

"github.com/cilium/ebpf"
"github.com/scitags/flowd-go/types"
glowdTypes "github.com/scitags/flowd-go/types"
)

Expand Down Expand Up @@ -103,6 +105,35 @@ func NewMarkerBackend(c *Config) (*MarkerBackend, error) {
func (b *MarkerBackend) Run(done <-chan struct{}, inChan <-chan glowdTypes.FlowID) {
slog.Debug("running the marker backend")

if b.Config.FixedExperimentId != -1 || b.Config.FixedActivityId != -1 {
slog.Debug("triggering marking of all datagrams", "experimentId", b.Config.FixedExperimentId, "activityId", b.Config.FixedActivityId)

markAllFlowdId := types.FlowID{
State: types.START,
Family: types.IPv6,
Src: netip.AddrPortFrom(netip.IPv6Unspecified(), 0),
Dst: netip.AddrPortFrom(netip.IPv6Unspecified(), 0),
Experiment: uint32(b.Config.FixedExperimentId),
Activity: uint32(b.Config.FixedActivityId),
Application: types.SYSLOG_APP_NAME,
}

rawDstIPHi, rawDstIPLo := extractHalves(markAllFlowdId.Dst.Addr())
flowHash := FlowFourTuple{
IPv6Hi: rawDstIPHi,
IPv6Lo: rawDstIPLo,
DstPort: uint32(markAllFlowdId.Dst.Port()),
SrcPort: uint32(markAllFlowdId.Src.Port()),
}

flowTag := b.genFlowTag(markAllFlowdId.Experiment, markAllFlowdId.Activity)

if err := b.coll.Maps[MAP_NAME].Update(flowHash, flowTag, ebpf.UpdateAny); err != nil {
slog.Error("error inserting map value", "err", err, "flowHash", flowHash, "flowTag", flowTag)
}
slog.Debug("inserted map value", "flowHash", flowHash, "flowTag", flowTag)
}

for {
select {
case flowID, ok := <-inChan:
Expand Down
4 changes: 4 additions & 0 deletions cmd/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func pluginBackendDependencies(c *Config) {
if c.Backends.Marker != nil {
slog.Warn("overriding marking criteria to match all for the marker backend")
c.Backends.Marker.MatchAll = true

slog.Debug("configuring fixed ids", "experimentId", c.Plugins.Perfsonar.ExperimentId, "activityId", c.Plugins.Perfsonar.ActivityId)
c.Backends.Marker.FixedExperimentId = c.Plugins.Perfsonar.ExperimentId
c.Backends.Marker.FixedActivityId = c.Plugins.Perfsonar.ActivityId
}
}
}
Expand Down
15 changes: 5 additions & 10 deletions plugins/perfsonar/perfsonar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package perfsonar

import (
"log/slog"
"net/netip"

"github.com/scitags/flowd-go/types"
)
Expand Down Expand Up @@ -30,15 +29,11 @@ func (p *PerfsonarPlugin) Run(done <-chan struct{}, outChan chan<- types.FlowID)
* 0 disables checks within the eBPF program.
*/
slog.Debug("kicking off packet marking")
outChan <- types.FlowID{
State: types.START,
Family: types.IPv6,
Src: netip.AddrPortFrom(netip.IPv6Unspecified(), 0),
Dst: netip.AddrPortFrom(netip.IPv6Unspecified(), 0),
Experiment: uint32(p.ExperimentId),
Activity: uint32(p.ActivityId),
Application: types.SYSLOG_APP_NAME,
}

// The perfSONAR plugin is simply a no-op to maintain backwards compatibility with
// previous configurations. Configuration parsing and plugin bootstrapping take
// care of setting things up so that every IPv6 datagram is marked with the
// experiment and activity IDs specified in the configuration.

// Simply block until the done channel is closed so that we can exit
<-done
Expand Down