Skip to content

Commit 7ed99da

Browse files
authored
[POA-2544] Updates to the initial telemetry workflow (#65)
With this PR, we have made some changes to the initial telemetry flows. 1. We have removed the statsLogDelay config. We now no longer send an initial telemetry after 60 seconds of startup. 2. We now report successful telemetry whenever we see a successful capture of a request and a response. This compensates for the long wait time of 5 minutes, i.e., the default telemetry interval. Implementation inspired from [here](https://postmanlabs.atlassian.net/wiki/spaces/PO/pages/5811765834/Plan+Warning+about+not+seeing+HTTP+traffic+needs+to+be+adjusted?focusedCommentId=5830148712).
1 parent c5ca73e commit 7ed99da

File tree

5 files changed

+49
-52
lines changed

5 files changed

+49
-52
lines changed

apidump/apidump.go

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
kgxapi "github.com/akitasoftware/akita-libs/api_schema"
1818
"github.com/akitasoftware/akita-libs/buffer_pool"
1919
"github.com/akitasoftware/akita-libs/tags"
20-
"github.com/akitasoftware/go-utils/math"
2120
"github.com/akitasoftware/go-utils/optionals"
2221
"github.com/pkg/errors"
2322
"github.com/postmanlabs/postman-insights-agent/apispec"
@@ -116,9 +115,6 @@ type Args struct {
116115
// How often to rotate learn sessions; set to zero to disable rotation.
117116
LearnSessionLifetime time.Duration
118117

119-
// Print packet capture statistics after N seconds.
120-
StatsLogDelay int
121-
122118
// Periodically report telemetry every N seconds thereafter
123119
TelemetryInterval int
124120

@@ -153,15 +149,20 @@ type apidump struct {
153149
backendSvcName string
154150
learnClient rest.LearnClient
155151

156-
startTime time.Time
157-
dumpSummary *Summary
152+
startTime time.Time
153+
dumpSummary *Summary
154+
successTelemetry *trace.SuccessTelemetry
158155
}
159156

160157
// Start a new apidump session based on the given arguments.
161158
func newSession(args *Args) *apidump {
162159
a := &apidump{
163160
Args: args,
164161
startTime: time.Now(),
162+
successTelemetry: &trace.SuccessTelemetry{
163+
Channel: make(chan struct{}),
164+
Once: sync.Once{},
165+
},
165166
}
166167
return a
167168
}
@@ -200,7 +201,7 @@ func (a *apidump) LookupService() error {
200201
return nil
201202
}
202203

203-
// Send the initial mesage to the backend indicating successful start
204+
// Send the initial message to the backend indicating successful start
204205
func (a *apidump) SendInitialTelemetry() {
205206
// Do not send packet capture telemetry for local captures.
206207
if !a.TargetIsRemote() {
@@ -209,11 +210,11 @@ func (a *apidump) SendInitialTelemetry() {
209210

210211
// XXX(cns): The observed duration serves as a key for upserting packet
211212
// telemetry, so it needs to be the same here as in the packet
212-
// telemetry sent sixty seconds after startup.
213+
// telemetry sent 5 minutes after startup.
213214
req := kgxapi.PostInitialClientTelemetryRequest{
214215
ClientID: a.ClientID,
215216
ObservedStartingAt: a.startTime,
216-
ObservedDurationInSeconds: a.StatsLogDelay,
217+
ObservedDurationInSeconds: a.TelemetryInterval,
217218
SendsWitnessPayloads: a.ReproMode,
218219
CLIVersion: version.ReleaseVersion().String(),
219220
CLITargetArch: architecture.GetCanonicalArch(),
@@ -234,7 +235,7 @@ func (a *apidump) SendInitialTelemetry() {
234235
// Send a message to the backend indicating failure to start and a cause
235236
func (a *apidump) SendErrorTelemetry(errorType api_schema.ApidumpErrorType, err error) {
236237
req := &kgxapi.PostClientPacketCaptureStatsRequest{
237-
ObservedDurationInSeconds: a.StatsLogDelay,
238+
ObservedDurationInSeconds: a.TelemetryInterval,
238239
ApidumpError: errorType,
239240
ApidumpErrorText: err.Error(),
240241
}
@@ -414,31 +415,16 @@ func (a *apidump) RotateLearnSession(done <-chan struct{}, collectors []trace.Le
414415

415416
// Goroutine to send telemetry, stop when "done" is closed.
416417
//
417-
// Prints a summary after a short delay. This ensures that statistics will
418-
// appear in customer logs close to when the process is started.
419-
// Omits if args.StatsLogDelay is <= 0.
420-
//
421418
// Sends telemetry to the server on a regular basis.
422419
// Omits if args.TelemetryInterval is <= 0
423420
func (a *apidump) TelemetryWorker(done <-chan struct{}) {
424-
if a.StatsLogDelay <= 0 && a.TelemetryInterval <= 0 {
421+
if a.TelemetryInterval <= 0 {
425422
return
426423
}
427424

428425
a.SendInitialTelemetry()
429426

430-
if a.StatsLogDelay > 0 {
431-
// Wait while capturing statistics.
432-
time.Sleep(time.Duration(a.StatsLogDelay) * time.Second)
433-
434-
// Print telemetry data.
435-
printer.Stderr.Infof("Printing packet capture statistics after %d seconds of capture.\n", a.StatsLogDelay)
436-
a.dumpSummary.PrintPacketCounts()
437-
a.dumpSummary.PrintWarnings()
438-
439-
a.SendPacketTelemetry(a.StatsLogDelay)
440-
}
441-
427+
subsequentTelemetrySent := false
442428
if a.TelemetryInterval > 0 {
443429
ticker := time.NewTicker(time.Duration(a.TelemetryInterval) * time.Second)
444430

@@ -449,6 +435,11 @@ func (a *apidump) TelemetryWorker(done <-chan struct{}) {
449435
case now := <-ticker.C:
450436
duration := int(now.Sub(a.startTime) / time.Second)
451437
a.SendPacketTelemetry(duration)
438+
subsequentTelemetrySent = true
439+
case <-a.successTelemetry.Channel:
440+
if !subsequentTelemetrySent {
441+
a.SendPacketTelemetry(a.TelemetryInterval)
442+
}
452443
}
453444
}
454445
}
@@ -641,15 +632,8 @@ func (a *apidump) Run() error {
641632
// when the main collection process does.
642633
if a.TargetIsRemote() {
643634
{
644-
// Record the first resource usage data slightly before the
645-
// stats log delay to ensure we include usage data in the first
646-
// telemetry upload.
647-
var delay time.Duration
648-
if 0 < a.StatsLogDelay {
649-
delay = time.Duration(math.Max(a.StatsLogDelay-5, 1)) * time.Second
650-
}
651-
652-
go usage.Poll(stop, delay, time.Duration(a.ProcFSPollingInterval)*time.Second)
635+
// Record the first usage immediately (sending delay = 0) since we want to include it in the success telemetry
636+
go usage.Poll(stop, 0, time.Duration(a.ProcFSPollingInterval)*time.Second)
653637
}
654638

655639
go a.TelemetryWorker(stop)
@@ -724,8 +708,9 @@ func (a *apidump) Run() error {
724708
// trace is empty or not.) In the future we could add columns for both
725709
// pre- and post-filtering.
726710
collector = &trace.PacketCountCollector{
727-
PacketCounts: summary,
728-
Collector: collector,
711+
PacketCounts: summary,
712+
Collector: collector,
713+
SuccessTelemetry: a.successTelemetry,
729714
}
730715

731716
// Subsampling.

apispec/defaults.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ const (
2727
// How many requests to capture per minute.
2828
DefaultRateLimit = 1000.0
2929

30-
// How long to wait after starting up before printing packet-capture statistics.
31-
DefaultStatsLogDelay_seconds = 60
32-
3330
// How often to upload client telemetry.
3431
DefaultTelemetryInterval_seconds = 5 * 60 // 5 minutes
3532

cmd/internal/apidump/apidump.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ var (
3232
execCommandUserFlag string
3333
pluginsFlag []string
3434
traceRotateFlag string
35-
statsLogDelay int
3635
telemetryInterval int
3736
procFSPollingInterval int
3837
collectTCPAndTLSReports bool
@@ -212,7 +211,6 @@ func apidumpRunInternal(cmd *cobra.Command, _ []string) error {
212211
ExecCommandUser: execCommandUserFlag,
213212
Plugins: plugins,
214213
LearnSessionLifetime: traceRotateInterval,
215-
StatsLogDelay: statsLogDelay,
216214
TelemetryInterval: telemetryInterval,
217215
ProcFSPollingInterval: procFSPollingInterval,
218216
CollectTCPAndTLSReports: collectTCPAndTLSReports,
@@ -309,13 +307,6 @@ func init() {
309307
)
310308
Cmd.Flags().MarkHidden("trace-rotate")
311309

312-
Cmd.Flags().IntVar(
313-
&statsLogDelay,
314-
"stats-log-delay",
315-
apispec.DefaultStatsLogDelay_seconds,
316-
"Print packet capture statistics after N seconds.",
317-
)
318-
319310
Cmd.Flags().IntVar(
320311
&telemetryInterval,
321312
"telemetry-interval",

trace/collector.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"math"
55
"sort"
66
"strconv"
7+
"sync"
78

89
"github.com/OneOfOne/xxhash"
910
"github.com/akitasoftware/akita-libs/akid"
@@ -14,6 +15,11 @@ import (
1415
"github.com/spf13/viper"
1516
)
1617

18+
type SuccessTelemetry struct {
19+
Channel chan struct{}
20+
Once sync.Once
21+
}
22+
1723
type Collector interface {
1824
// Hands new data from network to the collector. The implementation may choose
1925
// to process them asynchronously (e.g. to wait for the response to a
@@ -98,8 +104,9 @@ func (sc *UserTrafficCollector) Close() error {
98104

99105
// This is a shim to add packet counts based on payload type.
100106
type PacketCountCollector struct {
101-
PacketCounts PacketCountConsumer
102-
Collector Collector
107+
PacketCounts PacketCountConsumer
108+
Collector Collector
109+
SuccessTelemetry *SuccessTelemetry
103110
}
104111

105112
// Don't record self-generated traffic in the breakdown by hostname,
@@ -192,9 +199,17 @@ func (pc *PacketCountCollector) Process(t akinet.ParsedNetworkTraffic) error {
192199
Unparsed: 1,
193200
})
194201
}
202+
if pc.PacketCounts.HasRequestAndResponse() {
203+
pc.SendSuccessTelemetry()
204+
}
195205
return pc.Collector.Process(t)
196206
}
197207

208+
func (pc *PacketCountCollector) SendSuccessTelemetry() {
209+
pc.SuccessTelemetry.Once.Do(func() {
210+
pc.SuccessTelemetry.Channel <- struct{}{}
211+
})
212+
}
198213
func (pc *PacketCountCollector) Close() error {
199214
return pc.Collector.Close()
200215
}

trace/stats.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import (
1515
type PacketCountConsumer interface {
1616
// Add an additional measurement to the current count
1717
Update(delta PacketCounts)
18+
19+
// Returns true if we have successfully received at least one request and a response
20+
HasRequestAndResponse() bool
1821
}
1922

2023
// Discard the count
@@ -127,6 +130,12 @@ func (s *PacketCounter) Update(c PacketCounts) {
127130
s.total.Add(c)
128131
}
129132

133+
func (s *PacketCounter) HasRequestAndResponse() bool {
134+
s.mutex.RLock()
135+
defer s.mutex.RUnlock()
136+
return s.total.HTTPRequests > 0 && s.total.HTTPResponses > 0
137+
}
138+
130139
func (s *PacketCounter) Total() PacketCounts {
131140
s.mutex.RLock()
132141
defer s.mutex.RUnlock()

0 commit comments

Comments
 (0)