diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 686f3f76..2f218c8f 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -107,12 +107,6 @@ public final class Connection: Sendable, ConnectionInfoP } } - // TODO: Add reconnection attempts & Apply exponential backoff delay - func reconnecting() throws { - let addr = try connection.getRemoteAddress() - print("connection.getRemoteAddress() \(addr)") - } - public var isClosed: Bool { state.read { switch $0 { diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index b6443192..e60e173d 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -75,12 +75,14 @@ public final class Peer: Sendable { let registration = try QuicRegistration() let serverConfiguration = try QuicConfiguration( - registration: registration, pkcs12: pkcs12, alpns: allAlpns, client: false, settings: options.serverSettings + registration: registration, pkcs12: pkcs12, alpns: allAlpns, client: false, + settings: options.serverSettings ) let clientAlpn = alpns[options.role]! let clientConfiguration = try QuicConfiguration( - registration: registration, pkcs12: pkcs12, alpns: [clientAlpn], client: true, settings: options.clientSettings + registration: registration, pkcs12: pkcs12, alpns: [clientAlpn], client: true, + settings: options.clientSettings ) publicKey = options.secretKey.publicKey.data.data @@ -104,11 +106,14 @@ public final class Peer: Sendable { alpns: allAlpns ) - logger.debug("Peer initialized", metadata: [ - "listenAddress": "\(options.listenAddress)", - "role": "\(options.role)", - "publicKey": "\(options.secretKey.publicKey.data.toHexString())", - ]) + logger.debug( + "Peer initialized", + metadata: [ + "listenAddress": "\(options.listenAddress)", + "role": "\(options.role)", + "publicKey": "\(options.secretKey.publicKey.data.toHexString())", + ] + ) } public func listenAddress() throws -> NetAddr { @@ -120,30 +125,33 @@ public final class Peer: Sendable { let conn = impl.connections.read { connections in connections.byAddr[address] } - return try conn ?? impl.connections.write { connections in - if let curr = connections.byAddr[address] { - return curr - } - - logger.debug("connecting to peer", metadata: ["address": "\(address)", "role": "\(role)"]) + return try conn + ?? impl.connections.write { connections in + if let curr = connections.byAddr[address] { + return curr + } - let quicConn = try QuicConnection( - handler: PeerEventHandler(self.impl), - registration: self.impl.clientConfiguration.registration, - configuration: self.impl.clientConfiguration - ) - try quicConn.connect(to: address) - let conn = Connection( - quicConn, - impl: self.impl, - role: role, - remoteAddress: address, - initiatedByLocal: true - ) - connections.byAddr[address] = conn - connections.byId[conn.id] = conn - return conn - } + logger.debug( + "connecting to peer", metadata: ["address": "\(address)", "role": "\(role)"] + ) + + let quicConn = try QuicConnection( + handler: PeerEventHandler(self.impl), + registration: self.impl.clientConfiguration.registration, + configuration: self.impl.clientConfiguration + ) + try quicConn.connect(to: address) + let conn = Connection( + quicConn, + impl: self.impl, + role: role, + remoteAddress: address, + initiatedByLocal: true + ) + connections.byAddr[address] = conn + connections.byId[conn.id] = conn + return conn + } } public func getConnection(publicKey: Data) -> Connection? { @@ -152,7 +160,9 @@ public final class Peer: Sendable { } } - public func broadcast(kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message) { + public func broadcast( + kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message + ) { let connections = impl.connections.read { connections in connections.byId.values } @@ -168,11 +178,14 @@ public final class Peer: Sendable { case .success: break case let .failure(error): - impl.logger.warning("Failed to send message", metadata: [ - "connectionId": "\(connection.id)", - "kind": "\(kind)", - "error": "\(error)", - ]) + impl.logger.warning( + "Failed to send message", + metadata: [ + "connectionId": "\(connection.id)", + "kind": "\(kind)", + "error": "\(error)", + ] + ) } } } @@ -259,6 +272,31 @@ final class PeerImpl: Sendable { } } + // TODO: Add reconnection attempts & Apply exponential backoff delay + func reconnect(to address: NetAddr, role: PeerRole) throws { + logger.debug("reconnecting", metadata: ["address": "\(address)", "role": "\(role)"]) + Task { + try await Task.sleep(for: .microseconds(1000)) + try connections.write { connections in + let quicConn = try QuicConnection( + handler: PeerEventHandler(self), + registration: clientConfiguration.registration, + configuration: clientConfiguration + ) + try quicConn.connect(to: address) + let conn = Connection( + quicConn, + impl: self, + role: role, + remoteAddress: address, + initiatedByLocal: true + ) + connections.byAddr[address] = conn + connections.byId[conn.id] = conn + } + } + } + func addStream(_ stream: Stream) { streams.write { streams in if streams[stream.id] != nil { @@ -280,11 +318,15 @@ private struct PeerEventHandler: QuicEventHandler { self.impl = impl } - func newConnection(_: QuicListener, connection: QuicConnection, info: ConnectionInfo) -> QuicStatus { + func newConnection(_: QuicListener, connection: QuicConnection, info: ConnectionInfo) + -> QuicStatus + { let addr = info.remoteAddress let role = impl.alpnLookup[info.negotiatedAlpn] guard let role else { - logger.warning("unknown alpn: \(String(data: info.negotiatedAlpn, encoding: .utf8) ?? info.negotiatedAlpn.toDebugHexString())") + logger.warning( + "unknown alpn: \(String(data: info.negotiatedAlpn, encoding: .utf8) ?? info.negotiatedAlpn.toDebugHexString())" + ) return .code(.alpnNegFailure) } logger.debug("new connection: \(addr) role: \(role)") @@ -303,20 +345,28 @@ private struct PeerEventHandler: QuicEventHandler { connections.byId[connection.id] } guard let conn else { - logger.warning("Attempt to open but connection is absent", metadata: ["connectionId": "\(connection.id)"]) + logger.warning( + "Attempt to open but connection is absent", + metadata: ["connectionId": "\(connection.id)"] + ) return .code(.connectionRefused) } do { let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509) - logger.trace("Certificate parsed", metadata: [ - "connectionId": "\(connection.id)", - "publicKey": "\(publicKey.toHexString())", - "alternativeName": "\(alternativeName)", - ]) + logger.trace( + "Certificate parsed", + metadata: [ + "connectionId": "\(connection.id)", + "publicKey": "\(publicKey.toHexString())", + "alternativeName": "\(alternativeName)", + ] + ) if publicKey == impl.publicKey { // Self connection detected - logger.trace("Rejecting self-connection", metadata: ["connectionId": "\(connection.id)"]) + logger.trace( + "Rejecting self-connection", metadata: ["connectionId": "\(connection.id)"] + ) return .code(.connectionRefused) } if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { @@ -333,9 +383,13 @@ private struct PeerEventHandler: QuicEventHandler { try conn.opened(publicKey: publicKey) return .code(.success) } else { - logger.debug("Rejecting duplicate connection by rule", metadata: [ - "connectionId": "\(connection.id)", "publicKey": "\(publicKey.toHexString())", - ]) + logger.debug( + "Rejecting duplicate connection by rule", + metadata: [ + "connectionId": "\(connection.id)", + "publicKey": "\(publicKey.toHexString())", + ] + ) return .code(.connectionRefused) } } else { @@ -345,7 +399,10 @@ private struct PeerEventHandler: QuicEventHandler { } } } catch { - logger.warning("Certificate parsing failed", metadata: ["connectionId": "\(connection.id)", "error": "\(error)"]) + logger.warning( + "Certificate parsing failed", + metadata: ["connectionId": "\(connection.id)", "error": "\(error)"] + ) return .code(.badCert) } } @@ -355,7 +412,9 @@ private struct PeerEventHandler: QuicEventHandler { connections.byId[connection.id] } guard let conn else { - logger.warning("Connected but connection is gone?", metadata: ["connectionId": "\(connection.id)"]) + logger.warning( + "Connected but connection is gone?", metadata: ["connectionId": "\(connection.id)"] + ) return } @@ -377,6 +436,10 @@ private struct PeerEventHandler: QuicEventHandler { func shutdownComplete(_ connection: QuicConnection) { logger.info("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) +// let conn = impl.connections.read { connections in +// connections.byId[connection.id] +// } +// let needReconnect = conn.needReconnect impl.connections.write { connections in if let conn = connections.byId[connection.id] { let needReconnect = conn.needReconnect @@ -389,14 +452,11 @@ private struct PeerEventHandler: QuicEventHandler { connections.byAddr.removeValue(forKey: conn.remoteAddress) if needReconnect { do { - try conn.reconnecting() - logger.info("Reconnection complete", metadata: ["connectionId": "\(connection.id)"]) + try impl.reconnect(to: conn.remoteAddress, role: conn.role) } catch { - logger.warning( - "\(connection.id) Failed to reconnect", - metadata: ["error": "\(error)"] - ) + logger.error("reconnect failed", metadata: ["error": "\(error)"]) } + } else { logger.info("Connection closed", metadata: ["connectionId": "\(connection.id)"]) } @@ -405,7 +465,10 @@ private struct PeerEventHandler: QuicEventHandler { } func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) { - logger.info("Shutdown initiated", metadata: ["connectionId": "\(connection.id)", "reason": "\(reason)"]) + logger.info( + "Shutdown initiated", + metadata: ["connectionId": "\(connection.id)", "reason": "\(reason)"] + ) if shouldReconnect(basedOn: reason) { logger.info("shouldReconnect true", metadata: ["connectionId": "\(connection.id)"]) impl.connections.write { connections in @@ -471,10 +534,14 @@ private struct PeerEventHandler: QuicEventHandler { if let connection { connection.streamClosed(stream: stream, abort: !status.isSucceeded) } else { - logger.warning("Stream closed but connection is gone?", metadata: ["streamId": "\(stream.id)"]) + logger.warning( + "Stream closed but connection is gone?", metadata: ["streamId": "\(stream.id)"] + ) } } else { - logger.warning("Stream closed but stream is gone?", metadata: ["streamId": "\(quicStream.id)"]) + logger.warning( + "Stream closed but stream is gone?", metadata: ["streamId": "\(quicStream.id)"] + ) } } } diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 2950e7cd..045d6a7a 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -254,7 +254,7 @@ struct PeerTests { } @Test - func peerReconnect() async throws { + func connReconnect() async throws { let handler2 = MockPresentStreamHandler() let messageData = Data("Post-recovery message".utf8)