diff --git a/Networking/Sources/MsQuicSwift/QuicConnection.swift b/Networking/Sources/MsQuicSwift/QuicConnection.swift index f6b3844b..dddea21f 100644 --- a/Networking/Sources/MsQuicSwift/QuicConnection.swift +++ b/Networking/Sources/MsQuicSwift/QuicConnection.swift @@ -204,7 +204,7 @@ private class ConnectionHandle { fileprivate func callbackHandler(event: UnsafePointer) -> QuicStatus { switch event.pointee.Type { case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED: - logger.trace("Peer certificate received") + logger.debug("Peer certificate received") if let connection { let evtData = event.pointee.PEER_CERTIFICATE_RECEIVED let data: Data? diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 14e8717e..3f47c6f1 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -376,15 +376,16 @@ private struct PeerEventHandler: QuicEventHandler { } func shutdownComplete(_ connection: QuicConnection) { - logger.trace("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) + logger.debug("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) impl.connections.write { connections in if let conn = connections.byId[connection.id] { - conn.closed() - connections.byId.removeValue(forKey: connection.id) - connections.byAddr.removeValue(forKey: conn.remoteAddress) + // remove publickey first,func closed will change state to closed if let publicKey = conn.publicKey { connections.byPublicKey.removeValue(forKey: publicKey) } + conn.closed() + connections.byId.removeValue(forKey: connection.id) + connections.byAddr.removeValue(forKey: conn.remoteAddress) } } } diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 53f79386..dd5f14d0 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -143,6 +143,163 @@ struct PeerTests { typealias EphemeralHandler = MockEphemeralStreamHandler } + @Test + func largeDataRequest() async throws { + let handler1 = MockPresentStreamHandler() + let handler2 = MockPresentStreamHandler() + // Define the data size, 5MB + let dataSize = 5 * 1024 * 1024 + var largeData = Data(capacity: dataSize) + + // Generate random data + for _ in 0 ..< dataSize { + largeData.append(UInt8.random(in: 0 ... 255)) + } + + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 9085)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler1, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 9086)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler2, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + try? await Task.sleep(for: .milliseconds(50)) + + let connection1 = try peer1.connect( + to: NetAddr(ipAddress: "127.0.0.1", port: 9086)!, role: .validator + ) + try? await Task.sleep(for: .milliseconds(50)) + + let receivedData1 = try await connection1.request( + MockRequest(kind: .typeA, data: largeData) + ) + + // Verify that the received data matches the original large data + #expect(receivedData1 == largeData + Data(" response".utf8)) + + peer1.broadcast( + kind: .uniqueA, message: .init(kind: .uniqueA, data: largeData) + ) + try? await Task.sleep(for: .milliseconds(50)) + + peer2.broadcast( + kind: .uniqueB, message: .init(kind: .uniqueB, data: largeData) + ) + // Verify last received data + try? await Task.sleep(for: .milliseconds(1000)) + await #expect(handler2.lastReceivedData == largeData) + await #expect(handler1.lastReceivedData == largeData) + } + + @Test + func peerFailureRecovery() async throws { + let handler2 = MockPresentStreamHandler() + let messageData = Data("Post-recovery message".utf8) + + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 185)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 186)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler2, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + let peer3 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 187)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler2, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + try? await Task.sleep(for: .milliseconds(50)) + + let connection = try peer1.connect( + to: NetAddr(ipAddress: "127.0.0.1", port: 186)!, role: .validator + ) + try? await Task.sleep(for: .milliseconds(50)) + + let receivedData = try await connection.request( + MockRequest(kind: .typeA, data: messageData) + ) + + #expect(receivedData == messageData + Data(" response".utf8)) + try? await Task.sleep(for: .milliseconds(50)) + // Simulate a peer failure by disconnecting one peer + connection.close(abort: true) + // Wait to simulate downtime + try? await Task.sleep(for: .milliseconds(200)) + // check the peer is usable & connect to another peer + let connection2 = try peer1.connect( + to: peer3.listenAddress(), + role: .validator + ) + try? await Task.sleep(for: .milliseconds(50)) + let receivedData2 = try await connection2.request( + MockRequest(kind: .typeA, data: messageData) + ) + try? await Task.sleep(for: .milliseconds(50)) + #expect(receivedData2 == messageData + Data(" response".utf8)) + // Reconnect the failing peer + let reconnection = try peer1.connect( + to: peer2.listenAddress(), + role: .validator + ) + try? await Task.sleep(for: .milliseconds(50)) + let recoverData = try await reconnection.request( + MockRequest(kind: .typeA, data: messageData) + ) + try? await Task.sleep(for: .milliseconds(50)) + #expect(recoverData == messageData + Data(" response".utf8)) + peer1.broadcast( + kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData) + ) + try? await Task.sleep(for: .milliseconds(50)) + await #expect(handler2.lastReceivedData == messageData) + } + @Test func peerBroadcast() async throws { let handler1 = MockPresentStreamHandler()