Skip to content

Commit

Permalink
update reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Oct 31, 2024
1 parent fd6d937 commit 2762b14
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 66 deletions.
6 changes: 0 additions & 6 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,6 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
}

// TODO: Add reconnection attempts & Apply exponential backoff delay
func reconnecting() throws {
let addr = try connection.getRemoteAddress()
print("connection.getRemoteAddress() \(addr)")
}

public var isClosed: Bool {
state.read {
switch $0 {
Expand Down
185 changes: 126 additions & 59 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ public final class Peer<Handler: StreamHandler>: Sendable {

let registration = try QuicRegistration()
let serverConfiguration = try QuicConfiguration(
registration: registration, pkcs12: pkcs12, alpns: allAlpns, client: false, settings: options.serverSettings
registration: registration, pkcs12: pkcs12, alpns: allAlpns, client: false,
settings: options.serverSettings
)

let clientAlpn = alpns[options.role]!
let clientConfiguration = try QuicConfiguration(
registration: registration, pkcs12: pkcs12, alpns: [clientAlpn], client: true, settings: options.clientSettings
registration: registration, pkcs12: pkcs12, alpns: [clientAlpn], client: true,
settings: options.clientSettings
)

publicKey = options.secretKey.publicKey.data.data
Expand All @@ -104,11 +106,14 @@ public final class Peer<Handler: StreamHandler>: Sendable {
alpns: allAlpns
)

logger.debug("Peer initialized", metadata: [
"listenAddress": "\(options.listenAddress)",
"role": "\(options.role)",
"publicKey": "\(options.secretKey.publicKey.data.toHexString())",
])
logger.debug(
"Peer initialized",
metadata: [
"listenAddress": "\(options.listenAddress)",
"role": "\(options.role)",
"publicKey": "\(options.secretKey.publicKey.data.toHexString())",
]
)
}

public func listenAddress() throws -> NetAddr {
Expand All @@ -120,30 +125,33 @@ public final class Peer<Handler: StreamHandler>: Sendable {
let conn = impl.connections.read { connections in
connections.byAddr[address]
}
return try conn ?? impl.connections.write { connections in
if let curr = connections.byAddr[address] {
return curr
}

logger.debug("connecting to peer", metadata: ["address": "\(address)", "role": "\(role)"])
return try conn
?? impl.connections.write { connections in
if let curr = connections.byAddr[address] {
return curr
}

let quicConn = try QuicConnection(
handler: PeerEventHandler(self.impl),
registration: self.impl.clientConfiguration.registration,
configuration: self.impl.clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self.impl,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
return conn
}
logger.debug(
"connecting to peer", metadata: ["address": "\(address)", "role": "\(role)"]
)

let quicConn = try QuicConnection(
handler: PeerEventHandler(self.impl),
registration: self.impl.clientConfiguration.registration,
configuration: self.impl.clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self.impl,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
return conn
}
}

public func getConnection(publicKey: Data) -> Connection<Handler>? {
Expand All @@ -152,7 +160,9 @@ public final class Peer<Handler: StreamHandler>: Sendable {
}
}

public func broadcast(kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message) {
public func broadcast(
kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message
) {
let connections = impl.connections.read { connections in
connections.byId.values
}
Expand All @@ -168,11 +178,14 @@ public final class Peer<Handler: StreamHandler>: Sendable {
case .success:
break
case let .failure(error):
impl.logger.warning("Failed to send message", metadata: [
"connectionId": "\(connection.id)",
"kind": "\(kind)",
"error": "\(error)",
])
impl.logger.warning(
"Failed to send message",
metadata: [
"connectionId": "\(connection.id)",
"kind": "\(kind)",
"error": "\(error)",
]
)
}
}
}
Expand Down Expand Up @@ -259,6 +272,31 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
}
}

// TODO: Add reconnection attempts & Apply exponential backoff delay
func reconnect(to address: NetAddr, role: PeerRole) throws {
logger.debug("reconnecting", metadata: ["address": "\(address)", "role": "\(role)"])
Task {
try await Task.sleep(for: .microseconds(1000))
try connections.write { connections in
let quicConn = try QuicConnection(
handler: PeerEventHandler(self),
registration: clientConfiguration.registration,
configuration: clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
}
}
}

func addStream(_ stream: Stream<Handler>) {
streams.write { streams in
if streams[stream.id] != nil {
Expand All @@ -280,11 +318,15 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
self.impl = impl
}

func newConnection(_: QuicListener, connection: QuicConnection, info: ConnectionInfo) -> QuicStatus {
func newConnection(_: QuicListener, connection: QuicConnection, info: ConnectionInfo)
-> QuicStatus
{
let addr = info.remoteAddress
let role = impl.alpnLookup[info.negotiatedAlpn]
guard let role else {
logger.warning("unknown alpn: \(String(data: info.negotiatedAlpn, encoding: .utf8) ?? info.negotiatedAlpn.toDebugHexString())")
logger.warning(
"unknown alpn: \(String(data: info.negotiatedAlpn, encoding: .utf8) ?? info.negotiatedAlpn.toDebugHexString())"
)
return .code(.alpnNegFailure)
}
logger.debug("new connection: \(addr) role: \(role)")
Expand All @@ -303,20 +345,28 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
connections.byId[connection.id]
}
guard let conn else {
logger.warning("Attempt to open but connection is absent", metadata: ["connectionId": "\(connection.id)"])
logger.warning(
"Attempt to open but connection is absent",
metadata: ["connectionId": "\(connection.id)"]
)
return .code(.connectionRefused)
}

do {
let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509)
logger.trace("Certificate parsed", metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
"alternativeName": "\(alternativeName)",
])
logger.trace(
"Certificate parsed",
metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
"alternativeName": "\(alternativeName)",
]
)
if publicKey == impl.publicKey {
// Self connection detected
logger.trace("Rejecting self-connection", metadata: ["connectionId": "\(connection.id)"])
logger.trace(
"Rejecting self-connection", metadata: ["connectionId": "\(connection.id)"]
)
return .code(.connectionRefused)
}
if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) {
Expand All @@ -333,9 +383,13 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
try conn.opened(publicKey: publicKey)
return .code(.success)
} else {
logger.debug("Rejecting duplicate connection by rule", metadata: [
"connectionId": "\(connection.id)", "publicKey": "\(publicKey.toHexString())",
])
logger.debug(
"Rejecting duplicate connection by rule",
metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
]
)
return .code(.connectionRefused)
}
} else {
Expand All @@ -345,7 +399,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
}
} catch {
logger.warning("Certificate parsing failed", metadata: ["connectionId": "\(connection.id)", "error": "\(error)"])
logger.warning(
"Certificate parsing failed",
metadata: ["connectionId": "\(connection.id)", "error": "\(error)"]
)
return .code(.badCert)
}
}
Expand All @@ -355,7 +412,9 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
connections.byId[connection.id]
}
guard let conn else {
logger.warning("Connected but connection is gone?", metadata: ["connectionId": "\(connection.id)"])
logger.warning(
"Connected but connection is gone?", metadata: ["connectionId": "\(connection.id)"]
)
return
}

