From d993fc915de520d3fd799952b36e388ddd9b8860 Mon Sep 17 00:00:00 2001 From: Florian M Date: Mon, 16 Dec 2024 11:52:16 +0100 Subject: [PATCH 1/4] WIP: Add more API calls to deal with multiple keys at once --- src/main/protobuf/fossildbapi.proto | 13 ++++++++++++- .../scalableminds/fossildb/FossilDBGrpcImpl.scala | 6 ++++++ .../scalableminds/fossildb/db/RocksDBStore.scala | 1 - .../fossildb/db/VersionedKeyValueStore.scala | 4 ++++ .../scalableminds/fossildb/FossilDBSuite.scala | 15 +++++++++++++++ 5 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/main/protobuf/fossildbapi.proto b/src/main/protobuf/fossildbapi.proto index e17394d..5235b7d 100644 --- a/src/main/protobuf/fossildbapi.proto +++ b/src/main/protobuf/fossildbapi.proto @@ -45,6 +45,16 @@ message DeleteReply { optional string errorMessage = 2; } +message DeleteAllByPrefixRequest { + required string collection = 1; + required string prefix = 2; +} + +message DeleteAllByPrefixReply { + required bool success = 1; + optional string errorMessage = 2; +} + message GetMultipleVersionsRequest { required string collection = 1; required string key = 2; @@ -156,10 +166,11 @@ service FossilDB { rpc Put (PutRequest) returns (PutReply) {} rpc Delete (DeleteRequest) returns (DeleteReply) {} rpc DeleteMultipleVersions (DeleteMultipleVersionsRequest) returns (DeleteMultipleVersionsReply) {} + rpc DeleteAllByPrefix (DeleteAllByPrefixRequest) returns (DeleteAllByPrefixReply) {} rpc ListKeys (ListKeysRequest) returns (ListKeysReply) {} rpc ListVersions (ListVersionsRequest) returns (ListVersionsReply) {} rpc Backup (BackupRequest) returns (BackupReply) {} rpc RestoreFromBackup (RestoreFromBackupRequest) returns (RestoreFromBackupReply) {} rpc CompactAllData (CompactAllDataRequest) returns (CompactAllDataReply) {} rpc ExportDB (ExportDBRequest) returns (ExportDBReply) {} -} \ No newline at end of file +} diff --git a/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala b/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala index 2b7a71c..3ab571a 100644 --- a/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala +++ b/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala @@ -60,6 +60,12 @@ class FossilDBGrpcImpl(storeManager: StoreManager) DeleteMultipleVersionsReply(success = true) } { errorMsg => DeleteMultipleVersionsReply(success = false, errorMsg) } + override def deleteAllByPrefix(req: DeleteAllByPrefixRequest): Future[DeleteAllByPrefixReply] = withExceptionHandler(req) { + val store = storeManager.getStore(req.collection) + store.withRawRocksIterator{rocksIt => store.deleteAllByPrefix(rocksIt, req.prefix)} + DeleteAllByPrefixReply(success = true) + } { errorMsg => DeleteAllByPrefixReply(success = false, errorMsg)} + override def listKeys(req: ListKeysRequest): Future[ListKeysReply] = withExceptionHandler(req) { val store = storeManager.getStore(req.collection) val keys = store.withRawRocksIterator{rocksIt => store.listKeys(rocksIt, req.limit, req.startAfterKey)} diff --git a/src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala b/src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala index 4ccb03e..c930bfe 100644 --- a/src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala +++ b/src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala @@ -36,7 +36,6 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat } options.setCreateIfMissing(true).setCreateMissingColumnFamilies(true) val defaultColumnFamilyOptions: ColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(columnOptions) - println(defaultColumnFamilyOptions) val newColumnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).diff(cfListRef.toList.map(_.getName)).map(new ColumnFamilyDescriptor(_, defaultColumnFamilyOptions)) val columnFamilyDescriptors = cfListRef.toList ::: newColumnFamilyDescriptors logger.info("Opening RocksDB at " + dataDir.toAbsolutePath) diff --git a/src/main/scala/com/scalableminds/fossildb/db/VersionedKeyValueStore.scala b/src/main/scala/com/scalableminds/fossildb/db/VersionedKeyValueStore.scala index 135c0a8..de4e0dd 100644 --- a/src/main/scala/com/scalableminds/fossildb/db/VersionedKeyValueStore.scala +++ b/src/main/scala/com/scalableminds/fossildb/db/VersionedKeyValueStore.scala @@ -180,6 +180,10 @@ class VersionedKeyValueStore(underlying: RocksDBStore) { deleteIter(versionsIterator) } + def deleteAllByPrefix(rocksIt: RocksIterator, prefix: String): Unit = { + RocksDBStore.scanKeysOnly(rocksIt, prefix, Some(prefix)).foreach(underlying.delete) + } + def put(key: String, version: Long, value: Array[Byte]): Unit = { requireValidKey(key) underlying.put(VersionedKey(key, version).toString, value) diff --git a/src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala b/src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala index 19b7ac2..3e784db 100644 --- a/src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala +++ b/src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala @@ -134,6 +134,21 @@ class FossilDBSuite extends AnyFlatSpec with BeforeAndAfterEach with TestHelpers assert(testData1 == reply.value) } + "DeleteAllByPrefix" should "delete all versions of all values matching this prefix" in { + client.put(PutRequest(collectionA, "prefixedA", Some(0), testData1)) + client.put(PutRequest(collectionA, "prefixedA", Some(1), testData1)) + client.put(PutRequest(collectionA, "prefixedB", Some(0), testData2)) + client.put(PutRequest(collectionA, "prefixedC", Some(0), testData2)) + client.put(PutRequest(collectionA, "differentKey", Some(0), testData2)) + client.put(PutRequest(collectionA, "differentKey", Some(1), testData2)) + client.put(PutRequest(collectionA, "yetDifferentKey", Some(0), testData2)) + client.deleteAllByPrefix(DeleteAllByPrefixRequest(collectionA, "prefixed")) + val reply = client.listKeys(ListKeysRequest(collectionA)) + assert(reply.keys.length == 2) + assert(reply.keys.contains("differentKey")) + assert(reply.keys.contains("yetDifferentKey")) + } + "ListKeys" should "list all keys of a collection" in { client.put(PutRequest(collectionA, aKey, Some(0), testData1)) client.put(PutRequest(collectionA, aKey, Some(1), testData2)) From c2fdb1c344fe3071741724bef978fdff95b40cc1 Mon Sep 17 00:00:00 2001 From: Florian M Date: Mon, 16 Dec 2024 13:50:28 +0100 Subject: [PATCH 2/4] add PutMultipleVersions --- src/main/protobuf/fossildbapi.proto | 13 +++++++++++++ .../scalableminds/fossildb/FossilDBGrpcImpl.scala | 10 ++++++++++ .../com/scalableminds/fossildb/FossilDBSuite.scala | 10 ++++++++++ 3 files changed, 33 insertions(+) diff --git a/src/main/protobuf/fossildbapi.proto b/src/main/protobuf/fossildbapi.proto index 5235b7d..34119a9 100644 --- a/src/main/protobuf/fossildbapi.proto +++ b/src/main/protobuf/fossildbapi.proto @@ -34,6 +34,18 @@ message PutReply { optional string errorMessage = 2; } +message PutMultipleVersionsRequest { + required string collection = 1; + required string key = 2; + repeated uint64 versions = 3; + repeated bytes values = 4; +} + +message PutMultipleVersionsReply { + required bool success = 1; + optional string errorMessage = 2; +} + message DeleteRequest { required string collection = 1; required string key = 2; @@ -164,6 +176,7 @@ service FossilDB { rpc GetMultipleVersions (GetMultipleVersionsRequest) returns (GetMultipleVersionsReply) {} rpc GetMultipleKeys (GetMultipleKeysRequest) returns (GetMultipleKeysReply) {} rpc Put (PutRequest) returns (PutReply) {} + rpc PutMultipleVersions (PutMultipleVersionsRequest) returns (PutMultipleVersionsReply) {} rpc Delete (DeleteRequest) returns (DeleteReply) {} rpc DeleteMultipleVersions (DeleteMultipleVersionsRequest) returns (DeleteMultipleVersionsReply) {} rpc DeleteAllByPrefix (DeleteAllByPrefixRequest) returns (DeleteAllByPrefixReply) {} diff --git a/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala b/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala index 3ab571a..fc0debd 100644 --- a/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala +++ b/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala @@ -36,6 +36,16 @@ class FossilDBGrpcImpl(storeManager: StoreManager) PutReply(success = true) } { errorMsg => PutReply(success = false, errorMsg) } + override def putMultipleVersions(req: PutMultipleVersionsRequest): Future[PutMultipleVersionsReply] = withExceptionHandler(req) { + val store = storeManager.getStore(req.collection) + require(req.versions.length == req.values.length, s"Must supply as many versions as values, got ${req.versions.length} versions vs ${req.values.length} values.") + require(req.versions.forall(_ >= 0), "Version numbers must be non-negative") + req.versions.zip(req.values).foreach { case (version, value) => + store.put(req.key, version, value.toByteArray) + } + PutMultipleVersionsReply(success = true) + } { errorMsg => PutMultipleVersionsReply(success = false, errorMsg)} + override def delete(req: DeleteRequest): Future[DeleteReply] = withExceptionHandler(req) { val store = storeManager.getStore(req.collection) store.delete(req.key, req.version) diff --git a/src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala b/src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala index 3e784db..ec2560b 100644 --- a/src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala +++ b/src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala @@ -69,6 +69,16 @@ class FossilDBSuite extends AnyFlatSpec with BeforeAndAfterEach with TestHelpers assert(testData2 == reply.value) } + "PutMultipleVersions" should "overwrite old values, leave others untouched" in { + client.put(PutRequest(collectionA, aKey, Some(0), testData1)) + client.put(PutRequest(collectionA, aKey, Some(2), testData1)) + client.putMultipleVersions(PutMultipleVersionsRequest(collectionA, aKey, Seq(1,2,3), Seq(testData2, testData3, testData3))) + val reply = client.getMultipleVersions(GetMultipleVersionsRequest(collectionA, aKey)) + assert(reply.values.length == 4) + assert(reply.versions == Seq(3,2,1,0)) + assert(reply.values == Seq(testData3, testData3, testData2, testData1)) + } + it should "fail on non-existent collection" in { val reply = client.put(PutRequest("nonExistentCollection", aKey, Some(0), testData1)) assert(!reply.success) From bfe38a39f2be1a39540511f91382b8eef7c6bdcd Mon Sep 17 00:00:00 2001 From: Florian M Date: Mon, 16 Dec 2024 14:45:37 +0100 Subject: [PATCH 3/4] bump minor versions of dependencies --- build.sbt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 17e3e8a..3f5b1c1 100644 --- a/build.sbt +++ b/build.sbt @@ -24,9 +24,9 @@ version := getVersionFromGit scalaVersion := "2.13.12" libraryDependencies ++= Seq( - "ch.qos.logback" % "logback-classic" % "1.4.7", + "ch.qos.logback" % "logback-classic" % "1.5.6", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", - "org.scalatest" % "scalatest_2.13" % "3.2.15" % "test", + "org.scalatest" % "scalatest_2.13" % "3.2.19" % "test", "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion, "io.grpc" % "grpc-services" % scalapb.compiler.Version.grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion, From b1fe6b9f0ba8b5789b67c9b3c8eb488e6b6e56f2 Mon Sep 17 00:00:00 2001 From: Florian M Date: Mon, 16 Dec 2024 14:46:31 +0100 Subject: [PATCH 4/4] changelog --- Changelog.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Changelog.md b/Changelog.md index 6f615d4..493e6e3 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,5 +1,8 @@ # Changelog +## Added + - New API endpoints `DeleteAllByPrefix` and `PutMultipleVersions`. [#47](https://github.com/scalableminds/fossildb/pull/47) + ## Breaking Changes - The `GetMultipleKeys` call now takes a `startAfterKey` instead of a `key` for pagination. The returned list will only start *after* this key. [#38](https://github.com/scalableminds/fossildb/pull/38)