diff --git a/Networking/Sources/MsQuicSwift/QuicEventHandler.swift b/Networking/Sources/MsQuicSwift/QuicEventHandler.swift index 3c9e2456..6927c7ec 100644 --- a/Networking/Sources/MsQuicSwift/QuicEventHandler.swift +++ b/Networking/Sources/MsQuicSwift/QuicEventHandler.swift @@ -37,7 +37,8 @@ public protocol QuicEventHandler: Sendable { func streamStarted(_ connect: QuicConnection, stream: QuicStream) // stream events - func dataReceived(_ stream: QuicStream, data: Data) + // nil data indicate end of data stream + func dataReceived(_ stream: QuicStream, data: Data?) func closed(_ stream: QuicStream, status: QuicStatus, code: QuicErrorCode) } @@ -60,7 +61,7 @@ extension QuicEventHandler { public func streamStarted(_: QuicConnection, stream _: QuicStream) {} - public func dataReceived(_: QuicStream, data _: Data) {} + public func dataReceived(_: QuicStream, data _: Data?) {} public func closed(_: QuicStream, status _: QuicStatus, code _: QuicErrorCode) {} } @@ -73,7 +74,7 @@ public final class MockQuicEventHandler: QuicEventHandler { case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason) case shutdownComplete(connection: QuicConnection) case streamStarted(connection: QuicConnection, stream: QuicStream) - case dataReceived(stream: QuicStream, data: Data) + case dataReceived(stream: QuicStream, data: Data?) case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode) } @@ -122,7 +123,7 @@ public final class MockQuicEventHandler: QuicEventHandler { } } - public func dataReceived(_ stream: QuicStream, data: Data) { + public func dataReceived(_ stream: QuicStream, data: Data?) { events.write { events in events.append(.dataReceived(stream: stream, data: data)) } diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 563a84b4..55a88e34 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -201,9 +201,13 @@ private class StreamHandle { case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: logger.trace("Peer send shutdown") + if let stream { + stream.handler.dataReceived(stream, data: nil) + } case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: logger.trace("Peer send aborted") + // TODO: check if we need to close the stream completely case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: logger.trace("Stream shutdown complete") diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 7ec54763..8f873f76 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -12,6 +12,11 @@ public protocol ConnectionInfoProtocol { var remoteAddress: NetAddr { get } } +enum ConnectionError: Error { + case receiveFailed + case invalidLength +} + public final class Connection: Sendable, ConnectionInfoProtocol { let connection: QuicConnection let impl: PeerImpl @@ -43,12 +48,8 @@ public final class Connection: Sendable, ConnectionInfoP let kind = request.kind let stream = try createStream(kind: kind) try stream.send(message: data) - // TODO: pipe this to decoder directly to be able to reject early - var response = Data() - while let nextData = await stream.receive() { - response.append(nextData) - } - return response + + return try await receiveData(stream: stream) } @discardableResult @@ -120,31 +121,15 @@ public final class Connection: Sendable, ConnectionInfoP var decoder = impl.ephemeralStreamHandler.createDecoder(kind: ceKind) - let lengthData = await stream.receive(count: 4) - guard let lengthData else { - stream.close(abort: true) - logger.debug("Invalid request length") - return - } - let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }) - // sanity check for length - // TODO: pick better value - guard length < 1024 * 1024 * 10 else { + do { + let data = try await receiveData(stream: stream) + let request = try decoder.decode(data: data) + let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) + try stream.send(message: resp, finish: true) + } catch { + logger.debug("Failed to handle request", metadata: ["error": "\(error)"]) stream.close(abort: true) - logger.debug("Invalid request length: \(length)") - // TODO: report bad peer - return } - let data = await stream.receive(count: Int(length)) - guard let data else { - stream.close(abort: true) - logger.debug("Invalid request data") - // TODO: report bad peer - return - } - let request = try decoder.decode(data: data) - let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) - try stream.send(message: resp, finish: true) } } } @@ -159,6 +144,34 @@ public final class Connection: Sendable, ConnectionInfoP } } +// expect length prefixed data +// stream close is an error +private func receiveData(stream: Stream) async throws -> Data { + let data = try await receiveMaybeData(stream: stream) + guard let data else { + throw ConnectionError.receiveFailed + } + return data +} + +// stream close is not an error +private func receiveMaybeData(stream: Stream) async throws -> Data? { + let lengthData = await stream.receive(count: 4) + guard let lengthData else { + return nil + } + let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }) + // sanity check for length + // TODO: pick better value + guard length < 1024 * 1024 * 10 else { + stream.close(abort: true) + logger.debug("Invalid request length: \(length)") + // TODO: report bad peer + throw ConnectionError.invalidLength + } + return await stream.receive(count: Int(length)) +} + func presistentStreamRunLoop( kind: Handler.PresistentHandler.StreamKind, logger: Logger, @@ -182,20 +195,7 @@ func presistentStreamRunLoop( var decoder = handler.createDecoder(kind: kind) do { while true { - let lengthData = await stream.receive(count: 4) - guard let lengthData else { - break - } - let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }) - // sanity check for length - // TODO: pick better value - guard length < 1024 * 1024 * 10 else { - stream.close(abort: true) - logger.debug("Invalid message length: \(length)") - // TODO: report bad peer - return - } - let data = await stream.receive(count: Int(length)) + let data = try await receiveMaybeData(stream: stream) guard let data else { break } diff --git a/Networking/Sources/Networking/MockPeerEventHandler.swift b/Networking/Sources/Networking/MockPeerEventHandler.swift deleted file mode 100644 index 1c2540f8..00000000 --- a/Networking/Sources/Networking/MockPeerEventHandler.swift +++ /dev/null @@ -1,76 +0,0 @@ -import Foundation -import MsQuicSwift -import Utils - -public final class MockPeerEventHandler: QuicEventHandler { - public enum EventType { - case newConnection(listener: QuicListener, connection: QuicConnection, info: ConnectionInfo) - case shouldOpen(connection: QuicConnection, certificate: Data?) - case connected(connection: QuicConnection) - case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason) - case streamStarted(connection: QuicConnection, stream: QuicStream) - case dataReceived(stream: QuicStream, data: Data) - case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode) - } - - public let events: ThreadSafeContainer<[EventType]> = .init([]) - - public init() {} - - public func newConnection( - _ listener: QuicListener, connection: QuicConnection, info: ConnectionInfo - ) -> QuicStatus { - events.write { events in - events.append(.newConnection(listener: listener, connection: connection, info: info)) - } - - return .code(.success) - } - - public func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus { - guard let certificate else { - return .code(.requiredCert) - } - do { - let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509) - if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { - return .code(.badCert) - } - } catch { - return .code(.badCert) - } - return .code(.success) - } - - public func connected(_ connection: QuicConnection) { - events.write { events in - events.append(.connected(connection: connection)) - } - } - - public func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) { - print("shutdownInitiated \(connection.id) with reason \(reason)") - events.write { events in - events.append(.shutdownInitiated(connection: connection, reason: reason)) - } - } - - public func streamStarted(_ connect: QuicConnection, stream: QuicStream) { - events.write { events in - events.append(.streamStarted(connection: connect, stream: stream)) - } - } - - public func dataReceived(_ stream: QuicStream, data: Data) { - events.write { events in - events.append(.dataReceived(stream: stream, data: data)) - } - } - - public func closed(_ stream: QuicStream, status: QuicStatus, code: QuicErrorCode) { - print("closed stream \(stream.id) with status \(status) and code \(code)") - events.write { events in - events.append(.closed(stream: stream, status: status, code: code)) - } - } -} diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 90a26e51..efd26b06 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -342,7 +342,7 @@ private struct PeerEventHandler: QuicEventHandler { } } - func dataReceived(_ stream: QuicStream, data: Data) { + func dataReceived(_ stream: QuicStream, data: Data?) { let stream = impl.streams.read { streams in streams[stream.id] } diff --git a/Networking/Sources/Networking/Stream.swift b/Networking/Sources/Networking/Stream.swift index 10f5a220..0a79d362 100644 --- a/Networking/Sources/Networking/Stream.swift +++ b/Networking/Sources/Networking/Stream.swift @@ -6,7 +6,16 @@ import TracingUtils import Utils public enum StreamStatus: Sendable { - case open, closed, aborted + // bidirection open + case open + // remote to local channel closed + case sendOnly + // local to remote channel closed + case receiveOnly + // stream completely closed + case closed + // stream aborted + case aborted } enum StreamError: Error { @@ -69,9 +78,21 @@ final class Stream: Sendable, StreamProtocol { try stream.send(data: data, finish: finish) } + var canSend: Bool { + status == .open || status == .sendOnly + } + + var canReceive: Bool { + status == .open || status == .receiveOnly + } + + var ended: Bool { + status == .closed || status == .aborted + } + // send message with length prefix func send(message: Data, finish: Bool = false) throws { - guard status == .open else { + guard canSend else { throw StreamError.notOpen } @@ -82,13 +103,23 @@ final class Stream: Sendable, StreamProtocol { } try stream.send(data: lengthData, finish: false) try stream.send(data: message, finish: finish) + if finish { + status = .receiveOnly + } } - func received(data: Data) { + func received(data: Data?) { + guard let data else { + if !canReceive { + logger.warning("unexpected status: \(status)") + } + status = .sendOnly + channel.close() + return + } if data.isEmpty { return } - if !channel.syncSend(data) { logger.warning("stream \(id) is full") // TODO: backpressure handling @@ -97,7 +128,7 @@ final class Stream: Sendable, StreamProtocol { // initiate stream close public func close(abort: Bool = false) { - if status != .open { + if ended { logger.warning("Trying to close stream \(id) in status \(status)") return } @@ -109,6 +140,7 @@ final class Stream: Sendable, StreamProtocol { // remote initiated close func closed(abort: Bool = false) { status = abort ? .aborted : .closed + channel.close() } func receive() async -> Data? { diff --git a/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift b/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift index 8e15b935..7ba8c5d4 100644 --- a/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift +++ b/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift @@ -35,6 +35,8 @@ struct QuicListenerTests { let registration: QuicRegistration init() throws { + // setupTestLogger() + registration = try QuicRegistration() } diff --git a/Networking/Tests/NetworkingTests/MockPeerEventTests.swift b/Networking/Tests/NetworkingTests/MockPeerEventTests.swift index 1ad9adda..8c2ef262 100644 --- a/Networking/Tests/NetworkingTests/MockPeerEventTests.swift +++ b/Networking/Tests/NetworkingTests/MockPeerEventTests.swift @@ -6,6 +6,77 @@ import Utils @testable import Networking final class MockPeerEventTests { + final class MockPeerEventHandler: QuicEventHandler { + enum EventType { + case newConnection(listener: QuicListener, connection: QuicConnection, info: ConnectionInfo) + case shouldOpen(connection: QuicConnection, certificate: Data?) + case connected(connection: QuicConnection) + case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason) + case streamStarted(connection: QuicConnection, stream: QuicStream) + case dataReceived(stream: QuicStream, data: Data?) + case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode) + } + + let events: ThreadSafeContainer<[EventType]> = .init([]) + + init() {} + + func newConnection( + _ listener: QuicListener, connection: QuicConnection, info: ConnectionInfo + ) -> QuicStatus { + events.write { events in + events.append(.newConnection(listener: listener, connection: connection, info: info)) + } + + return .code(.success) + } + + func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus { + guard let certificate else { + return .code(.requiredCert) + } + do { + let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509) + if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { + return .code(.badCert) + } + } catch { + return .code(.badCert) + } + return .code(.success) + } + + func connected(_ connection: QuicConnection) { + events.write { events in + events.append(.connected(connection: connection)) + } + } + + func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) { + events.write { events in + events.append(.shutdownInitiated(connection: connection, reason: reason)) + } + } + + func streamStarted(_ connect: QuicConnection, stream: QuicStream) { + events.write { events in + events.append(.streamStarted(connection: connect, stream: stream)) + } + } + + func dataReceived(_ stream: QuicStream, data: Data?) { + events.write { events in + events.append(.dataReceived(stream: stream, data: data)) + } + } + + func closed(_ stream: QuicStream, status: QuicStatus, code: QuicErrorCode) { + events.write { events in + events.append(.closed(stream: stream, status: status, code: code)) + } + } + } + let registration: QuicRegistration let certData: Data let badCertData: Data diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 0846d22f..1161cefb 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -1,138 +1,225 @@ -// import Foundation -// import MsQuicSwift -// import Testing -// import Utils -// -// @testable import Networking -// -// struct PeerTests { -// struct MockMessage: MessageProtocol { -// let data: Data -// func encode() throws -> Data { -// data -// } -// } -// -// struct MockRequest: RequestProtocol { -// var kind: Kind -// var data: Data -// func encode() throws -> Data { -// data -// } -// -// typealias StreamKind = Kind -// } -// -// public enum UniquePresistentStreamKind: UInt8, StreamKindProtocol { -// case uniqueA = 0x01 -// case uniqueB = 0x02 -// case uniqueC = 0x03 -// } -// -// public enum EphemeralStreamKind: UInt8, StreamKindProtocol { -// case typeA = 0x04 -// case typeB = 0x05 -// case typeC = 0x06 -// } -// -// struct MockMessageDecoder: MessageDecoder { -// typealias Message = MockMessage -// -// func decode(data: Data) throws -> Message { -// MockMessage(data: data) -// } -// -// func finish() -> Data? { -// print("MockMessageDecoder finish") -// return nil -// } -// } -// -// struct MockEphemeralStreamHandler: EphemeralStreamHandler { -// typealias StreamKind = EphemeralStreamKind -// typealias Request = MockRequest -// -// func createDecoder(kind _: StreamKind) -> any MessageDecoder { -// return MockMessageDecoder() as! any MessageDecoder -// } -// -// // deal with data -// func handle(connection _: any ConnectionInfoProtocol, request _: Request) async throws -> Data { -// print("MockEphemeralStreamHandler handle") -// return Data() -// } -// } -// -// struct MockPresentStreamHandler: PresistentStreamHandler { -// func streamOpened( -// connection _: any Networking.ConnectionInfoProtocol, -// stream _: any Networking.StreamProtocol, kind _: PeerTests.UniquePresistentStreamKind -// ) async throws { -// print("streamOpened") -// } -// -// func handle( -// connection _: any Networking.ConnectionInfoProtocol, -// message _: PeerTests.MockRequest -// ) async throws { -// print("handle") -// } -// -// typealias StreamKind = UniquePresistentStreamKind -// typealias Request = MockRequest -// -// func createDecoder(kind _: StreamKind) -> any MessageDecoder { -// return MockMessageDecoder() as! any MessageDecoder -// } -// -// func handle(connection _: any ConnectionInfoProtocol, request _: Request) async throws -> Data { -// Data() -// } -// } -// -// struct MockStreamHandler: StreamHandler { -// typealias PresistentHandler = MockPresentStreamHandler -// -// typealias EphemeralHandler = MockEphemeralStreamHandler -// } -// -// @Test -// func peerInit() async throws { -// let peer1 = try Peer( -// options: PeerOptions( -// mode: .validator, -// listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8081)!, -// genesisHeader: Data32(), -// secretKey: Ed25519.SecretKey(from: Data32()), -// presistentStreamHandler: MockPresentStreamHandler(), -// ephemeralStreamHandler: MockEphemeralStreamHandler(), -// serverSettings: .defaultSettings, -// clientSettings: .defaultSettings -// ) -// ) -// let peer2 = try Peer( -// options: PeerOptions( -// mode: .validator, -// listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, -// genesisHeader: Data32(), -// secretKey: Ed25519.SecretKey(from: Data32()), -// presistentStreamHandler: MockPresentStreamHandler(), -// ephemeralStreamHandler: MockEphemeralStreamHandler(), -// serverSettings: .defaultSettings, -// clientSettings: .defaultSettings -// ) -// ) -// try? await Task.sleep(for: .seconds(1)) -// let connection = try peer1.connect( -// to: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, mode: .validator -// ) -// try? await Task.sleep(for: .seconds(2)) -// let data = try await connection.request(MockRequest(kind: .typeA, data: Data("hello world".utf8))) -//// if let string = String(data: data, encoding: .utf8) { -//// print(string) -//// } else { -//// print("Failed to convert Data to String") -//// } -// try? await Task.sleep(for: .seconds(10)) -// } -// } +import Foundation +import MsQuicSwift +import Testing +import Utils + +@testable import Networking + +struct PeerTests { + struct MockMessage: MessageProtocol { + let data: Data + func encode() throws -> Data { + data + } + } + + struct MockRequest: RequestProtocol { + var kind: Kind + var data: Data + func encode() throws -> Data { + let length = UInt32(data.count) + var lengthData = withUnsafeBytes(of: length.littleEndian) { Data($0) } + lengthData.append(data) + return lengthData + } + + typealias StreamKind = Kind + } + + public enum UniquePresistentStreamKind: UInt8, StreamKindProtocol { + case uniqueA = 0x01 + case uniqueB = 0x02 + case uniqueC = 0x03 + } + + public enum EphemeralStreamKind: UInt8, StreamKindProtocol { + case typeA = 0x04 + case typeB = 0x05 + case typeC = 0x06 + } + + struct MockEphemeralMessageDecoder: MessageDecoder { + typealias Message = MockRequest + + var data: Data? + var kind: EphemeralStreamKind + + init(kind: EphemeralStreamKind) { + self.kind = kind + } + + mutating func decode(data: Data) throws -> MockRequest { + self.data = data + return MockRequest(kind: kind, data: data) + } + + func finish() -> Data? { + data + } + } + + struct MockUniqueMessageDecoder: MessageDecoder { + typealias Message = MockRequest + + var data: Data? + var kind: UniquePresistentStreamKind + + init(kind: UniquePresistentStreamKind) { + self.kind = kind + } + + mutating func decode(data: Data) throws -> MockRequest { + self.data = data + return MockRequest(kind: kind, data: data) + } + + func finish() -> Data? { + data + } + } + + struct MockEphemeralStreamHandler: EphemeralStreamHandler { + typealias StreamKind = EphemeralStreamKind + typealias Request = MockRequest + + func createDecoder(kind: StreamKind) -> any MessageDecoder { + MockEphemeralMessageDecoder(kind: kind) + } + + func handle(connection _: any ConnectionInfoProtocol, request: Request) async throws -> Data { + let data = request.data + guard data.count >= 4 else { + throw NSError( + domain: "ExtractError", code: 1, + userInfo: [NSLocalizedDescriptionKey: "Data too short to contain length"] + ) + } + let lengthData = data.prefix(4) + let length = UInt32( + littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) } + ) + let actualData = data.dropFirst(4).prefix(Int(length)) + + return actualData + } + } + + struct MockPresentStreamHandler: PresistentStreamHandler { + func streamOpened( + connection _: any Networking.ConnectionInfoProtocol, + stream _: any Networking.StreamProtocol>, + kind _: PeerTests.UniquePresistentStreamKind + ) async throws {} + + func handle( + connection _: any Networking.ConnectionInfoProtocol, + message _: PeerTests.MockRequest + ) async throws {} + + typealias StreamKind = UniquePresistentStreamKind + typealias Request = MockRequest + + func createDecoder(kind: StreamKind) -> any MessageDecoder { + MockUniqueMessageDecoder(kind: kind) + } + } + + struct MockStreamHandler: StreamHandler { + typealias PresistentHandler = MockPresentStreamHandler + + typealias EphemeralHandler = MockEphemeralStreamHandler + } + + @Test + func peerBroadcast() async throws { + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8081)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + try? await Task.sleep(for: .milliseconds(100)) + _ = try peer1.connect( + to: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, role: .validator + ) + _ = try peer2.connect( + to: NetAddr(ipAddress: "127.0.0.1", port: 8081)!, role: .validator + ) + try? await Task.sleep(for: .milliseconds(100)) + peer1.broadcast( + kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("hello world".utf8)) + ) + try? await Task.sleep(for: .milliseconds(100)) + peer2.broadcast( + kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8)) + ) + try? await Task.sleep(for: .milliseconds(500)) + } + + @Test + func peerRequest() async throws { + let peer1 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8083)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + let peer2 = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8084)!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + try? await Task.sleep(for: .milliseconds(100)) + + let connection1 = try peer1.connect( + to: NetAddr(ipAddress: "127.0.0.1", port: 8084)!, role: .validator + ) + try? await Task.sleep(for: .milliseconds(100)) + + let dataList1 = try await connection1.request( + MockRequest(kind: .typeA, data: Data("hello world".utf8)) + ) + #expect(dataList1 == Data("hello world".utf8)) + + let connection2 = try peer2.connect( + to: NetAddr(ipAddress: "127.0.0.1", port: 8083)!, role: .validator + ) + try? await Task.sleep(for: .milliseconds(100)) + + let dataList2 = try await connection2.request( + MockRequest(kind: .typeB, data: Data("I am jam".utf8)) + ) + #expect(dataList2 == Data("I am jam".utf8)) + } +}