From 1d51337f2913043dde5ea774d6bdca5b53fc7dbb Mon Sep 17 00:00:00 2001 From: Xiliang Chen Date: Fri, 25 Oct 2024 22:23:38 +1300 Subject: [PATCH] Sync manager (#196) * rename * syncing * sync --- .../Sources/Blockchain/Blockchain.swift | 5 + .../Sources/Blockchain/Types/Header.swift | 30 +++- .../Validator/ValidatorService.swift | 2 +- Boka/Sources/Boka.swift | 25 ++- Networking/Sources/Networking/Peer.swift | 4 +- .../CommonEphemeral/BlockRequest.swift | 12 ++ .../CommonEphemeral/CERequest.swift | 40 ++--- .../Node/NetworkingProtocol/Network.swift | 4 +- .../NetworkingProtocol/NetworkEvents.swift | 15 ++ .../NetworkingProtocol/NetworkManager.swift | 95 ++++++++--- .../Node/NetworkingProtocol/PeerManager.swift | 38 +++-- .../Node/NetworkingProtocol/SyncManager.swift | 149 ++++++++++++++++++ .../UniquePresistent/BlockAnnouncement.swift | 2 +- Node/Sources/Node/Node.swift | 7 +- Node/Sources/Node/NodeDataSource.swift | 2 +- Node/Sources/Node/ValidatorNode.swift | 16 +- 16 files changed, 364 insertions(+), 82 deletions(-) create mode 100644 Node/Sources/Node/NetworkingProtocol/CommonEphemeral/BlockRequest.swift create mode 100644 Node/Sources/Node/NetworkingProtocol/NetworkEvents.swift create mode 100644 Node/Sources/Node/NetworkingProtocol/SyncManager.swift diff --git a/Blockchain/Sources/Blockchain/Blockchain.swift b/Blockchain/Sources/Blockchain/Blockchain.swift index b2284727..acc114a0 100644 --- a/Blockchain/Sources/Blockchain/Blockchain.swift +++ b/Blockchain/Sources/Blockchain/Blockchain.swift @@ -29,6 +29,11 @@ public final class Blockchain: ServiceBase, @unchecked Sendable { public func importBlock(_ block: BlockRef) async throws { logger.debug("importing block: #\(block.header.timeslot) \(block.hash)") + if try await dataProvider.hasBlock(hash: block.hash) { + logger.debug("block already imported", metadata: ["hash": "\(block.hash)"]) + return + } + try await withSpan("importBlock") { span in span.attributes.blockHash = block.hash.description diff --git a/Blockchain/Sources/Blockchain/Types/Header.swift b/Blockchain/Sources/Blockchain/Types/Header.swift index 0daf4178..d4151ab4 100644 --- a/Blockchain/Sources/Blockchain/Types/Header.swift +++ b/Blockchain/Sources/Blockchain/Types/Header.swift @@ -133,7 +133,35 @@ extension Header { } } -public typealias HeaderRef = Ref
+public final class HeaderRef: Ref
, @unchecked Sendable { + public required init(_ value: Header) { + lazyHash = Lazy { + Ref(value.hash()) + } + + super.init(value) + } + + private let lazyHash: Lazy> + + public var hash: Data32 { + lazyHash.value.value + } + + override public var description: String { + "Header(hash: \(hash), timeslot: \(value.timeslot))" + } +} + +extension HeaderRef: Codable { + public convenience init(from decoder: Decoder) throws { + try self.init(.init(from: decoder)) + } + + public func encode(to encoder: Encoder) throws { + try value.encode(to: encoder) + } +} extension Header.Unsigned: Dummy { public typealias Config = ProtocolConfigRef diff --git a/Blockchain/Sources/Blockchain/Validator/ValidatorService.swift b/Blockchain/Sources/Blockchain/Validator/ValidatorService.swift index 2ff891c5..4f846539 100644 --- a/Blockchain/Sources/Blockchain/Validator/ValidatorService.swift +++ b/Blockchain/Sources/Blockchain/Validator/ValidatorService.swift @@ -1,7 +1,7 @@ import Foundation import Utils -public class ValidatorService { +public final class ValidatorService: Sendable { private let blockchain: Blockchain private let keystore: KeyStore private let safrole: SafroleService diff --git a/Boka/Sources/Boka.swift b/Boka/Sources/Boka.swift index a6a8bbb5..babaeb57 100644 --- a/Boka/Sources/Boka.swift +++ b/Boka/Sources/Boka.swift @@ -58,7 +58,7 @@ struct Boka: AsyncParsableCommand { var basePath: String? @Option(name: .long, help: "A preset config or path to chain config file.") - var chain: Genesis = .preset(.dev) + var chain: Genesis = .preset(.minimal) @Option(name: .long, help: "Listen address for RPC server. Pass 'no' to disable RPC server. Default to 127.0.0.1:9955.") var rpc: MaybeEnabled = .enabled(NetAddr(address: "127.0.0.1:9955")!) @@ -81,6 +81,12 @@ struct Boka: AsyncParsableCommand { @Option(name: .long, help: "Node name. For telemetry only.") var name: String? + @Flag(name: .long, help: "Enable local mode, whereas peers are not expected.") + var local: Bool = false + + @Flag(name: .long, help: "Enable dev mode. This is equivalent to --local --validator") + var dev: Bool = false + mutating func run() async throws { let services = try await Tracing.bootstrap("Boka", loggerOnly: true) for service in services { @@ -95,6 +101,16 @@ struct Boka: AsyncParsableCommand { logger.info("Chain: \(chain)") + if dev { + local = true + validator = true + logger.info("Dev mode enabled. Enabling local and validator.") + } + + if local { + logger.info("Local mode enabled.") + } + if let name { logger.info("Node name: \(name)") } @@ -146,7 +162,12 @@ struct Boka: AsyncParsableCommand { handlerMiddleware: .tracing(prefix: "Handler") ) - let config = Node.Config(rpc: rpcConfig, network: networkConfig, peers: peers) + let config = Node.Config( + rpc: rpcConfig, + network: networkConfig, + peers: peers, + local: local + ) let node: Node = if validator { try await ValidatorNode( diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index efd26b06..fe93a5cc 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -160,8 +160,8 @@ public final class Peer: Sendable { } // there should be only one connection per peer - public func getPeersCount() -> Int { - impl.connections.value.byId.values.count + public var peersCount: Int { + impl.connections.read { $0.byId.count } } } diff --git a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/BlockRequest.swift b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/BlockRequest.swift new file mode 100644 index 00000000..45b67c3f --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/BlockRequest.swift @@ -0,0 +1,12 @@ +import Utils + +public struct BlockRequest: Codable, Sendable { + public enum Direction: UInt8, Codable, Sendable { + case ascendingExcludsive = 0 + case descendingInclusive = 1 + } + + public var hash: Data32 + public var direction: Direction + public var maxBlocks: UInt32 +} diff --git a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift index 893eea64..64992c0a 100644 --- a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift +++ b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift @@ -4,6 +4,7 @@ import Foundation import Networking public enum CERequest: Sendable { + case blockRequest(BlockRequest) case safroleTicket1(SafroleTicketMessage) case safroleTicket2(SafroleTicketMessage) } @@ -13,6 +14,8 @@ extension CERequest: RequestProtocol { public func encode() throws -> Data { switch self { + case let .blockRequest(message): + try JamEncoder.encode(message) case let .safroleTicket1(message): try JamEncoder.encode(message) case let .safroleTicket2(message): @@ -22,6 +25,8 @@ extension CERequest: RequestProtocol { public var kind: CommonEphemeralStreamKind { switch self { + case .blockRequest: + .blockRequest case .safroleTicket1: .safroleTicket1 case .safroleTicket2: @@ -31,6 +36,8 @@ extension CERequest: RequestProtocol { static func getType(kind: CommonEphemeralStreamKind) -> Decodable.Type { switch kind { + case .blockRequest: + BlockRequest.self case .safroleTicket1: SafroleTicketMessage.self case .safroleTicket2: @@ -42,6 +49,11 @@ extension CERequest: RequestProtocol { static func from(kind: CommonEphemeralStreamKind, data: any Decodable) -> CERequest? { switch kind { + case .blockRequest: + guard let message = data as? BlockRequest else { + return nil + } + return .blockRequest(message) case .safroleTicket1: guard let message = data as? SafroleTicketMessage else { return nil @@ -57,31 +69,3 @@ extension CERequest: RequestProtocol { } } } - -extension CERequest { - public func handle(blockchain: Blockchain) async throws -> (any Encodable)? { - switch self { - case let .safroleTicket1(message): - blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived( - items: [ - ExtrinsicTickets.TicketItem( - attempt: message.attempt, - signature: message.proof - ), - ] - )) - // TODO: rebroadcast to other peers after some time - return nil - case let .safroleTicket2(message): - blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived( - items: [ - ExtrinsicTickets.TicketItem( - attempt: message.attempt, - signature: message.proof - ), - ] - )) - return nil - } - } -} diff --git a/Node/Sources/Node/NetworkingProtocol/Network.swift b/Node/Sources/Node/NetworkingProtocol/Network.swift index f9870c47..a512881e 100644 --- a/Node/Sources/Node/NetworkingProtocol/Network.swift +++ b/Node/Sources/Node/NetworkingProtocol/Network.swift @@ -84,8 +84,8 @@ public final class Network: Sendable { try peer.listenAddress() } - public func getPeersCount() -> Int { - peer.getPeersCount() + public var peersCount: Int { + peer.peersCount } } diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkEvents.swift b/Node/Sources/Node/NetworkingProtocol/NetworkEvents.swift new file mode 100644 index 00000000..382de1c4 --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/NetworkEvents.swift @@ -0,0 +1,15 @@ +import Blockchain +import Utils + +public enum NetworkEvents { + public struct PeerAdded: Event { + public let info: PeerInfo + } + + public struct PeerUpdated: Event { + public let info: PeerInfo + public let newBlockHeader: HeaderRef + } + + public struct BulkSyncCompleted: Event {} +} diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index 4cbd2ab9..45cc42a2 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -7,32 +7,43 @@ import Utils private let logger = Logger(label: "NetworkManager") -enum SendTarget { +let MAX_BLOCKS_PER_REQUEST: UInt32 = 50 + +enum BroadcastTarget { case safroleStep1Validator case currentValidators } public final class NetworkManager: Sendable { - private let peerManager: PeerManager - private let network: Network - private let blockchain: Blockchain + let peerManager: PeerManager + let network: Network + let syncManager: SyncManager + let blockchain: Blockchain private let subscriptions: EventSubscriptions // This is for development only // Those peers will receive all the messages regardless the target private let devPeers: Set - public init(config: Network.Config, blockchain: Blockchain, eventBus: EventBus, devPeers: Set) async throws { - peerManager = PeerManager() + public init( + config: Network.Config, + blockchain: Blockchain, + eventBus: EventBus, + devPeers: Set + ) async throws { + peerManager = PeerManager(eventBus: eventBus) - let handler = HandlerImpl(blockchain: blockchain, peerManager: peerManager) network = try await Network( config: config, protocolConfig: blockchain.config, genesisHeader: blockchain.dataProvider.genesisBlockHash, - handler: handler + handler: HandlerImpl(blockchain: blockchain, peerManager: peerManager) + ) + syncManager = SyncManager( + blockchain: blockchain, network: network, peerManager: peerManager, eventBus: eventBus ) self.blockchain = blockchain + subscriptions = EventSubscriptions(eventBus: eventBus) self.devPeers = devPeers @@ -53,7 +64,7 @@ public final class NetworkManager: Sendable { } } - private func getSendTarget(target: SendTarget) -> Set { + private func getAddresses(target: BroadcastTarget) -> Set { // TODO: get target from onchain state switch target { case .safroleStep1Validator: @@ -65,31 +76,29 @@ public final class NetworkManager: Sendable { } } - private func send( + private func send(to: NetAddr, message: CERequest) async throws -> Data { + try await network.send(to: to, message: message) + } + + private func broadcast( + to: BroadcastTarget, message: CERequest, - target: SendTarget, - responseType: R.Type, - responseHandler: @Sendable @escaping (Result) async -> Void + responseHandler: @Sendable @escaping (Result) async -> Void ) async { - let targets = getSendTarget(target: target) + let targets = getAddresses(target: to) for target in targets { Task { logger.trace("sending message", metadata: ["target": "\(target)", "message": "\(message)"]) let res = await Result { try await network.send(to: target, message: message) } - .flatMap { data in - Result(catching: { - try JamDecoder.decode(responseType, from: data, withConfig: blockchain.config) - }) - } await responseHandler(res) } } } - private func send(message: CERequest, target: SendTarget) async { - let targets = getSendTarget(target: target) + private func broadcast(to: BroadcastTarget, message: CERequest) async { + let targets = getAddresses(target: to) for target in targets { Task { logger.trace("sending message", metadata: ["target": "\(target)", "message": "\(message)"]) @@ -103,19 +112,19 @@ public final class NetworkManager: Sendable { private func on(safroleTicketsGenerated event: RuntimeEvents.SafroleTicketsGenerated) async { logger.trace("sending tickets", metadata: ["epochIndex": "\(event.epochIndex)"]) for ticket in event.items { - await send( + await broadcast( + to: .safroleStep1Validator, message: .safroleTicket1(.init( epochIndex: event.epochIndex, attempt: ticket.ticket.attempt, proof: ticket.ticket.signature - )), - target: .safroleStep1Validator + )) ) } } - public func getPeersCount() -> Int { - network.getPeersCount() + public var peersCount: Int { + network.peersCount } } @@ -125,6 +134,40 @@ struct HandlerImpl: NetworkProtocolHandler { func handle(ceRequest: CERequest) async throws -> (any Encodable)? { switch ceRequest { + case let .blockRequest(message): + let dataProvider = blockchain.dataProvider + let count = min(MAX_BLOCKS_PER_REQUEST, message.maxBlocks) + var resp = [BlockRef]() + resp.reserveCapacity(Int(count)) + switch message.direction { + case .ascendingExcludsive: + let number = try await dataProvider.getBlockNumber(hash: message.hash) + var currentHash = message.hash + for i in 1 ... count { + let hashes = try await dataProvider.getBlockHash(byNumber: number + i) + var found = false + for hash in hashes { + let block = try await dataProvider.getBlock(hash: hash) + if block.header.parentHash == currentHash { + resp.append(block) + found = true + currentHash = hash + break + } + } + if !found { + break + } + } + case .descendingInclusive: + var hash = message.hash + for _ in 0 ..< count { + let block = try await dataProvider.getBlock(hash: hash) + resp.append(block) + hash = block.header.parentHash + } + } + return resp case let .safroleTicket1(message): blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived( items: [ diff --git a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift index 554ee097..2cd9b437 100644 --- a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift @@ -6,16 +6,28 @@ import Utils private let logger = Logger(label: "PeerManager") -public struct PeerInfo { +public struct PeerInfo: Sendable { public let address: NetAddr public internal(set) var finalized: HashAndSlot public internal(set) var heads: Set = [] + + public var best: HashAndSlot? { + heads.max { $0.timeslot < $1.timeslot } + } } +// TODOs: +// - distinguish between connect peers and offline peers +// - peer reputation +// - purge offline peers public actor PeerManager: Sendable { - private var peers: [NetAddr: PeerInfo] = [:] + private let eventBus: EventBus + + public private(set) var peers: [NetAddr: PeerInfo] = [:] - init() {} + init(eventBus: EventBus) { + self.eventBus = eventBus + } func addPeer(address: NetAddr, handshake: BlockAnnouncementHandshake) { var peer = PeerInfo( @@ -28,9 +40,11 @@ public actor PeerManager: Sendable { peers[address] = peer logger.debug("added peer", metadata: ["address": "\(address)", "finalized": "\(peer.finalized)"]) + eventBus.publish(NetworkEvents.PeerAdded(info: peer)) } func updatePeer(address: NetAddr, message: BlockAnnouncement) { + let updatedPeer: PeerInfo if var peer = peers[address] { peer.finalized = message.finalized // purge heads that are older than the finalized head @@ -38,25 +52,23 @@ public actor PeerManager: Sendable { // this means if some blocks are skipped, it is possible that we miss purge some heads // that is ancestor of the new block. but that's fine peer.heads = peer.heads.filter { head in - head.timeslot > message.finalized.timeslot && head.hash != message.header.parentHash + head.timeslot > message.finalized.timeslot && head.hash != message.header.value.parentHash } - peer.heads.insert(HashAndSlot(hash: message.header.hash(), timeslot: message.header.timeslot)) - peers[address] = peer + peer.heads.insert(HashAndSlot(hash: message.header.hash, timeslot: message.header.value.timeslot)) + updatedPeer = peer } else { // this shouldn't happen but let's handle it - peers[address] = PeerInfo( + updatedPeer = PeerInfo( address: address, finalized: message.finalized, heads: [ - HashAndSlot(hash: message.header.hash(), timeslot: message.header.timeslot), + HashAndSlot(hash: message.header.hash, timeslot: message.header.value.timeslot), ] ) } + peers[address] = updatedPeer - logger.debug("updated peer", metadata: ["address": "\(address)", "finalized": "\(peers[address]!.finalized)"]) - } - - public func getPeer(address: NetAddr) -> PeerInfo? { - peers[address] + logger.debug("updated peer", metadata: ["address": "\(address)", "finalized": "\(updatedPeer.finalized)"]) + eventBus.publish(NetworkEvents.PeerUpdated(info: updatedPeer, newBlockHeader: message.header)) } } diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift new file mode 100644 index 00000000..171e674a --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -0,0 +1,149 @@ +import Blockchain +import Codec +import Networking +import TracingUtils +import Utils + +private let logger = Logger(label: "SyncManager") + +let BLOCK_REQUEST_BLOCK_COUNT: UInt32 = 50 + +// TODO: +// - pick best peer +// - remove slow one +// - sync peer rotation +// - fast sync mode (no verification) +public actor SyncManager: Sendable { + private let blockchain: Blockchain + private let network: Network + private let peerManager: PeerManager + + private let subscriptions: EventSubscriptions + + // starts with bulk syncing mode, until our best have catched up with the peer best + private var bulkSyncing = false + private var syncContinuation: [CheckedContinuation] = [] + + private var networkBest: HashAndSlot? + private var networkFinalizedBest: HashAndSlot? + private var currentRequest: (peer: NetAddr, request: BlockRequest)? + + public init(blockchain: Blockchain, network: Network, peerManager: PeerManager, eventBus: EventBus) { + self.blockchain = blockchain + self.network = network + self.peerManager = peerManager + subscriptions = EventSubscriptions(eventBus: eventBus) + + Task { + await subscriptions.subscribe(NetworkEvents.PeerAdded.self, id: "SyncManager.PeerAdded") { [weak self] event in + await self?.on(peerUpdated: event.info, newBlockHeader: nil) + } + await subscriptions.subscribe(NetworkEvents.PeerUpdated.self, id: "SyncManager.PeerUpdated") { [weak self] event in + await self?.on(peerUpdated: event.info, newBlockHeader: event.newBlockHeader) + } + } + } + + public func waitForSyncCompletion() async { + if !bulkSyncing { + return + } + await withCheckedContinuation { continuation in + syncContinuation.append(continuation) + } + } + + private func on(peerUpdated info: PeerInfo, newBlockHeader: HeaderRef?) async { + // TODO: improve this to handle the case misbehaved peers seding us the wrong best + if let networkBest { + if let peerBest = info.best, peerBest.timeslot > networkBest.timeslot { + self.networkBest = peerBest + } + } else { + networkBest = info.best + } + + if let networkFinalizedBest { + if info.finalized.timeslot > networkFinalizedBest.timeslot { + self.networkFinalizedBest = info.finalized + } + } else { + networkFinalizedBest = info.finalized + } + + let currentHead = await blockchain.dataProvider.bestHead + + if bulkSyncing { + await bulkSync(currentHead: currentHead) + } else if let newBlockHeader { + importBlock(currentTimeslot: currentHead.timeslot, newHeader: newBlockHeader, peer: info.address) + } + } + + private func bulkSync(currentHead: HeadInfo) async { + if currentRequest != nil { + return + } + + for (addr, info) in await peerManager.peers { + if let peerBest = info.best, peerBest.timeslot > currentHead.timeslot { + let request = BlockRequest( + hash: currentHead.hash, + direction: .ascendingExcludsive, + maxBlocks: min(BLOCK_REQUEST_BLOCK_COUNT, peerBest.timeslot - currentHead.timeslot) + ) + currentRequest = (addr, request) + logger.debug("bulk syncing", metadata: ["peer": "\(addr)", "request": "\(request)"]) + + Task { + let resp = try await network.send(to: addr, message: .blockRequest(request)) + let decoded = try JamDecoder.decode([BlockRef].self, from: resp, withConfig: blockchain.config) + for block in decoded { + try await blockchain.importBlock(block) + } + + currentRequest = nil + + let currentHead = await blockchain.dataProvider.bestHead + if currentHead.timeslot >= networkBest!.timeslot { + if bulkSyncing { + bulkSyncing = false + syncContinuation.forEach { $0.resume() } + syncContinuation = [] + logger.info("bulk sync completed") + return + } + } + + await bulkSync(currentHead: blockchain.dataProvider.bestHead) + } + + break + } + } + } + + private func importBlock(currentTimeslot: TimeslotIndex, newHeader: HeaderRef, peer: NetAddr) { + let blockchain = blockchain + let network = network + Task.detached { + let hasBlock = try? await blockchain.dataProvider.hasBlock(hash: newHeader.hash) + if hasBlock != true { + do { + let resp = try await network.send(to: peer, message: .blockRequest(BlockRequest( + hash: newHeader.hash, + direction: .descendingInclusive, + maxBlocks: max(1, newHeader.value.timeslot - currentTimeslot) + ))) + let decoded = try JamDecoder.decode([BlockRef].self, from: resp, withConfig: blockchain.config) + // reverse to import old block first + for block in decoded.reversed() { + try await blockchain.importBlock(block) + } + } catch { + logger.warning("block request failed", metadata: ["error": "\(error)", "peer": "\(peer)"]) + } + } + } + } +} diff --git a/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift index 82910794..e280c53c 100644 --- a/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift +++ b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift @@ -12,6 +12,6 @@ public struct BlockAnnouncementHandshake: Codable, Sendable { } public struct BlockAnnouncement: Codable, Sendable { - public var header: Header + public var header: HeaderRef public var finalized: HashAndSlot } diff --git a/Node/Sources/Node/Node.swift b/Node/Sources/Node/Node.swift index e60e7520..3b7bb7b8 100644 --- a/Node/Sources/Node/Node.swift +++ b/Node/Sources/Node/Node.swift @@ -14,14 +14,17 @@ public class Node { public var rpc: RPCConfig? public var network: NetworkConfig public var peers: [NetAddr] + public var local: Bool - public init(rpc: RPCConfig?, network: NetworkConfig, peers: [NetAddr] = []) { + public init(rpc: RPCConfig?, network: NetworkConfig, peers: [NetAddr] = [], local: Bool = false) { self.rpc = rpc self.network = network self.peers = peers + self.local = local } } + public let config: Config public let blockchain: Blockchain public let rpcServer: Server? public let timeProvider: TimeProvider @@ -35,6 +38,8 @@ public class Node { eventBus: EventBus, keystore: KeyStore ) async throws { + self.config = config + let (genesisState, genesisBlock, protocolConfig) = try await genesis.load() logger.info("Genesis: \(genesisBlock.hash)") diff --git a/Node/Sources/Node/NodeDataSource.swift b/Node/Sources/Node/NodeDataSource.swift index 3fdd3a93..8ca8a65b 100644 --- a/Node/Sources/Node/NodeDataSource.swift +++ b/Node/Sources/Node/NodeDataSource.swift @@ -30,6 +30,6 @@ public final class NodeDataSource: DataSource { } public func getPeersCount() async throws -> Int { - networkManager.getPeersCount() + networkManager.peersCount } } diff --git a/Node/Sources/Node/ValidatorNode.swift b/Node/Sources/Node/ValidatorNode.swift index 752297c6..86c1ac9b 100644 --- a/Node/Sources/Node/ValidatorNode.swift +++ b/Node/Sources/Node/ValidatorNode.swift @@ -12,16 +12,24 @@ public class ValidatorNode: Node { try await super.init(config: config, genesis: genesis, eventBus: eventBus, keystore: keystore) let scheduler = DispatchQueueScheduler(timeProvider: timeProvider) - validator = await ValidatorService( + let validator = await ValidatorService( blockchain: blockchain, keystore: keystore, eventBus: eventBus, scheduler: scheduler, dataProvider: dataProvider ) + self.validator = validator - let genesisState = try await dataProvider.getState(hash: blockchain.dataProvider.genesisBlockHash) - - await validator.on(genesis: genesisState) + let syncManager = network.syncManager + let dataProvider = blockchain.dataProvider + let local = config.local + Task { + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) + if !local { + await syncManager.waitForSyncCompletion() + } + await validator.on(genesis: genesisState) + } } }