Skip to content

Commit

Permalink
Merge pull request #22 from nubank/closure-sequences
Browse files Browse the repository at this point in the history
Closure sequences
  • Loading branch information
aredington authored Feb 16, 2024
2 parents 2781952 + cf41771 commit 110b81d
Show file tree
Hide file tree
Showing 14 changed files with 116 additions and 49 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Changelog

## Unreleased
- Better error message for sync lazy engine when env is missing a key
## 1.16.0 / 2024-02-16
- Allow sequences to refer to any node in the environment for the mapped fn

## 1.15.0 / 2023-12-07
- Allow running the lazy synchronous engine with a channel return.
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject dev.nu/nodely "1.15.0"
(defproject dev.nu/nodely "1.16.0"
:description "Decoupling data fetching from data dependency declaration"
:url "https://github.com/nubank/nodely"
:license {:name "MIT"}
Expand Down
52 changes: 35 additions & 17 deletions src/nodely/data.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns nodely.data
(:refer-clojure :exclude [map sequence])
(:require
[clojure.set :as set]
[schema.core :as s]))

;;
Expand All @@ -24,10 +25,12 @@
:truthy (s/recursive #'Node)
:falsey (s/recursive #'Node)})

(s/defschema Sequence #::{:type (s/eq :sequence)
:input s/Keyword
:fn (s/pred ifn?)
:tags #{node-tag}})
(s/defschema Sequence #::{:type (s/eq :sequence)
:input s/Keyword
:process-node (s/conditional
#(= (get % ::type) :value) Value
#(= (get % ::type) :leaf) Leaf)
:tags #{node-tag}})

(s/defschema Node (s/conditional
#(= (get % ::type) :value) Value
Expand Down Expand Up @@ -69,15 +72,23 @@

(s/defn sequence
([input :- s/Keyword
fn]
(sequence input fn #{}))
f]
(sequence input f #{}))
([input :- s/Keyword
fn
f
tags :- #{node-tag}]
#::{:type :sequence
:input input
:process-node (value f)
:tags tags})
([input :- s/Keyword
closure-inputs
f
tags :- #{node-tag}]
#::{:type :sequence
:input input
:fn fn
:tags tags}))
#::{:type :sequence
:input input
:process-node (leaf closure-inputs f)
:tags tags}))

;;
;; Node Utilities
Expand All @@ -90,13 +101,20 @@
[node]
(= :value (::type node)))

(defn node-inputs
(defn leaf?
[node]
(case (::type node)
:value #{}
:leaf (::inputs node)
:branch (recur (::condition node))
:sequence #{(::input node)}))
(= :leaf (::type node)))

(defn node-inputs
([node]
(node-inputs node #{}))
([node inputs]
(case (::type node)
:value inputs
:leaf (set/union inputs (::inputs node))
:branch (recur (::condition node) inputs)
:sequence (recur (::process-node node)
(conj inputs (::input node))))))

;;
;; Env Utils
Expand Down
18 changes: 12 additions & 6 deletions src/nodely/engine/applicative.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@

(defn eval-sequence
[node lazy-env opts]
(let [f (::data/fn node)
in-key (::data/input node)]
(m/alet [input-seq (get lazy-env in-key)
result (sequence (map (fn [x] (m/fmap f (m/pure x)))
(::data/value input-seq)))]
(data/value result))))
(let [in-key (::data/input node)
mf (m/fmap ::data/value
(eval-node (::data/process-node node) lazy-env opts))
mseq (get lazy-env in-key)]
(->> mseq
(m/fmap (comp m/sequence
(partial map
(comp (partial m/fapply mf)
m/pure))
::data/value))
m/join
(m/fmap data/value))))

(defn eval-branch
[{::data/keys [condition truthy falsey]}
Expand Down
6 changes: 3 additions & 3 deletions src/nodely/engine/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@

(defn prepare-inputs
[input-keys env]
(->> (select-keys env input-keys)
(map (juxt key (comp ::data/value val)))
(->> input-keys
(map (juxt identity (comp ::data/value (partial get env))))
(into {})))

(defn eval-leaf
Expand Down Expand Up @@ -126,7 +126,7 @@
(defn- sequence->value
[node env]
(let [in-key (::data/input node)
f (::data/fn node)
f (::data/value (first (node->value (::data/process-node node) env)))
new-env (resolve-inputs [in-key] env)
in (data/get-value new-env in-key)]
[(data/value (mapv f in)) new-env]))
Expand Down
2 changes: 1 addition & 1 deletion src/nodely/engine/core_async/iterative_scheduling.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
[node resolved-env {::keys [max-sequence-parallelism]
:or {max-sequence-parallelism 4}}]
(let [in-key (::data/input node)
f (::data/fn node)
f (::data/value (first (core/node->value (::data/process-node node) resolved-env)))
sequence (map data/value (get (core/prepare-inputs [in-key] resolved-env) in-key))
in-chan (async/to-chan! sequence)
pipeline-result (async/chan)]
Expand Down
19 changes: 12 additions & 7 deletions src/nodely/engine/core_async/lazy_scheduling.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,19 @@

(defn eval-sequence
[node
sequence-input-promise-chan
lazy-env
{::keys [max-sequence-parallelism out-ch exception-ch]
:or {max-sequence-parallelism 4}}]
:or {max-sequence-parallelism 4}
:as opts}]
(nodely.async/jog
(let [f (::data/fn node)
sequence (map data/value (::data/value (async/<! sequence-input-promise-chan)))
(let [dep (::data/input node)
input-chan (get lazy-env dep)
f-ch (async/chan 1)
f (do (eval-async (::data/process-node node)
lazy-env
(assoc opts ::out-ch f-ch))
(::data/value (async/<! f-ch)))
sequence (map data/value (::data/value (async/<! input-chan)))
in-chan (async/to-chan! sequence)
pipeline-result (async/chan)]
(async/pipeline-async max-sequence-parallelism
Expand Down Expand Up @@ -74,9 +81,7 @@
::data/tags
::data/blocking)})
exception-ch))))
:sequence (let [dep (::data/input node)
chan (get lazy-env dep)]
(eval-sequence node chan opts))
:sequence (eval-sequence node lazy-env opts)
:branch (eval-branch node lazy-env opts)))
out-ch)

Expand Down
4 changes: 3 additions & 1 deletion src/nodely/engine/manifold.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
[nodely.data :as data]
[nodely.engine.core :as core]))

(declare eval-async)

(defn- prepare-inputs
[input-keys future-env]
(->> (select-keys future-env input-keys)
Expand All @@ -20,7 +22,7 @@
(defn eval-sequence
[node future-env]
(let [in-key (::data/input node)
f (::data/fn node)
f (::data/value (eval-async (::data/process-node node) future-env))
in (prepare-inputs [in-key] future-env)]
(data/value (->> (get in in-key)
(mapv #(deferred/future (f %)))
Expand Down
10 changes: 8 additions & 2 deletions src/nodely/syntax.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@
(data/value n)))

(defmacro >sequence
[fn input]
(list `data/sequence (question-mark->keyword input) fn))
[f input]
(let [symbols-to-be-replaced (question-mark-symbols f)
closure-inputs (mapv question-mark->keyword symbols-to-be-replaced)
fn-fn (fn-with-arg-map symbols-to-be-replaced f)]
(assert-not-shadowing! symbols-to-be-replaced)
(if (seq symbols-to-be-replaced)
`(data/sequence ~(question-mark->keyword input) ~closure-inputs ~fn-fn #{})
`(data/sequence ~(question-mark->keyword input) ~f #{}))))

(defmacro >leaf
[expr]
Expand Down
11 changes: 9 additions & 2 deletions test/nodely/engine/applicative_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
[nodely.engine.core-async.core :as nodely.async]
[nodely.engine.schema :as schema]
[nodely.fixtures :as fixtures]
[nodely.syntax :as syntax :refer [>leaf >value]]
[nodely.syntax :as syntax :refer [>leaf >sequence >value]]
[nodely.syntax.schema :refer [yielding-schema]]
[promesa.core :as p]
[schema.core :as s]))
Expand Down Expand Up @@ -74,6 +74,10 @@
(def env-with-sequence {:a (>leaf [1 2 3])
:b (syntax/>sequence inc ?a)})

(def env-with-closure-sequence {:x (data/value [1 2 3])
:z (data/value 2)
:y (>sequence #(* % ?z) ?x)})

(def env+sequence-with-nil-values
{:a (>leaf [1 2 nil 4])
:b (syntax/>sequence #(when % (inc %)) ?a)})
Expand Down Expand Up @@ -175,7 +179,10 @@
(is (match? (matchers/within-delta 8000000 2000000000)
(- nanosec-sync nanosec-async)))))
(testing "Actually computes the correct answers"
(is (match? [2 3 4] (applicative/eval-key env-with-sequence+delay :c)))))
(is (match? [2 3 4] (applicative/eval-key env-with-sequence+delay :c))))
(testing "resolve closure sequence"
(is (= [2 4 6]
(applicative/eval-key env-with-closure-sequence :y)))))

(deftest schema-test
(let [env-with-schema {:a (>value 2)
Expand Down
12 changes: 9 additions & 3 deletions test/nodely/engine/core_async/lazy_scheduling_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
[nodely.engine.core-async.lazy-scheduling :as nasync]
[nodely.engine.lazy :as engine.lazy]
[nodely.fixtures :as fixtures]
[nodely.syntax :as syntax :refer [>cond >leaf >value blocking]]))
[nodely.syntax :as syntax :refer [>cond >leaf >sequence >value blocking]]))

(def test-env {:a (>leaf (+ 1 2))
:b (>leaf (* ?a 2))
Expand Down Expand Up @@ -57,7 +57,11 @@
:c ?c})})

(def env-with-sequence {:a (>leaf [1 2 3])
:b (syntax/>sequence inc ?a)})
:b (>sequence inc ?a)})

(def env-with-closure-sequence {:a (>value [1 2 3])
:c (>value 2)
:b (>sequence (fn [e] (* e ?c)) ?a)})

(def env+sequence-with-nil-values
{:a (>leaf [1 2 nil 4])
Expand Down Expand Up @@ -142,7 +146,9 @@
(is (match? (matchers/within-delta 8000000 2000000000)
(- nanosec-sync nanosec-async)))))
(testing "Actually computes the correct answers"
(is (= [2 3 4] (nasync/eval-key env-with-sequence+delay :b)))))
(is (match? [2 3 4] (nasync/eval-key env-with-sequence+delay :b))))
(testing "When there's a closure in the sequence expr"
(is (match? [2 4 6] (nasync/eval-key env-with-closure-sequence :b)))))

(deftest eval-test
(testing "eval node async"
Expand Down
13 changes: 11 additions & 2 deletions test/nodely/engine/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[nodely.data :as data]
[nodely.engine.core :as core]
[nodely.fixtures :as fixtures]
[nodely.syntax :as syntax :refer [>cond >if >leaf]]
[nodely.syntax :as syntax :refer [>cond >if >leaf >sequence]]
[schema.test]))

(use-fixtures :once schema.test/validate-schemas)
Expand Down Expand Up @@ -69,6 +69,10 @@
(def env-with-sequence {:x (data/value [1 2 3])
:y (data/sequence :x inc)})

(def env-with-closure-sequence {:x (data/value [1 2 3])
:z (data/value 2)
:y (>sequence (fn [e] (* e ?z)) ?x)})

(def branch-with-sequence {:x (data/value [1 2 3])
:y (data/value [4 5 6])
:a (data/value true)
Expand Down Expand Up @@ -165,6 +169,11 @@
(is (= {:x (data/value [1 2 3])
:y (data/value [2 3 4])}
(core/resolve :y env-with-sequence))))
(testing "resolve closure sequence"
(is (= {:x (data/value [1 2 3])
:z (data/value 2)
:y (data/value [2 4 6])}
(core/resolve :y env-with-closure-sequence))))
(testing "resolve with nested branches"
(is (= {:x (data/value 1)
:y (data/value 2)
Expand Down Expand Up @@ -245,4 +254,4 @@
(testing "there is a cycle in the env and checked-env detects it"
(is (thrown? clojure.lang.ExceptionInfo (core/checked-env env-with-cycle))))
(testing "there is no cycle but checked-env reports there is (corner case mutually exclusive)"
(is (thrown? clojure.lang.ExceptionInfo (core/checked-env mutually-exclusive-env-without-cycles)))))
(is (thrown? clojure.lang.ExceptionInfo (core/checked-env mutually-exclusive-env-without-cycles)))))
9 changes: 8 additions & 1 deletion test/nodely/engine/manifold_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
(def env-with-sequence {:a (>leaf [1 2 3])
:b (syntax/>sequence inc ?a)})

(def env-with-sequence-with-closure
{:a (syntax/>value [1 2 3])
:c (syntax/>value 2)
:b (syntax/>sequence #(* % ?c) ?a)})

(def env-with-sequence+delay {:a (>leaf [1 2 3])
:b (syntax/>sequence
#(do (Thread/sleep 1000) (inc %))
Expand Down Expand Up @@ -54,7 +59,9 @@
(is (match? (matchers/within-delta 8000000 2000000000)
(- nanosec-sync nanosec-async)))))
(testing "Actually computes the correct answers"
(is (= [2 3 4] (manifold/eval-key env-with-sequence+delay :b)))))
(is (= [2 3 4] (manifold/eval-key env-with-sequence+delay :b))))
(testing "Supports closure of nodes in the iterated fn"
(is (= [2 4 6] (manifold/eval-key env-with-sequence-with-closure :b)))))

(deftest eval-test
(testing "eval node async"
Expand Down
3 changes: 2 additions & 1 deletion test/nodely/syntax_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@
:fn fn?}
:b #::data {:type :sequence
:input :a
:fn fn?}}
:process-node #::data {:type :value
:value fn?}}}
{:a (>leaf [1 2 3])
:b (>sequence inc ?a)}))))

Expand Down

0 comments on commit 110b81d

Please sign in to comment.