diff --git a/src/promesa/exec.cljc b/src/promesa/exec.cljc index b359840..5ef3329 100644 --- a/src/promesa/exec.cljc +++ b/src/promesa/exec.cljc @@ -134,22 +134,22 @@ :cljs (satisfies? pt/IExecutor o))) #?(:clj -(defn shutdown! - "Shutdowns the executor service." - [^ExecutorService executor] - (.shutdown executor))) + (defn shutdown! + "Shutdowns the executor service." + [^ExecutorService executor] + (.shutdown executor))) #?(:clj -(defn shutdown-now! - "Shutdowns and interrupts the executor service." - [^ExecutorService executor] - (.shutdownNow executor))) + (defn shutdown-now! + "Shutdowns and interrupts the executor service." + [^ExecutorService executor] + (.shutdownNow executor))) #?(:clj -(defn shutdown? - "Check if execitor is in shutdown state." - [^ExecutorService executor] - (.isShutdown executor))) + (defn shutdown? + "Check if execitor is in shutdown state." + [^ExecutorService executor] + (.isShutdown executor))) (defn resolve-executor {:no-doc true} @@ -563,7 +563,7 @@ (-run! [this f] (-> (pt/-promise nil) (pt/-fmap (fn [_] - (try (f) (catch :default _ nil)))) + (try (f) (catch :default _ nil)))) (pt/-fmap noop))) (-submit! [this f] @@ -702,34 +702,33 @@ (list (if interrupt? 'promesa.exec/shutdown-now! 'promesa.exec/shutdown!) executor-sym)))))))) #?(:clj -(defn pmap - "Analogous to the `clojure.core/pmap` with the excetion that it allows + (defn pmap + "Analogous to the `clojure.core/pmap` with the excetion that it allows use a custom executor (binded to *default-executor* var) The default clojure chunk size (32) is used for evaluation and the real parallelism is determined by the provided executor. - **EXPERIMENTAL API:** This function should be considered EXPERIMENTAL and may be changed or removed in future versions until this notification is removed." - {:experimental true} - ([f coll] - (let [executor (resolve-executor *default-executor*) - frame (Var/cloneThreadBindingFrame)] - (->> coll - (map (fn [o] (pt/-submit! executor #(do - (Var/resetThreadBindingFrame frame) - (f o))))) - (clojure.lang.RT/iter) - (clojure.lang.RT/chunkIteratorSeq) - (map (fn [o] (.get ^CompletableFuture o)))))) - ([f coll & colls] - (let [step-fn (fn step-fn [cs] - (lazy-seq - (let [ss (map seq cs)] - (when (every? identity ss) - (cons (map first ss) (step-fn (map rest ss)))))))] - (pmap #(apply f %) (step-fn (cons coll colls))))))) + {:experimental true} + ([f coll] + (let [executor (resolve-executor *default-executor*) + frame (Var/cloneThreadBindingFrame)] + (->> coll + (map (fn [o] (pt/-submit! executor #(do + (Var/resetThreadBindingFrame frame) + (f o))))) + (clojure.lang.RT/iter) + (clojure.lang.RT/chunkIteratorSeq) + (map (fn [o] (.get ^CompletableFuture o)))))) + ([f coll & colls] + (let [step-fn (fn step-fn [cs] + (lazy-seq + (let [ss (map seq cs)] + (when (every? identity ss) + (cons (map first ss) (step-fn (map rest ss)))))))] + (pmap #(apply f %) (step-fn (cons coll colls))))))) #?(:clj (defn fn->thread @@ -743,53 +742,53 @@ thread))) #?(:clj -(defmacro thread - "A low-level, not-pooled thread constructor, it accepts an optional + (defmacro thread + "A low-level, not-pooled thread constructor, it accepts an optional map as first argument and the body. The options map is interepreted as options if a literal map is provided. The available options are: `:name`, `:priority`, `:daemon` and `:virtual`. The `:virtual` option is ignored if you are using a JVM that has no support for Virtual Threads." - [opts & body] - (let [[opts body] (if (map? opts) [opts body] [{} (cons opts body)])] - `(fn->thread (^:once fn* [] ~@body) ~@(mapcat identity opts))))) + [opts & body] + (let [[opts body] (if (map? opts) [opts body] [{} (cons opts body)])] + `(fn->thread (^:once fn* [] ~@body) ~@(mapcat identity opts))))) #?(:clj -(defn thread-call - "Advanced version of `p/thread-call` that creates and starts a thread + (defn thread-call + "Advanced version of `p/thread-call` that creates and starts a thread configured with `opts`. No executor service is used, this will start a plain unpooled thread; returns a non-cancellable promise instance" - [f & {:as opts}] - (let [p (CompletableFuture.)] - (fn->thread #(try - (pt/-resolve! p (f)) - (catch Throwable cause - (pt/-reject! p cause))) - (assoc opts :start true)) - p))) - -#?(:clj -(defn current-thread - "Return the current thread." - [] - (Thread/currentThread))) + [f & {:as opts}] + (let [p (CompletableFuture.)] + (fn->thread #(try + (pt/-resolve! p (f)) + (catch Throwable cause + (pt/-reject! p cause))) + (assoc opts :start true)) + p))) + +#?(:clj + (defn current-thread + "Return the current thread." + [] + (Thread/currentThread))) #?(:clj -(defn set-name! - "Rename thread." - ([name] (set-name! (current-thread) name)) - ([thread name] (.setName ^Thread thread ^String name)))) + (defn set-name! + "Rename thread." + ([name] (set-name! (current-thread) name)) + ([thread name] (.setName ^Thread thread ^String name)))) #?(:clj -(defn get-name - "Retrieve thread name" - ([] (get-name (current-thread))) - ([thread] - (.getName ^Thread thread)))) + (defn get-name + "Retrieve thread name" + ([] (get-name (current-thread))) + ([thread] + (.getName ^Thread thread)))) #?(:clj -(defn interrupted? - "Check if the thread has the interrupted flag set. + (defn interrupted? + "Check if the thread has the interrupted flag set. There are two special cases: @@ -799,62 +798,62 @@ Using the arity 0 (passing no arguments), then the current thread will be checked and **WARNING** the interrupted flag reset to `false`." - ([] - (Thread/interrupted)) - ([thread] - (if (= :current thread) - (.isInterrupted (Thread/currentThread)) - (.isInterrupted ^Thread thread))))) + ([] + (Thread/interrupted)) + ([thread] + (if (= :current thread) + (.isInterrupted (Thread/currentThread)) + (.isInterrupted ^Thread thread))))) #?(:clj -(defn get-thread-id - "Retrieves the thread ID." - ([] - (.getId ^Thread (Thread/currentThread))) - ([^Thread thread] - (.getId thread)))) + (defn get-thread-id + "Retrieves the thread ID." + ([] + (.getId ^Thread (Thread/currentThread))) + ([^Thread thread] + (.getId thread)))) #?(:clj -(defn thread-id - "Retrieves the thread ID." - {:deprecated "11.0"} - ([] - (.getId ^Thread (Thread/currentThread))) - ([^Thread thread] - (.getId thread)))) + (defn thread-id + "Retrieves the thread ID." + {:deprecated "11.0"} + ([] + (.getId ^Thread (Thread/currentThread))) + ([^Thread thread] + (.getId thread)))) #?(:clj -(defn interrupt! - "Interrupt a thread." - ([] - (.interrupt (Thread/currentThread))) - ([^Thread thread] - (.interrupt thread)))) + (defn interrupt! + "Interrupt a thread." + ([] + (.interrupt (Thread/currentThread))) + ([^Thread thread] + (.interrupt thread)))) #?(:clj -(defn thread? - "Check if provided object is a thread instance." - [t] - (instance? Thread t))) + (defn thread? + "Check if provided object is a thread instance." + [t] + (instance? Thread t))) #?(:clj -(defn sleep - "Turn the current thread to sleep accept a number of milliseconds or + (defn sleep + "Turn the current thread to sleep accept a number of milliseconds or Duration instance." - [ms] - (if (instance? Duration ms) - (Thread/sleep (int (.toMillis ^Duration ms))) - (Thread/sleep (int ms))))) - -#?(:clj -(defn throw-uncaught! - "Throw an exception to the current uncaught exception handler." - [cause] - (let [thr (current-thread) - hdl (.getUncaughtExceptionHandler ^Thread thr)] - (.uncaughtException ^Thread$UncaughtExceptionHandler hdl - ^Thread thr - ^Throwable cause)))) + [ms] + (if (instance? Duration ms) + (Thread/sleep (int (.toMillis ^Duration ms))) + (Thread/sleep (int ms))))) + +#?(:clj + (defn throw-uncaught! + "Throw an exception to the current uncaught exception handler." + [cause] + (let [thr (current-thread) + hdl (.getUncaughtExceptionHandler ^Thread thr)] + (.uncaughtException ^Thread$UncaughtExceptionHandler hdl + ^Thread thr + ^Throwable cause)))) #?(:clj (defn structured-task-scope