diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 760066b2f7c3..4c2eec26e1ec 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -54,6 +54,7 @@ type msgRef struct { win *window batchSize int deadlockListener *deadlockListener + start time.Time } func newAsyncClient( @@ -154,6 +155,7 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { win: c.win, err: nil, deadlockListener: newDeadlockListener(c.log, logstashDeadlockTimeout), + start: time.Time{}, } ref.count.Store(1) defer ref.dec() @@ -221,6 +223,7 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { window[i] = &events[i].Content } ref.count.Add(1) + ref.start = time.Now() return client.Send(ref.callback, window) } @@ -246,6 +249,10 @@ func (r *msgRef) callback(n uint32, err error) { r.win.tryGrowWindow(r.batchSize) } } + + // Report the latency for the batch of events + r.client.observer.ReportLatency(time.Since(r.start)) + r.dec() }