From 7fb6e5e70f52a40cad711f8cfd73382655ca6baf Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Wed, 11 Dec 2024 17:29:08 +1300 Subject: [PATCH] test with rocksdb and many issue fixes --- .../Blockchain/RuntimeProtocols/Runtime.swift | 6 ++- .../Sources/Blockchain/State/State.swift | 1 + .../Blockchain/State/StateBackend.swift | 8 ++- .../Sources/Blockchain/State/StateTrie.swift | 7 +-- .../Sources/Database/RocksDBBackend.swift | 40 +++++++++++++-- Database/Sources/Database/Stores.swift | 1 + Database/Sources/RocksDBSwift/RocksDB.swift | 15 ++++++ Node/Sources/Node/ValidatorNode.swift | 7 ++- Node/Tests/NodeTests/NodeTests.swift | 50 ++++++++++++++++++- Node/Tests/NodeTests/Topology.swift | 7 ++- 10 files changed, 126 insertions(+), 16 deletions(-) diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift index f08814ed..65dc51bb 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift @@ -14,7 +14,7 @@ public final class Runtime { case invalidReportAuthorizer case encodeError(any Swift.Error) case invalidExtrinsicHash - case invalidParentHash + case invalidParentHash(state: Data32, header: Data32) case invalidHeaderStateRoot case invalidHeaderEpochMarker case invalidHeaderWinningTickets @@ -53,7 +53,7 @@ public final class Runtime { let block = block.value guard block.header.parentHash == state.value.lastBlockHash else { - throw Error.invalidParentHash + throw Error.invalidParentHash(state: state.value.lastBlockHash, header: block.header.parentHash) } guard block.header.priorStateRoot == context.stateRoot else { @@ -189,6 +189,8 @@ public final class Runtime { // after reports as it need old recent history try updateRecentHistory(block: block, state: &newState) + + try await newState.save() } catch let error as Error { throw error } catch let error as SafroleError { diff --git a/Blockchain/Sources/Blockchain/State/State.swift b/Blockchain/Sources/Blockchain/State/State.swift index 35851112..32e9a8c7 100644 --- a/Blockchain/Sources/Blockchain/State/State.swift +++ b/Blockchain/Sources/Blockchain/State/State.swift @@ -220,6 +220,7 @@ public struct State: Sendable { // TODO: we don't really want to write to the underlying backend here // instead, it should be writting to a in memory layer // and when actually saving the state, save the in memory layer to the presistent store + @discardableResult public func save() async throws -> Data32 { try await backend.write(layer.toKV()) return await backend.rootHash diff --git a/Blockchain/Sources/Blockchain/State/StateBackend.swift b/Blockchain/Sources/Blockchain/State/StateBackend.swift index d9d05da0..a34feeee 100644 --- a/Blockchain/Sources/Blockchain/State/StateBackend.swift +++ b/Blockchain/Sources/Blockchain/State/StateBackend.swift @@ -3,7 +3,7 @@ import Foundation import Utils public enum StateBackendError: Error { - case missingState + case missingState(key: Sendable) case invalidData } @@ -35,7 +35,7 @@ public final class StateBackend: Sendable { if Key.optional { return nil } - throw StateBackendError.missingState + throw StateBackendError.missingState(key: key) } public func batchRead(_ keys: [any StateKey]) async throws -> [(key: any StateKey, value: (Codable & Sendable)?)] { @@ -74,4 +74,8 @@ public final class StateBackend: Sendable { return nil } } + + public func debugPrint() async throws { + try await trie.debugPrint() + } } diff --git a/Blockchain/Sources/Blockchain/State/StateTrie.swift b/Blockchain/Sources/Blockchain/State/StateTrie.swift index 40e26a47..7c7dffb5 100644 --- a/Blockchain/Sources/Blockchain/State/StateTrie.swift +++ b/Blockchain/Sources/Blockchain/State/StateTrie.swift @@ -24,7 +24,7 @@ private struct TrieNode { right = Data32(data.data.suffix(32))! self.isNew = isNew rawValue = nil - switch data.data[0] & 0b1100_0000 { + switch data.data.first! & 0b1100_0000 { case 0b1000_0000: type = .embeddedLeaf case 0b1100_0000: @@ -66,7 +66,7 @@ private struct TrieNode { guard type == .embeddedLeaf else { return nil } - let len = left.data[0] & 0b0011_1111 + let len = left.data.first! & 0b0011_1111 return right.data[relative: 0 ..< Int(len)] } @@ -85,7 +85,7 @@ private struct TrieNode { static func branch(left: Data32, right: Data32) -> TrieNode { var left = left.data - left[0] = left[0] & 0b0111_1111 // clear the highest bit + left[left.startIndex] = left[left.startIndex] & 0b0111_1111 // clear the highest bit return .init(left: Data32(left)!, right: right, type: .branch, isNew: true, rawValue: nil) } } @@ -352,6 +352,7 @@ public actor StateTrie: Sendable { } } + logger.info("Root hash: \(rootHash.toHexString())") try await printNode(rootHash, depth: 0) } } diff --git a/Database/Sources/Database/RocksDBBackend.swift b/Database/Sources/Database/RocksDBBackend.swift index f9090c97..9cac1815 100644 --- a/Database/Sources/Database/RocksDBBackend.swift +++ b/Database/Sources/Database/RocksDBBackend.swift @@ -2,8 +2,11 @@ import Blockchain import Codec import Foundation import RocksDBSwift +import TracingUtils import Utils +private let logger = Logger(label: "RocksDBBackend") + public enum RocksDBBackendError: Error { case genesisHashMismatch(expected: Data32, actual: Data) } @@ -16,6 +19,7 @@ public final class RocksDBBackend: Sendable { private let blockHashByTimeslot: Store>> private let blockHashByNumber: Store>> private let blockNumberByHash: Store> + private let stateRootByHash: Store> private let stateTrie: Store> private let stateValue: Store> private let stateRefs: Store> @@ -31,6 +35,7 @@ public final class RocksDBBackend: Sendable { blockHashByTimeslot = Store(db: db, column: .blockIndexes, coder: JamCoder(config: config, prefix: Data([0]))) blockHashByNumber = Store(db: db, column: .blockIndexes, coder: JamCoder(config: config, prefix: Data([1]))) blockNumberByHash = Store(db: db, column: .blockIndexes, coder: JamCoder(config: config, prefix: Data([2]))) + stateRootByHash = Store(db: db, column: .blockIndexes, coder: JamCoder(config: config, prefix: Data([3]))) stateTrie = Store(db: db, column: .state, coder: JamCoder(config: config, prefix: Data([0]))) stateValue = Store(db: db, column: .state, coder: JamCoder(config: config, prefix: Data([1]))) stateRefs = Store(db: db, column: .stateRefs, coder: JamCoder(config: config, prefix: Data([0]))) @@ -43,18 +48,25 @@ public final class RocksDBBackend: Sendable { guard genesis == genesisBlockHash.data else { throw RocksDBBackendError.genesisHashMismatch(expected: genesisBlockHash, actual: genesis) } + + logger.trace("DB loaded") } else { // must be a new db try meta.put(key: MetaKey.genesisHash.key, value: genesisBlockHash.data) try await add(block: genesisBlock) let backend = StateBackend(self, config: config, rootHash: Data32()) try await backend.writeRaw(Array(genesisStateData)) + let rootHash = await backend.rootHash + try stateRootByHash.put(key: genesisBlockHash, value: rootHash) try setHeads([genesisBlockHash]) try await setFinalizedHead(hash: genesisBlockHash) + + logger.trace("New DB initialized") } } private func setHeads(_ heads: Set) throws { + logger.trace("setHeads() \(heads)") try meta.put(key: MetaKey.heads.key, value: JamEncoder.encode(heads)) } } @@ -65,7 +77,7 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { } public func hasState(hash: Data32) async throws -> Bool { - try stateTrie.exists(key: hash.data) + try stateRootByHash.exists(key: hash) } public func isHead(hash: Data32) async throws -> Bool { @@ -85,7 +97,12 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { } public func getState(hash: Data32) async throws -> StateRef? { - try await State(backend: StateBackend(self, config: config, rootHash: hash)).asRef() + logger.trace("getState() \(hash)") + + guard let rootHash = try stateRootByHash.get(key: hash) else { + return nil + } + return try await State(backend: StateBackend(self, config: config, rootHash: rootHash)).asRef() } public func getFinalizedHead() async throws -> Data32? { @@ -109,6 +126,8 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { } public func add(block: BlockRef) async throws { + logger.trace("add(block:) \(block.hash)") + // TODO: batch put try blocks.put(key: block.hash, value: block) @@ -129,15 +148,22 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { try blockNumberByHash.put(key: block.hash, value: blockNumber) } - public func add(state _: StateRef) async throws { - // nothing to do + public func add(state: StateRef) async throws { + logger.trace("add(state:) \(state.value.lastBlockHash)") + + let rootHash = await state.value.stateRoot + try stateRootByHash.put(key: state.value.lastBlockHash, value: rootHash) } public func setFinalizedHead(hash: Data32) async throws { + logger.trace("setFinalizedHead() \(hash)") + try meta.put(key: MetaKey.finalizedHead.key, value: hash.data) } public func updateHead(hash: Data32, parent: Data32) async throws { + logger.trace("updateHead() \(hash) \(parent)") + var heads = try await getHeads() // parent needs to be either @@ -155,6 +181,8 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { } public func remove(hash: Data32) async throws { + logger.trace("remove() \(hash)") + // TODO: batch delete try blocks.delete(key: hash) @@ -203,6 +231,8 @@ extension RocksDBBackend: StateBackendProtocol { } public func batchUpdate(_ updates: [StateBackendOperation]) async throws { + logger.trace("batchUpdate() \(updates.count) operations") + // TODO: implement this using merge operator to perform atomic increment // so we can do the whole thing in a single batch for update in updates { @@ -228,6 +258,8 @@ extension RocksDBBackend: StateBackendProtocol { } public func gc(callback _: @Sendable (Data) -> Data32?) async throws { + logger.trace("gc()") + // TODO: implement } } diff --git a/Database/Sources/Database/Stores.swift b/Database/Sources/Database/Stores.swift index 272fc63a..51c732c2 100644 --- a/Database/Sources/Database/Stores.swift +++ b/Database/Sources/Database/Stores.swift @@ -13,6 +13,7 @@ enum StoreId: UInt8, ColumnFamilyKey { // 0x00 + timeslot => Set // 0x01 + blockNumber => Set // 0x02 + blockHash => blockNumber + // 0x03 + blockHash => stateRootHash case blockIndexes = 2 // state trie // 0x00 + hash => trie node diff --git a/Database/Sources/RocksDBSwift/RocksDB.swift b/Database/Sources/RocksDBSwift/RocksDB.swift index 33f027d6..c76c7f55 100644 --- a/Database/Sources/RocksDBSwift/RocksDB.swift +++ b/Database/Sources/RocksDBSwift/RocksDB.swift @@ -1,7 +1,10 @@ import Foundation import rocksdb +import TracingUtils import Utils +private let logger = Logger(label: "RocksDB") + public protocol ColumnFamilyKey: Sendable, CaseIterable, Hashable, RawRepresentable {} public enum BatchOperation { @@ -159,6 +162,8 @@ extension RocksDB { extension RocksDB { public func put(column: CFKey, key: Data, value: Data) throws { + logger.trace("put() \(column) \(key.toHexString()) \(value.toHexString())") + let handle = getHandle(column: column) try Self.call(key, value) { err, ptrs in let key = ptrs[0] @@ -179,6 +184,8 @@ extension RocksDB { } public func get(column: CFKey, key: Data) throws -> Data? { + logger.trace("get() \(column) \(key.toHexString())") + var len = 0 let handle = getHandle(column: column) @@ -193,6 +200,8 @@ extension RocksDB { } public func delete(column: CFKey, key: Data) throws { + logger.trace("delete() \(column) \(key.toHexString())") + let handle = getHandle(column: column) try Self.call(key) { err, ptrs in @@ -204,12 +213,16 @@ extension RocksDB { } public func batch(operations: [BatchOperation]) throws { + logger.trace("batch() \(operations.count) operations") + let writeBatch = rocksdb_writebatch_create() defer { rocksdb_writebatch_destroy(writeBatch) } for operation in operations { switch operation { case let .delete(column, key): + logger.trace("batch() delete \(column) \(key.toHexString())") + let handle = try getHandle(column: column).unwrap(orError: Error.invalidColumn(column)) try Self.call(key) { ptrs in let key = ptrs[0] @@ -217,6 +230,8 @@ extension RocksDB { } case let .put(column, key, value): + logger.trace("batch() put \(column) \(key.toHexString()) \(value.toHexString())") + let handle = try getHandle(column: column).unwrap(orError: Error.invalidColumn(column)) try Self.call(key, value) { ptrs in let key = ptrs[0] diff --git a/Node/Sources/Node/ValidatorNode.swift b/Node/Sources/Node/ValidatorNode.swift index 63af3755..5d55cdce 100644 --- a/Node/Sources/Node/ValidatorNode.swift +++ b/Node/Sources/Node/ValidatorNode.swift @@ -3,6 +3,8 @@ import Foundation import TracingUtils import Utils +private let logger = Logger(label: "ValidatorNode") + public class ValidatorNode: Node { private var validator: ValidatorService! @@ -34,12 +36,15 @@ public class ValidatorNode: Node { let dataProvider: BlockchainDataProvider = blockchain.dataProvider let local = config.local Task { - let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) if !local { + logger.trace("Waiting for sync") await syncManager.waitForSyncCompletion() } + logger.trace("Sync completed") await validator.onSyncCompleted() + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) if await dataProvider.bestHead.hash == dataProvider.genesisBlockHash { + logger.trace("Calling on(genesis:)") await validator.on(genesis: genesisState) } } diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 635880c4..c9688f86 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -1,13 +1,28 @@ import Blockchain import Foundation import Testing +import TracingUtils import Utils @testable import Node -struct NodeTests { +final class NodeTests { + let path = { + let tmpDir = FileManager.default.temporaryDirectory + return tmpDir.appendingPathComponent("\(UUID().uuidString)") + }() + + func getDatabase(_ idx: Int) -> Database { + Database.rocksDB(path: path.appendingPathComponent("\(idx)")) + } + + deinit { + try? FileManager.default.removeItem(at: path) + } + @Test - func validatorNode() async throws { + func validatorNodeInMemory() async throws { + setupTestLogger() let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] ).build(genesis: .preset(.minimal)) @@ -36,6 +51,37 @@ struct NodeTests { #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } + @Test + func validatorNodeRocksDB() async throws { + setupTestLogger() + let (nodes, scheduler) = try await Topology( + nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] + ).build(genesis: .preset(.minimal)) + + let (validatorNode, storeMiddlware) = nodes[0] + + // Get initial state + let initialBestHead = await validatorNode.dataProvider.bestHead + let initialTimeslot = initialBestHead.timeslot + + // Advance time + for _ in 0 ..< 10 { + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await storeMiddlware.wait() + } + + // Wait for block production + try await Task.sleep(for: .milliseconds(500)) + + // Get new state + let newBestHead = await validatorNode.dataProvider.bestHead + let newTimeslot = newBestHead.timeslot + + // Verify block was produced + #expect(newTimeslot > initialTimeslot) + #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) + } + @Test func sync() async throws { // Create validator and full node diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index 30014ad3..785814d4 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -6,10 +6,12 @@ import Utils struct NodeDescription { let isValidator: Bool let devSeed: UInt32 + let database: Database - public init(isValidator: Bool = false, devSeed: UInt32 = 0) { + public init(isValidator: Bool = false, devSeed: UInt32 = 0, database: Database = .inMemory) { self.isValidator = isValidator self.devSeed = devSeed + self.database = database } } @@ -41,7 +43,8 @@ struct Topology { key: keystore.get(Ed25519.self, publicKey: keys.ed25519)! ), peers: [], - local: nodes.count == 1 + local: nodes.count == 1, + database: desc.database ) let nodeCls = desc.isValidator ? ValidatorNode.self : Node.self let node = try await nodeCls.init(