Skip to content

Commit e748612

Browse files
committed
Send notifications from the proxy in batches
1 parent 2bb4453 commit e748612

4 files changed

Lines changed: 63 additions & 30 deletions

File tree

linera-core/src/client/chain_client/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,11 @@ impl<Env: Environment> ChainClient<Env> {
442442
/// Subscribes to notifications from the specified chain.
443443
#[instrument(level = "trace")]
444444
pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
445-
Ok(Box::pin(UnboundedReceiverStream::new(
446-
self.client.notifier.subscribe(vec![chain_id]),
447-
)))
445+
// Flatten the batched notifications into individual items
446+
Ok(Box::pin(
447+
UnboundedReceiverStream::new(self.client.notifier.subscribe(vec![chain_id]))
448+
.flat_map(stream::iter),
449+
))
448450
}
449451

450452
/// Returns the storage client used by this client's local node.

linera-core/src/notifier.rs

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ use crate::worker;
1414
/// A `Notifier` holds references to clients waiting to receive notifications
1515
/// from the validator.
1616
/// Clients will be evicted if their connections are terminated.
17+
/// Notifications are sent in batches (Vec<N>) to reduce channel operations.
1718
pub struct ChannelNotifier<N> {
18-
inner: papaya::HashMap<ChainId, Vec<UnboundedSender<N>>>,
19+
inner: papaya::HashMap<ChainId, Vec<UnboundedSender<Vec<N>>>>,
1920
}
2021

