Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update reconnect attempt #216

Merged
merged 14 commits into from
Nov 5, 2024
86 changes: 63 additions & 23 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@ public enum PeerRole: Sendable, Hashable {
// case proxy // not yet specified
}

/// Tracks reconnection progress for a single remote address.
///
/// `attempt` counts how many backoff steps have been applied, and `delay` is the
/// wait (in seconds) to use before the next reconnect attempt.
struct ReconnectState {
    /// Number of backoff steps applied so far.
    var attempt: Int
    /// Wait before the next reconnect attempt; doubled by `applyBackoff()`.
    var delay: TimeInterval

    /// Creates a reconnect state.
    ///
    /// Every parameter is defaulted, so `ReconnectState()` produces the initial
    /// state (no attempts, one-second delay). Keeping a single initializer means
    /// the defaults are defined in exactly one place.
    ///
    /// - Parameters:
    ///   - attempt: Number of backoff steps already applied.
    ///   - delay: Wait, in seconds, before the next attempt.
    init(attempt: Int = 0, delay: TimeInterval = 1) {
        self.attempt = attempt
        self.delay = delay
    }

    /// Records one more attempt and doubles the delay (exponential backoff).
    mutating func applyBackoff() {
        attempt += 1
        delay *= 2
    }
}

public struct PeerOptions<Handler: StreamHandler>: Sendable {
public var role: PeerRole
public var listenAddress: NetAddr
Expand Down Expand Up @@ -50,7 +71,7 @@ public struct PeerOptions<Handler: StreamHandler>: Sendable {
}
}

// TODO: reconnects, reopen UP stream, peer reputation system to ban peers not following the protocol
// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol
public final class Peer<Handler: StreamHandler>: Sendable {
private let impl: PeerImpl<Handler>

Expand All @@ -61,7 +82,6 @@ public final class Peer<Handler: StreamHandler>: Sendable {
}

public let publicKey: Data

public init(options: PeerOptions<Handler>) throws {
let logger = Logger(label: "Peer".uniqueId)

Expand Down Expand Up @@ -214,7 +234,9 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {

fileprivate let connections: ThreadSafeContainer<ConnectionStorage> = .init(.init())
fileprivate let streams: ThreadSafeContainer<[UniqueId: Stream<Handler>]> = .init([:])
fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: ReconnectState]> = .init([:])

let reconnectMaxRetryAttempts = 5
let presistentStreamHandler: Handler.PresistentHandler
let ephemeralStreamHandler: Handler.EphemeralHandler

Expand Down Expand Up @@ -271,29 +293,43 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
}
}

// TODO: Add reconnection attempts & Apply exponential backoff delay
/// Schedules a delayed reconnect to `address`, backing off exponentially between attempts.
///
/// The current `ReconnectState` for the address decides both whether another attempt is
/// allowed (fewer than `reconnectMaxRetryAttempts` so far) and how long to wait before
/// dialing. The actual connection is opened from an unstructured `Task` after that delay,
/// so this method returns before the connection exists.
///
/// - Parameters:
///   - address: Remote address to reconnect to.
///   - role: Role assigned to the newly created `Connection`.
/// - Throws: Kept `throws` for caller compatibility; failures inside the scheduled task
///   are logged rather than propagated, because the task outlives this call.
func reconnect(to address: NetAddr, role: PeerRole) throws {
    logger.debug("reconnecting", metadata: ["to address": "\(address)", "role": "\(role)"])

    // Snapshot the state for this address; an unknown address starts from the initial state.
    let state = reconnectStates.read { reconnectStates in
        reconnectStates[address] ?? .init()
    }

    guard state.attempt < reconnectMaxRetryAttempts else {
        logger.warning("reconnect attempt exceeded max attempts")
        return
    }

    // Advance and store the state even when no entry exists yet. Updating only an
    // existing entry would never record the first attempt, so the attempt counter
    // would stay at zero and the retry limit above could never be reached.
    reconnectStates.write { reconnectStates in
        var next = reconnectStates[address] ?? .init()
        next.applyBackoff()
        reconnectStates[address] = next
    }

    Task {
        do {
            // Wait out the delay that was in effect before this attempt's backoff step.
            try await Task.sleep(for: .seconds(state.delay))
            try connections.write { connections in
                // Another path may have connected while this task was sleeping.
                if connections.byAddr[address] != nil {
                    logger.warning("reconnecting to \(address) already connected")
                    return
                }
                let quicConn = try QuicConnection(
                    handler: PeerEventHandler(self),
                    registration: clientConfiguration.registration,
                    configuration: clientConfiguration
                )
                try quicConn.connect(to: address)
                let conn = Connection(
                    quicConn,
                    impl: self,
                    role: role,
                    remoteAddress: address,
                    initiatedByLocal: true
                )
                connections.byAddr[address] = conn
                connections.byId[conn.id] = conn
            }
        } catch {
            // Errors thrown in an unstructured Task are otherwise discarded silently.
            logger.warning(
                "reconnect failed",
                metadata: ["to address": "\(address)", "error": "\(error)"]
            )
        }
    }
}

