Skip to content

Commit

Permalink
p2p test (#194)
Browse files Browse the repository at this point in the history
* Merge branch 'master' into dev_peer

* update peer test

* update peer

* update peer test

* update test

* update peer test

* update test

* udpate close

* update stream event

* update stream close

* add todo

* handles unidirection peer

* update comment

---------

Co-authored-by: Bryan Chen <[email protected]>
  • Loading branch information
MacOMNI and xlc authored Oct 25, 2024
1 parent 0ea4adf commit 98694d8
Show file tree
Hide file tree
Showing 9 changed files with 388 additions and 267 deletions.
9 changes: 5 additions & 4 deletions Networking/Sources/MsQuicSwift/QuicEventHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public protocol QuicEventHandler: Sendable {
func streamStarted(_ connect: QuicConnection, stream: QuicStream)

// stream events
func dataReceived(_ stream: QuicStream, data: Data)
// nil data indicate end of data stream
func dataReceived(_ stream: QuicStream, data: Data?)
func closed(_ stream: QuicStream, status: QuicStatus, code: QuicErrorCode)
}

Expand All @@ -60,7 +61,7 @@ extension QuicEventHandler {

public func streamStarted(_: QuicConnection, stream _: QuicStream) {}

public func dataReceived(_: QuicStream, data _: Data) {}
public func dataReceived(_: QuicStream, data _: Data?) {}

public func closed(_: QuicStream, status _: QuicStatus, code _: QuicErrorCode) {}
}
Expand All @@ -73,7 +74,7 @@ public final class MockQuicEventHandler: QuicEventHandler {
case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason)
case shutdownComplete(connection: QuicConnection)
case streamStarted(connection: QuicConnection, stream: QuicStream)
case dataReceived(stream: QuicStream, data: Data)
case dataReceived(stream: QuicStream, data: Data?)
case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode)
}

Expand Down Expand Up @@ -122,7 +123,7 @@ public final class MockQuicEventHandler: QuicEventHandler {
}
}

public func dataReceived(_ stream: QuicStream, data: Data) {
public func dataReceived(_ stream: QuicStream, data: Data?) {
events.write { events in
events.append(.dataReceived(stream: stream, data: data))
}
Expand Down
4 changes: 4 additions & 0 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,13 @@ private class StreamHandle {

case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN:
logger.trace("Peer send shutdown")
if let stream {
stream.handler.dataReceived(stream, data: nil)
}

case QUIC_STREAM_EVENT_PEER_SEND_ABORTED:
logger.trace("Peer send aborted")
// TODO: check if we need to close the stream completely

case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE:
logger.trace("Stream shutdown complete")
Expand Down
86 changes: 43 additions & 43 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ public protocol ConnectionInfoProtocol {
var remoteAddress: NetAddr { get }
}

enum ConnectionError: Error {
case receiveFailed
case invalidLength
}

public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoProtocol {
let connection: QuicConnection
let impl: PeerImpl<Handler>
Expand Down Expand Up @@ -43,12 +48,8 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let kind = request.kind
let stream = try createStream(kind: kind)
try stream.send(message: data)
// TODO: pipe this to decoder directly to be able to reject early
var response = Data()
while let nextData = await stream.receive() {
response.append(nextData)
}
return response

return try await receiveData(stream: stream)
}

@discardableResult
Expand Down Expand Up @@ -120,31 +121,15 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP

var decoder = impl.ephemeralStreamHandler.createDecoder(kind: ceKind)

let lengthData = await stream.receive(count: 4)
guard let lengthData else {
stream.close(abort: true)
logger.debug("Invalid request length")
return
}
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
do {
let data = try await receiveData(stream: stream)
let request = try decoder.decode(data: data)
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
try stream.send(message: resp, finish: true)
} catch {
logger.debug("Failed to handle request", metadata: ["error": "\(error)"])
stream.close(abort: true)
logger.debug("Invalid request length: \(length)")
// TODO: report bad peer
return
}
let data = await stream.receive(count: Int(length))
guard let data else {
stream.close(abort: true)
logger.debug("Invalid request data")
// TODO: report bad peer
return
}
let request = try decoder.decode(data: data)
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
try stream.send(message: resp, finish: true)
}
}
}
Expand All @@ -159,6 +144,34 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
}

