Skip to content

Commit

Permalink
improve dedup connection logic (#213)
Browse files Browse the repository at this point in the history
* improve dedup connection logic

* Update Networking/Tests/NetworkingTests/PeerTests.swift

Co-authored-by: Xiliang Chen <[email protected]>

* Update Networking/Tests/NetworkingTests/PeerTests.swift

Co-authored-by: Xiliang Chen <[email protected]>

* Update Networking/Tests/NetworkingTests/PeerTests.swift

Co-authored-by: Xiliang Chen <[email protected]>

* update test

---------

Co-authored-by: Xiliang Chen <[email protected]>
  • Loading branch information
MacOMNI and xlc authored Oct 31, 2024
1 parent 0bd14b7 commit 026e249
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 36 deletions.
48 changes: 24 additions & 24 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -303,49 +303,49 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
connections.byId[connection.id]
}
guard let conn else {
logger.warning("Trying to open connection but connection is gone?", metadata: ["connectionId": "\(connection.id)"])
logger.warning("Attempt to open but connection is absent", metadata: ["connectionId": "\(connection.id)"])
return .code(.connectionRefused)
}

do {
let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509)
logger.trace("Certificate parsed", metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
"alternativeName": "\(alternativeName)",
])
if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) {
return .code(.badCert)
}

if publicKey == impl.publicKey {
// self connection
logger.trace("self connection rejected", metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
])
// Self connection detected
logger.trace("Rejecting self-connection", metadata: ["connectionId": "\(connection.id)"])
return .code(.connectionRefused)
}

if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) {
return .code(.badCert)
}
// TODO: verify if it is current or next validator

// Check for an existing connection by public key
return try impl.connections.write { connections in
if connections.byPublicKey.keys.contains(publicKey) {
// duplicated connection
logger.debug("duplicated connection rejected", metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
])
// TODO: write a test for this
return .code(.connectionRefused)
// Deterministically decide based on public key comparison
if !publicKey.lexicographicallyPrecedes(impl.publicKey) {
connections.byPublicKey[publicKey] = conn
try conn.opened(publicKey: publicKey)
return .code(.success)
} else {
logger.debug("Rejecting duplicate connection by rule", metadata: [
"connectionId": "\(connection.id)", "publicKey": "\(publicKey.toHexString())",
])
return .code(.connectionRefused)
}
} else {
connections.byPublicKey[publicKey] = conn
try conn.opened(publicKey: publicKey)
return .code(.success)
}
connections.byPublicKey[publicKey] = conn
try conn.opened(publicKey: publicKey)
return .code(.success)
}
} catch {
logger.warning("Failed to parse certificate", metadata: [
"connectionId": "\(connection.id)",
"error": "\(error)"])
logger.warning("Certificate parsing failed", metadata: ["connectionId": "\(connection.id)", "error": "\(error)"])
return .code(.badCert)
}
}
Expand Down
65 changes: 53 additions & 12 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,46 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func concurrentPeerConnection() async throws {
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

let connection1 = try peer1.connect(to: peer2.listenAddress(), role: .validator)
let connection2 = try peer2.connect(to: peer1.listenAddress(), role: .validator)
try? await Task.sleep(for: .milliseconds(50))
if !connection1.isClosed {
let data = try await connection1.request(MockRequest(kind: .typeA, data: Data("hello world".utf8)))
#expect(data == Data("hello world response".utf8))
}
if !connection2.isClosed {
let data = try await connection2.request(MockRequest(kind: .typeA, data: Data("hello world".utf8)))
#expect(data == Data("hello world response".utf8))
}
}

@Test
func largeDataRequest() async throws {
let handler1 = MockPresentStreamHandler()
Expand All @@ -159,7 +199,7 @@ struct PeerTests {
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 9085)!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler1,
Expand All @@ -172,7 +212,7 @@ struct PeerTests {
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 9086)!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
Expand All @@ -185,7 +225,7 @@ struct PeerTests {
try? await Task.sleep(for: .milliseconds(50))

let connection1 = try peer1.connect(
to: NetAddr(ipAddress: "127.0.0.1", port: 9086)!, role: .validator
to: peer2.listenAddress(), role: .validator
)
try? await Task.sleep(for: .milliseconds(50))

Expand Down Expand Up @@ -218,7 +258,7 @@ struct PeerTests {
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 185)!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
Expand All @@ -231,7 +271,7 @@ struct PeerTests {
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 186)!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
Expand All @@ -244,7 +284,7 @@ struct PeerTests {
let peer3 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 187)!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
Expand All @@ -257,7 +297,7 @@ struct PeerTests {
try? await Task.sleep(for: .milliseconds(50))

let connection = try peer1.connect(
to: NetAddr(ipAddress: "127.0.0.1", port: 186)!, role: .validator
to: peer2.listenAddress(), role: .validator
)
try? await Task.sleep(for: .milliseconds(50))

Expand Down Expand Up @@ -308,7 +348,7 @@ struct PeerTests {
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8081)!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler1,
Expand All @@ -320,7 +360,7 @@ struct PeerTests {
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8082)!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
Expand All @@ -331,7 +371,7 @@ struct PeerTests {
)

_ = try peer1.connect(
to: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, role: .validator
to: peer2.listenAddress(), role: .validator
)

try? await Task.sleep(for: .milliseconds(50))
Expand All @@ -344,7 +384,7 @@ struct PeerTests {
kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8))
)
// Verify last received data
try? await Task.sleep(for: .milliseconds(50))
try? await Task.sleep(for: .milliseconds(100))
await #expect(handler2.lastReceivedData == Data("hello world".utf8))
await #expect(handler1.lastReceivedData == Data("I am jam".utf8))
}
Expand Down Expand Up @@ -383,6 +423,7 @@ struct PeerTests {
let dataList1 = try await connection1.request(
MockRequest(kind: .typeA, data: Data("hello world".utf8))
)
try? await Task.sleep(for: .milliseconds(100))
#expect(dataList1 == Data("hello world response".utf8))
}

Expand Down Expand Up @@ -452,7 +493,7 @@ struct PeerTests {
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .builder,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(7081 + i))!,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler,
Expand Down

0 comments on commit 026e249

Please sign in to comment.