Skip to content

Commit

Permalink
update peer
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Nov 1, 2024
1 parent f7e17b6 commit 7cc9383
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
36 changes: 22 additions & 14 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,14 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {

// TODO: Add reconnection attempts & Apply exponential backoff delay
func reconnect(to address: NetAddr, role: PeerRole) throws {
logger.debug("reconnecting", metadata: ["address": "\(address)", "role": "\(role)"])
Task {
try await Task.sleep(for: .microseconds(1000))
try await Task.sleep(for: .microseconds(2000))
logger.debug("reconnecting", metadata: ["to address": "\(address)", "role": "\(role)"])
try connections.write { connections in
if connections.byAddr[address] != nil {
logger.warning("reconnecting to \(address) already connected")
return
}
let quicConn = try QuicConnection(
handler: PeerEventHandler(self),
registration: clientConfiguration.registration,
Expand All @@ -293,6 +297,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
logger.debug("reconnectted", metadata: ["address": "\(address)", "role": "\(role)"])
}
}
}
Expand Down Expand Up @@ -436,25 +441,27 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {

func shutdownComplete(_ connection: QuicConnection) {
logger.debug("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
impl.connections.write { connections in
let conn = impl.connections.read { connections in
connections.byId[connection.id]
}
let needReconnect = impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
let needReconnect = conn.needReconnect
if let publicKey = conn.publicKey {
connections.byPublicKey.removeValue(forKey: publicKey)
}
conn.closed()
connections.byId.removeValue(forKey: connection.id)
connections.byAddr.removeValue(forKey: conn.remoteAddress)
if needReconnect {
do {
try impl.reconnect(to: conn.remoteAddress, role: conn.role)
} catch {
logger.error("reconnect failed", metadata: ["error": "\(error)"])
}

} else {
logger.debug("Connection closed", metadata: ["connectionId": "\(connection.id)"])
}
conn.closed()
return needReconnect
}
return false
}
if needReconnect, let address = conn?.remoteAddress, let role = conn?.role {
do {
try impl.reconnect(to: address, role: role)
} catch {
logger.error("reconnect failed", metadata: ["error": "\(error)"])
}
}
}
Expand All @@ -469,6 +476,7 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
if let publicKey = conn.publicKey {
connections.byPublicKey.removeValue(forKey: publicKey)
conn.reconnect(publicKey: publicKey)
}
}
Expand Down
15 changes: 9 additions & 6 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ struct PeerTests {
clientSettings: .defaultSettings
)
)

try print("peer1: \(peer1.listenAddress())")
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
Expand All @@ -283,12 +283,13 @@ struct PeerTests {
clientSettings: .defaultSettings
)
)

try print("peer2: \(peer2.listenAddress())")
try? await Task.sleep(for: .milliseconds(100))

let connection = try peer1.connect(
to: peer2.listenAddress(), role: .validator
)
print("connection addr: \(connection.remoteAddress)")
try? await Task.sleep(for: .milliseconds(100))

let receivedData = try await connection.request(
Expand All @@ -305,7 +306,9 @@ struct PeerTests {
kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData)
)
try? await Task.sleep(for: .milliseconds(1000))
await #expect(handler2.lastReceivedData == messageData)
let lastReceivedData = await handler2.lastReceivedData
print("lastReceivedData \(lastReceivedData?.toHexString() ?? "none")")
#expect(lastReceivedData == messageData)
}

@Test
Expand Down Expand Up @@ -401,7 +404,7 @@ struct PeerTests {
#expect(receivedData == messageData + Data(" response".utf8))
try? await Task.sleep(for: .milliseconds(100))
// Simulate a peer failure by disconnecting one peer
connection.close(abort: true)
connection.close(abort: false)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(200))
// Reconnect the failing peer
Expand All @@ -416,10 +419,10 @@ struct PeerTests {
try? await Task.sleep(for: .milliseconds(100))
#expect(recoverData == messageData + Data(" response".utf8))
peer1.broadcast(
kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData)
kind: .uniqueC, message: .init(kind: .uniqueC, data: recoverData)
)
try? await Task.sleep(for: .milliseconds(500))
await #expect(handler2.lastReceivedData == messageData)
await #expect(handler2.lastReceivedData == recoverData)
}

@Test
Expand Down

0 comments on commit 7cc9383

Please sign in to comment.