Expand All @@ -377,6 +436,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {

func shutdownComplete(_ connection: QuicConnection) {
logger.info("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
// let conn = impl.connections.read { connections in
// connections.byId[connection.id]
// }
// let needReconnect = conn.needReconnect
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
let needReconnect = conn.needReconnect
Expand All @@ -389,14 +452,11 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
connections.byAddr.removeValue(forKey: conn.remoteAddress)
if needReconnect {
do {
try conn.reconnecting()
logger.info("Reconnection complete", metadata: ["connectionId": "\(connection.id)"])
try impl.reconnect(to: conn.remoteAddress, role: conn.role)
} catch {
logger.warning(
"\(connection.id) Failed to reconnect",
metadata: ["error": "\(error)"]
)
logger.error("reconnect failed", metadata: ["error": "\(error)"])
}

} else {
logger.info("Connection closed", metadata: ["connectionId": "\(connection.id)"])
}
Expand All @@ -405,7 +465,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}

func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) {
logger.info("Shutdown initiated", metadata: ["connectionId": "\(connection.id)", "reason": "\(reason)"])
logger.info(
"Shutdown initiated",
metadata: ["connectionId": "\(connection.id)", "reason": "\(reason)"]
)
if shouldReconnect(basedOn: reason) {
logger.info("shouldReconnect true", metadata: ["connectionId": "\(connection.id)"])
impl.connections.write { connections in
Expand Down Expand Up @@ -471,10 +534,14 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
if let connection {
connection.streamClosed(stream: stream, abort: !status.isSucceeded)
} else {
logger.warning("Stream closed but connection is gone?", metadata: ["streamId": "\(stream.id)"])
logger.warning(
"Stream closed but connection is gone?", metadata: ["streamId": "\(stream.id)"]
)
}
} else {
logger.warning("Stream closed but stream is gone?", metadata: ["streamId": "\(quicStream.id)"])
logger.warning(
"Stream closed but stream is gone?", metadata: ["streamId": "\(quicStream.id)"]
)
}
}
}
2 changes: 1 addition & 1 deletion Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ struct PeerTests {
}

@Test
func peerReconnect() async throws {
func connReconnect() async throws {
let handler2 = MockPresentStreamHandler()
let messageData = Data("Post-recovery message".utf8)

Expand Down

0 comments on commit 2762b14

Please sign in to comment.