Skip to content

Commit fdba632

Browse files
Merge pull request #193 from splitio/cache-update
Cache update
2 parents ad75990 + c017e99 commit fdba632

21 files changed

+152
-228
lines changed

client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public final class HttpSegmentChangeFetcher implements SegmentChangeFetcher {
2828

2929
private static final String SINCE = "since";
3030
private static final String PREFIX = "segmentChangeFetcher";
31+
private static final String NAME_CACHE = "Cache-Control";
32+
private static final String VALUE_CACHE = "no-cache";
3133

3234
private final CloseableHttpClient _client;
3335
private final URI _target;
@@ -49,7 +51,7 @@ private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, Metrics me
4951
}
5052

5153
@Override
52-
public SegmentChange fetch(String segmentName, long since) {
54+
public SegmentChange fetch(String segmentName, long since, boolean addCacheHeader) {
5355
long start = System.currentTimeMillis();
5456

5557
CloseableHttpResponse response = null;
@@ -58,6 +60,9 @@ public SegmentChange fetch(String segmentName, long since) {
5860
String path = _target.getPath() + "/" + segmentName;
5961
URI uri = new URIBuilder(_target).setPath(path).addParameter(SINCE, "" + since).build();
6062
HttpGet request = new HttpGet(uri);
63+
if(addCacheHeader) {
64+
request.setHeader(NAME_CACHE, VALUE_CACHE);
65+
}
6166
response = _client.execute(request);
6267

6368
int statusCode = response.getCode();

client/src/main/java/io/split/client/HttpSplitChangeFetcher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {
2828

2929
private static final String SINCE = "since";
3030
private static final String PREFIX = "splitChangeFetcher";
31+
private static final String NAME_CACHE = "Cache-Control";
32+
private static final String VALUE_CACHE = "no-cache";
3133

3234
private final CloseableHttpClient _client;
3335
private final URI _target;
@@ -49,7 +51,7 @@ private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, Metrics metr
4951
}
5052

5153
@Override
52-
public SplitChange fetch(long since) {
54+
public SplitChange fetch(long since, boolean addCacheHeader) {
5355

5456
long start = System.currentTimeMillis();
5557

@@ -59,6 +61,9 @@ public SplitChange fetch(long since) {
5961
URI uri = new URIBuilder(_target).addParameter(SINCE, "" + since).build();
6062

6163
HttpGet request = new HttpGet(uri);
64+
if(addCacheHeader) {
65+
request.setHeader(NAME_CACHE, VALUE_CACHE);
66+
}
6267
response = _client.execute(request);
6368

6469
int statusCode = response.getCode();

client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, Spl
3434

3535
@Override
3636
public boolean forceSyncFeatures() {
37-
_featureFetcher.forceRefresh();
37+
_featureFetcher.forceRefresh(true);
3838
_log.info("Features successfully refreshed via JMX");
3939
return true;
4040
}
@@ -43,7 +43,7 @@ public boolean forceSyncFeatures() {
4343
public boolean forceSyncSegment(String segmentName) {
4444
SegmentFetcher fetcher = _segmentSynchronizationTask.getFetcher(segmentName);
4545
try{
46-
fetcher.fetch();
46+
fetcher.fetch(true);
4747
}
4848
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
4949
catch (NullPointerException np){

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
4848
@Override
4949
public void syncAll() {
5050
_syncAllScheduledExecutorService.schedule(() -> {
51-
_splitFetcher.run();
52-
_segmentSynchronizationTaskImp.run();
51+
_splitFetcher.fetchAll(true);
52+
_segmentSynchronizationTaskImp.fetchAll(true);
5353
}, 0, TimeUnit.SECONDS);
5454
}
5555

@@ -70,7 +70,7 @@ public void stopPeriodicFetching() {
7070
@Override
7171
public void refreshSplits(long targetChangeNumber) {
7272
if (targetChangeNumber > _splitCache.getChangeNumber()) {
73-
_splitFetcher.forceRefresh();
73+
_splitFetcher.forceRefresh(true);
7474
}
7575
}
7676

@@ -87,7 +87,7 @@ public void refreshSegment(String segmentName, long changeNumber) {
8787
if (changeNumber > _segmentCache.getChangeNumber(segmentName)) {
8888
SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
8989
try{
90-
fetcher.fetch();
90+
fetcher.fetch(true);
9191
}
9292
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
9393
catch (NullPointerException np){

client/src/main/java/io/split/engine/experiments/SplitChangeFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,5 @@ public interface SplitChangeFetcher {
3131
* @return SegmentChange
3232
* @throws java.lang.RuntimeException if there was a problem computing split changes
3333
*/
34-
SplitChange fetch(long since);
34+
SplitChange fetch(long since, boolean addCacheHeader);
3535
}

client/src/main/java/io/split/engine/experiments/SplitFetcher.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,11 @@ public interface SplitFetcher extends Runnable {
88
* Forces a sync of splits, outside of any scheduled
99
* syncs. This method MUST NOT throw any exceptions.
1010
*/
11-
void forceRefresh();
11+
void forceRefresh(boolean addCacheHeader);
12+
13+
/**
14+
* Forces a sync of ALL splits, outside of any scheduled
15+
* syncs. This method MUST NOT throw any exceptions.
16+
*/
17+
void fetchAll(boolean addCacheHeader);
1218
}

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser
4343
}
4444

4545
@Override
46-
public void forceRefresh() {
46+
public void forceRefresh(boolean addCacheHeader) {
4747
_log.debug("Force Refresh splits starting ...");
4848
try {
4949
while (true) {
5050
long start = _splitCache.getChangeNumber();
51-
runWithoutExceptionHandling();
51+
runWithoutExceptionHandling(addCacheHeader);
5252
long end = _splitCache.getChangeNumber();
5353

5454
if (start >= end) {
@@ -65,28 +65,11 @@ public void forceRefresh() {
6565

6666
@Override
6767
public void run() {
68-
_log.debug("Fetch splits starting ...");
69-
long start = _splitCache.getChangeNumber();
70-
try {
71-
runWithoutExceptionHandling();
72-
_gates.splitsAreReady();
73-
} catch (InterruptedException e) {
74-
_log.warn("Interrupting split fetcher task");
75-
Thread.currentThread().interrupt();
76-
} catch (Throwable t) {
77-
_log.error("RefreshableSplitFetcher failed: " + t.getMessage());
78-
if (_log.isDebugEnabled()) {
79-
_log.debug("Reason:", t);
80-
}
81-
} finally {
82-
if (_log.isDebugEnabled()) {
83-
_log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber());
84-
}
85-
}
68+
this.fetchAll(false);
8669
}
8770

88-
private void runWithoutExceptionHandling() throws InterruptedException {
89-
SplitChange change = _splitChangeFetcher.fetch(_splitCache.getChangeNumber());
71+
private void runWithoutExceptionHandling(boolean addCacheHeader) throws InterruptedException {
72+
SplitChange change = _splitChangeFetcher.fetch(_splitCache.getChangeNumber(), addCacheHeader);
9073

9174
if (change == null) {
9275
throw new IllegalStateException("SplitChange was null");
@@ -155,4 +138,25 @@ private void runWithoutExceptionHandling() throws InterruptedException {
155138
_splitCache.setChangeNumber(change.till);
156139
}
157140
}
141+
@Override
142+
public void fetchAll(boolean addCacheHeader) {
143+
_log.debug("Fetch splits starting ...");
144+
long start = _splitCache.getChangeNumber();
145+
try {
146+
runWithoutExceptionHandling(addCacheHeader);
147+
_gates.splitsAreReady();
148+
} catch (InterruptedException e) {
149+
_log.warn("Interrupting split fetcher task");
150+
Thread.currentThread().interrupt();
151+
} catch (Throwable t) {
152+
_log.error("RefreshableSplitFetcher failed: " + t.getMessage());
153+
if (_log.isDebugEnabled()) {
154+
_log.debug("Reason:", t);
155+
}
156+
} finally {
157+
if (_log.isDebugEnabled()) {
158+
_log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber());
159+
}
160+
}
161+
}
158162
}

client/src/main/java/io/split/engine/segments/SegmentChangeFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ public interface SegmentChangeFetcher {
2525
* @return SegmentChange
2626
* @throws java.lang.RuntimeException if there was a problem fetching segment changes
2727
*/
28-
SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber);
28+
SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber, boolean addCacheHeader);
2929
}

client/src/main/java/io/split/engine/segments/SegmentFetcher.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,9 @@ public interface SegmentFetcher {
77
/**
88
* fetch
99
*/
10-
void fetch();
10+
void fetch(boolean addCacheHeader);
11+
12+
void runWhitCacheHeader();
13+
14+
void fetchAll();
1115
}

client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
import static com.google.common.base.Preconditions.checkNotNull;
1313

14-
public class SegmentFetcherImp implements Runnable, SegmentFetcher {
14+
public class SegmentFetcherImp implements SegmentFetcher {
1515
private static final Logger _log = LoggerFactory.getLogger(SegmentFetcherImp.class);
1616

1717
private final String _segmentName;
@@ -31,14 +31,9 @@ public SegmentFetcherImp(String segmentName, SegmentChangeFetcher segmentChangeF
3131
}
3232

3333
@Override
34-
public void run() {
34+
public void fetch(boolean addCacheHeader){
3535
try {
36-
// Do this again in case the previous call errored out.
37-
_gates.registerSegment(_segmentName);
38-
callLoopRun(true);
39-
40-
_gates.segmentIsReady(_segmentName);
41-
36+
callLoopRun(false, addCacheHeader);
4237
} catch (Throwable t) {
4338
_log.error("RefreshableSegmentFetcher failed: " + t.getMessage());
4439
if (_log.isDebugEnabled()) {
@@ -47,20 +42,8 @@ public void run() {
4742
}
4843
}
4944

50-
@Override
51-
public void fetch(){
52-
try {
53-
callLoopRun(false);
54-
} catch (Throwable t) {
55-
_log.error("RefreshableSegmentFetcher failed: " + t.getMessage());
56-
if (_log.isDebugEnabled()) {
57-
_log.debug("Reason:", t);
58-
}
59-
}
60-
}
61-
62-
private void runWithoutExceptionHandling() {
63-
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName));
45+
private void runWithoutExceptionHandling(boolean addCacheHeader) {
46+
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName), addCacheHeader);
6447

6548
if (change == null) {
6649
throw new IllegalStateException("SegmentChange was null");
@@ -126,10 +109,10 @@ private String summarize(List<String> changes) {
126109
return bldr.toString();
127110
}
128111

129-
private void callLoopRun(boolean isFetch){
112+
private void callLoopRun(boolean isFetch, boolean addCacheHeader){
130113
while (true) {
131114
long start = _segmentCache.getChangeNumber(_segmentName);
132-
runWithoutExceptionHandling();
115+
runWithoutExceptionHandling(addCacheHeader);
133116
long end = _segmentCache.getChangeNumber(_segmentName);
134117
if (isFetch && _log.isDebugEnabled()) {
135118
_log.debug(_segmentName + " segment fetch before: " + start + ", after: " + _segmentCache.getChangeNumber(_segmentName) /*+ " size: " + _concurrentKeySet.size()*/);
@@ -139,4 +122,36 @@ private void callLoopRun(boolean isFetch){
139122
}
140123
}
141124
}
125+
126+
@Override
127+
public void runWhitCacheHeader(){
128+
this.fetchAndUpdate(true);
129+
}
130+
131+
/**
132+
* Calls callLoopRun and after fetchs segment.
133+
* @param addCacheHeader indicates if CacheHeader is required
134+
*/
135+
private void fetchAndUpdate(boolean addCacheHeader) {
136+
try {
137+
// Do this again in case the previous call errored out.
138+
_gates.registerSegment(_segmentName);
139+
callLoopRun(true, addCacheHeader);
140+
141+
_gates.segmentIsReady(_segmentName);
142+
143+
} catch (Throwable t) {
144+
_log.error("RefreshableSegmentFetcher failed: " + t.getMessage());
145+
if (_log.isDebugEnabled()) {
146+
_log.debug("Reason:", t);
147+
}
148+
}
149+
}
150+
151+
@Override
152+
public void fetchAll() {
153+
this.fetchAndUpdate(false);
154+
}
155+
156+
142157
}

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,10 @@ public interface SegmentSynchronizationTask extends Runnable {
2323
* stops the thread
2424
*/
2525
void stop();
26+
27+
/**
28+
* fetch every Segment
29+
* @param addCacheHeader
30+
*/
31+
void fetchAll(boolean addCacheHeader);
2632
}

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask
2929
private final AtomicLong _refreshEveryNSeconds;
3030
private final AtomicBoolean _running;
3131
private final Object _lock = new Object();
32-
private final ConcurrentMap<String, SegmentFetcherImp> _segmentFetchers = Maps.newConcurrentMap();
32+
private final ConcurrentMap<String, SegmentFetcher> _segmentFetchers = Maps.newConcurrentMap();
3333
private final SegmentCache _segmentCache;
3434
private final SDKReadinessGates _gates;
3535
private final ScheduledExecutorService _scheduledExecutorService;
@@ -58,20 +58,12 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher,
5858

5959
@Override
6060
public void run() {
61-
for (Map.Entry<String, SegmentFetcherImp> entry : _segmentFetchers.entrySet()) {
62-
SegmentFetcherImp fetcher = entry.getValue();
63-
64-
if (fetcher == null) {
65-
continue;
66-
}
67-
68-
_scheduledExecutorService.submit(fetcher);
69-
}
61+
this.fetchAll(false);
7062
}
7163

7264
@Override
7365
public void initializeSegment(String segmentName) {
74-
SegmentFetcherImp segment = _segmentFetchers.get(segmentName);
66+
SegmentFetcher segment = _segmentFetchers.get(segmentName);
7567
if (segment != null) {
7668
return;
7769
}
@@ -94,7 +86,7 @@ public void initializeSegment(String segmentName) {
9486
segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _gates, _segmentCache);
9587

9688
if (_running.get()) {
97-
_scheduledExecutorService.submit(segment);
89+
_scheduledExecutorService.submit(segment::fetchAll);
9890
}
9991

10092
_segmentFetchers.putIfAbsent(segmentName, segment);
@@ -148,4 +140,21 @@ public void close() {
148140
Thread.currentThread().interrupt();
149141
}
150142
}
143+
144+
@Override
145+
public void fetchAll(boolean addCacheHeader) {
146+
for (Map.Entry<String, SegmentFetcher> entry : _segmentFetchers.entrySet()) {
147+
SegmentFetcher fetcher = entry.getValue();
148+
149+
if (fetcher == null) {
150+
continue;
151+
}
152+
153+
if(addCacheHeader) {
154+
_scheduledExecutorService.submit(fetcher::runWhitCacheHeader);
155+
continue;
156+
}
157+
_scheduledExecutorService.submit(fetcher::fetchAll);
158+
}
159+
}
151160
}

client/src/test/java/io/split/client/HttpSegmentChangeFetcherTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testFetcherWithSpecialCharacters() throws URISyntaxException, IOExce
6161
Metrics.NoopMetrics metrics = new Metrics.NoopMetrics();
6262
HttpSegmentChangeFetcher fetcher = HttpSegmentChangeFetcher.create(httpClientMock, rootTarget, metrics);
6363

64-
SegmentChange change = fetcher.fetch("some_segment", 1234567);
64+
SegmentChange change = fetcher.fetch("some_segment", 1234567, true);
6565

6666
Assert.assertNotNull(change);
6767
Assert.assertEquals(1, change.added.size());

0 commit comments

Comments
 (0)