Skip to content

Commit

Permalink
Move bulkhead internal invoke as general api to promesa.exec
Browse files Browse the repository at this point in the history
  • Loading branch information
niwinz committed Dec 5, 2023
1 parent 99a7253 commit 394c494
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 127 deletions.
223 changes: 109 additions & 114 deletions src/promesa/exec.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@

#?(:clj (set! *warn-on-reflection* true))

;; --- Globals & Defaults (with CLJS Impl)

(declare scheduled-executor)
(declare current-thread-executor)

Expand Down Expand Up @@ -231,15 +229,114 @@
(finally
(pop-thread-bindings))))))))

;; --- Public API

#?(:clj
(defn thread-factory?
"Checks if `o` is an instance of ThreadFactory"
[o]
(instance? ThreadFactory o)))

#?(:clj
(def ^{:no-doc true :dynamic true}
*default-counter*
(AtomicLong. 0)))

#?(:clj
(defn get-next
"Get next value from atomic long counter"
{:no-doc true}
([] (.getAndIncrement ^AtomicLong *default-counter*))
([counter] (.getAndIncrement ^AtomicLong counter))))

#?(:clj
(defn thread-factory
"Create a new thread factory instance"
[& {:keys [name daemon priority prefix virtual]}]
(pu/with-compile-cond virtual-threads-available?
(if virtual
(let [thb (Thread/ofVirtual)
thb (cond
(string? name) (.name thb ^String name)
(string? prefix) (.name thb ^String prefix 0)
:else thb)]
(.factory thb))

(let [thb (Thread/ofPlatform)
thb (if (some? priority)
(.priority thb (int priority))
thb)
thb (if (some? daemon)
(.daemon thb ^Boolean daemon)
(.daemon thb true))
thb (cond
(string? name) (.name thb ^String name)
(string? prefix) (.name thb ^String prefix 0)
:else thb)]
(.factory thb)))

(let [counter (AtomicLong. 0)]
(reify ThreadFactory
(newThread [this runnable]
(let [thr (Thread. ^Runnable runnable)]
(when (some? priority)
(.setPriority thr (int priority)))
(when (some? daemon)
(.setDaemon thr ^Boolean daemon))
(when (string? name)
(.setName thr ^String name))
(when (string? prefix)
(.setName thr (str prefix (get-next counter))))
thr)))))))

#?(:clj
(defonce default-thread-factory
(delay (thread-factory :prefix "promesa/platform/"))))

#?(:clj
(defonce default-vthread-factory
(delay (thread-factory :prefix "promesa/virtual/" :virtual true))))

#?(:clj
(defn resolve-thread-factory
{:no-doc true}
^ThreadFactory
[tf]
(cond
(nil? tf) nil
(thread-factory? tf) tf
(= :default tf) @default-thread-factory
(= :platform tf) @default-thread-factory
(= :virtual tf) @default-vthread-factory
(map? tf) (thread-factory tf)
(fn? tf) (reify ThreadFactory
(^Thread newThread [_ ^Runnable runnable]
(tf runnable)))
:else (throw (ex-info "Invalid thread factory" {})))))

#?(:clj
(defn- options->thread-factory
{:no-doc true}
(^ThreadFactory [options]
(resolve-thread-factory
(or (:thread-factory options)
(:factory options))))

(^ThreadFactory [options default]
(resolve-thread-factory
(or (:thread-factory options)
(:factory options)
default)))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defn exec!
"Run the task in the provided executor, returns `nil`. Analogous to
the `(.execute executor f)`. Fire and forget.
Exception unsafe, can raise exceptions if the executor
rejects the task."

([f]
(let [f (if (fn? f) (wrap-bindings f) f)]
(pt/-exec! (resolve-executor *default-executor*) f)))
Expand Down Expand Up @@ -287,6 +384,12 @@
([scheduler ms f]
(pt/-schedule! (resolve-scheduler scheduler) ms f)))

(defn invoke!
"Invoke a function to be executed in the provided executor
or the default one, and waits for the result. Useful for using
in virtual threads."
([f] (pt/-await! (submit! f)))
([executor f] (pt/-await! (submit! executor f))))

(defn- rejected
[v]
Expand All @@ -295,19 +398,6 @@
(.completeExceptionally ^CompletableFuture p v)
p)))

(defn exec
"Exception safe version of `exec!`. It always returns an promise instance."
([f]
(try
(exec! f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause))))
([executor f]
(try
(exec! executor f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause)))))

(defn run
"Exception safe version of `run!`. It always returns an promise instance."
([f]
Expand Down Expand Up @@ -349,63 +439,6 @@

;; --- Pool & Thread Factories

#?(:clj
(defn thread-factory?
"Checks if `o` is an instance of ThreadFactory"
[o]
(instance? ThreadFactory o)))

#?(:clj
(def ^{:no-doc true :dynamic true}
*default-counter*
(AtomicLong. 0)))

