Skip to content

Commit c017e99

Browse files
Merge branch 'development' of github.com:splitio/java-client into cache-update
2 parents daea082 + ad75990 commit c017e99

File tree

9 files changed

+114
-47
lines changed

9 files changed

+114
-47
lines changed

client/src/main/java/io/split/engine/common/PushManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ enum Status {
1313
void stop();
1414
void startWorkers();
1515
void stopWorkers();
16+
void scheduleConnectionReset();
1617
}

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.split.engine.sse.EventSourceClientImp;
99
import io.split.engine.sse.PushStatusTracker;
1010
import io.split.engine.sse.PushStatusTrackerImp;
11+
import io.split.engine.sse.client.SSEClient;
1112
import io.split.engine.sse.dtos.AuthenticationResponse;
1213
import io.split.engine.sse.dtos.SegmentQueueDto;
1314
import io.split.engine.sse.workers.SegmentsWorkerImp;
@@ -24,6 +25,7 @@
2425
import java.util.concurrent.ScheduledExecutorService;
2526
import java.util.concurrent.Executors;
2627
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicLong;
2729

2830
import static com.google.common.base.Preconditions.checkNotNull;
2931

@@ -32,28 +34,27 @@ public class PushManagerImp implements PushManager {
3234

3335
private final AuthApiClient _authApiClient;
3436
private final EventSourceClient _eventSourceClient;
35-
private final Backoff _backoff;
3637
private final SplitsWorker _splitsWorker;
3738
private final Worker<SegmentQueueDto> _segmentWorker;
3839
private final PushStatusTracker _pushStatusTracker;
3940

4041
private Future<?> _nextTokenRefreshTask;
4142
private final ScheduledExecutorService _scheduledExecutorService;
43+
private AtomicLong _expirationTime;
4244

4345
@VisibleForTesting
4446
/* package private */ PushManagerImp(AuthApiClient authApiClient,
4547
EventSourceClient eventSourceClient,
4648
SplitsWorker splitsWorker,
4749
Worker<SegmentQueueDto> segmentWorker,
48-
Backoff backoff,
4950
PushStatusTracker pushStatusTracker) {
5051

5152
_authApiClient = checkNotNull(authApiClient);
5253
_eventSourceClient = checkNotNull(eventSourceClient);
53-
_backoff = checkNotNull(backoff);
5454
_splitsWorker = splitsWorker;
5555
_segmentWorker = segmentWorker;
5656
_pushStatusTracker = pushStatusTracker;
57+
_expirationTime = new AtomicLong();
5758
_scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
5859
.setDaemon(true)
5960
.setNameFormat("Split-SSERefreshToken-%d")
@@ -64,7 +65,6 @@ public static PushManagerImp build(Synchronizer synchronizer,
6465
String streamingUrl,
6566
String authUrl,
6667
CloseableHttpClient httpClient,
67-
int authRetryBackOffBase,
6868
LinkedBlockingQueue<PushManager.Status> statusMessages,
6969
CloseableHttpClient sseHttpClient) {
7070
SplitsWorker splitsWorker = new SplitsWorkerImp(synchronizer);
@@ -74,7 +74,6 @@ public static PushManagerImp build(Synchronizer synchronizer,
7474
EventSourceClientImp.build(streamingUrl, splitsWorker, segmentWorker, pushStatusTracker, sseHttpClient),
7575
splitsWorker,
7676
segmentWorker,
77-
new Backoff(authRetryBackOffBase),
7877
pushStatusTracker);
7978
}
8079

@@ -83,14 +82,13 @@ public synchronized void start() {
8382
AuthenticationResponse response = _authApiClient.Authenticate();
8483
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
8584
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
86-
scheduleConnectionReset(response.getExpiration());
87-
_backoff.reset();
85+
_expirationTime.set(response.getExpiration());
8886
return;
8987
}
9088

9189
stop();
9290
if (response.isRetry()) {
93-
scheduleConnectionReset(_backoff.interval());
91+
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
9492
} else {
9593
_pushStatusTracker.forcePushDisable();
9694
}
@@ -106,13 +104,14 @@ public synchronized void stop() {
106104
}
107105
}
108106

109-
private void scheduleConnectionReset(long time) {
110-
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", time));
107+
@Override
108+
public synchronized void scheduleConnectionReset() {
109+
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime));
111110
_nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> {
112111
_log.debug("Starting scheduleNextTokenRefresh ...");
113112
stop();
114113
start();
115-
}, time, TimeUnit.SECONDS);
114+
}, _expirationTime.get(), TimeUnit.SECONDS);
116115
}
117116

