Skip to content

Commit

Permalink
update connection rotation strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Nov 8, 2024
1 parent d36b5aa commit 97c77e7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
10 changes: 10 additions & 0 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP

public let role: PeerRole
public let remoteAddress: NetAddr
private let lastActive: ThreadSafeContainer<TimeInterval> = .init(0)

let presistentStreams: ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
Expand All @@ -58,6 +59,10 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
}

public var lastActiveTimeStamp: TimeInterval {
lastActive.read { $0 }
}

public var id: UniqueId {
connection.id
}
Expand All @@ -68,6 +73,11 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
self.role = role
self.remoteAddress = remoteAddress
self.initiatedByLocal = initiatedByLocal
lastActive.write { $0 = Date().timeIntervalSince1970 }
}

func updateLastActive() {
lastActive.write { $0 = Date().timeIntervalSince1970 }
}

func opened(publicKey: Data) throws {
Expand Down
37 changes: 30 additions & 7 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,19 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
if role == .builder {
let currentCount = connections.byAddr.values.filter { $0.role == role }.count
if currentCount >= self.settings.maxBuilderConnections {
self.logger.warning("max builder connections reached")
// TODO: consider connection rotation strategy
return false
if let leastActiveConn = connections.byAddr.values.filter({ $0.role == .builder })
.sorted(by: { $0.lastActiveTimeStamp < $1.lastActiveTimeStamp }).first
{
self.logger.warning(
"Replacing least active builder connection at \(leastActiveConn.remoteAddress)"
)
leastActiveConn.close(abort: false)
} else {
self.logger.warning(
"Max builder connections reached, no eligible replacement found"
)
return false
}
}
}
if connections.byAddr[addr] != nil {
Expand Down Expand Up @@ -341,7 +351,9 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
}

guard state.attempt < maxRetryAttempts else {
logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts")
logger.warning(
"Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts"
)
return
}

Expand All @@ -355,7 +367,9 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
Task {
try await Task.sleep(for: .seconds(state.delay))
do {
logger.debug("Attempting to reopen UP stream of kind \(kind) for connection \(connection.id)")
logger.debug(
"Attempting to reopen UP stream of kind \(kind) for connection \(connection.id)"
)
try connection.createPreistentStream(kind: kind)
} catch {
logger.error("Failed to reopen UP stream for connection \(connection.id): \(error)")
Expand Down Expand Up @@ -588,8 +602,13 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
let stream = impl.streams.read { streams in
streams[stream.id]
}

if let stream {
stream.received(data: data)
let connection = impl.connections.read { connections in
connections.byId[stream.connectionId]
}
connection?.updateLastActive()
}
}

Expand All @@ -611,7 +630,9 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
do {
try connection.createPreistentStream(kind: kind)
} catch {
logger.error("Attempt to recreate the persistent stream failed: \(error)")
logger.error(
"Attempt to recreate the persistent stream failed: \(error)"
)
}
}
}
Expand All @@ -629,7 +650,9 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}

// TODO: Add all the cases about reopen up stream
private func shouldReopenStream(connection: Connection<Handler>, stream: Stream<Handler>, status: QuicStatus) -> Bool {
private func shouldReopenStream(
connection: Connection<Handler>, stream: Stream<Handler>, status: QuicStatus
) -> Bool {
// Only reopen if the stream is a persistent UP stream and the closure was unexpected
if connection.isClosed || connection.needReconnect || stream.kind == nil {
return false
Expand Down

0 comments on commit 97c77e7

Please sign in to comment.