Skip to content

Commit

Permalink
Add submit-matching-entities function and tests
Browse files Browse the repository at this point in the history
This function accepts entities instead of just one. It is a nice addition cause it allow us to batch
update. Note that the same time options (i.e.: start-valid-time) is applied to each entity.
  • Loading branch information
arichiardi committed Oct 12, 2023
1 parent 7dec1c5 commit a621dae
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 23 deletions.
3 changes: 2 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
:deps {}
:aliases
{:test {:extra-paths ["test"]
:extra-deps {com.xtdb/xtdb-core {:mvn/version "1.23.2"}
:extra-deps {clojure.java-time/clojure.java-time {:mvn/version "1.3.0"}
com.xtdb/xtdb-core {:mvn/version "1.23.2"}
lambdaisland/kaocha {:mvn/version "1.70.1086"}
org.slf4j/slf4j-nop {:mvn/version "1.7.36"}}
:main-opts ["-m" "kaocha.runner"]}
Expand Down
79 changes: 57 additions & 22 deletions src/com/cohesic/xtdb.clj
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ This version of the function is memoized."
The :atomic? flag also adds a ::xt/match op if the current entity was present."
[current-entity new-entity & {:keys [atomic? start-valid-time end-valid-time]}]
(let [entity-id (or (:xt/id current-entity) (:xt/id new-entity))]
(if-not (= current-entity new-entity)
(let [match-op [::xt/match entity-id current-entity start-valid-time]
put-op [::xt/put new-entity start-valid-time end-valid-time]]
(if atomic? [match-op put-op] [put-op]))
[])))
(if-not (= current-entity new-entity)
(let [entity-id (or (:xt/id current-entity) (:xt/id new-entity))
match-op [::xt/match entity-id current-entity start-valid-time]
put-op [::xt/put new-entity start-valid-time end-valid-time]]
(if atomic? [match-op put-op] [put-op]))
[]))

(defn reverse-tx-ops-before-time
"Scan the entity history and return tx ops that reverse the entity right before the input time.
Expand All @@ -153,29 +153,64 @@ This version of the function is memoized."
iterator-seq
(reverse-history-before-time entity-id as-of-time opts))))

(defn submit-matching-entities
"Submit a transaction for the entities, atomically.
Atomicity is implemented this way: if the new entity coincides with
the one on disk, no transaction is sent over.
If the entity is differs, it adds a `::xt/match` against the old one
before inserting. Additionally, it uses `xt/await-tx` for making sure
we block until a transaction is indexed.
It returns the transaction returned by `xt/submit-tx` or `nil` if
nothing no transaction was submitted.
Shout out @emccue:
https://clojurians.slack.com/archives/CG3AM2F7V/p1638606871304400?thread_ts=1638590188.303400&cid=CG3AM2F7V
The optional `opts` map is:
{:start-valid-time java.util.Date
:end-valid-time java.util.Date}
Note that the same time option (i.e.: start-valid-time) is applied to
each entity."
[xtdb-node entities & {:keys [start-valid-time] :as opts}]
(when (seq entities)
(let [entity-ids (mapv :xt/id entities)
results (-> (xt/db xtdb-node start-valid-time)
(xt/q '{:find [(pull ?e [*])] :where [[?e :xt/id ?id]] :in [[?id ...]]}
entity-ids))
current-entity-by-id (->> results
(map first)
(group-by :xt/id))
opts (assoc opts :atomic? true)
tx-ops (into []
(mapcat #(idempotent-put-ops (-> current-entity-by-id
(get (:xt/id %))
first)
%
opts))
entities)]
(when (seq tx-ops)
(let [tx (xt/submit-tx xtdb-node tx-ops)]
(xt/await-tx xtdb-node tx)
tx)))))

