diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java index 3104f4266..7d8374e1a 100644 --- a/client/src/main/java/io/split/client/SplitFactoryImpl.java +++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java @@ -277,7 +277,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn _syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI, segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser, - flagSetsFilter); + ruleBasedSegmentParser, flagSetsFilter, ruleBasedSegmentCache); _syncManager.start(); // DestroyOnShutDown diff --git a/client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java b/client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java index 7bcee43a5..c92092537 100644 --- a/client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java +++ b/client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java @@ -35,7 +35,7 @@ public void stopPeriodicFetching() { } @Override - public void refreshSplits(Long targetChangeNumber) { + public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) { //No-Op } diff --git a/client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java b/client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java index e92151846..211148804 100644 --- a/client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java +++ b/client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java @@ -56,7 +56,7 @@ public void stopPeriodicFetching() { } @Override - public void refreshSplits(Long targetChangeNumber) { + public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) { FetchResult fetchResult = _splitFetcher.forceRefresh(new FetchOptions.Builder().cacheControlHeaders(true).build()); if (fetchResult.isSuccess()){ _log.debug("Refresh feature flags completed"); diff --git a/client/src/main/java/io/split/engine/common/PushManagerImp.java b/client/src/main/java/io/split/engine/common/PushManagerImp.java index 653249308..4862765f4 100644 --- a/client/src/main/java/io/split/engine/common/PushManagerImp.java +++ b/client/src/main/java/io/split/engine/common/PushManagerImp.java @@ -2,6 +2,7 @@ import com.google.common.annotations.VisibleForTesting; import io.split.client.interceptors.FlagSetsFilter; +import io.split.engine.experiments.RuleBasedSegmentParser; import io.split.engine.experiments.SplitParser; import io.split.engine.sse.AuthApiClient; import io.split.engine.sse.AuthApiClientImp; @@ -17,6 +18,7 @@ import io.split.engine.sse.workers.FeatureFlagWorkerImp; import io.split.engine.sse.workers.Worker; +import io.split.storages.RuleBasedSegmentCache; import io.split.storages.SplitCacheProducer; import io.split.telemetry.domain.StreamingEvent; import io.split.telemetry.domain.enums.StreamEventsEnum; @@ -79,9 +81,11 @@ public static PushManagerImp build(Synchronizer synchronizer, ThreadFactory threadFactory, SplitParser splitParser, SplitCacheProducer splitCacheProducer, - FlagSetsFilter flagSetsFilter) { - FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer, - telemetryRuntimeProducer, flagSetsFilter); + FlagSetsFilter flagSetsFilter, + RuleBasedSegmentCache ruleBasedSegmentCache, + RuleBasedSegmentParser ruleBasedSegmentParser) { + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer, + ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter); Worker segmentWorker = new SegmentsWorkerImp(synchronizer); PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer); diff --git a/client/src/main/java/io/split/engine/common/SyncManagerImp.java b/client/src/main/java/io/split/engine/common/SyncManagerImp.java index 691b6def4..8de8ec510 100644 --- a/client/src/main/java/io/split/engine/common/SyncManagerImp.java +++ b/client/src/main/java/io/split/engine/common/SyncManagerImp.java @@ -5,10 +5,12 @@ import io.split.client.SplitClientConfig; import io.split.client.interceptors.FlagSetsFilter; import io.split.engine.SDKReadinessGates; +import io.split.engine.experiments.RuleBasedSegmentParser; import io.split.engine.experiments.SplitFetcher; import io.split.engine.experiments.SplitParser; import io.split.engine.experiments.SplitSynchronizationTask; import io.split.engine.segments.SegmentSynchronizationTask; +import io.split.storages.RuleBasedSegmentCache; import io.split.storages.SegmentCacheProducer; import io.split.storages.SplitCacheProducer; import io.split.telemetry.domain.StreamingEvent; @@ -89,12 +91,15 @@ public static SyncManagerImp build(SplitTasks splitTasks, TelemetrySynchronizer telemetrySynchronizer, SplitClientConfig config, SplitParser splitParser, - FlagSetsFilter flagSetsFilter) { + RuleBasedSegmentParser ruleBasedSegmentParser, + FlagSetsFilter flagSetsFilter, + RuleBasedSegmentCache ruleBasedSegmentCache) { LinkedBlockingQueue pushMessages = new LinkedBlockingQueue<>(); Synchronizer synchronizer = new SynchronizerImp(splitTasks, splitFetcher, splitCacheProducer, segmentCacheProducer, + ruleBasedSegmentCache, config.streamingRetryDelay(), config.streamingFetchMaxRetries(), config.failedAttemptsBeforeLogging(), @@ -109,7 +114,9 @@ public static SyncManagerImp build(SplitTasks splitTasks, config.getThreadFactory(), splitParser, splitCacheProducer, - flagSetsFilter); + flagSetsFilter, + ruleBasedSegmentCache, + ruleBasedSegmentParser); return new SyncManagerImp(splitTasks, config.streamingEnabled(), diff --git a/client/src/main/java/io/split/engine/common/Synchronizer.java b/client/src/main/java/io/split/engine/common/Synchronizer.java index 8885e8b16..d685a3ed7 100644 --- a/client/src/main/java/io/split/engine/common/Synchronizer.java +++ b/client/src/main/java/io/split/engine/common/Synchronizer.java @@ -6,7 +6,7 @@ public interface Synchronizer { boolean syncAll(); void startPeriodicFetching(); void stopPeriodicFetching(); - void refreshSplits(Long targetChangeNumber); + void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber); void localKillSplit(SplitKillNotification splitKillNotification); void refreshSegment(String segmentName, Long targetChangeNumber); void startPeriodicDataRecording(); diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index 81f26ccd4..d9210578c 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -9,6 +9,7 @@ import io.split.engine.segments.SegmentFetcher; import io.split.engine.segments.SegmentSynchronizationTask; import io.split.engine.sse.dtos.SplitKillNotification; +import io.split.storages.RuleBasedSegmentCacheProducer; import io.split.storages.SegmentCacheProducer; import io.split.storages.SplitCacheProducer; import io.split.telemetry.synchronizer.TelemetrySyncTask; @@ -34,6 +35,7 @@ public class SynchronizerImp implements Synchronizer { private final SplitFetcher _splitFetcher; private final SegmentSynchronizationTask _segmentSynchronizationTaskImp; private final SplitCacheProducer _splitCacheProducer; + private final RuleBasedSegmentCacheProducer _ruleBasedSegmentCacheProducer; private final SegmentCacheProducer segmentCacheProducer; private final ImpressionsManager _impressionManager; private final EventsTask _eventsTask; @@ -48,6 +50,7 @@ public SynchronizerImp(SplitTasks splitTasks, SplitFetcher splitFetcher, SplitCacheProducer splitCacheProducer, SegmentCacheProducer segmentCacheProducer, + RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer, int onDemandFetchRetryDelayMs, int onDemandFetchMaxRetries, int failedAttemptsBeforeLogging, @@ -56,6 +59,7 @@ public SynchronizerImp(SplitTasks splitTasks, _splitFetcher = checkNotNull(splitFetcher); _segmentSynchronizationTaskImp = checkNotNull(splitTasks.getSegmentSynchronizationTask()); _splitCacheProducer = checkNotNull(splitCacheProducer); + _ruleBasedSegmentCacheProducer = checkNotNull(ruleBasedSegmentCacheProducer); this.segmentCacheProducer = checkNotNull(segmentCacheProducer); _onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs); _onDemandFetchMaxRetries = onDemandFetchMaxRetries; @@ -103,7 +107,7 @@ private static class SyncResult { private final FetchResult _fetchResult; } - private SyncResult attemptSplitsSync(long targetChangeNumber, + private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegmentChangeNumber, FetchOptions opts, Function nextWaitMs, int maxRetries) { @@ -114,7 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber, if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) { return new SyncResult(false, remainingAttempts, fetchResult); } - if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) { + if (targetChangeNumber <= _splitCacheProducer.getChangeNumber() + && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) { return new SyncResult(true, remainingAttempts, fetchResult); } else if (remainingAttempts <= 0) { return new SyncResult(false, remainingAttempts, fetchResult); @@ -130,9 +135,17 @@ private SyncResult attemptSplitsSync(long targetChangeNumber, } @Override - public void refreshSplits(Long targetChangeNumber) { + public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) { - if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) { + if (targetChangeNumber == null || targetChangeNumber == 0) { + targetChangeNumber = _splitCacheProducer.getChangeNumber(); + } + if (ruleBasedSegmentChangeNumber == null || ruleBasedSegmentChangeNumber == 0) { + ruleBasedSegmentChangeNumber = _ruleBasedSegmentCacheProducer.getChangeNumber(); + } + + if (targetChangeNumber <= _splitCacheProducer.getChangeNumber() + && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) { return; } @@ -142,7 +155,7 @@ public void refreshSplits(Long targetChangeNumber) { .flagSetsFilter(_sets) .build(); - SyncResult regularResult = attemptSplitsSync(targetChangeNumber, opts, + SyncResult regularResult = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, opts, (discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries); int attempts = _onDemandFetchMaxRetries - regularResult.remainingAttempts(); @@ -157,7 +170,7 @@ public void refreshSplits(Long targetChangeNumber) { _log.info(String.format("No changes fetched after %s attempts. Will retry bypassing CDN.", attempts)); FetchOptions withCdnBypass = new FetchOptions.Builder(opts).targetChangeNumber(targetChangeNumber).build(); Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS); - SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, withCdnBypass, + SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, withCdnBypass, (discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES); int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed._remainingAttempts; @@ -175,7 +188,7 @@ public void localKillSplit(SplitKillNotification splitKillNotification) { if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) { _splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(), splitKillNotification.getChangeNumber()); - refreshSplits(splitKillNotification.getChangeNumber()); + refreshSplits(splitKillNotification.getChangeNumber(), 0L); } } diff --git a/client/src/main/java/io/split/engine/experiments/ParsedSplit.java b/client/src/main/java/io/split/engine/experiments/ParsedSplit.java index b94b5d964..a4d52d6a2 100644 --- a/client/src/main/java/io/split/engine/experiments/ParsedSplit.java +++ b/client/src/main/java/io/split/engine/experiments/ParsedSplit.java @@ -2,6 +2,7 @@ import com.google.common.collect.ImmutableList; import io.split.engine.matchers.AttributeMatcher; +import io.split.engine.matchers.RuleBasedSegmentMatcher; import io.split.engine.matchers.UserDefinedSegmentMatcher; import java.util.HashSet; @@ -243,6 +244,15 @@ public Set getSegmentsNames() { .collect(Collectors.toSet()); } + public Set getRuleBasedSegmentsNames() { + return parsedConditions().stream() + .flatMap(parsedCondition -> parsedCondition.matcher().attributeMatchers().stream()) + .filter(ParsedSplit::isRuleBasedSegmentMatcher) + .map(ParsedSplit::asRuleBasedSegmentMatcherForEach) + .map(RuleBasedSegmentMatcher::getSegmentName) + .collect(Collectors.toSet()); + } + private static boolean isSegmentMatcher(AttributeMatcher attributeMatcher) { return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof UserDefinedSegmentMatcher; } @@ -251,4 +261,11 @@ private static UserDefinedSegmentMatcher asSegmentMatcherForEach(AttributeMatche return (UserDefinedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate(); } + private static boolean isRuleBasedSegmentMatcher(AttributeMatcher attributeMatcher) { + return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof RuleBasedSegmentMatcher; + } + + private static RuleBasedSegmentMatcher asRuleBasedSegmentMatcherForEach(AttributeMatcher attributeMatcher) { + return (RuleBasedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate(); + } } diff --git a/client/src/main/java/io/split/engine/sse/NotificationParserImp.java b/client/src/main/java/io/split/engine/sse/NotificationParserImp.java index e5fee7502..8bfaf886c 100644 --- a/client/src/main/java/io/split/engine/sse/NotificationParserImp.java +++ b/client/src/main/java/io/split/engine/sse/NotificationParserImp.java @@ -1,10 +1,12 @@ package io.split.engine.sse; +import io.split.client.dtos.RuleBasedSegment; +import io.split.client.dtos.Split; import io.split.client.utils.Json; import io.split.engine.sse.dtos.ControlNotification; import io.split.engine.sse.dtos.ErrorNotification; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.CommonChangeNotification; import io.split.engine.sse.dtos.GenericNotificationData; import io.split.engine.sse.dtos.IncomingNotification; import io.split.engine.sse.dtos.OccupancyNotification; @@ -47,7 +49,9 @@ public ErrorNotification parseError(String payload) throws EventParsingException private IncomingNotification parseNotification(GenericNotificationData genericNotificationData) throws Exception { switch (genericNotificationData.getType()) { case SPLIT_UPDATE: - return new FeatureFlagChangeNotification(genericNotificationData); + return new CommonChangeNotification(genericNotificationData, Split.class); + case RB_SEGMENT_UPDATE: + return new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class); case SPLIT_KILL: return new SplitKillNotification(genericNotificationData); case SEGMENT_UPDATE: diff --git a/client/src/main/java/io/split/engine/sse/NotificationProcessor.java b/client/src/main/java/io/split/engine/sse/NotificationProcessor.java index bdd842455..fce86757c 100644 --- a/client/src/main/java/io/split/engine/sse/NotificationProcessor.java +++ b/client/src/main/java/io/split/engine/sse/NotificationProcessor.java @@ -1,13 +1,12 @@ package io.split.engine.sse; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; import io.split.engine.sse.dtos.IncomingNotification; import io.split.engine.sse.dtos.SplitKillNotification; import io.split.engine.sse.dtos.StatusNotification; public interface NotificationProcessor { void process(IncomingNotification notification); - void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification); + void processUpdates(IncomingNotification notification); void processSplitKill(SplitKillNotification splitKillNotification); void processSegmentUpdate(long changeNumber, String segmentName); void processStatus(StatusNotification statusNotification); diff --git a/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java b/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java index b21a7344a..7e38cbadc 100644 --- a/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java +++ b/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java @@ -1,7 +1,6 @@ package io.split.engine.sse; import com.google.common.annotations.VisibleForTesting; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; import io.split.engine.sse.dtos.GenericNotificationData; import io.split.engine.sse.dtos.IncomingNotification; import io.split.engine.sse.dtos.SplitKillNotification; @@ -31,20 +30,19 @@ public static NotificationProcessorImp build(FeatureFlagsWorker featureFlagsWork return new NotificationProcessorImp(featureFlagsWorker, segmentWorker, pushStatusTracker); } - @Override - public void process(IncomingNotification notification) { - notification.handler(this); + public void processUpdates(IncomingNotification notification) { + _featureFlagsWorker.addToQueue(notification); } @Override - public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification) { - _featureFlagsWorker.addToQueue(featureFlagChangeNotification); + public void process(IncomingNotification notification) { + notification.handler(this); } @Override public void processSplitKill(SplitKillNotification splitKillNotification) { _featureFlagsWorker.kill(splitKillNotification); - _featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + _featureFlagsWorker.addToQueue(new SplitKillNotification(GenericNotificationData.builder() .changeNumber(splitKillNotification.getChangeNumber()) .channel(splitKillNotification.getChannel()) .build())); diff --git a/client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java b/client/src/main/java/io/split/engine/sse/dtos/CommonChangeNotification.java similarity index 66% rename from client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java rename to client/src/main/java/io/split/engine/sse/dtos/CommonChangeNotification.java index 05f79abec..e29426599 100644 --- a/client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java +++ b/client/src/main/java/io/split/engine/sse/dtos/CommonChangeNotification.java @@ -1,6 +1,5 @@ package io.split.engine.sse.dtos; -import io.split.client.dtos.Split; import io.split.client.utils.Json; import io.split.engine.segments.SegmentSynchronizationTaskImp; import io.split.engine.sse.NotificationProcessor; @@ -16,25 +15,29 @@ import static io.split.engine.sse.utils.DecompressionUtil.gZipDecompress; import static io.split.engine.sse.utils.DecompressionUtil.zLibDecompress; -public class FeatureFlagChangeNotification extends IncomingNotification { +public class CommonChangeNotification extends IncomingNotification { private static final Logger _log = LoggerFactory.getLogger(SegmentSynchronizationTaskImp.class); private final long changeNumber; private long previousChangeNumber; - private Split featureFlagDefinition; private CompressType compressType; + private Y definition; + private Class _definitionClass; - public FeatureFlagChangeNotification(GenericNotificationData genericNotificationData) { - super(Type.SPLIT_UPDATE, genericNotificationData.getChannel()); + public CommonChangeNotification(GenericNotificationData genericNotificationData, + Class definitionClass) { + super(genericNotificationData.getType(), genericNotificationData.getChannel()); changeNumber = genericNotificationData.getChangeNumber(); + _definitionClass = definitionClass; + if(genericNotificationData.getPreviousChangeNumber() != null) { previousChangeNumber = genericNotificationData.getPreviousChangeNumber(); } compressType = CompressType.from(genericNotificationData.getCompressType()); - if (compressType == null || genericNotificationData.getFeatureFlagDefinition() == null) { + if (compressType == null || genericNotificationData.getDefinition() == null) { return; } try { - byte[] decodedBytes = Base64.getDecoder().decode(genericNotificationData.getFeatureFlagDefinition()); + byte[] decodedBytes = Base64.getDecoder().decode(genericNotificationData.getDefinition()); switch (compressType) { case GZIP: decodedBytes = gZipDecompress(decodedBytes); @@ -43,13 +46,14 @@ public FeatureFlagChangeNotification(GenericNotificationData genericNotification decodedBytes = zLibDecompress(decodedBytes); break; } - featureFlagDefinition = Json.fromJson(new String(decodedBytes, 0, decodedBytes.length, "UTF-8"), Split.class); + + updateDefinition(decodedBytes); } catch (UnsupportedEncodingException | IllegalArgumentException e) { - _log.warn("Could not decode base64 data in flag definition", e); + _log.warn("Could not decode base64 data in definition", e); } catch (DataFormatException d) { - _log.warn("Could not decompress feature flag definition with zlib algorithm", d); + _log.warn("Could not decompress definition with zlib algorithm", d); } catch (IOException i) { - _log.warn("Could not decompress feature flag definition with gzip algorithm", i); + _log.warn("Could not decompress definition with gzip algorithm", i); } } @@ -59,22 +63,25 @@ public long getChangeNumber() { public long getPreviousChangeNumber() { return previousChangeNumber; } - - public Split getFeatureFlagDefinition() { - return featureFlagDefinition; - } - public CompressType getCompressType() { return compressType; } + public Y getDefinition() { + return definition; + } + @Override public void handler(NotificationProcessor notificationProcessor) { - notificationProcessor.processSplitUpdate(this); + notificationProcessor.processUpdates(this); } @Override public String toString() { return String.format("Type: %s; Channel: %s; ChangeNumber: %s", getType(), getChannel(), getChangeNumber()); } + + private void updateDefinition(byte[] decodedBytes) throws UnsupportedEncodingException { + definition = (Y) Json.fromJson(new String(decodedBytes, "UTF-8"), _definitionClass); + } } \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java b/client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java index 7fd3dc1bd..998434ec9 100644 --- a/client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java +++ b/client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java @@ -75,7 +75,7 @@ public Long getPreviousChangeNumber() { return previousChangeNumber; } - public String getFeatureFlagDefinition() { + public String getDefinition() { return featureFlagDefinition; } diff --git a/client/src/main/java/io/split/engine/sse/dtos/IncomingNotification.java b/client/src/main/java/io/split/engine/sse/dtos/IncomingNotification.java index aa476e431..ee00bafe4 100644 --- a/client/src/main/java/io/split/engine/sse/dtos/IncomingNotification.java +++ b/client/src/main/java/io/split/engine/sse/dtos/IncomingNotification.java @@ -5,6 +5,7 @@ public abstract class IncomingNotification { public enum Type { SPLIT_UPDATE, + RB_SEGMENT_UPDATE, SPLIT_KILL, SEGMENT_UPDATE, CONTROL, diff --git a/client/src/main/java/io/split/engine/sse/workers/FeatureFlagWorkerImp.java b/client/src/main/java/io/split/engine/sse/workers/FeatureFlagWorkerImp.java index f66656495..33f9481c8 100644 --- a/client/src/main/java/io/split/engine/sse/workers/FeatureFlagWorkerImp.java +++ b/client/src/main/java/io/split/engine/sse/workers/FeatureFlagWorkerImp.java @@ -1,12 +1,17 @@ package io.split.engine.sse.workers; +import io.split.client.dtos.RuleBasedSegment; import io.split.client.dtos.Split; import io.split.client.interceptors.FlagSetsFilter; import io.split.client.utils.FeatureFlagsToUpdate; +import io.split.client.utils.RuleBasedSegmentsToUpdate; import io.split.engine.common.Synchronizer; +import io.split.engine.experiments.RuleBasedSegmentParser; import io.split.engine.experiments.SplitParser; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.CommonChangeNotification; +import io.split.engine.sse.dtos.IncomingNotification; import io.split.engine.sse.dtos.SplitKillNotification; +import io.split.storages.RuleBasedSegmentCache; import io.split.storages.SplitCacheProducer; import io.split.telemetry.domain.enums.UpdatesFromSSEEnum; import io.split.telemetry.storage.TelemetryRuntimeProducer; @@ -18,23 +23,30 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.split.client.utils.FeatureFlagProcessor.processFeatureFlagChanges; +import static io.split.client.utils.RuleBasedSegmentProcessor.processRuleBasedSegmentChanges; -public class FeatureFlagWorkerImp extends Worker implements FeatureFlagsWorker { +public class FeatureFlagWorkerImp extends Worker implements FeatureFlagsWorker { private static final Logger _log = LoggerFactory.getLogger(FeatureFlagWorkerImp.class); private final Synchronizer _synchronizer; private final SplitParser _splitParser; + private final RuleBasedSegmentParser _ruleBasedSegmentParser; private final SplitCacheProducer _splitCacheProducer; + private final RuleBasedSegmentCache _ruleBasedSegmentCache; private final TelemetryRuntimeProducer _telemetryRuntimeProducer; private final FlagSetsFilter _flagSetsFilter; - public FeatureFlagWorkerImp(Synchronizer synchronizer, SplitParser splitParser, SplitCacheProducer splitCacheProducer, + public FeatureFlagWorkerImp(Synchronizer synchronizer, SplitParser splitParser, RuleBasedSegmentParser ruleBasedSegmentParser, + SplitCacheProducer splitCacheProducer, + RuleBasedSegmentCache ruleBasedSegmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer, FlagSetsFilter flagSetsFilter) { super("Feature flags"); _synchronizer = checkNotNull(synchronizer); _splitParser = splitParser; + _ruleBasedSegmentParser = ruleBasedSegmentParser; _splitCacheProducer = splitCacheProducer; _telemetryRuntimeProducer = telemetryRuntimeProducer; _flagSetsFilter = flagSetsFilter; + _ruleBasedSegmentCache = ruleBasedSegmentCache; } @Override @@ -49,21 +61,56 @@ public void kill(SplitKillNotification splitKillNotification) { } @Override - protected void executeRefresh(FeatureFlagChangeNotification featureFlagChangeNotification) { - boolean success = addOrUpdateFeatureFlag(featureFlagChangeNotification); - + protected void executeRefresh(IncomingNotification incomingNotification) { + boolean success; + long changeNumber = 0L; + long changeNumberRBS = 0L; + if (incomingNotification.getType() == IncomingNotification.Type.SPLIT_UPDATE) { + CommonChangeNotification featureFlagChangeNotification = (CommonChangeNotification) incomingNotification; + success = addOrUpdateFeatureFlag(featureFlagChangeNotification); + changeNumber = featureFlagChangeNotification.getChangeNumber(); + } else { + CommonChangeNotification ruleBasedSegmentChangeNotification = (CommonChangeNotification) incomingNotification; + success = AddOrUpdateRuleBasedSegment(ruleBasedSegmentChangeNotification); + changeNumberRBS = ruleBasedSegmentChangeNotification.getChangeNumber(); + } if (!success) - _synchronizer.refreshSplits(featureFlagChangeNotification.getChangeNumber()); + _synchronizer.refreshSplits(changeNumber, changeNumberRBS); } - private boolean addOrUpdateFeatureFlag(FeatureFlagChangeNotification featureFlagChangeNotification) { + private boolean AddOrUpdateRuleBasedSegment(CommonChangeNotification ruleBasedSegmentChangeNotification) { + if (ruleBasedSegmentChangeNotification.getChangeNumber() <= _ruleBasedSegmentCache.getChangeNumber()) { + return true; + } + try { + if (ruleBasedSegmentChangeNotification.getDefinition() != null && + ruleBasedSegmentChangeNotification.getPreviousChangeNumber() == _ruleBasedSegmentCache.getChangeNumber()) { + RuleBasedSegment ruleBasedSegment = (RuleBasedSegment) ruleBasedSegmentChangeNotification.getDefinition(); + RuleBasedSegmentsToUpdate ruleBasedSegmentsToUpdate = processRuleBasedSegmentChanges(_ruleBasedSegmentParser, + Collections.singletonList(ruleBasedSegment)); + _ruleBasedSegmentCache.update(ruleBasedSegmentsToUpdate.getToAdd(), ruleBasedSegmentsToUpdate.getToRemove(), + ruleBasedSegmentChangeNotification.getChangeNumber()); + Set segments = ruleBasedSegmentsToUpdate.getSegments(); + for (String segmentName: segments) { + _synchronizer.forceRefreshSegment(segmentName); + } + // TODO: Add Telemetry once it is spec'd +// _telemetryRuntimeProducer.recordUpdatesFromSSE(UpdatesFromSSEEnum.RULE_BASED_SEGMENTS); + return true; + } + } catch (Exception e) { + _log.warn("Something went wrong processing a Rule based Segment notification", e); + } + return false; + } + private boolean addOrUpdateFeatureFlag(CommonChangeNotification featureFlagChangeNotification) { if (featureFlagChangeNotification.getChangeNumber() <= _splitCacheProducer.getChangeNumber()) { return true; } try { - if (featureFlagChangeNotification.getFeatureFlagDefinition() != null && + if (featureFlagChangeNotification.getDefinition() != null && featureFlagChangeNotification.getPreviousChangeNumber() == _splitCacheProducer.getChangeNumber()) { - Split featureFlag = featureFlagChangeNotification.getFeatureFlagDefinition(); + Split featureFlag = (Split) featureFlagChangeNotification.getDefinition(); FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_splitParser, Collections.singletonList(featureFlag), _flagSetsFilter); _splitCacheProducer.update(featureFlagsToUpdate.getToAdd(), featureFlagsToUpdate.getToRemove(), @@ -72,6 +119,12 @@ private boolean addOrUpdateFeatureFlag(FeatureFlagChangeNotification featureFlag for (String segmentName: segments) { _synchronizer.forceRefreshSegment(segmentName); } + if (featureFlagsToUpdate.getToAdd().stream().count() > 0) { + Set ruleBasedSegments = featureFlagsToUpdate.getToAdd().get(0).getRuleBasedSegmentsNames(); + if (!ruleBasedSegments.isEmpty() && !_ruleBasedSegmentCache.contains(ruleBasedSegments)) { + return false; + } + } _telemetryRuntimeProducer.recordUpdatesFromSSE(UpdatesFromSSEEnum.SPLITS); return true; } diff --git a/client/src/main/java/io/split/engine/sse/workers/FeatureFlagsWorker.java b/client/src/main/java/io/split/engine/sse/workers/FeatureFlagsWorker.java index 354dbd7e1..b2cc1fbbc 100644 --- a/client/src/main/java/io/split/engine/sse/workers/FeatureFlagsWorker.java +++ b/client/src/main/java/io/split/engine/sse/workers/FeatureFlagsWorker.java @@ -1,10 +1,10 @@ package io.split.engine.sse.workers; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.IncomingNotification; import io.split.engine.sse.dtos.SplitKillNotification; public interface FeatureFlagsWorker { - void addToQueue(FeatureFlagChangeNotification featureFlagChangeNotification); + void addToQueue(IncomingNotification incomingNotification); void start(); void stop(); void kill(SplitKillNotification splitKillNotification); diff --git a/client/src/main/java/io/split/storages/RuleBasedSegmentCacheConsumer.java b/client/src/main/java/io/split/storages/RuleBasedSegmentCacheConsumer.java index 0002ee1ef..eeffd2ccb 100644 --- a/client/src/main/java/io/split/storages/RuleBasedSegmentCacheConsumer.java +++ b/client/src/main/java/io/split/storages/RuleBasedSegmentCacheConsumer.java @@ -11,6 +11,7 @@ public interface RuleBasedSegmentCacheConsumer { ParsedRuleBasedSegment get(String name); Collection getAll(); List ruleBasedSegmentNames(); + boolean contains(Set ruleBasedSegmentNames); long getChangeNumber(); Set getSegments(); } \ No newline at end of file diff --git a/client/src/main/java/io/split/storages/memory/RuleBasedSegmentCacheInMemoryImp.java b/client/src/main/java/io/split/storages/memory/RuleBasedSegmentCacheInMemoryImp.java index 812e64b1a..a86dc50fe 100644 --- a/client/src/main/java/io/split/storages/memory/RuleBasedSegmentCacheInMemoryImp.java +++ b/client/src/main/java/io/split/storages/memory/RuleBasedSegmentCacheInMemoryImp.java @@ -98,4 +98,9 @@ public Set getSegments() { return _concurrentMap.values().stream() .flatMap(parsedRuleBasedSegment -> parsedRuleBasedSegment.getSegmentsNames().stream()).collect(Collectors.toSet()); } + + @Override + public boolean contains(Set ruleBasedSegmentNames) { + return getSegments().contains(ruleBasedSegmentNames); + } } \ No newline at end of file diff --git a/client/src/main/java/io/split/storages/pluggable/adapters/UserCustomRuleBasedSegmentAdapterConsumer.java b/client/src/main/java/io/split/storages/pluggable/adapters/UserCustomRuleBasedSegmentAdapterConsumer.java index 0b1dc88cd..2fe52bc80 100644 --- a/client/src/main/java/io/split/storages/pluggable/adapters/UserCustomRuleBasedSegmentAdapterConsumer.java +++ b/client/src/main/java/io/split/storages/pluggable/adapters/UserCustomRuleBasedSegmentAdapterConsumer.java @@ -81,6 +81,7 @@ public Set getSegments() { getSegmentsNames().stream()).collect(Collectors.toSet()); } + private List stringsToParsedRuleBasedSegments(List elements) { List result = new ArrayList<>(); for(String s : elements) { @@ -92,4 +93,10 @@ private List stringsToParsedRuleBasedSegments(List ruleBasedSegmentNames) { + return getSegments().contains(ruleBasedSegmentNames); + } + } \ No newline at end of file diff --git a/client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java b/client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java index 6c0029084..7edbea333 100644 --- a/client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java @@ -101,7 +101,7 @@ public void testRefreshSplits() { SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, null, null, null, null, null); LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, false); - localhostSynchronizer.refreshSplits(null); + localhostSynchronizer.refreshSplits(null, null); Mockito.verify(splitChangeFetcher, Mockito.times(1)).fetch(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyObject()); } diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index b51ff8a8e..5ea05c35a 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -12,6 +12,7 @@ import io.split.storages.SplitCache; import io.split.storages.SplitCacheConsumer; import io.split.storages.SplitCacheProducer; +import io.split.storages.RuleBasedSegmentCacheProducer; import io.split.storages.memory.InMemoryCacheImp; import io.split.engine.experiments.FetchResult; import io.split.engine.experiments.SplitFetcherImp; @@ -22,9 +23,11 @@ import io.split.telemetry.synchronizer.TelemetrySyncTask; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.mockito.internal.matchers.Any; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -43,6 +46,7 @@ public class SynchronizerTest { private SegmentSynchronizationTask _segmentFetcher; private SplitFetcherImp _splitFetcher; private SplitCacheProducer _splitCacheProducer; + private RuleBasedSegmentCacheProducer _ruleBasedSegmentCacheProducer; private Synchronizer _synchronizer; private SegmentCacheProducer _segmentCacheProducer; private SplitTasks _splitTasks; @@ -56,6 +60,7 @@ public void beforeMethod() { _refreshableSplitFetcherTask = Mockito.mock(SplitSynchronizationTask.class); _segmentFetcher = Mockito.mock(SegmentSynchronizationTask.class); _splitFetcher = Mockito.mock(SplitFetcherImp.class); + _ruleBasedSegmentCacheProducer = Mockito.mock(RuleBasedSegmentCacheProducer.class); _splitCacheProducer = Mockito.mock(SplitCacheProducer.class); _segmentCacheProducer = Mockito.mock(SegmentCache.class); _telemetrySyncTask = Mockito.mock(TelemetrySyncTask.class); @@ -65,7 +70,7 @@ public void beforeMethod() { _splitTasks = SplitTasks.build(_refreshableSplitFetcherTask, _segmentFetcher, _impressionsManager, _eventsTask, _telemetrySyncTask, _uniqueKeysTracker); - _synchronizer = new SynchronizerImp(_splitTasks, _splitFetcher, _splitCacheProducer, _segmentCacheProducer, 50, 10, 5, new HashSet<>()); + _synchronizer = new SynchronizerImp(_splitTasks, _splitFetcher, _splitCacheProducer, _segmentCacheProducer, _ruleBasedSegmentCacheProducer, 50, 10, 5, new HashSet<>()); } @Test @@ -120,7 +125,7 @@ public void stopPeriodicFetching() { public void streamingRetryOnSplit() { when(_splitCacheProducer.getChangeNumber()).thenReturn(0l).thenReturn(0l).thenReturn(1l); when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, false, new HashSet<>())); - _synchronizer.refreshSplits(1L); + _synchronizer.refreshSplits(1L, 0L); Mockito.verify(_splitCacheProducer, Mockito.times(3)).getChangeNumber(); } @@ -145,7 +150,7 @@ public void streamingRetryOnSplitAndSegment() { SegmentFetcher fetcher = Mockito.mock(SegmentFetcher.class); when(_segmentCacheProducer.getChangeNumber(Mockito.anyString())).thenReturn(0l).thenReturn(0l).thenReturn(1l); when(_segmentFetcher.getFetcher(Mockito.anyString())).thenReturn(fetcher); - _synchronizer.refreshSplits(1L); + _synchronizer.refreshSplits(1L, 0L); Mockito.verify(_splitCacheProducer, Mockito.times(3)).getChangeNumber(); Mockito.verify(_segmentFetcher, Mockito.times(2)).getFetcher(Mockito.anyString()); @@ -158,6 +163,7 @@ public void testCDNBypassIsRequestedAfterNFailures() { _splitFetcher, cache, _segmentCacheProducer, + _ruleBasedSegmentCacheProducer, 50, 3, 1, @@ -173,7 +179,7 @@ public void testCDNBypassIsRequestedAfterNFailures() { return new FetchResult(true, false, new HashSet<>()); }).when(_splitFetcher).forceRefresh(optionsCaptor.capture()); - imp.refreshSplits(123L); + imp.refreshSplits(123L, 0L); List options = optionsCaptor.getAllValues(); Assert.assertEquals(options.size(), 4); @@ -190,6 +196,7 @@ public void testCDNBypassRequestLimitAndBackoff() throws NoSuchFieldException, I _splitFetcher, cache, _segmentCacheProducer, + _ruleBasedSegmentCacheProducer, 50, 3, 1, @@ -214,7 +221,7 @@ public void testCDNBypassRequestLimitAndBackoff() throws NoSuchFieldException, I backoffBase.set(imp, 1); // 1ms long before = System.currentTimeMillis(); - imp.refreshSplits(1L); + imp.refreshSplits(1L, 0L); long after = System.currentTimeMillis(); List options = optionsCaptor.getAllValues(); @@ -245,6 +252,7 @@ public void testCDNBypassRequestLimitAndForSegmentsBackoff() throws NoSuchFieldE _splitFetcher, cache, _segmentCacheProducer, + _ruleBasedSegmentCacheProducer, 50, 3, 1, @@ -303,6 +311,7 @@ public void testDataRecording(){ _splitFetcher, cache, _segmentCacheProducer, + _ruleBasedSegmentCacheProducer, 50, 3, 1, @@ -321,4 +330,18 @@ public void testDataRecording(){ Mockito.verify(_uniqueKeysTracker, Mockito.times(1)).stop(); Mockito.verify(_telemetrySyncTask, Mockito.times(1)).stopScheduledTask(); } + + @Test + public void skipSyncWhenChangeNumbersAreZero() { + _synchronizer.refreshSplits(0L, 0L); + Mockito.verify(_splitFetcher, Mockito.times(0)).forceRefresh(Mockito.anyObject()); + } + + @Test + public void testSyncRuleBasedSegment() { + when(_ruleBasedSegmentCacheProducer.getChangeNumber()).thenReturn(-1l).thenReturn(-1l).thenReturn(123l); + when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, false, new HashSet<>())); + _synchronizer.refreshSplits(0L, 123L); + Mockito.verify(_splitFetcher, Mockito.times(2)).forceRefresh(Mockito.anyObject()); + } } \ No newline at end of file diff --git a/client/src/test/java/io/split/engine/sse/CommonChangeNotificationTest.java b/client/src/test/java/io/split/engine/sse/CommonChangeNotificationTest.java new file mode 100644 index 000000000..f536870c1 --- /dev/null +++ b/client/src/test/java/io/split/engine/sse/CommonChangeNotificationTest.java @@ -0,0 +1,46 @@ +package io.split.engine.sse; + +import io.split.client.dtos.RuleBasedSegment; +import io.split.client.dtos.Split; +import io.split.client.dtos.Status; +import io.split.client.utils.Json; +import io.split.engine.sse.dtos.CommonChangeNotification; +import io.split.engine.sse.dtos.GenericNotificationData; +import io.split.engine.sse.dtos.IncomingNotification; +import io.split.engine.sse.dtos.RawMessageNotification; +import io.split.engine.sse.enums.CompressType; +import org.junit.Assert; +import org.junit.Test; + +public class CommonChangeNotificationTest { + + @Test + public void testFeatureFlagNotification() { + String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":2,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}"; + RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); + GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); + + CommonChangeNotification featureFlagChangeNotification = new CommonChangeNotification(genericNotificationData, Split.class); + Assert.assertEquals(IncomingNotification.Type.SPLIT_UPDATE, featureFlagChangeNotification.getType()); + Assert.assertEquals(1684265694505L, featureFlagChangeNotification.getChangeNumber()); + Assert.assertEquals(CompressType.ZLIB, featureFlagChangeNotification.getCompressType()); + Assert.assertEquals(0L, featureFlagChangeNotification.getPreviousChangeNumber()); + Assert.assertEquals("mauro_java", featureFlagChangeNotification.getDefinition().name); + Assert.assertEquals(-1769377604, featureFlagChangeNotification.getDefinition().seed); + } + + @Test + public void testRuleBasedSegmentNotification() { + String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"RB_SEGMENT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":0,\\\"d\\\":\\\"eyJjaGFuZ2VOdW1iZXIiOiA1LCAibmFtZSI6ICJzYW1wbGVfcnVsZV9iYXNlZF9zZWdtZW50IiwgInN0YXR1cyI6ICJBQ1RJVkUiLCAidHJhZmZpY1R5cGVOYW1lIjogInVzZXIiLCAiZXhjbHVkZWQiOiB7ImtleXMiOiBbIm1hdXJvQHNwbGl0LmlvIiwgImdhc3RvbkBzcGxpdC5pbyJdLCAic2VnbWVudHMiOiBbXX0sICJjb25kaXRpb25zIjogW3sibWF0Y2hlckdyb3VwIjogeyJjb21iaW5lciI6ICJBTkQiLCAibWF0Y2hlcnMiOiBbeyJrZXlTZWxlY3RvciI6IHsidHJhZmZpY1R5cGUiOiAidXNlciIsICJhdHRyaWJ1dGUiOiAiZW1haWwifSwgIm1hdGNoZXJUeXBlIjogIkVORFNfV0lUSCIsICJuZWdhdGUiOiBmYWxzZSwgIndoaXRlbGlzdE1hdGNoZXJEYXRhIjogeyJ3aGl0ZWxpc3QiOiBbIkBzcGxpdC5pbyJdfX1dfX1dfQ==\\\"}\"}"; + RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); + GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); + + CommonChangeNotification ruleBasedSegmentCommonChangeNotification = new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class); + Assert.assertEquals(IncomingNotification.Type.RB_SEGMENT_UPDATE, ruleBasedSegmentCommonChangeNotification.getType()); + Assert.assertEquals(1684265694505L, ruleBasedSegmentCommonChangeNotification.getChangeNumber()); + Assert.assertEquals(CompressType.NOT_COMPRESSED, ruleBasedSegmentCommonChangeNotification.getCompressType()); + Assert.assertEquals(0L, ruleBasedSegmentCommonChangeNotification.getPreviousChangeNumber()); + Assert.assertEquals("sample_rule_based_segment", ruleBasedSegmentCommonChangeNotification.getDefinition().name); + Assert.assertEquals(Status.ACTIVE, ruleBasedSegmentCommonChangeNotification.getDefinition().status); + } +} \ No newline at end of file diff --git a/client/src/test/java/io/split/engine/sse/EventSourceClientTest.java b/client/src/test/java/io/split/engine/sse/EventSourceClientTest.java index 604b6371f..e3a5050e1 100644 --- a/client/src/test/java/io/split/engine/sse/EventSourceClientTest.java +++ b/client/src/test/java/io/split/engine/sse/EventSourceClientTest.java @@ -3,8 +3,8 @@ import io.split.SSEMockServer; import io.split.client.RequestDecorator; import io.split.engine.sse.client.SSEClient; +import io.split.engine.sse.dtos.CommonChangeNotification; import io.split.engine.sse.dtos.ErrorNotification; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryRuntimeProducer; import org.apache.hc.client5.http.config.RequestConfig; @@ -96,7 +96,7 @@ public void startAndReceiveNotification() throws IOException { Awaitility.await() .atMost(50L, TimeUnit.SECONDS) - .untilAsserted(() -> Mockito.verify(_notificationProcessor, Mockito.times(1)).process(Mockito.any(FeatureFlagChangeNotification.class))); + .untilAsserted(() -> Mockito.verify(_notificationProcessor, Mockito.times(1)).process(Mockito.any(CommonChangeNotification.class))); OutboundSseEvent sseEventError = new OutboundEvent .Builder() diff --git a/client/src/test/java/io/split/engine/sse/NotificationParserImpTest.java b/client/src/test/java/io/split/engine/sse/NotificationParserImpTest.java index cd57e56ac..69e574def 100644 --- a/client/src/test/java/io/split/engine/sse/NotificationParserImpTest.java +++ b/client/src/test/java/io/split/engine/sse/NotificationParserImpTest.java @@ -1,6 +1,7 @@ package io.split.engine.sse; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.client.dtos.Split; +import io.split.engine.sse.dtos.CommonChangeNotification; import io.split.engine.sse.enums.CompressType; import io.split.engine.sse.exceptions.EventParsingException; @@ -14,9 +15,10 @@ public void validateZlibCompressType() throws EventParsingException { String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":2,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}"; NotificationParserImp notificationParserImp = new NotificationParserImp(); - FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload); - Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().name, "mauro_java"); - Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().changeNumber, 1684265694505L); + CommonChangeNotification incomingNotification = (CommonChangeNotification) notificationParserImp.parseMessage(payload); + Split split = (Split) incomingNotification.getDefinition(); + Assert.assertEquals(split.name, "mauro_java"); + Assert.assertEquals(split.changeNumber, 1684265694505L); Assert.assertEquals(CompressType.ZLIB, incomingNotification.getCompressType()); Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber()); } @@ -26,9 +28,10 @@ public void validateGzipCompressType() throws EventParsingException { String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":1,\\\"d\\\":\\\"H4sIAAAAAAAA/8yT327aTBDFXyU612vJxoTgvUMfKB8qcaSapqoihAZ7DNusvWi9TpUiv3tl/pdQVb1qL+cwc3bOj/EGzlKeq3T6tuaYCoZEXbGFgMogkXXDIM0y31v4C/aCgMnrU9/3gl7Pp4yilMMIAuVusqDamvlXeiWIg/FAa5OSU6aEDHz/ip4wZ5Be1AmjoBsFAtVOCO56UXh31/O7ApUjV1eQGPw3HT+NIPCitG7bctIVC2ScU63d1DK5gksHCZPnEEhXVC45rosFW8ig1++GYej3g85tJEB6aSA7Aqkpc7Ws7XahCnLTbLVM7evnzalsUUHi8//j6WgyTqYQKMilK7b31tRryLa3WKiyfRCDeHhq2Dntiys+JS/J8THUt5VyrFXlHnYTQ3LU2h91yGdQVqhy+0RtTeuhUoNZ08wagTVZdxbBndF5vYVApb7z9m9pZgKaFqwhT+6coRHvg398nEweP/157Bd+S1hz6oxtm88O73B0jbhgM47nyej+YRRfgdNODDlXJWcJL9tUF5SqnRqfbtPr4LdcTHnk4rfp3buLOkG7+Pmp++vRM9w/wVblzX7Pm8OGfxf5YDKZfxh9SS6B/2Pc9t/7ja01o5k1PwIAAP//uTipVskEAAA=\\\"}\"}"; NotificationParserImp notificationParserImp = new NotificationParserImp(); - FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload); - Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().name, "mauro_java"); - Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().changeNumber, 1684333081259L); + CommonChangeNotification incomingNotification = (CommonChangeNotification) notificationParserImp.parseMessage(payload); + Split split = (Split) incomingNotification.getDefinition(); + Assert.assertEquals(split.name, "mauro_java"); + Assert.assertEquals(split.changeNumber, 1684333081259L); Assert.assertEquals(CompressType.GZIP, incomingNotification.getCompressType()); Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber()); } @@ -38,9 +41,10 @@ public void validateNotCompressType() throws EventParsingException { String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684329854385,\\\"pcn\\\":0,\\\"c\\\":0,\\\"d\\\":\\\"eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiJkNDMxY2RkMC1iMGJlLTExZWEtOGE4MC0xNjYwYWRhOWNlMzkiLCJuYW1lIjoibWF1cm9famF2YSIsInRyYWZmaWNBbGxvY2F0aW9uIjoxMDAsInRyYWZmaWNBbGxvY2F0aW9uU2VlZCI6LTkyMzkxNDkxLCJzZWVkIjotMTc2OTM3NzYwNCwic3RhdHVzIjoiQUNUSVZFIiwia2lsbGVkIjpmYWxzZSwiZGVmYXVsdFRyZWF0bWVudCI6Im9mZiIsImNoYW5nZU51bWJlciI6MTY4NDMyOTg1NDM4NSwiYWxnbyI6MiwiY29uZmlndXJhdGlvbnMiOnt9LCJjb25kaXRpb25zIjpbeyJjb25kaXRpb25UeXBlIjoiV0hJVEVMSVNUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7Im1hdGNoZXJUeXBlIjoiV0hJVEVMSVNUIiwibmVnYXRlIjpmYWxzZSwid2hpdGVsaXN0TWF0Y2hlckRhdGEiOnsid2hpdGVsaXN0IjpbImFkbWluIiwibWF1cm8iLCJuaWNvIl19fV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvZmYiLCJzaXplIjoxMDB9XSwibGFiZWwiOiJ3aGl0ZWxpc3RlZCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiSU5fU0VHTUVOVCIsIm5lZ2F0ZSI6ZmFsc2UsInVzZXJEZWZpbmVkU2VnbWVudE1hdGNoZXJEYXRhIjp7InNlZ21lbnROYW1lIjoibWF1ci0yIn19XX0sInBhcnRpdGlvbnMiOlt7InRyZWF0bWVudCI6Im9uIiwic2l6ZSI6MH0seyJ0cmVhdG1lbnQiOiJvZmYiLCJzaXplIjoxMDB9LHsidHJlYXRtZW50IjoiVjQiLCJzaXplIjowfSx7InRyZWF0bWVudCI6InY1Iiwic2l6ZSI6MH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgbWF1ci0yIn0seyJjb25kaXRpb25UeXBlIjoiUk9MTE9VVCIsIm1hdGNoZXJHcm91cCI6eyJjb21iaW5lciI6IkFORCIsIm1hdGNoZXJzIjpbeyJrZXlTZWxlY3RvciI6eyJ0cmFmZmljVHlwZSI6InVzZXIifSwibWF0Y2hlclR5cGUiOiJBTExfS0VZUyIsIm5lZ2F0ZSI6ZmFsc2V9XX0sInBhcnRpdGlvbnMiOlt7InRyZWF0bWVudCI6Im9uIiwic2l6ZSI6MH0seyJ0cmVhdG1lbnQiOiJvZmYiLCJzaXplIjoxMDB9LHsidHJlYXRtZW50IjoiVjQiLCJzaXplIjowfSx7InRyZWF0bWVudCI6InY1Iiwic2l6ZSI6MH1dLCJsYWJlbCI6ImRlZmF1bHQgcnVsZSJ9XX0=\\\"}\"}"; NotificationParserImp notificationParserImp = new NotificationParserImp(); - FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload); - Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().name, "mauro_java"); - Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().changeNumber, 1684329854385L); + CommonChangeNotification incomingNotification = (CommonChangeNotification) notificationParserImp.parseMessage(payload); + Split split = (Split) incomingNotification.getDefinition(); + Assert.assertEquals(split.name, "mauro_java"); + Assert.assertEquals(split.changeNumber, 1684329854385L); Assert.assertEquals(CompressType.NOT_COMPRESSED, incomingNotification.getCompressType()); Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber()); } @@ -50,7 +54,7 @@ public void validateCompressTypeIncorrect() throws EventParsingException { String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":3,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}"; NotificationParserImp notificationParserImp = new NotificationParserImp(); - FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload); + CommonChangeNotification incomingNotification = (CommonChangeNotification) notificationParserImp.parseMessage(payload); Assert.assertNull(incomingNotification.getCompressType()); Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber()); } @@ -60,7 +64,7 @@ public void validateCompressTypeNull() throws EventParsingException { String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}"; NotificationParserImp notificationParserImp = new NotificationParserImp(); - FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload); + CommonChangeNotification incomingNotification = (CommonChangeNotification) notificationParserImp.parseMessage(payload); Assert.assertNull(incomingNotification.getCompressType()); Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber()); } diff --git a/client/src/test/java/io/split/engine/sse/NotificationParserTest.java b/client/src/test/java/io/split/engine/sse/NotificationParserTest.java index 26f9e1997..ad0b4075e 100644 --- a/client/src/test/java/io/split/engine/sse/NotificationParserTest.java +++ b/client/src/test/java/io/split/engine/sse/NotificationParserTest.java @@ -1,13 +1,6 @@ package io.split.engine.sse; -import io.split.engine.sse.dtos.ControlNotification; -import io.split.engine.sse.dtos.ControlType; -import io.split.engine.sse.dtos.ErrorNotification; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; -import io.split.engine.sse.dtos.IncomingNotification; -import io.split.engine.sse.dtos.OccupancyNotification; -import io.split.engine.sse.dtos.SegmentChangeNotification; -import io.split.engine.sse.dtos.SplitKillNotification; +import io.split.engine.sse.dtos.*; import io.split.engine.sse.exceptions.EventParsingException; import org.junit.Before; import org.junit.Test; @@ -29,7 +22,7 @@ public void parseSplitUpdateShouldReturnParsedEvent() throws EventParsingExcepti IncomingNotification result = notificationParser.parseMessage(payload); assertEquals(IncomingNotification.Type.SPLIT_UPDATE, result.getType()); assertEquals("xxxx_xxxx_splits", result.getChannel()); - assertEquals(1592590435115L, ((FeatureFlagChangeNotification) result).getChangeNumber()); + assertEquals(1592590435115L, ((CommonChangeNotification) result).getChangeNumber()); } @Test diff --git a/client/src/test/java/io/split/engine/sse/NotificationProcessorTest.java b/client/src/test/java/io/split/engine/sse/NotificationProcessorTest.java index a56f05dd1..ea75ecb1d 100644 --- a/client/src/test/java/io/split/engine/sse/NotificationProcessorTest.java +++ b/client/src/test/java/io/split/engine/sse/NotificationProcessorTest.java @@ -1,12 +1,8 @@ package io.split.engine.sse; -import io.split.engine.sse.dtos.ControlNotification; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; -import io.split.engine.sse.dtos.GenericNotificationData; -import io.split.engine.sse.dtos.OccupancyNotification; -import io.split.engine.sse.dtos.SegmentChangeNotification; -import io.split.engine.sse.dtos.SegmentQueueDto; -import io.split.engine.sse.dtos.SplitKillNotification; +import io.split.client.dtos.RuleBasedSegment; +import io.split.client.dtos.Split; +import io.split.engine.sse.dtos.*; import io.split.engine.sse.workers.SegmentsWorkerImp; import io.split.engine.sse.workers.FeatureFlagsWorker; import io.split.engine.sse.workers.Worker; @@ -37,13 +33,29 @@ public void processSplitUpdateAddToQueueInWorker() { .changeNumber(changeNumber) .channel(channel) .build(); - FeatureFlagChangeNotification splitChangeNotification = new FeatureFlagChangeNotification(genericNotificationData); + CommonChangeNotification splitChangeNotification = new CommonChangeNotification(genericNotificationData, Split.class); _notificationProcessor.process(splitChangeNotification); Mockito.verify(_featureFlagsWorker, Mockito.times(1)).addToQueue(Mockito.anyObject()); } + @Test + public void processRuleBasedSegmentUpdateAddToQueueInWorker() { + long changeNumber = 1585867723838L; + String channel = "splits"; + GenericNotificationData genericNotificationData = GenericNotificationData.builder() + .changeNumber(changeNumber) + .channel(channel) + .type(IncomingNotification.Type.RB_SEGMENT_UPDATE) + .build(); + CommonChangeNotification ruleBasedSegmentChangeNotification = new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class); + + _notificationProcessor.process(ruleBasedSegmentChangeNotification); + + Mockito.verify(_featureFlagsWorker, Mockito.times(1)).addToQueue(Mockito.anyObject()); + } + @Test public void processSplitKillAndAddToQueueInWorker() { long changeNumber = 1585867723838L; diff --git a/client/src/test/java/io/split/engine/sse/workers/FeatureFlagWorkerImpTest.java b/client/src/test/java/io/split/engine/sse/workers/FeatureFlagWorkerImpTest.java index 9be19b487..1f7c9a8c7 100644 --- a/client/src/test/java/io/split/engine/sse/workers/FeatureFlagWorkerImpTest.java +++ b/client/src/test/java/io/split/engine/sse/workers/FeatureFlagWorkerImpTest.java @@ -1,14 +1,24 @@ package io.split.engine.sse.workers; +import io.split.client.dtos.ConditionType; +import io.split.client.dtos.Split; +import io.split.client.dtos.RuleBasedSegment; +import io.split.client.dtos.MatcherCombiner; import io.split.client.interceptors.FlagSetsFilter; import io.split.client.interceptors.FlagSetsFilterImpl; import io.split.client.utils.Json; import io.split.engine.common.Synchronizer; import io.split.engine.common.SynchronizerImp; +import io.split.engine.experiments.ParsedCondition; +import io.split.engine.experiments.ParsedRuleBasedSegment; +import io.split.engine.experiments.RuleBasedSegmentParser; import io.split.engine.experiments.SplitParser; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; -import io.split.engine.sse.dtos.GenericNotificationData; +import io.split.engine.matchers.AttributeMatcher; +import io.split.engine.matchers.CombiningMatcher; +import io.split.engine.sse.dtos.CommonChangeNotification; import io.split.engine.sse.dtos.RawMessageNotification; +import io.split.engine.sse.dtos.GenericNotificationData; +import io.split.storages.RuleBasedSegmentCache; import io.split.storages.SplitCacheProducer; import io.split.storages.memory.InMemoryCacheImp; import io.split.telemetry.domain.UpdatesFromSSE; @@ -18,8 +28,12 @@ import org.junit.Test; import org.mockito.Mockito; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; +import static org.mockito.Mockito.when; + public class FeatureFlagWorkerImpTest { private static final FlagSetsFilter FLAG_SETS_FILTER = new FlagSetsFilterImpl(new HashSet<>()); @@ -27,54 +41,114 @@ public class FeatureFlagWorkerImpTest { @Test public void testRefreshSplitsWithCorrectFF() { SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); Synchronizer synchronizer = Mockito.mock(SynchronizerImp.class); SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); TelemetryStorage telemetryRuntimeProducer = new InMemoryTelemetryStorage(); - FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer, telemetryRuntimeProducer, FLAG_SETS_FILTER); + FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER); String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":2,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}"; RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); - FeatureFlagChangeNotification featureFlagChangeNotification = new FeatureFlagChangeNotification(genericNotificationData); + + CommonChangeNotification featureFlagChangeNotification = new CommonChangeNotification(genericNotificationData, Split.class); featureFlagsWorker.executeRefresh(featureFlagChangeNotification); UpdatesFromSSE updatesFromSSE = telemetryRuntimeProducer.popUpdatesFromSSE(); Assert.assertEquals(1, updatesFromSSE.getSplits()); - Mockito.verify(synchronizer, Mockito.times(0)).refreshSplits(1684265694505L); + Mockito.verify(synchronizer, Mockito.times(0)).refreshSplits(1684265694505L, 0L); Mockito.verify(synchronizer, Mockito.times(1)).forceRefreshSegment(Mockito.anyString()); } @Test public void testRefreshSplitsWithEmptyData() { SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); Synchronizer synchronizer = Mockito.mock(SynchronizerImp.class); SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); TelemetryStorage telemetryRuntimeProducer = new InMemoryTelemetryStorage(); - FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer, telemetryRuntimeProducer, FLAG_SETS_FILTER); + FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER); String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505}\"}"; RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); - FeatureFlagChangeNotification featureFlagChangeNotification = new FeatureFlagChangeNotification(genericNotificationData); + + CommonChangeNotification featureFlagChangeNotification = new CommonChangeNotification(genericNotificationData, Split.class); featureFlagsWorker.executeRefresh(featureFlagChangeNotification); UpdatesFromSSE updatesFromSSE = telemetryRuntimeProducer.popUpdatesFromSSE(); Assert.assertEquals(0, updatesFromSSE.getSplits()); - Mockito.verify(synchronizer, Mockito.times(1)).refreshSplits(1684265694505L); + Mockito.verify(synchronizer, Mockito.times(1)).refreshSplits(1684265694505L, 0L); Mockito.verify(synchronizer, Mockito.times(0)).forceRefreshSegment(Mockito.anyString()); } @Test public void testRefreshSplitsArchiveFF() { SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); Synchronizer synchronizer = Mockito.mock(SynchronizerImp.class); SplitCacheProducer splitCacheProducer = new InMemoryCacheImp(1686165614090L, FLAG_SETS_FILTER); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); TelemetryStorage telemetryRuntimeProducer = new InMemoryTelemetryStorage(); - FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer, telemetryRuntimeProducer, FLAG_SETS_FILTER); + FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER); String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1686165617166,\\\"pcn\\\":1686165614090,\\\"c\\\":2,\\\"d\\\":\\\"eJxsUdFu4jAQ/JVqnx3JDjTh/JZCrj2JBh0EqtOBIuNswKqTIMeuxKH8+ykhiKrqiyXvzM7O7lzAGlEUSqbnEyaiRODgGjRAQOXAIQ/puPB96tHHIPQYQ/QmFNErxEgG44DKnI2AQHXtTOI0my6WcXZAmxoUtsTKvil7nNZVoQ5RYdFERh7VBwK5TY60rqWwqq6AM0q/qa8Qc+As/EHZ5HHMCDR9wQ/9kIajcEygscK6BjhEy+nLr008AwLvSuuOVgjdIIEcC+H03RZw2Hg/n88JEJBHUR0wceUeDXAWTAIWPAYsZEFAQOhDDdwnIPslnOk9NcAvNwEOly3IWtdmC3wLe+1wCy0Q2Hh/zNvTV9xg3sFtr5irQe3v5f7twgAOy8V8vlinQKAUVh7RPJvanbrBsi73qurMQpTM7oSrzjueV6hR2tp05E8J39MV1hq1d7YrWWxsZ2cQGYjzeLXK0pcoyRbLLP69juZZuuiyxoPo2oa7ukqYc+JKNEq+XgVmwopucC6sGMSS9etTvAQCH0I7BO7Ttt21BE7C2E8XsN+l06h/CJy25CveH/eGM0rbHQEt9qiHnR62jtKR7N/8wafQ7tr/AQAA//8S4fPB\\\"}\"}"; RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); - FeatureFlagChangeNotification featureFlagChangeNotification = new FeatureFlagChangeNotification(genericNotificationData); + + CommonChangeNotification featureFlagChangeNotification = new CommonChangeNotification(genericNotificationData, Split.class); featureFlagsWorker.executeRefresh(featureFlagChangeNotification); UpdatesFromSSE updatesFromSSE = telemetryRuntimeProducer.popUpdatesFromSSE(); Assert.assertEquals(1, updatesFromSSE.getSplits()); - Mockito.verify(synchronizer, Mockito.times(0)).refreshSplits(1686165617166L); + Mockito.verify(synchronizer, Mockito.times(0)).refreshSplits(1686165617166L, 0L); Mockito.verify(synchronizer, Mockito.times(0)).forceRefreshSegment(Mockito.anyString()); } + + @Test + public void testUpdateRuleBasedSegmentsWithCorrectFF() { + io.split.engine.matchers.Matcher matcher = (matchValue, bucketingKey, attributes, evaluationContext) -> false; + ParsedCondition parsedCondition = new ParsedCondition(ConditionType.ROLLOUT, + new CombiningMatcher(MatcherCombiner.AND, Arrays.asList(new AttributeMatcher("email", matcher, false))), + null, + "my label"); + ParsedRuleBasedSegment parsedRBS = new ParsedRuleBasedSegment("sample_rule_based_segment", + Arrays.asList(parsedCondition), + "user", + 5, + Arrays.asList("mauro@split.io","gaston@split.io"), + new ArrayList<>()); + + SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); + Synchronizer synchronizer = Mockito.mock(SynchronizerImp.class); + SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); + TelemetryStorage telemetryRuntimeProducer = new InMemoryTelemetryStorage(); + FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER); + String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"RB_SEGMENT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":0,\\\"d\\\":\\\"eyJjaGFuZ2VOdW1iZXIiOiA1LCAibmFtZSI6ICJzYW1wbGVfcnVsZV9iYXNlZF9zZWdtZW50IiwgInN0YXR1cyI6ICJBQ1RJVkUiLCAidHJhZmZpY1R5cGVOYW1lIjogInVzZXIiLCAiZXhjbHVkZWQiOiB7ImtleXMiOiBbIm1hdXJvQHNwbGl0LmlvIiwgImdhc3RvbkBzcGxpdC5pbyJdLCAic2VnbWVudHMiOiBbXX0sICJjb25kaXRpb25zIjogW3sibWF0Y2hlckdyb3VwIjogeyJjb21iaW5lciI6ICJBTkQiLCAibWF0Y2hlcnMiOiBbeyJrZXlTZWxlY3RvciI6IHsidHJhZmZpY1R5cGUiOiAidXNlciIsICJhdHRyaWJ1dGUiOiAiZW1haWwifSwgIm1hdGNoZXJUeXBlIjogIkVORFNfV0lUSCIsICJuZWdhdGUiOiBmYWxzZSwgIndoaXRlbGlzdE1hdGNoZXJEYXRhIjogeyJ3aGl0ZWxpc3QiOiBbIkBzcGxpdC5pbyJdfX1dfX1dfQ==\\\"}\"}"; + RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); + GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); + + CommonChangeNotification ruleBasedSegmentChangeNotification = new CommonChangeNotification(genericNotificationData, RuleBasedSegment.class); + featureFlagsWorker.executeRefresh(ruleBasedSegmentChangeNotification); + Mockito.verify(ruleBasedSegmentCache, Mockito.times(1)).update(Arrays.asList(parsedRBS), new ArrayList<>(), 1684265694505L); + } + + @Test + public void testRefreshRuleBasedSegmentWithCorrectFF() { + SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); + Synchronizer synchronizer = Mockito.mock(SynchronizerImp.class); + SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); + HashSet rbs = new HashSet<>(); + rbs.add("sample_rule_based_segment"); + when(ruleBasedSegmentCache.contains(rbs)).thenReturn(false); + TelemetryStorage telemetryRuntimeProducer = new InMemoryTelemetryStorage(); + FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER); + String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":0,\\\"d\\\":\\\"eyJjaGFuZ2VOdW1iZXIiOiAxMCwgInRyYWZmaWNUeXBlTmFtZSI6ICJ1c2VyIiwgIm5hbWUiOiAicmJzX2ZsYWciLCAidHJhZmZpY0FsbG9jYXRpb24iOiAxMDAsICJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOiAxODI4Mzc3MzgwLCAic2VlZCI6IC0yODY2MTc5MjEsICJzdGF0dXMiOiAiQUNUSVZFIiwgImtpbGxlZCI6IGZhbHNlLCAiZGVmYXVsdFRyZWF0bWVudCI6ICJvZmYiLCAiYWxnbyI6IDIsICJjb25kaXRpb25zIjogW3siY29uZGl0aW9uVHlwZSI6ICJST0xMT1VUIiwgIm1hdGNoZXJHcm91cCI6IHsiY29tYmluZXIiOiAiQU5EIiwgIm1hdGNoZXJzIjogW3sia2V5U2VsZWN0b3IiOiB7InRyYWZmaWNUeXBlIjogInVzZXIifSwgIm1hdGNoZXJUeXBlIjogIklOX1JVTEVfQkFTRURfU0VHTUVOVCIsICJuZWdhdGUiOiBmYWxzZSwgInVzZXJEZWZpbmVkU2VnbWVudE1hdGNoZXJEYXRhIjogeyJzZWdtZW50TmFtZSI6ICJzYW1wbGVfcnVsZV9iYXNlZF9zZWdtZW50In19XX0sICJwYXJ0aXRpb25zIjogW3sidHJlYXRtZW50IjogIm9uIiwgInNpemUiOiAxMDB9LCB7InRyZWF0bWVudCI6ICJvZmYiLCAic2l6ZSI6IDB9XSwgImxhYmVsIjogImluIHJ1bGUgYmFzZWQgc2VnbWVudCBzYW1wbGVfcnVsZV9iYXNlZF9zZWdtZW50In0sIHsiY29uZGl0aW9uVHlwZSI6ICJST0xMT1VUIiwgIm1hdGNoZXJHcm91cCI6IHsiY29tYmluZXIiOiAiQU5EIiwgIm1hdGNoZXJzIjogW3sia2V5U2VsZWN0b3IiOiB7InRyYWZmaWNUeXBlIjogInVzZXIifSwgIm1hdGNoZXJUeXBlIjogIkFMTF9LRVlTIiwgIm5lZ2F0ZSI6IGZhbHNlfV19LCAicGFydGl0aW9ucyI6IFt7InRyZWF0bWVudCI6ICJvbiIsICJzaXplIjogMH0sIHsidHJlYXRtZW50IjogIm9mZiIsICJzaXplIjogMTAwfV0sICJsYWJlbCI6ICJkZWZhdWx0IHJ1bGUifV0sICJjb25maWd1cmF0aW9ucyI6IHt9LCAic2V0cyI6IFtdLCAiaW1wcmVzc2lvbnNEaXNhYmxlZCI6IGZhbHNlfQ==\\\"}\"}"; + RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); + GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); + CommonChangeNotification featureFlagChangeNotification = new CommonChangeNotification(genericNotificationData, Split.class); + + featureFlagsWorker.executeRefresh(featureFlagChangeNotification); + Mockito.verify(synchronizer, Mockito.times(0)).refreshSplits(0L, 1684265694505L); + } } \ No newline at end of file diff --git a/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java b/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java index 2ece6c550..7e63fa554 100644 --- a/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java +++ b/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java @@ -1,12 +1,16 @@ package io.split.engine.sse.workers; +import io.split.client.dtos.Split; import io.split.client.interceptors.FlagSetsFilter; import io.split.client.interceptors.FlagSetsFilterImpl; import io.split.engine.common.Synchronizer; +import io.split.engine.experiments.RuleBasedSegmentParser; import io.split.engine.experiments.SplitParser; -import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.CommonChangeNotification; import io.split.engine.sse.dtos.GenericNotificationData; +import io.split.engine.sse.dtos.IncomingNotification; import io.split.engine.sse.dtos.SplitKillNotification; +import io.split.storages.RuleBasedSegmentCache; import io.split.storages.SplitCacheProducer; import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryRuntimeProducer; @@ -28,14 +32,16 @@ public class SplitsWorkerTest { public void addToQueueWithoutElementsWShouldNotTriggerFetch() throws InterruptedException { Synchronizer splitFetcherMock = Mockito.mock(Synchronizer.class); SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class); - FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(splitFetcherMock, splitParser, splitCacheProducer, telemetryRuntimeProducer, FLAG_SETS_FILTER); + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(splitFetcherMock, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER); featureFlagsWorker.start(); Thread.sleep(500); - Mockito.verify(splitFetcherMock, Mockito.never()).refreshSplits(Mockito.anyObject()); + Mockito.verify(splitFetcherMock, Mockito.never()).refreshSplits(Mockito.anyObject(), Mockito.anyObject()); featureFlagsWorker.stop(); } @@ -43,29 +49,36 @@ public void addToQueueWithoutElementsWShouldNotTriggerFetch() throws Interrupted public void addToQueueWithElementsWShouldTriggerFetch() throws InterruptedException { Synchronizer syncMock = Mockito.mock(Synchronizer.class); SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class); - FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, splitCacheProducer, telemetryRuntimeProducer, FLAG_SETS_FILTER); + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER); featureFlagsWorker.start(); ArgumentCaptor cnCaptor = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor cnCaptor2 = ArgumentCaptor.forClass(Long.class); - featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + featureFlagsWorker.addToQueue(new CommonChangeNotification(GenericNotificationData.builder() .changeNumber(1585956698457L) - .build())); - featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .type(IncomingNotification.Type.SPLIT_UPDATE) + .build(), Split.class)); + featureFlagsWorker.addToQueue(new CommonChangeNotification(GenericNotificationData.builder() .changeNumber(1585956698467L) - .build())); - featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .type(IncomingNotification.Type.SPLIT_UPDATE) + .build(), Split.class)); + featureFlagsWorker.addToQueue(new CommonChangeNotification(GenericNotificationData.builder() .changeNumber(1585956698477L) - .build())); - featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .type(IncomingNotification.Type.SPLIT_UPDATE) + .build(), Split.class)); + featureFlagsWorker.addToQueue(new CommonChangeNotification(GenericNotificationData.builder() .changeNumber(1585956698476L) - .build())); + .type(IncomingNotification.Type.SPLIT_UPDATE) + .build(), Split.class)); Thread.sleep(1000); - Mockito.verify(syncMock, Mockito.times(4)).refreshSplits(cnCaptor.capture()); + Mockito.verify(syncMock, Mockito.times(4)).refreshSplits(cnCaptor.capture(), cnCaptor2.capture()); List captured = cnCaptor.getAllValues(); assertThat(captured, contains(1585956698457L, 1585956698467L, 1585956698477L, 1585956698476L)); featureFlagsWorker.stop(); @@ -79,9 +92,11 @@ public void killShouldTriggerFetch() { Synchronizer syncMock = Mockito.mock(Synchronizer.class); SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class); - FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, splitCacheProducer, telemetryRuntimeProducer, FLAG_SETS_FILTER) { + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER) { }; featureFlagsWorker.start(); SplitKillNotification splitKillNotification = new SplitKillNotification(GenericNotificationData.builder() @@ -99,31 +114,36 @@ public void killShouldTriggerFetch() { public void messagesNotProcessedWhenWorkerStopped() throws InterruptedException { Synchronizer syncMock = Mockito.mock(Synchronizer.class); SplitParser splitParser = new SplitParser(); + RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser(); SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + RuleBasedSegmentCache ruleBasedSegmentCache = Mockito.mock(RuleBasedSegmentCache.class); TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class); - FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, splitCacheProducer, telemetryRuntimeProducer, FLAG_SETS_FILTER); + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, ruleBasedSegmentParser, splitCacheProducer, ruleBasedSegmentCache, telemetryRuntimeProducer, FLAG_SETS_FILTER); featureFlagsWorker.start(); - featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + featureFlagsWorker.addToQueue(new CommonChangeNotification(GenericNotificationData.builder() .changeNumber(1585956698457L) - .build())); + .type(IncomingNotification.Type.SPLIT_UPDATE) + .build(), Split.class)); Thread.sleep(500); featureFlagsWorker.stop(); Thread.sleep(500); - featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + featureFlagsWorker.addToQueue(new CommonChangeNotification(GenericNotificationData.builder() .changeNumber(1585956698467L) - .build())); - Mockito.verify(syncMock, Mockito.times(1)).refreshSplits(Mockito.anyObject()); // Previous one! + .type(IncomingNotification.Type.SPLIT_UPDATE) + .build(), Split.class)); + Mockito.verify(syncMock, Mockito.times(1)).refreshSplits(Mockito.anyObject(), Mockito.anyObject()); // Previous one! Mockito.reset(syncMock); featureFlagsWorker.start(); - featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + featureFlagsWorker.addToQueue(new CommonChangeNotification(GenericNotificationData.builder() .changeNumber(1585956698477L) - .build())); + .type(IncomingNotification.Type.SPLIT_UPDATE) + .build(), Split.class)); Thread.sleep(500); - Mockito.verify(syncMock, Mockito.times(1)).refreshSplits(Mockito.anyObject()); + Mockito.verify(syncMock, Mockito.times(1)).refreshSplits(Mockito.anyObject(), Mockito.anyObject()); featureFlagsWorker.stop(); } } \ No newline at end of file