Skip to content

Commit f4c18eb

Browse files
authored
Merge pull request #547 from splitio/rbs-sse
Added RBS support for SSE classes
2 parents fc0005d + 8cccfa6 commit f4c18eb

28 files changed

+416
-128
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
277277

278278
_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
279279
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser,
280-
flagSetsFilter);
280+
ruleBasedSegmentParser, flagSetsFilter, ruleBasedSegmentCache);
281281
_syncManager.start();
282282

283283
// DestroyOnShutDown

client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void stopPeriodicFetching() {
3535
}
3636

3737
@Override
38-
public void refreshSplits(Long targetChangeNumber) {
38+
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
3939
//No-Op
4040
}
4141

client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void stopPeriodicFetching() {
5656
}
5757

5858
@Override
59-
public void refreshSplits(Long targetChangeNumber) {
59+
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
6060
FetchResult fetchResult = _splitFetcher.forceRefresh(new FetchOptions.Builder().cacheControlHeaders(true).build());
6161
if (fetchResult.isSuccess()){
6262
_log.debug("Refresh feature flags completed");

client/src/main/java/io/split/engine/common/PushManagerImp.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.annotations.VisibleForTesting;
44
import io.split.client.interceptors.FlagSetsFilter;
5+
import io.split.engine.experiments.RuleBasedSegmentParser;
56
import io.split.engine.experiments.SplitParser;
67
import io.split.engine.sse.AuthApiClient;
78
import io.split.engine.sse.AuthApiClientImp;
@@ -17,6 +18,7 @@
1718
import io.split.engine.sse.workers.FeatureFlagWorkerImp;
1819
import io.split.engine.sse.workers.Worker;
1920

21+
import io.split.storages.RuleBasedSegmentCache;
2022
import io.split.storages.SplitCacheProducer;
2123
import io.split.telemetry.domain.StreamingEvent;
2224
import io.split.telemetry.domain.enums.StreamEventsEnum;
@@ -79,9 +81,11 @@ public static PushManagerImp build(Synchronizer synchronizer,
7981
ThreadFactory threadFactory,
8082
SplitParser splitParser,
8183
SplitCacheProducer splitCacheProducer,
82-
FlagSetsFilter flagSetsFilter) {
83-
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer,
84-
telemetryRuntimeProducer, flagSetsFilter);
84+
FlagSetsFilter flagSetsFilter,
85+
RuleBasedSegmentCache ruleBasedSegmentCache,
86+
RuleBasedSegmentParser ruleBasedSegmentParser) {
87+
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer,
88+
ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter);
8589
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
8690
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
8791

client/src/main/java/io/split/engine/common/SyncManagerImp.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
import io.split.client.SplitClientConfig;
66
import io.split.client.interceptors.FlagSetsFilter;
77
import io.split.engine.SDKReadinessGates;
8+
import io.split.engine.experiments.RuleBasedSegmentParser;
89
import io.split.engine.experiments.SplitFetcher;
910
import io.split.engine.experiments.SplitParser;
1011
import io.split.engine.experiments.SplitSynchronizationTask;
1112
import io.split.engine.segments.SegmentSynchronizationTask;
13+
import io.split.storages.RuleBasedSegmentCache;
1214
import io.split.storages.SegmentCacheProducer;
1315
import io.split.storages.SplitCacheProducer;
1416
import io.split.telemetry.domain.StreamingEvent;
@@ -89,12 +91,15 @@ public static SyncManagerImp build(SplitTasks splitTasks,
8991
TelemetrySynchronizer telemetrySynchronizer,
9092
SplitClientConfig config,
9193
SplitParser splitParser,
92-
FlagSetsFilter flagSetsFilter) {
94+
RuleBasedSegmentParser ruleBasedSegmentParser,
95+
FlagSetsFilter flagSetsFilter,
96+
RuleBasedSegmentCache ruleBasedSegmentCache) {
9397
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
9498
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
9599
splitFetcher,
96100
splitCacheProducer,
97101
segmentCacheProducer,
102+
ruleBasedSegmentCache,
98103
config.streamingRetryDelay(),
99104
config.streamingFetchMaxRetries(),
100105
config.failedAttemptsBeforeLogging(),
@@ -109,7 +114,9 @@ public static SyncManagerImp build(SplitTasks splitTasks,
109114
config.getThreadFactory(),
110115
splitParser,
111116
splitCacheProducer,
112-
flagSetsFilter);
117+
flagSetsFilter,
118+
ruleBasedSegmentCache,
119+
ruleBasedSegmentParser);
113120

114121
return new SyncManagerImp(splitTasks,
115122
config.streamingEnabled(),

client/src/main/java/io/split/engine/common/Synchronizer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ public interface Synchronizer {
66
boolean syncAll();
77
void startPeriodicFetching();
88
void stopPeriodicFetching();
9-
void refreshSplits(Long targetChangeNumber);
9+
void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber);
1010
void localKillSplit(SplitKillNotification splitKillNotification);
1111
void refreshSegment(String segmentName, Long targetChangeNumber);
1212
void startPeriodicDataRecording();

client/src/main/java/io/split/engine/common/SynchronizerImp.java

+20-7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.split.engine.segments.SegmentFetcher;
1010
import io.split.engine.segments.SegmentSynchronizationTask;
1111
import io.split.engine.sse.dtos.SplitKillNotification;
12+
import io.split.storages.RuleBasedSegmentCacheProducer;
1213
import io.split.storages.SegmentCacheProducer;
1314
import io.split.storages.SplitCacheProducer;
1415
import io.split.telemetry.synchronizer.TelemetrySyncTask;
@@ -34,6 +35,7 @@ public class SynchronizerImp implements Synchronizer {
3435
private final SplitFetcher _splitFetcher;
3536
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
3637
private final SplitCacheProducer _splitCacheProducer;
38+
private final RuleBasedSegmentCacheProducer _ruleBasedSegmentCacheProducer;
3739
private final SegmentCacheProducer segmentCacheProducer;
3840
private final ImpressionsManager _impressionManager;
3941
private final EventsTask _eventsTask;
@@ -48,6 +50,7 @@ public SynchronizerImp(SplitTasks splitTasks,
4850
SplitFetcher splitFetcher,
4951
SplitCacheProducer splitCacheProducer,
5052
SegmentCacheProducer segmentCacheProducer,
53+
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer,
5154
int onDemandFetchRetryDelayMs,
5255
int onDemandFetchMaxRetries,
5356
int failedAttemptsBeforeLogging,
@@ -56,6 +59,7 @@ public SynchronizerImp(SplitTasks splitTasks,
5659
_splitFetcher = checkNotNull(splitFetcher);
5760
_segmentSynchronizationTaskImp = checkNotNull(splitTasks.getSegmentSynchronizationTask());
5861
_splitCacheProducer = checkNotNull(splitCacheProducer);
62+
_ruleBasedSegmentCacheProducer = checkNotNull(ruleBasedSegmentCacheProducer);
5963
this.segmentCacheProducer = checkNotNull(segmentCacheProducer);
6064
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
6165
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
@@ -103,7 +107,7 @@ private static class SyncResult {
103107
private final FetchResult _fetchResult;
104108
}
105109

106-
private SyncResult attemptSplitsSync(long targetChangeNumber,
110+
private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegmentChangeNumber,
107111
FetchOptions opts,
108112
Function<Void, Long> nextWaitMs,
109113
int maxRetries) {
@@ -114,7 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
114118
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
115119
return new SyncResult(false, remainingAttempts, fetchResult);
116120
}
117-
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
121+
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
122+
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
118123
return new SyncResult(true, remainingAttempts, fetchResult);
119124
} else if (remainingAttempts <= 0) {
120125
return new SyncResult(false, remainingAttempts, fetchResult);
@@ -130,9 +135,17 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
130135
}
131136

132137
@Override
133-
public void refreshSplits(Long targetChangeNumber) {
138+
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
134139

135-
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
140+
if (targetChangeNumber == null || targetChangeNumber == 0) {
141+
targetChangeNumber = _splitCacheProducer.getChangeNumber();
142+
}
143+
if (ruleBasedSegmentChangeNumber == null || ruleBasedSegmentChangeNumber == 0) {
144+
ruleBasedSegmentChangeNumber = _ruleBasedSegmentCacheProducer.getChangeNumber();
145+
}
146+
147+
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
148+
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
136149
return;
137150
}
138151

@@ -142,7 +155,7 @@ public void refreshSplits(Long targetChangeNumber) {
142155
.flagSetsFilter(_sets)
143156
.build();
144157

145-
SyncResult regularResult = attemptSplitsSync(targetChangeNumber, opts,
158+
SyncResult regularResult = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, opts,
146159
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);
147160

148161
int attempts = _onDemandFetchMaxRetries - regularResult.remainingAttempts();
@@ -157,7 +170,7 @@ public void refreshSplits(Long targetChangeNumber) {
157170
_log.info(String.format("No changes fetched after %s attempts. Will retry bypassing CDN.", attempts));
158171
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).targetChangeNumber(targetChangeNumber).build();
159172
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
160-
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, withCdnBypass,
173+
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, withCdnBypass,
161174
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);
162175

163176
int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed._remainingAttempts;
@@ -175,7 +188,7 @@ public void localKillSplit(SplitKillNotification splitKillNotification) {
175188
if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) {
176189
_splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(),
177190
splitKillNotification.getChangeNumber());
178-
refreshSplits(splitKillNotification.getChangeNumber());
191+
refreshSplits(splitKillNotification.getChangeNumber(), 0L);
179192
}
180193
}
181194

client/src/main/java/io/split/engine/experiments/ParsedSplit.java

+17
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.collect.ImmutableList;
44
import io.split.engine.matchers.AttributeMatcher;
5+
import io.split.engine.matchers.RuleBasedSegmentMatcher;
56
import io.split.engine.matchers.UserDefinedSegmentMatcher;
67

78
import java.util.HashSet;
@@ -243,6 +244,15 @@ public Set<String> getSegmentsNames() {
243244
.collect(Collectors.toSet());
244245
}
245246

247+
public Set<String> getRuleBasedSegmentsNames() {
248+
return parsedConditions().stream()
249+
.flatMap(parsedCondition -> parsedCondition.matcher().attributeMatchers().stream())
250+
.filter(ParsedSplit::isRuleBasedSegmentMatcher)
251+
.map(ParsedSplit::asRuleBasedSegmentMatcherForEach)
252+
.map(RuleBasedSegmentMatcher::getSegmentName)
253+
.collect(Collectors.toSet());
254+
}
255+
246256
private static boolean isSegmentMatcher(AttributeMatcher attributeMatcher) {
247257
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof UserDefinedSegmentMatcher;
248258
}
@@ -251,4 +261,11 @@ private static UserDefinedSegmentMatcher asSegmentMatcherForEach(AttributeMatche
251261
return (UserDefinedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
252262
}
253263

264+
private static boolean isRuleBasedSegmentMatcher(AttributeMatcher attributeMatcher) {
265+
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof RuleBasedSegmentMatcher;
266+
}
267+
268+
private static RuleBasedSegmentMatcher asRuleBasedSegmentMatcherForEach(AttributeMatcher attributeMatcher) {
269+
return (RuleBasedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
270+
}
254271
}

client/src/main/java/io/split/engine/sse/NotificationParserImp.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package io.split.engine.sse;
22

3+
import io.split.client.dtos.RuleBasedSegment;
4+
import io.split.client.dtos.Split;
35
import io.split.client.utils.Json;
46

57
import io.split.engine.sse.dtos.ControlNotification;
68
import io.split.engine.sse.dtos.ErrorNotification;
7-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
9+
import io.split.engine.sse.dtos.CommonChangeNotification;
810
import io.split.engine.sse.dtos.GenericNotificationData;
911
import io.split.engine.sse.dtos.IncomingNotification;
1012
import io.split.engine.sse.dtos.OccupancyNotification;
@@ -47,7 +49,9 @@ public ErrorNotification parseError(String payload) throws EventParsingException
4749
private IncomingNotification parseNotification(GenericNotificationData genericNotificationData) throws Exception {
4850
switch (genericNotificationData.getType()) {
4951
case SPLIT_UPDATE:
50-
return new FeatureFlagChangeNotification(genericNotificationData);
52+
return new CommonChangeNotification(genericNotificationData, Split.class);
53+
case RB_SEGMENT_UPDATE:
54+
return new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class);
5155
case SPLIT_KILL:
5256
return new SplitKillNotification(genericNotificationData);
5357
case SEGMENT_UPDATE:

client/src/main/java/io/split/engine/sse/NotificationProcessor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package io.split.engine.sse;
22

3-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
43
import io.split.engine.sse.dtos.IncomingNotification;
54
import io.split.engine.sse.dtos.SplitKillNotification;
65
import io.split.engine.sse.dtos.StatusNotification;
76

87
public interface NotificationProcessor {
98
void process(IncomingNotification notification);
10-
void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification);
9+
void processUpdates(IncomingNotification notification);
1110
void processSplitKill(SplitKillNotification splitKillNotification);
1211
void processSegmentUpdate(long changeNumber, String segmentName);
1312
void processStatus(StatusNotification statusNotification);

client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.split.engine.sse;
22

33
import com.google.common.annotations.VisibleForTesting;
4-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
54
import io.split.engine.sse.dtos.GenericNotificationData;
65
import io.split.engine.sse.dtos.IncomingNotification;
76
import io.split.engine.sse.dtos.SplitKillNotification;
@@ -31,20 +30,19 @@ public static NotificationProcessorImp build(FeatureFlagsWorker featureFlagsWork
3130
return new NotificationProcessorImp(featureFlagsWorker, segmentWorker, pushStatusTracker);
3231
}
3332

34-
@Override
35-
public void process(IncomingNotification notification) {
36-
notification.handler(this);
33+
public void processUpdates(IncomingNotification notification) {
34+
_featureFlagsWorker.addToQueue(notification);
3735
}
3836

3937
@Override
40-
public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification) {
41-
_featureFlagsWorker.addToQueue(featureFlagChangeNotification);
38+
public void process(IncomingNotification notification) {
39+
notification.handler(this);
4240
}
4341

4442
@Override
4543
public void processSplitKill(SplitKillNotification splitKillNotification) {
4644
_featureFlagsWorker.kill(splitKillNotification);
47-
_featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder()
45+
_featureFlagsWorker.addToQueue(new SplitKillNotification(GenericNotificationData.builder()
4846
.changeNumber(splitKillNotification.getChangeNumber())
4947
.channel(splitKillNotification.getChannel())
5048
.build()));

0 commit comments

Comments
 (0)