2122
impl<N> Default for ChannelNotifier<N> {
@@ -27,7 +28,7 @@ impl<N> Default for ChannelNotifier<N> {
2728
}
2829

2930
impl<N> ChannelNotifier<N> {
30-
fn add_sender(&self, chain_ids: Vec<ChainId>, sender: &UnboundedSender<N>) {
31+
fn add_sender(&self, chain_ids: Vec<ChainId>, sender: &UnboundedSender<Vec<N>>) {
3132
let pinned = self.inner.pin();
3233
for id in chain_ids {
3334
pinned.update_or_insert_with(
@@ -39,18 +40,19 @@ impl<N> ChannelNotifier<N> {
3940
}
4041

4142
/// Creates a subscription given a collection of chain IDs and a sender to the client.
42-
pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<N> {
43+
/// Returns a receiver that yields batches of notifications (Vec<N>).
44+
pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<Vec<N>> {
4345
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
4446
self.add_sender(chain_ids, &tx);
4547
rx
4648
}
4749

4850
/// Creates a subscription given a collection of chain IDs and a sender to the client.
49-
/// Immediately posts a first notification as an ACK.
50-
pub fn subscribe_with_ack(&self, chain_ids: Vec<ChainId>, ack: N) -> UnboundedReceiver<N> {
51+
/// Immediately posts a first notification batch as an ACK.
52+
pub fn subscribe_with_ack(&self, chain_ids: Vec<ChainId>, ack: N) -> UnboundedReceiver<Vec<N>> {
5153
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
5254
self.add_sender(chain_ids, &tx);
53-
tx.send(ack)
55+
tx.send(vec![ack])
5456
.expect("pushing to a new channel should succeed");
5557
rx
5658
}
@@ -61,15 +63,19 @@ where
6163
N: Clone,
6264
{
6365
/// Notifies all the clients waiting for a notification from a given chain.
64-
pub fn notify_chain(&self, chain_id: &ChainId, notification: &N) {
66+
/// Sends a batch of notifications to all subscribers of the chain.
67+
pub fn notify_chain(&self, chain_id: &ChainId, notifications: Vec<N>) {
68+
if notifications.is_empty() {
69+
return;
70+
}
6571
self.inner.pin().compute(*chain_id, |senders| {
6672
let Some((_key, senders)) = senders else {
6773
trace!("Chain {chain_id} has no subscribers.");
6874
return papaya::Operation::Abort(());
6975
};
7076
let live_senders = senders
7177
.iter()
72-
.filter(|sender| sender.send(notification.clone()).is_ok())
78+
.filter(|sender| sender.send(notifications.clone()).is_ok())
7379
.cloned()
7480
.collect::<Vec<_>>();
7581
if live_senders.is_empty() {
@@ -87,8 +93,17 @@ pub trait Notifier: Clone + Send + 'static {
8793

8894
impl Notifier for Arc<ChannelNotifier<worker::Notification>> {
8995
fn notify(&self, notifications: &[worker::Notification]) {
96+
// Group notifications by chain_id to send them in batches
97+
let mut by_chain: std::collections::HashMap<ChainId, Vec<worker::Notification>> =
98+
std::collections::HashMap::new();
9099
for notification in notifications {
91-
self.notify_chain(&notification.chain_id, notification);
100+
by_chain
101+
.entry(notification.chain_id)
102+
.or_default()
103+
.push(notification.clone());
104+
}
105+
for (chain_id, chain_notifications) in by_chain {
106+
self.notify_chain(&chain_id, chain_notifications);
92107
}
93108
}
94109
}
@@ -138,20 +153,20 @@ pub mod tests {
138153
let notifier = Arc::new(notifier);
139154

140155
std::thread::spawn(move || {
141-
while rx_a.blocking_recv().is_some() {
142-
a_rec_clone.fetch_add(1, Ordering::Relaxed);
156+
while let Some(batch) = rx_a.blocking_recv() {
157+
a_rec_clone.fetch_add(batch.len(), Ordering::Relaxed);
143158
}
144159
});
145160

146161
std::thread::spawn(move || {
147-
while rx_b.blocking_recv().is_some() {
148-
b_rec_clone.fetch_add(1, Ordering::Relaxed);
162+
while let Some(batch) = rx_b.blocking_recv() {
163+
b_rec_clone.fetch_add(batch.len(), Ordering::Relaxed);
149164
}
150165
});
151166

152167
std::thread::spawn(move || {
153-
while rx_a_b.blocking_recv().is_some() {
154-
a_b_rec_clone.fetch_add(1, Ordering::Relaxed);
168+
while let Some(batch) = rx_a_b.blocking_recv() {
169+
a_b_rec_clone.fetch_add(batch.len(), Ordering::Relaxed);
155170
}
156171
});
157172

@@ -161,13 +176,13 @@ pub mod tests {
161176
let a_notifier = notifier.clone();
162177
let handle_a = std::thread::spawn(move || {
163178
for _ in 0..NOTIFICATIONS_A {
164-
a_notifier.notify_chain(&chain_a, &());
179+
a_notifier.notify_chain(&chain_a, vec![()]);
165180
}
166181
});
167182

168183
let handle_b = std::thread::spawn(move || {
169184
for _ in 0..NOTIFICATIONS_B {
170-
notifier.notify_chain(&chain_b, &());
185+
notifier.notify_chain(&chain_b, vec![()]);
171186
}
172187
});
173188

@@ -208,22 +223,22 @@ pub mod tests {
208223
assert_eq!(notifier.inner.len(), 4);
209224

210225
rx_c.close();
211-
notifier.notify_chain(&chain_c, &());
226+
notifier.notify_chain(&chain_c, vec![()]);
212227
assert_eq!(notifier.inner.len(), 3);
213228

214229
rx_a.close();
215-
notifier.notify_chain(&chain_a, &());
230+
notifier.notify_chain(&chain_a, vec![()]);
216231
assert_eq!(notifier.inner.len(), 3);
217232

218233
rx_b.close();
219-
notifier.notify_chain(&chain_b, &());
234+
notifier.notify_chain(&chain_b, vec![()]);
220235
assert_eq!(notifier.inner.len(), 2);
221236

222-
notifier.notify_chain(&chain_a, &());
237+
notifier.notify_chain(&chain_a, vec![()]);
223238
assert_eq!(notifier.inner.len(), 1);
224239

225240
rx_d.close();
226-
notifier.notify_chain(&chain_d, &());
241+
notifier.notify_chain(&chain_d, vec![()]);
227242
assert_eq!(notifier.inner.len(), 0);
228243
}
229244
}

linera-core/src/unit_tests/test_utils.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use async_trait::async_trait;
1212
use futures::{
1313
future::Either,
1414
lock::{Mutex, MutexGuard},
15+
stream,
16+
stream::StreamExt as _,
1517
Future,
1618
};
1719
use linera_base::{
@@ -492,7 +494,9 @@ where
492494
) -> Result<(), Result<NotificationStream, NodeError>> {
493495
let validator = self.client.lock().await;
494496
let rx = validator.notifier.subscribe(chains);
495-
let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
497+
// Flatten the batched notifications into individual items
498+
let stream: NotificationStream =
499+
Box::pin(UnboundedReceiverStream::new(rx).flat_map(stream::iter));
496500
sender.send(Ok(stream))
497501
}
498502

linera-service/src/proxy/grpc.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::{
1515

1616
use anyhow::Result;
1717
use async_trait::async_trait;
18-
use futures::{future::BoxFuture, FutureExt as _};
18+
use futures::{future::BoxFuture, stream, stream::BoxStream, stream::StreamExt as _, FutureExt as _};
1919
use linera_base::identifiers::ChainId;
2020
use linera_core::{
2121
data_types::{CertificatesByHeightRequest, ChainInfo, ChainInfoQuery},
@@ -383,7 +383,7 @@ impl<S> ValidatorNode for GrpcProxy<S>
383383
where
384384
S: Storage + Clone + Send + Sync + 'static,
385385
{
386-
type SubscribeStream = UnboundedReceiverStream<Result<Notification, Status>>;
386+
type SubscribeStream = BoxStream<'static, Result<Notification, Status>>;
387387

388388
#[instrument(skip_all, err(Display), fields(method = "handle_block_proposal"))]
389389
async fn handle_block_proposal(
@@ -482,7 +482,9 @@ where
482482
.0
483483
.notifier
484484
.subscribe_with_ack(chain_ids, Ok(Notification::default()));
485-
Ok(Response::new(UnboundedReceiverStream::new(rx)))
485+
// Flatten the batched notifications into individual items for the gRPC stream
486+
let flattened = UnboundedReceiverStream::new(rx).flat_map(stream::iter);
487+
Ok(Response::new(Box::pin(flattened)))
486488
}
487489

488490
#[instrument(skip_all, err(Display))]
@@ -855,13 +857,23 @@ where
855857
&self,
856858
request: Request<NotificationBatch>,
857859
) -> Result<Response<()>, Status> {
860+
// Group notifications by chain_id to send them in batches
861+
let mut by_chain: std::collections::HashMap<
862+
ChainId,
863+
Vec<Result<Notification, Status>>,
864+
> = std::collections::HashMap::new();
865+
858866
for notification in request.into_inner().notifications {
859867
let chain_id = notification
860868
.chain_id
861869
.clone()
862870
.ok_or_else(|| Status::invalid_argument("Missing field: chain_id."))?
863871
.try_into()?;
864-
self.0.notifier.notify_chain(&chain_id, &Ok(notification));
872+
by_chain.entry(chain_id).or_default().push(Ok(notification));
873+
}
874+
875+
for (chain_id, notifications) in by_chain {
876+
self.0.notifier.notify_chain(&chain_id, notifications);
865877
}
866878
Ok(Response::new(()))
867879
}

0 commit comments

Comments
 (0)