diff --git a/Database/Sources/Database/Options.swift b/Database/Sources/Database/Options.swift index 8c67d8f2..011e9f9d 100644 --- a/Database/Sources/Database/Options.swift +++ b/Database/Sources/Database/Options.swift @@ -22,9 +22,13 @@ public struct Options: ~Copyable, Sendable { rocksdb_options_set_create_if_missing(ptr.value, createIfMissing ? 1 : 0) } - public func setLevelCompactionDynamicLevelBytes(levelCompactionDynamicLevelBytes: Bool) { + public func setLevelCompactionDynamicLevelBytes(_ levelCompactionDynamicLevelBytes: Bool) { rocksdb_options_set_level_compaction_dynamic_level_bytes(ptr.value, levelCompactionDynamicLevelBytes ? 1 : 0) } + + public func setCreateIfMissingColumnFamilies(_ createIfMissingColumnFamilies: Bool) { + rocksdb_options_set_create_missing_column_families(ptr.value, createIfMissingColumnFamilies ? 1 : 0) + } } public struct WriteOptions: ~Copyable, Sendable { diff --git a/Database/Sources/Database/RocksDB.swift b/Database/Sources/Database/RocksDB.swift index f98d2d34..23878026 100644 --- a/Database/Sources/Database/RocksDB.swift +++ b/Database/Sources/Database/RocksDB.swift @@ -2,10 +2,12 @@ import Foundation import rocksdb import Utils -public final class RocksDB: Sendable { +public protocol ColumnFamilyKey: Sendable, CaseIterable, Hashable, RawRepresentable {} + +public final class RocksDB: Sendable { public enum BatchOperation { - case delete(key: Data) - case put(key: Data, value: Data) + case delete(column: CFKey, key: Data) + case put(column: CFKey, key: Data, value: Data) } public enum Error: Swift.Error { @@ -17,12 +19,12 @@ public final class RocksDB: Sendable { case noData } - private let dbOptions: Options private let writeOptions: WriteOptions private let readOptions: ReadOptions - private let db: SendableOpaquePointer + private let db: SafePointer + private let cfHandles: [SendableOpaquePointer] - public init(path: URL) throws(Error) { + public init(path: URL) throws { let dbOptions = Options() // TODO: starting from options here @@ -32,22 +34,50 @@ public final class RocksDB: Sendable { dbOptions.increaseParallelism(cpus: cpus) dbOptions.optimizeLevelStyleCompaction(memtableMemoryBudget: 512 * 1024 * 1024) // 512 MB dbOptions.setCreateIfMissing(true) + dbOptions.setCreateIfMissingColumnFamilies(true) + + let cfOptions = Options() + cfOptions.setLevelCompactionDynamicLevelBytes(true) + + var names = CFKey.allCases.map { "\($0)" } + // ensure always have a default column family + if !names.contains("default") { + names.insert("default", at: 0) + } + var cfOptionsList = names.map { _ in cfOptions.value as OpaquePointer? } + + var outHandles = [OpaquePointer?](repeating: nil, count: names.count) // open DB - db = try Self.call { err, _ in - rocksdb_open(dbOptions.value, path.path, &err).asSendable - } onErr: { message throws(Error) in - throw Error.openFailed(message: message) + let dbPtr = try FFIUtils.withCString(names) { cnames in + var cnames = cnames + return try Self.call { err, _ in + rocksdb_open_column_families( + dbOptions.value, + path.path, + Int32(names.count), + &cnames, + &cfOptionsList, + &outHandles, + &err + ) + } onErr: { message throws in + throw Error.openFailed(message: message) + } } - self.dbOptions = dbOptions + db = SafePointer(ptr: dbPtr!, free: rocksdb_close) + + cfHandles = outHandles.map { $0!.asSendable } writeOptions = WriteOptions() readOptions = ReadOptions() } deinit { - rocksdb_close(db.value) + for handle in cfHandles { + rocksdb_column_family_handle_destroy(handle.value) + } } } @@ -57,8 +87,8 @@ extension RocksDB { private static func call( _ data: [Data], fn: (inout UnsafeMutablePointer?, [(ptr: UnsafeRawPointer, count: Int)]) -> R, - onErr: (String) throws(Error) -> Void - ) throws(Error) -> R { + onErr: (String) throws -> Void + ) throws -> R { var err: UnsafeMutablePointer? defer { free(err) @@ -97,54 +127,71 @@ extension RocksDB { private static func call( _ data: Data..., fn: (inout UnsafeMutablePointer?, [(ptr: UnsafeRawPointer, count: Int)]) -> R, - onErr: (String) throws(Error) -> Void - ) throws(Error) -> R { + onErr: (String) throws -> Void + ) throws -> R { try call(data, fn: fn, onErr: onErr) } private static func call( _ data: Data..., fn: ([(ptr: UnsafeRawPointer, count: Int)]) -> R - ) throws(Error) -> R { + ) throws -> R { try call(data) { _, ptrs in fn(ptrs) - } onErr: { _ throws(Error) in + } onErr: { _ throws in // do nothing as it should never be called } } + + private func getHandle(column: CFKey) -> OpaquePointer { + cfHandles[Int(column.rawValue)].value + } } // MARK: - public methods extension RocksDB { - public func put(key: Data, value: Data) throws(Error) { + public func put(column: CFKey, key: Data, value: Data) throws { + let handle = getHandle(column: column) try Self.call(key, value) { err, ptrs in let key = ptrs[0] let value = ptrs[1] - rocksdb_put(db.value, writeOptions.value, key.ptr, key.count, value.ptr, value.count, &err) - } onErr: { message throws(Error) in + rocksdb_put_cf( + db.value, + writeOptions.value, + handle, + key.ptr, + key.count, + value.ptr, + value.count, + &err + ) + } onErr: { message throws in throw Error.putFailed(message: message) } } - public func get(key: Data) throws -> Data? { + public func get(column: CFKey, key: Data) throws -> Data? { var len = 0 + let handle = getHandle(column: column) let ret = try Self.call(key) { err, ptrs in let key = ptrs[0] - return rocksdb_get(db.value, readOptions.value, key.ptr, key.count, &len, &err) - } onErr: { message throws(Error) in + return rocksdb_get_cf(db.value, readOptions.value, handle, key.ptr, key.count, &len, &err) + } onErr: { message throws in throw Error.getFailed(message: message) } return ret.map { Data(bytesNoCopy: $0, count: len, deallocator: .free) } } - public func delete(key: Data) throws { + public func delete(column: CFKey, key: Data) throws { + let handle = getHandle(column: column) + try Self.call(key) { err, ptrs in let key = ptrs[0] - rocksdb_delete(db.value, writeOptions.value, key.ptr, key.count, &err) - } onErr: { message throws(Error) in + rocksdb_delete_cf(db.value, writeOptions.value, handle, key.ptr, key.count, &err) + } onErr: { message throws in throw Error.deleteFailed(message: message) } } @@ -155,25 +202,27 @@ extension RocksDB { for operation in operations { switch operation { - case let .delete(key): + case let .delete(column, key): + let handle = getHandle(column: column) try Self.call(key) { ptrs in let key = ptrs[0] - rocksdb_writebatch_delete(writeBatch, key.ptr, key.count) + rocksdb_writebatch_delete_cf(writeBatch, handle, key.ptr, key.count) } - case let .put(key, value): + case let .put(column, key, value): + let handle = getHandle(column: column) try Self.call(key, value) { ptrs in let key = ptrs[0] let value = ptrs[1] - rocksdb_writebatch_put(writeBatch, key.ptr, key.count, value.ptr, value.count) + rocksdb_writebatch_put_cf(writeBatch, handle, key.ptr, key.count, value.ptr, value.count) } } } try Self.call { err, _ in rocksdb_write(db.value, writeOptions.value, writeBatch, &err) - } onErr: { message throws(Error) in + } onErr: { message throws in throw Error.batchFailed(message: message) } } diff --git a/Database/Tests/DatabaseTests/RocksDBTests.swift b/Database/Tests/DatabaseTests/RocksDBTests.swift index 7ed6c1d6..9586ccc4 100644 --- a/Database/Tests/DatabaseTests/RocksDBTests.swift +++ b/Database/Tests/DatabaseTests/RocksDBTests.swift @@ -6,9 +6,13 @@ import Testing @testable import Database extension String { - var data: Data { - Data(utf8) - } + var data: Data { Data(utf8) } +} + +enum Columns: UInt8, Sendable, ColumnFamilyKey { + case col1 + case col2 + case col3 } final class RocksDBTests { @@ -17,49 +21,100 @@ final class RocksDBTests { return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() - var rocksDB: RocksDB! + var rocksDB: RocksDB! init() throws { rocksDB = try RocksDB(path: path) } deinit { - rocksDB = nil // close it first - // then delete the files + rocksDB = nil try! FileManager.default.removeItem(at: path) } @Test func basicOperations() throws { - #expect(try rocksDB.get(key: "123".data) == nil) + #expect(try rocksDB.get(column: .col1, key: "123".data) == nil) - try rocksDB.put(key: "123".data, value: "qwe".data) - try rocksDB.put(key: "234".data, value: "asd".data) + try rocksDB.put(column: .col1, key: "123".data, value: "qwe".data) + try rocksDB.put(column: .col1, key: "234".data, value: "asd".data) - #expect(try rocksDB.get(key: "123".data) == "qwe".data) - #expect(try rocksDB.get(key: "234".data) == "asd".data) + #expect(try rocksDB.get(column: .col1, key: "123".data) == "qwe".data) + #expect(try rocksDB.get(column: .col1, key: "234".data) == "asd".data) - try rocksDB.delete(key: "123".data) + try rocksDB.delete(column: .col1, key: "123".data) - #expect(try rocksDB.get(key: "123".data) == nil) + #expect(try rocksDB.get(column: .col1, key: "123".data) == nil) - try rocksDB.put(key: "234".data, value: "asdfg".data) + try rocksDB.put(column: .col1, key: "234".data, value: "asdfg".data) - #expect(try rocksDB.get(key: "234".data) == "asdfg".data) + #expect(try rocksDB.get(column: .col1, key: "234".data) == "asdfg".data) } @Test func testBatchOperations() throws { - try rocksDB.put(key: "123".data, value: "qwe".data) + try rocksDB.put(column: .col1, key: "123".data, value: "qwe".data) + + try rocksDB.batch(operations: [ + .delete(column: .col1, key: "123".data), + .put(column: .col1, key: "234".data, value: "wer".data), + .put(column: .col1, key: "345".data, value: "ert".data), + .delete(column: .col1, key: "234".data), + .put(column: .col1, key: "345".data, value: "ertert".data), + ]) + + #expect(try rocksDB.get(column: .col1, key: "123".data) == nil) + #expect(try rocksDB.get(column: .col1, key: "234".data) == nil) + #expect(try rocksDB.get(column: .col1, key: "345".data) == "ertert".data) + } + + @Test func testMultipleColumnFamilies() throws { + // Test operations across different column families + try rocksDB.put(column: .col1, key: "key1".data, value: "value1".data) + try rocksDB.put(column: .col2, key: "key1".data, value: "value2".data) + try rocksDB.put(column: .col3, key: "key1".data, value: "value3".data) + + #expect(try rocksDB.get(column: .col1, key: "key1".data) == "value1".data) + #expect(try rocksDB.get(column: .col2, key: "key1".data) == "value2".data) + #expect(try rocksDB.get(column: .col3, key: "key1".data) == "value3".data) + } + + @Test func testLargeValues() throws { + // Test handling of large values + let largeValue = Data((0 ..< 1_000_000).map { UInt8($0 % 256) }) + try rocksDB.put(column: .col1, key: "large".data, value: largeValue) + let retrieved = try rocksDB.get(column: .col1, key: "large".data) + #expect(retrieved == largeValue) + } + + @Test func testBatchOperationsAcrossColumns() throws { + // Test batch operations across different column families try rocksDB.batch(operations: [ - .delete(key: "123".data), - .put(key: "234".data, value: "wer".data), - .put(key: "345".data, value: "ert".data), - .delete(key: "234".data), - .put(key: "345".data, value: "ertert".data), + .put(column: .col1, key: "batch1".data, value: "value1".data), + .put(column: .col2, key: "batch2".data, value: "value2".data), + .put(column: .col3, key: "batch3".data, value: "value3".data), ]) - #expect(try rocksDB.get(key: "123".data) == nil) - #expect(try rocksDB.get(key: "234".data) == nil) - #expect(try rocksDB.get(key: "345".data) == "ertert".data) + #expect(try rocksDB.get(column: .col1, key: "batch1".data) == "value1".data) + #expect(try rocksDB.get(column: .col2, key: "batch2".data) == "value2".data) + #expect(try rocksDB.get(column: .col3, key: "batch3".data) == "value3".data) + } + + @Test func testEmptyValues() throws { + // Test handling of empty values + try rocksDB.put(column: .col1, key: "empty".data, value: Data()) + + let retrieved = try rocksDB.get(column: .col1, key: "empty".data) + #expect(retrieved?.isEmpty == true) + } + + @Test func testErrorConditions() throws { + // Test invalid operations + let invalidDB = try? RocksDB(path: URL(fileURLWithPath: "/nonexistent/path")) + #expect(invalidDB == nil) + + // Test deleting non-existent key + try rocksDB.delete(column: .col1, key: "nonexistent".data) + let value = try rocksDB.get(column: .col1, key: "nonexistent".data) + #expect(value == nil) } } diff --git a/Utils/Sources/Utils/Crypto/FFIUtils.swift b/Utils/Sources/Utils/Crypto/FFIUtils.swift index 6a58edc2..7eba3a28 100644 --- a/Utils/Sources/Utils/Crypto/FFIUtils.swift +++ b/Utils/Sources/Utils/Crypto/FFIUtils.swift @@ -1,6 +1,6 @@ import Foundation -enum FFIUtils { +public enum FFIUtils { private static func _call( data: [Data], out: inout Data?, @@ -8,7 +8,7 @@ enum FFIUtils { onErr: (Int) throws(E) -> Void ) throws(E) { func helper(data: ArraySlice, ptr: [(ptr: UnsafeRawPointer, count: UInt)]) -> Int { - if data.isEmpty { + guard let first = data.first else { if var outData = out { let res = outData.withUnsafeMutableBytes { (bufferPtr: UnsafeMutableRawBufferPointer) -> Int in guard let bufferAddress = bufferPtr.baseAddress else { @@ -22,7 +22,6 @@ enum FFIUtils { return fn(ptr, nil) } let rest = data.dropFirst() - let first = data.first! return first.withUnsafeBytes { (bufferPtr: UnsafeRawBufferPointer) -> Int in guard let bufferAddress = bufferPtr.baseAddress else { fatalError("unreachable: bufferPtr.baseAddress is nil") @@ -75,4 +74,22 @@ enum FFIUtils { _call(data: data, out: &out2, fn: { ptrs, out_buf in fn(ptrs, out_buf!) }, onErr: { err in fatalError("unreachable: \(err)") }) out = out2! } + + private static func _withCString( + fn: ([UnsafePointer?]) throws -> R, + str: ArraySlice, + ptrs: [UnsafePointer?] + ) rethrows -> R { + guard let first = str.first else { + return try fn(ptrs) + } + let rest = str.dropFirst() + return try first.withCString { ptr in + try _withCString(fn: fn, str: rest, ptrs: ptrs + [ptr]) + } + } + + public static func withCString(_ str: [String], fn: ([UnsafePointer?]) throws -> R) rethrows -> R { + try _withCString(fn: fn, str: str[...], ptrs: []) + } }