Skip to content

Commit 1a25c13

Browse files
committed
add retries and cdn bypass logic to segment fetchers
1 parent b785a5d commit 1a25c13

File tree

11 files changed

+149
-62
lines changed

11 files changed

+149
-62
lines changed

client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java

+29-6
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import io.split.client.dtos.SegmentChange;
55
import io.split.client.utils.Json;
66
import io.split.client.utils.Utils;
7+
import io.split.engine.common.FetchOptions;
78
import io.split.engine.metrics.Metrics;
89
import io.split.engine.segments.SegmentChangeFetcher;
910
import org.apache.hc.client5.http.classic.methods.HttpGet;
1011
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1112
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
13+
import org.apache.hc.core5.http.Header;
1214
import org.apache.hc.core5.http.io.entity.EntityUtils;
1315
import org.apache.hc.core5.net.URIBuilder;
1416
import org.slf4j.Logger;
@@ -17,6 +19,8 @@
1719
import java.net.URI;
1820
import java.net.URISyntaxException;
1921
import java.nio.charset.StandardCharsets;
22+
import java.util.Arrays;
23+
import java.util.stream.Collectors;
2024

2125
import static com.google.common.base.Preconditions.checkNotNull;
2226

@@ -27,9 +31,13 @@ public final class HttpSegmentChangeFetcher implements SegmentChangeFetcher {
2731
private static final Logger _log = LoggerFactory.getLogger(HttpSegmentChangeFetcher.class);
2832

2933
private static final String SINCE = "since";
34+
private static final String TILL = "till";
3035
private static final String PREFIX = "segmentChangeFetcher";
31-
private static final String NAME_CACHE = "Cache-Control";
32-
private static final String VALUE_CACHE = "no-cache";
36+
private static final String CACHE_CONTROL_HEADER_NAME = "Cache-Control";
37+
private static final String CACHE_CONTROL_HEADER_VALUE = "no-cache";
38+
39+
private static final String HEADER_FASTLY_DEBUG_NAME = "Fastly-Debug";
40+
private static final String HEADER_FASTLY_DEBUG_VALUE = "1";
3341

3442
private final CloseableHttpClient _client;
3543
private final URI _target;
@@ -51,19 +59,34 @@ private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, Metrics me
5159
}
5260

