Skip to content

Commit

Permalink
update more test
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Oct 29, 2024
1 parent cfc280c commit 3d5532c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Networking/Sources/MsQuicSwift/QuicConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private class ConnectionHandle {
fileprivate func callbackHandler(event: UnsafePointer<QUIC_CONNECTION_EVENT>) -> QuicStatus {
switch event.pointee.Type {
case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED:
logger.trace("Peer certificate received")
logger.debug("Peer certificate received")
if let connection {
let evtData = event.pointee.PEER_CERTIFICATE_RECEIVED
let data: Data?
Expand Down
44 changes: 5 additions & 39 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,41 +115,6 @@ public final class Peer<Handler: StreamHandler>: Sendable {
try listener.listenAddress()
}

public func reconnect(to address: NetAddr, role: PeerRole) throws -> Connection<Handler> {
let conn = impl.connections.read { connections in
connections.byAddr[address]
}
if let conn {
print("conn \(conn.id.name)")
} else {
print("reconnect")
}
return try impl.connections.write { connections in
if let curr = connections.byAddr[address] {
return curr
}

logger.info("connecting to peer", metadata: ["address": "\(address)", "role": "\(role)"])

let quicConn = try QuicConnection(
handler: PeerEventHandler(self.impl),
registration: self.impl.clientConfiguration.registration,
configuration: self.impl.clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self.impl,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
return conn
}
}

// TODO: see if we can remove the role parameter
public func connect(to address: NetAddr, role: PeerRole) throws -> Connection<Handler> {
let conn = impl.connections.read { connections in
Expand Down Expand Up @@ -411,15 +376,16 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}

func shutdownComplete(_ connection: QuicConnection) {
logger.info("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
logger.debug("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
conn.closed()
connections.byId.removeValue(forKey: connection.id)
connections.byAddr.removeValue(forKey: conn.remoteAddress)
// remove publickey first,func closed will change state to closed
if let publicKey = conn.publicKey {
connections.byPublicKey.removeValue(forKey: publicKey)
}
conn.closed()
connections.byId.removeValue(forKey: connection.id)
connections.byAddr.removeValue(forKey: conn.remoteAddress)
}
}
}
Expand Down
41 changes: 31 additions & 10 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ struct PeerTests {

@Test
func peerFailureRecovery() async throws {
let handler1 = MockPresentStreamHandler()
let handler2 = MockPresentStreamHandler()
let messageData = Data("Post-recovery message".utf8)

Expand All @@ -222,7 +221,7 @@ struct PeerTests {
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 185)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler1,
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
Expand All @@ -242,6 +241,19 @@ struct PeerTests {
)
)

let peer3 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 187)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

try? await Task.sleep(for: .milliseconds(50))

let connection = try peer1.connect(
Expand All @@ -252,31 +264,40 @@ struct PeerTests {
let receivedData = try await connection.request(
MockRequest(kind: .typeA, data: messageData)
)

#expect(receivedData == messageData + Data(" response".utf8))
try? await Task.sleep(for: .milliseconds(50))
// Simulate a peer failure by disconnecting one peer
connection.close(abort: true)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(10000))
try? await Task.sleep(for: .milliseconds(200))
// check the peer is usable & connect to another peer
let connection2 = try peer1.connect(
to: peer3.listenAddress(),
role: .validator
)
try? await Task.sleep(for: .milliseconds(50))
let receivedData2 = try await connection2.request(
MockRequest(kind: .typeA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(50))
#expect(receivedData2 == messageData + Data(" response".utf8))
// Reconnect the failing peer
let reconnect = try peer1.reconnect(
let reconnection = try peer1.connect(
to: peer2.listenAddress(),
role: .validator
)
try? await Task.sleep(for: .milliseconds(50))

let rereceivedData = try await reconnect.request(
let recoverData = try await reconnection.request(
MockRequest(kind: .typeA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(50))
#expect(rereceivedData == messageData + Data(" response".utf8))
#expect(recoverData == messageData + Data(" response".utf8))
peer1.broadcast(
kind: .uniqueB, message: .init(kind: .uniqueB, data: messageData)
kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData)
)
// Verify last received data
try? await Task.sleep(for: .milliseconds(50))
await #expect(handler2.lastReceivedData == messageData)
try? await Task.sleep(for: .milliseconds(50))
}

@Test
Expand Down

0 comments on commit 3d5532c

Please sign in to comment.