Skip to content

Commit

Permalink
improve dedup connection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Oct 30, 2024
1 parent e2e9beb commit 33f39be
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 26 deletions.
48 changes: 24 additions & 24 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -303,49 +303,49 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
connections.byId[connection.id]
}
guard let conn else {
logger.warning("Trying to open connection but connection is gone?", metadata: ["connectionId": "\(connection.id)"])
logger.warning("Attempt to open but connection is absent", metadata: ["connectionId": "\(connection.id)"])
return .code(.connectionRefused)
}

do {
let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509)
logger.trace("Certificate parsed", metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
"alternativeName": "\(alternativeName)",
])
if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) {
return .code(.badCert)
}

if publicKey == impl.publicKey {
// self connection
logger.trace("self connection rejected", metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
])
// Self connection detected
logger.trace("Rejecting self-connection", metadata: ["connectionId": "\(connection.id)"])
return .code(.connectionRefused)
}

if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) {
return .code(.badCert)
}
// TODO: verify if it is current or next validator

// Check for an existing connection by public key
return try impl.connections.write { connections in
if connections.byPublicKey.keys.contains(publicKey) {
// duplicated connection
logger.debug("duplicated connection rejected", metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
])
// TODO: write a test for this
return .code(.connectionRefused)
// Deterministically decide based on public key comparison
if !publicKey.lexicographicallyPrecedes(impl.publicKey) {
connections.byPublicKey[publicKey] = conn
try conn.opened(publicKey: publicKey)
return .code(.success)
} else {
logger.debug("Rejecting duplicate connection by rule", metadata: [
"connectionId": "\(connection.id)", "publicKey": "\(publicKey.toHexString())",
])
return .code(.connectionRefused)
}
} else {
connections.byPublicKey[publicKey] = conn
try conn.opened(publicKey: publicKey)
return .code(.success)
}
connections.byPublicKey[publicKey] = conn
try conn.opened(publicKey: publicKey)
return .code(.success)
}
} catch {
logger.warning("Failed to parse certificate", metadata: [
"connectionId": "\(connection.id)",
"error": "\(error)"])
logger.warning("Certificate parsing failed", metadata: ["connectionId": "\(connection.id)", "error": "\(error)"])
return .code(.badCert)
}
}
Expand Down
45 changes: 43 additions & 2 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,46 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func concurrentPeerConnection() async throws {
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 31)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 32)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

let connection1 = try peer1.connect(to: peer2.listenAddress(), role: .validator)
let connection2 = try peer2.connect(to: peer1.listenAddress(), role: .validator)
try? await Task.sleep(for: .milliseconds(50))
if !connection1.isClosed {
let data = try await connection1.request(MockRequest(kind: .typeA, data: Data("hello world".utf8)))
#expect(data == Data("hello world response".utf8))
}
if !connection2.isClosed {
let data = try await connection2.request(MockRequest(kind: .typeA, data: Data("hello world".utf8)))
#expect(data == Data("hello world response".utf8))
}
}

@Test
func largeDataRequest() async throws {
let handler1 = MockPresentStreamHandler()
Expand Down Expand Up @@ -344,7 +384,7 @@ struct PeerTests {
kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8))
)
// Verify last received data
try? await Task.sleep(for: .milliseconds(50))
try? await Task.sleep(for: .milliseconds(100))
await #expect(handler2.lastReceivedData == Data("hello world".utf8))
await #expect(handler1.lastReceivedData == Data("I am jam".utf8))
}
Expand All @@ -366,7 +406,7 @@ struct PeerTests {
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 1)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
Expand All @@ -383,6 +423,7 @@ struct PeerTests {
let dataList1 = try await connection1.request(
MockRequest(kind: .typeA, data: Data("hello world".utf8))
)
try? await Task.sleep(for: .milliseconds(100))
#expect(dataList1 == Data("hello world response".utf8))
}

Expand Down

0 comments on commit 33f39be

Please sign in to comment.