From 5b3c1798108fbba22f09359540e6da3d3b447437 Mon Sep 17 00:00:00 2001
From: Aditya
Date: Sun, 26 Apr 2026 16:17:52 +0530
Subject: [PATCH] fix: prevent data loss in eventbus by applying backpressure

---
 internal/eventbus/bus.go      |  2 --
 internal/eventbus/bus_test.go | 52 +++++++++++++++++++++++++----------
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/internal/eventbus/bus.go b/internal/eventbus/bus.go
index 4d4cd1c..7f0575f 100644
--- a/internal/eventbus/bus.go
+++ b/internal/eventbus/bus.go
@@ -220,8 +220,6 @@ func (b *Bus) dispatch(ctx context.Context, s *subscriber, e Event) {
 		atomic.AddUint64(&b.dropped, 1)
 	case s.ch <- e:
 		atomic.AddUint64(&b.delivered, 1)
-	default:
-		atomic.AddUint64(&b.dropped, 1)
 	}
 }
 
diff --git a/internal/eventbus/bus_test.go b/internal/eventbus/bus_test.go
index 35aca49..bea7bd9 100644
--- a/internal/eventbus/bus_test.go
+++ b/internal/eventbus/bus_test.go
@@ -112,30 +112,54 @@ func TestPublish_MultipleSubscribersEachGetEvent(t *testing.T) {
 	waitUntil(t, time.Second, func() bool { return a.Load() == 1 && c.Load() == 1 }, "both subscribers received")
 }
 
-func TestPublish_NonBlocking_DropsWhenBufferFull(t *testing.T) {
-	b := New(2)
+func TestPublish_Blocking_AppliesBackpressure(t *testing.T) {
+	b := New(1)
 	defer b.Close(context.Background())
 
 	blocker := make(chan struct{})
-	var started atomic.Int32
+	var received atomic.Int32
+
+	// Subscriber that blocks until 'blocker' channel is closed
 	b.Subscribe(EventSubdomainDiscovered, func(ctx context.Context, _ Event) {
-		started.Add(1)
-		<-blocker
+		<-blocker
+		received.Add(1)
 	})
 
-	// First event enters handler (blocks). Next 2 fill the buffer of size 2.
-	// Subsequent publishes should be counted as dropped.
-	for i := 0; i < 100; i++ {
-		b.Publish(context.Background(), NewSubdomainDiscovered("t", "x.example.com", "p"))
+	// Event 1: goes to handler (which freezes)
+	b.Publish(context.Background(), NewSubdomainDiscovered("t", "1.example.com", "p"))
+
+	// Event 2: sits in the bus buffer (size is 1)
+	b.Publish(context.Background(), NewSubdomainDiscovered("t", "2.example.com", "p"))
+
+	// Event 3: should trigger backpressure and block the publisher
+	publishDone := make(chan struct{})
+	go func() {
+		b.Publish(context.Background(), NewSubdomainDiscovered("t", "3.example.com", "p"))
+		close(publishDone)
+	}()
+
+	// Verify the 3rd publish is actually blocked
+	select {
+	case <-publishDone:
+		t.Fatal("Publish did not block when the buffer was full")
+	case <-time.After(50 * time.Millisecond):
+		// Expected: the goroutine is blocked waiting for space
 	}
 
-	// Give the bus a moment to register drops.
+	// Unblock the subscriber so it processes the queue
+	close(blocker)
+
+	// Wait for the blocked publish to finally complete
+	<-publishDone
+
+	// Verify no data was lost
 	waitUntil(t, time.Second, func() bool {
-		return b.Stats().Dropped > 0
-	}, "some events dropped when buffer full")
+		return received.Load() == 3
+	}, "all 3 events delivered after backpressure cleared")
 
-	// Unblock and close cleanly.
-	close(blocker)
+	if b.Stats().Dropped > 0 {
+		t.Errorf("expected 0 dropped events, got %d", b.Stats().Dropped)
+	}
 }
 
 func TestClose_DrainsAndStops(t *testing.T) {