Skip to content

Commit

Permalink
support multiple column family
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed Dec 5, 2024
1 parent a54168a commit dcce57b
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 60 deletions.
6 changes: 5 additions & 1 deletion Database/Sources/Database/Options.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ public struct Options: ~Copyable, Sendable {
rocksdb_options_set_create_if_missing(ptr.value, createIfMissing ? 1 : 0)
}

public func setLevelCompactionDynamicLevelBytes(levelCompactionDynamicLevelBytes: Bool) {
public func setLevelCompactionDynamicLevelBytes(_ levelCompactionDynamicLevelBytes: Bool) {
rocksdb_options_set_level_compaction_dynamic_level_bytes(ptr.value, levelCompactionDynamicLevelBytes ? 1 : 0)
}

public func setCreateIfMissingColumnFamilies(_ createIfMissingColumnFamilies: Bool) {
rocksdb_options_set_create_missing_column_families(ptr.value, createIfMissingColumnFamilies ? 1 : 0)
}
}

public struct WriteOptions: ~Copyable, Sendable {
Expand Down
113 changes: 81 additions & 32 deletions Database/Sources/Database/RocksDB.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import Foundation
import rocksdb
import Utils

public final class RocksDB: Sendable {
public protocol ColumnFamilyKey: Sendable, CaseIterable, Hashable, RawRepresentable<UInt8> {}

public final class RocksDB<CFKey: ColumnFamilyKey>: Sendable {
public enum BatchOperation {
case delete(key: Data)
case put(key: Data, value: Data)
case delete(column: CFKey, key: Data)
case put(column: CFKey, key: Data, value: Data)
}

public enum Error: Swift.Error {
Expand All @@ -17,12 +19,12 @@ public final class RocksDB: Sendable {
case noData
}

private let dbOptions: Options
private let writeOptions: WriteOptions
private let readOptions: ReadOptions
private let db: SendableOpaquePointer
private let db: SafePointer
private let cfHandles: [SendableOpaquePointer]

public init(path: URL) throws(Error) {
public init(path: URL) throws {
let dbOptions = Options()

// TODO: starting from options here
Expand All @@ -32,22 +34,50 @@ public final class RocksDB: Sendable {
dbOptions.increaseParallelism(cpus: cpus)
dbOptions.optimizeLevelStyleCompaction(memtableMemoryBudget: 512 * 1024 * 1024) // 512 MB
dbOptions.setCreateIfMissing(true)
dbOptions.setCreateIfMissingColumnFamilies(true)

let cfOptions = Options()
cfOptions.setLevelCompactionDynamicLevelBytes(true)

var names = CFKey.allCases.map { "\($0)" }
// ensure always have a default column family
if !names.contains("default") {
names.insert("default", at: 0)
}
var cfOptionsList = names.map { _ in cfOptions.value as OpaquePointer? }

var outHandles = [OpaquePointer?](repeating: nil, count: names.count)

// open DB
db = try Self.call { err, _ in
rocksdb_open(dbOptions.value, path.path, &err).asSendable
} onErr: { message throws(Error) in
throw Error.openFailed(message: message)
let dbPtr = try FFIUtils.withCString(names) { cnames in
var cnames = cnames
return try Self.call { err, _ in
rocksdb_open_column_families(
dbOptions.value,
path.path,
Int32(names.count),
&cnames,
&cfOptionsList,
&outHandles,
&err
)
} onErr: { message throws in
throw Error.openFailed(message: message)
}
}

self.dbOptions = dbOptions
db = SafePointer(ptr: dbPtr!, free: rocksdb_close)

cfHandles = outHandles.map { $0!.asSendable }

writeOptions = WriteOptions()
readOptions = ReadOptions()
}

deinit {
rocksdb_close(db.value)
for handle in cfHandles {
rocksdb_column_family_handle_destroy(handle.value)
}
}
}

Expand All @@ -57,8 +87,8 @@ extension RocksDB {
private static func call<R>(
_ data: [Data],
fn: (inout UnsafeMutablePointer<Int8>?, [(ptr: UnsafeRawPointer, count: Int)]) -> R,
onErr: (String) throws(Error) -> Void
) throws(Error) -> R {
onErr: (String) throws -> Void
) throws -> R {
var err: UnsafeMutablePointer<Int8>?
defer {
free(err)
Expand Down Expand Up @@ -97,54 +127,71 @@ extension RocksDB {
private static func call<R>(
_ data: Data...,
fn: (inout UnsafeMutablePointer<Int8>?, [(ptr: UnsafeRawPointer, count: Int)]) -> R,
onErr: (String) throws(Error) -> Void
) throws(Error) -> R {
onErr: (String) throws -> Void
) throws -> R {
try call(data, fn: fn, onErr: onErr)
}

private static func call<R>(
_ data: Data...,
fn: ([(ptr: UnsafeRawPointer, count: Int)]) -> R
) throws(Error) -> R {
) throws -> R {
try call(data) { _, ptrs in
fn(ptrs)
} onErr: { _ throws(Error) in
} onErr: { _ throws in
// do nothing as it should never be called
}
}

private func getHandle(column: CFKey) -> OpaquePointer {
cfHandles[Int(column.rawValue)].value
}
}

// MARK: - public methods

extension RocksDB {
public func put(key: Data, value: Data) throws(Error) {
public func put(column: CFKey, key: Data, value: Data) throws {
let handle = getHandle(column: column)
try Self.call(key, value) { err, ptrs in
let key = ptrs[0]
let value = ptrs[1]
rocksdb_put(db.value, writeOptions.value, key.ptr, key.count, value.ptr, value.count, &err)
} onErr: { message throws(Error) in
rocksdb_put_cf(
db.value,
writeOptions.value,
handle,
key.ptr,
key.count,
value.ptr,
value.count,
&err
)
} onErr: { message throws in
throw Error.putFailed(message: message)
}
}

public func get(key: Data) throws -> Data? {
public func get(column: CFKey, key: Data) throws -> Data? {
var len = 0
let handle = getHandle(column: column)

let ret = try Self.call(key) { err, ptrs in
let key = ptrs[0]
return rocksdb_get(db.value, readOptions.value, key.ptr, key.count, &len, &err)
} onErr: { message throws(Error) in
return rocksdb_get_cf(db.value, readOptions.value, handle, key.ptr, key.count, &len, &err)
} onErr: { message throws in
throw Error.getFailed(message: message)
}

return ret.map { Data(bytesNoCopy: $0, count: len, deallocator: .free) }
}

public func delete(key: Data) throws {
public func delete(column: CFKey, key: Data) throws {
let handle = getHandle(column: column)

try Self.call(key) { err, ptrs in
let key = ptrs[0]
rocksdb_delete(db.value, writeOptions.value, key.ptr, key.count, &err)
} onErr: { message throws(Error) in
rocksdb_delete_cf(db.value, writeOptions.value, handle, key.ptr, key.count, &err)
} onErr: { message throws in
throw Error.deleteFailed(message: message)
}
}
Expand All @@ -155,25 +202,27 @@ extension RocksDB {

for operation in operations {
switch operation {
case let .delete(key):
case let .delete(column, key):
let handle = getHandle(column: column)
try Self.call(key) { ptrs in
let key = ptrs[0]
rocksdb_writebatch_delete(writeBatch, key.ptr, key.count)
rocksdb_writebatch_delete_cf(writeBatch, handle, key.ptr, key.count)
}

case let .put(key, value):
case let .put(column, key, value):
let handle = getHandle(column: column)
try Self.call(key, value) { ptrs in
let key = ptrs[0]
let value = ptrs[1]

rocksdb_writebatch_put(writeBatch, key.ptr, key.count, value.ptr, value.count)
rocksdb_writebatch_put_cf(writeBatch, handle, key.ptr, key.count, value.ptr, value.count)
}
}
}

try Self.call { err, _ in
rocksdb_write(db.value, writeOptions.value, writeBatch, &err)
} onErr: { message throws(Error) in
} onErr: { message throws in
throw Error.batchFailed(message: message)
}
}
Expand Down
103 changes: 79 additions & 24 deletions Database/Tests/DatabaseTests/RocksDBTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import Testing
@testable import Database

extension String {
var data: Data {
Data(utf8)
}
var data: Data { Data(utf8) }
}

enum Columns: UInt8, Sendable, ColumnFamilyKey {
case col1
case col2
case col3
}

final class RocksDBTests {
Expand All @@ -17,49 +21,100 @@ final class RocksDBTests {
return tmpDir.appendingPathComponent("\(UUID().uuidString)")
}()

var rocksDB: RocksDB!
var rocksDB: RocksDB<Columns>!

init() throws {
rocksDB = try RocksDB(path: path)
}

deinit {
rocksDB = nil // close it first
// then delete the files
rocksDB = nil
try! FileManager.default.removeItem(at: path)
}

@Test func basicOperations() throws {
#expect(try rocksDB.get(key: "123".data) == nil)
#expect(try rocksDB.get(column: .col1, key: "123".data) == nil)

try rocksDB.put(key: "123".data, value: "qwe".data)
try rocksDB.put(key: "234".data, value: "asd".data)
try rocksDB.put(column: .col1, key: "123".data, value: "qwe".data)
try rocksDB.put(column: .col1, key: "234".data, value: "asd".data)

#expect(try rocksDB.get(key: "123".data) == "qwe".data)
#expect(try rocksDB.get(key: "234".data) == "asd".data)
#expect(try rocksDB.get(column: .col1, key: "123".data) == "qwe".data)
#expect(try rocksDB.get(column: .col1, key: "234".data) == "asd".data)

try rocksDB.delete(key: "123".data)
try rocksDB.delete(column: .col1, key: "123".data)

#expect(try rocksDB.get(key: "123".data) == nil)
#expect(try rocksDB.get(column: .col1, key: "123".data) == nil)

try rocksDB.put(key: "234".data, value: "asdfg".data)
try rocksDB.put(column: .col1, key: "234".data, value: "asdfg".data)

#expect(try rocksDB.get(key: "234".data) == "asdfg".data)
#expect(try rocksDB.get(column: .col1, key: "234".data) == "asdfg".data)
}

@Test func testBatchOperations() throws {
try rocksDB.put(key: "123".data, value: "qwe".data)
try rocksDB.put(column: .col1, key: "123".data, value: "qwe".data)

try rocksDB.batch(operations: [
.delete(column: .col1, key: "123".data),
.put(column: .col1, key: "234".data, value: "wer".data),
.put(column: .col1, key: "345".data, value: "ert".data),
.delete(column: .col1, key: "234".data),
.put(column: .col1, key: "345".data, value: "ertert".data),
])

#expect(try rocksDB.get(column: .col1, key: "123".data) == nil)
#expect(try rocksDB.get(column: .col1, key: "234".data) == nil)
#expect(try rocksDB.get(column: .col1, key: "345".data) == "ertert".data)
}

@Test func testMultipleColumnFamilies() throws {
// Test operations across different column families
try rocksDB.put(column: .col1, key: "key1".data, value: "value1".data)
try rocksDB.put(column: .col2, key: "key1".data, value: "value2".data)
try rocksDB.put(column: .col3, key: "key1".data, value: "value3".data)

#expect(try rocksDB.get(column: .col1, key: "key1".data) == "value1".data)
#expect(try rocksDB.get(column: .col2, key: "key1".data) == "value2".data)
#expect(try rocksDB.get(column: .col3, key: "key1".data) == "value3".data)
}

@Test func testLargeValues() throws {
// Test handling of large values
let largeValue = Data((0 ..< 1_000_000).map { UInt8($0 % 256) })
try rocksDB.put(column: .col1, key: "large".data, value: largeValue)

let retrieved = try rocksDB.get(column: .col1, key: "large".data)
#expect(retrieved == largeValue)
}

@Test func testBatchOperationsAcrossColumns() throws {
// Test batch operations across different column families
try rocksDB.batch(operations: [
.delete(key: "123".data),
.put(key: "234".data, value: "wer".data),
.put(key: "345".data, value: "ert".data),
.delete(key: "234".data),
.put(key: "345".data, value: "ertert".data),
.put(column: .col1, key: "batch1".data, value: "value1".data),
.put(column: .col2, key: "batch2".data, value: "value2".data),
.put(column: .col3, key: "batch3".data, value: "value3".data),
])

#expect(try rocksDB.get(key: "123".data) == nil)
#expect(try rocksDB.get(key: "234".data) == nil)
#expect(try rocksDB.get(key: "345".data) == "ertert".data)
#expect(try rocksDB.get(column: .col1, key: "batch1".data) == "value1".data)
#expect(try rocksDB.get(column: .col2, key: "batch2".data) == "value2".data)
#expect(try rocksDB.get(column: .col3, key: "batch3".data) == "value3".data)
}

@Test func testEmptyValues() throws {
// Test handling of empty values
try rocksDB.put(column: .col1, key: "empty".data, value: Data())

let retrieved = try rocksDB.get(column: .col1, key: "empty".data)
#expect(retrieved?.isEmpty == true)
}

@Test func testErrorConditions() throws {
// Test invalid operations
let invalidDB = try? RocksDB<Columns>(path: URL(fileURLWithPath: "/nonexistent/path"))
#expect(invalidDB == nil)

// Test deleting non-existent key
try rocksDB.delete(column: .col1, key: "nonexistent".data)
let value = try rocksDB.get(column: .col1, key: "nonexistent".data)
#expect(value == nil)
}
}
Loading

0 comments on commit dcce57b

Please sign in to comment.