Skip to content

Commit

Permalink
Reformat promesa.exec ns
Browse files Browse the repository at this point in the history
  • Loading branch information
niwinz committed Dec 5, 2023
1 parent 394c494 commit 2314a97
Showing 1 changed file with 110 additions and 111 deletions.
221 changes: 110 additions & 111 deletions src/promesa/exec.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,22 @@
:cljs (satisfies? pt/IExecutor o)))

#?(:clj
(defn shutdown!
"Shutdowns the executor service."
[^ExecutorService executor]
(.shutdown executor)))
(defn shutdown!
"Shutdowns the executor service."
[^ExecutorService executor]
(.shutdown executor)))

#?(:clj
(defn shutdown-now!
"Shutdowns and interrupts the executor service."
[^ExecutorService executor]
(.shutdownNow executor)))
(defn shutdown-now!
"Shutdowns and interrupts the executor service."
[^ExecutorService executor]
(.shutdownNow executor)))

#?(:clj
(defn shutdown?
"Check if execitor is in shutdown state."
[^ExecutorService executor]
(.isShutdown executor)))
(defn shutdown?
"Check if execitor is in shutdown state."
[^ExecutorService executor]
(.isShutdown executor)))

(defn resolve-executor
{:no-doc true}
Expand Down Expand Up @@ -563,7 +563,7 @@
(-run! [this f]
(-> (pt/-promise nil)
(pt/-fmap (fn [_]
(try (f) (catch :default _ nil))))
(try (f) (catch :default _ nil))))
(pt/-fmap noop)))

(-submit! [this f]
Expand Down Expand Up @@ -702,34 +702,33 @@
(list (if interrupt? 'promesa.exec/shutdown-now! 'promesa.exec/shutdown!) executor-sym))))))))

#?(:clj
(defn pmap
"Analogous to the `clojure.core/pmap` with the excetion that it allows
(defn pmap
"Analogous to the `clojure.core/pmap` with the excetion that it allows
use a custom executor (binded to *default-executor* var) The default
clojure chunk size (32) is used for evaluation and the real
parallelism is determined by the provided executor.
**EXPERIMENTAL API:** This function should be considered
EXPERIMENTAL and may be changed or removed in future versions until
this notification is removed."
{:experimental true}
([f coll]
(let [executor (resolve-executor *default-executor*)
frame (Var/cloneThreadBindingFrame)]
(->> coll
(map (fn [o] (pt/-submit! executor #(do
(Var/resetThreadBindingFrame frame)
(f o)))))
(clojure.lang.RT/iter)
(clojure.lang.RT/chunkIteratorSeq)
(map (fn [o] (.get ^CompletableFuture o))))))
([f coll & colls]
(let [step-fn (fn step-fn [cs]
(lazy-seq
(let [ss (map seq cs)]
(when (every? identity ss)
(cons (map first ss) (step-fn (map rest ss)))))))]
(pmap #(apply f %) (step-fn (cons coll colls)))))))
{:experimental true}
([f coll]
(let [executor (resolve-executor *default-executor*)
frame (Var/cloneThreadBindingFrame)]
(->> coll
(map (fn [o] (pt/-submit! executor #(do
(Var/resetThreadBindingFrame frame)
(f o)))))
(clojure.lang.RT/iter)
(clojure.lang.RT/chunkIteratorSeq)
(map (fn [o] (.get ^CompletableFuture o))))))
([f coll & colls]
(let [step-fn (fn step-fn [cs]
(lazy-seq
(let [ss (map seq cs)]
(when (every? identity ss)
(cons (map first ss) (step-fn (map rest ss)))))))]
(pmap #(apply f %) (step-fn (cons coll colls)))))))

#?(:clj
(defn fn->thread
Expand All @@ -743,53 +742,53 @@
thread)))

#?(:clj
(defmacro thread
"A low-level, not-pooled thread constructor, it accepts an optional
(defmacro thread
"A low-level, not-pooled thread constructor, it accepts an optional
map as first argument and the body. The options map is interepreted
as options if a literal map is provided. The available options are:
`:name`, `:priority`, `:daemon` and `:virtual`. The `:virtual`
option is ignored if you are using a JVM that has no support for
Virtual Threads."
[opts & body]
(let [[opts body] (if (map? opts) [opts body] [{} (cons opts body)])]
`(fn->thread (^:once fn* [] ~@body) ~@(mapcat identity opts)))))
[opts & body]
(let [[opts body] (if (map? opts) [opts body] [{} (cons opts body)])]
`(fn->thread (^:once fn* [] ~@body) ~@(mapcat identity opts)))))

#?(:clj
(defn thread-call
"Advanced version of `p/thread-call` that creates and starts a thread
(defn thread-call
"Advanced version of `p/thread-call` that creates and starts a thread
configured with `opts`. No executor service is used, this will start
a plain unpooled thread; returns a non-cancellable promise instance"
[f & {:as opts}]
(let [p (CompletableFuture.)]
(fn->thread #(try
(pt/-resolve! p (f))
(catch Throwable cause
(pt/-reject! p cause)))
(assoc opts :start true))
p)))

#?(:clj
(defn current-thread
"Return the current thread."
[]
(Thread/currentThread)))
[f & {:as opts}]
(let [p (CompletableFuture.)]
(fn->thread #(try
(pt/-resolve! p (f))
(catch Throwable cause
(pt/-reject! p cause)))
(assoc opts :start true))
p)))