// expect length prefixed data
// stream close is an error
private func receiveData(stream: Stream<some StreamHandler>) async throws -> Data {
let data = try await receiveMaybeData(stream: stream)
guard let data else {
throw ConnectionError.receiveFailed
}
return data
}

// stream close is not an error
private func receiveMaybeData(stream: Stream<some StreamHandler>) async throws -> Data? {
let lengthData = await stream.receive(count: 4)
guard let lengthData else {
return nil
}
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid request length: \(length)")
// TODO: report bad peer
throw ConnectionError.invalidLength
}
return await stream.receive(count: Int(length))
}

func presistentStreamRunLoop<Handler: StreamHandler>(
kind: Handler.PresistentHandler.StreamKind,
logger: Logger,
Expand All @@ -182,20 +195,7 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
var decoder = handler.createDecoder(kind: kind)
do {
while true {
let lengthData = await stream.receive(count: 4)
guard let lengthData else {
break
}
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid message length: \(length)")
// TODO: report bad peer
return
}
let data = await stream.receive(count: Int(length))
let data = try await receiveMaybeData(stream: stream)
guard let data else {
break
}
Expand Down
76 changes: 0 additions & 76 deletions Networking/Sources/Networking/MockPeerEventHandler.swift

This file was deleted.

2 changes: 1 addition & 1 deletion Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
}

func dataReceived(_ stream: QuicStream, data: Data) {
func dataReceived(_ stream: QuicStream, data: Data?) {
let stream = impl.streams.read { streams in
streams[stream.id]
}
Expand Down
42 changes: 37 additions & 5 deletions Networking/Sources/Networking/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ import TracingUtils
import Utils

public enum StreamStatus: Sendable {
case open, closed, aborted
// bidirection open
case open
// remote to local channel closed
case sendOnly
// local to remote channel closed
case receiveOnly
// stream completely closed
case closed
// stream aborted
case aborted
}

enum StreamError: Error {
Expand Down Expand Up @@ -69,9 +78,21 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
try stream.send(data: data, finish: finish)
}

var canSend: Bool {
status == .open || status == .sendOnly
}

var canReceive: Bool {
status == .open || status == .receiveOnly
}

var ended: Bool {
status == .closed || status == .aborted
}

// send message with length prefix
func send(message: Data, finish: Bool = false) throws {
guard status == .open else {
guard canSend else {
throw StreamError.notOpen
}

Expand All @@ -82,13 +103,23 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
}
try stream.send(data: lengthData, finish: false)
try stream.send(data: message, finish: finish)
if finish {
status = .receiveOnly
}
}

func received(data: Data) {
func received(data: Data?) {
guard let data else {
if !canReceive {
logger.warning("unexpected status: \(status)")
}
status = .sendOnly
channel.close()
return
}
if data.isEmpty {
return
}

if !channel.syncSend(data) {
logger.warning("stream \(id) is full")
// TODO: backpressure handling
Expand All @@ -97,7 +128,7 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {

// initiate stream close
public func close(abort: Bool = false) {
if status != .open {
if ended {
logger.warning("Trying to close stream \(id) in status \(status)")
return
}
Expand All @@ -109,6 +140,7 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
// remote initiated close
func closed(abort: Bool = false) {
status = abort ? .aborted : .closed
channel.close()
}

func receive() async -> Data? {
Expand Down
2 changes: 2 additions & 0 deletions Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ struct QuicListenerTests {
let registration: QuicRegistration

init() throws {
// setupTestLogger()

registration = try QuicRegistration()
}

Expand Down
Loading

0 comments on commit 98694d8

Please sign in to comment.