Skip to content

Commit

Permalink
update peer test
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Oct 25, 2024
1 parent d2c2e63 commit 7a5877d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 34 deletions.
9 changes: 9 additions & 0 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,18 @@ private class StreamHandle {
logger.warning("Stream received data but it is already gone?")
}
}
if event.pointee.RECEIVE.Flags.rawValue & QUIC_RECEIVE_FLAG_FIN.rawValue != 0 {
// maybe close function need it
if let stream {
stream.handler.dataReceived(stream, data: Data())
}
}

case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN:
logger.trace("Peer send shutdown")
api.call { api in
_ = api.pointee.StreamShutdown(ptr, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0)
}

case QUIC_STREAM_EVENT_PEER_SEND_ABORTED:
logger.trace("Peer send aborted")
Expand Down
47 changes: 21 additions & 26 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public protocol ConnectionInfoProtocol {

enum ConnectionError: Error {
case receiveFailed
case invalidLength
}

public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoProtocol {
Expand Down Expand Up @@ -42,26 +43,30 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
try? connection.shutdown(errorCode: abort ? 1 : 0) // TODO: define some error code
}

public func request(_ request: Handler.EphemeralHandler.Request) async throws -> Data {
public func decodeLength(from data: Data) throws -> UInt32 {
guard data.count >= 4 else {
throw ConnectionError.invalidLength
}
return UInt32(littleEndian: data.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
}

public func request(_ request: Handler.EphemeralHandler.Request) async throws -> [Data] {
let data = try request.encode()
let kind = request.kind
let stream = try createStream(kind: kind)
try stream.send(message: data)
// TODO: pipe this to decoder directly to be able to reject early
var response = Data()

var reps = [Data()]
while let nextData = await stream.receive() {
response.append(nextData)
break
}
guard response.count >= 4 else {
stream.close(abort: true)
throw ConnectionError.receiveFailed
if nextData.isEmpty { // fin flag
break
}
let length = try decodeLength(from: nextData.prefix(4))
let data = nextData.dropFirst(4).prefix(Int(length))
reps.append(data)
}
let lengthData = response.prefix(4)
let length = UInt32(
littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }
)
return response.dropFirst(4).prefix(Int(length))

return reps
}

@discardableResult
Expand Down Expand Up @@ -121,7 +126,6 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
return
}
if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) {
// TODO: handle duplicated UP streams
presistentStreams.write { presistentStreams in
presistentStreams[upKind] = stream
}
Expand All @@ -134,25 +138,20 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
var decoder = impl.ephemeralStreamHandler.createDecoder(kind: ceKind)

let lengthData = await stream.receive(count: 4)
guard let lengthData else {
guard let lengthData, let length = try? decodeLength(from: lengthData) else {
stream.close(abort: true)
logger.debug("Invalid request length")
return
}
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid request length: \(length)")
// TODO: report bad peer
return
}
let data = await stream.receive(count: Int(length))
guard let data else {
stream.close(abort: true)
logger.debug("Invalid request data")
// TODO: report bad peer
return
}
let request = try decoder.decode(data: data)
Expand Down Expand Up @@ -196,16 +195,12 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
do {
while true {
let lengthData = await stream.receive(count: 4)
guard let lengthData else {
guard let lengthData, let length = try? connection.decodeLength(from: lengthData) else {
break
}
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid message length: \(length)")
// TODO: report bad peer
return
}
let data = await stream.receive(count: Int(length))
Expand Down
4 changes: 0 additions & 4 deletions Networking/Sources/Networking/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
}

func received(data: Data) {
if data.isEmpty {
return
}

if !channel.syncSend(data) {
logger.warning("stream \(id) is full")
// TODO: backpressure handling
Expand Down
8 changes: 4 additions & 4 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -207,19 +207,19 @@ struct PeerTests {
)
try? await Task.sleep(for: .milliseconds(100))

let data1 = try await connection1.request(
let dataList1 = try await connection1.request(
MockRequest(kind: .typeA, data: Data("hello world".utf8))
)
#expect(data1 == Data("hello world".utf8))
#expect(dataList1.first == Data("hello world".utf8))

let connection2 = try peer2.connect(
to: NetAddr(ipAddress: "127.0.0.1", port: 8083)!, role: .validator
)
try? await Task.sleep(for: .milliseconds(100))

let data2 = try await connection2.request(
let dataList2 = try await connection2.request(
MockRequest(kind: .typeB, data: Data("I am jam".utf8))
)
#expect(data2 == Data("I am jam".utf8))
#expect(dataList2.first == Data("I am jam".utf8))
}
}

0 comments on commit 7a5877d

Please sign in to comment.