66 "time"
77
88 "github.com/akitasoftware/akita-libs/akinet"
9+ "github.com/akitasoftware/akita-libs/client_telemetry"
910 "github.com/postmanlabs/postman-insights-agent/printer"
1011 "github.com/spf13/viper"
1112)
@@ -90,8 +91,7 @@ func (r *SharedRateLimit) endInterval(end time.Time) {
9091 r .FirstEstimate = false
9192 } else {
9293 alpha := viper .GetFloat64 (RateLimitExponentialAlpha )
93- exponentialMovingAverage :=
94- (1 - alpha )* float64 (r .EstimatedSampleInterval ) + alpha * float64 (intervalLength )
94+ exponentialMovingAverage := (1 - alpha )* float64 (r .EstimatedSampleInterval ) + alpha * float64 (intervalLength )
9595 printer .Debugln ("New estimate:" , exponentialMovingAverage )
9696 r .EstimatedSampleInterval = time .Duration (uint64 (exponentialMovingAverage ))
9797 }
@@ -206,14 +206,18 @@ type rateLimitCollector struct {
206206
207207 // Channel from RateLimit for epoch starts
208208 epochCh chan time.Time
209+
210+ // Packet counter
211+ packetCount PacketCountConsumer
209212}
210213
211- func (r * SharedRateLimit ) NewCollector (next Collector ) Collector {
214+ func (r * SharedRateLimit ) NewCollector (next Collector , packetCounts PacketCountConsumer ) Collector {
212215 c := & rateLimitCollector {
213216 RateLimit : r ,
214217 NextCollector : next ,
215218 RequestArrivalTimes : make (map [requestKey ]time.Time ),
216219 epochCh : make (chan time.Time , 1 ),
220+ packetCount : packetCounts ,
217221 }
218222 r .lock .Lock ()
219223 defer r .lock .Unlock ()
@@ -240,6 +244,14 @@ func (r *rateLimitCollector) Process(pnt akinet.ParsedNetworkTraffic) error {
240244 r .NextCollector .Process (pnt )
241245 key := requestKey {c .StreamID .String (), c .Seq }
242246 r .RequestArrivalTimes [key ] = pnt .ObservationTime
247+ } else {
248+ r .packetCount .Update (client_telemetry.PacketCounts {
249+ Interface : pnt .Interface ,
250+ DstHost : c .Host ,
251+ SrcPort : pnt .SrcPort ,
252+ DstPort : pnt .DstPort ,
253+ HTTPRequestsRateLimited : 1 ,
254+ })
243255 }
244256 case akinet.HTTPResponse :
245257 // Collect iff the request is in our map. (This means responses to calls
0 commit comments