@@ -7,7 +7,9 @@ use std::{
77 task:: { Context , Poll } ,
88} ;
99
10- use futures:: { channel:: mpsc, future:: BoxFuture , FutureExt as _} ;
10+ use futures:: {
11+ channel:: mpsc, future:: BoxFuture , stream:: FuturesUnordered , FutureExt as _, StreamExt as _,
12+ } ;
1113use linera_base:: {
1214 data_types:: Blob ,
1315 identifiers:: ChainId ,
@@ -52,7 +54,8 @@ mod metrics {
5254 use std:: sync:: LazyLock ;
5355
5456 use linera_base:: prometheus_util:: {
55- linear_bucket_interval, register_histogram_vec, register_int_counter_vec,
57+ exponential_bucket_interval, linear_bucket_interval, register_histogram_vec,
58+ register_int_counter_vec,
5659 } ;
5760 use prometheus:: { HistogramVec , IntCounterVec } ;
5861
@@ -118,6 +121,163 @@ mod metrics {
118121 & [ ] ,
119122 )
120123 } ) ;
124+
125+ pub static NOTIFICATION_BATCH_SIZE : LazyLock < HistogramVec > = LazyLock :: new ( || {
126+ register_histogram_vec (
127+ "notification_batch_size" ,
128+ "Number of notifications per batch sent to proxy" ,
129+ & [ ] ,
130+ exponential_bucket_interval ( 1.0 , 250.0 ) ,
131+ )
132+ } ) ;
133+
134+ pub static NOTIFICATION_BATCHES_SENT : LazyLock < IntCounterVec > = LazyLock :: new ( || {
135+ register_int_counter_vec (
136+ "notification_batches_sent" ,
137+ "Total notification batches sent" ,
138+ & [ "status" ] ,
139+ )
140+ } ) ;
141+ }
142+
143+ /// Handles batched forwarding of notifications to proxy and exporters.
144+ struct BatchForwarder {
145+ nickname : String ,
146+ client : NotifierServiceClient < Channel > ,
147+ exporter_clients : Vec < NotifierServiceClient < Channel > > ,
148+ pending_notifications : Vec < Notification > ,
149+ futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
150+ tasks_in_flight : usize ,
151+ batch_limit : usize ,
152+ max_tasks : usize ,
153+ }
154+
155+ impl BatchForwarder {
156+ /// Spawns a single task and increments the in-flight counter.
157+ fn spawn_task < F > ( & mut self , future : F )
158+ where
159+ F : std:: future:: Future < Output = ( ) > + Send + ' static ,
160+ {
161+ self . futures . push ( future. boxed ( ) ) ;
162+ self . tasks_in_flight += 1 ;
163+ }
164+
165+ /// Waits for a task to complete and decrements the counter.
166+ /// Returns `Some(())` if a task completed, `None` if no tasks were pending.
167+ async fn wait_for_task ( & mut self ) -> Option < ( ) > {
168+ self . futures . next ( ) . await ?;
169+ self . tasks_in_flight -= 1 ;
170+ Some ( ( ) )
171+ }
172+
173+ /// Spawns batch send tasks up to max_tasks limit.
174+ fn spawn_batches ( & mut self ) {
175+ while !self . pending_notifications . is_empty ( ) && self . tasks_in_flight < self . max_tasks {
176+ let chunk_size = std:: cmp:: min ( self . batch_limit , self . pending_notifications . len ( ) ) ;
177+ let batch: Vec < Notification > = self . pending_notifications . drain ( ..chunk_size) . collect ( ) ;
178+
179+ #[ cfg( with_metrics) ]
180+ metrics:: NOTIFICATION_BATCH_SIZE
181+ . with_label_values ( & [ ] )
182+ . observe ( batch. len ( ) as f64 ) ;
183+
184+ let client = self . client . clone ( ) ;
185+ let exporter_clients = self . exporter_clients . clone ( ) ;
186+ let nickname = self . nickname . clone ( ) ;
187+
188+ self . spawn_task ( async move {
189+ Self :: send_batch ( nickname, client, exporter_clients, batch) . await ;
190+ } ) ;
191+ }
192+ }
193+
194+ /// Returns true if there are no pending notifications and no in-flight tasks.
195+ fn is_fully_drained ( & self ) -> bool {
196+ self . pending_notifications . is_empty ( ) && self . tasks_in_flight == 0
197+ }
198+
199+ /// Sends a batch of notifications to the proxy and exporters.
200+ async fn send_batch (
201+ nickname : String ,
202+ mut client : NotifierServiceClient < Channel > ,
203+ mut exporter_clients : Vec < NotifierServiceClient < Channel > > ,
204+ batch : Vec < Notification > ,
205+ ) {
206+ // Convert to proto notifications, logging any deserialization errors
207+ let mut proto_notifications = Vec :: with_capacity ( batch. len ( ) ) ;
208+ for notification in & batch {
209+ match notification. clone ( ) . try_into ( ) {
210+ Ok ( proto) => proto_notifications. push ( proto) ,
211+ Err ( error) => {
212+ warn ! (
213+ %error,
214+ nickname,
215+ ?notification. chain_id,
216+ ?notification. reason,
217+ "could not deserialize notification"
218+ ) ;
219+ }
220+ }
221+ }
222+
223+ // Collect chain_ids for error logging
224+ let chain_ids: Vec < _ > = batch. iter ( ) . map ( |n| n. chain_id ) . collect ( ) ;
225+
226+ // Send batch to proxy
227+ let request = Request :: new ( api:: NotificationBatch {
228+ notifications : proto_notifications. clone ( ) ,
229+ } ) ;
230+ let result = client. notify_batch ( request) . await ;
231+
232+ #[ cfg( with_metrics) ]
233+ {
234+ let status = if result. is_ok ( ) { "success" } else { "error" } ;
235+ metrics:: NOTIFICATION_BATCHES_SENT
236+ . with_label_values ( & [ status] )
237+ . inc ( ) ;
238+ }
239+
240+ if let Err ( error) = result {
241+ error ! (
242+ %error,
243+ nickname,
244+ batch_size = proto_notifications. len( ) ,
245+ ?chain_ids,
246+ "proxy: could not send notification batch" ,
247+ ) ;
248+ }
249+
250+ // Send NewBlock notifications to exporters
251+ let new_block_notifications: Vec < _ > = batch
252+ . iter ( )
253+ . filter ( |n| matches ! ( n. reason, Reason :: NewBlock { .. } ) )
254+ . collect ( ) ;
255+
256+ let exporter_notifications: Vec < api:: Notification > = new_block_notifications
257+ . iter ( )
258+ . filter_map ( |n| ( * n) . clone ( ) . try_into ( ) . ok ( ) )
259+ . collect ( ) ;
260+
261+ if !exporter_notifications. is_empty ( ) {
262+ let exporter_chain_ids: Vec < _ > =
263+ new_block_notifications. iter ( ) . map ( |n| n. chain_id ) . collect ( ) ;
264+
265+ for exporter_client in & mut exporter_clients {
266+ let request = Request :: new ( api:: NotificationBatch {
267+ notifications : exporter_notifications. clone ( ) ,
268+ } ) ;
269+ if let Err ( error) = exporter_client. notify_batch ( request) . await {
270+ error ! (
271+ %error,
272+ nickname,
273+ batch_size = exporter_notifications. len( ) ,
274+ ?exporter_chain_ids,
275+ "block exporter: could not send notification batch" ,
276+ ) ;
277+ }
278+ }
279+ }
280+ }
121281}
122282
123283#[ derive( Clone ) ]
@@ -253,6 +413,7 @@ where
253413 proxy. internal_address ( & internal_network. protocol ) ,
254414 exporter_addresses,
255415 receiver,
416+ notification_config. clone ( ) ,
256417 )
257418 } ) ;
258419 }
@@ -300,23 +461,24 @@ where
300461 GrpcServerHandle { handle }
301462 }
302463
303- /// Continuously waits for receiver to receive a notification which is then sent to
304- /// the proxy.
305- #[ instrument( skip( receiver) ) ]
464+ /// Continuously waits for receiver to receive notifications and sends them to
465+ /// the proxy in batches for improved throughput .
466+ #[ instrument( skip( receiver, config ) ) ]
306467 async fn forward_notifications (
307468 nickname : String ,
308469 proxy_address : String ,
309470 exporter_addresses : Vec < String > ,
310471 mut receiver : tokio:: sync:: broadcast:: Receiver < Notification > ,
472+ config : NotificationConfig ,
311473 ) {
312474 let channel = tonic:: transport:: Channel :: from_shared ( proxy_address. clone ( ) )
313475 . expect ( "Proxy URI should be valid" )
314476 . connect_lazy ( ) ;
315- let mut client = NotifierServiceClient :: new ( channel)
477+ let client = NotifierServiceClient :: new ( channel)
316478 . max_encoding_message_size ( GRPC_MAX_MESSAGE_SIZE )
317479 . max_decoding_message_size ( GRPC_MAX_MESSAGE_SIZE ) ;
318480
319- let mut exporter_clients: Vec < NotifierServiceClient < Channel > > = exporter_addresses
481+ let exporter_clients: Vec < NotifierServiceClient < Channel > > = exporter_addresses
320482 . iter ( )
321483 . map ( |address| {
322484 let channel = tonic:: transport:: Channel :: from_shared ( address. clone ( ) )
@@ -328,61 +490,62 @@ where
328490 } )
329491 . collect :: < Vec < _ > > ( ) ;
330492
331- loop {
332- let notification = match receiver. recv ( ) . await {
333- Ok ( notification) => notification,
334- Err ( RecvError :: Lagged ( skipped_count) ) => {
335- warn ! (
336- nickname,
337- skipped_count, "notification receiver lagged, messages were skipped"
338- ) ;
339- #[ cfg( with_metrics) ]
340- metrics:: NOTIFICATIONS_SKIPPED_RECEIVER_LAG
341- . with_label_values ( & [ ] )
342- . inc_by ( skipped_count) ;
343- continue ;
344- }
345- Err ( RecvError :: Closed ) => {
346- warn ! (
347- nickname,
348- "notification channel closed, exiting forwarding loop"
349- ) ;
350- break ;
351- }
352- } ;
493+ let mut forwarder = BatchForwarder {
494+ nickname : nickname. clone ( ) ,
495+ client,
496+ exporter_clients,
497+ pending_notifications : Vec :: new ( ) ,
498+ futures : FuturesUnordered :: new ( ) ,
499+ tasks_in_flight : 0 ,
500+ batch_limit : config. notification_batch_size ,
501+ max_tasks : config. notification_max_in_flight ,
502+ } ;
353503
354- let reason = & notification. reason ;
355- let chain_id = notification. chain_id ;
356- let notification: api:: Notification = match notification. clone ( ) . try_into ( ) {
357- Ok ( notification) => notification,
358- Err ( error) => {
359- warn ! ( %error, nickname, "could not deserialize notification" ) ;
360- continue ;
504+ loop {
505+ tokio:: select! {
506+ biased;
507+
508+ result = receiver. recv( ) => {
509+ match result {
510+ Ok ( notification) => {
511+ forwarder. pending_notifications. push( notification) ;
512+
513+ if forwarder. tasks_in_flight == 0
514+ || ( forwarder. pending_notifications. len( ) >= forwarder. batch_limit
515+ && forwarder. tasks_in_flight < forwarder. max_tasks) {
516+ forwarder. spawn_batches( ) ;
517+ }
518+ }
519+ Err ( RecvError :: Lagged ( skipped_count) ) => {
520+ warn!(
521+ nickname,
522+ skipped_count, "notification receiver lagged, messages were skipped"
523+ ) ;
524+ #[ cfg( with_metrics) ]
525+ metrics:: NOTIFICATIONS_SKIPPED_RECEIVER_LAG
526+ . with_label_values( & [ ] )
527+ . inc_by( skipped_count) ;
528+ }
529+ Err ( RecvError :: Closed ) => {
530+ warn!(
531+ nickname,
532+ "notification channel closed, draining pending notifications"
533+ ) ;
534+ // Drain all pending notifications before exiting
535+ loop {
536+ forwarder. spawn_batches( ) ;
537+ if forwarder. is_fully_drained( ) {
538+ break ;
539+ }
540+ forwarder. wait_for_task( ) . await ;
541+ }
542+ break ;
543+ }
544+ }
361545 }
362- } ;
363- let request = tonic:: Request :: new ( notification. clone ( ) ) ;
364- if let Err ( error) = client. notify ( request) . await {
365- error ! (
366- %error,
367- nickname,
368- ?chain_id,
369- ?reason,
370- "proxy: could not send notification" ,
371- )
372- }
373546
374- if let Reason :: NewBlock { height : _, hash : _ } = reason {
375- for exporter_client in & mut exporter_clients {
376- let request = tonic:: Request :: new ( notification. clone ( ) ) ;
377- if let Err ( error) = exporter_client. notify ( request) . await {
378- error ! (
379- %error,
380- nickname,
381- ?chain_id,
382- ?reason,
383- "block exporter: could not send notification" ,
384- )
385- }
547+ Some ( ( ) ) = forwarder. wait_for_task( ) => {
548+ forwarder. spawn_batches( ) ;
386549 }
387550 }
388551 }
0 commit comments