From 0a401e30c7637b653ab470c40f4a5546a3575b65 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Fri, 25 Oct 2024 16:47:49 +0800 Subject: [PATCH] update todo --- .../Sources/Networking/Connection.swift | 51 ++++++++++++++----- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index a4a4f42b..7968d24c 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -22,16 +22,20 @@ public final class Connection: Sendable, ConnectionInfoP let impl: PeerImpl public let role: PeerRole public let remoteAddress: NetAddr - let presistentStreams: ThreadSafeContainer< - [Handler.PresistentHandler.StreamKind: Stream] - > = .init([:]) + let presistentStreams: + ThreadSafeContainer< + [Handler.PresistentHandler.StreamKind: Stream] + > = .init([:]) let initiatedByLocal: Bool public var id: UniqueId { connection.id } - init(_ connection: QuicConnection, impl: PeerImpl, role: PeerRole, remoteAddress: NetAddr, initiatedByLocal: Bool) { + init( + _ connection: QuicConnection, impl: PeerImpl, role: PeerRole, + remoteAddress: NetAddr, initiatedByLocal: Bool + ) { self.connection = connection self.impl = impl self.role = role @@ -67,7 +71,9 @@ public final class Connection: Sendable, ConnectionInfoP } @discardableResult - func createPreistentStream(kind: Handler.PresistentHandler.StreamKind) throws -> Stream? { + func createPreistentStream(kind: Handler.PresistentHandler.StreamKind) throws -> Stream< + Handler + >? { let stream = presistentStreams.read { presistentStreams in presistentStreams[kind] } @@ -99,8 +105,12 @@ public final class Connection: Sendable, ConnectionInfoP ) } - func createStream(kind: UInt8, presistentKind: Handler.PresistentHandler.StreamKind?) throws -> Stream { - let stream = try Stream(connection.createStream(), connectionId: id, impl: impl, kind: presistentKind) + func createStream(kind: UInt8, presistentKind: Handler.PresistentHandler.StreamKind?) throws + -> Stream + { + let stream = try Stream( + connection.createStream(), connectionId: id, impl: impl, kind: presistentKind + ) impl.addStream(stream) try stream.send(data: Data([kind])) return stream @@ -123,6 +133,7 @@ public final class Connection: Sendable, ConnectionInfoP return } if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) { + // TODO: handle duplicated UP streams presistentStreams.write { presistentStreams in presistentStreams[upKind] = stream } @@ -140,9 +151,12 @@ public final class Connection: Sendable, ConnectionInfoP logger.debug("Invalid request length") return } + // sanity check for length + // TODO: pick better value guard length < 1024 * 1024 * 10 else { stream.close(abort: true) logger.debug("Invalid request length: \(length)") + // TODO: report bad peer return } let data = await stream.receive(count: Int(length)) @@ -152,7 +166,9 @@ public final class Connection: Sendable, ConnectionInfoP return } let request = try decoder.decode(data: data) - let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) + let resp = try await impl.ephemeralStreamHandler.handle( + connection: self, request: request + ) try stream.send(message: resp, finish: true) } } @@ -181,23 +197,32 @@ func presistentStreamRunLoop( } catch { logger.debug( "Failed to setup presistent stream", - metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"] + metadata: [ + "connectionId": "\(connection.id)", "streamId": "\(stream.id)", + "kind": "\(kind)", "error": "\(error)", + ] ) } logger.debug( "Starting presistent stream run loop", - metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"] + metadata: [ + "connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", + ] ) var decoder = handler.createDecoder(kind: kind) do { while true { let lengthData = await stream.receive(count: 4) - guard let lengthData, let length = try? connection.decodeLength(from: lengthData) else { + guard let lengthData, let length = try? connection.decodeLength(from: lengthData) + else { break } + // sanity check for length + // TODO: pick better value guard length < 1024 * 1024 * 10 else { stream.close(abort: true) logger.debug("Invalid message length: \(length)") + // TODO: report bad peer return } let data = await stream.receive(count: Int(length)) @@ -214,7 +239,9 @@ func presistentStreamRunLoop( logger.debug( "Ending presistent stream run loop", - metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"] + metadata: [ + "connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", + ] ) } }