118117
private boolean startSse(String token, String channels) {

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ public class SyncManagerImp implements SyncManager {
2929
private final LinkedBlockingQueue<PushManager.Status> _incomingPushStatus;
3030
private final ExecutorService _executorService;
3131
private Future<?> _pushStatusMonitorTask;
32+
private Backoff _backoff;
3233

3334
@VisibleForTesting
3435
/* package private */ SyncManagerImp(boolean streamingEnabledConfig,
3536
Synchronizer synchronizer,
3637
PushManager pushManager,
37-
LinkedBlockingQueue<PushManager.Status> pushMessages) {
38+
LinkedBlockingQueue<PushManager.Status> pushMessages,
39+
int authRetryBackOffBase) {
3840
_streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig);
3941
_synchronizer = checkNotNull(synchronizer);
4042
_pushManager = checkNotNull(pushManager);
@@ -44,6 +46,7 @@ public class SyncManagerImp implements SyncManager {
4446
.setNameFormat("SPLIT-PushStatusMonitor-%d")
4547
.setDaemon(true)
4648
.build());
49+
_backoff = new Backoff(authRetryBackOffBase);
4750
}
4851

4952
public static SyncManagerImp build(boolean streamingEnabledConfig,
@@ -59,8 +62,8 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
5962
SegmentCache segmentCache) {
6063
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
6164
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache);
62-
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages, sseHttpClient);
63-
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages);
65+
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient);
66+
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase);
6467
}
6568