(defn submit-matching-entity
"Submit a transaction with the new entity atomically.
If the new entity coincides with the one on disk, no transaction is
sent over and `nil` is return.
Atomicity is implemented this way: if the new entity coincides with
the one on disk, no transaction is sent over and `nil` is returned.
Otherwise what xt/submit-tx return is returned.
Otherwise this function returns the transaction returned by
`xt/submit-tx`.
Shout out @emccue:
https://clojurians.slack.com/archives/CG3AM2F7V/p1638606871304400?thread_ts=1638590188.303400&cid=CG3AM2F7V
Opts are:
The optional `opts` map is:
{:start-valid-time java.util.Date
:end-valid-time java.util.Date}"
[xtdb-node new-entity & {:keys [start-valid-time] :as opts}]
(let [entity-id (:xt/id new-entity)
current-entity (-> (xt/db xtdb-node start-valid-time)
(xt/q '{:find [(pull ?e [*])] :where [[?e :xt/id ?id]] :in [?id]}
entity-id)
ffirst)
opts (assoc opts :atomic? true)
tx-ops (idempotent-put-ops current-entity new-entity opts)]
(when (seq tx-ops)
(as-> (xt/submit-tx xtdb-node tx-ops) tx
(xt/await-tx xtdb-node tx)
tx))))
[xtdb-node new-entity & {:as opts}]
(submit-matching-entities xtdb-node [new-entity] opts))
128 changes: 128 additions & 0 deletions test/com/cohesic/kv_store_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
(ns com.cohesic.kv-store-test
(:require
[clojure.test :as test :refer [deftest is testing use-fixtures]]
[com.cohesic.xtdb :as sut]
[com.cohesic.xtdb.fixtures :as fixtures]
[com.cohesic.xtdb.test :as xtest]
[java-time.api :as jtime]
[xtdb.api :as xt]))

(use-fixtures :each fixtures/with-kv-store)

