Skip to content

Commit

Permalink
update p2p reconnect & test
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Nov 1, 2024
1 parent 2762b14 commit f7e17b6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 14 deletions.
15 changes: 4 additions & 11 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -435,15 +435,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}

func shutdownComplete(_ connection: QuicConnection) {
logger.info("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
// let conn = impl.connections.read { connections in
// connections.byId[connection.id]
// }
// let needReconnect = conn.needReconnect
logger.debug("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
let needReconnect = conn.needReconnect
// remove publickey first,func closed will change state to closed
if let publicKey = conn.publicKey {
connections.byPublicKey.removeValue(forKey: publicKey)
}
Expand All @@ -458,28 +453,26 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}

} else {
logger.info("Connection closed", metadata: ["connectionId": "\(connection.id)"])
logger.debug("Connection closed", metadata: ["connectionId": "\(connection.id)"])
}
}
}
}

func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) {
logger.info(
logger.debug(
"Shutdown initiated",
metadata: ["connectionId": "\(connection.id)", "reason": "\(reason)"]
)
if shouldReconnect(basedOn: reason) {
logger.info("shouldReconnect true", metadata: ["connectionId": "\(connection.id)"])
logger.debug("shouldReconnect ", metadata: ["connectionId": "\(connection.id)"])
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
if let publicKey = conn.publicKey {
conn.reconnect(publicKey: publicKey)
}
}
}
} else {
logger.info("Closing", metadata: ["connectionId": "\(connection.id)"])
}
}

Expand Down
54 changes: 51 additions & 3 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ struct PeerTests {
}

@Test
func connReconnect() async throws {
func connectionNeedToReconnect() async throws {
let handler2 = MockPresentStreamHandler()
let messageData = Data("Post-recovery message".utf8)

Expand Down Expand Up @@ -297,7 +297,7 @@ struct PeerTests {

#expect(receivedData == messageData + Data(" response".utf8))
try? await Task.sleep(for: .milliseconds(100))
// Simulate a peer failure by disconnecting one peer
// Simulate abnormal shutdown of connections
connection.close(abort: true)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(200))
Expand All @@ -309,7 +309,55 @@ struct PeerTests {
}

@Test
func peerFailureRecovery() async throws {
func ConnectionNoNeedToReconnect() async throws {
let handler2 = MockPresentStreamHandler()
let messageData = Data("Post-recovery message".utf8)

let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

try? await Task.sleep(for: .milliseconds(100))

let connection = try peer1.connect(
to: peer2.listenAddress(), role: .validator
)
try? await Task.sleep(for: .milliseconds(100))
// Simulate regular shutdown of connections
connection.close(abort: false)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(200))
peer1.broadcast(
kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData)
)
try? await Task.sleep(for: .milliseconds(1000))
await #expect(handler2.lastReceivedData == nil)
}

@Test
func connectionManualReconnect() async throws {
let handler2 = MockPresentStreamHandler()
let messageData = Data("Post-recovery message".utf8)

Expand Down

0 comments on commit f7e17b6

Please sign in to comment.