diff --git a/Codec/Sources/Codec/JamDecoder.swift b/Codec/Sources/Codec/JamDecoder.swift index 2e20e9de..641d54ba 100644 --- a/Codec/Sources/Codec/JamDecoder.swift +++ b/Codec/Sources/Codec/JamDecoder.swift @@ -34,6 +34,10 @@ public class JamDecoder { ) } } + + public var isAtEnd: Bool { + input.isEmpty + } } protocol ArrayWrapper: Collection where Element: Decodable { diff --git a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift index 64992c0a..f900afae 100644 --- a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift +++ b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift @@ -68,4 +68,13 @@ extension CERequest: RequestProtocol { fatalError("unimplemented") } } + + static func decodeResponseForBlockRequest(data: Data, config: ProtocolConfigRef) throws -> [BlockRef] { + let decoder = JamDecoder(data: data, config: config) + var resp = [BlockRef]() + while !decoder.isAtEnd { + try resp.append(decoder.decode(BlockRef.self)) + } + return resp + } } diff --git a/Node/Sources/Node/NetworkingProtocol/Network.swift b/Node/Sources/Node/NetworkingProtocol/Network.swift index a512881e..5e3a3963 100644 --- a/Node/Sources/Node/NetworkingProtocol/Network.swift +++ b/Node/Sources/Node/NetworkingProtocol/Network.swift @@ -6,7 +6,7 @@ import TracingUtils import Utils public protocol NetworkProtocolHandler: Sendable { - func handle(ceRequest: CERequest) async throws -> (any Encodable)? + func handle(ceRequest: CERequest) async throws -> [any Encodable] func handle(connection: some ConnectionInfoProtocol, upMessage: UPMessage) async throws func handle( @@ -142,10 +142,11 @@ struct EphemeralStreamHandlerImpl: EphemeralStreamHandler { func handle(connection: any ConnectionInfoProtocol, request: Request) async throws -> Data { impl.logger.trace("handling request: \(request) from \(connection.id)") + let encoder = JamEncoder() let resp = try await impl.handler.handle(ceRequest: request) - if let resp { - return try JamEncoder.encode(resp) + for r in resp { + try encoder.encode(r) } - return Data() + return encoder.data } } diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index 45cc42a2..25be8887 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -132,7 +132,7 @@ struct HandlerImpl: NetworkProtocolHandler { let blockchain: Blockchain let peerManager: PeerManager - func handle(ceRequest: CERequest) async throws -> (any Encodable)? { + func handle(ceRequest: CERequest) async throws -> [any Encodable] { switch ceRequest { case let .blockRequest(message): let dataProvider = blockchain.dataProvider @@ -178,7 +178,7 @@ struct HandlerImpl: NetworkProtocolHandler { ] )) // TODO: rebroadcast to other peers after some time - return nil + return [] case let .safroleTicket2(message): blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived( items: [ @@ -188,7 +188,7 @@ struct HandlerImpl: NetworkProtocolHandler { ), ] )) - return nil + return [] } } diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift index 171e674a..d4e5d597 100644 --- a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -97,7 +97,7 @@ public actor SyncManager: Sendable { Task { let resp = try await network.send(to: addr, message: .blockRequest(request)) - let decoded = try JamDecoder.decode([BlockRef].self, from: resp, withConfig: blockchain.config) + let decoded = try CERequest.decodeResponseForBlockRequest(data: resp, config: blockchain.config) for block in decoded { try await blockchain.importBlock(block) }