Skip to content

Commit

Permalink
Merge pull request #27 from luposlip/append-docs
Browse files Browse the repository at this point in the history
Wrong index offset when appending > 128 docs
  • Loading branch information
luposlip authored Oct 4, 2023
2 parents f23f8ed + 8284947 commit b69f287
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 37 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ All notable changes to this project will be documented in this file. This change
- Timestamps for historical versions
- Optimize (speed+size of) low level index format

## [0.9.0-beta8] - 2023-10-04

### Fixed

- Index offsets were computed incorrectly when appending more than [batch size] documents in a single call, breaking lookups of the appended documents

NB: [batch size] is currently set to 128.

## [0.9.0-beta6+7] - 2023-10-03

- Append documents to existing nd-db files (previously v1.0.0)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# nd-db

```clojure
[com.luposlip/nd-db "0.9.0-beta7"]
[com.luposlip/nd-db "0.9.0-beta8"]
```

_Newline Delimited (read-only) Databases!_
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.luposlip/nd-db "0.9.0-beta7"
(defproject com.luposlip/nd-db "0.9.0-beta8"
:description "Clojure library to use newline delimited files as fast read-only databases."
:url "https://github.com/luposlip/nd-db"
:license {:name "Apache License, Version 2.0"
Expand Down
17 changes: 9 additions & 8 deletions src/nd_db/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,16 @@ meaning DON'T do parallel writes to database..!"
(when (or (nil? doc-emitter)
log-limit)
(throw (ex-info "Can't write to historical database (when log-limit is set)!" {:log-limit log-limit})))
(let [docs (if (map? doc-or-docs)
[doc-or-docs]
doc-or-docs)]
(loop [all-docs-part (partition-all 128 docs)
(let [all-docs (if (map? doc-or-docs)
[doc-or-docs]
doc-or-docs)]
(loop [docs-parts (partition-all 128 all-docs)
aggr-db db]
(if (empty? all-docs-part)
(if (empty? docs-parts)
aggr-db
(let [docs-part-stringed (map doc-emitter (first all-docs-part))]
(recur (rest all-docs-part)
(let [docs-part (first docs-parts)
docs-part-stringed (map doc-emitter docs-part)]
(recur (rest docs-parts)
(-> aggr-db
(emit-docs (->> docs-part-stringed (str/join "\n")))
(ndix/append docs docs-part-stringed))))))))
(ndix/append docs-part docs-part-stringed))))))))
57 changes: 30 additions & 27 deletions src/nd_db/index.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
[clojure.core.reducers :as r]
[nd-db
[util :as ndut]
[io :as ndio]])
[io :as ndio]]
[clojure.string :as str])
(:import [java.time Instant]
[java.io
BufferedReader FileReader
Expand Down Expand Up @@ -102,7 +103,7 @@ Consider converting the index via nd-db.convert/upgrade-nddbmeta!
db)))
(ndio/append-writer (or serialized-filename (ndio/serialized-db-filepath db))))

(defn append
(defn ^:fx append
"Appends a doc to the index, returns the updated database value."
[{:keys [id-fn id-path] :as db} docs doc-emission-strs]
{:pre [(ndut/db? db)
Expand All @@ -122,32 +123,34 @@ Consider converting the index via nd-db.convert/upgrade-nddbmeta!
docs (if (map? docs) [docs] docs)
doc-emission-strs (if (string? doc-emission-strs) [doc-emission-strs] doc-emission-strs)
doc-ids (map doc-id-fn docs)
[_ [offset length]] (-> serialized-filename
ndio/last-line
ndio/str->)]

[offset length] (->> db :index deref (map second) (sort-by first >) first)
index-data (loop [this-offset (+ offset length 1)
des doc-emission-strs
ids doc-ids
aggr []]
(if (empty? ids)
aggr
(let [doc-str-count (-> des first count)
next-offset (+ this-offset doc-str-count 1)]
(when (zero? doc-str-count)
(throw (ex-info
"Stringified document can't have length 0!"
{:vid (first ids)
:doc-emission-str (first des)})))
(recur next-offset (rest des) (rest ids)
(conj aggr [(first ids) [this-offset doc-str-count]])))))]
(with-open [w (append-writer db serialized-filename)]
(let [index-data (loop [this-offset (+ offset length 1)
des doc-emission-strs
ids doc-ids
aggr []]
(if (empty? ids)
aggr
(let [doc-str-count (-> des first count)
next-offset (+ this-offset doc-str-count 1)]
(recur next-offset (rest des) (rest ids)
(conj aggr [(first ids) [this-offset doc-str-count]])))))]
(doseq [ivec index-data]
(#'ndio/write-nippy-ln w ivec))
(.flush w)
(update db :index
(fn [idx]
(delay
(reduce
(fn [a [doc-id idx-vec]]
(assoc a doc-id idx-vec))
(deref idx)
index-data))))))))
(doseq [ivec index-data]
(#'ndio/write-nippy-ln w ivec))
(.flush w))
(update db :index
(fn [idx]
(delay
(reduce
(fn [a [doc-id idx-vec]]
(assoc a doc-id idx-vec))
(deref idx)
index-data))))))

(defn re-index
"Re-index the database, with a limit on the log size.
Expand Down
4 changes: 4 additions & 0 deletions test/nd_db/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@
"New index line count is old line count plus docs appended")
(is (not= (first docs) (nddb/q db 1)) "Old db returns old doc")
(is (= (first docs) (nddb/q new-db 1)) "New db returns new version")

(let [fresh-db (nddb/db :filename tmp-filename :id-path :id)]
(-> fresh-db :index deref)
(is (= (peek docs) (-> fresh-db (nddb/q 222))) "Re-read updated index & q"))
(delete-meta db)
(io/delete-file tmp-filename)))

Expand Down

0 comments on commit b69f287

Please sign in to comment.