Skip to content

Commit b8029e2

Browse files
authored
Merge branch 'feature/rule-based-segment' into rbs-localhost
2 parents daa9e4a + f4c18eb commit b8029e2

26 files changed

+558
-243
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
220220
// Segments
221221
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);
222222

223-
224223
SplitParser splitParser = new SplitParser();
225224
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
226225
// SplitFetcher

client/src/main/java/io/split/engine/common/SynchronizerImp.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegm
118118
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
119119
return new SyncResult(false, remainingAttempts, fetchResult);
120120
}
121-
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
122-
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber())) {
121+
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
122+
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
123123
return new SyncResult(true, remainingAttempts, fetchResult);
124124
} else if (remainingAttempts <= 0) {
125125
return new SyncResult(false, remainingAttempts, fetchResult);
@@ -137,9 +137,15 @@ private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegm
137137
@Override
138138
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
139139

140-
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
141-
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) ||
142-
(ruleBasedSegmentChangeNumber == 0 && targetChangeNumber == 0)) {
140+
if (targetChangeNumber == null || targetChangeNumber == 0) {
141+
targetChangeNumber = _splitCacheProducer.getChangeNumber();
142+
}
143+
if (ruleBasedSegmentChangeNumber == null || ruleBasedSegmentChangeNumber == 0) {
144+
ruleBasedSegmentChangeNumber = _ruleBasedSegmentCacheProducer.getChangeNumber();
145+
}
146+
147+
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()
148+
&& ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) {
143149
return;
144150
}
145151

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ private Set<String> runWithoutExceptionHandling(FetchOptions options) throws Int
144144

