Skip to content

Commit

Permalink
update todo
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Oct 25, 2024
1 parent b87eb42 commit 0a401e3
Showing 1 changed file with 39 additions and 12 deletions.
51 changes: 39 additions & 12 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let impl: PeerImpl<Handler>
public let role: PeerRole
public let remoteAddress: NetAddr
let presistentStreams: ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
> = .init([:])
let presistentStreams:
ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
> = .init([:])
let initiatedByLocal: Bool

public var id: UniqueId {
connection.id
}

init(_ connection: QuicConnection, impl: PeerImpl<Handler>, role: PeerRole, remoteAddress: NetAddr, initiatedByLocal: Bool) {
init(
_ connection: QuicConnection, impl: PeerImpl<Handler>, role: PeerRole,
remoteAddress: NetAddr, initiatedByLocal: Bool
) {
self.connection = connection
self.impl = impl
self.role = role
Expand Down Expand Up @@ -67,7 +71,9 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}

@discardableResult
func createPreistentStream(kind: Handler.PresistentHandler.StreamKind) throws -> Stream<Handler>? {
func createPreistentStream(kind: Handler.PresistentHandler.StreamKind) throws -> Stream<
Handler
>? {
let stream = presistentStreams.read { presistentStreams in
presistentStreams[kind]
}
Expand Down Expand Up @@ -99,8 +105,12 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
)
}

func createStream(kind: UInt8, presistentKind: Handler.PresistentHandler.StreamKind?) throws -> Stream<Handler> {
let stream = try Stream(connection.createStream(), connectionId: id, impl: impl, kind: presistentKind)
func createStream(kind: UInt8, presistentKind: Handler.PresistentHandler.StreamKind?) throws
-> Stream<Handler>
{
let stream = try Stream(
connection.createStream(), connectionId: id, impl: impl, kind: presistentKind
)
impl.addStream(stream)
try stream.send(data: Data([kind]))
return stream
Expand All @@ -123,6 +133,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
return
}
if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) {
// TODO: handle duplicated UP streams
presistentStreams.write { presistentStreams in
presistentStreams[upKind] = stream
}
Expand All @@ -140,9 +151,12 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
logger.debug("Invalid request length")
return
}
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid request length: \(length)")
// TODO: report bad peer
return
}
let data = await stream.receive(count: Int(length))
Expand All @@ -152,7 +166,9 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
return
}
let request = try decoder.decode(data: data)
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
let resp = try await impl.ephemeralStreamHandler.handle(
connection: self, request: request
)
try stream.send(message: resp, finish: true)
}
}
Expand Down Expand Up @@ -181,23 +197,32 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
} catch {
logger.debug(
"Failed to setup presistent stream",
metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"]
metadata: [
"connectionId": "\(connection.id)", "streamId": "\(stream.id)",
"kind": "\(kind)", "error": "\(error)",
]
)
}
logger.debug(
"Starting presistent stream run loop",
metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"]
metadata: [
"connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)",
]
)
var decoder = handler.createDecoder(kind: kind)
do {
while true {
let lengthData = await stream.receive(count: 4)
guard let lengthData, let length = try? connection.decodeLength(from: lengthData) else {
guard let lengthData, let length = try? connection.decodeLength(from: lengthData)
else {
break
}
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid message length: \(length)")
// TODO: report bad peer
return
}
let data = await stream.receive(count: Int(length))
Expand All @@ -214,7 +239,9 @@ func presistentStreamRunLoop<Handler: StreamHandler>(

logger.debug(
"Ending presistent stream run loop",
metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"]
metadata: [
"connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)",
]
)
}
}

0 comments on commit 0a401e3

Please sign in to comment.