Skip to content

Commit

Permalink
update connection rotation strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Nov 14, 2024
1 parent 2939432 commit 99acde2
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
16 changes: 13 additions & 3 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,15 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
if role == .builder {
let currentCount = connections.byAddr.values.filter { $0.role == role }.count
if currentCount >= self.settings.maxBuilderConnections {
self.logger.warning("max builder connections reached")
// TODO: consider connection rotation strategy
return false
if let conn = connections.byAddr.values.filter({ $0.role == .builder })
.sorted(by: { $0.lastActiveTimeStamp < $1.lastActiveTimeStamp }).first
{
self.logger.warning("Replacing least active builder connection at \(conn.remoteAddress)")
conn.close(abort: false)
} else {
self.logger.warning("Max builder connections reached, no eligible replacement found")
return false
}
}
}
if connections.byAddr[addr] != nil {
Expand Down Expand Up @@ -590,6 +596,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
if let stream {
stream.received(data: data)
let connection = impl.connections.read { connections in
connections.byId[stream.connectionId]
}
connection?.updateLastActive()
}
}

Expand Down
51 changes: 51 additions & 0 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,57 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func connectionRotationStrategy() async throws {
var peers: [Peer<MockStreamHandler>] = []
var handlers: [MockPresentStreamHandler] = []
let centerPeer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
// Create 30 peer nodes
for _ in 0 ..< 30 {
let handler = MockPresentStreamHandler()
handlers.append(handler)
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .builder,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
peers.append(peer)
}

// Make some connections
for i in 0 ..< 30 {
let peer = peers[i]
let con = try peer.connect(to: centerPeer.listenAddress(), role: .builder)
try await con.ready()
}
centerPeer.broadcast(kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("connection rotation strategy".utf8)))
try? await Task.sleep(for: .milliseconds(500))
var receivedCount = 0
for handler in handlers {
receivedCount += await handler.receivedData.count
}
print("recievedCount: \(receivedCount)")
#expect(receivedCount == PeerSettings.defaultSettings.maxBuilderConnections)
}

@Test
func reopenUpStream() async throws {
let handler2 = MockPresentStreamHandler()
Expand Down

0 comments on commit 99acde2

Please sign in to comment.