#?(:clj
(defn current-thread
"Return the current thread."
[]
(Thread/currentThread)))

#?(:clj
(defn set-name!
"Rename thread."
([name] (set-name! (current-thread) name))
([thread name] (.setName ^Thread thread ^String name))))
(defn set-name!
"Rename thread."
([name] (set-name! (current-thread) name))
([thread name] (.setName ^Thread thread ^String name))))

#?(:clj
(defn get-name
"Retrieve thread name"
([] (get-name (current-thread)))
([thread]
(.getName ^Thread thread))))
(defn get-name
"Retrieve thread name"
([] (get-name (current-thread)))
([thread]
(.getName ^Thread thread))))

#?(:clj
(defn interrupted?
"Check if the thread has the interrupted flag set.
(defn interrupted?
"Check if the thread has the interrupted flag set.
There are two special cases:
Expand All @@ -799,62 +798,62 @@
Using the arity 0 (passing no arguments), then the current thread
will be checked and **WARNING** the interrupted flag reset to
`false`."
([]
(Thread/interrupted))
([thread]
(if (= :current thread)
(.isInterrupted (Thread/currentThread))
(.isInterrupted ^Thread thread)))))
([]
(Thread/interrupted))
([thread]
(if (= :current thread)
(.isInterrupted (Thread/currentThread))
(.isInterrupted ^Thread thread)))))

#?(:clj
(defn get-thread-id
"Retrieves the thread ID."
([]
(.getId ^Thread (Thread/currentThread)))
([^Thread thread]
(.getId thread))))
(defn get-thread-id
"Retrieves the thread ID."
([]
(.getId ^Thread (Thread/currentThread)))
([^Thread thread]
(.getId thread))))

#?(:clj
(defn thread-id
"Retrieves the thread ID."
{:deprecated "11.0"}
([]
(.getId ^Thread (Thread/currentThread)))
([^Thread thread]
(.getId thread))))
(defn thread-id
"Retrieves the thread ID."
{:deprecated "11.0"}
([]
(.getId ^Thread (Thread/currentThread)))
([^Thread thread]
(.getId thread))))

#?(:clj
(defn interrupt!
"Interrupt a thread."
([]
(.interrupt (Thread/currentThread)))
([^Thread thread]
(.interrupt thread))))
(defn interrupt!
"Interrupt a thread."
([]
(.interrupt (Thread/currentThread)))
([^Thread thread]
(.interrupt thread))))

#?(:clj
(defn thread?
"Check if provided object is a thread instance."
[t]
(instance? Thread t)))
(defn thread?
"Check if provided object is a thread instance."
[t]
(instance? Thread t)))

#?(:clj
(defn sleep
"Turn the current thread to sleep accept a number of milliseconds or
(defn sleep
"Turn the current thread to sleep accept a number of milliseconds or
Duration instance."
[ms]
(if (instance? Duration ms)
(Thread/sleep (int (.toMillis ^Duration ms)))
(Thread/sleep (int ms)))))

#?(:clj
(defn throw-uncaught!
"Throw an exception to the current uncaught exception handler."
[cause]
(let [thr (current-thread)
hdl (.getUncaughtExceptionHandler ^Thread thr)]
(.uncaughtException ^Thread$UncaughtExceptionHandler hdl
^Thread thr
^Throwable cause))))
[ms]
(if (instance? Duration ms)
(Thread/sleep (int (.toMillis ^Duration ms)))
(Thread/sleep (int ms)))))

#?(:clj
(defn throw-uncaught!
"Throw an exception to the current uncaught exception handler."
[cause]
(let [thr (current-thread)
hdl (.getUncaughtExceptionHandler ^Thread thr)]
(.uncaughtException ^Thread$UncaughtExceptionHandler hdl
^Thread thr
^Throwable cause))))

#?(:clj
(defn structured-task-scope
Expand Down

0 comments on commit 2314a97

Please sign in to comment.