diff --git a/CHANGELOG.md b/CHANGELOG.md index e63b660..c01adba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog # + +## Version 9.1.538 + +- Add `px/current-thread` helper function +- Add `px/thread-interrupted?` helper function +- Add `px/interrupt-thread!` helper function +- Add `px/join!` helper function +- Add `px/thread-id` helper function + + ## Version 9.1.536 - Assign default parallelism to scheduled executor (based on CPUs). diff --git a/README.md b/README.md index 08e173a..f77778d 100644 --- a/README.md +++ b/README.md @@ -16,13 +16,13 @@ Here you can look a detailed [documentation][1]. deps.edn: ```clojure -funcool/promesa {:mvn/version "9.1.536"} +funcool/promesa {:mvn/version "9.1.538"} ``` Leiningen: ```clojure -[funcool/promesa "9.1.536"] +[funcool/promesa "9.1.538"] ``` ## On the REPL diff --git a/deps.edn b/deps.edn index c44db91..47f7c34 100644 --- a/deps.edn +++ b/deps.edn @@ -5,6 +5,7 @@ {com.bhauman/rebel-readline-cljs {:mvn/version "RELEASE"}, com.bhauman/rebel-readline {:mvn/version "RELEASE"}, org.clojure/tools.namespace {:mvn/version "RELEASE"}, + org.clojure/core.async {:mvn/version "1.6.673"} criterium/criterium {:mvn/version "RELEASE"} thheller/shadow-cljs {:mvn/version "RELEASE"}} :extra-paths ["test" "dev" "src"]}, diff --git a/dev/user.clj b/dev/user.clj index 1776879..5b3f96b 100644 --- a/dev/user.clj +++ b/dev/user.clj @@ -4,6 +4,7 @@ [clojure.test :as test] [clojure.tools.namespace.repl :as r] [clojure.walk :refer [macroexpand-all]] + [clojure.core.async :as a] [criterium.core :refer [quick-bench bench with-progress-reporting]] [promesa.core :as p] [promesa.exec :as px] @@ -14,7 +15,8 @@ (:import java.util.concurrent.CompletableFuture java.util.concurrent.CompletionStage - java.util.function.Function)) + java.util.function.Function + java.util.concurrent.atomic.AtomicLong)) (defmacro run-quick-bench [& exprs] diff --git a/scripts/repl b/scripts/repl index 8e41dd2..5361ec2 100755 --- a/scripts/repl +++ b/scripts/repl @@ -2,4 +2,4 @@ set -ex # clojure -M:dev -e "(set! *warn-on-reflection* true)" -m rebel-readline.main # clojure -M:dev -m rebel-readline.main -clojure -J--enable-preview -M:dev -m rebel-readline.main +clojure -J-Djdk.tracePinnedThreads=short -J-Djdk.virtualThreadScheduler.parallelism=16 -J--enable-preview -M:dev -m rebel-readline.main diff --git a/src/promesa/exec.cljc b/src/promesa/exec.cljc index 461037b..179519b 100644 --- a/src/promesa/exec.cljc +++ b/src/promesa/exec.cljc @@ -14,6 +14,7 @@ (:import clojure.lang.Var java.lang.AutoCloseable + java.time.Duration java.util.concurrent.Callable java.util.concurrent.CompletableFuture java.util.concurrent.Executor @@ -32,7 +33,6 @@ #?(:clj (set! *warn-on-reflection* true)) - ;; --- Globals & Defaults (with CLJS Impl) (declare #?(:clj scheduled-executor :cljs ->ScheduledExecutor)) @@ -223,7 +223,16 @@ (^Thread newThread [_ ^Runnable runnable] (func runnable))))) -#?(:clj (def ^{:no-doc true} counter (AtomicLong. 0))) +#?(:clj (def ^{:no-doc true :dynamic true} + *default-counter* + (AtomicLong. 0))) + +#?(:clj + (defn get-next + "Get next value from atomic long counter" + {:no-doc true} + ([] (.getAndIncrement ^AtomicLong *default-counter*)) + ([counter] (.getAndIncrement ^AtomicLong counter)))) #?(:clj (defn default-thread-factory @@ -232,37 +241,27 @@ :or {daemon true priority Thread/NORM_PRIORITY name "promesa/thread/%s"}}] - (reify ThreadFactory - (newThread [this runnable] - (doto (Thread. ^Runnable runnable) - (.setPriority priority) - (.setDaemon ^Boolean daemon) - (.setName (format name (.getAndIncrement ^AtomicLong counter)))))))) + (let [counter (AtomicLong. 0)] + (reify ThreadFactory + (newThread [this runnable] + (doto (Thread. ^Runnable runnable) + (.setPriority priority) + (.setDaemon ^Boolean daemon) + (.setName (format name (get-next counter))))))))) #?(:clj (defn default-forkjoin-thread-factory ^ForkJoinPool$ForkJoinWorkerThreadFactory [& {:keys [name daemon] :or {name "promesa/forkjoin/%s" daemon true}}] - (let [^AtomicLong counter (AtomicLong. 0)] + (let [counter (AtomicLong. 0)] (reify ForkJoinPool$ForkJoinWorkerThreadFactory (newThread [_ pool] (let [thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool) - tname (format name (.getAndIncrement counter))] + tname (format name (get-next counter))] (.setName ^ForkJoinWorkerThread thread ^String tname) (.setDaemon ^ForkJoinWorkerThread thread ^Boolean daemon) thread)))))) -#?(:clj - (defn- opts->thread-factory - [{:keys [daemon priority] - :or {daemon true priority Thread/NORM_PRIORITY}}] - (fn->thread-factory - (fn [runnable] - (let [thread (Thread. ^Runnable runnable)] - (.setDaemon thread daemon) - (.setPriority thread priority) - thread))))) - #?(:clj (defn- resolve-thread-factory {:no-doc true} @@ -591,4 +590,65 @@ (cons (map first ss) (step-fn (map rest ss)))))))] (pmap #(apply f %) (step-fn (cons coll colls))))))) +#?(:clj + (defmacro thread + "A low-level, not-pooled thread constructor." + [opts & body] + (let [[opts body] (if (map? opts) + [opts body] + [nil (cons opts body)])] + `(let [opts# ~opts + thr# (Thread. (^:once fn* [] ~@body))] + (.setName thr# (or (:name ~opts) (format "promesa/unnamed-thread/%s" (get-next)))) + (.setDaemon thr# (:daemon? ~opts false)) + (.setPriority thr# (:priority ~opts Thread/NORM_PRIORITY)) + (.start thr#) + thr#)))) + +#?(:clj +(defn current-thread + "Return the current thread." + [] + (Thread/currentThread))) +#?(:clj +(defn thread-interrupted? + "Check if the thread has the interrupted flag set. + + There are two special cases: + + Using the `:current` keyword as argument will check the interrupted + flag on the current thread. + + Using the arity 0 (passing no arguments), then the current thread + will be checked and **WARNING** the interrupted flag reset to + `false`." + ([] + (Thread/interrupted)) + ([thread] + (if (= :current thread) + (.isInterrupted (Thread/currentThread)) + (.isInterrupted ^Thread thread))))) + +#?(:clj +(defn thread-id + "Retrieves the thread ID." + ([] + (.getId ^Thread (Thread/currentThread))) + ([^Thread thread] + (.getId thread)))) + +#?(:clj +(defn interrupt-thread! + [^Thread thread] + (.interrupt thread))) + +#?(:clj +(defn join! + "Waits for the specified thread to terminate." + ([^Thread thread] + (.join thread)) + ([^Thread thread duration] + (if (instance? Duration duration) + (.join thread ^Duration duration) + (.join thread (long duration))))))