diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 3f47c6f1..e1dcf626 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -303,9 +303,10 @@ private struct PeerEventHandler: QuicEventHandler { connections.byId[connection.id] } guard let conn else { - logger.warning("Trying to open connection but connection is gone?", metadata: ["connectionId": "\(connection.id)"]) + logger.warning("Attempt to open but connection is absent", metadata: ["connectionId": "\(connection.id)"]) return .code(.connectionRefused) } + do { let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509) logger.trace("Certificate parsed", metadata: [ @@ -313,39 +314,38 @@ private struct PeerEventHandler: QuicEventHandler { "publicKey": "\(publicKey.toHexString())", "alternativeName": "\(alternativeName)", ]) - if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { - return .code(.badCert) - } - if publicKey == impl.publicKey { - // self connection - logger.trace("self connection rejected", metadata: [ - "connectionId": "\(connection.id)", - "publicKey": "\(publicKey.toHexString())", - ]) + // Self connection detected + logger.trace("Rejecting self-connection", metadata: ["connectionId": "\(connection.id)"]) return .code(.connectionRefused) } - + if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { + return .code(.badCert) + } // TODO: verify if it is current or next validator + // Check for an existing connection by public key return try impl.connections.write { connections in if connections.byPublicKey.keys.contains(publicKey) { - // duplicated connection - logger.debug("duplicated connection rejected", metadata: [ - "connectionId": "\(connection.id)", - "publicKey": "\(publicKey.toHexString())", - ]) - // TODO: write a test for this - return .code(.connectionRefused) + // Deterministically decide based on public key comparison + if !publicKey.lexicographicallyPrecedes(impl.publicKey) { + connections.byPublicKey[publicKey] = conn + try conn.opened(publicKey: publicKey) + return .code(.success) + } else { + logger.debug("Rejecting duplicate connection by rule", metadata: [ + "connectionId": "\(connection.id)", "publicKey": "\(publicKey.toHexString())", + ]) + return .code(.connectionRefused) + } + } else { + connections.byPublicKey[publicKey] = conn + try conn.opened(publicKey: publicKey) + return .code(.success) } - connections.byPublicKey[publicKey] = conn - try conn.opened(publicKey: publicKey) - return .code(.success) } } catch { - logger.warning("Failed to parse certificate", metadata: [ - "connectionId": "\(connection.id)", - "error": "\(error)"]) + logger.warning("Certificate parsing failed", metadata: ["connectionId": "\(connection.id)", "error": "\(error)"]) return .code(.badCert) } } diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index dd5f14d0..f776ad9c 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -143,6 +143,46 @@ struct PeerTests { typealias EphemeralHandler = MockEphemeralStreamHandler } + @Test + func concurrentPeerConnection() async throws { + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + let connection1 = try peer1.connect(to: peer2.listenAddress(), role: .validator) + let connection2 = try peer2.connect(to: peer1.listenAddress(), role: .validator) + try? await Task.sleep(for: .milliseconds(50)) + if !connection1.isClosed { + let data = try await connection1.request(MockRequest(kind: .typeA, data: Data("hello world".utf8))) + #expect(data == Data("hello world response".utf8)) + } + if !connection2.isClosed { + let data = try await connection2.request(MockRequest(kind: .typeA, data: Data("hello world".utf8))) + #expect(data == Data("hello world response".utf8)) + } + } + @Test func largeDataRequest() async throws { let handler1 = MockPresentStreamHandler() @@ -159,7 +199,7 @@ struct PeerTests { let peer1 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 9085)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler1, @@ -172,7 +212,7 @@ struct PeerTests { let peer2 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 9086)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler2, @@ -185,7 +225,7 @@ struct PeerTests { try? await Task.sleep(for: .milliseconds(50)) let connection1 = try peer1.connect( - to: NetAddr(ipAddress: "127.0.0.1", port: 9086)!, role: .validator + to: peer2.listenAddress(), role: .validator ) try? await Task.sleep(for: .milliseconds(50)) @@ -218,7 +258,7 @@ struct PeerTests { let peer1 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 185)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: MockPresentStreamHandler(), @@ -231,7 +271,7 @@ struct PeerTests { let peer2 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 186)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler2, @@ -244,7 +284,7 @@ struct PeerTests { let peer3 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 187)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler2, @@ -257,7 +297,7 @@ struct PeerTests { try? await Task.sleep(for: .milliseconds(50)) let connection = try peer1.connect( - to: NetAddr(ipAddress: "127.0.0.1", port: 186)!, role: .validator + to: peer2.listenAddress(), role: .validator ) try? await Task.sleep(for: .milliseconds(50)) @@ -308,7 +348,7 @@ struct PeerTests { let peer1 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8081)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler1, @@ -320,7 +360,7 @@ struct PeerTests { let peer2 = try Peer( options: PeerOptions( role: .validator, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler2, @@ -331,7 +371,7 @@ struct PeerTests { ) _ = try peer1.connect( - to: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, role: .validator + to: peer2.listenAddress(), role: .validator ) try? await Task.sleep(for: .milliseconds(50)) @@ -344,7 +384,7 @@ struct PeerTests { kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8)) ) // Verify last received data - try? await Task.sleep(for: .milliseconds(50)) + try? await Task.sleep(for: .milliseconds(100)) await #expect(handler2.lastReceivedData == Data("hello world".utf8)) await #expect(handler1.lastReceivedData == Data("I am jam".utf8)) } @@ -383,6 +423,7 @@ struct PeerTests { let dataList1 = try await connection1.request( MockRequest(kind: .typeA, data: Data("hello world".utf8)) ) + try? await Task.sleep(for: .milliseconds(100)) #expect(dataList1 == Data("hello world response".utf8)) } @@ -452,7 +493,7 @@ struct PeerTests { let peer = try Peer( options: PeerOptions( role: .builder, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(7081 + i))!, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), presistentStreamHandler: handler,