diff --git a/data-plane/benchmarks/resources/filter-class-list.txt b/data-plane/benchmarks/resources/filter-class-list.txt index f050db4586..0307d60a2b 100644 --- a/data-plane/benchmarks/resources/filter-class-list.txt +++ b/data-plane/benchmarks/resources/filter-class-list.txt @@ -2,3 +2,5 @@ ExactFilterBenchmark NotFilterBenchmark PrefixFilterBenchmark SuffixFilterBenchmark +AnyFilterBenchmark +AllFilterBenchmark diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java index 2faff3027e..43f18305cf 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java @@ -16,16 +16,17 @@ package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.AllFilter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter; import io.cloudevents.CloudEvent; -import java.util.List; import java.util.Map; public class AllFilterBenchmark { + public static ExactFilter makeExactFilter() { return new ExactFilter(Map.of("type", "com.github.pull.create")); } @@ -50,7 +51,7 @@ public static class AllFilterWithExactFilter extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter())); + return new AllFilter(ImmutableList.of(makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -63,7 +64,10 @@ public static class AllFilterMatchAllSubFilters extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter())); + return new AllFilter( + ImmutableList.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -76,7 +80,10 @@ public static class AllFilterFirstMatchEndOfArray extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter())); + return new AllFilter( + ImmutableList.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -89,7 +96,10 @@ public static class AllFilterFirstMatchStartOfArray extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch())); + return new AllFilter( + ImmutableList.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -102,7 +112,10 @@ public static class AllFilterOneNonMatchingFilterInMiddle extends FilterBenchmar @Override protected Filter createFilter() { - return new AllFilter(List.of(makeExactFilter(), makePrefixFilterNoMatch(), makePrefixFilter())); + return new AllFilter( + ImmutableList.of(makeExactFilter(), makePrefixFilterNoMatch(), makePrefixFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -115,7 +128,10 @@ public static class AllFilterNoMatchingFilters extends FilterBenchmark { @Override protected Filter createFilter() { - return new AllFilter(List.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch())); + return new AllFilter( + ImmutableList.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java index 702bc75d7f..b8347a0302 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java @@ -16,10 +16,10 @@ package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.*; import io.cloudevents.CloudEvent; -import java.util.List; import java.util.Map; public class AnyFilterBenchmark { @@ -48,7 +48,7 @@ public static PrefixFilter makePrefixFilterNoMatch() { return new PrefixFilter(Map.of("type", "other.event")); } - public static SuffixFilter makeSufficFilterNoMatch() { + public static SuffixFilter makeSuffixFilterNoMatch() { return new SuffixFilter(Map.of("source", "qwertyuiop")); } @@ -56,7 +56,7 @@ public static class AnyFilterWithExactFilterBenchmark extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeExactFilter())); + return new AnyFilter(ImmutableList.of(makeExactFilter()), vertx, FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -69,7 +69,10 @@ public static class AnyFilterMatchAllSubfilters extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter())); + return new AnyFilter( + ImmutableList.of(makeExactFilter(), makePrefixFilter(), makeSuffixFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -82,7 +85,10 @@ public static class AnyFilterFirstMatchAtEnd extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makePrefixFilterNoMatch(), makeSufficFilterNoMatch(), makeExactFilter())); + return new AnyFilter( + ImmutableList.of(makePrefixFilterNoMatch(), makeSuffixFilterNoMatch(), makeExactFilter()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -95,7 +101,10 @@ public static class AnyFilterFirstMatchAtStart extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSufficFilterNoMatch())); + return new AnyFilter( + ImmutableList.of(makeExactFilter(), makePrefixFilterNoMatch(), makeSuffixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -108,7 +117,10 @@ public static class AnyFilter2EventsMatch2DifferentFilters extends FilterBenchma @Override protected Filter createFilter() { - return new AnyFilter(List.of(makePrefixFilter(), makePrefixFilterNoMatch())); + return new AnyFilter( + ImmutableList.of(makePrefixFilter(), makePrefixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override @@ -120,7 +132,10 @@ protected CloudEvent createEvent() { public static class AnyFilter2EventsMatch2DifferentFiltersOneFilterMatchesNeither extends FilterBenchmark { @Override protected Filter createFilter() { - return new AnyFilter(List.of(makeSufficFilterNoMatch(), makePrefixFilter(), makePrefixFilterNoMatch())); + return new AnyFilter( + ImmutableList.of(makeSuffixFilterNoMatch(), makePrefixFilter(), makePrefixFilterNoMatch()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } @Override diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java index bc29350bad..a3aaf04b86 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java @@ -18,6 +18,7 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; +import io.vertx.core.Vertx; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; @@ -32,8 +33,18 @@ public abstract class FilterBenchmark { Filter filter; CloudEvent cloudEvent; + Vertx vertx; + + public static final long FILTER_REORDER_TIME_MILLISECONDS = 10000; // 1 seconds + + @TearDown + public void closeVertx() { + this.vertx.close(); + } + @Setup(Level.Trial) public void setupFilter() { + this.vertx = Vertx.vertx(); this.filter = createFilter(); } @@ -42,13 +53,20 @@ public void setupCloudEvent() { this.cloudEvent = createEvent(); } + @TearDown(Level.Trial) + public void teardown() { + this.filter.close(vertx); + } + protected abstract Filter createFilter(); protected abstract CloudEvent createEvent(); @Benchmark public void benchmarkFilterCreation(Blackhole bh) { - bh.consume(this.createFilter()); + final var filter = this.createFilter(); + filter.close(vertx); + bh.consume(filter); } @Benchmark diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java index 67248bef94..aaac496ed9 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.dispatcher; import io.cloudevents.CloudEvent; +import io.vertx.core.Vertx; import java.util.function.Predicate; /** @@ -30,4 +31,18 @@ public interface Filter extends Predicate { static Filter noop() { return ce -> true; } + + default int getCount() { + return 0; + } + ; + + default int incrementCount() { + return 0; + } + ; + + default void close(Vertx vertx) { + return; + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index b62604d0e9..19b1c36b38 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -77,6 +77,8 @@ public class RecordDispatcherImpl implements RecordDispatcher { private static final String EKB_ERROR_PREFIX = "kne-"; private static final int KN_ERROR_DATA_MAX_BYTES = 1024; + private final Vertx vertx; + private final Filter filter; private final Function, Future>> subscriberSender; private final Function, Future>> dlsSender; @@ -103,6 +105,7 @@ public class RecordDispatcherImpl implements RecordDispatcher { * @param consumerTracer consumer tracer */ public RecordDispatcherImpl( + final Vertx vertx, final ConsumerVerticleContext consumerVerticleContext, final Filter filter, final CloudEventSender subscriberSender, @@ -111,6 +114,7 @@ public RecordDispatcherImpl( final RecordDispatcherListener recordDispatcherListener, final ConsumerTracer consumerTracer, final MeterRegistry meterRegistry) { + Objects.requireNonNull(vertx, "provide vertx"); Objects.requireNonNull(consumerVerticleContext, "provide consumerVerticleContext"); Objects.requireNonNull(filter, "provide filter"); Objects.requireNonNull(subscriberSender, "provide subscriberSender"); @@ -118,6 +122,7 @@ public RecordDispatcherImpl( Objects.requireNonNull(recordDispatcherListener, "provide offsetStrategy"); Objects.requireNonNull(responseHandler, "provide sinkResponseHandler"); + this.vertx = vertx; this.consumerVerticleContext = consumerVerticleContext; this.filter = filter; this.subscriberSender = composeSenderAndSinkHandler(subscriberSender, responseHandler, "subscriber"); @@ -503,6 +508,8 @@ private void recordReceived(final ConsumerRecordContext recordContext) { public Future close() { this.closed.set(true); + this.filter.close(vertx); + Metrics.searchEgressMeters( meterRegistry, consumerVerticleContext.getEgress().getReference()) .forEach(meterRegistry::remove); diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java index 876fc5d1b3..459400cb3f 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java @@ -64,6 +64,8 @@ public AttributeEntry(String name, String expectedValue, Function attributes; + private int count; + /** * All args constructor. * @@ -81,6 +83,7 @@ public AttributesFilter(final Map attributes) { } }))) .collect(Collectors.toUnmodifiableList()); + this.count = 0; } /** @@ -132,4 +135,14 @@ private static String getOrDefault(@Nullable final T s, final Function filters; private static final Logger logger = LoggerFactory.getLogger(AllFilter.class); - public AllFilter(List filters) { - this.filters = filters; + private final AtomicReference> filters; + + private int count; + + private final long periodicTimerId; + + private boolean shouldReorder; + + public AllFilter(ImmutableList filters, Vertx vertx, long delayMilliseconds) { + logger.debug("Starting with timeout {}", delayMilliseconds); + this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); + this.count = 0; + this.filters = new AtomicReference<>(filters); } - @Override - public boolean test(CloudEvent cloudEvent) { + private void reorder(Long id) { + if (!this.shouldReorder) { + return; + } + logger.debug("Reordering filters!"); + this.filters.updateAndGet((filterCounters -> filterCounters.stream() + .sorted(Comparator.comparingInt(Filter::getCount).reversed()) + .collect(ImmutableList.toImmutableList()))); + this.shouldReorder = false; + } + + private static boolean test(CloudEvent cloudEvent, List filters, Consumer shouldReorder) { logger.debug("Testing event against ALL filters. Event {}", cloudEvent); - for (Filter filter : filters) { + int i = 0; + for (final Filter filter : filters) { if (!filter.test(cloudEvent)) { + int count = filter.incrementCount(); + if (i != 0 && count > 2 * filters.get(i - 1).getCount()) { + shouldReorder.accept(true); + } logger.debug("Test failed. Filter {} Event {}", filter, cloudEvent); return false; } + i++; } logger.debug("Test ALL filters succeeded. Event {}", cloudEvent); return true; } + + @Override + public int getCount() { + return this.count; + } + + @Override + public int incrementCount() { + return this.count++; + } + + private void setShouldReorder(boolean shouldReorder) { + this.shouldReorder = shouldReorder; + } + + @Override + public boolean test(CloudEvent cloudEvent) { + return AllFilter.test(cloudEvent, this.filters.get(), this::setShouldReorder); + } + + @Override + public void close(Vertx vertx) { + logger.debug("Closing periodic reorder job"); + vertx.cancelTimer(this.periodicTimerId); + this.filters.get().forEach((f) -> f.close(vertx)); + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java index 2325300801..24bb316d97 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java @@ -15,9 +15,13 @@ */ package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; -import java.util.List; +import io.vertx.core.Vertx; +import java.util.Comparator; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,23 +29,71 @@ public class AnyFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(AnyFilter.class); - private final List filters; + private final AtomicReference> filters; - public AnyFilter(List filters) { - this.filters = filters; + private int count; + + private final long periodicTimerId; + + private boolean shouldReorder; + + public AnyFilter(ImmutableList filters, Vertx vertx, long delayMilliseconds) { + this.periodicTimerId = vertx.setPeriodic(delayMilliseconds, this::reorder); + this.count = 0; + this.filters = new AtomicReference<>(filters); } - @Override - public boolean test(CloudEvent cloudEvent) { - logger.debug("Testing event against ANY filter. Event {}", cloudEvent); + private void reorder(Long id) { + if (!this.shouldReorder) { + return; + } + logger.debug("Reordering ANY filter!"); + this.filters.updateAndGet((filterCounters -> filterCounters.stream() + .sorted(Comparator.comparingInt(Filter::getCount).reversed()) + .collect(ImmutableList.toImmutableList()))); + this.shouldReorder = false; + } - for (Filter filter : filters) { + private static boolean test(CloudEvent cloudEvent, ImmutableList filters, Consumer shouldReorder) { + logger.debug("Testing event against ANY filter. Event {}", cloudEvent); + int i = 0; + for (final Filter filter : filters) { if (filter.test(cloudEvent)) { + int count = filter.incrementCount(); + if (i != 0 && count > 2 * filters.get(i - 1).getCount()) { + shouldReorder.accept(true); + } logger.debug("Test succeeded. Filter {} Event {}", filter, cloudEvent); return true; } + i++; } logger.debug("Test failed. All filters failed. Event {}", cloudEvent); return false; } + + @Override + public int getCount() { + return this.count; + } + + @Override + public int incrementCount() { + return this.count++; + } + + private void setShouldReorder(boolean shouldReorder) { + this.shouldReorder = shouldReorder; + } + + @Override + public boolean test(CloudEvent cloudEvent) { + return AnyFilter.test(cloudEvent, this.filters.get(), this::setShouldReorder); + } + + @Override + public void close(Vertx vertx) { + vertx.cancelTimer(this.periodicTimerId); + this.filters.get().forEach((f) -> f.close(vertx)); + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java index df3c935d1c..d386d8bf5e 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java @@ -29,12 +29,15 @@ public class CeSqlFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(CeSqlFilter.class); + private int count; + private final Expression expression; private final EvaluationRuntime runtime; public CeSqlFilter(String sqlExpression) { this.expression = Parser.parseDefault(sqlExpression); this.runtime = EvaluationRuntime.getDefault(); + this.count = 0; } @Override @@ -54,4 +57,14 @@ public boolean test(CloudEvent cloudEvent) { return false; } } + + @Override + public int getCount() { + return this.count; + } + + @Override + public int incrementCount() { + return this.count++; + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java index 8d90034751..5dc898a44d 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java @@ -17,6 +17,7 @@ import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; +import io.vertx.core.Vertx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,8 +27,11 @@ public class NotFilter implements Filter { private final Filter filter; + private int count; + public NotFilter(Filter filter) { this.filter = filter; + this.count = 0; } @Override @@ -38,4 +42,19 @@ public boolean test(CloudEvent cloudEvent) { logger.debug("{}: Filter {} - Event {}", result, this.filter, cloudEvent); return passed; } + + @Override + public int getCount() { + return this.count; + } + + @Override + public int incrementCount() { + return this.count++; + } + + @Override + public void close(Vertx vertx) { + this.filter.close(vertx); + } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java index b6a5f038cd..17704efbef 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java @@ -17,6 +17,7 @@ import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; @@ -61,7 +62,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; @@ -70,6 +70,8 @@ public class ConsumerVerticleBuilder { private static final CloudEventSender NO_DEAD_LETTER_SINK_SENDER = CloudEventSender.noop("No dead letter sink set"); + private static final long FILTER_REORDER_TIME_MILLISECONDS = 1000 * 60 * 5; // 5 minutes + private final ConsumerVerticleContext consumerVerticleContext; public ConsumerVerticleBuilder(final ConsumerVerticleContext consumerVerticleContext) { @@ -108,8 +110,9 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f final var recordDispatcher = new RecordDispatcherMutatorChain( new RecordDispatcherImpl( + vertx, consumerVerticleContext, - getFilter(), + getFilter(vertx), egressSubscriberSender, egressDeadLetterSender, responseHandler, @@ -179,10 +182,10 @@ public void onPartitionsAssigned(Collection partitions) { }; } - private Filter getFilter() { + private Filter getFilter(Vertx vertx) { // Dialected filters should override the attributes filter if (consumerVerticleContext.getEgress().getDialectedFilterCount() > 0) { - return getFilter(consumerVerticleContext.getEgress().getDialectedFilterList()); + return getFilter(consumerVerticleContext.getEgress().getDialectedFilterList(), vertx); } else if (consumerVerticleContext.getEgress().hasFilter()) { return new ExactFilter( consumerVerticleContext.getEgress().getFilter().getAttributesMap()); @@ -190,23 +193,34 @@ private Filter getFilter() { return Filter.noop(); } - private static Filter getFilter(List filters) { + private static Filter getFilter(List filters, Vertx vertx) { return new AllFilter( - filters.stream().map(ConsumerVerticleBuilder::getFilter).collect(Collectors.toList())); + filters.stream() + .map((DataPlaneContract.DialectedFilter filter) -> + ConsumerVerticleBuilder.getFilter(filter, vertx)) + .collect(ImmutableList.toImmutableList()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); } - private static Filter getFilter(DataPlaneContract.DialectedFilter filter) { + private static Filter getFilter(DataPlaneContract.DialectedFilter filter, Vertx vertx) { return switch (filter.getFilterCase()) { case EXACT -> new ExactFilter(filter.getExact().getAttributesMap()); case PREFIX -> new PrefixFilter(filter.getPrefix().getAttributesMap()); case SUFFIX -> new SuffixFilter(filter.getSuffix().getAttributesMap()); - case NOT -> new NotFilter(getFilter(filter.getNot().getFilter())); - case ANY -> new AnyFilter(filter.getAny().getFiltersList().stream() - .map(ConsumerVerticleBuilder::getFilter) - .collect(Collectors.toList())); - case ALL -> new AllFilter(filter.getAll().getFiltersList().stream() - .map(ConsumerVerticleBuilder::getFilter) - .collect(Collectors.toList())); + case NOT -> new NotFilter(getFilter(filter.getNot().getFilter(), vertx)); + case ANY -> new AnyFilter( + filter.getAny().getFiltersList().stream() + .map((DataPlaneContract.DialectedFilter f) -> ConsumerVerticleBuilder.getFilter(f, vertx)) + .collect(ImmutableList.toImmutableList()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); + case ALL -> new AllFilter( + filter.getAll().getFiltersList().stream() + .map((DataPlaneContract.DialectedFilter f) -> ConsumerVerticleBuilder.getFilter(f, vertx)) + .collect(ImmutableList.toImmutableList()), + vertx, + FILTER_REORDER_TIME_MILLISECONDS); case CESQL -> new CeSqlFilter(filter.getCesql().getExpression()); default -> Filter.noop(); }; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index 98ce91556d..4a9f768cb8 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -46,6 +46,7 @@ import io.micrometer.prometheus.PrometheusMeterRegistry; import io.vertx.core.Future; import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpVersion; import io.vertx.ext.web.client.impl.HttpResponseImpl; @@ -64,6 +65,7 @@ @ExtendWith(VertxExtension.class) public class RecordDispatcherTest { + Vertx vertx = Vertx.vertx(); private static final ConsumerVerticleContext resourceContext = FakeConsumerVerticleContext.get( FakeConsumerVerticleContext.get().getResource(), @@ -91,6 +93,7 @@ public void shouldNotSendToSubscriberNorToDeadLetterSinkIfValueDoesntMatch() { final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> false, CloudEventSender.noop("subscriber send called"), @@ -122,6 +125,7 @@ public void shouldSendOnlyToSubscriberIfValueMatches() { final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -161,6 +165,7 @@ public void shouldSendToDeadLetterSinkIfValueMatchesAndSubscriberSenderFails() { final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -201,6 +206,7 @@ public void shouldCallFailedToSendToDeadLetterSinkIfValueMatchesAndSubscriberAnd final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -244,6 +250,7 @@ public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDls() String errorBody = "{ \"message\": \"bad bad things happened\" }"; final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -307,6 +314,7 @@ public void failedEventsShouldBeEnhancedWithCustomHttpHeaders() { MultiMap.caseInsensitiveMultiMap().add(validErrorKey, "hello").add(invalidErrorKey, "nope"); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -369,6 +377,7 @@ public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDlsBo String errorBodyTooLarge = errorBody + "QWERTY"; final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -440,6 +449,7 @@ private HttpResponseImpl makeHttpResponseWithHeaders(int statusCode, Str final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, value -> true, new CloudEventSenderMock(record -> { @@ -481,6 +491,7 @@ public void shouldCloseSinkResponseHandlerSubscriberSenderAndDeadLetterSinkSende when(deadLetterSender.close()).thenReturn(Future.succeededFuture()); final RecordDispatcher recordDispatcher = new RecordDispatcherImpl( + vertx, resourceContext, Filter.noop(), subscriberSender, @@ -507,6 +518,7 @@ public void shouldDiscardRecordIfInvalidCloudEvent() { final RecordDispatcherListener receiver = offsetManagerMock(); final var dispatcherHandler = new RecordDispatcherImpl( + vertx, resourceContext, Filter.noop(), CloudEventSender.noop("subscriber send called"), diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java index cd6c69989d..44dc6160e5 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java @@ -69,6 +69,7 @@ public abstract class AbstractConsumerVerticleTest { public void subscribedToTopic(final Vertx vertx, final VertxTestContext context) { final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); final var recordDispatcher = new RecordDispatcherImpl( + vertx, resourceContext, value -> false, CloudEventSender.noop("subscriber send called"), @@ -116,6 +117,7 @@ public void onPartitionsAssigned(final Collection partitions) {} public void stop(final Vertx vertx, final VertxTestContext context) { final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); final var recordDispatcher = new RecordDispatcherImpl( + vertx, resourceContext, value -> false, CloudEventSender.noop("subscriber send called"), @@ -199,6 +201,7 @@ public void shouldCloseEverything(final Vertx vertx, final VertxTestContext cont final var sinkClosed = new AtomicBoolean(false); final var recordDispatcher = new RecordDispatcherImpl( + vertx, resourceContext, ce -> true, new CloudEventSenderMock(record -> Future.succeededFuture(), () -> { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java index c77d6209a1..c471dd2884 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java @@ -17,14 +17,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; +import io.vertx.core.Vertx; import java.net.URI; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -33,6 +33,8 @@ public class AllFilterTest { + static final Vertx vertx = Vertx.vertx(); + static final CloudEvent event = CloudEventBuilder.v1() .withId("123-42") .withDataContentType("application/cloudevents+json") @@ -47,29 +49,42 @@ public class AllFilterTest { @MethodSource(value = {"testCases"}) public void match(CloudEvent event, Filter filter, boolean shouldMatch) { assertThat(filter.test(event)).isEqualTo(shouldMatch); + filter.close(vertx); } static Stream testCases() { return Stream.of( - Arguments.of(event, new AllFilter(List.of(new ExactFilter(Map.of("id", "123-42")))), true), Arguments.of( event, - new AllFilter(List.of( - new ExactFilter(Map.of("id", "123-42")), - new ExactFilter(Map.of("source", "/api/some-source")))), + new AllFilter(ImmutableList.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), + true), + Arguments.of( + event, + new AllFilter( + ImmutableList.of( + new ExactFilter(Map.of("id", "123-42")), + new ExactFilter(Map.of("source", "/api/some-source"))), + vertx, + 500), true), Arguments.of( event, - new AllFilter(List.of( - new ExactFilter(Map.of("id", "123")), - new ExactFilter(Map.of("source", "/api/some-source")))), + new AllFilter( + ImmutableList.of( + new ExactFilter(Map.of("id", "123")), + new ExactFilter(Map.of("source", "/api/some-source"))), + vertx, + 500), false), Arguments.of( event, - new AllFilter(List.of( - new ExactFilter(Map.of("id", "123-42")), - new ExactFilter(Map.of("source", "/api/something-else")))), + new AllFilter( + ImmutableList.of( + new ExactFilter(Map.of("id", "123-42")), + new ExactFilter(Map.of("source", "/api/something-else"))), + vertx, + 500), false), - Arguments.of(event, new AllFilter(Collections.emptyList()), true)); + Arguments.of(event, new AllFilter(ImmutableList.of(), Vertx.vertx(), 500), true)); } } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java index c1564c4cb4..518af42caf 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java @@ -17,14 +17,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.google.common.collect.ImmutableList; import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; +import io.vertx.core.Vertx; import java.net.URI; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -33,6 +33,8 @@ public class AnyFilterTest { + static final Vertx vertx = Vertx.vertx(); + static final CloudEvent event = CloudEventBuilder.v1() .withId("123-42") .withDataContentType("application/cloudevents+json") @@ -47,35 +49,51 @@ public class AnyFilterTest { @MethodSource(value = {"testCases"}) public void match(CloudEvent event, Filter filter, boolean shouldMatch) { assertThat(filter.test(event)).isEqualTo(shouldMatch); + filter.close(vertx); } static Stream testCases() { return Stream.of( - Arguments.of(event, new AnyFilter(List.of(new ExactFilter(Map.of("id", "123-42")))), true), Arguments.of( event, - new AnyFilter(List.of( - new ExactFilter(Map.of("id", "123-42")), - new ExactFilter(Map.of("source", "/api/some-source")))), + new AnyFilter(ImmutableList.of(new ExactFilter(Map.of("id", "123-42"))), vertx, 500), + true), + Arguments.of( + event, + new AnyFilter( + ImmutableList.of( + new ExactFilter(Map.of("id", "123-42")), + new ExactFilter(Map.of("source", "/api/some-source"))), + vertx, + 500), true), Arguments.of( event, - new AnyFilter(List.of( - new ExactFilter(Map.of("id", "123")), - new ExactFilter(Map.of("source", "/api/some-source")))), + new AnyFilter( + ImmutableList.of( + new ExactFilter(Map.of("id", "123")), + new ExactFilter(Map.of("source", "/api/some-source"))), + vertx, + 500), true), Arguments.of( event, - new AnyFilter(List.of( - new ExactFilter(Map.of("id", "123-42")), - new ExactFilter(Map.of("source", "/api/something-else")))), + new AnyFilter( + ImmutableList.of( + new ExactFilter(Map.of("id", "123-42")), + new ExactFilter(Map.of("source", "/api/something-else"))), + vertx, + 500), true), Arguments.of( event, - new AnyFilter(List.of( - new ExactFilter(Map.of("id", "123")), - new ExactFilter(Map.of("source", "/api/something-else")))), + new AnyFilter( + ImmutableList.of( + new ExactFilter(Map.of("id", "123")), + new ExactFilter(Map.of("source", "/api/something-else"))), + vertx, + 500), false), - Arguments.of(event, new AnyFilter(Collections.emptyList()), false)); + Arguments.of(event, new AnyFilter(ImmutableList.of(), vertx, 500), false)); } }