5361
@Override
54-
public SegmentChange fetch(String segmentName, long since, boolean addCacheHeader) {
62+
public SegmentChange fetch(String segmentName, long since, FetchOptions options) {
5563
long start = System.currentTimeMillis();
5664

5765
CloseableHttpResponse response = null;
5866

5967
try {
6068
String path = _target.getPath() + "/" + segmentName;
61-
URI uri = new URIBuilder(_target).setPath(path).addParameter(SINCE, "" + since).build();
69+
URIBuilder uriBuilder = new URIBuilder(_target)
70+
.setPath(path)
71+
.addParameter(SINCE, "" + since);
72+
if (options.hasCustomCN()) {
73+
uriBuilder.addParameter(TILL, "" + options.targetCN());
74+
}
75+
76+
URI uri = uriBuilder.build();
6277
HttpGet request = new HttpGet(uri);
63-
if(addCacheHeader) {
64-
request.setHeader(NAME_CACHE, VALUE_CACHE);
78+
79+
if(options.cacheControlHeadersEnabled()) {
80+
request.setHeader(CACHE_CONTROL_HEADER_NAME, CACHE_CONTROL_HEADER_VALUE);
6581
}
82+
83+
if (options.fastlyDebugHeaderEnabled()) {
84+
request.addHeader(HEADER_FASTLY_DEBUG_NAME, HEADER_FASTLY_DEBUG_VALUE);
85+
}
86+
6687
response = _client.execute(request);
88+
options.handleResponseHeaders(Arrays.stream(response.getHeaders())
89+
.collect(Collectors.toMap(Header::getName, Header::getValue)));
6790

6891
int statusCode = response.getCode();
6992

client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public boolean forceSyncFeatures() {
4444
public boolean forceSyncSegment(String segmentName) {
4545
SegmentFetcher fetcher = _segmentSynchronizationTask.getFetcher(segmentName);
4646
try{
47-
fetcher.fetch(true);
47+
fetcher.fetch(new FetchOptions.Builder().build());
4848
}
4949
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
5050
catch (NullPointerException np){

client/src/main/java/io/split/engine/common/SynchronizerImp.java

+78-21
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,10 @@ private static class SyncResult {
103103
private final int _remainingAttempts;
104104
}
105105

106-
private SyncResult attemptSync(long targetChangeNumber,
107-
FetchOptions opts,
108-
Function<Void, Long> nextWaitMs,
109-
int maxRetries) {
106+
private SyncResult attemptSplitsSync(long targetChangeNumber,
107+
FetchOptions opts,
108+
Function<Void, Long> nextWaitMs,
109+
int maxRetries) {
110110
int remainingAttempts = maxRetries;
111111
while(true) {
112112
remainingAttempts--;
@@ -126,9 +126,9 @@ private SyncResult attemptSync(long targetChangeNumber,
126126
}
127127
}
128128

129-
private void logCdnHeaders(int maxRetries, int remainingAttempts, List<Map<String, String>> headers) {
129+
private void logCdnHeaders(String prefix, int maxRetries, int remainingAttempts, List<Map<String, String>> headers) {
130130
if (maxRetries - remainingAttempts > _failedAttemptsBeforeLogging) {
131-
_log.info(String.format("CDN Debug headers: %s", gson.toJson(headers)));
131+
_log.info(String.format("%s: CDN Debug headers: %s", prefix, gson.toJson(headers)));
132132
}
133133
}
134134

@@ -146,22 +146,22 @@ public void refreshSplits(long targetChangeNumber) {
146146
.responseHeadersCallback(_cdnResponseHeadersLogging ? captor::handle : null)
147147
.build();
148148

149-
SyncResult regularResult = attemptSync(targetChangeNumber, opts,
149+
SyncResult regularResult = attemptSplitsSync(targetChangeNumber, opts,
150150
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);
151151

152152
int attempts = _onDemandFetchMaxRetries - regularResult.remainingAttempts();
153153
if (regularResult.success()) {
154154
_log.debug(String.format("Refresh completed in %s attempts.", attempts));
155155
if (_cdnResponseHeadersLogging) {
156-
logCdnHeaders(_onDemandFetchMaxRetries , regularResult.remainingAttempts(), captor.get());
156+
logCdnHeaders("[splits]", _onDemandFetchMaxRetries , regularResult.remainingAttempts(), captor.get());
157157
}
158158
return;
159159
}
160160

161161
_log.info(String.format("No changes fetched after %s attempts. Will retry bypassing CDN.", attempts));
162162
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).targetChangeNumber(targetChangeNumber).build();
163163
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
164-
SyncResult withCDNBypassed = attemptSync(targetChangeNumber, withCdnBypass,
164+
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, withCdnBypass,
165165
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);
166166

167167
int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed._remainingAttempts;
@@ -172,7 +172,7 @@ public void refreshSplits(long targetChangeNumber) {
172172
}
173173

174174
if (_cdnResponseHeadersLogging) {
175-
logCdnHeaders(_onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
175+
logCdnHeaders("[splits]", _onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
176176
withCDNBypassed.remainingAttempts(), captor.get());
177177
}
178178
}
@@ -185,19 +185,76 @@ public void localKillSplit(String splitName, String defaultTreatment, long newCh
185185
}
186186
}
187187

188-
@Override
189-
public void refreshSegment(String segmentName, long changeNumber) {
190-
int retries = 1;
191-
while(changeNumber > _segmentCache.getChangeNumber(segmentName) && retries <= _onDemandFetchMaxRetries) {
192-
SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
193-
try{
194-
fetcher.fetch(true);
188+
public SyncResult attemptSegmentSync(String segmentName,
189+
long targetChangeNumber,
190+
FetchOptions opts,
191+
Function<Void, Long> nextWaitMs,
192+
int maxRetries) {
193+
194+
int remainingAttempts = maxRetries;
195+
SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
196+
checkNotNull(fetcher);
197+
198+
while(true) {
199+
remainingAttempts--;
200+
fetcher.fetch(opts);
201+
if (targetChangeNumber <= _segmentCache.getChangeNumber(segmentName)) {
202+
return new SyncResult(true, remainingAttempts);
203+
} else if (remainingAttempts <= 0) {
204+
return new SyncResult(false, remainingAttempts);
195205
}
196-
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
197-
catch (NullPointerException np){
198-
throw new NullPointerException();
206+
try {
207+
long howLong = nextWaitMs.apply(null);
208+
Thread.sleep(howLong);
209+
} catch (InterruptedException e) {
210+
Thread.currentThread().interrupt();
211+
_log.debug("Error trying to sleep current Thread.");
212+
}
213+
}
214+
}
215+
216+
@Override
217+
public void refreshSegment(String segmentName, long targetChangeNumber) {
218+
219+
if (targetChangeNumber <= _segmentCache.getChangeNumber(segmentName)) {
220+
return;
221+
}
222+
223+
FastlyHeadersCaptor captor = new FastlyHeadersCaptor();
224+
FetchOptions opts = new FetchOptions.Builder()
225+
.cacheControlHeaders(true)
226+
.fastlyDebugHeader(_cdnResponseHeadersLogging)
227+
.responseHeadersCallback(_cdnResponseHeadersLogging ? captor::handle : null)
228+
.build();
229+
230+
SyncResult regularResult = attemptSegmentSync(segmentName, targetChangeNumber, opts,
231+
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);
232+
233+
int attempts = _onDemandFetchMaxRetries - regularResult.remainingAttempts();
234+
if (regularResult.success()) {
235+
_log.debug(String.format("Segment %s refresh completed in %s attempts.", segmentName, attempts));
236+
if (_cdnResponseHeadersLogging) {
237+
logCdnHeaders(String.format("[segment/%s]", segmentName), _onDemandFetchMaxRetries , regularResult.remainingAttempts(), captor.get());
199238
}
200-
retries++;
239+
return;
240+
}
241+
242+
_log.info(String.format("No changes fetched for segment %s after %s attempts. Will retry bypassing CDN.", segmentName, attempts));
243+
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).targetChangeNumber(targetChangeNumber).build();
244+
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
245+
SyncResult withCDNBypassed = attemptSegmentSync(segmentName, targetChangeNumber, withCdnBypass,
246+
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);
247+
248+
int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed._remainingAttempts;
249+
if (withCDNBypassed.success()) {
250+
_log.debug(String.format("Segment %s refresh completed bypassing the CDN in %s attempts.", segmentName, withoutCDNAttempts));
251+
} else {
252+
_log.debug(String.format("No changes fetched for segment %s after %s attempts with CDN bypassed.", segmentName, withoutCDNAttempts));
253+
}
254+
255+
if (_cdnResponseHeadersLogging) {
256+
logCdnHeaders(String.format("[segment/%s]", segmentName), _onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
257+
withCDNBypassed.remainingAttempts(), captor.get());
201258
}
202259
}
203260
}

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser
4646
@Override
4747
public void forceRefresh(FetchOptions options) {
4848
_log.debug("Force Refresh splits starting ...");
49-
final long initialCN = _splitCache.getChangeNumber();
49+
final long INITIAL_CN = _splitCache.getChangeNumber();
5050
try {
5151
while (true) {
5252
long start = _splitCache.getChangeNumber();
@@ -56,7 +56,7 @@ public void forceRefresh(FetchOptions options) {
5656
// If the previous execution was the first one, clear the `cdnBypass` flag
5757
// for the next fetches. (This will clear a local copy of the fetch options,
5858
// not the original object that was passed to this method).
59-
if (initialCN == start) {
59+
if (INITIAL_CN == start) {
6060
options = new FetchOptions.Builder(options).targetChangeNumber(FetchOptions.DEFAULT_TARGET_CHANGENUMBER).build();
6161
}
6262

client/src/main/java/io/split/engine/segments/SegmentChangeFetcher.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.split.engine.segments;
22

33
import io.split.client.dtos.SegmentChange;
4+
import io.split.engine.common.FetchOptions;
45

56
/**
67
* Fetches changes in the segment since a reference point.
@@ -22,8 +23,9 @@ public interface SegmentChangeFetcher {
2223
* @param segmentName the name of the segment to fetch.
2324
* @param changesSinceThisChangeNumber a value less than zero implies that the client is
2425
* requesting information on this segment for the first time.
26+
* @param options
2527
* @return SegmentChange
2628
* @throws java.lang.RuntimeException if there was a problem fetching segment changes
2729
*/
28-
SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber, boolean addCacheHeader);
30+
SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber, FetchOptions options);
2931
}

client/src/main/java/io/split/engine/segments/SegmentFetcher.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package io.split.engine.segments;
22

3+
import io.split.engine.common.FetchOptions;
4+
35
/**
46
* Created by adilaijaz on 5/7/15.
57
*/
68
public interface SegmentFetcher {
79
/**
810
* fetch
911
*/
10-
void fetch(boolean addCacheHeader);
12+
void fetch(FetchOptions opts);
1113

1214
void runWhitCacheHeader();
1315

client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java

+16-14
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.split.cache.SegmentCache;
44
import io.split.client.dtos.SegmentChange;
55
import io.split.engine.SDKReadinessGates;
6+
import io.split.engine.common.FetchOptions;
67
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
89

@@ -31,9 +32,9 @@ public SegmentFetcherImp(String segmentName, SegmentChangeFetcher segmentChangeF
3132
}
3233

3334
@Override
34-
public void fetch(boolean addCacheHeader){
35+
public void fetch(FetchOptions opts){
3536
try {
36-
callLoopRun(false, addCacheHeader);
37+
callLoopRun(opts);
3738
} catch (Throwable t) {
3839
_log.error("RefreshableSegmentFetcher failed: " + t.getMessage());
3940
if (_log.isDebugEnabled()) {
@@ -42,8 +43,8 @@ public void fetch(boolean addCacheHeader){
4243
}
4344
}
4445

45-
private void runWithoutExceptionHandling(boolean addCacheHeader) {
46-
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName), addCacheHeader);
46+
private void runWithoutExceptionHandling(FetchOptions options) {
47+
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName), options);
4748

4849
if (change == null) {
4950
throw new IllegalStateException("SegmentChange was null");
@@ -109,14 +110,15 @@ private String summarize(List<String> changes) {
109110
return bldr.toString();
110111
}
111112

112-
private void callLoopRun(boolean isFetch, boolean addCacheHeader){
113+
private void callLoopRun(FetchOptions opts){
114+
final long INITIAL_CN = _segmentCache.getChangeNumber(_segmentName);
113115
while (true) {
114116
long start = _segmentCache.getChangeNumber(_segmentName);
115-
runWithoutExceptionHandling(addCacheHeader);
116-
long end = _segmentCache.getChangeNumber(_segmentName);
117-
if (isFetch && _log.isDebugEnabled()) {
118-
_log.debug(_segmentName + " segment fetch before: " + start + ", after: " + _segmentCache.getChangeNumber(_segmentName) /*+ " size: " + _concurrentKeySet.size()*/);
117+
runWithoutExceptionHandling(opts);
118+
if (INITIAL_CN == start) {
119+
opts = new FetchOptions.Builder(opts).targetChangeNumber(FetchOptions.DEFAULT_TARGET_CHANGENUMBER).build();
119120
}
121+
long end = _segmentCache.getChangeNumber(_segmentName);
120122
if (start >= end) {
121123
break;
122124
}
@@ -125,18 +127,18 @@ private void callLoopRun(boolean isFetch, boolean addCacheHeader){
125127

126128
@Override
127129
public void runWhitCacheHeader(){
128-
this.fetchAndUpdate(true);
130+
this.fetchAndUpdate(new FetchOptions.Builder().cacheControlHeaders(true).build());
129131
}
130132

131133
/**
132134
* Calls callLoopRun and after fetchs segment.
133-
* @param addCacheHeader indicates if CacheHeader is required
135+
* @param opts contains all soft of options used when issuing the fetch request
134136
*/
135-
private void fetchAndUpdate(boolean addCacheHeader) {
137+
private void fetchAndUpdate(FetchOptions opts) {
136138
try {
137139
// Do this again in case the previous call errored out.
138140
_gates.registerSegment(_segmentName);
139-
callLoopRun(true, addCacheHeader);
141+
callLoopRun(opts);
140142

141143
_gates.segmentIsReady(_segmentName);
142144

@@ -150,7 +152,7 @@ private void fetchAndUpdate(boolean addCacheHeader) {
150152

151153
@Override
152154
public void fetchAll() {
153-
this.fetchAndUpdate(false);
155+
this.fetchAndUpdate(new FetchOptions.Builder().build());
154156
}
155157

156158

client/src/test/java/io/split/client/HttpSegmentChangeFetcherTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.split.TestHelper;
44
import io.split.client.dtos.SegmentChange;
5+
import io.split.engine.common.FetchOptions;
56
import io.split.engine.metrics.Metrics;
67
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
78
import org.apache.hc.client5.http.impl.classic.HttpClients;
@@ -61,7 +62,7 @@ public void testFetcherWithSpecialCharacters() throws URISyntaxException, IOExce
6162
Metrics.NoopMetrics metrics = new Metrics.NoopMetrics();
6263
HttpSegmentChangeFetcher fetcher = HttpSegmentChangeFetcher.create(httpClientMock, rootTarget, metrics);
6364

64-
SegmentChange change = fetcher.fetch("some_segment", 1234567, true);
65+
SegmentChange change = fetcher.fetch("some_segment", 1234567, new FetchOptions.Builder().build());
6566

6667
Assert.assertNotNull(change);
6768
Assert.assertEquals(1, change.added.size());

0 commit comments

Comments
 (0)