Skip to content

Commit 868dc77

Browse files
committed
force the cdn to hit origin if stale data is server after N retries
1 parent 0565cfd commit 868dc77

File tree

4 files changed

+106
-29
lines changed

4 files changed

+106
-29
lines changed

client/src/main/java/io/split/client/HttpSplitChangeFetcher.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {
3131
private static final Logger _log = LoggerFactory.getLogger(HttpSplitChangeFetcher.class);
3232

3333
private static final String SINCE = "since";
34+
private static final String TILL = "till";
3435
private static final String PREFIX = "splitChangeFetcher";
3536

3637
private static final String HEADER_CACHE_CONTROL_NAME = "Cache-Control";
@@ -58,6 +59,10 @@ private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, Metrics metr
5859
checkNotNull(_target);
5960
}
6061

62+
long makeRandomTill() {
63+
return (-1)*(int)Math.floor(Math.random()*(Math.pow(2, 63)));
64+
}
65+
6166
@Override
6267
public SplitChange fetch(long since, FetchOptions options) {
6368

@@ -66,7 +71,11 @@ public SplitChange fetch(long since, FetchOptions options) {
6671
CloseableHttpResponse response = null;
6772

6873
try {
69-
URI uri = new URIBuilder(_target).addParameter(SINCE, "" + since).build();
74+
URIBuilder uriBuilder = new URIBuilder(_target).addParameter(SINCE, "" + since);
75+
if (options.cdnBypass()) {
76+
uriBuilder.addParameter(TILL, "" + makeRandomTill());
77+
}
78+
URI uri = uriBuilder.build();
7079

7180
HttpGet request = new HttpGet(uri);
7281
if(options.cacheControlHeadersEnabled()) {

client/src/main/java/io/split/engine/common/FetchOptions.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
package io.split.engine.common;
22

3-
import io.split.engine.matchers.AttributeMatcher;
4-
import org.checkerframework.checker.units.qual.A;
5-
63
import java.util.Map;
74
import java.util.Objects;
85
import java.util.function.Function;
96

107
public class FetchOptions {
118

129
public static class Builder {
10+
1311
public Builder() {}
1412

13+
public Builder(FetchOptions opts) {
14+
_cdnBypass = opts._cdnBypass;
15+
_cacheControlHeaders = opts._cacheControlHeaders;
16+
_fastlyDebugHeader = opts._fastlyDebugHeader;
17+
_responseHeadersCallback = opts._responseHeadersCallback;
18+
}
19+
1520
public Builder cacheControlHeaders(boolean on) {
1621
_cacheControlHeaders = on;
1722
return this;
@@ -27,10 +32,16 @@ public Builder responseHeadersCallback(Function<Map<String, String>, Void> callb
2732
return this;
2833
}
2934

35+
public Builder cdnBypass(boolean bypass) {
36+
_cdnBypass = bypass;
37+
return this;
38+
}
39+
3040
public FetchOptions build() {
31-
return new FetchOptions(_cacheControlHeaders, _responseHeadersCallback, _fastlyDebugHeader);
41+
return new FetchOptions(_cacheControlHeaders, _cdnBypass, _responseHeadersCallback, _fastlyDebugHeader);
3242
}
3343

44+
private boolean _cdnBypass = false;
3445
private boolean _cacheControlHeaders = false;
3546
private boolean _fastlyDebugHeader = false;
3647
private Function<Map<String, String>, Void> _responseHeadersCallback = null;
@@ -44,16 +55,21 @@ public boolean fastlyDebugHeaderEnabled() {
4455
return _fastlyDebugHeader;
4556
}
4657

58+
public boolean cdnBypass() { return _cdnBypass; }
59+
4760
public void handleResponseHeaders(Map<String, String> headers) {
4861
if (Objects.isNull(_responseHeadersCallback) || Objects.isNull(headers)) {
4962
return;
5063
}
5164
_responseHeadersCallback.apply(headers);
5265
}
5366

54-
private FetchOptions(boolean cacheControlHeaders, Function<Map<String, String>, Void> responseHeadersCallback,
67+
private FetchOptions(boolean cacheControlHeaders,
68+
boolean cdnBypass,
69+
Function<Map<String, String>, Void> responseHeadersCallback,
5570
boolean fastlyDebugHeader) {
5671
_cacheControlHeaders = cacheControlHeaders;
72+
_cdnBypass = cdnBypass;
5773
_responseHeadersCallback = responseHeadersCallback;
5874
_fastlyDebugHeader = fastlyDebugHeader;
5975
}
@@ -78,5 +94,6 @@ public int hashCode() {
7894

7995
private final boolean _cacheControlHeaders;
8096
private final boolean _fastlyDebugHeader;
97+
private final boolean _cdnBypass;
8198
private final Function<Map<String, String>, Void> _responseHeadersCallback;
8299
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 66 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,18 @@
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
1414

15-
import java.util.concurrent.Executors;
16-
import java.util.concurrent.ScheduledExecutorService;
17-
import java.util.concurrent.ThreadFactory;
18-
import java.util.concurrent.TimeUnit;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.concurrent.*;
18+
import java.util.function.Function;
1919

2020
import static com.google.common.base.Preconditions.checkNotNull;
2121

2222
public class SynchronizerImp implements Synchronizer {
2323

24+
private static final long ON_DEMAND_FETCH_BACKOFF_BASE_MS = 10000; //backoff base starting at 10 seconds (!)
25+
private static final int ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10;
26+
2427
private static final Logger _log = LoggerFactory.getLogger(Synchronizer.class);
2528
private final SplitSynchronizationTask _splitSynchronizationTask;
2629
private final SplitFetcher _splitFetcher;
@@ -83,42 +86,82 @@ public void stopPeriodicFetching() {
8386
_segmentSynchronizationTaskImp.stop();
8487
}
8588

86-
@Override
87-
public void refreshSplits(long targetChangeNumber) {
89+
private static class SyncResult {
8890

89-
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
90-
return;
91+
/* package private */ SyncResult(boolean success, int remainingAttempts) {
92+
_success = success;
93+
_remainingAttempts = remainingAttempts;
9194
}
9295

93-
FastlyHeadersCaptor captor = new FastlyHeadersCaptor();
94-
FetchOptions opts = new FetchOptions.Builder()
95-
.cacheControlHeaders(true)
96-
.fastlyDebugHeader(_cdnResponseHeadersLogging)
97-
.responseHeadersCallback(_cdnResponseHeadersLogging ? captor::handle : null)
98-
.build();
96+
public boolean success() { return _success; }
97+
public int remainingAttempts() { return _remainingAttempts; }
9998

100-
int remainingAttempts = _onDemandFetchMaxRetries;
99+
private final boolean _success;
100+
private final int _remainingAttempts;
101+
}
102+
103+
private SyncResult attemptSync(long targetChangeNumber,
104+
FetchOptions opts,
105+
Function<Void, Long> nextWaitMs,
106+
int maxRetries) {
107+
int remainingAttempts = maxRetries;
101108
while(true) {
102109
remainingAttempts--;
103110
_splitFetcher.forceRefresh(opts);
104111
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
105-
_log.debug(String.format("Refresh completed in %s attempts.", _onDemandFetchMaxRetries - remainingAttempts));
106-
break;
112+
return new SyncResult(true, remainingAttempts);
107113
} else if (remainingAttempts <= 0) {
108-
_log.info(String.format("No changes fetched after %s attempts.", _onDemandFetchMaxRetries));
109-
break;
114+
_log.info(String.format("No changes fetched after %s attempts.", maxRetries));
115+
return new SyncResult(false, remainingAttempts);
110116
}
111117
try {
112-
Thread.sleep(_onDemandFetchRetryDelayMs);
118+
Thread.sleep(nextWaitMs.apply(null));
113119
} catch (InterruptedException e) {
114120
Thread.currentThread().interrupt();
115121
_log.debug("Error trying to sleep current Thread.");
116122
}
117123
}
124+
}
125+
126+
private void logCdnHeaders(int maxRetries, int remainingAttempts, List<Map<String, String>> headers) {
127+
if (maxRetries - remainingAttempts > _failedAttemptsBeforeLogging) {
128+
_log.info(String.format("CDN Debug headers: %s", gson.toJson(headers)));
129+
}
130+
}
131+
132+
@Override
133+
public void refreshSplits(long targetChangeNumber) {
134+
135+
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
136+
return;
137+
}
138+
139+
FastlyHeadersCaptor captor = new FastlyHeadersCaptor();
140+
FetchOptions opts = new FetchOptions.Builder()
141+
.cacheControlHeaders(true)
142+
.fastlyDebugHeader(_cdnResponseHeadersLogging)
143+
.responseHeadersCallback(_cdnResponseHeadersLogging ? captor::handle : null)
144+
.build();
145+
146+
SyncResult regularResult = attemptSync(targetChangeNumber, opts,
147+
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);
148+
149+
if (regularResult.success()) {
150+
_log.debug(String.format("Refresh completed in %s attempts.", _onDemandFetchMaxRetries - regularResult.remainingAttempts()));
151+
if (_cdnResponseHeadersLogging) {
152+
logCdnHeaders(_onDemandFetchMaxRetries , regularResult.remainingAttempts(), captor.get());
153+
}
154+
return;
155+
}
118156

119-
if (_cdnResponseHeadersLogging &&
120-
(_onDemandFetchMaxRetries - remainingAttempts) > _failedAttemptsBeforeLogging) {
121-
_log.info(String.format("CDN Debug headers: %s", gson.toJson(captor.get())));
157+
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).cdnBypass(true).build();
158+
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS);
159+
SyncResult withCDNBypassed = attemptSync(targetChangeNumber, withCdnBypass,
160+
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);
161+
162+
if (_cdnResponseHeadersLogging) {
163+
logCdnHeaders(_onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
164+
withCDNBypassed.remainingAttempts(), captor.get());
122165
}
123166
}
124167

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,20 @@ public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser
4646
@Override
4747
public void forceRefresh(FetchOptions options) {
4848
_log.debug("Force Refresh splits starting ...");
49+
final long initialCN = _splitCache.getChangeNumber();
4950
try {
5051
while (true) {
5152
long start = _splitCache.getChangeNumber();
5253
runWithoutExceptionHandling(options);
5354
long end = _splitCache.getChangeNumber();
5455

56+
// If the previous execution was the first one, clear the `cdnBypass` flag
57+
// for the next fetches. (This will clear a local copy of the fetch options,
58+
// not the original object that was passed to this method).
59+
if (initialCN == start) {
60+
options = new FetchOptions.Builder(options).cdnBypass(false).build();
61+
}
62+
5563
if (start >= end) {
5664
break;
5765
}

0 commit comments

Comments
 (0)