From 3d5532c9a3bf351cc9c96084c0c53696ce91ef93 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 30 Oct 2024 07:11:16 +0800 Subject: [PATCH] update more test --- .../Sources/MsQuicSwift/QuicConnection.swift | 2 +- Networking/Sources/Networking/Peer.swift | 44 +++---------------- .../Tests/NetworkingTests/PeerTests.swift | 41 ++++++++++++----- 3 files changed, 37 insertions(+), 50 deletions(-) diff --git a/Networking/Sources/MsQuicSwift/QuicConnection.swift b/Networking/Sources/MsQuicSwift/QuicConnection.swift index f6b3844b..dddea21f 100644 --- a/Networking/Sources/MsQuicSwift/QuicConnection.swift +++ b/Networking/Sources/MsQuicSwift/QuicConnection.swift @@ -204,7 +204,7 @@ private class ConnectionHandle { fileprivate func callbackHandler(event: UnsafePointer) -> QuicStatus { switch event.pointee.Type { case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED: - logger.trace("Peer certificate received") + logger.debug("Peer certificate received") if let connection { let evtData = event.pointee.PEER_CERTIFICATE_RECEIVED let data: Data? diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 5d00293f..3f47c6f1 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -115,41 +115,6 @@ public final class Peer: Sendable { try listener.listenAddress() } - public func reconnect(to address: NetAddr, role: PeerRole) throws -> Connection { - let conn = impl.connections.read { connections in - connections.byAddr[address] - } - if let conn { - print("conn \(conn.id.name)") - } else { - print("reconnect") - } - return try impl.connections.write { connections in - if let curr = connections.byAddr[address] { - return curr - } - - logger.info("connecting to peer", metadata: ["address": "\(address)", "role": "\(role)"]) - - let quicConn = try QuicConnection( - handler: PeerEventHandler(self.impl), - registration: self.impl.clientConfiguration.registration, - configuration: self.impl.clientConfiguration - ) - try quicConn.connect(to: address) - let conn = Connection( - quicConn, - impl: self.impl, - role: role, - remoteAddress: address, - initiatedByLocal: true - ) - connections.byAddr[address] = conn - connections.byId[conn.id] = conn - return conn - } - } - // TODO: see if we can remove the role parameter public func connect(to address: NetAddr, role: PeerRole) throws -> Connection { let conn = impl.connections.read { connections in @@ -411,15 +376,16 @@ private struct PeerEventHandler: QuicEventHandler { } func shutdownComplete(_ connection: QuicConnection) { - logger.info("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) + logger.debug("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) impl.connections.write { connections in if let conn = connections.byId[connection.id] { - conn.closed() - connections.byId.removeValue(forKey: connection.id) - connections.byAddr.removeValue(forKey: conn.remoteAddress) + // remove publickey first,func closed will change state to closed if let publicKey = conn.publicKey { connections.byPublicKey.removeValue(forKey: publicKey) } + conn.closed() + connections.byId.removeValue(forKey: connection.id) + connections.byAddr.removeValue(forKey: conn.remoteAddress) } } } diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 5b717ef8..dd5f14d0 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -212,7 +212,6 @@ struct PeerTests { @Test func peerFailureRecovery() async throws { - let handler1 = MockPresentStreamHandler() let handler2 = MockPresentStreamHandler() let messageData = Data("Post-recovery message".utf8) @@ -222,7 +221,7 @@ struct PeerTests { listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 185)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32.random()), - presistentStreamHandler: handler1, + presistentStreamHandler: MockPresentStreamHandler(), ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, clientSettings: .defaultSettings @@ -242,6 +241,19 @@ struct PeerTests { ) ) + let peer3 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 187)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler2, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + try? await Task.sleep(for: .milliseconds(50)) let connection = try peer1.connect( @@ -252,31 +264,40 @@ struct PeerTests { let receivedData = try await connection.request( MockRequest(kind: .typeA, data: messageData) ) + #expect(receivedData == messageData + Data(" response".utf8)) try? await Task.sleep(for: .milliseconds(50)) // Simulate a peer failure by disconnecting one peer connection.close(abort: true) // Wait to simulate downtime - try? await Task.sleep(for: .milliseconds(10000)) + try? await Task.sleep(for: .milliseconds(200)) + // check the peer is usable & connect to another peer + let connection2 = try peer1.connect( + to: peer3.listenAddress(), + role: .validator + ) + try? await Task.sleep(for: .milliseconds(50)) + let receivedData2 = try await connection2.request( + MockRequest(kind: .typeA, data: messageData) + ) + try? await Task.sleep(for: .milliseconds(50)) + #expect(receivedData2 == messageData + Data(" response".utf8)) // Reconnect the failing peer - let reconnect = try peer1.reconnect( + let reconnection = try peer1.connect( to: peer2.listenAddress(), role: .validator ) try? await Task.sleep(for: .milliseconds(50)) - - let rereceivedData = try await reconnect.request( + let recoverData = try await reconnection.request( MockRequest(kind: .typeA, data: messageData) ) try? await Task.sleep(for: .milliseconds(50)) - #expect(rereceivedData == messageData + Data(" response".utf8)) + #expect(recoverData == messageData + Data(" response".utf8)) peer1.broadcast( - kind: .uniqueB, message: .init(kind: .uniqueB, data: messageData) + kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData) ) - // Verify last received data try? await Task.sleep(for: .milliseconds(50)) await #expect(handler2.lastReceivedData == messageData) - try? await Task.sleep(for: .milliseconds(50)) } @Test