@@ -255,13 +255,14 @@ impl ActiveRelayActor {
255
255
prio_inbox,
256
256
inbox,
257
257
relay_datagrams_send,
258
- url,
258
+ url : url . clone ( ) ,
259
259
relay_client_builder,
260
260
is_home_relay : false ,
261
261
inactive_timeout : Box :: pin ( time:: sleep ( RELAY_INACTIVE_CLEANUP_TIME ) ) ,
262
262
stop_token,
263
263
metrics,
264
264
receive_queue : ReceiveQueue {
265
+ relay_url : url,
265
266
relay_datagrams_recv,
266
267
relay_disco_recv,
267
268
pending : None ,
@@ -454,6 +455,7 @@ impl ActiveRelayActor {
454
455
}
455
456
}
456
457
}
458
+ _ = self . receive_queue. forward_pending( ) , if self . receive_queue. is_pending( ) => { } ,
457
459
_ = & mut self . inactive_timeout, if !self . is_home_relay => {
458
460
debug!( ?RELAY_INACTIVE_CLEANUP_TIME , "Inactive, exiting." ) ;
459
461
break None ;
@@ -615,7 +617,11 @@ impl ActiveRelayActor {
615
617
let fut = client_sink. send_all( & mut packet_stream) ;
616
618
self . run_sending( fut, & mut state, & mut client_stream) . await ?;
617
619
}
618
- ( ) = self . receive_queue. forward_pending( ) , if self . receive_queue. is_pending( ) => { }
620
+ res = self . receive_queue. forward_pending( ) , if self . receive_queue. is_pending( ) => {
621
+ if let Err ( err) = res {
622
+ break Err ( err) ;
623
+ }
624
+ }
619
625
msg = client_stream. next( ) , if !self . receive_queue. is_pending( ) => {
620
626
let Some ( msg) = msg else {
621
627
break Err ( anyhow!( "Stream closed by server." ) ) ;
@@ -659,12 +665,11 @@ impl ActiveRelayActor {
659
665
. map ( |p| * p != remote_node_id)
660
666
. unwrap_or ( true )
661
667
{
662
- // Avoid map lookup with high throughput single peer.
668
+ // Avoid map () = self.receive_queue.forward_pending(), if self.receive_queue.is_pending() => {} lookup with high throughput single peer.
663
669
state. last_packet_src = Some ( remote_node_id) ;
664
670
state. nodes_present . insert ( remote_node_id) ;
665
671
}
666
- let packets = PacketSplitIter :: new ( self . url . clone ( ) , remote_node_id, data) ;
667
- self . receive_queue . queue_packets ( packets) ;
672
+ self . receive_queue . queue_packets ( remote_node_id, data) ;
668
673
}
669
674
ReceivedMessage :: NodeGone ( node_id) => {
670
675
state. nodes_present . remove ( & node_id) ;
@@ -747,7 +752,11 @@ impl ActiveRelayActor {
747
752
break Err ( anyhow!( "Ping timeout" ) ) ;
748
753
}
749
754
// No need to read the inbox or datagrams to send.
750
- ( ) = self . receive_queue. forward_pending( ) , if self . receive_queue. is_pending( ) => { }
755
+ res = self . receive_queue. forward_pending( ) , if self . receive_queue. is_pending( ) => {
756
+ if let Err ( err) = res {
757
+ break Err ( err) ;
758
+ }
759
+ }
751
760
msg = client_stream. next( ) , if !self . receive_queue. is_pending( ) => {
752
761
let Some ( msg) = msg else {
753
762
break Err ( anyhow!( "Stream closed by server." ) ) ;
@@ -769,6 +778,7 @@ impl ActiveRelayActor {
769
778
770
779
#[ derive( Debug ) ]
771
780
struct ReceiveQueue {
781
+ relay_url : RelayUrl ,
772
782
/// Received relay packets that could not yet be forwarded to the magicsocket.
773
783
pending : Option < PendingRecv > ,
774
784
/// Queue for received relay datagrams.
@@ -794,8 +804,19 @@ impl ReceiveQueue {
794
804
self . pending . is_some ( )
795
805
}
796
806
797
- fn queue_packets ( & mut self , packets : PacketSplitIter ) {
798
- debug_assert ! (
807
+ /// Send packets to their respective queues.
808
+ ///
809
+ /// If a queue is blocked, the packets that were not yet sent will be stored on [`Self`],
810
+ /// and [`Self::is_pending`] will return true. You then need to await [`Self::forward_pending`]
811
+ /// in a loop until [`Self::is_pending`] returns false again. Only then call [`Self::queue_packets`]
812
+ /// again. Otherwise this function will panic.
813
+ ///
814
+ /// ## Panics
815
+ ///
816
+ /// Panics if [`Self::is_pending`] returns `true`.
817
+ fn queue_packets ( & mut self , remote_node_id : NodeId , data : Bytes ) {
818
+ let packets = PacketSplitIter :: new ( self . relay_url . clone ( ) , remote_node_id, data) ;
819
+ assert ! (
799
820
!self . is_pending( ) ,
800
821
"ReceiveQueue::queue_packets may not be called if is_pending() returns true"
801
822
) ;
@@ -808,47 +829,49 @@ impl ReceiveQueue {
808
829
/// to become unblocked. It will then forward the pending items, until a queue is blocked again.
809
830
/// In that case, the remaining items will be stored and [`Self::is_pending`] returns true.
810
831
///
832
+ /// Returns an error if the queue we're blocked on is closed.
833
+ ///
811
834
/// This function is cancellation-safe: If the future is dropped at any point, all items are guaranteed
812
835
/// to either be sent into their respective queues or preserved here.
813
- async fn forward_pending ( & mut self ) {
836
+ async fn forward_pending ( & mut self ) -> Result < ( ) > {
814
837
// We take a reference onto the inner value.
815
838
// we're not `take`ing it here, because this would make the function not cancellation safe.
816
839
let Some ( ref pending) = self . pending else {
817
- return ;
840
+ return Ok ( ( ) ) ;
818
841
} ;
819
842
let disco_permit = match pending. blocked_on {
820
843
RecvPath :: Data => {
821
- std:: future:: poll_fn ( |cx| self . relay_datagrams_recv . poll_send_ready ( cx) )
822
- . await
823
- . ok ( ) ;
844
+ // The data receive queue does not have permits, so we can only wait for free slots.
845
+ self . relay_datagrams_recv . send_ready ( ) . await ?;
824
846
None
825
847
}
826
848
RecvPath :: Disco => {
827
- let Ok ( permit ) = self . relay_disco_recv . clone ( ) . reserve_owned ( ) . await else {
828
- return ;
829
- } ;
849
+ // The disco receive channel has permits, so we can reserve a permit to use afterwards
850
+ // to send at least one item.
851
+ let permit = self . relay_disco_recv . clone ( ) . reserve_owned ( ) . await ? ;
830
852
Some ( permit)
831
853
}
832
854
} ;
855
+ // We checked above that `self.pending` is not `None` so this `expect` is safe.
833
856
let packets = self
834
857
. pending
835
858
. take ( )
836
859
. expect ( "checked to be not empty" )
837
860
. packets ;
838
861
self . handle_packets ( packets, disco_permit) ;
862
+ Ok ( ( ) )
839
863
}
840
864
841
865
fn handle_packets (
842
866
& mut self ,
843
- mut packet_iter : PacketSplitIter ,
867
+ mut packets : PacketSplitIter ,
844
868
mut disco_permit : Option < OwnedPermit < RelayDiscoMessage > > ,
845
869
) {
846
- let remote_node_id = packet_iter . remote_node_id ( ) ;
847
- for datagram in & mut packet_iter {
870
+ let remote_node_id = packets . remote_node_id ( ) ;
871
+ for datagram in & mut packets {
848
872
let Ok ( datagram) = datagram else {
849
873
warn ! ( "Invalid packet split" ) ;
850
- self . pending = None ;
851
- return ;
874
+ break ;
852
875
} ;
853
876
match crate :: disco:: source_and_box_bytes ( & datagram. buf ) {
854
877
Some ( ( source, sealed_box) ) => {
@@ -866,9 +889,9 @@ impl ReceiveQueue {
866
889
permit. send ( message) ;
867
890
} else if let Err ( err) = self . relay_disco_recv . try_send ( message) {
868
891
warn ! ( "Relay disco receive queue blocked: {err}" ) ;
869
- packet_iter . push_front ( datagram) ;
892
+ packets . push_front ( datagram) ;
870
893
self . pending = Some ( PendingRecv {
871
- packets : packet_iter ,
894
+ packets,
872
895
blocked_on : RecvPath :: Disco ,
873
896
} ) ;
874
897
return ;
@@ -877,9 +900,9 @@ impl ReceiveQueue {
877
900
None => {
878
901
if let Err ( err) = self . relay_datagrams_recv . try_send ( datagram) {
879
902
warn ! ( "Relay data receive queue blocked: {err}" ) ;
880
- packet_iter . push_front ( err. into_inner ( ) ) ;
903
+ packets . push_front ( err. into_inner ( ) ) ;
881
904
self . pending = Some ( PendingRecv {
882
- packets : packet_iter ,
905
+ packets,
883
906
blocked_on : RecvPath :: Data ,
884
907
} ) ;
885
908
return ;
0 commit comments