From f7e17b631b48ef4cf00815e099368bf31c9c2e84 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Fri, 1 Nov 2024 10:49:27 +0800 Subject: [PATCH] update p2p reconnect & test --- Networking/Sources/Networking/Peer.swift | 15 ++---- .../Tests/NetworkingTests/PeerTests.swift | 54 +++++++++++++++++-- 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index e60e173d..cb3559f6 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -435,15 +435,10 @@ private struct PeerEventHandler: QuicEventHandler { } func shutdownComplete(_ connection: QuicConnection) { - logger.info("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) -// let conn = impl.connections.read { connections in -// connections.byId[connection.id] -// } -// let needReconnect = conn.needReconnect + logger.debug("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) impl.connections.write { connections in if let conn = connections.byId[connection.id] { let needReconnect = conn.needReconnect - // remove publickey first,func closed will change state to closed if let publicKey = conn.publicKey { connections.byPublicKey.removeValue(forKey: publicKey) } @@ -458,19 +453,19 @@ private struct PeerEventHandler: QuicEventHandler { } } else { - logger.info("Connection closed", metadata: ["connectionId": "\(connection.id)"]) + logger.debug("Connection closed", metadata: ["connectionId": "\(connection.id)"]) } } } } func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) { - logger.info( + logger.debug( "Shutdown initiated", metadata: ["connectionId": "\(connection.id)", "reason": "\(reason)"] ) if shouldReconnect(basedOn: reason) { - logger.info("shouldReconnect true", metadata: ["connectionId": "\(connection.id)"]) + logger.debug("shouldReconnect ", metadata: ["connectionId": "\(connection.id)"]) impl.connections.write { connections in if let conn = connections.byId[connection.id] { if let publicKey = conn.publicKey { @@ -478,8 +473,6 @@ private struct PeerEventHandler: QuicEventHandler { } } } - } else { - logger.info("Closing", metadata: ["connectionId": "\(connection.id)"]) } } diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 045d6a7a..75e22052 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -254,7 +254,7 @@ struct PeerTests { } @Test - func connReconnect() async throws { + func connectionNeedToReconnect() async throws { let handler2 = MockPresentStreamHandler() let messageData = Data("Post-recovery message".utf8) @@ -297,7 +297,7 @@ struct PeerTests { #expect(receivedData == messageData + Data(" response".utf8)) try? await Task.sleep(for: .milliseconds(100)) - // Simulate a peer failure by disconnecting one peer + // Simulate abnormal shutdown of connections connection.close(abort: true) // Wait to simulate downtime try? await Task.sleep(for: .milliseconds(200)) @@ -309,7 +309,55 @@ struct PeerTests { } @Test - func peerFailureRecovery() async throws { + func ConnectionNoNeedToReconnect() async throws { + let handler2 = MockPresentStreamHandler() + let messageData = Data("Post-recovery message".utf8) + + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler2, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + + try? await Task.sleep(for: .milliseconds(100)) + + let connection = try peer1.connect( + to: peer2.listenAddress(), role: .validator + ) + try? await Task.sleep(for: .milliseconds(100)) + // Simulate regular shutdown of connections + connection.close(abort: false) + // Wait to simulate downtime + try? await Task.sleep(for: .milliseconds(200)) + peer1.broadcast( + kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData) + ) + try? await Task.sleep(for: .milliseconds(1000)) + await #expect(handler2.lastReceivedData == nil) + } + + @Test + func connectionManualReconnect() async throws { let handler2 = MockPresentStreamHandler() let messageData = Data("Post-recovery message".utf8)