6669
@Override
@@ -105,14 +108,21 @@ private void startPollingMode() {
105108
_synchronizer.stopPeriodicFetching();
106109
_synchronizer.syncAll();
107110
_pushManager.startWorkers();
111+
_pushManager.scheduleConnectionReset();
112+
_backoff.reset();
108113
break;
109114
case STREAMING_DOWN:
110115
_pushManager.stopWorkers();
111116
_synchronizer.startPeriodicFetching();
112117
break;
113118
case STREAMING_BACKOFF:
119+
long howLong = _backoff.interval() * 1000;
120+
_log.error(String.format("Retryable error in streaming subsystem. Switching to polling and retrying in %d seconds", howLong/1000));
114121
_synchronizer.startPeriodicFetching();
115122
_pushManager.stopWorkers();
123+
_pushManager.stop();
124+
Thread.sleep(howLong);
125+
_incomingPushStatus.clear();
116126
_pushManager.start();
117127
break;
118128
case STREAMING_OFF:

client/src/main/java/io/split/engine/sse/EventSourceClientImp.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,21 @@
1414

1515
import java.net.URI;
1616
import java.net.URISyntaxException;
17+
import java.util.concurrent.atomic.AtomicBoolean;
1718

1819
import static com.google.common.base.Preconditions.checkNotNull;
1920

2021
public class EventSourceClientImp implements EventSourceClient {
2122
private static final Logger _log = LoggerFactory.getLogger(EventSourceClient.class);
23+
private static final String ERROR = "error";
24+
private static final String MESSAGE = "message";
2225

2326
private final String _baseStreamingUrl;
2427
private final NotificationParser _notificationParser;
2528
private final NotificationProcessor _notificationProcessor;
2629
private final SSEClient _sseClient;
2730
private final PushStatusTracker _pushStatusTracker;
31+
private final AtomicBoolean _firstEvent;
2832

2933
@VisibleForTesting
3034
/* package private */ EventSourceClientImp(String baseStreamingUrl,
@@ -41,7 +45,7 @@ public class EventSourceClientImp implements EventSourceClient {
4145
inboundEvent -> { onMessage(inboundEvent); return null; },
4246
status -> { _pushStatusTracker.handleSseStatus(status); return null; },
4347
sseHttpClient);
44-
48+
_firstEvent = new AtomicBoolean();
4549
}
4650

4751
public static EventSourceClientImp build(String baseStreamingUrl,
@@ -63,6 +67,7 @@ public boolean start(String channelList, String token) {
6367
}
6468

6569
try {
70+
_firstEvent.set(false);
6671
return _sseClient.open(buildUri(channelList, token));
6772
} catch (URISyntaxException e) {
6873
_log.error("Error building Streaming URI: " + e.getMessage());
@@ -91,13 +96,16 @@ private void onMessage(RawEvent event) {
9196
try {
9297
String type = event.event();
9398
String payload = event.data();
99+
if(_firstEvent.compareAndSet(false, true) && !ERROR.equals(type)){
100+
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.FIRST_EVENT);
101+
}
94102
if (payload.length() > 0) {
95103
_log.debug(String.format("Payload received: %s", payload));
96104
switch (type) {
97-
case "message":
105+
case MESSAGE:
98106
_notificationProcessor.process(_notificationParser.parseMessage(payload));
99107
break;
100-
case "error":
108+
case ERROR:
101109
_pushStatusTracker.handleIncomingAblyError(_notificationParser.parseError(payload));
102110
break;
103111
default:

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public PushStatusTrackerImp(LinkedBlockingQueue<PushManager.Status> statusMessag
2828
_statusMessages = statusMessages;
2929
}
3030

31-
public synchronized void reset() {
31+
private synchronized void reset() {
3232
_publishersOnline.set(true);
3333
_sseStatus.set(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
3434
_backendStatus.set(ControlType.STREAMING_RESUMED);
@@ -39,11 +39,12 @@ public void handleSseStatus(SSEClient.StatusMessage newStatus) {
3939
_log.debug(String.format("Current status: %s. New status: %s", _sseStatus.get().toString(), newStatus.toString()));
4040

4141
switch(newStatus) {
42-
case CONNECTED:
43-
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.CONNECTED)
44-
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.CONNECTED)) {
42+
case FIRST_EVENT:
43+
if (SSEClient.StatusMessage.CONNECTED.equals(_sseStatus.get())) {
4544
_statusMessages.offer(PushManager.Status.STREAMING_READY);
4645
}
46+
case CONNECTED:
47+
_sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.CONNECTED);
4748
break;
4849
case RETRYABLE_ERROR:
4950
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.CONNECTED, SSEClient.StatusMessage.RETRYABLE_ERROR)) {
@@ -135,7 +136,7 @@ public synchronized void forcePushDisable() {
135136
_backendStatus.set(ControlType.STREAMING_DISABLED);
136137
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
137138
}
138-
139+
139140
private boolean isPublishers() {
140141
for(Integer publisher : regions.values()) {
141142
if (publisher > 0) {

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.ExecutorService;
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicReference;
2324
import java.util.function.Function;
2425

@@ -31,7 +32,8 @@ public enum StatusMessage {
3132
RETRYABLE_ERROR,
3233
NONRETRYABLE_ERROR,
3334
INITIALIZATION_IN_PROGRESS,
34-
FORCED_STOP
35+
FORCED_STOP,
36+
FIRST_EVENT
3537
}
3638

3739
private enum ConnectionState {
@@ -54,13 +56,15 @@ private enum ConnectionState {
5456
private final AtomicReference<ConnectionState> _state = new AtomicReference<>(ConnectionState.CLOSED);
5557
private final AtomicReference<CloseableHttpResponse> _ongoingResponse = new AtomicReference<>();
5658
private final AtomicReference<HttpGet> _ongoingRequest = new AtomicReference<>();
59+
private AtomicBoolean _forcedStop;
5760

5861
public SSEClient(Function<RawEvent, Void> eventCallback,
5962
Function<StatusMessage, Void> statusCallback,
6063
CloseableHttpClient client) {
6164
_eventCallback = eventCallback;
6265
_statusCallback = statusCallback;
6366
_client = client;
67+
_forcedStop = new AtomicBoolean();
6468
}
6569

6670
public synchronized boolean open(URI uri) {
@@ -90,13 +94,14 @@ public boolean isOpen() {
9094
}
9195

9296
public synchronized void close() {
97+
_forcedStop.set(true);
9398
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
9499
if (_ongoingResponse.get() != null) {
95100
try {
96101
_ongoingRequest.get().abort();
97102
_ongoingResponse.get().close();
98103
} catch (IOException e) {
99-
_log.info(String.format("Error closing SSEClient: %s", e.getMessage()));
104+
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
100105
}
101106
}
102107
}
@@ -127,9 +132,11 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
127132
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
128133
return;
129134
} catch (IOException exc) { // Other type of connection error
130-
_log.info(String.format("SSE connection ended abruptly: %s. Retrying", exc.getMessage()));
131-
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
132-
return;
135+
if(!_forcedStop.get()) {
136+
_log.debug(String.format("SSE connection ended abruptly: %s. Retying", exc.getMessage()));
137+
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
138+
return;
139+
}
133140
}
134141
}
135142
} catch (Exception e) { // Any other error non related to the connection disables streaming altogether
@@ -144,6 +151,7 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
144151

145152
_state.set(ConnectionState.CLOSED);
146153
_log.debug("SSEClient finished.");
154+
_forcedStop.set(false);
147155
}
148156
}
149157

0 commit comments

Comments
 (0)