From cb48bb3083c6a8016164c2df0b85a330c60118f1 Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Tue, 28 Jan 2025 17:05:57 +0000 Subject: [PATCH] recv_many --- src/messaging.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/messaging.rs b/src/messaging.rs index ae546f8..d1b8516 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -133,6 +133,7 @@ where #[async_trait::async_trait] pub trait RecvAdapter

: Send + Sync { async fn recv(&mut self) -> Result, Error>; + async fn recv_many(&mut self, limit: usize) -> Result>, Error>; } pub trait RecvPort @@ -188,6 +189,16 @@ where Ok(msg) } + + pub async fn recv_many(&mut self, limit: usize) -> Result>, Error> { + let receiver = self.receiver.as_mut().ok_or(Error::NotConnected)?; + + let msgs = receiver.recv_many(limit).await?; + + self.counter += msgs.len() as u64; + + Ok(msgs) + } } impl RecvPort for InputPort @@ -310,6 +321,14 @@ pub mod tokio { None => Err(Error::RecvError), } } + + async fn recv_many(&mut self, limit: usize) -> Result>, Error> { + let mut buffer = Vec::with_capacity(limit); + + self.0.recv_many(&mut buffer, limit).await; + + Ok(buffer) + } } pub type OutputPort

= super::OutputPort, P>;