Skip to content

Commit 27eaa84

Browse files
Merge pull request #208 from splitio/stream-retry-delay
Adding delay on streaming retry
2 parents c1676f1 + 1438f78 commit 27eaa84

File tree

5 files changed

+48
-10
lines changed

5 files changed

+48
-10
lines changed

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class SplitClientConfig {
4646
private final int _streamingReconnectBackoffBase;
4747
private final String _authServiceURL;
4848
private final String _streamingServiceURL;
49+
private final int _onDemandFetchRetryDelayMs;
4950

5051
// Proxy configs
5152
private final HttpHost _proxy;
@@ -89,7 +90,8 @@ private SplitClientConfig(String endpoint,
8990
int authRetryBackoffBase,
9091
int streamingReconnectBackoffBase,
9192
String authServiceURL,
92-
String streamingServiceURL) {
93+
String streamingServiceURL,
94+
int onDemandFetchRetryDelayMs) {
9395
_endpoint = endpoint;
9496
_eventsEndpoint = eventsEndpoint;
9597
_featuresRefreshRate = pollForFeatureChangesEveryNSeconds;
@@ -120,6 +122,7 @@ private SplitClientConfig(String endpoint,
120122
_streamingReconnectBackoffBase = streamingReconnectBackoffBase;
121123
_authServiceURL = authServiceURL;
122124
_streamingServiceURL = streamingServiceURL;
125+
_onDemandFetchRetryDelayMs = onDemandFetchRetryDelayMs;
123126

124127
Properties props = new Properties();
125128
try {
@@ -248,6 +251,8 @@ public String streamingServiceURL() {
248251
return _streamingServiceURL;
249252
}
250253

254+
public int streamingRetryDelay() {return _onDemandFetchRetryDelayMs;}
255+
251256
public static final class Builder {
252257

253258
private String _endpoint = "https://sdk.split.io";
@@ -283,6 +288,7 @@ public static final class Builder {
283288
private int _streamingReconnectBackoffBase = 1;
284289
private String _authServiceURL = "https://auth.split.io/api/auth";
285290
private String _streamingServiceURL = "https://streaming.split.io/sse";
291+
private int _onDemandFetchRetryDelayMs = 50;
286292

287293
public Builder() {
288294
}
@@ -674,6 +680,16 @@ public Builder streamingServiceURL(String streamingServiceURL) {
674680
return this;
675681
}
676682

683+
/**
684+
* Set Streaming retry delay.
685+
* @param onDemandFetchRetryDelayMs
686+
* @return
687+
*/
688+
public Builder streamingRetryDelay(int onDemandFetchRetryDelayMs) {
689+
_onDemandFetchRetryDelayMs = onDemandFetchRetryDelayMs;
690+
return this;
691+
}
692+
677693
public SplitClientConfig build() {
678694
if (_featuresRefreshRate < 5 ) {
679695
throw new IllegalArgumentException("featuresRefreshRate must be >= 5: " + _featuresRefreshRate);
@@ -744,6 +760,10 @@ public SplitClientConfig build() {
744760
throw new IllegalArgumentException("streamingServiceURL must not be null");
745761
}
746762

763+
if(_onDemandFetchRetryDelayMs <= 0) {
764+
throw new IllegalStateException("streamingRetryDelay must be > 0");
765+
}
766+
747767
return new SplitClientConfig(
748768
_endpoint,
749769
_eventsEndpoint,
@@ -774,7 +794,8 @@ public SplitClientConfig build() {
774794
_authRetryBackoffBase,
775795
_streamingReconnectBackoffBase,
776796
_authServiceURL,
777-
_streamingServiceURL);
797+
_streamingServiceURL,
798+
_onDemandFetchRetryDelayMs);
778799
}
779800
}
780801
}

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
141141
_eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
142142

143143
// SyncManager
144-
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), _segmentCache);
144+
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), _segmentCache, config.streamingRetryDelay());
145145
_syncManager.start();
146146

147147
// Evaluator

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
5959
String streamingServiceUrl,
6060
int authRetryBackOffBase,
6161
CloseableHttpClient sseHttpClient,
62-
SegmentCache segmentCache) {
62+
SegmentCache segmentCache,
63+
int streamingRetryDelay) {
6364
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
64-
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache);
65+
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay);
6566
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient);
6667
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase);
6768
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,20 @@ public class SynchronizerImp implements Synchronizer {
2727
private final ScheduledExecutorService _syncAllScheduledExecutorService;
2828
private final SplitCache _splitCache;
2929
private final SegmentCache _segmentCache;
30+
private final int _onDemandFetchRetryDelayMs;
3031

3132
public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
3233
SplitFetcher splitFetcher,
3334
SegmentSynchronizationTask segmentSynchronizationTaskImp,
3435
SplitCache splitCache,
35-
SegmentCache segmentCache) {
36+
SegmentCache segmentCache,
37+
int onDemandFetchRetryDelayMs) {
3638
_splitSynchronizationTask = checkNotNull(splitSynchronizationTask);
3739
_splitFetcher = checkNotNull(splitFetcher);
3840
_segmentSynchronizationTaskImp = checkNotNull(segmentSynchronizationTaskImp);
3941
_splitCache = checkNotNull(splitCache);
4042
_segmentCache = checkNotNull(segmentCache);
43+
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
4144

4245
ThreadFactory splitsThreadFactory = new ThreadFactoryBuilder()
4346
.setDaemon(true)
@@ -70,10 +73,23 @@ public void stopPeriodicFetching() {
7073

7174
@Override
7275
public void refreshSplits(long targetChangeNumber) {
73-
int retries = 1;
74-
while(targetChangeNumber > _splitCache.getChangeNumber() && retries <= RETRIES_NUMBER) {
76+
int retries = RETRIES_NUMBER;
77+
while(targetChangeNumber > _splitCache.getChangeNumber()) {
78+
retries--;
7579
_splitFetcher.forceRefresh(true);
76-
retries++;
80+
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
81+
_log.debug("Refresh completed in %s attempts.", RETRIES_NUMBER - retries);
82+
return;
83+
} else if (retries <= 0) {
84+
_log.warn("No changes fetched after %s attempts.", RETRIES_NUMBER);
85+
return;
86+
}
87+
try {
88+
Thread.sleep(_onDemandFetchRetryDelayMs);
89+
} catch (InterruptedException e) {
90+
Thread.currentThread().interrupt();
91+
_log.debug("Error trying to sleep current Thread.");
92+
}
7793
}
7894
}
7995

client/src/test/java/io/split/engine/common/SynchronizerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public void beforeMethod() {
2626
_splitCache = Mockito.mock(SplitCache.class);
2727
_segmentCache = Mockito.mock(SegmentCache.class);
2828

29-
_synchronizer = new SynchronizerImp(_refreshableSplitFetcherTask, _splitFetcher, _segmentFetcher, _splitCache, _segmentCache);
29+
_synchronizer = new SynchronizerImp(_refreshableSplitFetcherTask, _splitFetcher, _segmentFetcher, _splitCache, _segmentCache, 50);
3030
}
3131

3232
@Test

0 commit comments

Comments
 (0)