1
1
package io .split .engine .sse ;
2
2
3
+ import com .google .common .collect .Maps ;
3
4
import io .split .engine .common .PushManager ;
4
5
import io .split .engine .sse .client .SSEClient ;
5
6
import io .split .engine .sse .dtos .ControlNotification ;
9
10
import org .slf4j .Logger ;
10
11
import org .slf4j .LoggerFactory ;
11
12
13
+ import java .util .concurrent .ConcurrentMap ;
12
14
import java .util .concurrent .LinkedBlockingQueue ;
13
15
import java .util .concurrent .atomic .AtomicBoolean ;
14
16
import java .util .concurrent .atomic .AtomicReference ;
@@ -20,6 +22,7 @@ public class PushStatusTrackerImp implements PushStatusTracker {
20
22
private final AtomicReference <SSEClient .StatusMessage > _sseStatus = new AtomicReference <>(SSEClient .StatusMessage .INITIALIZATION_IN_PROGRESS );
21
23
private final AtomicReference <ControlType > _backendStatus = new AtomicReference <>(ControlType .STREAMING_RESUMED );
22
24
private final LinkedBlockingQueue <PushManager .Status > _statusMessages ;
25
+ private final ConcurrentMap <String , Integer > regions = Maps .newConcurrentMap ();
23
26
24
27
public PushStatusTrackerImp (LinkedBlockingQueue <PushManager .Status > statusMessages ) {
25
28
_statusMessages = statusMessages ;
@@ -98,9 +101,11 @@ public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotifica
98
101
_log .debug (String .format ("handleIncomingOccupancyEvent: publishers=%d" , occupancyNotification .getMetrics ().getPublishers ()));
99
102
100
103
int publishers = occupancyNotification .getMetrics ().getPublishers ();
101
- if (publishers <= 0 && _publishersOnline .compareAndSet (true , false ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
104
+ regions .put (occupancyNotification .getChannel (), publishers );
105
+ boolean isPublishers = isPublishers ();
106
+ if (!isPublishers && _publishersOnline .compareAndSet (true , false ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
102
107
_statusMessages .offer (PushManager .Status .STREAMING_DOWN );
103
- } else if (publishers >= 1 && _publishersOnline .compareAndSet (false , true ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
108
+ } else if (isPublishers && _publishersOnline .compareAndSet (false , true ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
104
109
_statusMessages .offer (PushManager .Status .STREAMING_READY );
105
110
}
106
111
}
@@ -114,6 +119,7 @@ public void handleIncomingAblyError(ErrorNotification notification) {
114
119
}
115
120
if (notification .getCode () >= 40140 && notification .getCode () <= 40149 ) {
116
121
_statusMessages .offer (PushManager .Status .STREAMING_BACKOFF );
122
+ return ;
117
123
}
118
124
if (notification .getCode () >= 40000 && notification .getCode () <= 49999 ) {
119
125
_statusMessages .offer (PushManager .Status .STREAMING_OFF );
@@ -129,4 +135,13 @@ public synchronized void forcePushDisable() {
129
135
_backendStatus .set (ControlType .STREAMING_DISABLED );
130
136
_statusMessages .offer (PushManager .Status .STREAMING_OFF );
131
137
}
138
+
139
+ private boolean isPublishers () {
140
+ for (Integer publisher : regions .values ()) {
141
+ if (publisher > 0 ) {
142
+ return true ;
143
+ }
144
+ }
145
+ return false ;
146
+ }
132
147
}
0 commit comments