#?(:clj
(defn get-next
"Get next value from atomic long counter"
{:no-doc true}
([] (.getAndIncrement ^AtomicLong *default-counter*))
([counter] (.getAndIncrement ^AtomicLong counter))))

#?(:clj
(defn thread-factory
"Create a new thread factory instance"
[& {:keys [name daemon priority prefix virtual]}]
(pu/with-compile-cond virtual-threads-available?
(if virtual
(let [thb (Thread/ofVirtual)
thb (cond
(string? name) (.name thb ^String name)
(string? prefix) (.name thb ^String prefix 0)
:else thb)]
(.factory thb))

(let [thb (Thread/ofPlatform)
thb (if (some? priority)
(.priority thb (int priority))
thb)
thb (if (some? daemon)
(.daemon thb ^Boolean daemon)
(.daemon thb true))
thb (cond
(string? name) (.name thb ^String name)
(string? prefix) (.name thb ^String prefix 0)
:else thb)]
(.factory thb)))

(let [counter (AtomicLong. 0)]
(reify ThreadFactory
(newThread [this runnable]
(let [thr (Thread. ^Runnable runnable)]
(when (some? priority)
(.setPriority thr (int priority)))
(when (some? daemon)
(.setDaemon thr ^Boolean daemon))
(when (string? name)
(.setName thr ^String name))
(when (string? prefix)
(.setName thr (str prefix (get-next counter))))
thr)))))))

#?(:clj
(defn forkjoin-thread-factory
Expand All @@ -420,45 +453,6 @@
(.setDaemon ^ForkJoinWorkerThread thread ^Boolean daemon)
thread))))))

#?(:clj
(defonce default-thread-factory
(delay (thread-factory :prefix "promesa/platform/"))))

#?(:clj
(defonce default-vthread-factory
(delay (thread-factory :prefix "promesa/virtual/" :virtual true))))

#?(:clj
(defn resolve-thread-factory
{:no-doc true}
^ThreadFactory
[tf]
(cond
(nil? tf) nil
(thread-factory? tf) tf
(= :default tf) @default-thread-factory
(= :platform tf) @default-thread-factory
(= :virtual tf) @default-vthread-factory
(map? tf) (thread-factory tf)
(fn? tf) (reify ThreadFactory
(^Thread newThread [_ ^Runnable runnable]
(tf runnable)))
:else (throw (ex-info "Invalid thread factory" {})))))

#?(:clj
(defn- options->thread-factory
{:no-doc true}
(^ThreadFactory [options]
(resolve-thread-factory
(or (:thread-factory options)
(:factory options))))

(^ThreadFactory [options default]
(resolve-thread-factory
(or (:thread-factory options)
(:factory options)
default)))))

#?(:clj
(defn cached-executor
"A cached thread executor pool constructor."
Expand Down Expand Up @@ -532,7 +526,8 @@
#?(:clj
(reify
Executor
(^void execute [_ ^Runnable f] (.run f)))
(^void execute [_ ^Runnable f]
(.run f)))

:cljs
(reify
Expand Down
21 changes: 8 additions & 13 deletions src/promesa/exec/bulkhead.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@

(defprotocol IBulkhead
"Bulkhead main API"
(-get-stats [_] "Get internal statistics of the bulkhead instance")
(-invoke! [_ f] "Call synchronously a function under bulkhead context"))
(-get-stats [_] "Get internal statistics of the bulkhead instance"))

(extend-type BlockingQueue
IQueue
Expand Down Expand Up @@ -82,9 +81,6 @@
:max-permits max-permits
:max-queue max-queue}))

(-invoke! [this f]
(p/await! (pt/-submit! this (px/wrap-bindings f))))

Executor
(execute [this f]
(log! "cmd:" "Bulkhead/execute" "f:" (hash f))
Expand Down Expand Up @@ -166,10 +162,7 @@
:timeout timeout}]
(throw (ex-info "bulkhead: timeout" props))))
(finally
(.decrementAndGet counter)))))

(-invoke! [this f]
(p/await! (pt/-submit! this f))))
(.decrementAndGet counter))))))

(ns-unmap *ns* '->SemaphoreBulkhead)
(ns-unmap *ns* 'map->SemaphoreBulkhead)
Expand Down Expand Up @@ -204,14 +197,16 @@
:semaphore (create-with-semaphore params)
(throw (UnsupportedOperationException. "invalid bulkhead type provided"))))

(defn invoke!
{:no-doc true
:deprecated true}
[instance f]
(px/invoke! instance f))

(defn get-stats
[instance]
(-get-stats instance))

(defn invoke!
[instance f]
(-invoke! instance f))

(defn bulkhead?
"Check if the provided object is instance of Bulkhead type."
[o]
Expand Down

0 comments on commit 394c494

Please sign in to comment.