diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b2a81a..5ede005 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog # +## Version 9.0.489 + +Date: 2022-10-18 + +- Minor fixes on concurrency limiter hook fns. + ## Version 9.0.488 diff --git a/doc/user-guide.md b/doc/user-guide.md index b5ca9db..aa0cd5d 100644 --- a/doc/user-guide.md +++ b/doc/user-guide.md @@ -9,13 +9,13 @@ A promise library for Clojure and ClojureScript. Leiningen: ```clojure -[funcool/promesa "9.0.488"] +[funcool/promesa "9.0.489"] ``` deps.edn: ```clojure -funcool/promesa {:mvn/version "9.0.488"} +funcool/promesa {:mvn/version "9.0.489"} ``` On the JVM platform _promesa_ is built on top of *completable futures* diff --git a/src/promesa/exec.cljc b/src/promesa/exec.cljc index 6732c55..cb037f5 100644 --- a/src/promesa/exec.cljc +++ b/src/promesa/exec.cljc @@ -475,15 +475,12 @@ #?(:clj (defn concurrency-limiter "Create an instance of concurrencly limiter. EXPERIMENTAL" - [& {:keys [executor concurrency queue-size on-run on-poll on-queue] - :or {executor *default-executor* - concurrency 1 - queue-size Integer/MAX_VALUE}}] - (let [^ExecutorService executor (resolve-executor executor)] + [& {:keys [executor concurrency queue-size on-run on-queue] + :or {concurrency 1 queue-size Integer/MAX_VALUE}}] + (let [^ExecutorService executor (resolve-executor (or executor *default-executor*))] (doto (ConcurrencyLimiter. executor (int concurrency) (int queue-size)) (cond-> (fn? on-queue) (.setOnQueueCallback on-queue)) - (cond-> (fn? on-run) (.setOnRunCallback on-run)) - (cond-> (fn? on-poll) (.setOnPollCallback on-poll)))))) + (cond-> (fn? on-run) (.setOnRunCallback on-run)))))) #?(:clj (extend-type ConcurrencyLimiter diff --git a/src/promesa/exec/ConcurrencyLimiter.java b/src/promesa/exec/ConcurrencyLimiter.java index 0f0706d..a8eb37f 100644 --- a/src/promesa/exec/ConcurrencyLimiter.java +++ b/src/promesa/exec/ConcurrencyLimiter.java @@ -34,13 +34,6 @@ import java.time.Instant; public class ConcurrencyLimiter extends AFn implements IObj { - final private Keyword KW_QUEUE = Keyword.intern("queue"); - final private Keyword KW_STATISTICS = Keyword.intern("statistics"); - final private Keyword KW_CURRENT_QUEUE_SIZE = Keyword.intern("current-queue-size"); - final private Keyword KW_CURRENT_CONCURRENCY = Keyword.intern("current-concurrency"); - final private Keyword KW_REMAINING_QUEUE_SIZE = Keyword.intern("remaining-queue-size"); - final private Keyword KW_REMAINING_CONCURRENCY = Keyword.intern("remaining-concurrency"); - private final BlockingQueue queue; private final ExecutorService executor; private final Semaphore limit; @@ -50,7 +43,6 @@ public class ConcurrencyLimiter extends AFn implements IObj { protected IFn onQueueCallback; protected IFn onRunCallback; - protected IFn onPollCallback; public ConcurrencyLimiter(final ExecutorService executor, final int maxConcurrency, @@ -70,10 +62,6 @@ public void setOnRunCallback(IFn f) { this.onRunCallback = f; } - public void setOnPollCallback(IFn f) { - this.onPollCallback = f; - } - public IObj withMeta(IPersistentMap meta) { this.metadata = meta; return this; @@ -95,7 +83,7 @@ public CompletableFuture invoke(Object arg1) { } if (this.onQueueCallback != null) { - this.onQueueCallback.invoke(); + this.onQueueCallback.invoke(this); } this.executor.submit((Runnable)this); @@ -155,15 +143,6 @@ public void run() { this.executor.submit(task); } } - - if (this.onRunCallback != null) { - var stats = PersistentArrayMap.EMPTY - .assoc(KW_CURRENT_CONCURRENCY, this.getCurrentConcurrency()) - .assoc(KW_CURRENT_QUEUE_SIZE, this.getCurrentQueueSize()) - .assoc(KW_REMAINING_CONCURRENCY, this.getRemainingConcurrency()) - .assoc(KW_REMAINING_QUEUE_SIZE, this.getRemainingQueueSize()); - this.onRunCallback.invoke(stats); - } } private static class Task implements Runnable { @@ -187,8 +166,8 @@ public boolean isCancelled() { @SuppressWarnings("unchecked") public void run() { - if (this.limiter.onPollCallback != null) { - this.limiter.onPollCallback.invoke(this.createdAt); + if (this.limiter.onRunCallback != null) { + this.limiter.onRunCallback.invoke(this.limiter, this.createdAt); } final CompletionStage future; @@ -217,7 +196,7 @@ public void run() { } } - public static class CapacityLimitReachedException extends RuntimeException{ + public static class CapacityLimitReachedException extends RuntimeException { public CapacityLimitReachedException(String msg) { super(msg); }