Expand Down Expand Up @@ -417,6 +453,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
)
return
}
// Clear any reconnect state tracked for this connection's remote address
impl.reconnectStates.write { reconnectStates in
reconnectStates[conn.remoteAddress] = nil
}

if conn.initiatedByLocal {
for kind in Handler.PresistentHandler.StreamKind.allCases {
Expand Down
17 changes: 4 additions & 13 deletions Networking/Tests/NetworkingTests/MockPeerEventTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,8 @@ final class MockPeerEventTests {
configuration: clientConfiguration
)

// Attempt to connect
try clientConnection.connect(to: listenAddress)
let stream1 = try clientConnection.createStream()
try stream1.send(data: Data("test data 1".utf8))

try? await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100))
let (_, reason) = clientHandler.events.value.compactMap {
switch $0 {
case let .shutdownInitiated(connection, reason):
Expand Down Expand Up @@ -212,7 +208,7 @@ final class MockPeerEventTests {
let stream1 = try clientConnection.createStream()
try stream1.send(data: Data("test data 1".utf8))

try? await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100))
let (_, info) = serverHandler.events.value.compactMap {
switch $0 {
case let .newConnection(_, connection, info):
Expand Down Expand Up @@ -266,14 +262,9 @@ final class MockPeerEventTests {
registration: registration,
configuration: clientConfiguration
)

// Attempt to connect
try clientConnection.connect(to: listenAddress)
let stream1 = try clientConnection.createStream()
try stream1.send(data: Data("test data 1".utf8))

try? await Task.sleep(for: .milliseconds(100))
let (_, reason) = serverHandler.events.value.compactMap {
try await Task.sleep(for: .milliseconds(100))
let (_, reason) = clientHandler.events.value.compactMap {
switch $0 {
case let .shutdownInitiated(connection, reason):
(connection, reason) as (QuicConnection, ConnectionCloseReason)?
Expand Down
13 changes: 5 additions & 8 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,12 @@ struct PeerTests {
try? await Task.sleep(for: .milliseconds(100))
// Simulate abnormal shutdown of connections
connection.close(abort: true)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(200))
// Simulate downtime and wait for the reconnect to complete (roughly 3–5 s)
try? await Task.sleep(for: .milliseconds(3000))
peer1.broadcast(
kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData)
)
try? await Task.sleep(for: .milliseconds(1000))
try await Task.sleep(for: .milliseconds(1000))
let lastReceivedData = await handler2.lastReceivedData
#expect(lastReceivedData == messageData)
}
Expand Down Expand Up @@ -693,12 +693,9 @@ struct PeerTests {
}

var connections = [Connection<MockStreamHandler>]()
for i in 0 ..< peers.count {
for i in 0 ..< peersCount {
let peer = peers[i]
for j in 0 ..< peers.count {
if i >= j {
continue
}
for j in i + 1 ..< peersCount {
let otherPeer = peers[j]
let conn = try peer.connect(
to: otherPeer.listenAddress(),
Expand Down