Skip to content

Commit d3b285e

Browse files
authored
[POA-4078] Add agent metrics (#134)
Adds metrics to the agent letting us know: - How long is the buffer of pcap capture packets - How long is the buffer of unprocessed packets - How long is the buffer of parsed network traffic - How long is the buffer of unpaired partial witnesses
1 parent db2a0c1 commit d3b285e

File tree

4 files changed

+49
-3
lines changed

4 files changed

+49
-3
lines changed

pcap/net_parse.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ func (p *NetworkTrafficParser) ParseFromInterface(
140140
// Signal caller that we're done on exit
141141
defer close(out)
142142

143+
startTime := time.Now()
144+
bufferTimeSum := 0 * time.Second
145+
intervalLength := 1 * time.Minute
143146
for {
144147
select {
145148
// packets channel is going to read until EOF or when signalClose is
@@ -158,6 +161,16 @@ func (p *NetworkTrafficParser) ParseFromInterface(
158161

159162
return
160163
}
164+
165+
now := time.Now()
166+
if now.Sub(startTime) >= intervalLength {
167+
bufferLength := float64(bufferTimeSum.Nanoseconds()) / float64(intervalLength.Nanoseconds())
168+
printer.Debugf("Approximate unprocessed-packets buffer length: %v", bufferLength)
169+
bufferTimeSum = 0 * time.Second
170+
startTime = now
171+
}
172+
bufferTimeSum += now.Sub(packet.Metadata().Timestamp)
173+
161174
p.observer(packet)
162175
p.packetToParsedNetworkTraffic(out, assembler, packet)
163176
case <-ticker.C:

pcap/pcap.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,32 @@ func (p *pcapImpl) capturePackets(done <-chan struct{}, interfaceName, bpfFilter
5555
}()
5656

5757
startTime := time.Now()
58-
count := 0
58+
firstPacket := true
59+
bufferTimeSum := 0 * time.Second
60+
intervalLength := 1 * time.Minute
5961
for {
6062
select {
6163
case <-done:
6264
return
6365
case pkt, ok := <-pktChan:
6466
if ok {
67+
now := time.Now()
68+
if now.Sub(startTime) >= intervalLength {
69+
bufferLength := float64(bufferTimeSum.Nanoseconds()) / float64(intervalLength.Nanoseconds())
70+
printer.Debugf("Aproximate captured-packets buffer length: %v", bufferLength)
71+
bufferTimeSum = 0 * time.Second
72+
startTime = now
73+
}
74+
bufferTimeSum += now.Sub(pkt.Metadata().Timestamp)
75+
6576
wrappedChan <- pkt
6677

67-
if count == 0 {
78+
if firstPacket {
79+
firstPacket = false
6880
ttfp := time.Since(startTime)
6981
printer.Debugf("Time to first packet on %s: %s\n", interfaceName, ttfp)
7082
}
71-
count += 1
83+
7284
} else {
7385
return
7486
}

pcap/run.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package pcap
22

33
import (
4+
"time"
5+
46
"github.com/akitasoftware/akita-libs/akinet"
57
akihttp "github.com/akitasoftware/akita-libs/akinet/http"
68
akihttp2 "github.com/akitasoftware/akita-libs/akinet/http2"
@@ -11,6 +13,7 @@ import (
1113
"github.com/google/gopacket"
1214
"github.com/google/gopacket/layers"
1315
"github.com/pkg/errors"
16+
"github.com/postmanlabs/postman-insights-agent/printer"
1417
"github.com/postmanlabs/postman-insights-agent/trace"
1518
)
1619

@@ -50,7 +53,19 @@ func Collect(
5053
return errors.Wrap(err, "couldn't start parsing from interface")
5154
}
5255

56+
startTime := time.Now()
57+
bufferTimeSum := 0 * time.Second
58+
intervalLength := 1 * time.Minute
5359
for t := range parsedChan {
60+
now := time.Now()
61+
if now.Sub(startTime) >= intervalLength {
62+
bufferLength := float64(bufferTimeSum.Nanoseconds()) / float64(intervalLength.Nanoseconds())
63+
printer.Debugf("Aproximate parsed-network-traffic buffer length: %v", bufferLength)
64+
bufferTimeSum = 0 * time.Second
65+
startTime = now
66+
}
67+
bufferTimeSum += now.Sub(t.ObservationTime)
68+
5469
t.Interface = intf
5570
err := proc.Process(t)
5671
t.Content.ReleaseBuffers()

trace/backend_collector.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,8 @@ func (c *BackendCollector) periodicFlush() {
425425
}
426426

427427
func (c *BackendCollector) flushPairCache(cutoffTime time.Time) {
428+
totalWitnesses := 0
429+
flushedWitnesses := 0
428430
c.pairCache.Range(func(k, v interface{}) bool {
429431
e := v.(*witnessWithInfo)
430432
if e.observationTime.Before(cutoffTime) {
@@ -435,7 +437,11 @@ func (c *BackendCollector) flushPairCache(cutoffTime time.Time) {
435437

436438
c.queueUpload(e)
437439
c.pairCache.Delete(k)
440+
441+
flushedWitnesses += 1
438442
}
443+
totalWitnesses += 1
439444
return true
440445
})
446+
printer.Debugf("flushed-witnesses in cache: %v, total-witnesses in cache: %v", flushedWitnesses, totalWitnesses)
441447
}

0 commit comments

Comments
 (0)