Skip to content

Commit e0bf264

Browse files
authored
Merge pull request #223 from splitio/feat/cdn_bypass
force the cdn to hit origin if stale data is server after N retries
2 parents 0565cfd + ccc5c40 commit e0bf264

File tree

13 files changed

+328
-40
lines changed

13 files changed

+328
-40
lines changed

client/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.1.7-rc3</version>
8+
<version>4.1.7-rc4</version>
99
</parent>
1010
<artifactId>java-client</artifactId>
1111
<packaging>jar</packaging>

client/src/main/java/io/split/client/HttpSplitChangeFetcher.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {
3131
private static final Logger _log = LoggerFactory.getLogger(HttpSplitChangeFetcher.class);
3232

3333
private static final String SINCE = "since";
34+
private static final String TILL = "till";
3435
private static final String PREFIX = "splitChangeFetcher";
3536

3637
private static final String HEADER_CACHE_CONTROL_NAME = "Cache-Control";
@@ -58,6 +59,10 @@ private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, Metrics metr
5859
checkNotNull(_target);
5960
}
6061

62+
long makeRandomTill() {
63+
return (-1)*(int)Math.floor(Math.random()*(Math.pow(2, 63)));
64+
}
65+
6166
@Override
6267
public SplitChange fetch(long since, FetchOptions options) {
6368

@@ -66,7 +71,11 @@ public SplitChange fetch(long since, FetchOptions options) {
6671
CloseableHttpResponse response = null;
6772

6873
try {
69-
URI uri = new URIBuilder(_target).addParameter(SINCE, "" + since).build();
74+
URIBuilder uriBuilder = new URIBuilder(_target).addParameter(SINCE, "" + since);
75+
if (options.cdnBypass()) {
76+
uriBuilder.addParameter(TILL, "" + makeRandomTill());
77+
}
78+
URI uri = uriBuilder.build();
7079

7180
HttpGet request = new HttpGet(uri);
7281
if(options.cacheControlHeadersEnabled()) {

client/src/main/java/io/split/engine/common/Backoff.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,26 @@
55
import static com.google.common.base.Preconditions.checkNotNull;
66

77
public class Backoff {
8-
private static final long BACKOFF_MAX_SECONDS_ALLOWED = 1800;
8+
private static final long BACKOFF_MAX_ALLOWED = 1800;
99

1010
private final long _backoffBase;
1111
private AtomicInteger _attempt;
12+
private final long _maxAllowed;
1213

1314
public Backoff(long backoffBase) {
15+
this(backoffBase, BACKOFF_MAX_ALLOWED);
16+
}
17+
18+
public Backoff(long backoffBase, long maxAllowed) {
1419
_backoffBase = checkNotNull(backoffBase);
1520
_attempt = new AtomicInteger(0);
21+
_maxAllowed = maxAllowed;
1622
}
1723

1824
public long interval() {
1925
long interval = _backoffBase * (long) Math.pow(2, _attempt.getAndIncrement());
2026

21-
return interval >= BACKOFF_MAX_SECONDS_ALLOWED ? BACKOFF_MAX_SECONDS_ALLOWED : interval;
27+
return interval >= _maxAllowed ? BACKOFF_MAX_ALLOWED : interval;
2228
}
2329

2430
public synchronized void reset() {

client/src/main/java/io/split/engine/common/FetchOptions.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
package io.split.engine.common;
22

3-
import io.split.engine.matchers.AttributeMatcher;
4-
import org.checkerframework.checker.units.qual.A;
5-
63
import java.util.Map;
74
import java.util.Objects;
85
import java.util.function.Function;
96

107
public class FetchOptions {
118

129
public static class Builder {
10+
1311
public Builder() {}
1412

13+
public Builder(FetchOptions opts) {
14+
_cdnBypass = opts._cdnBypass;
15+
_cacheControlHeaders = opts._cacheControlHeaders;
16+
_fastlyDebugHeader = opts._fastlyDebugHeader;
17+
_responseHeadersCallback = opts._responseHeadersCallback;
18+
}
19+
1520
public Builder cacheControlHeaders(boolean on) {
1621
_cacheControlHeaders = on;
1722
return this;
@@ -27,10 +32,16 @@ public Builder responseHeadersCallback(Function<Map<String, String>, Void> callb
2732
return this;
2833
}
2934

35+
public Builder cdnBypass(boolean bypass) {
36+
_cdnBypass = bypass;
37+
return this;
38+
}
39+
3040
public FetchOptions build() {
31-
return new FetchOptions(_cacheControlHeaders, _responseHeadersCallback, _fastlyDebugHeader);
41+
return new FetchOptions(_cacheControlHeaders, _cdnBypass, _responseHeadersCallback, _fastlyDebugHeader);
3242
}
3343

44+
private boolean _cdnBypass = false;
3445
private boolean _cacheControlHeaders = false;
3546
private boolean _fastlyDebugHeader = false;
3647
private Function<Map<String, String>, Void> _responseHeadersCallback = null;
@@ -44,16 +55,21 @@ public boolean fastlyDebugHeaderEnabled() {
4455
return _fastlyDebugHeader;
4556
}
4657

58+
public boolean cdnBypass() { return _cdnBypass; }
59+
4760
public void handleResponseHeaders(Map<String, String> headers) {
4861
if (Objects.isNull(_responseHeadersCallback) || Objects.isNull(headers)) {
4962
return;
5063
}
5164
_responseHeadersCallback.apply(headers);
5265
}
5366

54-
private FetchOptions(boolean cacheControlHeaders, Function<Map<String, String>, Void> responseHeadersCallback,
67+
private FetchOptions(boolean cacheControlHeaders,
68+
boolean cdnBypass,
69+
Function<Map<String, String>, Void> responseHeadersCallback,
5570
boolean fastlyDebugHeader) {
5671
_cacheControlHeaders = cacheControlHeaders;
72+
_cdnBypass = cdnBypass;
5773
_responseHeadersCallback = responseHeadersCallback;
5874
_fastlyDebugHeader = fastlyDebugHeader;
5975
}
@@ -78,5 +94,6 @@ public int hashCode() {
7894

7995
private final boolean _cacheControlHeaders;
8096
private final boolean _fastlyDebugHeader;
97+
private final boolean _cdnBypass;
8198
private final Function<Map<String, String>, Void> _responseHeadersCallback;
8299
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

+71-24
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,21 @@
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
1414

15-
import java.util.concurrent.Executors;
16-
import java.util.concurrent.ScheduledExecutorService;
17-
import java.util.concurrent.ThreadFactory;
18-
import java.util.concurrent.TimeUnit;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.concurrent.*;
18+
import java.util.function.Function;
1919

2020
import static com.google.common.base.Preconditions.checkNotNull;
2121

2222
public class SynchronizerImp implements Synchronizer {
2323

24+
// The boxing here IS necessary, so that the constants are not inlined by the compiler
25+
// and can be modified for the test (we don't want to wait that much in an UT)
26+
private static final long ON_DEMAND_FETCH_BACKOFF_BASE_MS = new Long(10000); //backoff base starting at 10 seconds (!)
27+
private static final long ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS = new Long(60000); // don't sleep for more than 1 second
28+
private static final int ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10;
29+
2430
private static final Logger _log = LoggerFactory.getLogger(Synchronizer.class);
2531
private final SplitSynchronizationTask _splitSynchronizationTask;
2632
private final SplitFetcher _splitFetcher;
@@ -49,7 +55,7 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
4955
_segmentSynchronizationTaskImp = checkNotNull(segmentSynchronizationTaskImp);
5056
_splitCache = checkNotNull(splitCache);
5157
_segmentCache = checkNotNull(segmentCache);
52-
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
58+
_onDemandFetchRetryDelayMs = onDemandFetchRetryDelayMs;
5359
_cdnResponseHeadersLogging = cdnResponseHeadersLogging;
5460
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
5561
_failedAttemptsBeforeLogging = failedAttemptsBeforeLogging;
@@ -83,42 +89,83 @@ public void stopPeriodicFetching() {
8389
_segmentSynchronizationTaskImp.stop();
8490
}
8591

86-
@Override
87-
public void refreshSplits(long targetChangeNumber) {
92+
private static class SyncResult {
8893

89-
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
90-
return;
94+
/* package private */ SyncResult(boolean success, int remainingAttempts) {
95+
_success = success;
96+
_remainingAttempts = remainingAttempts;
9197
}
9298

93-
FastlyHeadersCaptor captor = new FastlyHeadersCaptor();
94-
FetchOptions opts = new FetchOptions.Builder()
95-
.cacheControlHeaders(true)
96-
.fastlyDebugHeader(_cdnResponseHeadersLogging)
97-
.responseHeadersCallback(_cdnResponseHeadersLogging ? captor::handle : null)
98-
.build();
99+
public boolean success() { return _success; }
100+
public int remainingAttempts() { return _remainingAttempts; }
101+
102+
private final boolean _success;
103+
private final int _remainingAttempts;
104+
}
99105

100-
int remainingAttempts = _onDemandFetchMaxRetries;
106+
private SyncResult attemptSync(long targetChangeNumber,
107+
FetchOptions opts,
108+
Function<Void, Long> nextWaitMs,
109+
int maxRetries) {
110+
int remainingAttempts = maxRetries;
101111
while(true) {
102112
remainingAttempts--;
103113
_splitFetcher.forceRefresh(opts);
104114
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
105-
_log.debug(String.format("Refresh completed in %s attempts.", _onDemandFetchMaxRetries - remainingAttempts));
106-
break;
115+
return new SyncResult(true, remainingAttempts);
107116
} else if (remainingAttempts <= 0) {
108-
_log.info(String.format("No changes fetched after %s attempts.", _onDemandFetchMaxRetries));
109-
break;
117+
_log.info(String.format("No changes fetched after %s attempts.", maxRetries));
118+
return new SyncResult(false, remainingAttempts);
110119
}
111120
try {
112-
Thread.sleep(_onDemandFetchRetryDelayMs);
121+
long howLong = nextWaitMs.apply(null);
122+
Thread.sleep(howLong);
113123
} catch (InterruptedException e) {
114124
Thread.currentThread().interrupt();
115125
_log.debug("Error trying to sleep current Thread.");
116126
}
117127
}
128+
}
129+
130+
private void logCdnHeaders(int maxRetries, int remainingAttempts, List<Map<String, String>> headers) {
131+
if (maxRetries - remainingAttempts > _failedAttemptsBeforeLogging) {
132+
_log.info(String.format("CDN Debug headers: %s", gson.toJson(headers)));
133+
}
134+
}
135+
136+
@Override
137+
public void refreshSplits(long targetChangeNumber) {
138+
139+
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
140+
return;
141+
}
142+
143+
FastlyHeadersCaptor captor = new FastlyHeadersCaptor();
144+
FetchOptions opts = new FetchOptions.Builder()
145+
.cacheControlHeaders(true)
146+
.fastlyDebugHeader(_cdnResponseHeadersLogging)
147+
.responseHeadersCallback(_cdnResponseHeadersLogging ? captor::handle : null)
148+
.build();
149+
150+
SyncResult regularResult = attemptSync(targetChangeNumber, opts,
151+
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);
152+
153+
if (regularResult.success()) {
154+
_log.debug(String.format("Refresh completed in %s attempts.", _onDemandFetchMaxRetries - regularResult.remainingAttempts()));
155+
if (_cdnResponseHeadersLogging) {
156+
logCdnHeaders(_onDemandFetchMaxRetries , regularResult.remainingAttempts(), captor.get());
157+
}
158+
return;
159+
}
160+
161+
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).cdnBypass(true).build();
162+
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
163+
SyncResult withCDNBypassed = attemptSync(targetChangeNumber, withCdnBypass,
164+
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);
118165

119-
if (_cdnResponseHeadersLogging &&
120-
(_onDemandFetchMaxRetries - remainingAttempts) > _failedAttemptsBeforeLogging) {
121-
_log.info(String.format("CDN Debug headers: %s", gson.toJson(captor.get())));
166+
if (_cdnResponseHeadersLogging) {
167+
logCdnHeaders(_onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
168+
withCDNBypassed.remainingAttempts(), captor.get());
122169
}
123170
}
124171

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

+8
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,20 @@ public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser
4646
@Override
4747
public void forceRefresh(FetchOptions options) {
4848
_log.debug("Force Refresh splits starting ...");
49+
final long initialCN = _splitCache.getChangeNumber();
4950
try {
5051
while (true) {
5152
long start = _splitCache.getChangeNumber();
5253
runWithoutExceptionHandling(options);
5354
long end = _splitCache.getChangeNumber();
5455

56+
// If the previous execution was the first one, clear the `cdnBypass` flag
57+
// for the next fetches. (This will clear a local copy of the fetch options,
58+
// not the original object that was passed to this method).
59+
if (initialCN == start) {
60+
options = new FetchOptions.Builder(options).cdnBypass(false).build();
61+
}
62+
5563
if (start >= end) {
5664
break;
5765
}

client/src/test/java/io/split/TestHelper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public static CloseableHttpClient mockHttpClient(String jsonName, int httpStatus
2626
return httpClientMock;
2727
}
2828

29-
private static CloseableHttpResponse classicResponseToCloseableMock(ClassicHttpResponse mocked) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
29+
public static CloseableHttpResponse classicResponseToCloseableMock(ClassicHttpResponse mocked) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
3030
Method adaptMethod = CloseableHttpResponse.class.getDeclaredMethod("adapt", ClassicHttpResponse.class);
3131
adaptMethod.setAccessible(true);
3232
return (CloseableHttpResponse) adaptMethod.invoke(null, mocked);

client/src/test/java/io/split/client/HttpSplitChangeFetcherTest.java

+52-1
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,30 @@
55
import io.split.client.dtos.SplitChange;
66
import io.split.engine.common.FetchOptions;
77
import io.split.engine.metrics.Metrics;
8+
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
89
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
10+
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
911
import org.apache.hc.client5.http.impl.classic.HttpClients;
10-
import org.apache.hc.core5.http.HttpStatus;
12+
import org.apache.hc.core5.http.*;
13+
import org.apache.hc.core5.http.io.entity.StringEntity;
14+
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
1115
import org.hamcrest.Matchers;
1216
import org.junit.Assert;
1317
import org.junit.Test;
18+
import org.mockito.ArgumentCaptor;
19+
import org.mockito.Mockito;
1420

21+
import java.io.Closeable;
1522
import java.io.IOException;
23+
import java.io.StringBufferInputStream;
1624
import java.lang.reflect.InvocationTargetException;
1725
import java.net.URI;
1826
import java.net.URISyntaxException;
27+
import java.util.List;
1928
import java.util.Map;
2029

30+
import static org.mockito.Mockito.when;
31+
2132
public class HttpSplitChangeFetcherTest {
2233
@Test
2334
public void testDefaultURL() throws URISyntaxException {
@@ -76,4 +87,44 @@ public void testFetcherWithSpecialCharacters() throws URISyntaxException, Invoca
7687
Assert.assertEquals("{\"test\": \"blue\",\"grüne Straße\": 13}", configs.get("on"));
7788
Assert.assertEquals("{\"test\": \"blue\",\"size\": 15}", configs.get("off"));
7889
}
90+
91+
@Test
92+
public void testFetcherWithCDNBypassOption() throws IOException, URISyntaxException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
93+
URI rootTarget = URI.create("https://api.split.io");
94+
95+
HttpEntity entityMock = Mockito.mock(HttpEntity.class);
96+
when(entityMock.getContent()).thenReturn(new StringBufferInputStream("{\"till\": 1}"));
97+
ClassicHttpResponse response = Mockito.mock(ClassicHttpResponse.class);
98+
when(response.getCode()).thenReturn(200);
99+
when(response.getEntity()).thenReturn(entityMock);
100+
when(response.getHeaders()).thenReturn(new Header[0]);
101+
102+
ArgumentCaptor<ClassicHttpRequest> requestCaptor = ArgumentCaptor.forClass(ClassicHttpRequest.class);
103+
CloseableHttpClient httpClientMock = Mockito.mock(CloseableHttpClient.class);
104+
when(httpClientMock.execute(requestCaptor.capture())).thenReturn(TestHelper.classicResponseToCloseableMock(response));
105+
106+
Metrics.NoopMetrics metrics = new Metrics.NoopMetrics();
107+
HttpSplitChangeFetcher fetcher = HttpSplitChangeFetcher.create(httpClientMock, rootTarget, metrics);
108+
109+
fetcher.fetch(-1, new FetchOptions.Builder().cdnBypass(true).build());
110+
fetcher.fetch(-1, new FetchOptions.Builder().build());
111+
List<ClassicHttpRequest> captured = requestCaptor.getAllValues();
112+
Assert.assertEquals(captured.size(), 2);
113+
Assert.assertTrue(captured.get(0).getUri().toString().contains("till="));
114+
Assert.assertFalse(captured.get(1).getUri().toString().contains("till="));
115+
}
116+
117+
@Test
118+
public void testRandomNumberGeneration() throws URISyntaxException {
119+
URI rootTarget = URI.create("https://api.split.io");
120+
CloseableHttpClient httpClientMock = Mockito.mock(CloseableHttpClient.class);
121+
Metrics.NoopMetrics metrics = new Metrics.NoopMetrics();
122+
HttpSplitChangeFetcher fetcher = HttpSplitChangeFetcher.create(httpClientMock, rootTarget, metrics);
123+
124+
long min = (long)Math.pow(2, 63) * (-1);
125+
for (long x = 0; x < 100000000; x++) {
126+
long r = fetcher.makeRandomTill();
127+
Assert.assertTrue(r < 0 && r > min);
128+
}
129+
}
79130
}

0 commit comments

Comments
 (0)