diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 070bc50a..5a10cc3a 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -99,7 +99,7 @@ public final class QuicStream: Sendable { throw QuicError.alreadyClosed } - logger.info("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())") + logger.debug("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())") let messageLength = data.count if messageLength == 0 { diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index de96c2c2..18d702b1 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -176,7 +176,7 @@ public final class Connection: Sendable, ConnectionInfoP let data = try request.encode() let kind = request.kind let stream = try createStream(kind: kind) - try stream.send(message: data) + try await stream.send(message: data) return try await receiveData(stream: stream) } @@ -234,7 +234,7 @@ public final class Connection: Sendable, ConnectionInfoP impl.addStream(stream) Task { guard let byte = await stream.receiveByte() else { - logger.info("stream closed without receiving kind. status: \(stream.status)") + logger.warning("stream closed without receiving kind. status: \(stream.status)") return } if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) { @@ -278,11 +278,7 @@ public final class Connection: Sendable, ConnectionInfoP let data = try await receiveData(stream: stream) let request = try decoder.decode(data: data) let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) -// logger -// .info( -// "sending addr \(remoteAddress.description) request data \(resp.toHexString()) with \(resp.count) bytes " -// ) - try stream.send(message: resp, finish: true) + try await stream.send(message: resp, finish: true) } catch { logger.error("Failed to handle request", metadata: ["error": "\(error)"]) stream.close(abort: true) @@ -322,7 +318,7 @@ private func receiveMaybeData(stream: Stream) async throws - // TODO: pick better value guard length < 1024 * 1024 * 10 else { stream.close(abort: true) - logger.info("Invalid request length: \(length)") + logger.error("Invalid request length: \(length)") // TODO: report bad peer throw ConnectionError.invalidLength } @@ -360,7 +356,7 @@ func presistentStreamRunLoop( } catch { logger .error( - "UP stream run loop failed: \(error) from \(connection.remoteAddress) \(connection.id) \(stream.id) data \(msg.toHexString())" + "UP stream run loop failed: \(error) remote \(connection.remoteAddress) \(connection.id) \(stream.id) kind: \(kind) data \(msg.toHexString()) bytes \(msg.count)" ) stream.close(abort: true) } diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index ef71a3fb..2ce4eb08 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -186,20 +186,24 @@ public final class Peer: Sendable { } for connection in connections { if let stream = try? connection.createPreistentStream(kind: kind) { - let res = Result(catching: { try stream.send(message: messageData) }) - switch res { - case .success: - break - case let .failure(error): - impl.logger.warning( - "Failed to send message", - metadata: [ - "connectionId": "\(connection.id)", - "kind": "\(kind)", - "message": "\(messageData)", - "error": "\(error)", - ] - ) + Task { + let res = await Result { + try await stream.send(message: messageData) + } + switch res { + case .success: + break + case let .failure(error): + impl.logger.warning( + "Failed to send message", + metadata: [ + "connectionId": "\(connection.id)", + "kind": "\(kind)", + "message": "\(messageData)", + "error": "\(error)", + ] + ) + } } } } @@ -299,7 +303,7 @@ final class PeerImpl: Sendable { var state = reconnectStates.read { reconnectStates in reconnectStates[address] ?? .init() } - logger.info("reconnecting to \(address) \(state.attempt) attempts") + logger.debug("reconnecting to \(address) \(state.attempt) attempts") guard state.attempt < maxRetryAttempts else { logger.warning("reconnecting to \(address) exceeded max attempts") return @@ -339,7 +343,7 @@ final class PeerImpl: Sendable { states[connection.id] ?? .init() } - logger.info("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts") + logger.debug("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts") guard state.attempt < maxRetryAttempts else { logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts") return @@ -395,7 +399,7 @@ private struct PeerEventHandler: QuicEventHandler { } logger .info( - "new connection: \(addr) id: \(connection.id) local addr: \(info.localAddress) remote addr: \(info.remoteAddress), role: \(role)" + "new connection: \(connection.id) local addr: \(info.localAddress) remote addr: \(info.remoteAddress), role: \(role)" ) if impl.addConnection(connection, addr: addr, role: role) { return .code(.success) diff --git a/Networking/Sources/Networking/Stream.swift b/Networking/Sources/Networking/Stream.swift index 90bf76fa..bf1839b7 100644 --- a/Networking/Sources/Networking/Stream.swift +++ b/Networking/Sources/Networking/Stream.swift @@ -27,10 +27,39 @@ public protocol StreamProtocol { var id: UniqueId { get } var status: StreamStatus { get } - func send(message: Message) throws + func send(message: Message) async throws func close(abort: Bool) } +actor StreamSender { + private let stream: QuicStream + private var status: StreamStatus + + init(stream: QuicStream, status: StreamStatus) { + self.stream = stream + self.status = status + } + + func send(message: Data, finish: Bool = false) throws { + guard status == .open || status == .sendOnly else { + throw StreamError.notOpen + } + + let length = UInt32(message.count) + var lengthData = Data(repeating: 0, count: 4) + lengthData.withUnsafeMutableBytes { ptr in + ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self) + } + + try stream.send(data: lengthData, finish: false) + try stream.send(data: message, finish: finish) + + if finish { + status = .receiveOnly + } + } +} + final class Stream: Sendable, StreamProtocol { typealias Message = Handler.PresistentHandler.Message @@ -41,6 +70,7 @@ final class Stream: Sendable, StreamProtocol { private let channel: Channel = .init(capacity: 100) private let nextData: Mutex = .init(nil) private let _status: ThreadSafeContainer = .init(.open) + private let sender: StreamSender let connectionId: UniqueId let kind: Handler.PresistentHandler.StreamKind? @@ -63,10 +93,11 @@ final class Stream: Sendable, StreamProtocol { self.connectionId = connectionId self.impl = impl self.kind = kind + sender = StreamSender(stream: stream, status: .open) } - public func send(message: Handler.PresistentHandler.Message) throws { - try send(message: message.encode(), finish: false) + public func send(message: Handler.PresistentHandler.Message) async throws { + try await send(message: message.encode(), finish: false) } /// send raw data @@ -91,21 +122,8 @@ final class Stream: Sendable, StreamProtocol { } // send message with length prefix - func send(message: Data, finish: Bool = false) throws { - guard canSend else { - throw StreamError.notOpen - } - - let length = UInt32(message.count) - var lengthData = Data(repeating: 0, count: 4) - lengthData.withUnsafeMutableBytes { ptr in - ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self) - } - try stream.send(data: lengthData, finish: false) - try stream.send(data: message, finish: finish) - if finish { - status = .receiveOnly - } + func send(message: Data, finish: Bool = false) async throws { + try await sender.send(message: message, finish: finish) } func received(data: Data?) { diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index bbc687bd..e534828c 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -273,7 +273,7 @@ struct HandlerImpl: NetworkProtocolHandler { heads: headsWithTimeslot ) - try stream.send(message: .blockAnnouncementHandshake(handshake)) + try await stream.send(message: .blockAnnouncementHandshake(handshake)) } } } diff --git a/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift b/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift index e63419c9..199078a8 100644 --- a/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift +++ b/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift @@ -57,10 +57,4 @@ final class BlockAnnouncementDecoderTests { _ = try JamDecoder.decode(BlockAnnouncement.self, from: data, withConfig: config) } } - - @Test - func decodeNotAllDataWasConsumed() throws {} - - @Test - func decodeNilValue() throws {} } diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index 025e6f89..7b42fb14 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -64,10 +64,6 @@ struct Topology { let fromNode = ret[from].0 let toNode = ret[to].0 let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) - try print( - "connect from \(fromNode.network.network.listenAddress().description) to \(toNode.network.network.listenAddress().description)" - ) - print("connect \(conn.id) address: \(conn.remoteAddress)") try? await conn.ready() } return (ret, scheduler)