From c095281d619240ca67d762a51930cd2bd363a1f1 Mon Sep 17 00:00:00 2001 From: Xiliang Chen Date: Thu, 24 Oct 2024 17:52:55 +1300 Subject: [PATCH] UP and CE messages working (#193) * peer mode to peer role * peer manager * UP and CE messages working * fix --- .../BlockchainDataProvider.swift | 4 - .../BlockchainDataProviderTests.swift | 1 - .../xcshareddata/xcschemes/Boka.xcscheme | 6 +- Boka/Sources/Boka.swift | 6 +- .../Sources/Networking/Connection.swift | 14 +- .../Networking/MockPeerEventHandler.swift | 76 +++++++++ Networking/Sources/Networking/Peer.swift | 146 +++++------------- Networking/Sources/Networking/Stream.swift | 19 ++- .../Sources/Networking/StreamHandler.swift | 2 +- .../NetworkingProtocol/MessageDecoder.swift | 20 +-- .../Node/NetworkingProtocol/Network.swift | 33 ++-- .../NetworkingProtocol/NetworkManager.swift | 48 +++++- .../Node/NetworkingProtocol/PeerManager.swift | 62 ++++++++ .../Node/NetworkingProtocol/Typealias.swift | 2 +- .../UniquePresistent/BlockAnnouncement.swift | 13 +- .../UniquePresistent/UPMessage.swift | 22 +-- Node/Sources/Node/Node.swift | 3 + 17 files changed, 304 insertions(+), 173 deletions(-) create mode 100644 Networking/Sources/Networking/MockPeerEventHandler.swift create mode 100644 Node/Sources/Node/NetworkingProtocol/PeerManager.swift diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index edde73cb..86104cde 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -81,10 +81,6 @@ extension BlockchainDataProvider { try await dataProvider.getState(hash: hash) } - public func getFinalizedHead() async throws -> Data32 { - try await dataProvider.getFinalizedHead() - } - public func getHeads() async throws -> Set { try await dataProvider.getHeads() } diff --git a/Blockchain/Tests/BlockchainTests/BlockchainDataProviderTests.swift b/Blockchain/Tests/BlockchainTests/BlockchainDataProviderTests.swift index 9d2933b6..06924fe4 100644 --- a/Blockchain/Tests/BlockchainTests/BlockchainDataProviderTests.swift +++ b/Blockchain/Tests/BlockchainTests/BlockchainDataProviderTests.swift @@ -22,7 +22,6 @@ struct BlockchainDataProviderTests { @Test func testInitialization() async throws { #expect(await provider.bestHead.hash == genesisBlock.hash) #expect(await provider.finalizedHead.hash == genesisBlock.hash) - #expect(try await provider.getFinalizedHead() == genesisBlock.hash) #expect(try await provider.getHeads() == [genesisBlock.hash]) } diff --git a/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme b/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme index e3f4eaf8..8e6dc514 100644 --- a/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme +++ b/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme @@ -63,6 +63,10 @@ + + @@ -75,7 +79,7 @@ diff --git a/Boka/Sources/Boka.swift b/Boka/Sources/Boka.swift index 313d2c04..a6a8bbb5 100644 --- a/Boka/Sources/Boka.swift +++ b/Boka/Sources/Boka.swift @@ -91,7 +91,9 @@ struct Boka: AsyncParsableCommand { let logger = Logger(label: "cli") - logger.info("Starting Boka. Chain: \(chain)") + logger.info("Starting Boka.") + + logger.info("Chain: \(chain)") if let name { logger.info("Node name: \(name)") @@ -131,7 +133,7 @@ struct Boka: AsyncParsableCommand { logger.info("Network key: \(networkKey.publicKey.data.toHexString())") let networkConfig = NetworkConfig( - mode: validator ? .validator : .builder, + role: validator ? .validator : .builder, listenAddress: p2p, key: networkKey ) diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 7e28ab9b..7ec54763 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -8,14 +8,14 @@ private let logger = Logger(label: "Connection") public protocol ConnectionInfoProtocol { var id: UniqueId { get } - var mode: PeerMode { get } + var role: PeerRole { get } var remoteAddress: NetAddr { get } } public final class Connection: Sendable, ConnectionInfoProtocol { let connection: QuicConnection let impl: PeerImpl - public let mode: PeerMode + public let role: PeerRole public let remoteAddress: NetAddr let presistentStreams: ThreadSafeContainer< [Handler.PresistentHandler.StreamKind: Stream] @@ -26,10 +26,10 @@ public final class Connection: Sendable, ConnectionInfoP connection.id } - init(_ connection: QuicConnection, impl: PeerImpl, mode: PeerMode, remoteAddress: NetAddr, initiatedByLocal: Bool) { + init(_ connection: QuicConnection, impl: PeerImpl, role: PeerRole, remoteAddress: NetAddr, initiatedByLocal: Bool) { self.connection = connection self.impl = impl - self.mode = mode + self.role = role self.remoteAddress = remoteAddress self.initiatedByLocal = initiatedByLocal } @@ -42,7 +42,7 @@ public final class Connection: Sendable, ConnectionInfoP let data = try request.encode() let kind = request.kind let stream = try createStream(kind: kind) - try stream.send(data: data) + try stream.send(message: data) // TODO: pipe this to decoder directly to be able to reject early var response = Data() while let nextData = await stream.receive() { @@ -144,7 +144,7 @@ public final class Connection: Sendable, ConnectionInfoP } let request = try decoder.decode(data: data) let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) - try stream.send(data: resp, finish: true) + try stream.send(message: resp, finish: true) } } } @@ -186,7 +186,7 @@ func presistentStreamRunLoop( guard let lengthData else { break } - let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.load(as: UInt32.self) }) + let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }) // sanity check for length // TODO: pick better value guard length < 1024 * 1024 * 10 else { diff --git a/Networking/Sources/Networking/MockPeerEventHandler.swift b/Networking/Sources/Networking/MockPeerEventHandler.swift new file mode 100644 index 00000000..1c2540f8 --- /dev/null +++ b/Networking/Sources/Networking/MockPeerEventHandler.swift @@ -0,0 +1,76 @@ +import Foundation +import MsQuicSwift +import Utils + +public final class MockPeerEventHandler: QuicEventHandler { + public enum EventType { + case newConnection(listener: QuicListener, connection: QuicConnection, info: ConnectionInfo) + case shouldOpen(connection: QuicConnection, certificate: Data?) + case connected(connection: QuicConnection) + case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason) + case streamStarted(connection: QuicConnection, stream: QuicStream) + case dataReceived(stream: QuicStream, data: Data) + case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode) + } + + public let events: ThreadSafeContainer<[EventType]> = .init([]) + + public init() {} + + public func newConnection( + _ listener: QuicListener, connection: QuicConnection, info: ConnectionInfo + ) -> QuicStatus { + events.write { events in + events.append(.newConnection(listener: listener, connection: connection, info: info)) + } + + return .code(.success) + } + + public func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus { + guard let certificate else { + return .code(.requiredCert) + } + do { + let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509) + if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { + return .code(.badCert) + } + } catch { + return .code(.badCert) + } + return .code(.success) + } + + public func connected(_ connection: QuicConnection) { + events.write { events in + events.append(.connected(connection: connection)) + } + } + + public func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) { + print("shutdownInitiated \(connection.id) with reason \(reason)") + events.write { events in + events.append(.shutdownInitiated(connection: connection, reason: reason)) + } + } + + public func streamStarted(_ connect: QuicConnection, stream: QuicStream) { + events.write { events in + events.append(.streamStarted(connection: connect, stream: stream)) + } + } + + public func dataReceived(_ stream: QuicStream, data: Data) { + events.write { events in + events.append(.dataReceived(stream: stream, data: data)) + } + } + + public func closed(_ stream: QuicStream, status: QuicStatus, code: QuicErrorCode) { + print("closed stream \(stream.id) with status \(status) and code \(code)") + events.write { events in + events.append(.closed(stream: stream, status: status, code: code)) + } + } +} diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 15a50c3b..90a26e51 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -10,14 +10,14 @@ public enum StreamType: Sendable { case commonEphemeral } -public enum PeerMode: Sendable, Hashable { +public enum PeerRole: Sendable, Hashable { case validator case builder // case proxy // not yet specified } public struct PeerOptions: Sendable { - public var mode: PeerMode + public var role: PeerRole public var listenAddress: NetAddr public var genesisHeader: Data32 public var secretKey: Ed25519.SecretKey @@ -28,7 +28,7 @@ public struct PeerOptions: Sendable { public var peerSettings: PeerSettings public init( - mode: PeerMode, + role: PeerRole, listenAddress: NetAddr, genesisHeader: Data32, secretKey: Ed25519.SecretKey, @@ -38,7 +38,7 @@ public struct PeerOptions: Sendable { clientSettings: QuicSettings = .defaultSettings, peerSettings: PeerSettings = .defaultSettings ) { - self.mode = mode + self.role = role self.listenAddress = listenAddress self.genesisHeader = genesisHeader self.secretKey = secretKey @@ -64,8 +64,8 @@ public final class Peer: Sendable { let logger = Logger(label: "Peer".uniqueId) let alpns = [ - PeerMode.validator: Alpn(genesisHeader: options.genesisHeader, builder: false).data, - PeerMode.builder: Alpn(genesisHeader: options.genesisHeader, builder: true).data, + PeerRole.validator: Alpn(genesisHeader: options.genesisHeader, builder: false).data, + PeerRole.builder: Alpn(genesisHeader: options.genesisHeader, builder: true).data, ] let allAlpns = Array(alpns.values) @@ -76,14 +76,14 @@ public final class Peer: Sendable { registration: registration, pkcs12: pkcs12, alpns: allAlpns, client: false, settings: options.serverSettings ) - let clientAlpn = alpns[options.mode]! + let clientAlpn = alpns[options.role]! let clientConfiguration = try QuicConfiguration( registration: registration, pkcs12: pkcs12, alpns: [clientAlpn], client: true, settings: options.clientSettings ) impl = PeerImpl( logger: logger, - mode: options.mode, + role: options.role, settings: options.peerSettings, alpns: alpns, clientConfiguration: clientConfiguration, @@ -104,12 +104,13 @@ public final class Peer: Sendable { try listener.listenAddress() } - public func connect(to address: NetAddr, mode: PeerMode) throws -> Connection { + // TODO: see if we can remove the role parameter + public func connect(to address: NetAddr, role: PeerRole) throws -> Connection { let conn = impl.connections.read { connections in - connections.byType[mode]?[address] + connections.byType[role]?[address] } return try conn ?? impl.connections.write { connections in - let curr = connections.byType[mode, default: [:]][address] + let curr = connections.byType[role, default: [:]][address] if let curr { return curr } @@ -122,11 +123,11 @@ public final class Peer: Sendable { let conn = Connection( quicConn, impl: self.impl, - mode: mode, + role: role, remoteAddress: address, initiatedByLocal: true ) - connections.byType[mode, default: [:]][address] = conn + connections.byType[role, default: [:]][address] = conn connections.byId[conn.id] = conn return conn } @@ -166,15 +167,15 @@ public final class Peer: Sendable { final class PeerImpl: Sendable { struct ConnectionStorage { - var byType: [PeerMode: [NetAddr: Connection]] = [:] + var byType: [PeerRole: [NetAddr: Connection]] = [:] var byId: [UniqueId: Connection] = [:] } fileprivate let logger: Logger - fileprivate let mode: PeerMode + fileprivate let role: PeerRole fileprivate let settings: PeerSettings - fileprivate let alpns: [PeerMode: Data] - fileprivate let alpnLookup: [Data: PeerMode] + fileprivate let alpns: [PeerRole: Data] + fileprivate let alpnLookup: [Data: PeerRole] fileprivate let clientConfiguration: QuicConfiguration @@ -186,50 +187,50 @@ final class PeerImpl: Sendable { fileprivate init( logger: Logger, - mode: PeerMode, + role: PeerRole, settings: PeerSettings, - alpns: [PeerMode: Data], + alpns: [PeerRole: Data], clientConfiguration: QuicConfiguration, presistentStreamHandler: Handler.PresistentHandler, ephemeralStreamHandler: Handler.EphemeralHandler ) { self.logger = logger - self.mode = mode + self.role = role self.settings = settings self.alpns = alpns self.clientConfiguration = clientConfiguration self.presistentStreamHandler = presistentStreamHandler self.ephemeralStreamHandler = ephemeralStreamHandler - var alpnLookup = [Data: PeerMode]() - for (mode, alpn) in alpns { - alpnLookup[alpn] = mode + var alpnLookup = [Data: PeerRole]() + for (role, alpn) in alpns { + alpnLookup[alpn] = role } self.alpnLookup = alpnLookup } - func addConnection(_ connection: QuicConnection, addr: NetAddr, mode: PeerMode) -> Bool { + func addConnection(_ connection: QuicConnection, addr: NetAddr, role: PeerRole) -> Bool { connections.write { connections in - if mode == .builder { - let currentCount = connections.byType[mode]?.count ?? 0 + if role == .builder { + let currentCount = connections.byType[role]?.count ?? 0 if currentCount >= self.settings.maxBuilderConnections { self.logger.warning("max builder connections reached") // TODO: consider connection rotation strategy return false } } - if connections.byType[mode, default: [:]][addr] != nil { + if connections.byType[role, default: [:]][addr] != nil { self.logger.warning("connection already exists") return false } let conn = Connection( connection, impl: self, - mode: mode, + role: role, remoteAddress: addr, initiatedByLocal: false ) - connections.byType[mode, default: [:]][addr] = conn + connections.byType[role, default: [:]][addr] = conn connections.byId[connection.id] = conn return true } @@ -258,22 +259,20 @@ private struct PeerEventHandler: QuicEventHandler { func newConnection(_: QuicListener, connection: QuicConnection, info: ConnectionInfo) -> QuicStatus { let addr = info.remoteAddress - let mode = impl.alpnLookup[info.negotiatedAlpn] - guard let mode else { + let role = impl.alpnLookup[info.negotiatedAlpn] + guard let role else { logger.warning("unknown alpn: \(String(data: info.negotiatedAlpn, encoding: .utf8) ?? info.negotiatedAlpn.toDebugHexString())") return .code(.alpnNegFailure) } - logger.debug("new connection: \(addr) mode: \(mode)") - if impl.addConnection(connection, addr: addr, mode: mode) { + logger.debug("new connection: \(addr) role: \(role)") + if impl.addConnection(connection, addr: addr, role: role) { return .code(.success) } else { return .code(.connectionRefused) } } - // TODO: implement a peer and test this func shouldOpen(_ connection: QuicConnection, certificate: Data?) -> QuicStatus { - // TODO: enable certificate validation logic once parsing logic is fixed guard let certificate else { return .code(.requiredCert) } @@ -287,7 +286,7 @@ private struct PeerEventHandler: QuicEventHandler { if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { return .code(.badCert) } - if impl.mode == PeerMode.validator { + if impl.role == PeerRole.validator { // TODO: verify if it is current or next validator } } catch { @@ -329,7 +328,7 @@ private struct PeerEventHandler: QuicEventHandler { impl.connections.write { connections in if let conn = connections.byId[connection.id] { connections.byId.removeValue(forKey: connection.id) - connections.byType[conn.mode]?.removeValue(forKey: conn.remoteAddress) + connections.byType[conn.role]?.removeValue(forKey: conn.remoteAddress) } } } @@ -370,76 +369,3 @@ private struct PeerEventHandler: QuicEventHandler { } } } - -public final class MockPeerEventHandler: QuicEventHandler { - public enum EventType { - case newConnection(listener: QuicListener, connection: QuicConnection, info: ConnectionInfo) - case shouldOpen(connection: QuicConnection, certificate: Data?) - case connected(connection: QuicConnection) - case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason) - case streamStarted(connection: QuicConnection, stream: QuicStream) - case dataReceived(stream: QuicStream, data: Data) - case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode) - } - - public let events: ThreadSafeContainer<[EventType]> = .init([]) - - public init() {} - - public func newConnection( - _ listener: QuicListener, connection: QuicConnection, info: ConnectionInfo - ) -> QuicStatus { - events.write { events in - events.append(.newConnection(listener: listener, connection: connection, info: info)) - } - - return .code(.success) - } - - public func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus { - guard let certificate else { - return .code(.requiredCert) - } - do { - let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509) - if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { - return .code(.badCert) - } - } catch { - return .code(.badCert) - } - return .code(.success) - } - - public func connected(_ connection: QuicConnection) { - events.write { events in - events.append(.connected(connection: connection)) - } - } - - public func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) { - print("shutdownInitiated \(connection.id) with reason \(reason)") - events.write { events in - events.append(.shutdownInitiated(connection: connection, reason: reason)) - } - } - - public func streamStarted(_ connect: QuicConnection, stream: QuicStream) { - events.write { events in - events.append(.streamStarted(connection: connect, stream: stream)) - } - } - - public func dataReceived(_ stream: QuicStream, data: Data) { - events.write { events in - events.append(.dataReceived(stream: stream, data: data)) - } - } - - public func closed(_ stream: QuicStream, status: QuicStatus, code: QuicErrorCode) { - print("closed stream \(stream.id) with status \(status) and code \(code)") - events.write { events in - events.append(.closed(stream: stream, status: status, code: code)) - } - } -} diff --git a/Networking/Sources/Networking/Stream.swift b/Networking/Sources/Networking/Stream.swift index 03aaca32..10f5a220 100644 --- a/Networking/Sources/Networking/Stream.swift +++ b/Networking/Sources/Networking/Stream.swift @@ -57,16 +57,33 @@ final class Stream: Sendable, StreamProtocol { } public func send(message: Handler.PresistentHandler.Message) throws { - try send(data: message.encode(), finish: true) + try send(message: message.encode(), finish: false) } + /// send raw data func send(data: Data, finish: Bool = false) throws { guard status == .open else { throw StreamError.notOpen } + try stream.send(data: data, finish: finish) } + // send message with length prefix + func send(message: Data, finish: Bool = false) throws { + guard status == .open else { + throw StreamError.notOpen + } + + let length = UInt32(message.count) + var lengthData = Data(repeating: 0, count: 4) + lengthData.withUnsafeMutableBytes { ptr in + ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self) + } + try stream.send(data: lengthData, finish: false) + try stream.send(data: message, finish: finish) + } + func received(data: Data) { if data.isEmpty { return diff --git a/Networking/Sources/Networking/StreamHandler.swift b/Networking/Sources/Networking/StreamHandler.swift index 2c848f28..9807796d 100644 --- a/Networking/Sources/Networking/StreamHandler.swift +++ b/Networking/Sources/Networking/StreamHandler.swift @@ -23,7 +23,7 @@ public protocol PresistentStreamHandler: Sendable { associatedtype Message: MessageProtocol func createDecoder(kind: StreamKind) -> any MessageDecoder - func streamOpened(connection: any ConnectionInfoProtocol, stream: any StreamProtocol, kind: StreamKind) async throws + func streamOpened(connection: any ConnectionInfoProtocol, stream: any StreamProtocol, kind: StreamKind) async throws func handle(connection: any ConnectionInfoProtocol, message: Message) async throws } diff --git a/Node/Sources/Node/NetworkingProtocol/MessageDecoder.swift b/Node/Sources/Node/NetworkingProtocol/MessageDecoder.swift index d3a63722..1d710c49 100644 --- a/Node/Sources/Node/NetworkingProtocol/MessageDecoder.swift +++ b/Node/Sources/Node/NetworkingProtocol/MessageDecoder.swift @@ -6,11 +6,12 @@ import Networking import Synchronization import TracingUtils -class UPMessageDecoder: MessageDecoder { +class BlockAnnouncementDecoder: MessageDecoder { typealias Message = UPMessage private let config: ProtocolConfigRef private let kind: UniquePresistentStreamKind + private var handshakeReceived = false init(config: ProtocolConfigRef, kind: UniquePresistentStreamKind) { self.config = config @@ -18,15 +19,16 @@ class UPMessageDecoder: MessageDecoder { } func decode(data: Data) throws -> Message { - let type = UPMessage.getType(kind: kind) - let payload = try JamDecoder.decode(type, from: data, withConfig: config) - guard let message = UPMessage.from(kind: kind, data: payload) else { - throw DecodingError.dataCorrupted(DecodingError.Context( - codingPath: [], - debugDescription: "unreachable: invalid UP message" - )) + if handshakeReceived { + return try .blockAnnouncement( + JamDecoder.decode(BlockAnnouncement.self, from: data, withConfig: config) + ) + } else { + handshakeReceived = true + return try .blockAnnouncementHandshake( + JamDecoder.decode(BlockAnnouncementHandshake.self, from: data, withConfig: config) + ) } - return message } } diff --git a/Node/Sources/Node/NetworkingProtocol/Network.swift b/Node/Sources/Node/NetworkingProtocol/Network.swift index 5a0ee275..f9870c47 100644 --- a/Node/Sources/Node/NetworkingProtocol/Network.swift +++ b/Node/Sources/Node/NetworkingProtocol/Network.swift @@ -7,23 +7,29 @@ import Utils public protocol NetworkProtocolHandler: Sendable { func handle(ceRequest: CERequest) async throws -> (any Encodable)? - func handle(upMessage: UPMessage) async throws + func handle(connection: some ConnectionInfoProtocol, upMessage: UPMessage) async throws + + func handle( + connection: some ConnectionInfoProtocol, + stream: some StreamProtocol, + kind: UniquePresistentStreamKind + ) async throws } public final class Network: Sendable { public struct Config { - public var mode: PeerMode + public var role: PeerRole public var listenAddress: NetAddr public var key: Ed25519.SecretKey public var peerSettings: PeerSettings public init( - mode: PeerMode, + role: PeerRole, listenAddress: NetAddr, key: Ed25519.SecretKey, peerSettings: PeerSettings = .defaultSettings ) { - self.mode = mode + self.role = role self.listenAddress = listenAddress self.key = key self.peerSettings = peerSettings @@ -48,7 +54,7 @@ public final class Network: Sendable { ) let option = PeerOptions( - mode: config.mode, + role: config.role, listenAddress: config.listenAddress, genesisHeader: genesisHeader, secretKey: config.key, @@ -61,12 +67,12 @@ public final class Network: Sendable { peer = try Peer(options: option) } - public func connect(to: NetAddr, mode: PeerMode) throws -> some ConnectionInfoProtocol { - try peer.connect(to: to, mode: mode) + public func connect(to: NetAddr, role: PeerRole) throws -> some ConnectionInfoProtocol { + try peer.connect(to: to, role: role) } public func send(to: NetAddr, message: CERequest) async throws -> Data { - let conn = try peer.connect(to: to, mode: .builder) + let conn = try peer.connect(to: to, role: .builder) return try await conn.request(message) } @@ -107,17 +113,20 @@ struct PresistentStreamHandlerImpl: PresistentStreamHandler { fileprivate let impl: NetworkImpl func createDecoder(kind: StreamKind) -> any MessageDecoder { - UPMessageDecoder(config: impl.config, kind: kind) + switch kind { + case .blockAnnouncement: + BlockAnnouncementDecoder(config: impl.config, kind: kind) + } } - func streamOpened(connection _: any ConnectionInfoProtocol, stream _: any StreamProtocol, kind _: StreamKind) throws { - // TODO: send handshake + func streamOpened(connection: any ConnectionInfoProtocol, stream: any StreamProtocol, kind: StreamKind) async throws { + try await impl.handler.handle(connection: connection, stream: stream, kind: kind) } func handle(connection: any ConnectionInfoProtocol, message: Message) async throws { impl.logger.trace("handling message: \(message) from \(connection.id)") - try await impl.handler.handle(upMessage: message) + try await impl.handler.handle(connection: connection, upMessage: message) } } diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index 9e515786..4cbd2ab9 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -1,6 +1,7 @@ import Blockchain import Codec import Foundation +import Networking import TracingUtils import Utils @@ -12,15 +13,19 @@ enum SendTarget { } public final class NetworkManager: Sendable { + private let peerManager: PeerManager private let network: Network private let blockchain: Blockchain private let subscriptions: EventSubscriptions + // This is for development only // Those peers will receive all the messages regardless the target private let devPeers: Set public init(config: Network.Config, blockchain: Blockchain, eventBus: EventBus, devPeers: Set) async throws { - let handler = HandlerImpl(blockchain: blockchain) + peerManager = PeerManager() + + let handler = HandlerImpl(blockchain: blockchain, peerManager: peerManager) network = try await Network( config: config, protocolConfig: blockchain.config, @@ -29,10 +34,11 @@ public final class NetworkManager: Sendable { ) self.blockchain = blockchain subscriptions = EventSubscriptions(eventBus: eventBus) + self.devPeers = devPeers for peer in devPeers { - _ = try network.connect(to: peer, mode: .validator) + _ = try network.connect(to: peer, role: .validator) } logger.info("P2P Listening on \(try! network.listenAddress())") @@ -68,6 +74,7 @@ public final class NetworkManager: Sendable { let targets = getSendTarget(target: target) for target in targets { Task { + logger.trace("sending message", metadata: ["target": "\(target)", "message": "\(message)"]) let res = await Result { try await network.send(to: target, message: message) } @@ -85,6 +92,7 @@ public final class NetworkManager: Sendable { let targets = getSendTarget(target: target) for target in targets { Task { + logger.trace("sending message", metadata: ["target": "\(target)", "message": "\(message)"]) // not expecting a response // TODO: handle errors and ensure no data is returned _ = try await network.send(to: target, message: message) @@ -93,6 +101,7 @@ public final class NetworkManager: Sendable { } private func on(safroleTicketsGenerated event: RuntimeEvents.SafroleTicketsGenerated) async { + logger.trace("sending tickets", metadata: ["epochIndex": "\(event.epochIndex)"]) for ticket in event.items { await send( message: .safroleTicket1(.init( @@ -112,6 +121,7 @@ public final class NetworkManager: Sendable { struct HandlerImpl: NetworkProtocolHandler { let blockchain: Blockchain + let peerManager: PeerManager func handle(ceRequest: CERequest) async throws -> (any Encodable)? { switch ceRequest { @@ -139,11 +149,39 @@ struct HandlerImpl: NetworkProtocolHandler { } } - func handle(upMessage: UPMessage) async throws { + func handle(connection: some ConnectionInfoProtocol, upMessage: UPMessage) async throws { switch upMessage { + case let .blockAnnouncementHandshake(message): + logger.trace("received block announcement handshake: \(message)") + await peerManager.addPeer(address: connection.remoteAddress, handshake: message) case let .blockAnnouncement(message): - logger.debug("received block announcement: \(message)") - // TODO: handle it + logger.trace("received block announcement: \(message)") + await peerManager.updatePeer(address: connection.remoteAddress, message: message) + } + } + + func handle( + connection _: some ConnectionInfoProtocol, stream: some StreamProtocol, kind: UniquePresistentStreamKind + ) async throws { + switch kind { + case .blockAnnouncement: + // send handshake message + let finalized = await blockchain.dataProvider.finalizedHead + let heads = try await blockchain.dataProvider.getHeads() + var headsWithTimeslot: [HashAndSlot] = [] + for head in heads { + try await headsWithTimeslot.append(HashAndSlot( + hash: head, + timeslot: blockchain.dataProvider.getHeader(hash: head).value.timeslot + )) + } + + let handshake = BlockAnnouncementHandshake( + finalized: HashAndSlot(hash: finalized.hash, timeslot: finalized.timeslot), + heads: headsWithTimeslot + ) + + try stream.send(message: .blockAnnouncementHandshake(handshake)) } } } diff --git a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift new file mode 100644 index 00000000..554ee097 --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift @@ -0,0 +1,62 @@ +import Blockchain +import Foundation +import Networking +import TracingUtils +import Utils + +private let logger = Logger(label: "PeerManager") + +public struct PeerInfo { + public let address: NetAddr + public internal(set) var finalized: HashAndSlot + public internal(set) var heads: Set = [] +} + +public actor PeerManager: Sendable { + private var peers: [NetAddr: PeerInfo] = [:] + + init() {} + + func addPeer(address: NetAddr, handshake: BlockAnnouncementHandshake) { + var peer = PeerInfo( + address: address, + finalized: handshake.finalized + ) + for head in handshake.heads { + peer.heads.insert(head) + } + peers[address] = peer + + logger.debug("added peer", metadata: ["address": "\(address)", "finalized": "\(peer.finalized)"]) + } + + func updatePeer(address: NetAddr, message: BlockAnnouncement) { + if var peer = peers[address] { + peer.finalized = message.finalized + // purge heads that are older than the finalized head + // or if it is the parent of the new block + // this means if some blocks are skipped, it is possible that we miss purge some heads + // that is ancestor of the new block. but that's fine + peer.heads = peer.heads.filter { head in + head.timeslot > message.finalized.timeslot && head.hash != message.header.parentHash + } + peer.heads.insert(HashAndSlot(hash: message.header.hash(), timeslot: message.header.timeslot)) + peers[address] = peer + } else { + // this shouldn't happen but let's handle it + peers[address] = PeerInfo( + address: address, + finalized: message.finalized, + heads: [ + HashAndSlot(hash: message.header.hash(), timeslot: message.header.timeslot), + ] + ) + } + + logger.debug("updated peer", metadata: ["address": "\(address)", "finalized": "\(peers[address]!.finalized)"]) + } + + public func getPeer(address: NetAddr) -> PeerInfo? { + peers[address] + } +} diff --git a/Node/Sources/Node/NetworkingProtocol/Typealias.swift b/Node/Sources/Node/NetworkingProtocol/Typealias.swift index 4b86e1dd..37639426 100644 --- a/Node/Sources/Node/NetworkingProtocol/Typealias.swift +++ b/Node/Sources/Node/NetworkingProtocol/Typealias.swift @@ -1,5 +1,5 @@ import Networking -public typealias PeerMode = Networking.PeerMode +public typealias PeerRole = Networking.PeerRole public typealias PeerSettings = Networking.PeerSettings public typealias NetAddr = Networking.NetAddr diff --git a/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift index f23f1e90..82910794 100644 --- a/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift +++ b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift @@ -1,8 +1,17 @@ import Blockchain import Utils +public struct HashAndSlot: Codable, Sendable, Hashable { + public var hash: Data32 + public var timeslot: TimeslotIndex +} + +public struct BlockAnnouncementHandshake: Codable, Sendable { + public var finalized: HashAndSlot + public var heads: [HashAndSlot] +} + public struct BlockAnnouncement: Codable, Sendable { public var header: Header - public var headerHash: Data32 - public var timeslot: TimeslotIndex + public var finalized: HashAndSlot } diff --git a/Node/Sources/Node/NetworkingProtocol/UniquePresistent/UPMessage.swift b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/UPMessage.swift index 77cfe036..31dfdc1e 100644 --- a/Node/Sources/Node/NetworkingProtocol/UniquePresistent/UPMessage.swift +++ b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/UPMessage.swift @@ -3,12 +3,15 @@ import Foundation import Networking public enum UPMessage: Sendable { + case blockAnnouncementHandshake(BlockAnnouncementHandshake) case blockAnnouncement(BlockAnnouncement) } extension UPMessage: MessageProtocol { public func encode() throws -> Data { switch self { + case let .blockAnnouncementHandshake(message): + try JamEncoder.encode(message) case let .blockAnnouncement(message): try JamEncoder.encode(message) } @@ -16,25 +19,10 @@ extension UPMessage: MessageProtocol { public var kind: UniquePresistentStreamKind { switch self { - case .blockAnnouncement: + case .blockAnnouncementHandshake: .blockAnnouncement - } - } - - static func getType(kind: UniquePresistentStreamKind) -> Decodable.Type { - switch kind { case .blockAnnouncement: - BlockAnnouncement.self - } - } - - static func from(kind: UniquePresistentStreamKind, data: any Decodable) -> UPMessage? { - switch kind { - case .blockAnnouncement: - guard let message = data as? BlockAnnouncement else { - return nil - } - return .blockAnnouncement(message) + .blockAnnouncement } } } diff --git a/Node/Sources/Node/Node.swift b/Node/Sources/Node/Node.swift index b6b17107..e60e7520 100644 --- a/Node/Sources/Node/Node.swift +++ b/Node/Sources/Node/Node.swift @@ -36,6 +36,9 @@ public class Node { keystore: KeyStore ) async throws { let (genesisState, genesisBlock, protocolConfig) = try await genesis.load() + + logger.info("Genesis: \(genesisBlock.hash)") + dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) timeProvider = SystemTimeProvider() let blockchain = try await Blockchain(