diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 2fe24b91..46f2e7e4 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -36,6 +36,7 @@ public final class Connection: Sendable, ConnectionInfoP public let role: PeerRole public let remoteAddress: NetAddr + private let lastActive: ThreadSafeContainer = .init(0) let presistentStreams: ThreadSafeContainer< [Handler.PresistentHandler.StreamKind: Stream] @@ -58,6 +59,10 @@ public final class Connection: Sendable, ConnectionInfoP } } + public var lastActiveTimeStamp: TimeInterval { + lastActive.read { $0 } + } + public var id: UniqueId { connection.id } @@ -68,6 +73,11 @@ public final class Connection: Sendable, ConnectionInfoP self.role = role self.remoteAddress = remoteAddress self.initiatedByLocal = initiatedByLocal + lastActive.write { $0 = Date().timeIntervalSince1970 } + } + + func updateLastActive() { + lastActive.write { $0 = Date().timeIntervalSince1970 } } func opened(publicKey: Data) throws { diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 32829072..705aa10c 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -271,9 +271,19 @@ final class PeerImpl: Sendable { if role == .builder { let currentCount = connections.byAddr.values.filter { $0.role == role }.count if currentCount >= self.settings.maxBuilderConnections { - self.logger.warning("max builder connections reached") - // TODO: consider connection rotation strategy - return false + if let leastActiveConn = connections.byAddr.values.filter({ $0.role == .builder }) + .sorted(by: { $0.lastActiveTimeStamp < $1.lastActiveTimeStamp }).first + { + self.logger.warning( + "Replacing least active builder connection at \(leastActiveConn.remoteAddress)" + ) + leastActiveConn.close(abort: false) + } else { + self.logger.warning( + "Max builder connections reached, no eligible replacement found" + ) + return false + } } } if connections.byAddr[addr] != nil { @@ -341,7 +351,9 @@ final class PeerImpl: Sendable { } guard state.attempt < maxRetryAttempts else { - logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts") + logger.warning( + "Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts" + ) return } @@ -355,7 +367,9 @@ final class PeerImpl: Sendable { Task { try await Task.sleep(for: .seconds(state.delay)) do { - logger.debug("Attempting to reopen UP stream of kind \(kind) for connection \(connection.id)") + logger.debug( + "Attempting to reopen UP stream of kind \(kind) for connection \(connection.id)" + ) try connection.createPreistentStream(kind: kind) } catch { logger.error("Failed to reopen UP stream for connection \(connection.id): \(error)") @@ -588,8 +602,13 @@ private struct PeerEventHandler: QuicEventHandler { let stream = impl.streams.read { streams in streams[stream.id] } + if let stream { stream.received(data: data) + let connection = impl.connections.read { connections in + connections.byId[stream.connectionId] + } + connection?.updateLastActive() } } @@ -611,7 +630,9 @@ private struct PeerEventHandler: QuicEventHandler { do { try connection.createPreistentStream(kind: kind) } catch { - logger.error("Attempt to recreate the persistent stream failed: \(error)") + logger.error( + "Attempt to recreate the persistent stream failed: \(error)" + ) } } } @@ -629,7 +650,9 @@ private struct PeerEventHandler: QuicEventHandler { } // TODO: Add all the cases about reopen up stream - private func shouldReopenStream(connection: Connection, stream: Stream, status: QuicStatus) -> Bool { + private func shouldReopenStream( + connection: Connection, stream: Stream, status: QuicStatus + ) -> Bool { // Only reopen if the stream is a persistent UP stream and the closure was unexpected if connection.isClosed || connection.needReconnect || stream.kind == nil { return false