Skip to content

Commit

Permalink
update connection
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Oct 31, 2024
1 parent c1860d8 commit fd6d937
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 19 deletions.
10 changes: 5 additions & 5 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
publicKey
case .closed:
nil
case .reconnect:
nil
case let .reconnect(publicKey):
publicKey
}
}
}
Expand Down Expand Up @@ -101,9 +101,9 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
for continuation in continuations {
continuation.resume(throwing: ConnectionError.reconnect)
}
state = .connected(publicKey: publicKey)
state = .reconnect(publicKey: publicKey)
}
state = .connected(publicKey: publicKey)
state = .reconnect(publicKey: publicKey)
}
}

Expand All @@ -128,7 +128,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
}

public var isReconnect: Bool {
public var needReconnect: Bool {
state.read {
switch $0 {
case .reconnect:
Expand Down
3 changes: 2 additions & 1 deletion Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,15 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
logger.info("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
let needReconnect = conn.needReconnect
// remove publickey first,func closed will change state to closed
if let publicKey = conn.publicKey {
connections.byPublicKey.removeValue(forKey: publicKey)
}
conn.closed()
connections.byId.removeValue(forKey: connection.id)
connections.byAddr.removeValue(forKey: conn.remoteAddress)
if conn.isReconnect {
if needReconnect {
do {
try conn.reconnecting()
logger.info("Reconnection complete", metadata: ["connectionId": "\(connection.id)"])
Expand Down
57 changes: 44 additions & 13 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ struct PeerTests {
}

@Test
func peerFailureRecovery() async throws {
func peerReconnect() async throws {
let handler2 = MockPresentStreamHandler()
let messageData = Data("Post-recovery message".utf8)

Expand Down Expand Up @@ -284,7 +284,49 @@ struct PeerTests {
)
)

let peer3 = try Peer(
try? await Task.sleep(for: .milliseconds(100))

let connection = try peer1.connect(
to: peer2.listenAddress(), role: .validator
)
try? await Task.sleep(for: .milliseconds(100))

let receivedData = try await connection.request(
MockRequest(kind: .typeA, data: messageData)
)

#expect(receivedData == messageData + Data(" response".utf8))
try? await Task.sleep(for: .milliseconds(100))
// Simulate a peer failure by disconnecting one peer
connection.close(abort: true)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(200))
peer1.broadcast(
kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData)
)
try? await Task.sleep(for: .milliseconds(1000))
await #expect(handler2.lastReceivedData == messageData)
}

@Test
func peerFailureRecovery() async throws {
let handler2 = MockPresentStreamHandler()
let messageData = Data("Post-recovery message".utf8)

let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
Expand Down Expand Up @@ -314,17 +356,6 @@ struct PeerTests {
connection.close(abort: true)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(200))
// check the peer is usable & connect to another peer
let connection2 = try peer1.connect(
to: peer3.listenAddress(),
role: .validator
)
try? await Task.sleep(for: .milliseconds(50))
let receivedData2 = try await connection2.request(
MockRequest(kind: .typeA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(100))
#expect(receivedData2 == messageData + Data(" response".utf8))
// Reconnect the failing peer
let reconnection = try peer1.connect(
to: peer2.listenAddress(),
Expand Down

0 comments on commit fd6d937

Please sign in to comment.