Skip to content

Commit 76eb475

Browse files
Migrate event subscribers to ClusterEventBus for local-only delivery (#25295)
* Migrate event subscribers to ClusterEventBus for local-only delivery Switch InputRoutingService, DeletedStreamNotificationListener, StreamDestinationFilterService, and StartPageCleanupListener from the server EventBus to ClusterEventBus.registerClusterEventSubscriber() so their handlers only fire on the originating node instead of being replicated to every node via ClusterEventPeriodical. DataNodeEventService intentionally stays on the server EventBus (added explanatory comment). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Removing useless comment. --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c4e42e9 commit 76eb475

File tree

7 files changed

+25
-24
lines changed

7 files changed

+25
-24
lines changed

graylog2-server/src/main/java/org/graylog2/inputs/InputRoutingService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
*/
1717
package org.graylog2.inputs;
1818

19-
import com.google.common.eventbus.EventBus;
2019
import com.google.common.eventbus.Subscribe;
2120
import jakarta.inject.Inject;
2221
import jakarta.inject.Singleton;
22+
import org.graylog2.events.ClusterEventBus;
2323
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
2424
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
2525
import org.graylog.plugins.pipelineprocessor.db.RuleDao;
@@ -67,14 +67,15 @@ public InputRoutingService(
6767
StreamService streamService,
6868
PipelineService pipelineService,
6969
PipelineRuleParser pipelineRuleParser,
70-
EventBus eventBus) {
70+
ClusterEventBus clusterEventBus) {
7171
this.ruleService = ruleService;
7272
this.inputService = inputService;
7373
this.streamService = streamService;
7474
this.pipelineService = pipelineService;
7575
this.pipelineRuleParser = pipelineRuleParser;
7676

77-
eventBus.register(this);
77+
// Subscribe on ClusterEventBus to only receive events originating on this node (local-only delivery).
78+
clusterEventBus.registerClusterEventSubscriber(this);
7879
}
7980

8081
/**

graylog2-server/src/main/java/org/graylog2/notifications/DeletedStreamNotificationListener.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
*/
1717
package org.graylog2.notifications;
1818

19-
import com.google.common.eventbus.EventBus;
2019
import com.google.common.eventbus.Subscribe;
2120
import jakarta.inject.Inject;
21+
import org.graylog2.events.ClusterEventBus;
2222
import org.graylog2.streams.events.StreamDeletedEvent;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
@@ -30,9 +30,10 @@ public class DeletedStreamNotificationListener {
3030
private final NotificationService notificationService;
3131

3232
@Inject
33-
public DeletedStreamNotificationListener(EventBus eventBus, NotificationService notificationService) {
33+
public DeletedStreamNotificationListener(ClusterEventBus clusterEventBus, NotificationService notificationService) {
3434
this.notificationService = notificationService;
35-
eventBus.register(this);
35+
// Subscribe on ClusterEventBus to only receive events originating on this node (local-only delivery).
36+
clusterEventBus.registerClusterEventSubscriber(this);
3637
}
3738

3839
@Subscribe

graylog2-server/src/main/java/org/graylog2/streams/filters/StreamDestinationFilterService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.fasterxml.jackson.annotation.JsonProperty;
2020
import com.google.common.collect.ImmutableMap;
21-
import com.google.common.eventbus.EventBus;
2221
import com.google.common.eventbus.Subscribe;
2322
import org.graylog2.database.MongoCollection;
2423
import com.mongodb.client.model.Accumulators;
@@ -81,7 +80,6 @@ public class StreamDestinationFilterService {
8180
@Inject
8281
public StreamDestinationFilterService(MongoCollections mongoCollections,
8382
ClusterEventBus clusterEventBus,
84-
EventBus eventBus,
8583
Optional<DestinationFilterCreationValidator> optionalDestinationFilterCreationValidator) {
8684
this.collection = mongoCollections.collection(COLLECTION, StreamDestinationFilterRuleDTO.class);
8785
this.paginationHelper = mongoCollections.paginationHelper(collection);
@@ -93,7 +91,8 @@ public StreamDestinationFilterService(MongoCollections mongoCollections,
9391
collection.createIndex(Indexes.ascending(FIELD_DESTINATION_TYPE));
9492
collection.createIndex(Indexes.ascending(FIELD_STATUS));
9593

96-
eventBus.register(this);
94+
// Subscribe on ClusterEventBus to only receive events originating on this node (local-only delivery).
95+
clusterEventBus.registerClusterEventSubscriber(this);
9796
}
9897

9998
private Bson parseQuery(String queryString) {

graylog2-server/src/main/java/org/graylog2/users/StartPageCleanupListener.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
*/
1717
package org.graylog2.users;
1818

19-
import com.google.common.eventbus.EventBus;
2019
import com.google.common.eventbus.Subscribe;
2120
import jakarta.inject.Inject;
21+
import org.graylog2.events.ClusterEventBus;
2222
import org.graylog2.dashboards.events.DashboardDeletedEvent;
2323
import org.graylog2.plugin.database.ValidationException;
2424
import org.graylog2.rest.models.users.requests.DashboardStartPage;
@@ -35,10 +35,11 @@ public class StartPageCleanupListener {
3535
private final UserService userService;
3636

3737
@Inject
38-
public StartPageCleanupListener(EventBus serverEventBus,
38+
public StartPageCleanupListener(ClusterEventBus clusterEventBus,
3939
UserService userService) {
4040
this.userService = userService;
41-
serverEventBus.register(this);
41+
// Subscribe on ClusterEventBus to only receive events originating on this node (local-only delivery).
42+
clusterEventBus.registerClusterEventSubscriber(this);
4243
}
4344

4445
@Subscribe

graylog2-server/src/test/java/org/graylog2/inputs/routing/InputRoutingServiceTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.graylog2.inputs.routing;
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper;
20-
import com.google.common.eventbus.EventBus;
2120
import org.apache.commons.io.IOUtils;
2221
import org.assertj.core.api.Assertions;
2322
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
@@ -31,6 +30,7 @@
3130
import org.graylog.plugins.pipelineprocessor.rest.PipelineUtils;
3231
import org.graylog2.database.NotFoundException;
3332
import org.graylog2.inputs.Input;
33+
import org.graylog2.events.ClusterEventBus;
3434
import org.graylog2.inputs.InputRoutingService;
3535
import org.graylog2.inputs.InputService;
3636
import org.graylog2.plugin.streams.Stream;
@@ -67,7 +67,7 @@ class InputRoutingServiceTest {
6767
final RuleService ruleService = mock(RuleService.class);
6868
final PipelineService pipelineService = mock(PipelineService.class);
6969
final PipelineRuleParser pipelineRuleParser = mock(PipelineRuleParser.class);
70-
final EventBus eventBus = mock(EventBus.class);
70+
final ClusterEventBus clusterEventBus = new ClusterEventBus();
7171
final ObjectMapper objectMapper = new ObjectMapperProvider().get();
7272

7373
InputRoutingService inputRoutingService;
@@ -82,7 +82,7 @@ class InputRoutingServiceTest {
8282
@BeforeEach
8383
void setUp() {
8484
inputRoutingService = new InputRoutingService(
85-
ruleService, inputService, streamService, pipelineService, pipelineRuleParser, eventBus);
85+
ruleService, inputService, streamService, pipelineService, pipelineRuleParser, clusterEventBus);
8686
}
8787

8888
@Test

graylog2-server/src/test/java/org/graylog2/notifications/DeletedStreamNotificationListenerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
*/
1717
package org.graylog2.notifications;
1818

19-
import com.google.common.eventbus.EventBus;
2019
import org.assertj.core.api.Assertions;
20+
import org.graylog2.events.ClusterEventBus;
2121
import org.graylog2.streams.events.StreamDeletedEvent;
2222
import org.junit.jupiter.api.Test;
2323
import org.mockito.ArgumentCaptor;
@@ -31,11 +31,11 @@ class DeletedStreamNotificationListenerTest {
3131

3232
@Test
3333
void testNotificationDeletion() {
34-
EventBus eventBus = new EventBus();
34+
final ClusterEventBus clusterEventBus = new ClusterEventBus();
3535
final NotificationService notificationService = mockNotificationService("123", "456");
36-
new DeletedStreamNotificationListener(eventBus, notificationService);
36+
new DeletedStreamNotificationListener(clusterEventBus, notificationService);
3737

38-
eventBus.post(new StreamDeletedEvent("123", "stream title"));
38+
clusterEventBus.post(new StreamDeletedEvent("123", "stream title"));
3939

4040
final ArgumentCaptor<Notification> argumentCaptor = ArgumentCaptor.forClass(Notification.class);
4141
Mockito.verify(notificationService, Mockito.times(1)).destroy(argumentCaptor.capture());

graylog2-server/src/test/java/org/graylog2/streams/filters/StreamDestinationFilterServiceTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.graylog2.streams.filters;
1818

1919
import com.google.common.collect.ImmutableMultimap;
20-
import com.google.common.eventbus.EventBus;
2120
import com.google.common.util.concurrent.MoreExecutors;
2221
import com.mongodb.client.model.Sorts;
2322
import org.graylog.plugins.pipelineprocessor.rulebuilder.RuleBuilder;
@@ -49,12 +48,12 @@ class StreamDestinationFilterServiceTest {
4948
@Mock
5049
private DestinationFilterCreationValidator mockedFilterLicenseCheck;
5150
private StreamDestinationFilterService service;
52-
private EventBus eventBus;
51+
private ClusterEventBus clusterEventBus;
5352

5453
@BeforeEach
5554
void setUp(MongoCollections mongoCollections) {
56-
this.eventBus = new EventBus("stream-destination-filter-service");
57-
this.service = new StreamDestinationFilterService(mongoCollections, new ClusterEventBus(MoreExecutors.directExecutor()), eventBus, Optional.of(mockedFilterLicenseCheck));
55+
this.clusterEventBus = new ClusterEventBus(MoreExecutors.directExecutor());
56+
this.service = new StreamDestinationFilterService(mongoCollections, clusterEventBus, Optional.of(mockedFilterLicenseCheck));
5857
}
5958

6059
@Test
@@ -314,7 +313,7 @@ void streamDeletionEvent() {
314313
optionalDto = service.findByIdForStream("54e3deadbeefdeadbeef1000", "54e3deadbeefdeadbeef0002");
315314
assertThat(optionalDto).isPresent();
316315

317-
eventBus.post(new StreamDeletedEvent("54e3deadbeefdeadbeef1000", "Test Stream 1"));
316+
clusterEventBus.post(new StreamDeletedEvent("54e3deadbeefdeadbeef1000", "Test Stream 1"));
318317

319318
var afterDeletionEvent = service.findByIdForStream("54e3deadbeefdeadbeef1000", "54e3deadbeefdeadbeef0000");
320319
assertThat(afterDeletionEvent).isNotPresent();

0 commit comments

Comments
 (0)