Skip to content

Commit c91a8fa

Browse files
Merge pull request #191 from splitio/occupancy-sec-reg
Adding second region
2 parents 563216a + bf996e2 commit c91a8fa

File tree

3 files changed

+42
-15
lines changed

3 files changed

+42
-15
lines changed

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.sse;
22

3+
import com.google.common.collect.Maps;
34
import io.split.engine.common.PushManager;
45
import io.split.engine.sse.client.SSEClient;
56
import io.split.engine.sse.dtos.ControlNotification;
@@ -9,6 +10,7 @@
910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
1112

13+
import java.util.concurrent.ConcurrentMap;
1214
import java.util.concurrent.LinkedBlockingQueue;
1315
import java.util.concurrent.atomic.AtomicBoolean;
1416
import java.util.concurrent.atomic.AtomicReference;
@@ -20,6 +22,7 @@ public class PushStatusTrackerImp implements PushStatusTracker {
2022
private final AtomicReference<SSEClient.StatusMessage> _sseStatus = new AtomicReference<>(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
2123
private final AtomicReference<ControlType> _backendStatus = new AtomicReference<>(ControlType.STREAMING_RESUMED);
2224
private final LinkedBlockingQueue<PushManager.Status> _statusMessages;
25+
private final ConcurrentMap<String, Integer> regions = Maps.newConcurrentMap();
2326

2427
public PushStatusTrackerImp(LinkedBlockingQueue<PushManager.Status> statusMessages) {
2528
_statusMessages = statusMessages;
@@ -98,9 +101,11 @@ public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotifica
98101
_log.debug(String.format("handleIncomingOccupancyEvent: publishers=%d", occupancyNotification.getMetrics().getPublishers()));
99102

100103
int publishers = occupancyNotification.getMetrics().getPublishers();
101-
if (publishers <= 0 && _publishersOnline.compareAndSet(true, false) && _backendStatus.get().equals(ControlType.STREAMING_RESUMED)) {
104+
regions.put(occupancyNotification.getChannel(), publishers);
105+
boolean isPublishers = isPublishers();
106+
if (!isPublishers && _publishersOnline.compareAndSet(true, false) && _backendStatus.get().equals(ControlType.STREAMING_RESUMED)) {
102107
_statusMessages.offer(PushManager.Status.STREAMING_DOWN);
103-
} else if (publishers >= 1 && _publishersOnline.compareAndSet(false, true) && _backendStatus.get().equals(ControlType.STREAMING_RESUMED)) {
108+
} else if (isPublishers && _publishersOnline.compareAndSet(false, true) && _backendStatus.get().equals(ControlType.STREAMING_RESUMED)) {
104109
_statusMessages.offer(PushManager.Status.STREAMING_READY);
105110
}
106111
}
@@ -114,6 +119,7 @@ public void handleIncomingAblyError(ErrorNotification notification) {
114119
}
115120
if (notification.getCode() >= 40140 && notification.getCode() <= 40149) {
116121
_statusMessages.offer(PushManager.Status.STREAMING_BACKOFF);
122+
return;
117123
}
118124
if (notification.getCode() >= 40000 && notification.getCode() <= 49999) {
119125
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
@@ -129,4 +135,13 @@ public synchronized void forcePushDisable() {
129135
_backendStatus.set(ControlType.STREAMING_DISABLED);
130136
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
131137
}
138+
139+
private boolean isPublishers() {
140+
for(Integer publisher : regions.values()) {
141+
if (publisher > 0) {
142+
return true;
143+
}
144+
}
145+
return false;
146+
}
132147
}

client/src/main/java/io/split/engine/sse/dtos/OccupancyNotification.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.split.engine.sse.NotificationProcessor;
55

66
public class OccupancyNotification extends IncomingNotification implements StatusNotification {
7-
private static final String CONTROL_PRI_CHANNEL = "control_pri";
87
private final OccupancyMetrics metrics;
98

109
public OccupancyNotification(GenericNotificationData genericNotificationData) {
@@ -23,9 +22,7 @@ public void handler(NotificationProcessor notificationProcessor) {
2322

2423
@Override
2524
public void handlerStatus(PushStatusTracker notificationManagerKeeper) {
26-
if (CONTROL_PRI_CHANNEL.equals(getChannel())) {
27-
notificationManagerKeeper.handleIncomingOccupancyEvent(this);
28-
}
25+
notificationManagerKeeper.handleIncomingOccupancyEvent(this);
2926
}
3027

3128
@Override

client/src/test/java/io/split/engine/sse/PushStatusTrackerTest.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public void HandleControlEventStreamingResumedShouldNotifyEvent() throws Interru
3737
@Test
3838
public void HandleControlEventStreamingResumedShouldNotNotifyEvent() {
3939
LinkedBlockingQueue<PushManager.Status> messages = new LinkedBlockingQueue<>();
40-
OccupancyNotification occupancyNotification = buildOccupancyNotification(0);
40+
OccupancyNotification occupancyNotification = buildOccupancyNotification(0, null);
4141
ControlNotification controlNotification = buildControlNotification(ControlType.STREAMING_RESUMED);
4242

4343
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(messages);
@@ -65,7 +65,7 @@ public void HandleControlEventStreamingDisabledShouldNotifyShutdownEvent() {
6565
@Test
6666
public void HandleOccupancyEventWithPublishersFirstTimeShouldNotNotifyEvent() {
6767
LinkedBlockingQueue<PushManager.Status> messages = new LinkedBlockingQueue<>();
68-
OccupancyNotification occupancyNotification = buildOccupancyNotification(2);
68+
OccupancyNotification occupancyNotification = buildOccupancyNotification(2, null);
6969

7070
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(messages);
7171
pushStatusTracker.handleIncomingOccupancyEvent(occupancyNotification);
@@ -76,8 +76,23 @@ public void HandleOccupancyEventWithPublishersFirstTimeShouldNotNotifyEvent() {
7676
public void HandleOccupancyEventWithPublishersAndWithStreamingDisabledShouldNotifyEvent() throws InterruptedException {
7777
LinkedBlockingQueue<PushManager.Status> messages = new LinkedBlockingQueue<>();
7878
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(messages);
79-
pushStatusTracker.handleIncomingOccupancyEvent(buildOccupancyNotification(0));
80-
pushStatusTracker.handleIncomingOccupancyEvent(buildOccupancyNotification(2));
79+
pushStatusTracker.handleIncomingOccupancyEvent(buildOccupancyNotification(0, null));
80+
pushStatusTracker.handleIncomingOccupancyEvent(buildOccupancyNotification(2, null));
81+
82+
assertThat(messages.size(), is(equalTo(2)));
83+
PushManager.Status m1 = messages.take();
84+
assertThat(m1, is(equalTo(PushManager.Status.STREAMING_DOWN)));
85+
86+
PushManager.Status m2 = messages.take();
87+
assertThat(m2, is(equalTo(PushManager.Status.STREAMING_READY)));
88+
}
89+
90+
@Test
91+
public void HandleOccupancyEventWithDifferentChannelsPublishersShouldNotifyEvent() throws InterruptedException {
92+
LinkedBlockingQueue<PushManager.Status> messages = new LinkedBlockingQueue<>();
93+
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(messages);
94+
pushStatusTracker.handleIncomingOccupancyEvent(buildOccupancyNotification(0, "control_pri"));
95+
pushStatusTracker.handleIncomingOccupancyEvent(buildOccupancyNotification(2, "control_sec"));
8196

8297
assertThat(messages.size(), is(equalTo(2)));
8398
PushManager.Status m1 = messages.take();
@@ -88,14 +103,14 @@ public void HandleOccupancyEventWithPublishersAndWithStreamingDisabledShouldNoti
88103
}
89104

90105
private ControlNotification buildControlNotification(ControlType controlType) {
91-
return new ControlNotification(buildGenericData(controlType, IncomingNotification.Type.CONTROL,null));
106+
return new ControlNotification(buildGenericData(controlType, IncomingNotification.Type.CONTROL,null, null));
92107
}
93108

94-
private OccupancyNotification buildOccupancyNotification(int publishers) {
95-
return new OccupancyNotification(buildGenericData(null, IncomingNotification.Type.OCCUPANCY, publishers));
109+
private OccupancyNotification buildOccupancyNotification(int publishers, String channel) {
110+
return new OccupancyNotification(buildGenericData(null, IncomingNotification.Type.OCCUPANCY, publishers, channel));
96111
}
97112

98-
private GenericNotificationData buildGenericData(ControlType controlType, IncomingNotification.Type type, Integer publishers) {
113+
private GenericNotificationData buildGenericData(ControlType controlType, IncomingNotification.Type type, Integer publishers, String channel) {
99114
return new GenericNotificationData(
100115
null,
101116
null,
@@ -104,6 +119,6 @@ private GenericNotificationData buildGenericData(ControlType controlType, Incomi
104119
publishers != null ? new OccupancyMetrics(publishers) : null,
105120
null,
106121
type,
107-
"channel-test");
122+
channel == null ? "channel-test" : channel);
108123
}
109124
}

0 commit comments

Comments
 (0)