From e3879eb9d504e8f55c2797e758d7680b2caab086 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 13 Nov 2024 09:37:48 +0800 Subject: [PATCH] update reopen up stream (#218) * update reopen up stream * update reopen up stream * update reopen up stream * update test --- .../Sources/MsQuicSwift/QuicStream.swift | 1 - .../Sources/Networking/Connection.swift | 26 +++- Networking/Sources/Networking/Peer.swift | 130 +++++++++++++----- .../Tests/NetworkingTests/PeerTests.swift | 121 +++++++++++++++- Utils/Sources/Utils/UniqueId.swift | 4 + 5 files changed, 245 insertions(+), 37 deletions(-) diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 55a88e34..9690d75f 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -207,7 +207,6 @@ private class StreamHandle { case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: logger.trace("Peer send aborted") - // TODO: check if we need to close the stream completely case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: logger.trace("Stream shutdown complete") diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index bc7b54cb..2fe24b91 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -234,13 +234,37 @@ public final class Connection: Sendable, ConnectionInfoP return } if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) { - // TODO: handle duplicated UP streams + // Check for duplicate UP streams + let existingStream = presistentStreams.read { presistentStreams in + presistentStreams[upKind] + } + if let existingStream { + if existingStream.stream.id < stream.stream.id { + // The new stream has a higher ID, so reset the existing one + existingStream.close(abort: false) + logger.debug( + "Reset older UP stream with lower ID", + metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"] + ) + } else { + // The existing stream has a higher ID or is equal, so reset the new one + stream.close(abort: false) + logger.debug( + "Duplicate UP stream detected, closing new stream with lower or equal ID", + metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"] + ) + return // Exit without replacing the existing stream + } + } + + // Write the new stream as the active one for this UP kind presistentStreams.write { presistentStreams in presistentStreams[upKind] = stream } runPresistentStreamLoop(stream: stream, kind: upKind) return } + if let ceKind = Handler.EphemeralHandler.StreamKind(rawValue: byte) { logger.debug("stream opened. kind: \(ceKind)") diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 76e9a8eb..32829072 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -16,7 +16,7 @@ public enum PeerRole: Sendable, Hashable { // case proxy // not yet specified } -struct ReconnectState { +struct BackoffState { var attempt: Int var delay: TimeInterval @@ -25,7 +25,6 @@ struct ReconnectState { delay = 1 } - // Initializer with custom values init(attempt: Int = 0, delay: TimeInterval = 1) { self.attempt = attempt self.delay = delay @@ -234,9 +233,10 @@ final class PeerImpl: Sendable { fileprivate let connections: ThreadSafeContainer = .init(.init()) fileprivate let streams: ThreadSafeContainer<[UniqueId: Stream]> = .init([:]) - fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: ReconnectState]> = .init([:]) + fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: BackoffState]> = .init([:]) + fileprivate let reopenStates: ThreadSafeContainer<[UniqueId: BackoffState]> = .init([:]) - let reconnectMaxRetryAttempts = 5 + let maxRetryAttempts = 5 let presistentStreamHandler: Handler.PresistentHandler let ephemeralStreamHandler: Handler.EphemeralHandler @@ -297,39 +297,70 @@ final class PeerImpl: Sendable { let state = reconnectStates.read { reconnectStates in reconnectStates[address] ?? .init() } - if state.attempt < reconnectMaxRetryAttempts { - reconnectStates.write { reconnectStates in - if var state = reconnectStates[address] { - state.applyBackoff() - reconnectStates[address] = state - } + + guard state.attempt < maxRetryAttempts else { + logger.warning("reconnecting to \(address) exceeded max attempts") + return + } + + reconnectStates.write { reconnectStates in + if var state = reconnectStates[address] { + state.applyBackoff() + reconnectStates[address] = state } - Task { - try await Task.sleep(for: .seconds(state.delay)) - try connections.write { connections in - if connections.byAddr[address] != nil { - logger.warning("reconnecting to \(address) already connected") - return - } - let quicConn = try QuicConnection( - handler: PeerEventHandler(self), - registration: clientConfiguration.registration, - configuration: clientConfiguration - ) - try quicConn.connect(to: address) - let conn = Connection( - quicConn, - impl: self, - role: role, - remoteAddress: address, - initiatedByLocal: true - ) - connections.byAddr[address] = conn - connections.byId[conn.id] = conn + } + Task { + try await Task.sleep(for: .seconds(state.delay)) + try connections.write { connections in + if connections.byAddr[address] != nil { + logger.warning("reconnecting to \(address) already connected") + return } + let quicConn = try QuicConnection( + handler: PeerEventHandler(self), + registration: clientConfiguration.registration, + configuration: clientConfiguration + ) + try quicConn.connect(to: address) + let conn = Connection( + quicConn, + impl: self, + role: role, + remoteAddress: address, + initiatedByLocal: true + ) + connections.byAddr[address] = conn + connections.byId[conn.id] = conn + } + } + } + + func reopenUpStream(connection: Connection, kind: Handler.PresistentHandler.StreamKind) { + let state = reopenStates.read { states in + states[connection.id] ?? .init() + } + + guard state.attempt < maxRetryAttempts else { + logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts") + return + } + + reopenStates.write { states in + if var state = states[connection.id] { + state.applyBackoff() + states[connection.id] = state + } + } + + Task { + try await Task.sleep(for: .seconds(state.delay)) + do { + logger.debug("Attempting to reopen UP stream of kind \(kind) for connection \(connection.id)") + try connection.createPreistentStream(kind: kind) + } catch { + logger.error("Failed to reopen UP stream for connection \(connection.id): \(error)") + reopenUpStream(connection: connection, kind: kind) } - } else { - logger.warning("reconnect attempt exceeded max attempts") } } @@ -546,6 +577,10 @@ private struct PeerEventHandler: QuicEventHandler { } if let conn { conn.streamStarted(stream: stream) + // Check + impl.reopenStates.write { states in + states[conn.id] = nil + } } } @@ -562,12 +597,25 @@ private struct PeerEventHandler: QuicEventHandler { let stream = impl.streams.read { streams in streams[quicStream.id] } + if let stream { let connection = impl.connections.read { connections in connections.byId[stream.connectionId] } if let connection { connection.streamClosed(stream: stream, abort: !status.isSucceeded) + if shouldReopenStream(connection: connection, stream: stream, status: status) { + do { + if let kind = stream.kind { + // impl.reopenUpStream(connection: connection, kind: kind); + do { + try connection.createPreistentStream(kind: kind) + } catch { + logger.error("Attempt to recreate the persistent stream failed: \(error)") + } + } + } + } } else { logger.warning( "Stream closed but connection is gone?", metadata: ["streamId": "\(stream.id)"] @@ -579,4 +627,18 @@ private struct PeerEventHandler: QuicEventHandler { ) } } + + // TODO: Add all the cases about reopen up stream + private func shouldReopenStream(connection: Connection, stream: Stream, status: QuicStatus) -> Bool { + // Only reopen if the stream is a persistent UP stream and the closure was unexpected + if connection.isClosed || connection.needReconnect || stream.kind == nil { + return false + } + switch QuicStatusCode(rawValue: status.rawValue) { + case .connectionIdle, .badCert: + return false + default: + return !status.isSucceeded + } + } } diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index ba541635..ca189d51 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -143,6 +143,124 @@ struct PeerTests { typealias EphemeralHandler = MockEphemeralStreamHandler } + @Test + func reopenUpStream() async throws { + let handler2 = MockPresentStreamHandler() + var messageData = Data("reopen up stream".utf8) + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler2, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + try? await Task.sleep(for: .milliseconds(100)) + + let connection = try peer1.connect( + to: peer2.listenAddress(), role: .validator + ) + try? await Task.sleep(for: .milliseconds(100)) + + peer1.broadcast( + kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData) + ) + try? await Task.sleep(for: .milliseconds(100)) + let lastReceivedData = await handler2.lastReceivedData + #expect(lastReceivedData == messageData) + + try? await Task.sleep(for: .milliseconds(100)) + // Simulate abnormal close stream + let stream = connection.presistentStreams.read { presistentStreams in + presistentStreams[.uniqueA] + } + stream!.close(abort: true) + // Wait to simulate downtime & reopen up stream 3~5s + try? await Task.sleep(for: .milliseconds(3000)) + messageData = Data("reopen up stream data".utf8) + peer1.broadcast( + kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData) + ) + try await Task.sleep(for: .milliseconds(1000)) + let lastReceivedData2 = await handler2.lastReceivedData + #expect(lastReceivedData2 == messageData) + } + + @Test + func regularClosedStream() async throws { + let handler2 = MockPresentStreamHandler() + var messageData = Data("reopen up stream".utf8) + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32.random()), + presistentStreamHandler: handler2, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + try? await Task.sleep(for: .milliseconds(100)) + + let connection = try peer1.connect( + to: peer2.listenAddress(), role: .validator + ) + try? await Task.sleep(for: .milliseconds(100)) + + peer1.broadcast( + kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData) + ) + try? await Task.sleep(for: .milliseconds(100)) + let lastReceivedData = await handler2.lastReceivedData + #expect(lastReceivedData == messageData) + + try? await Task.sleep(for: .milliseconds(100)) + // Simulate regular close stream + let stream = connection.presistentStreams.read { presistentStreams in + presistentStreams[.uniqueA] + } + stream!.close(abort: false) + // Wait to simulate downtime + try? await Task.sleep(for: .milliseconds(3000)) + messageData = Data("close up stream".utf8) + peer1.broadcast( + kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData) + ) + try await Task.sleep(for: .milliseconds(1000)) + let lastReceivedData2 = await handler2.lastReceivedData + #expect(lastReceivedData2 != messageData) + } + @Test func concurrentPeerConnection() async throws { let peer1 = try Peer( @@ -611,10 +729,11 @@ struct PeerTests { data: Data("Message from peer \(i)".utf8) ) peer.broadcast(kind: message.kind, message: message) + try? await Task.sleep(for: .milliseconds(50)) } // Wait for message propagation - try? await Task.sleep(for: .milliseconds(100)) + try? await Task.sleep(for: .milliseconds(1000)) // everyone should receive two messages for (idx, handler) in handlers.enumerated() { diff --git a/Utils/Sources/Utils/UniqueId.swift b/Utils/Sources/Utils/UniqueId.swift index 2b58eb63..201b594c 100644 --- a/Utils/Sources/Utils/UniqueId.swift +++ b/Utils/Sources/Utils/UniqueId.swift @@ -21,6 +21,10 @@ extension UniqueId: Equatable { public static func == (lhs: UniqueId, rhs: UniqueId) -> Bool { lhs.id == rhs.id } + + public static func < (lhs: UniqueId, rhs: UniqueId) -> Bool { + lhs.id < rhs.id + } } extension UniqueId: Hashable {