From 7cc93837555a176bb2bcb8de7981692cccacb4b9 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Fri, 1 Nov 2024 15:21:55 +0800 Subject: [PATCH] update peer --- Networking/Sources/Networking/Peer.swift | 36 +++++++++++-------- .../Tests/NetworkingTests/PeerTests.swift | 15 ++++---- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index cb3559f6..9c43ddb5 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -274,10 +274,14 @@ final class PeerImpl: Sendable { // TODO: Add reconnection attempts & Apply exponential backoff delay func reconnect(to address: NetAddr, role: PeerRole) throws { - logger.debug("reconnecting", metadata: ["address": "\(address)", "role": "\(role)"]) Task { - try await Task.sleep(for: .microseconds(1000)) + try await Task.sleep(for: .microseconds(2000)) + logger.debug("reconnecting", metadata: ["to address": "\(address)", "role": "\(role)"]) try connections.write { connections in + if connections.byAddr[address] != nil { + logger.warning("reconnecting to \(address) already connected") + return + } let quicConn = try QuicConnection( handler: PeerEventHandler(self), registration: clientConfiguration.registration, @@ -293,6 +297,7 @@ final class PeerImpl: Sendable { ) connections.byAddr[address] = conn connections.byId[conn.id] = conn + logger.debug("reconnectted", metadata: ["address": "\(address)", "role": "\(role)"]) } } } @@ -436,25 +441,27 @@ private struct PeerEventHandler: QuicEventHandler { func shutdownComplete(_ connection: QuicConnection) { logger.debug("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) - impl.connections.write { connections in + let conn = impl.connections.read { connections in + connections.byId[connection.id] + } + let needReconnect = impl.connections.write { connections in if let conn = connections.byId[connection.id] { let needReconnect = conn.needReconnect if let publicKey = conn.publicKey { connections.byPublicKey.removeValue(forKey: publicKey) } - conn.closed() connections.byId.removeValue(forKey: connection.id) connections.byAddr.removeValue(forKey: conn.remoteAddress) - if needReconnect { - do { - try impl.reconnect(to: conn.remoteAddress, role: conn.role) - } catch { - logger.error("reconnect failed", metadata: ["error": "\(error)"]) - } - - } else { - logger.debug("Connection closed", metadata: ["connectionId": "\(connection.id)"]) - } + conn.closed() + return needReconnect + } + return false + } + if needReconnect, let address = conn?.remoteAddress, let role = conn?.role { + do { + try impl.reconnect(to: address, role: role) + } catch { + logger.error("reconnect failed", metadata: ["error": "\(error)"]) } } } @@ -469,6 +476,7 @@ private struct PeerEventHandler: QuicEventHandler { impl.connections.write { connections in if let conn = connections.byId[connection.id] { if let publicKey = conn.publicKey { + connections.byPublicKey.removeValue(forKey: publicKey) conn.reconnect(publicKey: publicKey) } } diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 75e22052..b6febad3 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -270,7 +270,7 @@ struct PeerTests { clientSettings: .defaultSettings ) ) - + try print("peer1: \(peer1.listenAddress())") let peer2 = try Peer( options: PeerOptions( role: .validator, @@ -283,12 +283,13 @@ struct PeerTests { clientSettings: .defaultSettings ) ) - + try print("peer2: \(peer2.listenAddress())") try? await Task.sleep(for: .milliseconds(100)) let connection = try peer1.connect( to: peer2.listenAddress(), role: .validator ) + print("connection addr: \(connection.remoteAddress)") try? await Task.sleep(for: .milliseconds(100)) let receivedData = try await connection.request( @@ -305,7 +306,9 @@ struct PeerTests { kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData) ) try? await Task.sleep(for: .milliseconds(1000)) - await #expect(handler2.lastReceivedData == messageData) + let lastReceivedData = await handler2.lastReceivedData + print("lastReceivedData \(lastReceivedData?.toHexString() ?? "none")") + #expect(lastReceivedData == messageData) } @Test @@ -401,7 +404,7 @@ struct PeerTests { #expect(receivedData == messageData + Data(" response".utf8)) try? await Task.sleep(for: .milliseconds(100)) // Simulate a peer failure by disconnecting one peer - connection.close(abort: true) + connection.close(abort: false) // Wait to simulate downtime try? await Task.sleep(for: .milliseconds(200)) // Reconnect the failing peer @@ -416,10 +419,10 @@ struct PeerTests { try? await Task.sleep(for: .milliseconds(100)) #expect(recoverData == messageData + Data(" response".utf8)) peer1.broadcast( - kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData) + kind: .uniqueC, message: .init(kind: .uniqueC, data: recoverData) ) try? await Task.sleep(for: .milliseconds(500)) - await #expect(handler2.lastReceivedData == messageData) + await #expect(handler2.lastReceivedData == recoverData) } @Test