145145
synchronized (_lock) {
146146
// check state one more time.
147-
if (checkExitConditions(change.featureFlags, _splitCacheProducer.getChangeNumber()) &&
147+
if (checkExitConditions(change.featureFlags, _splitCacheProducer.getChangeNumber()) ||
148148
checkExitConditions(change.ruleBasedSegments, _ruleBasedSegmentCacheProducer.getChangeNumber())) {
149149
// some other thread may have updated the shared state. exit
150150
return segments;

client/src/main/java/io/split/engine/sse/NotificationParserImp.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package io.split.engine.sse;
22

3+
import io.split.client.dtos.RuleBasedSegment;
4+
import io.split.client.dtos.Split;
35
import io.split.client.utils.Json;
46

57
import io.split.engine.sse.dtos.ControlNotification;
68
import io.split.engine.sse.dtos.ErrorNotification;
7-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
9+
import io.split.engine.sse.dtos.CommonChangeNotification;
810
import io.split.engine.sse.dtos.GenericNotificationData;
911
import io.split.engine.sse.dtos.IncomingNotification;
1012
import io.split.engine.sse.dtos.OccupancyNotification;
1113
import io.split.engine.sse.dtos.RawMessageNotification;
1214
import io.split.engine.sse.dtos.SegmentChangeNotification;
1315
import io.split.engine.sse.dtos.SplitKillNotification;
14-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
1516
import io.split.engine.sse.exceptions.EventParsingException;
1617

1718
public class NotificationParserImp implements NotificationParser {
@@ -48,9 +49,9 @@ public ErrorNotification parseError(String payload) throws EventParsingException
4849
private IncomingNotification parseNotification(GenericNotificationData genericNotificationData) throws Exception {
4950
switch (genericNotificationData.getType()) {
5051
case SPLIT_UPDATE:
51-
return new FeatureFlagChangeNotification(genericNotificationData);
52+
return new CommonChangeNotification(genericNotificationData, Split.class);
5253
case RB_SEGMENT_UPDATE:
53-
return new RuleBasedSegmentChangeNotification(genericNotificationData);
54+
return new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class);
5455
case SPLIT_KILL:
5556
return new SplitKillNotification(genericNotificationData);
5657
case SEGMENT_UPDATE:

client/src/main/java/io/split/engine/sse/NotificationProcessor.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package io.split.engine.sse;
22

3-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
43
import io.split.engine.sse.dtos.IncomingNotification;
54
import io.split.engine.sse.dtos.SplitKillNotification;
65
import io.split.engine.sse.dtos.StatusNotification;
7-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
86

97
public interface NotificationProcessor {
108
void process(IncomingNotification notification);
11-
void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification);
12-
void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification);
9+
void processUpdates(IncomingNotification notification);
1310
void processSplitKill(SplitKillNotification splitKillNotification);
1411
void processSegmentUpdate(long changeNumber, String segmentName);
1512
void processStatus(StatusNotification statusNotification);

client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java

+5-13
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package io.split.engine.sse;
22

33
import com.google.common.annotations.VisibleForTesting;
4-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
54
import io.split.engine.sse.dtos.GenericNotificationData;
65
import io.split.engine.sse.dtos.IncomingNotification;
76
import io.split.engine.sse.dtos.SplitKillNotification;
87
import io.split.engine.sse.dtos.StatusNotification;
98
import io.split.engine.sse.dtos.SegmentQueueDto;
10-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
119
import io.split.engine.sse.workers.FeatureFlagsWorker;
1210
import io.split.engine.sse.workers.Worker;
1311

@@ -32,25 +30,19 @@ public static NotificationProcessorImp build(FeatureFlagsWorker featureFlagsWork
3230
return new NotificationProcessorImp(featureFlagsWorker, segmentWorker, pushStatusTracker);
3331
}
3432

35-
@Override
36-
public void process(IncomingNotification notification) {
37-
notification.handler(this);
38-
}
39-
40-
@Override
41-
public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification) {
42-
_featureFlagsWorker.addToQueue(featureFlagChangeNotification);
33+
public void processUpdates(IncomingNotification notification) {
34+
_featureFlagsWorker.addToQueue(notification);
4335
}
4436

4537
@Override
46-
public void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification) {
47-
_featureFlagsWorker.addToQueue(ruleBasedSegmentChangeNotification);
38+
public void process(IncomingNotification notification) {
39+
notification.handler(this);
4840
}
4941

5042
@Override
5143
public void processSplitKill(SplitKillNotification splitKillNotification) {
5244
_featureFlagsWorker.kill(splitKillNotification);
53-
_featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder()
45+
_featureFlagsWorker.addToQueue(new SplitKillNotification(GenericNotificationData.builder()
5446
.changeNumber(splitKillNotification.getChangeNumber())
5547
.channel(splitKillNotification.getChannel())
5648
.build()));

client/src/main/java/io/split/engine/sse/dtos/CommonChangeNotification.java

+21-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.sse.dtos;
22

3+
import io.split.client.utils.Json;
34
import io.split.engine.segments.SegmentSynchronizationTaskImp;
45
import io.split.engine.sse.NotificationProcessor;
56
import io.split.engine.sse.enums.CompressType;
@@ -14,24 +15,29 @@
1415
import static io.split.engine.sse.utils.DecompressionUtil.gZipDecompress;
1516
import static io.split.engine.sse.utils.DecompressionUtil.zLibDecompress;
1617

17-
public class CommonChangeNotification extends IncomingNotification {
18+
public class CommonChangeNotification<Y> extends IncomingNotification {
1819
private static final Logger _log = LoggerFactory.getLogger(SegmentSynchronizationTaskImp.class);
1920
private final long changeNumber;
2021
private long previousChangeNumber;
2122
private CompressType compressType;
23+
private Y definition;
24+
private Class _definitionClass;
2225

23-
public CommonChangeNotification(GenericNotificationData genericNotificationData, IncomingNotification.Type notificationType) {
24-
super(notificationType, genericNotificationData.getChannel());
26+
public CommonChangeNotification(GenericNotificationData genericNotificationData,
27+
Class definitionClass) {
28+
super(genericNotificationData.getType(), genericNotificationData.getChannel());
2529
changeNumber = genericNotificationData.getChangeNumber();
30+
_definitionClass = definitionClass;
31+
2632
if(genericNotificationData.getPreviousChangeNumber() != null) {
2733
previousChangeNumber = genericNotificationData.getPreviousChangeNumber();
2834
}
2935
compressType = CompressType.from(genericNotificationData.getCompressType());
30-
if (compressType == null || genericNotificationData.getFeatureFlagDefinition() == null) {
36+
if (compressType == null || genericNotificationData.getDefinition() == null) {
3137
return;
3238
}
3339
try {
34-
byte[] decodedBytes = Base64.getDecoder().decode(genericNotificationData.getFeatureFlagDefinition());
40+
byte[] decodedBytes = Base64.getDecoder().decode(genericNotificationData.getDefinition());
3541
switch (compressType) {
3642
case GZIP:
3743
decodedBytes = gZipDecompress(decodedBytes);
@@ -57,18 +63,25 @@ public long getChangeNumber() {
5763
public long getPreviousChangeNumber() {
5864
return previousChangeNumber;
5965
}
60-
6166
public CompressType getCompressType() {
6267
return compressType;
6368
}
6469

70+
public Y getDefinition() {
71+
return definition;
72+
}
73+
6574
@Override
66-
public void handler(NotificationProcessor notificationProcessor) {}
75+
public void handler(NotificationProcessor notificationProcessor) {
76+
notificationProcessor.processUpdates(this);
77+
}
6778

6879
@Override
6980
public String toString() {
7081
return String.format("Type: %s; Channel: %s; ChangeNumber: %s", getType(), getChannel(), getChangeNumber());
7182
}
7283

73-
public void updateDefinition(byte[] decodedBytes) throws UnsupportedEncodingException {};
84+
private void updateDefinition(byte[] decodedBytes) throws UnsupportedEncodingException {
85+
definition = (Y) Json.fromJson(new String(decodedBytes, "UTF-8"), _definitionClass);
86+
}
7487
}

client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java

-29
This file was deleted.

client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public Long getPreviousChangeNumber() {
7575
return previousChangeNumber;
7676
}
7777

78-
public String getFeatureFlagDefinition() {
78+
public String getDefinition() {
7979
return featureFlagDefinition;
8080
}
8181

client/src/main/java/io/split/engine/sse/dtos/RuleBasedSegmentChangeNotification.java

-29
This file was deleted.

client/src/main/java/io/split/engine/sse/workers/FeatureFlagWorkerImp.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@
88
import io.split.engine.common.Synchronizer;
99
import io.split.engine.experiments.RuleBasedSegmentParser;
1010
import io.split.engine.experiments.SplitParser;
11-
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
11+
import io.split.engine.sse.dtos.CommonChangeNotification;
1212
import io.split.engine.sse.dtos.IncomingNotification;
13-
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
1413
import io.split.engine.sse.dtos.SplitKillNotification;
1514
import io.split.storages.RuleBasedSegmentCache;
1615
import io.split.storages.SplitCacheProducer;
@@ -67,26 +66,26 @@ protected void executeRefresh(IncomingNotification incomingNotification) {
6766
long changeNumber = 0L;
6867
long changeNumberRBS = 0L;
6968
if (incomingNotification.getType() == IncomingNotification.Type.SPLIT_UPDATE) {
70-
FeatureFlagChangeNotification featureFlagChangeNotification = (FeatureFlagChangeNotification) incomingNotification;
69+
CommonChangeNotification<Split> featureFlagChangeNotification = (CommonChangeNotification) incomingNotification;
7170
success = addOrUpdateFeatureFlag(featureFlagChangeNotification);
7271
changeNumber = featureFlagChangeNotification.getChangeNumber();
7372
} else {
74-
RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification = (RuleBasedSegmentChangeNotification) incomingNotification;
75-
success = AddOrUpdateRuleBasedSegment((RuleBasedSegmentChangeNotification) incomingNotification);
73+
CommonChangeNotification<RuleBasedSegment> ruleBasedSegmentChangeNotification = (CommonChangeNotification) incomingNotification;
74+
success = AddOrUpdateRuleBasedSegment(ruleBasedSegmentChangeNotification);
7675
changeNumberRBS = ruleBasedSegmentChangeNotification.getChangeNumber();
7776
}
7877
if (!success)
7978
_synchronizer.refreshSplits(changeNumber, changeNumberRBS);
8079
}
8180

82-
private boolean AddOrUpdateRuleBasedSegment(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification) {
81+
private boolean AddOrUpdateRuleBasedSegment(CommonChangeNotification ruleBasedSegmentChangeNotification) {
8382
if (ruleBasedSegmentChangeNotification.getChangeNumber() <= _ruleBasedSegmentCache.getChangeNumber()) {
8483
return true;
8584
}
8685
try {
87-
if (ruleBasedSegmentChangeNotification.getRuleBasedSegmentDefinition() != null &&
86+
if (ruleBasedSegmentChangeNotification.getDefinition() != null &&
8887
ruleBasedSegmentChangeNotification.getPreviousChangeNumber() == _ruleBasedSegmentCache.getChangeNumber()) {
89-
RuleBasedSegment ruleBasedSegment = ruleBasedSegmentChangeNotification.getRuleBasedSegmentDefinition();
88+
RuleBasedSegment ruleBasedSegment = (RuleBasedSegment) ruleBasedSegmentChangeNotification.getDefinition();
9089
RuleBasedSegmentsToUpdate ruleBasedSegmentsToUpdate = processRuleBasedSegmentChanges(_ruleBasedSegmentParser,
9190
Collections.singletonList(ruleBasedSegment));
9291
_ruleBasedSegmentCache.update(ruleBasedSegmentsToUpdate.getToAdd(), ruleBasedSegmentsToUpdate.getToRemove(),
@@ -104,14 +103,14 @@ private boolean AddOrUpdateRuleBasedSegment(RuleBasedSegmentChangeNotification r
104103
}
105104
return false;
106105
}
107-
private boolean addOrUpdateFeatureFlag(FeatureFlagChangeNotification featureFlagChangeNotification) {
106+
private boolean addOrUpdateFeatureFlag(CommonChangeNotification featureFlagChangeNotification) {
108107
if (featureFlagChangeNotification.getChangeNumber() <= _splitCacheProducer.getChangeNumber()) {
109108
return true;
110109
}
111110
try {
112-
if (featureFlagChangeNotification.getFeatureFlagDefinition() != null &&
111+
if (featureFlagChangeNotification.getDefinition() != null &&
113112
featureFlagChangeNotification.getPreviousChangeNumber() == _splitCacheProducer.getChangeNumber()) {
114-
Split featureFlag = featureFlagChangeNotification.getFeatureFlagDefinition();
113+
Split featureFlag = (Split) featureFlagChangeNotification.getDefinition();
115114
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_splitParser, Collections.singletonList(featureFlag),
116115
_flagSetsFilter);
117116
_splitCacheProducer.update(featureFlagsToUpdate.getToAdd(), featureFlagsToUpdate.getToRemove(),
@@ -123,7 +122,7 @@ private boolean addOrUpdateFeatureFlag(FeatureFlagChangeNotification featureFlag
123122
if (featureFlagsToUpdate.getToAdd().stream().count() > 0) {
124123
Set<String> ruleBasedSegments = featureFlagsToUpdate.getToAdd().get(0).getRuleBasedSegmentsNames();
125124
if (!ruleBasedSegments.isEmpty() && !_ruleBasedSegmentCache.contains(ruleBasedSegments)) {
126-
_synchronizer.refreshSplits(featureFlagChangeNotification.getChangeNumber(), 0L);
125+
return false;
127126
}
128127
}
129128
_telemetryRuntimeProducer.recordUpdatesFromSSE(UpdatesFromSSEEnum.SPLITS);

client/src/main/java/io/split/storages/RuleBasedSegmentCacheCommons.java

-8
This file was deleted.

client/src/main/java/io/split/storages/RuleBasedSegmentCacheConsumer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
import java.util.Map;
88
import java.util.Set;
99

10-
public interface RuleBasedSegmentCacheConsumer extends RuleBasedSegmentCacheCommons {
10+
public interface RuleBasedSegmentCacheConsumer {
1111
ParsedRuleBasedSegment get(String name);
1212
Collection<ParsedRuleBasedSegment> getAll();
1313
List<String> ruleBasedSegmentNames();
1414
boolean contains(Set<String> ruleBasedSegmentNames);
15+
long getChangeNumber();
16+
Set<String> getSegments();
1517
}

client/src/main/java/io/split/storages/RuleBasedSegmentCacheProducer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44

55
import java.util.List;
66

7-
public interface RuleBasedSegmentCacheProducer extends RuleBasedSegmentCacheCommons{
7+
public interface RuleBasedSegmentCacheProducer {
88
boolean remove(String name);
99
void setChangeNumber(long changeNumber);
10+
long getChangeNumber();
1011
void update(List<ParsedRuleBasedSegment> toAdd, List<String> toRemove, long changeNumber);
1112
}

client/src/main/java/io/split/storages/memory/RuleBasedSegmentCacheInMemoryImp.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void clear() {
7575
_concurrentMap.clear();
7676
}
7777

78-
public void putMany(List<ParsedRuleBasedSegment> ruleBasedSegments) {
78+
private void putMany(List<ParsedRuleBasedSegment> ruleBasedSegments) {
7979
for (ParsedRuleBasedSegment ruleBasedSegment : ruleBasedSegments) {
8080
_concurrentMap.put(ruleBasedSegment.ruleBasedSegment(), ruleBasedSegment);
8181
}

0 commit comments

Comments
 (0)