From 80e6ed003514169f34a8c5815e505a73190df42b Mon Sep 17 00:00:00 2001 From: Xiliang Chen Date: Wed, 11 Dec 2024 21:49:41 +1300 Subject: [PATCH] Db store (#247) * move files * stores * implement most of the methods * implement read all * link everything * fix tests * can't get upper bound working * remove useless code * add store test * fix * test with rocksdb and many issue fixes * avoid JamCoder overhead in most of the stores * fix codec * fix warnings * update tests --- .gitignore | 2 + .../BlockchainDataProvider.swift | 33 ++- .../BlockchainDataProviderProtocol.swift | 19 +- .../InMemoryDataProvider.swift | 30 +- .../Blockchain/RuntimeProtocols/Runtime.swift | 6 +- .../Sources/Blockchain/State/State.swift | 1 + .../Blockchain/State/StateBackend.swift | 8 +- .../Sources/Blockchain/State/StateTrie.swift | 7 +- Boka/Sources/Boka.swift | 11 +- Codec/Sources/Codec/JamDecoder.swift | 34 ++- Codec/Sources/Codec/JamEncoder.swift | 24 +- Codec/Tests/CodecTests/DecoderTests.swift | 11 + Codec/Tests/CodecTests/EncoderTests.swift | 10 + Database/Package.swift | 20 +- Database/Sources/Database/BinaryCoder.swift | 97 +++++++ Database/Sources/Database/JamCoder.swift | 31 +++ Database/Sources/Database/Options.swift | 52 ---- Database/Sources/Database/RawCoder.swift | 19 ++ .../Sources/Database/RocksDBBackend.swift | 260 +++++++++++++++++- Database/Sources/Database/Stores.swift | 36 +++ Database/Sources/RocksDBSwift/Options.swift | 128 +++++++++ .../{Database => RocksDBSwift}/RocksDB.swift | 45 ++- Database/Sources/RocksDBSwift/Store.swift | 59 ++++ .../DatabaseTests/RocksDBBackendTests.swift | 102 +++++++ .../RocksDBTests.swift | 81 ++++-- .../Tests/RocksDBSwiftTests/StoreTests.swift | 152 ++++++++++ Node/Package.swift | 2 + Node/Sources/Node/Node.swift | 46 +++- Node/Sources/Node/ValidatorNode.swift | 7 +- Node/Tests/NodeTests/NodeTests.swift | 59 +++- Node/Tests/NodeTests/Topology.swift | 7 +- Utils/Tests/UtilsTests/Data32Tests.swift | 11 + .../EventBus/DispatcherMiddlewareTests.swift | 4 - .../Merklization/MerklizationTests.swift | 3 +- 34 files changed, 1232 insertions(+), 185 deletions(-) create mode 100644 Database/Sources/Database/BinaryCoder.swift create mode 100644 Database/Sources/Database/JamCoder.swift delete mode 100644 Database/Sources/Database/Options.swift create mode 100644 Database/Sources/Database/RawCoder.swift create mode 100644 Database/Sources/Database/Stores.swift create mode 100644 Database/Sources/RocksDBSwift/Options.swift rename Database/Sources/{Database => RocksDBSwift}/RocksDB.swift (83%) create mode 100644 Database/Sources/RocksDBSwift/Store.swift create mode 100644 Database/Tests/DatabaseTests/RocksDBBackendTests.swift rename Database/Tests/{DatabaseTests => RocksDBSwiftTests}/RocksDBTests.swift (55%) create mode 100644 Database/Tests/RocksDBSwiftTests/StoreTests.swift diff --git a/.gitignore b/.gitignore index 82dfe354..581b1b7e 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,5 @@ settings.json c_cpp_properties.json Tools/openrpc.json + +tmp diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index 4dcc5b0d..d42c9a0f 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -9,6 +9,11 @@ public struct HeadInfo: Sendable { public var number: UInt32 } +public enum BlockchainDataProviderError: Error, Equatable { + case noData(hash: Data32) + case uncanonical(hash: Data32) +} + public actor BlockchainDataProvider: Sendable { public private(set) var bestHead: HeadInfo public private(set) var finalizedHead: HeadInfo @@ -18,21 +23,21 @@ public actor BlockchainDataProvider: Sendable { let heads = try await dataProvider.getHeads() var bestHead = HeadInfo(hash: dataProvider.genesisBlockHash, timeslot: 0, number: 0) for head in heads { - let header = try await dataProvider.getHeader(hash: head) + let header = try await dataProvider.getHeader(hash: head).unwrap() if header.value.timeslot > bestHead.timeslot { - let number = try await dataProvider.getBlockNumber(hash: head) + let number = try await dataProvider.getBlockNumber(hash: head).unwrap() bestHead = HeadInfo(hash: head, timeslot: header.value.timeslot, number: number) } } self.bestHead = bestHead - let finalizedHeadHash = try await dataProvider.getFinalizedHead() + let finalizedHeadHash = try await dataProvider.getFinalizedHead().unwrap() finalizedHead = try await HeadInfo( hash: finalizedHeadHash, - timeslot: dataProvider.getHeader(hash: finalizedHeadHash).value.timeslot, - number: dataProvider.getBlockNumber(hash: finalizedHeadHash) + timeslot: dataProvider.getHeader(hash: finalizedHeadHash).unwrap().value.timeslot, + number: dataProvider.getBlockNumber(hash: finalizedHeadHash).unwrap() ) self.dataProvider = dataProvider @@ -44,7 +49,7 @@ public actor BlockchainDataProvider: Sendable { try await dataProvider.updateHead(hash: block.hash, parent: block.header.parentHash) if block.header.timeslot > bestHead.timeslot { - let number = try await dataProvider.getBlockNumber(hash: block.hash) + let number = try await getBlockNumber(hash: block.hash) bestHead = HeadInfo(hash: block.hash, timeslot: block.header.timeslot, number: number) } @@ -66,19 +71,19 @@ extension BlockchainDataProvider { } public func getBlockNumber(hash: Data32) async throws -> UInt32 { - try await dataProvider.getBlockNumber(hash: hash) + try await dataProvider.getBlockNumber(hash: hash).unwrap(orError: BlockchainDataProviderError.noData(hash: hash)) } public func getHeader(hash: Data32) async throws -> HeaderRef { - try await dataProvider.getHeader(hash: hash) + try await dataProvider.getHeader(hash: hash).unwrap(orError: BlockchainDataProviderError.noData(hash: hash)) } public func getBlock(hash: Data32) async throws -> BlockRef { - try await dataProvider.getBlock(hash: hash) + try await dataProvider.getBlock(hash: hash).unwrap(orError: BlockchainDataProviderError.noData(hash: hash)) } public func getState(hash: Data32) async throws -> StateRef { - try await dataProvider.getState(hash: hash) + try await dataProvider.getState(hash: hash).unwrap(orError: BlockchainDataProviderError.noData(hash: hash)) } public func getHeads() async throws -> Set { @@ -122,7 +127,7 @@ extension BlockchainDataProvider { logger.debug("setting finalized head: \(hash)") let oldFinalizedHead = finalizedHead - let number = try await dataProvider.getBlockNumber(hash: hash) + let number = try await getBlockNumber(hash: hash) var hashToCheck = hash var hashToCheckNumber = number @@ -132,11 +137,11 @@ extension BlockchainDataProvider { logger.trace("purge block: \(hash)") try await dataProvider.remove(hash: hash) } - hashToCheck = try await dataProvider.getHeader(hash: hashToCheck).value.parentHash + hashToCheck = try await getHeader(hash: hashToCheck).value.parentHash hashToCheckNumber -= 1 } - let header = try await dataProvider.getHeader(hash: hash) + let header = try await getHeader(hash: hash) finalizedHead = HeadInfo(hash: hash, timeslot: header.value.timeslot, number: number) try await dataProvider.setFinalizedHead(hash: hash) } @@ -152,6 +157,6 @@ extension BlockchainDataProvider { } public func getBestState() async throws -> StateRef { - try await dataProvider.getState(hash: bestHead.hash) + try await dataProvider.getState(hash: bestHead.hash).unwrap(orError: BlockchainDataProviderError.noData(hash: bestHead.hash)) } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift index c7c5cdb9..f59e3e08 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift @@ -1,28 +1,19 @@ import Utils -public enum BlockchainDataProviderError: Error, Equatable { - case noData(hash: Data32) - case uncanonical(hash: Data32) -} - public protocol BlockchainDataProviderProtocol: Sendable { func hasBlock(hash: Data32) async throws -> Bool func hasState(hash: Data32) async throws -> Bool func isHead(hash: Data32) async throws -> Bool - func getBlockNumber(hash: Data32) async throws -> UInt32 + func getBlockNumber(hash: Data32) async throws -> UInt32? - /// throw BlockchainDataProviderError.noData if not found - func getHeader(hash: Data32) async throws -> HeaderRef + func getHeader(hash: Data32) async throws -> HeaderRef? - /// throw BlockchainDataProviderError.noData if not found - func getBlock(hash: Data32) async throws -> BlockRef + func getBlock(hash: Data32) async throws -> BlockRef? - /// throw BlockchainDataProviderError.noData if not found - func getState(hash: Data32) async throws -> StateRef + func getState(hash: Data32) async throws -> StateRef? - /// throw BlockchainDataProviderError.noData if not found - func getFinalizedHead() async throws -> Data32 + func getFinalizedHead() async throws -> Data32? func getHeads() async throws -> Set /// return empty set if not found diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift index 8b90dd6c..5ddd0dc9 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift @@ -34,35 +34,23 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol { heads.contains(hash) } - public func getBlockNumber(hash: Data32) async throws -> UInt32 { - guard let number = numberByHash[hash] else { - throw BlockchainDataProviderError.noData(hash: hash) - } - return number + public func getBlockNumber(hash: Data32) async throws -> UInt32? { + numberByHash[hash] } - public func getHeader(hash: Data32) throws -> HeaderRef { - guard let header = blockByHash[hash]?.header.asRef() else { - throw BlockchainDataProviderError.noData(hash: hash) - } - return header + public func getHeader(hash: Data32) throws -> HeaderRef? { + blockByHash[hash]?.header.asRef() } - public func getBlock(hash: Data32) throws -> BlockRef { - guard let block = blockByHash[hash] else { - throw BlockchainDataProviderError.noData(hash: hash) - } - return block + public func getBlock(hash: Data32) throws -> BlockRef? { + blockByHash[hash] } - public func getState(hash: Data32) throws -> StateRef { - guard let state = stateByBlockHash[hash] else { - throw BlockchainDataProviderError.noData(hash: hash) - } - return state + public func getState(hash: Data32) throws -> StateRef? { + stateByBlockHash[hash] } - public func getFinalizedHead() -> Data32 { + public func getFinalizedHead() -> Data32? { finalizedHead } diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift index f08814ed..65dc51bb 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift @@ -14,7 +14,7 @@ public final class Runtime { case invalidReportAuthorizer case encodeError(any Swift.Error) case invalidExtrinsicHash - case invalidParentHash + case invalidParentHash(state: Data32, header: Data32) case invalidHeaderStateRoot case invalidHeaderEpochMarker case invalidHeaderWinningTickets @@ -53,7 +53,7 @@ public final class Runtime { let block = block.value guard block.header.parentHash == state.value.lastBlockHash else { - throw Error.invalidParentHash + throw Error.invalidParentHash(state: state.value.lastBlockHash, header: block.header.parentHash) } guard block.header.priorStateRoot == context.stateRoot else { @@ -189,6 +189,8 @@ public final class Runtime { // after reports as it need old recent history try updateRecentHistory(block: block, state: &newState) + + try await newState.save() } catch let error as Error { throw error } catch let error as SafroleError { diff --git a/Blockchain/Sources/Blockchain/State/State.swift b/Blockchain/Sources/Blockchain/State/State.swift index 35851112..32e9a8c7 100644 --- a/Blockchain/Sources/Blockchain/State/State.swift +++ b/Blockchain/Sources/Blockchain/State/State.swift @@ -220,6 +220,7 @@ public struct State: Sendable { // TODO: we don't really want to write to the underlying backend here // instead, it should be writting to a in memory layer // and when actually saving the state, save the in memory layer to the presistent store + @discardableResult public func save() async throws -> Data32 { try await backend.write(layer.toKV()) return await backend.rootHash diff --git a/Blockchain/Sources/Blockchain/State/StateBackend.swift b/Blockchain/Sources/Blockchain/State/StateBackend.swift index d9d05da0..a34feeee 100644 --- a/Blockchain/Sources/Blockchain/State/StateBackend.swift +++ b/Blockchain/Sources/Blockchain/State/StateBackend.swift @@ -3,7 +3,7 @@ import Foundation import Utils public enum StateBackendError: Error { - case missingState + case missingState(key: Sendable) case invalidData } @@ -35,7 +35,7 @@ public final class StateBackend: Sendable { if Key.optional { return nil } - throw StateBackendError.missingState + throw StateBackendError.missingState(key: key) } public func batchRead(_ keys: [any StateKey]) async throws -> [(key: any StateKey, value: (Codable & Sendable)?)] { @@ -74,4 +74,8 @@ public final class StateBackend: Sendable { return nil } } + + public func debugPrint() async throws { + try await trie.debugPrint() + } } diff --git a/Blockchain/Sources/Blockchain/State/StateTrie.swift b/Blockchain/Sources/Blockchain/State/StateTrie.swift index 40e26a47..7c7dffb5 100644 --- a/Blockchain/Sources/Blockchain/State/StateTrie.swift +++ b/Blockchain/Sources/Blockchain/State/StateTrie.swift @@ -24,7 +24,7 @@ private struct TrieNode { right = Data32(data.data.suffix(32))! self.isNew = isNew rawValue = nil - switch data.data[0] & 0b1100_0000 { + switch data.data.first! & 0b1100_0000 { case 0b1000_0000: type = .embeddedLeaf case 0b1100_0000: @@ -66,7 +66,7 @@ private struct TrieNode { guard type == .embeddedLeaf else { return nil } - let len = left.data[0] & 0b0011_1111 + let len = left.data.first! & 0b0011_1111 return right.data[relative: 0 ..< Int(len)] } @@ -85,7 +85,7 @@ private struct TrieNode { static func branch(left: Data32, right: Data32) -> TrieNode { var left = left.data - left[0] = left[0] & 0b0111_1111 // clear the highest bit + left[left.startIndex] = left[left.startIndex] & 0b0111_1111 // clear the highest bit return .init(left: Data32(left)!, right: right, type: .branch, isNew: true, rawValue: nil) } } @@ -352,6 +352,7 @@ public actor StateTrie: Sendable { } } + logger.info("Root hash: \(rootHash.toHexString())") try await printNode(rootHash, depth: 0) } } diff --git a/Boka/Sources/Boka.swift b/Boka/Sources/Boka.swift index 1ff183eb..f7b67ddd 100644 --- a/Boka/Sources/Boka.swift +++ b/Boka/Sources/Boka.swift @@ -116,9 +116,11 @@ struct Boka: AsyncParsableCommand { logger.info("Node name: \(name)") } - if let basePath { - logger.info("Base path: \(basePath)") - } + let database: Database = basePath.map { + var path = URL(fileURLWithPath: $0) + path.append(path: "db") + return .rocksDB(path: path) + } ?? .inMemory logger.info("Peers: \(peers)") @@ -168,7 +170,8 @@ struct Boka: AsyncParsableCommand { network: networkConfig, peers: peers, local: local, - name: name + name: name, + database: database ) let node: Node = if validator { diff --git a/Codec/Sources/Codec/JamDecoder.swift b/Codec/Sources/Codec/JamDecoder.swift index 5daa63d5..2f6339c0 100644 --- a/Codec/Sources/Codec/JamDecoder.swift +++ b/Codec/Sources/Codec/JamDecoder.swift @@ -40,7 +40,7 @@ public class JamDecoder { } } -protocol ArrayWrapper: Collection where Element: Decodable { +private protocol ArrayWrapper: Collection where Element: Decodable { static func from(array: [Element]) -> Self } @@ -50,6 +50,16 @@ extension Array: ArrayWrapper where Element: Decodable { } } +private protocol OptionalWrapper: Decodable { + static var wrappedType: Decodable.Type { get } +} + +extension Optional: OptionalWrapper where Wrapped: Decodable { + static var wrappedType: Decodable.Type { + Wrapped.self + } +} + private class DecodeContext: Decoder { struct PushCodingPath: ~Copyable { let decoder: DecodeContext @@ -160,8 +170,28 @@ private class DecodeContext: Decoder { } } + fileprivate func decodeOptional(_ type: T.Type, key: CodingKey?) throws -> T? { + let byte = try input.read() + switch byte { + case 0: + return nil + case 1: + return try decode(type, key: key) + default: + throw DecodingError.dataCorrupted( + DecodingError.Context( + codingPath: codingPath, + debugDescription: "Invalid boolean value: \(byte)" + ) + ) + } + } + fileprivate func decode(_ type: T.Type, key: CodingKey?) throws -> T { - if type == Data.self { + // optional hanlding must be first to avoid type coercion + if let type = type as? any OptionalWrapper.Type { + try decodeOptional(type.wrappedType, key: key) as! T + } else if type == Data.self { try decodeData(codingPath: codingPath) as Data as! T } else if type == [UInt8].self { try decodeData(codingPath: codingPath) as [UInt8] as! T diff --git a/Codec/Sources/Codec/JamEncoder.swift b/Codec/Sources/Codec/JamEncoder.swift index 756148df..59796f1e 100644 --- a/Codec/Sources/Codec/JamEncoder.swift +++ b/Codec/Sources/Codec/JamEncoder.swift @@ -30,6 +30,16 @@ public class JamEncoder { } } +private protocol OptionalWrapper: Encodable { + var wrapped: Encodable? { get } +} + +extension Optional: OptionalWrapper where Wrapped: Encodable { + var wrapped: Encodable? { + self + } +} + private class EncodeContext: Encoder { var codingPath: [CodingKey] = [] var userInfo: [CodingUserInfoKey: Any] = [ @@ -85,18 +95,20 @@ private class EncodeContext: Encoder { } } - fileprivate func encodeOptional(_ value: Encodable) throws { - let mirror = Mirror(reflecting: value) - if let someValue = mirror.children.first?.value as? Encodable { + fileprivate func encodeOptional(_ value: OptionalWrapper) throws { + if let value = value.wrapped { data.append(UInt8(1)) // Encode presence flag - try encode(someValue) // Encode the unwrapped value + try encode(value) // Encode the unwrapped value } else { data.append(UInt8(0)) // Encode absence flag } } fileprivate func encode(_ value: some Encodable) throws { - if let value = value as? Data { + // optional hanlding must be first to avoid type coercion + if let value = value as? OptionalWrapper { + try encodeOptional(value) + } else if let value = value as? Data { encodeData(value, lengthPrefix: true) } else if let value = value as? [UInt8] { encodeData(value) @@ -104,8 +116,6 @@ private class EncodeContext: Encoder { encodeData(value.data, lengthPrefix: false) } else if let value = value as? [Encodable] { try encodeArray(value) - } else if Mirror(reflecting: value).displayStyle == .optional { - try encodeOptional(value) } else { try value.encode(to: self) } diff --git a/Codec/Tests/CodecTests/DecoderTests.swift b/Codec/Tests/CodecTests/DecoderTests.swift index 277ca2ab..711cd6db 100644 --- a/Codec/Tests/CodecTests/DecoderTests.swift +++ b/Codec/Tests/CodecTests/DecoderTests.swift @@ -182,6 +182,17 @@ struct DecoderTests { #expect(decodedNone == .none) } + @Test func decodeOptionalData() throws { + let encodedSome = Data([1, 3, 1, 2, 3]) + let encodedNone = Data([0]) + + let decodedSome = try JamDecoder.decode(Data?.self, from: encodedSome) + let decodedNone = try JamDecoder.decode(Data?.self, from: encodedNone) + + #expect(decodedSome == .some(Data([1, 2, 3]))) + #expect(decodedNone == .none) + } + @Test func decodeFixedWidthInteger() throws { var encodedInt8 = Data([251]) let encodedUInt64 = Data([21, 205, 91, 7, 0, 0, 0, 0]) diff --git a/Codec/Tests/CodecTests/EncoderTests.swift b/Codec/Tests/CodecTests/EncoderTests.swift index 0996ea9c..d81cb8bf 100644 --- a/Codec/Tests/CodecTests/EncoderTests.swift +++ b/Codec/Tests/CodecTests/EncoderTests.swift @@ -379,6 +379,16 @@ struct EncoderTests { #expect(encodedNone == Data([0])) // None encoded as 1 byte (0) } + @Test func encodeOptionalData() throws { + let optionalValue: Data? = Data([1, 2, 3]) + let encodedSome = try JamEncoder.encode(optionalValue) + + let encodedNone = try JamEncoder.encode(Data?.none) + + #expect(encodedSome == Data([1, 3, 1, 2, 3])) // Optional with value encoded + #expect(encodedNone == Data([0])) // None encoded as 1 byte (0) + } + @Test func encodeInt() throws { let intValue = 123_456_789 let encoded = try JamEncoder.encode(intValue) diff --git a/Database/Package.swift b/Database/Package.swift index 92d8e400..e820ad24 100644 --- a/Database/Package.swift +++ b/Database/Package.swift @@ -17,17 +17,24 @@ let package = Package( ], dependencies: [ .package(path: "../Blockchain"), + .package(path: "../Codec"), .package(path: "../Utils"), .package(url: "https://github.com/apple/swift-testing.git", branch: "0.10.0"), ], targets: [ - // Targets are the basic building blocks of a package, defining a module or a test suite. - // Targets can depend on other targets in this package and products from dependencies. .target( name: "Database", dependencies: [ - "rocksdb", + "RocksDBSwift", "Blockchain", + "Codec", + "Utils", + ] + ), + .target( + name: "RocksDBSwift", + dependencies: [ + "rocksdb", "Utils", ], linkerSettings: [ @@ -49,6 +56,13 @@ let package = Package( .product(name: "Testing", package: "swift-testing"), ] ), + .testTarget( + name: "RocksDBSwiftTests", + dependencies: [ + "RocksDBSwift", + .product(name: "Testing", package: "swift-testing"), + ] + ), ], swiftLanguageModes: [.version("6")] ) diff --git a/Database/Sources/Database/BinaryCoder.swift b/Database/Sources/Database/BinaryCoder.swift new file mode 100644 index 00000000..71434f44 --- /dev/null +++ b/Database/Sources/Database/BinaryCoder.swift @@ -0,0 +1,97 @@ +import Blockchain +import Foundation +import RocksDBSwift +import Utils + +protocol BinaryCodable { + func encode() throws -> Data + static func decode(from data: Data) throws -> Self +} + +extension Data: BinaryCodable { + func encode() throws -> Data { + self + } + + static func decode(from data: Data) throws -> Data { + data + } +} + +extension Data32: BinaryCodable { + func encode() throws -> Data { + data + } + + static func decode(from data: Data) throws -> Data32 { + try Data32(data).unwrap() + } +} + +extension UInt32: BinaryCodable { + static func decode(from data: Data) throws -> Self { + guard data.count == 4 else { + throw DecodingError.dataCorrupted( + DecodingError.Context( + codingPath: [], + debugDescription: "Invalid data length" + ) + ) + } + return data.withUnsafeBytes { ptr in + ptr.loadUnaligned(as: Self.self) + } + } +} + +extension Set: BinaryCodable { + func encode() throws -> Data { + var data = Data(capacity: count * 32) + for element in self { + data.append(element.data) + } + return data + } + + static func decode(from data: Data) throws -> Set { + guard data.count % 32 == 0 else { + throw DecodingError.dataCorrupted( + DecodingError.Context( + codingPath: [], + debugDescription: "Invalid data length" + ) + ) + } + var set = Set() + for i in stride(from: 0, to: data.count, by: 32) { + set.insert(Data32(data[relative: i ..< i + 32])!) + } + return set + } +} + +struct BinaryCoder: StoreCoder { + typealias Key = Key + typealias Value = Value + + private let config: ProtocolConfigRef + private let prefix: Data? + + init(config: ProtocolConfigRef, prefix: Data? = nil) { + self.config = config + self.prefix = prefix + } + + func encode(key: Key) throws -> Data { + let encodedKey = try key.encode() + return prefix.map { $0 + encodedKey } ?? encodedKey + } + + func encode(value: Value) throws -> Data { + try value.encode() + } + + func decode(data: Data) throws -> Value { + try Value.decode(from: data) + } +} diff --git a/Database/Sources/Database/JamCoder.swift b/Database/Sources/Database/JamCoder.swift new file mode 100644 index 00000000..8541d21a --- /dev/null +++ b/Database/Sources/Database/JamCoder.swift @@ -0,0 +1,31 @@ +import Blockchain +import Codec +import Foundation +import RocksDBSwift + +struct JamCoder: StoreCoder { + typealias Key = Key + typealias Value = Value + + private let config: ProtocolConfigRef + private let prefix: Data + + init(config: ProtocolConfigRef, prefix: Data = Data()) { + self.config = config + self.prefix = prefix + } + + func encode(key: Key) throws -> Data { + let encoder = JamEncoder(prefix) + try encoder.encode(key) + return encoder.data + } + + func encode(value: Value) throws -> Data { + try JamEncoder.encode(value) + } + + func decode(data: Data) throws -> Value { + try JamDecoder.decode(Value.self, from: data, withConfig: config) + } +} diff --git a/Database/Sources/Database/Options.swift b/Database/Sources/Database/Options.swift deleted file mode 100644 index 011e9f9d..00000000 --- a/Database/Sources/Database/Options.swift +++ /dev/null @@ -1,52 +0,0 @@ -import rocksdb -import Utils - -public struct Options: ~Copyable, Sendable { - let ptr: SafePointer - - var value: OpaquePointer { ptr.value } - - public init() { - ptr = .init(ptr: rocksdb_options_create(), free: rocksdb_options_destroy) - } - - public func increaseParallelism(cpus: Int) { - rocksdb_options_increase_parallelism(ptr.value, Int32(cpus)) - } - - public func optimizeLevelStyleCompaction(memtableMemoryBudget: UInt64) { - rocksdb_options_optimize_level_style_compaction(ptr.value, memtableMemoryBudget) - } - - public func setCreateIfMissing(_ createIfMissing: Bool) { - rocksdb_options_set_create_if_missing(ptr.value, createIfMissing ? 1 : 0) - } - - public func setLevelCompactionDynamicLevelBytes(_ levelCompactionDynamicLevelBytes: Bool) { - rocksdb_options_set_level_compaction_dynamic_level_bytes(ptr.value, levelCompactionDynamicLevelBytes ? 1 : 0) - } - - public func setCreateIfMissingColumnFamilies(_ createIfMissingColumnFamilies: Bool) { - rocksdb_options_set_create_missing_column_families(ptr.value, createIfMissingColumnFamilies ? 1 : 0) - } -} - -public struct WriteOptions: ~Copyable, Sendable { - let ptr: SafePointer - - var value: OpaquePointer { ptr.value } - - public init() { - ptr = .init(ptr: rocksdb_writeoptions_create(), free: rocksdb_writeoptions_destroy) - } -} - -public struct ReadOptions: ~Copyable, Sendable { - let ptr: SafePointer - - var value: OpaquePointer { ptr.value } - - public init() { - ptr = .init(ptr: rocksdb_readoptions_create(), free: rocksdb_readoptions_destroy) - } -} diff --git a/Database/Sources/Database/RawCoder.swift b/Database/Sources/Database/RawCoder.swift new file mode 100644 index 00000000..cec2631a --- /dev/null +++ b/Database/Sources/Database/RawCoder.swift @@ -0,0 +1,19 @@ +import Foundation +import RocksDBSwift + +struct RawCoder: StoreCoder { + typealias Key = Data + typealias Value = Data + + func encode(key: Key) throws -> Data { + key + } + + func encode(value: Value) throws -> Data { + value + } + + func decode(data: Data) throws -> Value { + data + } +} diff --git a/Database/Sources/Database/RocksDBBackend.swift b/Database/Sources/Database/RocksDBBackend.swift index 78f02e12..83034f1f 100644 --- a/Database/Sources/Database/RocksDBBackend.swift +++ b/Database/Sources/Database/RocksDBBackend.swift @@ -1,27 +1,265 @@ import Blockchain +import Codec import Foundation +import RocksDBSwift +import TracingUtils import Utils -public final class RocksDBBackend: StateBackendProtocol { - public init() {} +private let logger = Logger(label: "RocksDBBackend") - public func read(key _: Data) async throws -> Data? { - fatalError("unimplemented") +public enum RocksDBBackendError: Error { + case genesisHashMismatch(expected: Data32, actual: Data) +} + +public final class RocksDBBackend: Sendable { + private let config: ProtocolConfigRef + private let db: RocksDB + private let meta: Store + private let blocks: Store> + private let blockHashByTimeslot: Store>> + private let blockHashByNumber: Store>> + private let blockNumberByHash: Store> + private let stateRootByHash: Store> + private let stateTrie: Store> + private let stateValue: Store> + private let stateRefs: Store> + private let stateRefsRaw: Store> + + public let genesisBlockHash: Data32 + + public init(path: URL, config: ProtocolConfigRef, genesisBlock: BlockRef, genesisStateData: [Data32: Data]) async throws { + self.config = config + db = try RocksDB(path: path) + meta = Store(db: db, column: .meta, coder: RawCoder()) + blocks = Store(db: db, column: .blocks, coder: JamCoder(config: config)) + blockHashByTimeslot = Store(db: db, column: .blockIndexes, coder: BinaryCoder(config: config, prefix: Data([0]))) + blockHashByNumber = Store(db: db, column: .blockIndexes, coder: BinaryCoder(config: config, prefix: Data([1]))) + blockNumberByHash = Store(db: db, column: .blockIndexes, coder: BinaryCoder(config: config, prefix: Data([2]))) + stateRootByHash = Store(db: db, column: .blockIndexes, coder: BinaryCoder(config: config, prefix: Data([3]))) + stateTrie = Store(db: db, column: .state, coder: BinaryCoder(config: config, prefix: Data([0]))) + stateValue = Store(db: db, column: .state, coder: BinaryCoder(config: config, prefix: Data([1]))) + stateRefs = Store(db: db, column: .stateRefs, coder: BinaryCoder(config: config, prefix: Data([0]))) + stateRefsRaw = Store(db: db, column: .stateRefs, coder: BinaryCoder(config: config, prefix: Data([1]))) + + genesisBlockHash = genesisBlock.hash + + let genesis = try meta.get(key: MetaKey.genesisHash.key) + if let genesis { + guard genesis == genesisBlockHash.data else { + throw RocksDBBackendError.genesisHashMismatch(expected: genesisBlockHash, actual: genesis) + } + + logger.trace("DB loaded") + } else { + // must be a new db + try meta.put(key: MetaKey.genesisHash.key, value: genesisBlockHash.data) + try await add(block: genesisBlock) + let backend = StateBackend(self, config: config, rootHash: Data32()) + try await backend.writeRaw(Array(genesisStateData)) + let rootHash = await backend.rootHash + try stateRootByHash.put(key: genesisBlockHash, value: rootHash) + try setHeads([genesisBlockHash]) + try await setFinalizedHead(hash: genesisBlockHash) + + logger.trace("New DB initialized") + } + } + + private func setHeads(_ heads: Set) throws { + logger.trace("setHeads() \(heads)") + try meta.put(key: MetaKey.heads.key, value: JamEncoder.encode(heads)) + } +} + +extension RocksDBBackend: BlockchainDataProviderProtocol { + public func hasBlock(hash: Data32) async throws -> Bool { + try blocks.exists(key: hash) + } + + public func hasState(hash: Data32) async throws -> Bool { + try stateRootByHash.exists(key: hash) + } + + public func isHead(hash: Data32) async throws -> Bool { + try await getHeads().contains(hash) + } + + public func getBlockNumber(hash: Data32) async throws -> UInt32? { + try blockNumberByHash.get(key: hash) + } + + public func getHeader(hash: Data32) async throws -> HeaderRef? { + try await getBlock(hash: hash)?.header.asRef() + } + + public func getBlock(hash: Data32) async throws -> BlockRef? { + try blocks.get(key: hash) + } + + public func getState(hash: Data32) async throws -> StateRef? { + logger.trace("getState() \(hash)") + + guard let rootHash = try stateRootByHash.get(key: hash) else { + return nil + } + return try await State(backend: StateBackend(self, config: config, rootHash: rootHash)).asRef() + } + + public func getFinalizedHead() async throws -> Data32? { + try meta.get(key: MetaKey.finalizedHead.key).flatMap { Data32($0) } + } + + public func getHeads() async throws -> Set { + let data = try meta.get(key: MetaKey.heads.key) + guard let data else { + return [] + } + return try JamDecoder.decode(Set.self, from: data) + } + + public func getBlockHash(byTimeslot: TimeslotIndex) async throws -> Set { + try blockHashByTimeslot.get(key: byTimeslot) ?? Set() + } + + public func getBlockHash(byNumber: UInt32) async throws -> Set { + try blockHashByNumber.get(key: byNumber) ?? Set() } - public func readAll(prefix _: Data, startKey _: Data?, limit _: UInt32?) async throws -> [(key: Data, value: Data)] { - fatalError("unimplemented") + public func add(block: BlockRef) async throws { + logger.trace("add(block:) \(block.hash)") + + // TODO: batch put + + try blocks.put(key: block.hash, value: block) + var timeslotHashes = try await getBlockHash(byTimeslot: block.header.timeslot) + timeslotHashes.insert(block.hash) + try blockHashByTimeslot.put(key: block.header.timeslot, value: timeslotHashes) + + let blockNumber = if let number = try await getBlockNumber(hash: block.header.parentHash) { + number + 1 + } else { + UInt32(0) + } + + var numberHashes = try await getBlockHash(byNumber: blockNumber) + numberHashes.insert(block.hash) + try blockHashByNumber.put(key: blockNumber, value: numberHashes) + + try blockNumberByHash.put(key: block.hash, value: blockNumber) } - public func batchUpdate(_: [StateBackendOperation]) async throws { - fatalError("unimplemented") + public func add(state: StateRef) async throws { + logger.trace("add(state:) \(state.value.lastBlockHash)") + + let rootHash = await state.value.stateRoot + try stateRootByHash.put(key: state.value.lastBlockHash, value: rootHash) } - public func readValue(hash _: Data32) async throws -> Data? { - fatalError("unimplemented") + public func setFinalizedHead(hash: Data32) async throws { + logger.trace("setFinalizedHead() \(hash)") + + try meta.put(key: MetaKey.finalizedHead.key, value: hash.data) + } + + public func updateHead(hash: Data32, parent: Data32) async throws { + logger.trace("updateHead() \(hash) \(parent)") + + var heads = try await getHeads() + + // parent needs to be either + // - existing head + // - known block + if heads.remove(parent) == nil { + if try await !hasBlock(hash: parent) { + throw BlockchainDataProviderError.noData(hash: parent) + } + } + + heads.insert(hash) + + try meta.put(key: MetaKey.heads.key, value: JamEncoder.encode(heads)) + } + + public func remove(hash: Data32) async throws { + logger.trace("remove() \(hash)") + + // TODO: batch delete + + try blocks.delete(key: hash) + + if let block = try await getBlock(hash: hash) { + try blockHashByTimeslot.delete(key: block.header.timeslot) + } + + if let blockNumber = try await getBlockNumber(hash: hash) { + try blockHashByNumber.delete(key: blockNumber) + } + try blockNumberByHash.delete(key: hash) + } +} + +extension RocksDBBackend: StateBackendProtocol { + public func read(key: Data) async throws -> Data? { + try stateTrie.get(key: key) + } + + public func readAll(prefix: Data, startKey: Data?, limit: UInt32?) async throws -> [(key: Data, value: Data)] { + let snapshot = db.createSnapshot() + let readOptions = ReadOptions() + readOptions.setSnapshot(snapshot) + + let iterator = db.createIterator(column: .state, readOptions: readOptions) + iterator.seek(to: startKey ?? prefix) + + var ret = [(key: Data, value: Data)]() + if let limit { + ret.reserveCapacity(Int(limit)) + } + for _ in 0 ..< (limit ?? .max) { + iterator.next() + if let (key, value) = iterator.read() { + if key.starts(with: prefix) { + ret.append((key, value)) + } else { + break + } + } else { + break + } + } + return ret + } + + public func batchUpdate(_ updates: [StateBackendOperation]) async throws { + logger.trace("batchUpdate() \(updates.count) operations") + + // TODO: implement this using merge operator to perform atomic increment + // so we can do the whole thing in a single batch + for update in updates { + switch update { + case let .write(key, value): + try stateTrie.put(key: key, value: value) + case let .writeRawValue(key, value): + try stateValue.put(key: key, value: value) + let refCount = try stateRefsRaw.get(key: key) ?? 0 + try stateRefsRaw.put(key: key, value: refCount + 1) + case let .refIncrement(key): + let refCount = try stateRefs.get(key: key) ?? 0 + try stateRefs.put(key: key, value: refCount + 1) + case let .refDecrement(key): + let refCount = try stateRefs.get(key: key) ?? 0 + try stateRefs.put(key: key, value: refCount - 1) + } + } + } + + public func readValue(hash: Data32) async throws -> Data? { + try stateValue.get(key: hash) } public func gc(callback _: @Sendable (Data) -> Data32?) async throws { - fatalError("unimplemented") + logger.trace("gc()") + + // TODO: implement } } diff --git a/Database/Sources/Database/Stores.swift b/Database/Sources/Database/Stores.swift new file mode 100644 index 00000000..51c732c2 --- /dev/null +++ b/Database/Sources/Database/Stores.swift @@ -0,0 +1,36 @@ +import Blockchain +import Foundation +import RocksDBSwift +import Utils + +enum StoreId: UInt8, ColumnFamilyKey { + // metadata and configurations + case meta = 0 + // blocks + // blockHash => blockBody + case blocks = 1 + // block indexes + // 0x00 + timeslot => Set + // 0x01 + blockNumber => Set + // 0x02 + blockHash => blockNumber + // 0x03 + blockHash => stateRootHash + case blockIndexes = 2 + // state trie + // 0x00 + hash => trie node + // 0x01 + value hash => state value + case state = 3 + // ref count + // 0x00 + node hash => ref count + // 0x01 + value hash => ref count + case stateRefs = 4 +} + +enum MetaKey: UInt8 { + case genesisHash = 0 // Data32 + case heads = 1 // Set + case finalizedHead = 2 // Data32 + + var key: Data { + Data([rawValue]) + } +} diff --git a/Database/Sources/RocksDBSwift/Options.swift b/Database/Sources/RocksDBSwift/Options.swift new file mode 100644 index 00000000..06cdc5b9 --- /dev/null +++ b/Database/Sources/RocksDBSwift/Options.swift @@ -0,0 +1,128 @@ +import Foundation +import rocksdb +import Utils + +struct Options: ~Copyable, Sendable { + let ptr: SafePointer + + var value: OpaquePointer { ptr.value } + + init() { + ptr = .init(ptr: rocksdb_options_create(), free: rocksdb_options_destroy) + } + + func increaseParallelism(cpus: Int) { + rocksdb_options_increase_parallelism(ptr.value, Int32(cpus)) + } + + func optimizeLevelStyleCompaction(memtableMemoryBudget: UInt64) { + rocksdb_options_optimize_level_style_compaction(ptr.value, memtableMemoryBudget) + } + + func setCreateIfMissing(_ createIfMissing: Bool) { + rocksdb_options_set_create_if_missing(ptr.value, createIfMissing ? 1 : 0) + } + + func setLevelCompactionDynamicLevelBytes(_ levelCompactionDynamicLevelBytes: Bool) { + rocksdb_options_set_level_compaction_dynamic_level_bytes(ptr.value, levelCompactionDynamicLevelBytes ? 1 : 0) + } + + func setCreateIfMissingColumnFamilies(_ createIfMissingColumnFamilies: Bool) { + rocksdb_options_set_create_missing_column_families(ptr.value, createIfMissingColumnFamilies ? 1 : 0) + } +} + +struct WriteOptions: ~Copyable, Sendable { + let ptr: SafePointer + + var value: OpaquePointer { ptr.value } + + init() { + ptr = .init(ptr: rocksdb_writeoptions_create(), free: rocksdb_writeoptions_destroy) + } +} + +public struct ReadOptions: ~Copyable, Sendable { + let ptr: SafePointer + + var value: OpaquePointer { ptr.value } + + public init() { + ptr = .init(ptr: rocksdb_readoptions_create(), free: rocksdb_readoptions_destroy) + } + + public func setSnapshot(_ snapshot: borrowing Snapshot) { + rocksdb_readoptions_set_snapshot(ptr.value, snapshot.value) + } +} + +public struct Snapshot: ~Copyable, Sendable { + let ptr: SafePointer + + var value: OpaquePointer { ptr.value } + + init(_ db: SendableOpaquePointer) { + ptr = .init(ptr: rocksdb_create_snapshot(db.value), free: { ptr in rocksdb_release_snapshot(db.value, ptr) }) + } +} + +public struct Iterator: ~Copyable, Sendable { + let ptr: SafePointer + + var value: OpaquePointer { ptr.value } + + init(_ db: OpaquePointer, readOptions: borrowing ReadOptions, columnFamily: OpaquePointer) { + ptr = .init( + ptr: rocksdb_create_iterator_cf(db, readOptions.value, columnFamily), + free: rocksdb_iter_destroy + ) + } + + public func seek(to key: Data) { + key.withUnsafeBytes { rocksdb_iter_seek(ptr.value, $0.baseAddress, key.count) } + } + + // read the key-value pair at the current position + public func read() -> (key: Data, value: Data)? { + read { pair -> (key: Data, value: Data)? in + guard let pair else { + return nil + } + // copy key and value + + let keyData = Data(buffer: pair.key) + let valueData = Data(buffer: pair.value) + return (key: keyData, value: valueData) + } + } + + /// read the key-value pair at the current position + /// the passed key and values are only valid during the execution of the passed closure + public func read(fn: ((key: UnsafeBufferPointer, value: UnsafeBufferPointer)?) throws -> R) rethrows -> R { + guard rocksdb_iter_valid(ptr.value) != 0 else { + return try fn(nil) + } + + var keyLength = 0 + var valueLength = 0 + let key = rocksdb_iter_key(ptr.value, &keyLength) + let value = rocksdb_iter_value(ptr.value, &valueLength) + + guard let key, let value else { + return try fn(nil) + } + + let keyPtr = UnsafeBufferPointer(start: key, count: keyLength) + let valuePtr = UnsafeBufferPointer(start: value, count: valueLength) + + return try fn((key: keyPtr, value: valuePtr)) + } + + public func next() { + rocksdb_iter_next(ptr.value) + } + + public func prev() { + rocksdb_iter_prev(ptr.value) + } +} diff --git a/Database/Sources/Database/RocksDB.swift b/Database/Sources/RocksDBSwift/RocksDB.swift similarity index 83% rename from Database/Sources/Database/RocksDB.swift rename to Database/Sources/RocksDBSwift/RocksDB.swift index 23878026..c76c7f55 100644 --- a/Database/Sources/Database/RocksDB.swift +++ b/Database/Sources/RocksDBSwift/RocksDB.swift @@ -1,15 +1,18 @@ import Foundation import rocksdb +import TracingUtils import Utils +private let logger = Logger(label: "RocksDB") + public protocol ColumnFamilyKey: Sendable, CaseIterable, Hashable, RawRepresentable {} -public final class RocksDB: Sendable { - public enum BatchOperation { - case delete(column: CFKey, key: Data) - case put(column: CFKey, key: Data, value: Data) - } +public enum BatchOperation { + case delete(column: UInt8, key: Data) + case put(column: UInt8, key: Data, value: Data) +} +public final class RocksDB: Sendable { public enum Error: Swift.Error { case openFailed(message: String) case putFailed(message: String) @@ -17,6 +20,7 @@ public final class RocksDB: Sendable { case deleteFailed(message: String) case batchFailed(message: String) case noData + case invalidColumn(UInt8) } private let writeOptions: WriteOptions @@ -25,6 +29,8 @@ public final class RocksDB: Sendable { private let cfHandles: [SendableOpaquePointer] public init(path: URL) throws { + try FileManager.default.createDirectory(at: path, withIntermediateDirectories: true) + let dbOptions = Options() // TODO: starting from options here @@ -146,12 +152,18 @@ extension RocksDB { private func getHandle(column: CFKey) -> OpaquePointer { cfHandles[Int(column.rawValue)].value } + + private func getHandle(column: UInt8) -> OpaquePointer? { + cfHandles[safe: Int(column)]?.value + } } // MARK: - public methods extension RocksDB { public func put(column: CFKey, key: Data, value: Data) throws { + logger.trace("put() \(column) \(key.toHexString()) \(value.toHexString())") + let handle = getHandle(column: column) try Self.call(key, value) { err, ptrs in let key = ptrs[0] @@ -172,6 +184,8 @@ extension RocksDB { } public func get(column: CFKey, key: Data) throws -> Data? { + logger.trace("get() \(column) \(key.toHexString())") + var len = 0 let handle = getHandle(column: column) @@ -186,6 +200,8 @@ extension RocksDB { } public func delete(column: CFKey, key: Data) throws { + logger.trace("delete() \(column) \(key.toHexString())") + let handle = getHandle(column: column) try Self.call(key) { err, ptrs in @@ -197,20 +213,26 @@ extension RocksDB { } public func batch(operations: [BatchOperation]) throws { + logger.trace("batch() \(operations.count) operations") + let writeBatch = rocksdb_writebatch_create() defer { rocksdb_writebatch_destroy(writeBatch) } for operation in operations { switch operation { case let .delete(column, key): - let handle = getHandle(column: column) + logger.trace("batch() delete \(column) \(key.toHexString())") + + let handle = try getHandle(column: column).unwrap(orError: Error.invalidColumn(column)) try Self.call(key) { ptrs in let key = ptrs[0] rocksdb_writebatch_delete_cf(writeBatch, handle, key.ptr, key.count) } case let .put(column, key, value): - let handle = getHandle(column: column) + logger.trace("batch() put \(column) \(key.toHexString()) \(value.toHexString())") + + let handle = try getHandle(column: column).unwrap(orError: Error.invalidColumn(column)) try Self.call(key, value) { ptrs in let key = ptrs[0] let value = ptrs[1] @@ -226,4 +248,13 @@ extension RocksDB { throw Error.batchFailed(message: message) } } + + public func createSnapshot() -> Snapshot { + Snapshot(db.ptr) + } + + public func createIterator(column: CFKey, readOptions: borrowing ReadOptions) -> Iterator { + let handle = getHandle(column: column) + return Iterator(db.value, readOptions: readOptions, columnFamily: handle) + } } diff --git a/Database/Sources/RocksDBSwift/Store.swift b/Database/Sources/RocksDBSwift/Store.swift new file mode 100644 index 00000000..1d4355f5 --- /dev/null +++ b/Database/Sources/RocksDBSwift/Store.swift @@ -0,0 +1,59 @@ +import Foundation + +public protocol StoreCoder: Sendable { + associatedtype Key + associatedtype Value + + func encode(key: Key) throws -> Data + func encode(value: Value) throws -> Data + func decode(data: Data) throws -> Value +} + +public final class Store: Sendable { + private let db: RocksDB + private let column: CFKey + private let coder: Coder + + public init(db: RocksDB, column: CFKey, coder: Coder) { + self.db = db + self.column = column + self.coder = coder + } + + public func get(key: Coder.Key) throws -> Coder.Value? { + let encodedKey = try coder.encode(key: key) + + let data = try db.get(column: column, key: encodedKey) + + return try data.map { try coder.decode(data: $0) } + } + + public func put(key: Coder.Key, value: Coder.Value) throws { + let encodedKey = try coder.encode(key: key) + let encodedValue = try coder.encode(value: value) + + try db.put(column: column, key: encodedKey, value: encodedValue) + } + + public func delete(key: Coder.Key) throws { + let encodedKey = try coder.encode(key: key) + try db.delete(column: column, key: encodedKey) + } + + public func exists(key: Coder.Key) throws -> Bool { + let encodedKey = try coder.encode(key: key) + // it seems like + return try db.get(column: column, key: encodedKey) != nil + } + + public func putOperation(key: Coder.Key, value: Coder.Value) throws -> BatchOperation { + let encodedKey = try coder.encode(key: key) + let encodedValue = try coder.encode(value: value) + return .put(column: column.rawValue, key: encodedKey, value: encodedValue) + } + + public func deleteOperation(key: Coder.Key) throws -> BatchOperation { + let encodedKey = try coder.encode(key: key) + return .delete(column: column.rawValue, key: encodedKey) + } +} diff --git a/Database/Tests/DatabaseTests/RocksDBBackendTests.swift b/Database/Tests/DatabaseTests/RocksDBBackendTests.swift new file mode 100644 index 00000000..79dc9dec --- /dev/null +++ b/Database/Tests/DatabaseTests/RocksDBBackendTests.swift @@ -0,0 +1,102 @@ +import Blockchain +import Database +import Foundation +import Testing +import Utils + +final class RocksDBBackendTests { + let path = { + let tmpDir = FileManager.default.temporaryDirectory + return tmpDir.appendingPathComponent("\(UUID().uuidString)") + }() + + let config: ProtocolConfigRef = .dev + let genesisBlock: BlockRef + var backend: RocksDBBackend! + + init() async throws { + genesisBlock = BlockRef.dummy(config: config) + + // Initialize backend with genesis block + backend = try await RocksDBBackend( + path: path, + config: config, + genesisBlock: genesisBlock, + genesisStateData: [:] + ) + } + + deinit { + backend = nil + try? FileManager.default.removeItem(at: path) + } + + @Test + func testGenesisBlockInitialization() async throws { + // Verify genesis block was properly stored + let exists = try await backend.hasBlock(hash: genesisBlock.hash) + #expect(exists == true) + + // Verify it's both a head and finalized head + let isHead = try await backend.isHead(hash: genesisBlock.hash) + #expect(isHead == true) + + let finalizedHead = try await backend.getFinalizedHead() + #expect(finalizedHead == genesisBlock.hash) + + // Verify block number + let blockNumber = try await backend.getBlockNumber(hash: genesisBlock.hash) + #expect(blockNumber == 0) + } + + @Test + func testBlockOperations() async throws { + // Create and add a new block + let block1 = BlockRef.dummy(config: config, parent: genesisBlock) + + try await backend.add(block: block1) + try await backend.updateHead(hash: block1.hash, parent: genesisBlock.hash) + + // Verify block was stored + let storedBlock = try await backend.getBlock(hash: block1.hash) + #expect(storedBlock == block1) + + // Verify block indexes + let blocksByTimeslot = try await backend.getBlockHash(byTimeslot: 1) + #expect(blocksByTimeslot.contains(block1.hash)) + + let blocksByNumber = try await backend.getBlockHash(byNumber: 1) + #expect(blocksByNumber.contains(block1.hash)) + + // Test block removal + try await backend.remove(hash: block1.hash) + let exists = try await backend.hasBlock(hash: block1.hash) + #expect(exists == false) + } + + @Test + func testChainReorganization() async throws { + // Create two competing chains + let block1 = BlockRef.dummy(config: config, parent: genesisBlock) + + let block2 = BlockRef.dummy(config: config, parent: genesisBlock).mutate { block in + block.header.unsigned.timeslot = 123 + } + + // Add both blocks and update heads + try await backend.add(block: block1) + try await backend.add(block: block2) + try await backend.updateHead(hash: block1.hash, parent: genesisBlock.hash) + try await backend.updateHead(hash: block2.hash, parent: genesisBlock.hash) + + // Verify both are heads + let heads = try await backend.getHeads() + #expect(heads.contains(block1.hash)) + #expect(heads.contains(block2.hash)) + + // Test finalization of one chain + try await backend.setFinalizedHead(hash: block1.hash) + let finalizedHead = try await backend.getFinalizedHead() + #expect(finalizedHead == block1.hash) + } +} diff --git a/Database/Tests/DatabaseTests/RocksDBTests.swift b/Database/Tests/RocksDBSwiftTests/RocksDBTests.swift similarity index 55% rename from Database/Tests/DatabaseTests/RocksDBTests.swift rename to Database/Tests/RocksDBSwiftTests/RocksDBTests.swift index 9586ccc4..5d974dbc 100644 --- a/Database/Tests/DatabaseTests/RocksDBTests.swift +++ b/Database/Tests/RocksDBSwiftTests/RocksDBTests.swift @@ -3,7 +3,7 @@ import Foundation import Testing -@testable import Database +@testable import RocksDBSwift extension String { var data: Data { Data(utf8) } @@ -54,11 +54,11 @@ final class RocksDBTests { try rocksDB.put(column: .col1, key: "123".data, value: "qwe".data) try rocksDB.batch(operations: [ - .delete(column: .col1, key: "123".data), - .put(column: .col1, key: "234".data, value: "wer".data), - .put(column: .col1, key: "345".data, value: "ert".data), - .delete(column: .col1, key: "234".data), - .put(column: .col1, key: "345".data, value: "ertert".data), + .delete(column: Columns.col1.rawValue, key: "123".data), + .put(column: Columns.col1.rawValue, key: "234".data, value: "wer".data), + .put(column: Columns.col1.rawValue, key: "345".data, value: "ert".data), + .delete(column: Columns.col1.rawValue, key: "234".data), + .put(column: Columns.col1.rawValue, key: "345".data, value: "ertert".data), ]) #expect(try rocksDB.get(column: .col1, key: "123".data) == nil) @@ -89,9 +89,9 @@ final class RocksDBTests { @Test func testBatchOperationsAcrossColumns() throws { // Test batch operations across different column families try rocksDB.batch(operations: [ - .put(column: .col1, key: "batch1".data, value: "value1".data), - .put(column: .col2, key: "batch2".data, value: "value2".data), - .put(column: .col3, key: "batch3".data, value: "value3".data), + .put(column: Columns.col1.rawValue, key: "batch1".data, value: "value1".data), + .put(column: Columns.col2.rawValue, key: "batch2".data, value: "value2".data), + .put(column: Columns.col3.rawValue, key: "batch3".data, value: "value3".data), ]) #expect(try rocksDB.get(column: .col1, key: "batch1".data) == "value1".data) @@ -107,14 +107,61 @@ final class RocksDBTests { #expect(retrieved?.isEmpty == true) } - @Test func testErrorConditions() throws { - // Test invalid operations - let invalidDB = try? RocksDB(path: URL(fileURLWithPath: "/nonexistent/path")) - #expect(invalidDB == nil) + @Test func testIterator() throws { + // Setup test data + let testData = [ + ("key1", "value1"), + ("key2", "value2"), + ("key3", "value3"), + ("key4", "value4"), + ("key5", "value5"), + ] + + // Insert test data + for (key, value) in testData { + try rocksDB.put(column: .col1, key: key.data, value: value.data) + } + + // Test forward iteration + let readOptions = ReadOptions() + let iterator = rocksDB.createIterator(column: .col1, readOptions: readOptions) + + iterator.seek(to: "key1".data) + var count = 0 + while let pair = iterator.read() { + let key = String(data: pair.key, encoding: .utf8)! + let value = String(data: pair.value, encoding: .utf8)! + #expect(key == testData[count].0) + #expect(value == testData[count].1) + count += 1 + iterator.next() + } + #expect(count == testData.count) + } + + @Test func testSnapshot() throws { + // Insert initial data + try rocksDB.put(column: .col1, key: "key1".data, value: "value1".data) - // Test deleting non-existent key - try rocksDB.delete(column: .col1, key: "nonexistent".data) - let value = try rocksDB.get(column: .col1, key: "nonexistent".data) - #expect(value == nil) + // Create snapshot + let snapshot = rocksDB.createSnapshot() + let readOptions = ReadOptions() + readOptions.setSnapshot(snapshot) + + // Modify data after snapshot + try rocksDB.put(column: .col1, key: "key1".data, value: "modified".data) + try rocksDB.put(column: .col1, key: "key2".data, value: "value2".data) + + // Read using snapshot should see original value + let iterator = rocksDB.createIterator(column: .col1, readOptions: readOptions) + iterator.seek(to: "key1".data) + if let pair = iterator.read() { + let value = String(data: pair.value, encoding: .utf8)! + #expect(value == "value1") + } + + // Regular read should see modified value + let currentValue = try rocksDB.get(column: .col1, key: "key1".data) + #expect(String(data: currentValue!, encoding: .utf8) == "modified") } } diff --git a/Database/Tests/RocksDBSwiftTests/StoreTests.swift b/Database/Tests/RocksDBSwiftTests/StoreTests.swift new file mode 100644 index 00000000..f991ee8b --- /dev/null +++ b/Database/Tests/RocksDBSwiftTests/StoreTests.swift @@ -0,0 +1,152 @@ +import Foundation +@testable import RocksDBSwift +import Testing + +// First, let's create a simple coder for testing +struct JSONCoder: StoreCoder { + typealias Key = K + typealias Value = V + + private let encoder = JSONEncoder() + private let decoder = JSONDecoder() + + func encode(key: K) throws -> Data { + try encoder.encode(key) + } + + func encode(value: V) throws -> Data { + try encoder.encode(value) + } + + func decode(data: Data) throws -> V { + try decoder.decode(V.self, from: data) + } +} + +// Test model structures +struct TestKey: Codable, Hashable { + let id: String +} + +struct TestValue: Codable, Equatable { + let name: String + let age: Int + let data: [String] +} + +final class StoreTests { + let path = { + let tmpDir = FileManager.default.temporaryDirectory + return tmpDir.appendingPathComponent("\(UUID().uuidString)") + }() + + var rocksDB: RocksDB! + var store: Store>! + + init() throws { + rocksDB = try RocksDB(path: path) + store = Store( + db: rocksDB, + column: .col1, + coder: JSONCoder() + ) + } + + deinit { + rocksDB = nil + try! FileManager.default.removeItem(at: path) + } + + @Test + func testBasicOperations() throws { + let key = TestKey(id: "test1") + let value = TestValue(name: "John", age: 30, data: ["a", "b", "c"]) + + // Test put and get + try store.put(key: key, value: value) + let retrieved = try store.get(key: key) + #expect(retrieved == value) + + // Test exists + #expect(try store.exists(key: key) == true) + + // Test delete + try store.delete(key: key) + #expect(try store.get(key: key) == nil) + #expect(try store.exists(key: key) == false) + } + + @Test + func testBatchOperations() throws { + let key1 = TestKey(id: "batch1") + let key2 = TestKey(id: "batch2") + let value1 = TestValue(name: "Alice", age: 25, data: ["x"]) + let value2 = TestValue(name: "Bob", age: 35, data: ["y"]) + + // Create batch operations + let putOp1 = try store.putOperation(key: key1, value: value1) + let putOp2 = try store.putOperation(key: key2, value: value2) + + // Execute batch + try rocksDB.batch(operations: [putOp1, putOp2]) + + // Verify results + #expect(try store.get(key: key1) == value1) + #expect(try store.get(key: key2) == value2) + + // Test batch delete + let deleteOp = try store.deleteOperation(key: key1) + try rocksDB.batch(operations: [deleteOp]) + #expect(try store.get(key: key1) == nil) + #expect(try store.get(key: key2) == value2) + } + + @Test + func testErrorHandling() throws { + // Test getting non-existent key + let nonExistentKey = TestKey(id: "nothere") + #expect(try store.get(key: nonExistentKey) == nil) + + // Test multiple operations + let key = TestKey(id: "test") + let value1 = TestValue(name: "First", age: 20, data: ["1"]) + let value2 = TestValue(name: "Second", age: 30, data: ["2"]) + + try store.put(key: key, value: value1) + try store.put(key: key, value: value2) + + let final = try store.get(key: key) + #expect(final == value2) + } + + @Test + func testLargeData() throws { + let key = TestKey(id: "large") + let largeArray = (0 ..< 1000).map { String($0) } + let value = TestValue(name: "Large", age: 99, data: largeArray) + + try store.put(key: key, value: value) + let retrieved = try store.get(key: key) + #expect(retrieved == value) + } + + @Test + func testMultipleStores() throws { + // Create a second store with different types + let store2: Store> = Store( + db: rocksDB, + column: .col2, + coder: JSONCoder() + ) + + // Test operations on both stores + let key1 = TestKey(id: "store1") + let value1 = TestValue(name: "Store1", age: 40, data: ["test"]) + + try store.put(key: key1, value: value1) + try store2.put(key: "store2", value: 42) + + #expect(try store.get(key: key1) == value1) + #expect(try store2.get(key: "store2") == 42) + } +} diff --git a/Node/Package.swift b/Node/Package.swift index 1da05f07..a7a3e4d7 100644 --- a/Node/Package.swift +++ b/Node/Package.swift @@ -21,6 +21,7 @@ let package = Package( .package(path: "../RPC"), .package(path: "../TracingUtils"), .package(path: "../Utils"), + .package(path: "../Database"), .package(url: "https://github.com/apple/swift-testing.git", branch: "0.10.0"), .package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.2"), ], @@ -34,6 +35,7 @@ let package = Package( "RPC", "TracingUtils", "Utils", + "Database", .product(name: "AsyncChannels", package: "Async-Channels"), ] ), diff --git a/Node/Sources/Node/Node.swift b/Node/Sources/Node/Node.swift index a3d4db09..46d8276f 100644 --- a/Node/Sources/Node/Node.swift +++ b/Node/Sources/Node/Node.swift @@ -1,4 +1,6 @@ import Blockchain +import Database +import Foundation import Networking import RPC import TracingUtils @@ -9,6 +11,34 @@ private let logger = Logger(label: "node") public typealias RPCConfig = Server.Config public typealias NetworkConfig = Network.Config +public enum Database { + case inMemory + case rocksDB(path: URL) + + public func open(chainspec: ChainSpec) async throws -> BlockchainDataProvider { + switch self { + case let .rocksDB(path): + logger.info("Using RocksDB backend at \(path.absoluteString)") + let backend = try await RocksDBBackend( + path: path, + config: chainspec.getConfig(), + genesisBlock: chainspec.getBlock(), + genesisStateData: chainspec.getState() + ) + return try await BlockchainDataProvider(backend) + case .inMemory: + logger.info("Using in-memory backend") + let genesisBlock = try chainspec.getBlock() + let genesisStateData = try chainspec.getState() + let backend = try StateBackend(InMemoryBackend(), config: chainspec.getConfig(), rootHash: Data32()) + try await backend.writeRaw(Array(genesisStateData)) + let genesisState = try await State(backend: backend) + let genesisStateRef = StateRef(genesisState) + return try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisStateRef, genesisBlock: genesisBlock)) + } + } +} + public class Node { public struct Config { public var rpc: RPCConfig? @@ -16,19 +46,22 @@ public class Node { public var peers: [NetAddr] public var local: Bool public var name: String? + public var database: Database public init( rpc: RPCConfig?, network: NetworkConfig, peers: [NetAddr] = [], local: Bool = false, - name: String? = nil + name: String? = nil, + database: Database = .inMemory ) { self.rpc = rpc self.network = network self.peers = peers self.local = local self.name = name + self.database = database } } @@ -50,17 +83,12 @@ public class Node { self.config = config let chainspec = try await genesis.load() - let genesisBlock = try chainspec.getBlock() - let genesisStateData = try chainspec.getState() - let backend = try StateBackend(InMemoryBackend(), config: chainspec.getConfig(), rootHash: Data32()) - try await backend.writeRaw(Array(genesisStateData)) - let genesisState = try await State(backend: backend) - let genesisStateRef = StateRef(genesisState) let protocolConfig = try chainspec.getConfig() - logger.info("Genesis: \(genesisBlock.hash)") + dataProvider = try await config.database.open(chainspec: chainspec) + + logger.info("Genesis: \(dataProvider.genesisBlockHash)") - dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisStateRef, genesisBlock: genesisBlock)) self.scheduler = scheduler let blockchain = try await Blockchain( config: protocolConfig, diff --git a/Node/Sources/Node/ValidatorNode.swift b/Node/Sources/Node/ValidatorNode.swift index 63af3755..5d55cdce 100644 --- a/Node/Sources/Node/ValidatorNode.swift +++ b/Node/Sources/Node/ValidatorNode.swift @@ -3,6 +3,8 @@ import Foundation import TracingUtils import Utils +private let logger = Logger(label: "ValidatorNode") + public class ValidatorNode: Node { private var validator: ValidatorService! @@ -34,12 +36,15 @@ public class ValidatorNode: Node { let dataProvider: BlockchainDataProvider = blockchain.dataProvider let local = config.local Task { - let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) if !local { + logger.trace("Waiting for sync") await syncManager.waitForSyncCompletion() } + logger.trace("Sync completed") await validator.onSyncCompleted() + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) if await dataProvider.bestHead.hash == dataProvider.genesisBlockHash { + logger.trace("Calling on(genesis:)") await validator.on(genesis: genesisState) } } diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 635880c4..fd3e4f7b 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -5,9 +5,22 @@ import Utils @testable import Node -struct NodeTests { +final class NodeTests { + let path = { + let tmpDir = FileManager.default.temporaryDirectory + return tmpDir.appendingPathComponent("\(UUID().uuidString)") + }() + + func getDatabase(_ idx: Int) -> Database { + Database.rocksDB(path: path.appendingPathComponent("\(idx)")) + } + + deinit { + try? FileManager.default.removeItem(at: path) + } + @Test - func validatorNode() async throws { + func validatorNodeInMemory() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] ).build(genesis: .preset(.minimal)) @@ -36,13 +49,43 @@ struct NodeTests { #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } + @Test + func validatorNodeRocksDB() async throws { + let (nodes, scheduler) = try await Topology( + nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] + ).build(genesis: .preset(.minimal)) + + let (validatorNode, storeMiddlware) = nodes[0] + + // Get initial state + let initialBestHead = await validatorNode.dataProvider.bestHead + let initialTimeslot = initialBestHead.timeslot + + // Advance time + for _ in 0 ..< 10 { + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await storeMiddlware.wait() + } + + // Wait for block production + try await Task.sleep(for: .milliseconds(500)) + + // Get new state + let newBestHead = await validatorNode.dataProvider.bestHead + let newTimeslot = newBestHead.timeslot + + // Verify block was produced + #expect(newTimeslot > initialTimeslot) + #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) + } + @Test func sync() async throws { // Create validator and full node let (nodes, scheduler) = try await Topology( nodes: [ - NodeDescription(isValidator: true), - NodeDescription(devSeed: 1), + NodeDescription(isValidator: true, database: getDatabase(0)), + NodeDescription(devSeed: 1, database: getDatabase(1)), ], connections: [(0, 1)] ).build(genesis: .preset(.minimal)) @@ -91,10 +134,10 @@ struct NodeTests { // Create multiple nodes let (nodes, scheduler) = try await Topology( nodes: [ - NodeDescription(isValidator: true), - NodeDescription(isValidator: true, devSeed: 1), - NodeDescription(devSeed: 2), - NodeDescription(devSeed: 3), + NodeDescription(isValidator: true, database: getDatabase(0)), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), + NodeDescription(devSeed: 2, database: getDatabase(2)), + NodeDescription(devSeed: 3, database: .inMemory), ], connections: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)] ).build(genesis: .preset(.minimal)) diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index 30014ad3..785814d4 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -6,10 +6,12 @@ import Utils struct NodeDescription { let isValidator: Bool let devSeed: UInt32 + let database: Database - public init(isValidator: Bool = false, devSeed: UInt32 = 0) { + public init(isValidator: Bool = false, devSeed: UInt32 = 0, database: Database = .inMemory) { self.isValidator = isValidator self.devSeed = devSeed + self.database = database } } @@ -41,7 +43,8 @@ struct Topology { key: keystore.get(Ed25519.self, publicKey: keys.ed25519)! ), peers: [], - local: nodes.count == 1 + local: nodes.count == 1, + database: desc.database ) let nodeCls = desc.isValidator ? ValidatorNode.self : Node.self let node = try await nodeCls.init( diff --git a/Utils/Tests/UtilsTests/Data32Tests.swift b/Utils/Tests/UtilsTests/Data32Tests.swift index 5bedcd33..064a2c40 100644 --- a/Utils/Tests/UtilsTests/Data32Tests.swift +++ b/Utils/Tests/UtilsTests/Data32Tests.swift @@ -1,3 +1,4 @@ +import Codec import Foundation import Testing @@ -50,4 +51,14 @@ struct Data32Tests { arr.sort() #expect(arr == [a, b, c]) } + + @Test func encodeArrayOptional() throws { + let value: [Data32?] = [Data32()] + + let encoded = try JamEncoder.encode(value) + #expect(encoded == Data([1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])) + + let decoded = try JamDecoder.decode([Data32?].self, from: encoded) + #expect(decoded == value) + } } diff --git a/Utils/Tests/UtilsTests/EventBus/DispatcherMiddlewareTests.swift b/Utils/Tests/UtilsTests/EventBus/DispatcherMiddlewareTests.swift index edbbcc14..7bb75559 100644 --- a/Utils/Tests/UtilsTests/EventBus/DispatcherMiddlewareTests.swift +++ b/Utils/Tests/UtilsTests/EventBus/DispatcherMiddlewareTests.swift @@ -21,10 +21,6 @@ final class MiddlewareTests { let parallelMiddleware = Middleware.parallel(firstMiddleware, secondMiddleware) - let handler: MiddlewareHandler = { _ in - await orderManager.appendOrder(2) - } - try await parallelMiddleware.handle((), next: { await orderManager.appendOrder(1) }) diff --git a/Utils/Tests/UtilsTests/Merklization/MerklizationTests.swift b/Utils/Tests/UtilsTests/Merklization/MerklizationTests.swift index b57c706c..c37c338d 100644 --- a/Utils/Tests/UtilsTests/Merklization/MerklizationTests.swift +++ b/Utils/Tests/UtilsTests/Merklization/MerklizationTests.swift @@ -17,7 +17,7 @@ extension Blake2b256 { struct MerklizationTests { @Test func testHash() throws { - var mmr = MMR([]) + let mmr = MMR([]) let emptyHash = try JamEncoder.encode(mmr).keccakHash() #expect(mmr.hash() == emptyHash) } @@ -30,7 +30,6 @@ struct MerklizationTests { Data("node3".utf8), ] let result = Merklization.binaryMerklize(input) - print("result = \(result)") let expected = Blake2b256.hash("node", Blake2b256.hash("node", Data("node1".utf8),