diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index fe5d5911..76e9a8eb 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -16,6 +16,27 @@ public enum PeerRole: Sendable, Hashable { // case proxy // not yet specified } +struct ReconnectState { + var attempt: Int + var delay: TimeInterval + + init() { + attempt = 0 + delay = 1 + } + + // Initializer with custom values + init(attempt: Int = 0, delay: TimeInterval = 1) { + self.attempt = attempt + self.delay = delay + } + + mutating func applyBackoff() { + attempt += 1 + delay *= 2 + } +} + public struct PeerOptions: Sendable { public var role: PeerRole public var listenAddress: NetAddr @@ -50,7 +71,7 @@ public struct PeerOptions: Sendable { } } -// TODO: reconnects, reopen UP stream, peer reputation system to ban peers not following the protocol +// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol public final class Peer: Sendable { private let impl: PeerImpl @@ -61,7 +82,6 @@ public final class Peer: Sendable { } public let publicKey: Data - public init(options: PeerOptions) throws { let logger = Logger(label: "Peer".uniqueId) @@ -214,7 +234,9 @@ final class PeerImpl: Sendable { fileprivate let connections: ThreadSafeContainer = .init(.init()) fileprivate let streams: ThreadSafeContainer<[UniqueId: Stream]> = .init([:]) + fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: ReconnectState]> = .init([:]) + let reconnectMaxRetryAttempts = 5 let presistentStreamHandler: Handler.PresistentHandler let ephemeralStreamHandler: Handler.EphemeralHandler @@ -271,29 +293,43 @@ final class PeerImpl: Sendable { } } - // TODO: Add reconnection attempts & Apply exponential backoff delay func reconnect(to address: NetAddr, role: PeerRole) throws { - logger.debug("reconnecting", metadata: ["to address": "\(address)", "role": "\(role)"]) - try connections.write { connections in - if connections.byAddr[address] != nil { - logger.warning("reconnecting to \(address) already connected") - return + let state = reconnectStates.read { reconnectStates in + reconnectStates[address] ?? .init() + } + if state.attempt < reconnectMaxRetryAttempts { + reconnectStates.write { reconnectStates in + if var state = reconnectStates[address] { + state.applyBackoff() + reconnectStates[address] = state + } } - let quicConn = try QuicConnection( - handler: PeerEventHandler(self), - registration: clientConfiguration.registration, - configuration: clientConfiguration - ) - try quicConn.connect(to: address) - let conn = Connection( - quicConn, - impl: self, - role: role, - remoteAddress: address, - initiatedByLocal: true - ) - connections.byAddr[address] = conn - connections.byId[conn.id] = conn + Task { + try await Task.sleep(for: .seconds(state.delay)) + try connections.write { connections in + if connections.byAddr[address] != nil { + logger.warning("reconnecting to \(address) already connected") + return + } + let quicConn = try QuicConnection( + handler: PeerEventHandler(self), + registration: clientConfiguration.registration, + configuration: clientConfiguration + ) + try quicConn.connect(to: address) + let conn = Connection( + quicConn, + impl: self, + role: role, + remoteAddress: address, + initiatedByLocal: true + ) + connections.byAddr[address] = conn + connections.byId[conn.id] = conn + } + } + } else { + logger.warning("reconnect attempt exceeded max attempts") } } @@ -417,6 +453,10 @@ private struct PeerEventHandler: QuicEventHandler { ) return } + // Check if the connection is already reconnected + impl.reconnectStates.write { reconnectStates in + reconnectStates[conn.remoteAddress] = nil + } if conn.initiatedByLocal { for kind in Handler.PresistentHandler.StreamKind.allCases { diff --git a/Networking/Tests/NetworkingTests/MockPeerEventTests.swift b/Networking/Tests/NetworkingTests/MockPeerEventTests.swift index 8c2ef262..71b4793a 100644 --- a/Networking/Tests/NetworkingTests/MockPeerEventTests.swift +++ b/Networking/Tests/NetworkingTests/MockPeerEventTests.swift @@ -147,12 +147,8 @@ final class MockPeerEventTests { configuration: clientConfiguration ) - // Attempt to connect try clientConnection.connect(to: listenAddress) - let stream1 = try clientConnection.createStream() - try stream1.send(data: Data("test data 1".utf8)) - - try? await Task.sleep(for: .milliseconds(100)) + try await Task.sleep(for: .milliseconds(100)) let (_, reason) = clientHandler.events.value.compactMap { switch $0 { case let .shutdownInitiated(connection, reason): @@ -212,7 +208,7 @@ final class MockPeerEventTests { let stream1 = try clientConnection.createStream() try stream1.send(data: Data("test data 1".utf8)) - try? await Task.sleep(for: .milliseconds(100)) + try await Task.sleep(for: .milliseconds(100)) let (_, info) = serverHandler.events.value.compactMap { switch $0 { case let .newConnection(_, connection, info): @@ -266,14 +262,9 @@ final class MockPeerEventTests { registration: registration, configuration: clientConfiguration ) - - // Attempt to connect try clientConnection.connect(to: listenAddress) - let stream1 = try clientConnection.createStream() - try stream1.send(data: Data("test data 1".utf8)) - - try? await Task.sleep(for: .milliseconds(100)) - let (_, reason) = serverHandler.events.value.compactMap { + try await Task.sleep(for: .milliseconds(100)) + let (_, reason) = clientHandler.events.value.compactMap { switch $0 { case let .shutdownInitiated(connection, reason): (connection, reason) as (QuicConnection, ConnectionCloseReason)? diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 0ec28edf..ba541635 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -297,12 +297,12 @@ struct PeerTests { try? await Task.sleep(for: .milliseconds(100)) // Simulate abnormal shutdown of connections connection.close(abort: true) - // Wait to simulate downtime - try? await Task.sleep(for: .milliseconds(200)) + // Wait to simulate downtime & reconnected 3~5s + try? await Task.sleep(for: .milliseconds(3000)) peer1.broadcast( kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData) ) - try? await Task.sleep(for: .milliseconds(1000)) + try await Task.sleep(for: .milliseconds(1000)) let lastReceivedData = await handler2.lastReceivedData #expect(lastReceivedData == messageData) } @@ -693,12 +693,9 @@ struct PeerTests { } var connections = [Connection]() - for i in 0 ..< peers.count { + for i in 0 ..< peersCount { let peer = peers[i] - for j in 0 ..< peers.count { - if i >= j { - continue - } + for j in i + 1 ..< peersCount { let otherPeer = peers[j] let conn = try peer.connect( to: otherPeer.listenAddress(),