From 0e780553dbd7a9dfdefeb403cffeda229c1cadc2 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Fri, 17 Jan 2025 04:46:34 +0800 Subject: [PATCH] multiplePeers tests (#265) * multiplePeers tests * update swiftlint * update more tests * update * update more test * update swiftlint * update * update * update * update swiftlint * update tests * update node test * update logger * update test * update node * update tests * update nodetest * update test * update tests * update node test * update swiftlint * update swiftlint * add some logger * update some logger * update test * update swiftlint * update node test --- .../Sources/Blockchain/Blockchain.swift | 2 - .../BlockchainDataProvider.swift | 4 +- .../InMemoryDataProvider.swift | 2 +- .../RuntimeProtocols/Accumulation.swift | 2 +- .../RuntimeProtocols/Guaranteeing.swift | 4 +- .../Sources/Blockchain/State/StateTrie.swift | 2 +- .../Blockchain/Types/EpochMarker.swift | 12 ++ .../Sources/Blockchain/Types/Header.swift | 4 +- .../VMInvocations/HostCall/HostCalls.swift | 2 +- .../Blockchain/Validator/BlockAuthor.swift | 2 +- Codec/Sources/Codec/JamDecoder.swift | 2 +- .../Sources/Database/RocksDBBackend.swift | 2 +- .../Sources/MsQuicSwift/QuicStream.swift | 9 +- .../Sources/Networking/Connection.swift | 20 +-- Networking/Sources/Networking/Peer.swift | 35 +++-- Networking/Sources/Networking/Stream.swift | 54 ++++--- .../xcshareddata/xcschemes/Node.xcscheme | 12 ++ Node/Sources/Node/Config.swift | 4 +- .../Node/NetworkingProtocol/Network.swift | 2 +- .../NetworkingProtocol/NetworkManager.swift | 2 +- .../Node/NetworkingProtocol/PeerManager.swift | 2 +- .../Node/NetworkingProtocol/SyncManager.swift | 5 +- .../BlockAnnouncementDecoderTests.swift | 60 ++++++++ Node/Tests/NodeTests/NodeTests.swift | 144 +++++++++++++----- Node/Tests/NodeTests/Topology.swift | 3 +- Utils/Sources/Utils/Merklization/MMR.swift | 2 +- 26 files changed, 283 insertions(+), 111 deletions(-) create mode 100644 Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift diff --git a/Blockchain/Sources/Blockchain/Blockchain.swift b/Blockchain/Sources/Blockchain/Blockchain.swift index 8b578e33..463145b2 100644 --- a/Blockchain/Sources/Blockchain/Blockchain.swift +++ b/Blockchain/Sources/Blockchain/Blockchain.swift @@ -48,8 +48,6 @@ public final class Blockchain: ServiceBase, @unchecked Sendable { try await dataProvider.blockImported(block: block, state: state) publish(RuntimeEvents.BlockImported(block: block, state: state, parentState: parent)) - - logger.info("Block imported: #\(block.header.timeslot) \(block.hash)") } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index d42c9a0f..3da0693b 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -14,7 +14,7 @@ public enum BlockchainDataProviderError: Error, Equatable { case uncanonical(hash: Data32) } -public actor BlockchainDataProvider: Sendable { +public actor BlockchainDataProvider { public private(set) var bestHead: HeadInfo public private(set) var finalizedHead: HeadInfo private let dataProvider: BlockchainDataProviderProtocol @@ -53,7 +53,7 @@ public actor BlockchainDataProvider: Sendable { bestHead = HeadInfo(hash: block.hash, timeslot: block.header.timeslot, number: number) } - logger.debug("block imported: \(block.hash)") + logger.debug("Block imported: #\(bestHead.timeslot) \(block.hash)") } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift index 5ddd0dc9..93d66c7c 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift @@ -1,6 +1,6 @@ import Utils -public actor InMemoryDataProvider: Sendable { +public actor InMemoryDataProvider { public private(set) var heads: Set public private(set) var finalizedHead: Data32 diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift index 5ccfdc2e..9885d90d 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift @@ -319,7 +319,7 @@ extension Accumulation { let rightQueueItems = accumulationQueue.array[index...] let leftQueueItems = accumulationQueue.array[0 ..< index] - var allQueueItems = rightQueueItems.flatMap { $0 } + leftQueueItems.flatMap { $0 } + newQueueItems + var allQueueItems = rightQueueItems.flatMap(\.self) + leftQueueItems.flatMap(\.self) + newQueueItems editAccumulatedItems(items: &allQueueItems, accumulatedPackages: Set(zeroPrereqReports.map(\.packageSpecification.workPackageHash))) diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift index cdfd57c0..3b08ee14 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift @@ -185,8 +185,8 @@ extension Guaranteeing { } let recentWorkPackageHashes: Set = Set(recentHistory.items.flatMap(\.lookup.keys)) - let accumulateHistoryReports = Set(accumulationHistory.array.flatMap { $0 }) - let accumulateQueueReports = Set(accumulationQueue.array.flatMap { $0 } + let accumulateHistoryReports = Set(accumulationHistory.array.flatMap(\.self)) + let accumulateQueueReports = Set(accumulationQueue.array.flatMap(\.self) .flatMap(\.workReport.refinementContext.prerequisiteWorkPackages)) let pendingWorkReportHashes = Set(reports.array.flatMap { $0?.workReport.refinementContext.prerequisiteWorkPackages ?? [] }) let pipelinedWorkReportHashes = recentWorkPackageHashes.union(accumulateHistoryReports).union(accumulateQueueReports) diff --git a/Blockchain/Sources/Blockchain/State/StateTrie.swift b/Blockchain/Sources/Blockchain/State/StateTrie.swift index 7c7dffb5..627461b5 100644 --- a/Blockchain/Sources/Blockchain/State/StateTrie.swift +++ b/Blockchain/Sources/Blockchain/State/StateTrie.swift @@ -95,7 +95,7 @@ public enum StateTrieError: Error { case invalidParent } -public actor StateTrie: Sendable { +public actor StateTrie { private let backend: StateBackendProtocol public private(set) var rootHash: Data32 private var nodes: [Data: TrieNode] = [:] diff --git a/Blockchain/Sources/Blockchain/Types/EpochMarker.swift b/Blockchain/Sources/Blockchain/Types/EpochMarker.swift index f7c5a700..8ce21ab3 100644 --- a/Blockchain/Sources/Blockchain/Types/EpochMarker.swift +++ b/Blockchain/Sources/Blockchain/Types/EpochMarker.swift @@ -21,3 +21,15 @@ public struct EpochMarker: Sendable, Equatable, Codable { self.validators = validators } } + +extension EpochMarker: Dummy { + public typealias Config = ProtocolConfigRef + + public static func dummy(config: Config) -> EpochMarker { + EpochMarker( + entropy: Data32(), + ticketsEntropy: Data32(), + validators: try! ConfigFixedSizeArray(config: config, defaultValue: Data32()) + ) + } +} diff --git a/Blockchain/Sources/Blockchain/Types/Header.swift b/Blockchain/Sources/Blockchain/Types/Header.swift index 8c742810..696fdf1c 100644 --- a/Blockchain/Sources/Blockchain/Types/Header.swift +++ b/Blockchain/Sources/Blockchain/Types/Header.swift @@ -168,13 +168,13 @@ extension HeaderRef: Codable { extension Header.Unsigned: Dummy { public typealias Config = ProtocolConfigRef - public static func dummy(config _: Config) -> Header.Unsigned { + public static func dummy(config: Config) -> Header.Unsigned { Header.Unsigned( parentHash: Data32(), priorStateRoot: Data32(), extrinsicsHash: Data32(), timeslot: 0, - epoch: nil, + epoch: EpochMarker.dummy(config: config), winningTickets: nil, offendersMarkers: [], authorIndex: 0, diff --git a/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift b/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift index d5616ce0..5267f74b 100644 --- a/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift +++ b/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift @@ -909,7 +909,7 @@ public class Invoke: HostCall { self.context = context } - public func _callImpl(config: ProtocolConfigRef, state: VMState) async throws { + public func _callImpl(config _: ProtocolConfigRef, state: VMState) async throws { let pvmIndex: UInt64 = state.readRegister(Registers.Index(raw: 7)) let startAddr: UInt32 = state.readRegister(Registers.Index(raw: 8)) diff --git a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift index a55f72e8..75baa9d3 100644 --- a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift +++ b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift @@ -163,7 +163,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { await withSpan("BlockAuthor.newBlock", logger: logger) { _ in // TODO: add timeout let block = try await createNewBlock(timeslot: timeslot, claim: claim) - logger.info("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)") + logger.debug("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)") publish(RuntimeEvents.BlockAuthored(block: block)) } } diff --git a/Codec/Sources/Codec/JamDecoder.swift b/Codec/Sources/Codec/JamDecoder.swift index 2f6339c0..41497f27 100644 --- a/Codec/Sources/Codec/JamDecoder.swift +++ b/Codec/Sources/Codec/JamDecoder.swift @@ -310,7 +310,7 @@ private struct JamKeyedDecodingContainer: KeyedDecodingContainerPr throw DecodingError.dataCorrupted( DecodingError.Context( codingPath: decoder.codingPath, - debugDescription: "Invalid boolean value: \(byte)" + debugDescription: "Decode key \(key.stringValue) with invalid boolean value: \(byte)" ) ) } diff --git a/Database/Sources/Database/RocksDBBackend.swift b/Database/Sources/Database/RocksDBBackend.swift index 83034f1f..a56dddbd 100644 --- a/Database/Sources/Database/RocksDBBackend.swift +++ b/Database/Sources/Database/RocksDBBackend.swift @@ -126,7 +126,7 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { } public func add(block: BlockRef) async throws { - logger.trace("add(block:) \(block.hash)") + logger.debug("add(block:) \(block.hash)") // TODO: batch put diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 22f16c4b..5a10cc3a 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -94,17 +94,16 @@ public final class QuicStream: Sendable { } public func send(data: Data, start: Bool = false, finish: Bool = false) throws { - logger.trace("Sending \(data.count) bytes") - try storage.read { storage in guard let storage, let api = storage.connection.api else { throw QuicError.alreadyClosed } + logger.debug("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())") let messageLength = data.count if messageLength == 0 { - logger.trace("No data to send.") + logger.debug("No data to send.") throw SendError.emptyData // Throw a specific error or return } @@ -173,7 +172,7 @@ private class StreamHandle { fileprivate func callbackHandler(event: UnsafePointer) -> QuicStatus { switch event.pointee.Type { case QUIC_STREAM_EVENT_SEND_COMPLETE: - logger.trace("Stream send completed") + logger.debug("Stream send completed") if let clientContext = event.pointee.SEND_COMPLETE.ClientContext { clientContext.deallocate() // !! deallocate } @@ -188,7 +187,7 @@ private class StreamHandle { totalSize += Int(buffer.Length) } - logger.trace("Stream received \(totalSize) bytes") + logger.debug("Stream received \(totalSize) bytes") var receivedData = Data(capacity: totalSize) diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index b02380ac..44a3e693 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -176,7 +176,7 @@ public final class Connection: Sendable, ConnectionInfoP let data = try request.encode() let kind = request.kind let stream = try createStream(kind: kind) - try stream.send(message: data) + try await stream.send(message: data) return try await receiveData(stream: stream) } @@ -234,7 +234,7 @@ public final class Connection: Sendable, ConnectionInfoP impl.addStream(stream) Task { guard let byte = await stream.receiveByte() else { - logger.debug("stream closed without receiving kind. status: \(stream.status)") + logger.warning("stream closed without receiving kind. status: \(stream.status)") return } if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) { @@ -246,14 +246,14 @@ public final class Connection: Sendable, ConnectionInfoP if existingStream.stream.id < stream.stream.id { // The new stream has a higher ID, so reset the existing one existingStream.close(abort: false) - logger.debug( + logger.info( "Reset older UP stream with lower ID", metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"] ) } else { // The existing stream has a higher ID or is equal, so reset the new one stream.close(abort: false) - logger.debug( + logger.info( "Duplicate UP stream detected, closing new stream with lower or equal ID", metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"] ) @@ -278,9 +278,9 @@ public final class Connection: Sendable, ConnectionInfoP let data = try await receiveData(stream: stream) let request = try decoder.decode(data: data) let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) - try stream.send(message: resp, finish: true) + try await stream.send(message: resp, finish: true) } catch { - logger.debug("Failed to handle request", metadata: ["error": "\(error)"]) + logger.error("Failed to handle request", metadata: ["error": "\(error)"]) stream.close(abort: true) } } @@ -318,7 +318,7 @@ private func receiveMaybeData(stream: Stream) async throws - // TODO: pick better value guard length < 1024 * 1024 * 10 else { stream.close(abort: true) - logger.debug("Invalid request length: \(length)") + logger.error("Invalid request length: \(length)") // TODO: report bad peer throw ConnectionError.invalidLength } @@ -336,9 +336,9 @@ func presistentStreamRunLoop( do { try await handler.streamOpened(connection: connection, stream: stream, kind: kind) } catch { - logger.debug( + logger.error( "Failed to setup presistent stream", - metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"] + metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "error": "\(error)"] ) } logger.debug( @@ -352,7 +352,7 @@ func presistentStreamRunLoop( try await handler.handle(connection: connection, message: msg) } } catch { - logger.debug("UP stream run loop failed: \(error)") + logger.error("UP stream run loop failed: \(error) \(connection.id) \(stream.id)") stream.close(abort: true) } diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 5ca00014..ca97e5c1 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -186,19 +186,24 @@ public final class Peer: Sendable { } for connection in connections { if let stream = try? connection.createPreistentStream(kind: kind) { - let res = Result(catching: { try stream.send(message: messageData) }) - switch res { - case .success: - break - case let .failure(error): - impl.logger.warning( - "Failed to send message", - metadata: [ - "connectionId": "\(connection.id)", - "kind": "\(kind)", - "error": "\(error)", - ] - ) + Task { + let res = await Result { + try await stream.send(message: messageData) + } + switch res { + case .success: + break + case let .failure(error): + impl.logger.warning( + "Failed to send message", + metadata: [ + "connectionId": "\(connection.id)", + "kind": "\(kind)", + "message": "\(messageData)", + "error": "\(error)", + ] + ) + } } } } @@ -298,7 +303,7 @@ final class PeerImpl: Sendable { var state = reconnectStates.read { reconnectStates in reconnectStates[address] ?? .init() } - + logger.debug("reconnecting to \(address) \(state.attempt) attempts") guard state.attempt < maxRetryAttempts else { logger.warning("reconnecting to \(address) exceeded max attempts") return @@ -338,6 +343,7 @@ final class PeerImpl: Sendable { states[connection.id] ?? .init() } + logger.debug("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts") guard state.attempt < maxRetryAttempts else { logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts") return @@ -391,7 +397,6 @@ private struct PeerEventHandler: QuicEventHandler { ) return .code(.alpnNegFailure) } - logger.debug("new connection: \(addr) role: \(role)") if impl.addConnection(connection, addr: addr, role: role) { return .code(.success) } else { diff --git a/Networking/Sources/Networking/Stream.swift b/Networking/Sources/Networking/Stream.swift index 90bf76fa..bf1839b7 100644 --- a/Networking/Sources/Networking/Stream.swift +++ b/Networking/Sources/Networking/Stream.swift @@ -27,10 +27,39 @@ public protocol StreamProtocol { var id: UniqueId { get } var status: StreamStatus { get } - func send(message: Message) throws + func send(message: Message) async throws func close(abort: Bool) } +actor StreamSender { + private let stream: QuicStream + private var status: StreamStatus + + init(stream: QuicStream, status: StreamStatus) { + self.stream = stream + self.status = status + } + + func send(message: Data, finish: Bool = false) throws { + guard status == .open || status == .sendOnly else { + throw StreamError.notOpen + } + + let length = UInt32(message.count) + var lengthData = Data(repeating: 0, count: 4) + lengthData.withUnsafeMutableBytes { ptr in + ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self) + } + + try stream.send(data: lengthData, finish: false) + try stream.send(data: message, finish: finish) + + if finish { + status = .receiveOnly + } + } +} + final class Stream: Sendable, StreamProtocol { typealias Message = Handler.PresistentHandler.Message @@ -41,6 +70,7 @@ final class Stream: Sendable, StreamProtocol { private let channel: Channel = .init(capacity: 100) private let nextData: Mutex = .init(nil) private let _status: ThreadSafeContainer = .init(.open) + private let sender: StreamSender let connectionId: UniqueId let kind: Handler.PresistentHandler.StreamKind? @@ -63,10 +93,11 @@ final class Stream: Sendable, StreamProtocol { self.connectionId = connectionId self.impl = impl self.kind = kind + sender = StreamSender(stream: stream, status: .open) } - public func send(message: Handler.PresistentHandler.Message) throws { - try send(message: message.encode(), finish: false) + public func send(message: Handler.PresistentHandler.Message) async throws { + try await send(message: message.encode(), finish: false) } /// send raw data @@ -91,21 +122,8 @@ final class Stream: Sendable, StreamProtocol { } // send message with length prefix - func send(message: Data, finish: Bool = false) throws { - guard canSend else { - throw StreamError.notOpen - } - - let length = UInt32(message.count) - var lengthData = Data(repeating: 0, count: 4) - lengthData.withUnsafeMutableBytes { ptr in - ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self) - } - try stream.send(data: lengthData, finish: false) - try stream.send(data: message, finish: finish) - if finish { - status = .receiveOnly - } + func send(message: Data, finish: Bool = false) async throws { + try await sender.send(message: message, finish: finish) } func received(data: Data?) { diff --git a/Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme b/Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme index 4490e214..8501d959 100644 --- a/Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme +++ b/Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme @@ -29,6 +29,18 @@ selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB" shouldUseLaunchSchemeArgsEnv = "YES" shouldAutocreateTestPlan = "YES"> + + + + + + BlockchainDataProvider { switch self { case let .rocksDB(path): - logger.info("Using RocksDB backend at \(path.absoluteString)") + logger.debug("Using RocksDB backend at \(path.absoluteString)") let backend = try await RocksDBBackend( path: path, config: chainspec.getConfig(), @@ -27,7 +27,7 @@ public enum Database { ) return try await BlockchainDataProvider(backend) case .inMemory: - logger.info("Using in-memory backend") + logger.debug("Using in-memory backend") let genesisBlock = try chainspec.getBlock() let genesisStateData = try chainspec.getState() let backend = try StateBackend(InMemoryBackend(), config: chainspec.getConfig(), rootHash: Data32()) diff --git a/Node/Sources/Node/NetworkingProtocol/Network.swift b/Node/Sources/Node/NetworkingProtocol/Network.swift index 31001360..ee3880fc 100644 --- a/Node/Sources/Node/NetworkingProtocol/Network.swift +++ b/Node/Sources/Node/NetworkingProtocol/Network.swift @@ -129,7 +129,7 @@ struct PresistentStreamHandlerImpl: PresistentStreamHandler { } func handle(connection: any ConnectionInfoProtocol, message: Message) async throws { - impl.logger.trace("handling message: \(message) from \(connection.id)") + impl.logger.debug("handling message: \(message) from \(connection.id)") try await impl.handler.handle(connection: connection, upMessage: message) } diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index bbc687bd..e534828c 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -273,7 +273,7 @@ struct HandlerImpl: NetworkProtocolHandler { heads: headsWithTimeslot ) - try stream.send(message: .blockAnnouncementHandshake(handshake)) + try await stream.send(message: .blockAnnouncementHandshake(handshake)) } } } diff --git a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift index f65462b6..afb4a6c8 100644 --- a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift @@ -20,7 +20,7 @@ public struct PeerInfo: Sendable { // - distinguish between connect peers and offline peers // - peer reputation // - purge offline peers -public actor PeerManager: Sendable { +public actor PeerManager { private let eventBus: EventBus public private(set) var peers: [Data: PeerInfo] = [:] diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift index 7fa410b8..305fa38b 100644 --- a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -20,7 +20,7 @@ enum SyncStatus { // - sync peer rotation // - fast sync mode (no verification) // - re-enter to bulk sync mode if new peer with better head is discovered -public actor SyncManager: Sendable { +public actor SyncManager { private let blockchain: Blockchain private let network: Network private let peerManager: PeerManager @@ -139,7 +139,7 @@ public actor SyncManager: Sendable { status = .syncing syncContinuation.forEach { $0.resume() } syncContinuation = [] - logger.info("sync completed") + logger.debug("sync completed") } } @@ -163,6 +163,7 @@ public actor SyncManager: Sendable { } // reverse to import old block first for block in blocks.reversed() { + logger.debug("blocks reversed", metadata: ["hash": "\(String(describing: block.hash))"]) try await blockchain.importBlock(block) } } catch { diff --git a/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift b/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift new file mode 100644 index 00000000..199078a8 --- /dev/null +++ b/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift @@ -0,0 +1,60 @@ +import Blockchain +import Codec +import Foundation +import Testing +import Utils + +@testable import Node + +final class BlockAnnouncementDecoderTests { + @Test + func decodeInvalidEpoch() throws { + let hexString = """ + ed0100007d810035df13056b2e28c4c331a7a53094e97a2b8bceff223ecd34b6cffd2d9ab69998d70\ + 574f759fc057a99604de71d302cb16d365985a180bd8f3387d2d736189d15af832dfe4f67744008b6\ + 2c334b569fcbb4c261e0f065655697306ca252ff000000010000000000000000000000000000000000\ + 0000000000000000000000000000000000000000000000000000000000000000000000000000000000\ + 5e465beb01dbafe160ce8216047f2155dd0569f058afd52dcea601025a8d161d2c5da3a09d66a5d43\ + e7d523e6108736db99d2c2f08fbdcb72a4e8e5aced3482a8552b36000b454fdf6b5418e22ef5d6609\ + e8fc6b816822f02727e085c514d560000000004fbacc2baea15e2e69185623e37a51ee9372ebd80dd\ + 405a34d24a0a40f79e1d92d49a247d13acca8ccaf7cb6d3eb9ef10b3ef29a93e01e9ddce0a4266c4a\ + 2c0e96a3b8c26c8ac6c9063ed7dcdb18479736c7c7be7fbfd006b4cb4b44ffa948154fbacc2baea15\ + e2e69185623e37a51ee9372ebd80dd405a34d24a0a40f79e1d993ccc91f31f5d8657ef98d203ddcc7\ + 38482fe2caaa41f51d983239ac0dbbba04ca820ff3eb8d2ab3b9e1421f7955d876776b0c293f2e31e\ + aa2da18c3b580f5067d810035df13056b2e28c4c331a7a53094e97a2b8bceff223ecd34b6cffd2d9a + """ + let hex = hexString.replacingOccurrences(of: "\\s+", with: "", options: .regularExpression) + let data = Data(fromHexString: hex)! + let config = ProtocolConfigRef.minimal + #expect(throws: DecodingError.self) { + _ = try JamDecoder.decode(BlockAnnouncement.self, from: data, withConfig: config) + } + } + + @Test + func decodeNotEnoughDataToDecode() throws { + let hexString = """ + 371134fcf189799fea21d2b9a50bd8352c7814a120617700e1f984af1cb3698fb1aed1999185b3800\ + 51235aa97f33306a9682e486b12c6016e6df19ea71f1ff6189d15af832dfe4f67744008b62c334b56\ + 9fcbb4c261e0f065655697306ca252ab00000001000000000000000000000000000000000000000000\ + 0000000000000000000000000000000000000000000000000000000000000000000000000000000000\ + 5e465beb01dbafe160ce8216047f2155dd0569f058afd52dcea601025a8d161d2c5da3a09d66a5d43\ + e7d523e6108736db99d2c2f08fbdcb72a4e8e5aced3482a8552b36000b454fdf6b5418e22ef5d6609\ + e8fc6b816822f02727e085c514d5605d069d7591ea55d9cc7adb9e8eaff66a1688d075c69fa94815e\ + f0fe9a56025699a565d486952598747cb9b3b78bb97694100a1cbf8d7af4eb2ea740b844b41d19a99\ + 09db141ee10d89f9bff13d651831cc91098bdf30c917ce89d1b8416af719000000004fbacc2baea15\ + e2e69185623e37a51ee9372ebd80dd405a34d24a0a40f79e1d92d49a247d13acca8ccaf7cb6d3eb9e\ + f10b3ef29a93e01e9ddce0a4266c4a2c0e96a3b8c26c8ac6c9063ed7dcdb18479736c7c7be7fbfd00\ + 6b4cb4b44ffa948154fbacc2baea15e2e69185623e37a51ee9372ebd80dd405a34d24a0a40f79e1d9\ + 575f1115fe3f8a903ac06a3578eeb154ef3f75d2d18fcabfafa4f14530a27f050258515937b7f7bfe\ + f6bf7aa67adf39b59057bbbe0433c9e8a057917c836f814371134fcf189799fea21d2b9a50bd8352c\ + 7814a120617700e1f984af1cb3698f00000000 + """ + let hex = hexString.replacingOccurrences(of: "\\s+", with: "", options: .regularExpression) + let data = Data(fromHexString: hex)! + let config = ProtocolConfigRef.minimal + #expect(throws: DecodingError.self) { + _ = try JamDecoder.decode(BlockAnnouncement.self, from: data, withConfig: config) + } + } +} diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 6a9e5a5a..ae048af8 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -11,16 +11,15 @@ final class NodeTests { return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() - func getDatabase(_ idx: Int) -> Database { - Database.rocksDB(path: path.appendingPathComponent("\(idx)")) + func getDatabase(_ index: Int) -> Database { + Database.rocksDB(path: path.appendingPathComponent("\(index)")) } deinit { try? FileManager.default.removeItem(at: path) } - @Test - func validatorNodeInMemory() async throws { + @Test func validatorNodeInMemory() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] ).build(genesis: .preset(.minimal)) @@ -33,7 +32,9 @@ final class NodeTests { // Advance time for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds) + ) await storeMiddlware.wait() } @@ -49,8 +50,7 @@ final class NodeTests { #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } - @Test - func validatorNodeRocksDB() async throws { + @Test func validatorNodeRocksDB() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] ).build(genesis: .preset(.minimal)) @@ -63,7 +63,9 @@ final class NodeTests { // Advance time for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds) + ) await storeMiddlware.wait() } @@ -79,8 +81,7 @@ final class NodeTests { #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } - @Test - func sync() async throws { + @Test func sync() async throws { // Create validator and full node let (nodes, scheduler) = try await Topology( nodes: [ @@ -95,7 +96,9 @@ final class NodeTests { // Advance time to produce blocks for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds) + ) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } @@ -111,7 +114,9 @@ final class NodeTests { // Produce more blocks for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds) + ) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } @@ -129,48 +134,109 @@ final class NodeTests { #expect(newValidatorBestHead.timeslot > validatorBestHead.timeslot) } - @Test - func multiplePeers() async throws { + @Test func multiplePeers() async throws { // Create multiple nodes + var nodeDescriptions: [NodeDescription] = [ + NodeDescription(isValidator: true, database: getDatabase(0)), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), + ] + // Add 18 non-validator nodes + for i in 2 ... 19 { + nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: getDatabase(i))) + } + let (nodes, scheduler) = try await Topology( - nodes: [ - NodeDescription(isValidator: true, database: getDatabase(0)), - NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), - NodeDescription(devSeed: 2, database: getDatabase(2)), - NodeDescription(devSeed: 3, database: .inMemory), - ], - connections: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)] + nodes: nodeDescriptions, + connections: (0 ..< 20).flatMap { i in + (i + 1 ..< 20).map { j in (i, j) } // Fully connected topology + } ).build(genesis: .preset(.minimal)) let (validator1, validator1StoreMiddlware) = nodes[0] let (validator2, validator2StoreMiddlware) = nodes[1] - let (node1, node1StoreMiddlware) = nodes[2] - let (node2, node2StoreMiddlware) = nodes[3] - try await Task.sleep(for: .milliseconds(500)) - - // Verify connections - #expect(node1.network.peersCount == 2) - #expect(node2.network.peersCount == 2) + // Extract non-validator nodes and their middleware + let nonValidatorNodes = nodes[2...].map(\.self) - // Advance time and verify sync - for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + try await Task.sleep(for: .milliseconds(nodes.count * 100)) + let (node1, _) = nonValidatorNodes[0] + let (node2, _) = nonValidatorNodes[1] + // Verify connections for a sample of non-validator nodes + #expect(node1.network.peersCount == 19) + #expect(node2.network.peersCount == 19) + // Advance time to produce blocks + for _ in 0 ..< 20 { + await scheduler.advance( + by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds) + ) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() - await node1StoreMiddlware.wait() - await node2StoreMiddlware.wait() + + for (_, middleware) in nonValidatorNodes { + await middleware.wait() + } + } + let validator1BestHead = await validator1.dataProvider.bestHead + let validator2BestHead = await validator2.dataProvider.bestHead + + for (node, _) in nonValidatorNodes { + let nodeBestHead = await node.dataProvider.bestHead + #expect(validator1BestHead.hash == nodeBestHead.hash) + #expect(validator2BestHead.hash == nodeBestHead.hash) + } + } + + @Test func moreMultiplePeers() async throws { + // Create multiple nodes + var nodeDescriptions: [NodeDescription] = [ + NodeDescription(isValidator: true, database: getDatabase(0)), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), + ] + + // Add 18 non-validator nodes + for i in 2 ... 19 { + nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: .inMemory)) } - try await Task.sleep(for: .milliseconds(1000)) + let (nodes, scheduler) = try await Topology( + nodes: nodeDescriptions, + connections: (0 ..< 2).flatMap { i in + (2 ..< 20).map { j in (i, j) } // connected topology + } + ).build(genesis: .preset(.minimal)) + + let (validator1, validator1StoreMiddlware) = nodes[0] + let (validator2, validator2StoreMiddlware) = nodes[1] + + // Extract non-validator nodes and their middleware + let nonValidatorNodes = nodes[2...].map(\.self) + try await Task.sleep(for: .milliseconds(nodes.count * 100)) + let (node1, _) = nonValidatorNodes[0] + let (node2, _) = nonValidatorNodes[1] + // Verify connections for a sample of non-validator nodes + #expect(node1.network.peersCount == 2) + #expect(node2.network.peersCount == 2) + // Advance time to produce blocks + for _ in 0 ..< 30 { + await scheduler.advance( + by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds) + ) + await validator1StoreMiddlware.wait() + await validator2StoreMiddlware.wait() + + for (_, middleware) in nonValidatorNodes { + await middleware.wait() + } + } + try await Task.sleep(for: .milliseconds(nodes.count * 100)) let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead - let node1BestHead = await node1.dataProvider.bestHead - let node2BestHead = await node2.dataProvider.bestHead - #expect(validator1BestHead.hash == node1BestHead.hash) - #expect(validator1BestHead.hash == node2BestHead.hash) - #expect(validator2BestHead.hash == node1BestHead.hash) + for (node, _) in nonValidatorNodes { + let nodeBestHead = await node.dataProvider.bestHead + #expect(validator1BestHead.hash == nodeBestHead.hash) + #expect(validator2BestHead.hash == nodeBestHead.hash) + } } } diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index be8654c2..7b42fb14 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -63,7 +63,8 @@ struct Topology { for (from, to) in connections { let fromNode = ret[from].0 let toNode = ret[to].0 - _ = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) + let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) + try? await conn.ready() } return (ret, scheduler) } diff --git a/Utils/Sources/Utils/Merklization/MMR.swift b/Utils/Sources/Utils/Merklization/MMR.swift index e782a224..591bd112 100644 --- a/Utils/Sources/Utils/Merklization/MMR.swift +++ b/Utils/Sources/Utils/Merklization/MMR.swift @@ -34,7 +34,7 @@ public struct MMR: Sendable, Equatable, Codable { } } - let nonNilPeaks = peaks.compactMap { $0 } + let nonNilPeaks = peaks.compactMap(\.self) return helper(nonNilPeaks[...]) } }