-
Notifications
You must be signed in to change notification settings - Fork 140
Added filter order optimizations #3389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
38088f7
6a4af89
3958150
a4792a8
536a774
573874c
0fbd558
35bc707
afaffb0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| /* | ||
| * Copyright © 2018 Knative Authors ([email protected]) | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; | ||
|
|
||
| import dev.knative.eventing.kafka.broker.dispatcher.Filter; | ||
|
|
||
| public class FilterCounter { | ||
| private final Filter filter; | ||
| private int count; | ||
|
|
||
| public FilterCounter(Filter filter) { | ||
| this.filter = filter; | ||
| this.count = 0; | ||
| } | ||
|
|
||
| public Filter getFilter() { | ||
| return filter; | ||
| } | ||
|
|
||
| public int getCount() { | ||
| return count; | ||
| } | ||
|
|
||
| public int incrementCount() { | ||
| return this.count++; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| /* | ||
| * Copyright © 2018 Knative Authors ([email protected]) | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.concurrent.ArrayBlockingQueue; | ||
| import java.util.concurrent.locks.ReadWriteLock; | ||
| import org.slf4j.Logger; | ||
|
|
||
| public class FilterListOptimizer extends Thread { | ||
|
||
| private final ReadWriteLock readWriteLock; | ||
|
|
||
| private final ArrayBlockingQueue<Integer> indexSwapQueue; | ||
|
|
||
| private final List<FilterCounter> filters; | ||
|
|
||
| private final Logger logger; | ||
|
|
||
| public FilterListOptimizer( | ||
| ReadWriteLock readWriteLock, | ||
| ArrayBlockingQueue<Integer> indexSwapQueue, | ||
| List<FilterCounter> filters, | ||
| Logger logger) { | ||
| this.filters = filters; | ||
| this.indexSwapQueue = indexSwapQueue; | ||
| this.readWriteLock = readWriteLock; | ||
| this.logger = logger; | ||
| } | ||
|
|
||
| @Override | ||
| public void run() { | ||
| while (true) { | ||
| if (Thread.interrupted()) { | ||
| return; | ||
| } | ||
| try { | ||
| this.readWriteLock.readLock().lock(); | ||
| final int swapIndex = | ||
| this.indexSwapQueue.take(); // this is the only line that throws InterruptedException | ||
| if (swapIndex != 0 | ||
| && this.filters.get(swapIndex).incrementCount() | ||
| > 2 * this.filters.get(swapIndex - 1).getCount()) { | ||
| new Thread(() -> { | ||
| this.readWriteLock.writeLock().lock(); | ||
| Collections.swap(this.filters, swapIndex - 1, swapIndex); | ||
| this.readWriteLock.writeLock().unlock(); | ||
| }) | ||
| .start(); | ||
| } | ||
| this.readWriteLock.readLock().unlock(); | ||
| } catch (InterruptedException e) { | ||
| logger.debug("Filter optimizer thread was interrupted", e); | ||
| this.readWriteLock.readLock().unlock(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, Vertx doesn't like locks and blocking operations, I'm pretty sure with this implementation we will block the event loop and that causes basically to block any event delivery for a particular trigger