Skip to content

Commit

Permalink
update test
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Jan 15, 2025
1 parent 9c13896 commit b1d7737
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public final class QuicStream: Sendable {
throw QuicError.alreadyClosed
}

logger.info("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())")
logger.debug("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())")
let messageLength = data.count

if messageLength == 0 {
Expand Down
14 changes: 5 additions & 9 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let data = try request.encode()
let kind = request.kind
let stream = try createStream(kind: kind)
try stream.send(message: data)
try await stream.send(message: data)

return try await receiveData(stream: stream)
}
Expand Down Expand Up @@ -234,7 +234,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
impl.addStream(stream)
Task {
guard let byte = await stream.receiveByte() else {
logger.info("stream closed without receiving kind. status: \(stream.status)")
logger.warning("stream closed without receiving kind. status: \(stream.status)")

Check warning on line 237 in Networking/Sources/Networking/Connection.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Connection.swift#L237

Added line #L237 was not covered by tests
return
}
if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) {
Expand Down Expand Up @@ -278,11 +278,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let data = try await receiveData(stream: stream)
let request = try decoder.decode(data: data)
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
// logger
// .info(
// "sending addr \(remoteAddress.description) request data \(resp.toHexString()) with \(resp.count) bytes "
// )
try stream.send(message: resp, finish: true)
try await stream.send(message: resp, finish: true)
} catch {
logger.error("Failed to handle request", metadata: ["error": "\(error)"])
stream.close(abort: true)
Expand Down Expand Up @@ -322,7 +318,7 @@ private func receiveMaybeData(stream: Stream<some StreamHandler>) async throws -
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.info("Invalid request length: \(length)")
logger.error("Invalid request length: \(length)")
// TODO: report bad peer
throw ConnectionError.invalidLength
}
Expand Down Expand Up @@ -360,7 +356,7 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
} catch {
logger
.error(
"UP stream run loop failed: \(error) from \(connection.remoteAddress) \(connection.id) \(stream.id) data \(msg.toHexString())"
"UP stream run loop failed: \(error) remote \(connection.remoteAddress) \(connection.id) \(stream.id) kind: \(kind) data \(msg.toHexString()) bytes \(msg.count)"

Check failure on line 359 in Networking/Sources/Networking/Connection.swift

View workflow job for this annotation

GitHub Actions / Swift Lint

Line Length (line_length)

Line should be 140 characters or less; currently it has 182 characters
)

Check warning on line 360 in Networking/Sources/Networking/Connection.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Connection.swift#L357-L360

Added lines #L357 - L360 were not covered by tests
stream.close(abort: true)
}
Expand Down
38 changes: 21 additions & 17 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,20 +186,24 @@ public final class Peer<Handler: StreamHandler>: Sendable {
}
for connection in connections {
if let stream = try? connection.createPreistentStream(kind: kind) {
let res = Result(catching: { try stream.send(message: messageData) })
switch res {
case .success:
break
case let .failure(error):
impl.logger.warning(
"Failed to send message",
metadata: [
"connectionId": "\(connection.id)",
"kind": "\(kind)",
"message": "\(messageData)",
"error": "\(error)",
]
)
Task {
let res = await Result {
try await stream.send(message: messageData)
}
switch res {
case .success:
break
case let .failure(error):
impl.logger.warning(
"Failed to send message",
metadata: [
"connectionId": "\(connection.id)",
"kind": "\(kind)",
"message": "\(messageData)",
"error": "\(error)",
]
)
}
}
}
}
Expand Down Expand Up @@ -299,7 +303,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
var state = reconnectStates.read { reconnectStates in
reconnectStates[address] ?? .init()
}
logger.info("reconnecting to \(address) \(state.attempt) attempts")
logger.debug("reconnecting to \(address) \(state.attempt) attempts")
guard state.attempt < maxRetryAttempts else {
logger.warning("reconnecting to \(address) exceeded max attempts")
return
Expand Down Expand Up @@ -339,7 +343,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
states[connection.id] ?? .init()
}

logger.info("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts")
logger.debug("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts")
guard state.attempt < maxRetryAttempts else {
logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts")
return
Expand Down Expand Up @@ -395,7 +399,7 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
logger
.info(
"new connection: \(addr) id: \(connection.id) local addr: \(info.localAddress) remote addr: \(info.remoteAddress), role: \(role)"
"new connection: \(connection.id) local addr: \(info.localAddress) remote addr: \(info.remoteAddress), role: \(role)"
)
if impl.addConnection(connection, addr: addr, role: role) {
return .code(.success)
Expand Down
54 changes: 36 additions & 18 deletions Networking/Sources/Networking/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,39 @@ public protocol StreamProtocol<Message> {

var id: UniqueId { get }
var status: StreamStatus { get }
func send(message: Message) throws
func send(message: Message) async throws
func close(abort: Bool)
}

actor StreamSender {
private let stream: QuicStream
private var status: StreamStatus

init(stream: QuicStream, status: StreamStatus) {
self.stream = stream
self.status = status
}

func send(message: Data, finish: Bool = false) throws {
guard status == .open || status == .sendOnly else {
throw StreamError.notOpen

Check warning on line 45 in Networking/Sources/Networking/Stream.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Stream.swift#L45

Added line #L45 was not covered by tests
}

let length = UInt32(message.count)
var lengthData = Data(repeating: 0, count: 4)
lengthData.withUnsafeMutableBytes { ptr in
ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self)
}

try stream.send(data: lengthData, finish: false)
try stream.send(data: message, finish: finish)

if finish {
status = .receiveOnly
}
}
}

final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
typealias Message = Handler.PresistentHandler.Message

Expand All @@ -41,6 +70,7 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
private let channel: Channel<Data> = .init(capacity: 100)
private let nextData: Mutex<Data?> = .init(nil)
private let _status: ThreadSafeContainer<StreamStatus> = .init(.open)
private let sender: StreamSender
let connectionId: UniqueId
let kind: Handler.PresistentHandler.StreamKind?

Expand All @@ -63,10 +93,11 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
self.connectionId = connectionId
self.impl = impl
self.kind = kind
sender = StreamSender(stream: stream, status: .open)
}

public func send(message: Handler.PresistentHandler.Message) throws {
try send(message: message.encode(), finish: false)
public func send(message: Handler.PresistentHandler.Message) async throws {
try await send(message: message.encode(), finish: false)
}

/// send raw data
Expand All @@ -91,21 +122,8 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
}

// send message with length prefix
func send(message: Data, finish: Bool = false) throws {
guard canSend else {
throw StreamError.notOpen
}

let length = UInt32(message.count)
var lengthData = Data(repeating: 0, count: 4)
lengthData.withUnsafeMutableBytes { ptr in
ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self)
}
try stream.send(data: lengthData, finish: false)
try stream.send(data: message, finish: finish)
if finish {
status = .receiveOnly
}
func send(message: Data, finish: Bool = false) async throws {
try await sender.send(message: message, finish: finish)
}

func received(data: Data?) {
Expand Down
2 changes: 1 addition & 1 deletion Node/Sources/Node/NetworkingProtocol/NetworkManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ struct HandlerImpl: NetworkProtocolHandler {
heads: headsWithTimeslot
)

try stream.send(message: .blockAnnouncementHandshake(handshake))
try await stream.send(message: .blockAnnouncementHandshake(handshake))
}
}
}
6 changes: 0 additions & 6 deletions Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,4 @@ final class BlockAnnouncementDecoderTests {
_ = try JamDecoder.decode(BlockAnnouncement.self, from: data, withConfig: config)
}
}

@Test
func decodeNotAllDataWasConsumed() throws {}

@Test
func decodeNilValue() throws {}
}
4 changes: 0 additions & 4 deletions Node/Tests/NodeTests/Topology.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ struct Topology {
let fromNode = ret[from].0
let toNode = ret[to].0
let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator)
try print(
"connect from \(fromNode.network.network.listenAddress().description) to \(toNode.network.network.listenAddress().description)"
)
print("connect \(conn.id) address: \(conn.remoteAddress)")
try? await conn.ready()
}
return (ret, scheduler)
Expand Down

0 comments on commit b1d7737

Please sign in to comment.