diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c1ba4c58..e5e8e807 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,6 +25,7 @@ jobs: test: name: Build and Test runs-on: [self-hosted, linux] + timeout-minutes: 30 steps: - name: Checkout Code uses: actions/checkout@v4 diff --git a/Networking/Sources/MsQuicSwift/QuicConnection.swift b/Networking/Sources/MsQuicSwift/QuicConnection.swift index 75956d7e..f6b3844b 100644 --- a/Networking/Sources/MsQuicSwift/QuicConnection.swift +++ b/Networking/Sources/MsQuicSwift/QuicConnection.swift @@ -226,13 +226,14 @@ private class ConnectionHandle { case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: let evtData = event.pointee.SHUTDOWN_INITIATED_BY_TRANSPORT - if evtData.Status == QuicStatusCode.connectionIdle.rawValue { + let status = QuicStatus(rawValue: evtData.Status) + if status == .code(.connectionIdle) { logger.trace("Successfully shut down on idle.") if let connection { connection.handler.shutdownInitiated(connection, reason: .idle) } } else { - logger.debug("Shut down by transport. Status: \(evtData.Status) Error: \(evtData.ErrorCode)") + logger.debug("Shut down by transport. Status: \(status) Error: \(evtData.ErrorCode)") if let connection { connection.handler.shutdownInitiated( connection, diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 5d9e3532..3d4acc8d 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -10,22 +10,47 @@ public protocol ConnectionInfoProtocol { var id: UniqueId { get } var role: PeerRole { get } var remoteAddress: NetAddr { get } + var publicKey: Data? { get } } enum ConnectionError: Error { case receiveFailed case invalidLength + case unexpectedState + case closed +} + +enum ConnectionState { + case connecting(continuations: [CheckedContinuation]) + case connected(publicKey: Data) + case closed } public final class Connection: Sendable, ConnectionInfoProtocol { let connection: QuicConnection let impl: PeerImpl + public let role: PeerRole public let remoteAddress: NetAddr + let presistentStreams: ThreadSafeContainer< [Handler.PresistentHandler.StreamKind: Stream] > = .init([:]) let initiatedByLocal: Bool + private let state: ThreadSafeContainer = .init(.connecting(continuations: [])) + + public var publicKey: Data? { + state.read { + switch $0 { + case .connecting: + nil + case let .connected(publicKey): + publicKey + case .closed: + nil + } + } + } public var id: UniqueId { connection.id @@ -39,11 +64,79 @@ public final class Connection: Sendable, ConnectionInfoP self.initiatedByLocal = initiatedByLocal } + func opened(publicKey: Data) throws { + try state.write { state in + if case let .connecting(continuations) = state { + for continuation in continuations { + continuation.resume() + } + state = .connected(publicKey: publicKey) + } else { + throw ConnectionError.unexpectedState + } + } + } + + func closed() { + state.write { state in + if case let .connecting(continuations) = state { + for continuation in continuations { + continuation.resume(throwing: ConnectionError.closed) + } + state = .closed + } + state = .closed + } + } + + public var isClosed: Bool { + state.read { + switch $0 { + case .connecting: + false + case .connected: + false + case .closed: + true + } + } + } + + public func ready() async throws { + let isReady = state.read { + switch $0 { + case .connecting: + false + case .connected: + true + case .closed: + true + } + } + + if isReady { + return + } + try await withCheckedThrowingContinuation { continuation in + state.write { state in + if case var .connecting(continuations) = state { + continuations.append(continuation) + state = .connecting(continuations: continuations) + } else { + continuation.resume() + } + } + } + } + public func close(abort: Bool = false) { try? connection.shutdown(errorCode: abort ? 1 : 0) // TODO: define some error code } public func request(_ request: Handler.EphemeralHandler.Request) async throws -> Data { + guard !isClosed else { + throw ConnectionError.closed + } logger.trace("sending request", metadata: ["kind": "\(request.kind)"]) let data = try request.encode() let kind = request.kind diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 8f203665..14e8717e 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -60,6 +60,8 @@ public final class Peer: Sendable { impl.logger } + public let publicKey: Data + public init(options: PeerOptions) throws { let logger = Logger(label: "Peer".uniqueId) @@ -81,11 +83,14 @@ public final class Peer: Sendable { registration: registration, pkcs12: pkcs12, alpns: [clientAlpn], client: true, settings: options.clientSettings ) + publicKey = options.secretKey.publicKey.data.data + impl = PeerImpl( logger: logger, role: options.role, settings: options.peerSettings, alpns: alpns, + publicKey: publicKey, clientConfiguration: clientConfiguration, presistentStreamHandler: options.presistentStreamHandler, ephemeralStreamHandler: options.ephemeralStreamHandler @@ -98,6 +103,12 @@ public final class Peer: Sendable { listenAddress: options.listenAddress, alpns: allAlpns ) + + logger.debug("Peer initialized", metadata: [ + "listenAddress": "\(options.listenAddress)", + "role": "\(options.role)", + "publicKey": "\(options.secretKey.publicKey.data.toHexString())", + ]) } public func listenAddress() throws -> NetAddr { @@ -107,11 +118,10 @@ public final class Peer: Sendable { // TODO: see if we can remove the role parameter public func connect(to address: NetAddr, role: PeerRole) throws -> Connection { let conn = impl.connections.read { connections in - connections.byAddr[address]?.0 + connections.byAddr[address] } return try conn ?? impl.connections.write { connections in - let curr = connections.byAddr[address]?.0 - if let curr { + if let curr = connections.byAddr[address] { return curr } @@ -130,12 +140,18 @@ public final class Peer: Sendable { remoteAddress: address, initiatedByLocal: true ) - connections.byAddr[address] = (conn, role) + connections.byAddr[address] = conn connections.byId[conn.id] = conn return conn } } + public func getConnection(publicKey: Data) -> Connection? { + impl.connections.read { connections in + connections.byPublicKey[publicKey] + } + } + public func broadcast(kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message) { let connections = impl.connections.read { connections in connections.byId.values @@ -170,8 +186,9 @@ public final class Peer: Sendable { final class PeerImpl: Sendable { struct ConnectionStorage { - var byAddr: [NetAddr: (Connection, PeerRole)] = [:] + var byAddr: [NetAddr: Connection] = [:] var byId: [UniqueId: Connection] = [:] + var byPublicKey: [Data: Connection] = [:] } fileprivate let logger: Logger @@ -179,6 +196,7 @@ final class PeerImpl: Sendable { fileprivate let settings: PeerSettings fileprivate let alpns: [PeerRole: Data] fileprivate let alpnLookup: [Data: PeerRole] + fileprivate let publicKey: Data fileprivate let clientConfiguration: QuicConfiguration @@ -193,6 +211,7 @@ final class PeerImpl: Sendable { role: PeerRole, settings: PeerSettings, alpns: [PeerRole: Data], + publicKey: Data, clientConfiguration: QuicConfiguration, presistentStreamHandler: Handler.PresistentHandler, ephemeralStreamHandler: Handler.EphemeralHandler @@ -201,6 +220,7 @@ final class PeerImpl: Sendable { self.role = role self.settings = settings self.alpns = alpns + self.publicKey = publicKey self.clientConfiguration = clientConfiguration self.presistentStreamHandler = presistentStreamHandler self.ephemeralStreamHandler = ephemeralStreamHandler @@ -215,7 +235,7 @@ final class PeerImpl: Sendable { func addConnection(_ connection: QuicConnection, addr: NetAddr, role: PeerRole) -> Bool { connections.write { connections in if role == .builder { - let currentCount = connections.byAddr.values.filter { $0.1 == role }.count + let currentCount = connections.byAddr.values.filter { $0.role == role }.count if currentCount >= self.settings.maxBuilderConnections { self.logger.warning("max builder connections reached") // TODO: consider connection rotation strategy @@ -233,7 +253,7 @@ final class PeerImpl: Sendable { remoteAddress: addr, initiatedByLocal: false ) - connections.byAddr[addr] = (conn, role) + connections.byAddr[addr] = conn connections.byId[connection.id] = conn return true } @@ -279,6 +299,13 @@ private struct PeerEventHandler: QuicEventHandler { guard let certificate else { return .code(.requiredCert) } + let conn = impl.connections.read { connections in + connections.byId[connection.id] + } + guard let conn else { + logger.warning("Trying to open connection but connection is gone?", metadata: ["connectionId": "\(connection.id)"]) + return .code(.connectionRefused) + } do { let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509) logger.trace("Certificate parsed", metadata: [ @@ -289,8 +316,31 @@ private struct PeerEventHandler: QuicEventHandler { if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { return .code(.badCert) } - if impl.role == PeerRole.validator { - // TODO: verify if it is current or next validator + + if publicKey == impl.publicKey { + // self connection + logger.trace("self connection rejected", metadata: [ + "connectionId": "\(connection.id)", + "publicKey": "\(publicKey.toHexString())", + ]) + return .code(.connectionRefused) + } + + // TODO: verify if it is current or next validator + + return try impl.connections.write { connections in + if connections.byPublicKey.keys.contains(publicKey) { + // duplicated connection + logger.debug("duplicated connection rejected", metadata: [ + "connectionId": "\(connection.id)", + "publicKey": "\(publicKey.toHexString())", + ]) + // TODO: write a test for this + return .code(.connectionRefused) + } + connections.byPublicKey[publicKey] = conn + try conn.opened(publicKey: publicKey) + return .code(.success) } } catch { logger.warning("Failed to parse certificate", metadata: [ @@ -298,7 +348,6 @@ private struct PeerEventHandler: QuicEventHandler { "error": "\(error)"]) return .code(.badCert) } - return .code(.success) } func connected(_ connection: QuicConnection) { @@ -330,8 +379,12 @@ private struct PeerEventHandler: QuicEventHandler { logger.trace("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) impl.connections.write { connections in if let conn = connections.byId[connection.id] { + conn.closed() connections.byId.removeValue(forKey: connection.id) - connections.byAddr[conn.remoteAddress] = nil + connections.byAddr.removeValue(forKey: conn.remoteAddress) + if let publicKey = conn.publicKey { + connections.byPublicKey.removeValue(forKey: publicKey) + } } } } diff --git a/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift b/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift index 7ba8c5d4..13e28a33 100644 --- a/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift +++ b/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift @@ -133,7 +133,7 @@ struct QuicListenerTests { }.first! try remoteStream2.send(data: Data("another replay to 2".utf8)) - try? await Task.sleep(for: .milliseconds(100)) + try? await Task.sleep(for: .milliseconds(200)) let receivedData = serverHandler.events.value.compactMap { switch $0 { case let .dataReceived(_, data): diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 25c00e36..53f79386 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -1,6 +1,7 @@ import Foundation import MsQuicSwift import Testing +import TracingUtils import Utils @testable import Networking @@ -76,10 +77,10 @@ struct PeerTests { } actor DataStorage { - private(set) var lastReceivedData: Data? + private(set) var data: [Data] = [] - func updateData(_ data: Data?) { - lastReceivedData = data + func updateData(_ data: Data) { + self.data.append(data) } } @@ -89,7 +90,7 @@ struct PeerTests { private let dataStorage = DataStorage() var lastReceivedData: Data? { - get async { await dataStorage.lastReceivedData } + get async { await dataStorage.data.last } } func createDecoder(kind: StreamKind) -> any MessageDecoder { @@ -97,17 +98,21 @@ struct PeerTests { } func handle(connection _: any ConnectionInfoProtocol, request: Request) async throws -> Data { - let data = request.data + let data = request.data + Data(" response".utf8) await dataStorage.updateData(data) return data } } - struct MockPresentStreamHandler: PresistentStreamHandler { + final class MockPresentStreamHandler: PresistentStreamHandler { private let dataStorage = DataStorage() var lastReceivedData: Data? { - get async { await dataStorage.lastReceivedData } + get async { await dataStorage.data.last } + } + + var receivedData: [Data] { + get async { await dataStorage.data } } func streamOpened( @@ -148,7 +153,7 @@ struct PeerTests { role: .validator, listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8081)!, genesisHeader: Data32(), - secretKey: Ed25519.SecretKey(from: Data32()), + secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler1, ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, @@ -160,30 +165,29 @@ struct PeerTests { role: .validator, listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, genesisHeader: Data32(), - secretKey: Ed25519.SecretKey(from: Data32()), + secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler2, ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, clientSettings: .defaultSettings ) ) - try? await Task.sleep(for: .milliseconds(100)) + _ = try peer1.connect( to: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, role: .validator ) - _ = try peer2.connect( - to: NetAddr(ipAddress: "127.0.0.1", port: 8081)!, role: .validator - ) - try? await Task.sleep(for: .milliseconds(100)) + + try? await Task.sleep(for: .milliseconds(50)) + peer1.broadcast( kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("hello world".utf8)) ) - try? await Task.sleep(for: .milliseconds(100)) + peer2.broadcast( kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8)) ) // Verify last received data - try? await Task.sleep(for: .milliseconds(100)) + try? await Task.sleep(for: .milliseconds(50)) await #expect(handler2.lastReceivedData == Data("hello world".utf8)) await #expect(handler1.lastReceivedData == Data("I am jam".utf8)) } @@ -193,9 +197,9 @@ struct PeerTests { let peer1 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8083)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), - secretKey: Ed25519.SecretKey(from: Data32()), + secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: MockPresentStreamHandler(), ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, @@ -205,50 +209,96 @@ struct PeerTests { let peer2 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8084)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), - secretKey: Ed25519.SecretKey(from: Data32()), + secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: MockPresentStreamHandler(), ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, clientSettings: .defaultSettings ) ) - try? await Task.sleep(for: .milliseconds(100)) let connection1 = try peer1.connect( - to: NetAddr(ipAddress: "127.0.0.1", port: 8084)!, role: .validator + to: peer2.listenAddress(), role: .validator ) - try? await Task.sleep(for: .milliseconds(100)) let dataList1 = try await connection1.request( MockRequest(kind: .typeA, data: Data("hello world".utf8)) ) - #expect(dataList1 == Data("hello world".utf8)) + #expect(dataList1 == Data("hello world response".utf8)) + } + + @Test + func duplicatedConnection() async throws { + let secretKey1 = try Ed25519.SecretKey(from: Data32.random()) + let secretKey2 = try Ed25519.SecretKey(from: Data32.random()) + + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: secretKey1, + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: secretKey2, + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) - let connection2 = try peer2.connect( - to: NetAddr(ipAddress: "127.0.0.1", port: 8083)!, role: .validator + let connection1 = try peer1.connect( + to: peer2.listenAddress(), role: .validator ) - try? await Task.sleep(for: .milliseconds(100)) - let dataList2 = try await connection2.request( - MockRequest(kind: .typeB, data: Data("I am jam".utf8)) + let connection2 = try peer1.connect( + to: peer2.listenAddress(), role: .validator ) - #expect(dataList2 == Data("I am jam".utf8)) + + #expect(connection1 === connection2) + try await Task.sleep(for: .milliseconds(50)) + + let connection3 = try peer2.connect( + to: peer1.listenAddress(), role: .validator + ) + + #expect(connection1.publicKey == secretKey2.publicKey.data.data) + + await #expect(throws: Error.self) { + try await connection3.request( + MockRequest(kind: .typeB, data: Data()) + ) + } } @Test func multiplePeerBroadcast() async throws { var peers: [Peer] = [] + var handlers: [MockPresentStreamHandler] = [] // Create 100 peer nodes for i in 0 ..< 100 { + let handler = MockPresentStreamHandler() + handlers.append(handler) let peer = try Peer( options: PeerOptions( role: .builder, listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(7081 + i))!, genesisHeader: Data32(), - secretKey: Ed25519.SecretKey(from: Data32()), - presistentStreamHandler: MockPresentStreamHandler(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler, ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, clientSettings: .defaultSettings @@ -257,18 +307,23 @@ struct PeerTests { peers.append(peer) } - try? await Task.sleep(for: .milliseconds(100)) - - // Connect each peer to the next one in a circular network + // Make some connections for i in 0 ..< peers.count { - let nextPeerIndex = (i + 1) % peers.count - _ = try peers[i].connect( - to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(7081 + nextPeerIndex))!, + let peer = peers[i] + let otherPeer = peers[(i + 1) % peers.count] + let conn1 = try peer.connect( + to: otherPeer.listenAddress(), + role: .validator + ) + let otherPeer2 = peers[(i + 2) % peers.count] + let conn2 = try peer.connect( + to: otherPeer2.listenAddress(), role: .validator ) - } - try? await Task.sleep(for: .milliseconds(100)) + try await conn1.ready() + try await conn2.ready() + } // Broadcast a message from each peer for (i, peer) in peers.enumerated() { @@ -280,7 +335,16 @@ struct PeerTests { } // Wait for message propagation - try? await Task.sleep(for: .milliseconds(200)) + try? await Task.sleep(for: .milliseconds(100)) + + // everyone should receive two messages + for (idx, handler) in handlers.enumerated() { + #expect(await handler.receivedData.count == 4) // 2 outgoing + 2 incoming + #expect(await handler.receivedData.contains(Data("Message from peer \((idx + 99) % 100)".utf8))) + #expect(await handler.receivedData.contains(Data("Message from peer \((idx + 98) % 100)".utf8))) + #expect(await handler.receivedData.contains(Data("Message from peer \((idx + 1) % 100)".utf8))) + #expect(await handler.receivedData.contains(Data("Message from peer \((idx + 2) % 100)".utf8))) + } } @Test @@ -288,13 +352,13 @@ struct PeerTests { var peers: [Peer] = [] // Create 100 peer nodes - for i in 0 ..< 100 { + for _ in 0 ..< 100 { let peer = try Peer( options: PeerOptions( role: .builder, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(6091 + i))!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), - secretKey: Ed25519.SecretKey(from: Data32()), + secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: MockPresentStreamHandler(), ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, @@ -305,33 +369,40 @@ struct PeerTests { } // Wait for peers to initialize - try? await Task.sleep(for: .milliseconds(100)) + try? await Task.sleep(for: .milliseconds(50)) + var tasks = [Task]() // Test request-response by having each peer request from the next peer for i in 0 ..< 100 { - let messageData = Data("Request from peer \(i)".utf8) - let port = UInt16(6091 + (i + 1) % 100) - let type = (i + 1) % 2 == 0 ? EphemeralStreamKind.typeA : EphemeralStreamKind.typeB - let response = try await peers[i].connect( - to: NetAddr(ipAddress: "127.0.0.1", port: port)!, - role: .validator - ).request(MockRequest(kind: type, data: messageData)) - #expect(response == messageData, "Peer \(i) should receive correct response") + tasks.append(Task { + let messageData = Data("Request from peer \(i)".utf8) + let otherPeer = peers[(i + 1) % peers.count] + let type = (i + 1) % 2 == 0 ? EphemeralStreamKind.typeA : EphemeralStreamKind.typeB + let response = try await peers[i].connect( + to: otherPeer.listenAddress(), + role: .validator + ).request(MockRequest(kind: type, data: messageData)) + #expect(response == messageData + Data(" response".utf8), "Peer \(i) should receive correct response") + }) + } + + for task in tasks { + try await task.value } } @Test func highConcurrentRequest() async throws { var peers: [Peer] = [] - - // Create 100 peers - for i in 0 ..< 100 { + let peersCount = 50 + // Create peers + for _ in 0 ..< peersCount { let peer = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(8300 + i))!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), - secretKey: Ed25519.SecretKey(from: Data32()), + secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: MockPresentStreamHandler(), ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, @@ -341,29 +412,39 @@ struct PeerTests { peers.append(peer) } - for i in 0 ..< peers.count - 1 { - _ = try peers[i].connect( - to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(8300 + i + 1))!, - role: .validator - ) + var connections = [Connection]() + for i in 0 ..< peers.count { + let peer = peers[i] + for j in 0 ..< peers.count { + if i >= j { + continue + } + let otherPeer = peers[j] + let conn = try peer.connect( + to: otherPeer.listenAddress(), + role: .validator + ) + connections.append(conn) + } } - // Allow connections to establish - try? await Task.sleep(for: .milliseconds(100)) + for conn in connections { + try await conn.ready() + } // Send multiple requests from each peer - for peer in peers { - let tasks = (1 ... 88).map { _ in - Task { - let net = try peer.listenAddress() - let random = Int.random(in: 0 ..< 100) - let type = random % 2 == 0 ? EphemeralStreamKind.typeA : EphemeralStreamKind.typeB - let messageData = Data("Concurrent request \(net.description) + \(random)".utf8) - let response = try await peer.connect( - to: net, - role: .validator - ).request(MockRequest(kind: type, data: messageData)) - #expect(response == messageData, "Peer should receive correct response") + for (idx, peer) in peers.enumerated() { + let tasks = (1 ..< peersCount).map { i in + let other = peers[(idx + i) % peers.count] + return Task { + let type = i % 2 == 0 ? EphemeralStreamKind.typeA : EphemeralStreamKind.typeB + let messageData = Data("Concurrent request \(i)".utf8) + let response = try await peer.getConnection( + publicKey: other.publicKey + ) + .unwrap() + .request(MockRequest(kind: type, data: messageData)) + #expect(response == messageData + Data(" response".utf8), "Peer should receive correct response") } } // Wait for all tasks to complete @@ -379,14 +460,14 @@ struct PeerTests { var handles: [MockPresentStreamHandler] = [] // Create 50 peers with unique addresses - for i in 0 ..< 50 { + for _ in 0 ..< 50 { let handle = MockPresentStreamHandler() let peer = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(100 + i))!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), - secretKey: Ed25519.SecretKey(from: Data32()), + secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handle, ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, @@ -399,23 +480,24 @@ struct PeerTests { // Connect each peer to form a fully connected network for i in 0 ..< peers.count { - for j in 0 ..< peers.count where i != j { - _ = try peers[i].connect( - to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(100 + j))!, + let peer = peers[i] + for j in 0 ..< peers.count where i > j { + let otherPeer = peers[j] + let conn = try peer.connect( + to: otherPeer.listenAddress(), role: .validator ) + + try await conn.ready() } } - // Wait for all connections to establish - try? await Task.sleep(for: .seconds(1)) - let centralPeer = peers[0] let messagedata = Data("Sync message".utf8) centralPeer.broadcast(kind: .uniqueA, message: MockRequest(kind: .uniqueA, data: messagedata)) // Wait for message to propagate - try? await Task.sleep(for: .seconds(2)) + try? await Task.sleep(for: .seconds(1)) // Check that each peer received the broadcast for i in 1 ..< handles.count { diff --git a/Node/Sources/Node/NetworkingProtocol/Network.swift b/Node/Sources/Node/NetworkingProtocol/Network.swift index 5e3a3963..31001360 100644 --- a/Node/Sources/Node/NetworkingProtocol/Network.swift +++ b/Node/Sources/Node/NetworkingProtocol/Network.swift @@ -71,6 +71,11 @@ public final class Network: Sendable { try peer.connect(to: to, role: role) } + public func send(to: PeerId, message: CERequest) async throws -> Data { + let conn = try peer.getConnection(publicKey: to.publicKey) ?? peer.connect(to: to.address, role: .builder) + return try await conn.request(message) + } + public func send(to: NetAddr, message: CERequest) async throws -> Data { let conn = try peer.connect(to: to, role: .builder) return try await conn.request(message) diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index 724be134..44e3afa3 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -71,19 +71,19 @@ public final class NetworkManager: Sendable { } } - private func getAddresses(target: BroadcastTarget) -> Set { + private func getAddresses(target: BroadcastTarget) -> Set> { // TODO: get target from onchain state switch target { case .safroleStep1Validator: // TODO: only send to the selected validator in the spec - devPeers + Set(devPeers.map { .right($0) }) case .currentValidators: // TODO: read onchain state for validators - devPeers + Set(devPeers.map { .right($0) }) } } - private func send(to: NetAddr, message: CERequest) async throws -> Data { + private func send(to: PeerId, message: CERequest) async throws -> Data { try await network.send(to: to, message: message) } @@ -97,7 +97,12 @@ public final class NetworkManager: Sendable { Task { logger.trace("sending message", metadata: ["target": "\(target)", "message": "\(message)"]) let res = await Result { - try await network.send(to: target, message: message) + switch target { + case let .left(peerId): + try await network.send(to: peerId, message: message) + case let .right(address): + try await network.send(to: address, message: message) + } } await responseHandler(res) } @@ -111,7 +116,12 @@ public final class NetworkManager: Sendable { logger.trace("sending message", metadata: ["target": "\(target)", "message": "\(message)"]) // not expecting a response // TODO: handle errors and ensure no data is returned - _ = try await network.send(to: target, message: message) + switch target { + case let .left(peerId): + _ = try await network.send(to: peerId, message: message) + case let .right(address): + _ = try await network.send(to: address, message: message) + } } } } @@ -219,10 +229,16 @@ struct HandlerImpl: NetworkProtocolHandler { switch upMessage { case let .blockAnnouncementHandshake(message): logger.trace("received block announcement handshake: \(message)") - await peerManager.addPeer(address: connection.remoteAddress, handshake: message) + try await peerManager.addPeer( + id: PeerId(publicKey: connection.publicKey.unwrap(), address: connection.remoteAddress), + handshake: message + ) case let .blockAnnouncement(message): logger.trace("received block announcement: \(message)") - await peerManager.updatePeer(address: connection.remoteAddress, message: message) + try await peerManager.updatePeer( + id: PeerId(publicKey: connection.publicKey.unwrap(), address: connection.remoteAddress), + message: message + ) } } diff --git a/Node/Sources/Node/NetworkingProtocol/PeerId.swift b/Node/Sources/Node/NetworkingProtocol/PeerId.swift new file mode 100644 index 00000000..34ec7d4d --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/PeerId.swift @@ -0,0 +1,11 @@ +import Foundation + +public struct PeerId: Sendable, Hashable { + public let publicKey: Data + public let address: NetAddr + + public init(publicKey: Data, address: NetAddr) { + self.publicKey = publicKey + self.address = address + } +} diff --git a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift index 2cd9b437..f65462b6 100644 --- a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift @@ -7,7 +7,7 @@ import Utils private let logger = Logger(label: "PeerManager") public struct PeerInfo: Sendable { - public let address: NetAddr + public let id: PeerId public internal(set) var finalized: HashAndSlot public internal(set) var heads: Set = [] @@ -23,29 +23,29 @@ public struct PeerInfo: Sendable { public actor PeerManager: Sendable { private let eventBus: EventBus - public private(set) var peers: [NetAddr: PeerInfo] = [:] + public private(set) var peers: [Data: PeerInfo] = [:] init(eventBus: EventBus) { self.eventBus = eventBus } - func addPeer(address: NetAddr, handshake: BlockAnnouncementHandshake) { + func addPeer(id: PeerId, handshake: BlockAnnouncementHandshake) { var peer = PeerInfo( - address: address, + id: id, finalized: handshake.finalized ) for head in handshake.heads { peer.heads.insert(head) } - peers[address] = peer + peers[id.publicKey] = peer - logger.debug("added peer", metadata: ["address": "\(address)", "finalized": "\(peer.finalized)"]) + logger.debug("added peer", metadata: ["address": "\(id.address)", "publicKey": "\(id.publicKey)", "finalized": "\(peer.finalized)"]) eventBus.publish(NetworkEvents.PeerAdded(info: peer)) } - func updatePeer(address: NetAddr, message: BlockAnnouncement) { + func updatePeer(id: PeerId, message: BlockAnnouncement) { let updatedPeer: PeerInfo - if var peer = peers[address] { + if var peer = peers[id.publicKey] { peer.finalized = message.finalized // purge heads that are older than the finalized head // or if it is the parent of the new block @@ -59,16 +59,18 @@ public actor PeerManager: Sendable { } else { // this shouldn't happen but let's handle it updatedPeer = PeerInfo( - address: address, + id: id, finalized: message.finalized, heads: [ HashAndSlot(hash: message.header.hash, timeslot: message.header.value.timeslot), ] ) } - peers[address] = updatedPeer + peers[id.publicKey] = updatedPeer - logger.debug("updated peer", metadata: ["address": "\(address)", "finalized": "\(updatedPeer.finalized)"]) + logger.debug("updated peer", metadata: [ + "address": "\(id.address)", "publicKey": "\(id.publicKey)", "finalized": "\(updatedPeer.finalized)", + ]) eventBus.publish(NetworkEvents.PeerUpdated(info: updatedPeer, newBlockHeader: message.header)) } } diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift index 41858774..7fa410b8 100644 --- a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -32,7 +32,7 @@ public actor SyncManager: Sendable { private var networkBest: HashAndSlot? private var networkFinalizedBest: HashAndSlot? - private var currentRequest: (peer: NetAddr, request: BlockRequest)? + private var currentRequest: (peer: PeerId, request: BlockRequest)? public init(blockchain: Blockchain, network: Network, peerManager: PeerManager, eventBus: EventBus) { self.blockchain = blockchain @@ -91,7 +91,7 @@ public actor SyncManager: Sendable { await bulkSync(currentHead: currentHead) case .syncing: if let newBlockHeader { - importBlock(currentTimeslot: currentHead.timeslot, newHeader: newBlockHeader, peer: info.address) + importBlock(currentTimeslot: currentHead.timeslot, newHeader: newBlockHeader, peer: info.id) } } } @@ -101,18 +101,18 @@ public actor SyncManager: Sendable { return } - for (addr, info) in await peerManager.peers { + for info in await peerManager.peers.values { if let peerBest = info.best, peerBest.timeslot > currentHead.timeslot { let request = BlockRequest( hash: currentHead.hash, direction: .ascendingExcludsive, maxBlocks: min(BLOCK_REQUEST_BLOCK_COUNT, peerBest.timeslot - currentHead.timeslot) ) - currentRequest = (addr, request) - logger.debug("bulk syncing", metadata: ["peer": "\(addr)", "request": "\(request)"]) + currentRequest = (info.id, request) + logger.debug("bulk syncing", metadata: ["peer": "\(info.id)", "request": "\(request)"]) Task { - let resp = try await network.send(to: addr, message: .blockRequest(request)) + let resp = try await network.send(to: info.id, message: .blockRequest(request)) let decoded = try CERequest.decodeResponseForBlockRequest(data: resp, config: blockchain.config) for block in decoded { try await blockchain.importBlock(block) @@ -143,7 +143,7 @@ public actor SyncManager: Sendable { } } - private func importBlock(currentTimeslot: TimeslotIndex, newHeader: HeaderRef, peer: NetAddr) { + private func importBlock(currentTimeslot: TimeslotIndex, newHeader: HeaderRef, peer: PeerId) { logger.debug("importing block", metadata: ["hash": "\(newHeader.hash)", "remote": "\(peer)"]) let blockchain = blockchain let network = network diff --git a/Utils/Sources/Utils/Either.swift b/Utils/Sources/Utils/Either.swift index 35c5a767..625695bb 100644 --- a/Utils/Sources/Utils/Either.swift +++ b/Utils/Sources/Utils/Either.swift @@ -118,3 +118,5 @@ extension Either: EncodedSize where Left: EncodedSize, Right: EncodedSize { return nil } } + +extension Either: Hashable where Left: Hashable, Right: Hashable {}