From 49bc3e1f1466843fedbc43d14e29ba16b6b9cad8 Mon Sep 17 00:00:00 2001 From: Alex Redington Date: Thu, 20 Jun 2024 15:12:49 -0400 Subject: [PATCH] Virtual Future Applicative Engine (#31) * Remove all `clojure.core` private function invocations * Add eval-key-channel for virtual future engine * applicative engine for virtual futures * conditionally require ns * fun with namespaces * linter * delete the test * changelog --------- Co-authored-by: Alex Redington Co-authored-by: Sophia Velten de Melo --- CHANGELOG.md | 3 ++ project.clj | 2 +- src/nodely/api/v0.clj | 12 +++-- src/nodely/engine/applicative.clj | 10 +++- .../engine/applicative/virtual_future.clj | 47 +++++++++++++++++++ src/nodely/engine/virtual_future.clj | 13 +++-- src/nodely/engine/virtual_workers.clj | 8 ++++ test/nodely/api_test.clj | 1 + 8 files changed, 87 insertions(+), 9 deletions(-) create mode 100644 src/nodely/engine/applicative/virtual_future.clj diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c25abb..fa32e4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 1.18.0 / 2024-06-20 +- Add virtual-future based applicative engine (for java21+) + ## 1.17.0 / 2024-05-23 - Better and more consistent error messages when there is a missing key in an environment. - Add new engine :async.virtual-futures that uses Java 21 support for virtual threads diff --git a/project.clj b/project.clj index 35b454b..35611fe 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject dev.nu/nodely "1.17.0" +(defproject dev.nu/nodely "1.18.0" :description "Decoupling data fetching from data dependency declaration" :url "https://github.com/nubank/nodely" :license {:name "MIT"} diff --git a/src/nodely/api/v0.clj b/src/nodely/api/v0.clj index 97ca1e4..40eaa17 100644 --- a/src/nodely/api/v0.clj +++ b/src/nodely/api/v0.clj @@ -53,9 +53,15 @@ ;; Java 21 Virtual Threads Support (try (import java.util.concurrent.ThreadPerTaskExecutor) (require '[nodely.engine.virtual-workers]) - (alter-var-root #'engine-data - assoc :async.virtual-futures {::ns (find-ns 'nodely.engine.virtual-workers) - ::opts-fn (constantly nil)}) + (require '[nodely.engine.applicative.virtual-future :as applicative.virtual-future]) + (alter-var-root #'engine-data assoc + :async.virtual-futures {::ns (find-ns 'nodely.engine.virtual-workers) + ::opts-fn (constantly nil) + ::eval-key-channel true} + :applicative.virtual-future {::ns (find-ns 'nodely.engine.applicative) + ::opts-fn #(assoc % ::applicative/context + (var-get (resolve 'nodely.engine.applicative.virtual-future/context))) + ::eval-key-channel false}) (catch Exception e e)) ;; End Virtual Threads diff --git a/src/nodely/engine/applicative.clj b/src/nodely/engine/applicative.clj index a28106a..7f771db 100644 --- a/src/nodely/engine/applicative.clj +++ b/src/nodely/engine/applicative.clj @@ -3,6 +3,7 @@ (:require [cats.context :as context] [cats.core :as m] + [clojure.core.async :as async] [clojure.pprint :as pp] [nodely.data :as data] [nodely.engine.applicative.promesa :as promesa] @@ -97,7 +98,14 @@ lazy-env (lazy-env/lazy-env env eval-in-context opts)] (m/fmap ::data/value (get lazy-env k))))) -(def eval-key-channel eval-key-contextual) +(defn eval-key-channel + ([env k] + (eval-key-channel env k {})) + ([env k opts] + (let [contextual-v (eval-key-contextual env k opts) + chan (async/promise-chan)] + (m/fmap (partial async/put! chan) contextual-v) + chan))) (defn eval ([env k] diff --git a/src/nodely/engine/applicative/virtual_future.clj b/src/nodely/engine/applicative/virtual_future.clj new file mode 100644 index 0000000..fffb43e --- /dev/null +++ b/src/nodely/engine/applicative/virtual_future.clj @@ -0,0 +1,47 @@ +(ns nodely.engine.applicative.virtual-future + (:require + [cats.protocols :as mp] + [nodely.engine.applicative.protocols :as protocols] + [nodely.engine.virtual-future :as virtual-future :refer [vfuture]]) + (:import + nodely.engine.virtual_future.GreenFuture)) + +(declare context) + +(extend-type GreenFuture + mp/Contextual + (-get-context [_] context) + + mp/Extract + (-extract [it] + (try (deref it) + (catch java.util.concurrent.ExecutionException e + (throw (.getCause e)))))) + +(def context + (reify + mp/Context + protocols/RunNode + (-apply-fn [_ f mv] + (vfuture (f (deref mv)))) + + mp/Functor + (-fmap [mn f mv] + (vfuture (f (deref mv)))) + + mp/Monad + (-mreturn [_ v] + (vfuture v)) + + (-mbind [mn mv f] + (vfuture (let [v (deref mv)] + (deref (f v))))) + + mp/Applicative + (-pure [_ v] + (vfuture v)) + + (-fapply [_ pf pv] + (vfuture (let [f (deref pf) + v (deref pv)] + (f v)))))) diff --git a/src/nodely/engine/virtual_future.clj b/src/nodely/engine/virtual_future.clj index 5e1a237..44fadca 100644 --- a/src/nodely/engine/virtual_future.clj +++ b/src/nodely/engine/virtual_future.clj @@ -1,6 +1,6 @@ (ns nodely.engine.virtual-future (:import - [clojure.lang IBlockingDeref IDeref IPending] + [clojure.lang IBlockingDeref IDeref IPending Var] [java.util.concurrent ExecutorService Executors Future])) (def ^:private virtual-thread-factory @@ -12,10 +12,12 @@ (deftype GreenFuture [^Future executor-future] IDeref - (deref [_] (#'clojure.core/deref-future executor-future)) + (deref [_] (.get executor-future)) IBlockingDeref (deref [_ timeout-ms timeout-val] - (#'clojure.core/deref-future executor-future timeout-ms timeout-val)) + (try (.get executor-future timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS) + (catch java.util.concurrent.TimeoutException e + timeout-val))) IPending (isRealized [_] (.isDone executor-future)) Future @@ -29,7 +31,10 @@ "This function is similar to the clojure.core future-call. The only difference is that it receives as a param a dynamic executor-service instead of using clojure.lang.Agent/soloExecutor" [^ExecutorService executor-service f] - (let [f (#'clojure.core/binding-conveyor-fn f) + (let [binds (Var/getThreadBindingFrame) + f (fn [] + (Var/resetThreadBindingFrame binds) + (f)) fut (.submit executor-service ^Callable f)] (GreenFuture. fut))) diff --git a/src/nodely/engine/virtual_workers.clj b/src/nodely/engine/virtual_workers.clj index f2848f1..a5c6eff 100644 --- a/src/nodely/engine/virtual_workers.clj +++ b/src/nodely/engine/virtual_workers.clj @@ -1,6 +1,7 @@ (ns nodely.engine.virtual-workers (:refer-clojure :exclude [eval]) (:require + [clojure.core.async :as async] [loom.alg :as alg] [nodely.data :as data] [nodely.engine.core :as core] @@ -64,3 +65,10 @@ (defn eval-node [node env] (eval-key (assoc env ::target node) ::target)) + +(defn eval-key-channel + [env k] + (let [ret (async/chan 1)] + (virtual-future/vfuture + (async/>!! ret (eval-key env k))) + ret)) diff --git a/test/nodely/api_test.clj b/test/nodely/api_test.clj index 3ddf474..9275734 100644 --- a/test/nodely/api_test.clj +++ b/test/nodely/api_test.clj @@ -40,6 +40,7 @@ :async.manifold :applicative.promesa :applicative.core-async + :applicative.virtual-future :async.virtual-futures}) (defn channel-interface