(deftest submit-matching-entity-sanity-check
(let [node xtest/*node*
p1 (xtest/random-person)]
(testing "submitting against empty database"
(xtest/is-committed? node (sut/submit-matching-entity node p1))
(is (= (:xt/id p1)
(-> (xt/db node)
(xt/q {:find '[e] :where [['e :person/name (:person/name p1)]]})
ffirst))))))

(deftest submit-matching-entities-empty-entities
(let [node xtest/*node* entities []] (is (nil? (sut/submit-matching-entities node entities)))))

(deftest submit-matching-entities-sanity-check
(let [node xtest/*node*
p1 (xtest/random-person)
p2 (xtest/random-person)
entities [p1 p2]
entity-ids (into #{} (map :xt/id entities))]
(testing "submitting against empty database"
(xtest/is-committed? node (sut/submit-matching-entities node entities))
(is (= entity-ids
(-> (xt/db node)
(xt/q '{:find [e] :in [[?id ...]] :where [[e :xt/id ?id]]}
entity-ids)
(xtest/result-set)))))
(testing "submitting against populated database"
(testing "changing a document"
(let [p1-last-name "Foo"
p1-bis (assoc p1 :person/last-name p1-last-name)
entities [p1-bis]]
(xtest/is-committed? node (sut/submit-matching-entities node entities))
(is (= p1-last-name
(-> (xt/db node)
(xt/q {:find '[?last-name]
:where [['e :person/name (:person/name p1)]
'[e :person/last-name ?last-name]]})
ffirst)))))
(testing "submitting a document unchanged"
(let [entities [p2]]
(is (nil? (sut/submit-matching-entities node entities))
"the function returns `nil` when there is nothing to do"))))))

(deftest submit-matching-entities-at-start-valid-time
(let [node xtest/*node*
p1 (xtest/random-person)
p2 (xtest/random-person)
entities [p1 p2]
entity-ids (into #{} (map :xt/id entities))]
(testing "submitting entities at :start-valid-time"
(let [start-valid-time (jtime/instant)]
(xtest/is-committed? node
(sut/submit-matching-entities
node
entities
:start-valid-time
(jtime/java-date start-valid-time)))
(testing "fetching before :start-valid-time"
(let [before-start-time (jtime/minus start-valid-time (jtime/days 1))]
(is (empty?
(-> (xt/db node (jtime/java-date before-start-time))
(xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]}
(mapv :person/name entities))))
"nothing shows up before :start-valid-time")))
(testing "fetching after :start-valid-time"
(let [after-start-time (jtime/plus start-valid-time (jtime/days 1))]
(is (= entity-ids
(-> (xt/db node (jtime/java-date after-start-time))
(xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]}
(mapv :person/name entities))
(xtest/result-set))))))))))

(deftest submit-matching-entities-at-start+end-valid-time
(let [node xtest/*node*
p1 (xtest/random-person)
p2 (xtest/random-person)
entities [p1 p2]
entity-ids (into #{} (map :xt/id entities))]
(testing "submitting entities at :start-valid-time *and* :end-valid-time"
(let [start-valid-time (jtime/instant)
end-valid-time (jtime/plus (jtime/instant) (jtime/days 3))]
(println end-valid-time)
(xtest/is-committed? node
(sut/submit-matching-entities
node
entities
:start-valid-time (jtime/java-date start-valid-time)
:end-valid-time (jtime/java-date end-valid-time)))
(testing "fetching before :start-valid-time"
(let [before-start-time (jtime/minus start-valid-time (jtime/days 1))]
(is (empty?
(-> (xt/db node (jtime/java-date before-start-time))
(xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]}
(mapv :person/name entities))))
"nothing shows up after :end-valid-time")))
(testing "fetching after :start-valid-time"
(let [after-start-time (jtime/plus start-valid-time (jtime/days 1))]
(is (= entity-ids
(-> (xt/db node (jtime/java-date after-start-time))
(xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]}
(mapv :person/name entities))
(xtest/result-set)))
"nothing shows up after :end-valid-time")))
(testing "fetching before :end-valid-time"
(let [before-end-time (jtime/plus (jtime/instant) (jtime/days 2))]
(is (= entity-ids
(-> (xt/db node (jtime/java-date before-end-time))
(xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]}
(mapv :person/name entities))
(xtest/result-set))))))
(testing "fetching after :end-valid-time"
(let [after-end-time (jtime/plus end-valid-time (jtime/days 1))]
(is (empty?
(-> (xt/db node (jtime/java-date after-end-time))
(xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]}
(mapv :person/name entities))))
"nothing shows up after :end-valid-time")))))))
28 changes: 28 additions & 0 deletions test/com/cohesic/xtdb/fixtures.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
(ns com.cohesic.xtdb.fixtures
(:require
[clojure.java.io :as io]
[com.cohesic.xtdb.test :as test]
[xtdb.api :as xt]
[xtdb.io :as xio]
[xtdb.mem-kv])
(:import
java.nio.file.Files
java.nio.file.attribute.FileAttribute))

(def ^:dynamic *db-dir-prefix* "cohesic-xtdb-tests")
(def ^:dynamic *kv-opts* {})

(defn with-kv-store
[f]
(let [db-dir (.toFile (Files/createTempDirectory *db-dir-prefix* (make-array FileAttribute 0)))]
(try
(letfn [(kv-store [db-dir-suffix]
{:kv-store (merge {:xtdb/module 'xtdb.mem-kv/->kv-store
:sync? true
:db-dir (io/file db-dir db-dir-suffix)}
*kv-opts*)})]
(with-open [node (xt/start-node {:xtdb/tx-log (kv-store "tx-log")
:xtdb/document-store (kv-store "doc-store")
:xtdb/index-store (kv-store "index-store")})]
(binding [test/*node* node] (f))))
(finally (xio/delete-dir db-dir)))))
19 changes: 19 additions & 0 deletions test/com/cohesic/xtdb/test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
(ns com.cohesic.xtdb.test
(:require
[clojure.test :as test]
[xtdb.api :as xt])
(:import
xtdb.api.IXtdb))

(def ^:dynamic ^IXtdb *node*)

(defn random-person
[]
{:xt/id (random-uuid)
:person/name (rand-nth ["Giovanni" "Pietro" "Sergio" "Alessio" "Danilo"])
:person/last-name (rand-nth ["Giovannone" "Pietrone" "Sergione" "Alessione" "Danilone"])
:person/age (rand-int 100)})

(defn is-committed? [node tx] (test/is (xt/tx-committed? node tx)))

(defn result-set [results] (into #{} (map first) results))

0 comments on commit a621dae

Please sign in to comment.