Skip to content

Commit

Permalink
Virtual Future Applicative Engine (#31)
Browse files Browse the repository at this point in the history
* Remove all `clojure.core` private function invocations

* Add eval-key-channel for virtual future engine

* applicative engine for virtual futures

* conditionally require ns

* fun with namespaces

* linter

* delete the test

* changelog

---------

Co-authored-by: Alex Redington <[email protected]>
Co-authored-by: Sophia Velten de Melo <[email protected]>
  • Loading branch information
3 people authored Jun 20, 2024
1 parent 9fd3c41 commit 49bc3e1
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.18.0 / 2024-06-20
- Add virtual-future based applicative engine (for java21+)

## 1.17.0 / 2024-05-23
- Better and more consistent error messages when there is a missing key in an environment.
- Add new engine :async.virtual-futures that uses Java 21 support for virtual threads
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject dev.nu/nodely "1.17.0"
(defproject dev.nu/nodely "1.18.0"
:description "Decoupling data fetching from data dependency declaration"
:url "https://github.com/nubank/nodely"
:license {:name "MIT"}
Expand Down
12 changes: 9 additions & 3 deletions src/nodely/api/v0.clj
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,15 @@
;; Java 21 Virtual Threads Support
(try (import java.util.concurrent.ThreadPerTaskExecutor)
(require '[nodely.engine.virtual-workers])
(alter-var-root #'engine-data
assoc :async.virtual-futures {::ns (find-ns 'nodely.engine.virtual-workers)
::opts-fn (constantly nil)})
(require '[nodely.engine.applicative.virtual-future :as applicative.virtual-future])
(alter-var-root #'engine-data assoc
:async.virtual-futures {::ns (find-ns 'nodely.engine.virtual-workers)
::opts-fn (constantly nil)
::eval-key-channel true}
:applicative.virtual-future {::ns (find-ns 'nodely.engine.applicative)
::opts-fn #(assoc % ::applicative/context
(var-get (resolve 'nodely.engine.applicative.virtual-future/context)))
::eval-key-channel false})
(catch Exception e e))
;; End Virtual Threads

Expand Down
10 changes: 9 additions & 1 deletion src/nodely/engine/applicative.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(:require
[cats.context :as context]
[cats.core :as m]
[clojure.core.async :as async]
[clojure.pprint :as pp]
[nodely.data :as data]
[nodely.engine.applicative.promesa :as promesa]
Expand Down Expand Up @@ -97,7 +98,14 @@
lazy-env (lazy-env/lazy-env env eval-in-context opts)]
(m/fmap ::data/value (get lazy-env k)))))

(def eval-key-channel eval-key-contextual)
(defn eval-key-channel
([env k]
(eval-key-channel env k {}))
([env k opts]
(let [contextual-v (eval-key-contextual env k opts)
chan (async/promise-chan)]
(m/fmap (partial async/put! chan) contextual-v)
chan)))

(defn eval
([env k]
Expand Down
47 changes: 47 additions & 0 deletions src/nodely/engine/applicative/virtual_future.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
(ns nodely.engine.applicative.virtual-future
(:require
[cats.protocols :as mp]
[nodely.engine.applicative.protocols :as protocols]
[nodely.engine.virtual-future :as virtual-future :refer [vfuture]])
(:import
nodely.engine.virtual_future.GreenFuture))

(declare context)

(extend-type GreenFuture
mp/Contextual
(-get-context [_] context)

mp/Extract
(-extract [it]
(try (deref it)
(catch java.util.concurrent.ExecutionException e
(throw (.getCause e))))))

(def context
(reify
mp/Context
protocols/RunNode
(-apply-fn [_ f mv]
(vfuture (f (deref mv))))

mp/Functor
(-fmap [mn f mv]
(vfuture (f (deref mv))))

mp/Monad
(-mreturn [_ v]
(vfuture v))

(-mbind [mn mv f]
(vfuture (let [v (deref mv)]
(deref (f v)))))

mp/Applicative
(-pure [_ v]
(vfuture v))

(-fapply [_ pf pv]
(vfuture (let [f (deref pf)
v (deref pv)]
(f v))))))
13 changes: 9 additions & 4 deletions src/nodely/engine/virtual_future.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns nodely.engine.virtual-future
(:import
[clojure.lang IBlockingDeref IDeref IPending]
[clojure.lang IBlockingDeref IDeref IPending Var]
[java.util.concurrent ExecutorService Executors Future]))

(def ^:private virtual-thread-factory
Expand All @@ -12,10 +12,12 @@

(deftype GreenFuture [^Future executor-future]
IDeref
(deref [_] (#'clojure.core/deref-future executor-future))
(deref [_] (.get executor-future))
IBlockingDeref
(deref [_ timeout-ms timeout-val]
(#'clojure.core/deref-future executor-future timeout-ms timeout-val))
(try (.get executor-future timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS)
(catch java.util.concurrent.TimeoutException e
timeout-val)))
IPending
(isRealized [_] (.isDone executor-future))
Future
Expand All @@ -29,7 +31,10 @@
"This function is similar to the clojure.core future-call. The only difference is that it receives as a param a dynamic
executor-service instead of using clojure.lang.Agent/soloExecutor"
[^ExecutorService executor-service f]
(let [f (#'clojure.core/binding-conveyor-fn f)
(let [binds (Var/getThreadBindingFrame)
f (fn []
(Var/resetThreadBindingFrame binds)
(f))
fut (.submit executor-service ^Callable f)]
(GreenFuture. fut)))

Expand Down
8 changes: 8 additions & 0 deletions src/nodely/engine/virtual_workers.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns nodely.engine.virtual-workers
(:refer-clojure :exclude [eval])
(:require
[clojure.core.async :as async]
[loom.alg :as alg]
[nodely.data :as data]
[nodely.engine.core :as core]
Expand Down Expand Up @@ -64,3 +65,10 @@
(defn eval-node
[node env]
(eval-key (assoc env ::target node) ::target))

(defn eval-key-channel
[env k]
(let [ret (async/chan 1)]
(virtual-future/vfuture
(async/>!! ret (eval-key env k)))
ret))
1 change: 1 addition & 0 deletions test/nodely/api_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
:async.manifold
:applicative.promesa
:applicative.core-async
:applicative.virtual-future
:async.virtual-futures})

(defn channel-interface
Expand Down

0 comments on commit 49bc3e1

Please sign in to comment.