Skip to content

Commit daaf05d

Browse files
authored
sctp: limit the bytes in the PendingQueue by using a semaphore (#367)
As discussed in #360 the pending queue can grow indefinitely if the sender writes packets faster than the association is able to transmit them. This PR solves this by enforcing a limit on the pending queue. This blocks the sender until enough space is free.
1 parent 225cec0 commit daaf05d

File tree

14 files changed

+443
-203
lines changed

14 files changed

+443
-203
lines changed

data/src/data_channel/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ impl DataChannel {
118118
})
119119
.marshal()?;
120120

121-
stream.write_sctp(&msg, PayloadProtocolIdentifier::Dcep)?;
121+
stream
122+
.write_sctp(&msg, PayloadProtocolIdentifier::Dcep)
123+
.await?;
122124
}
123125
Ok(DataChannel::new(stream, config))
124126
}
@@ -284,10 +286,13 @@ impl DataChannel {
284286
};
285287

286288
let n = if data_len == 0 {
287-
let _ = self.stream.write_sctp(&Bytes::from_static(&[0]), ppi)?;
289+
let _ = self
290+
.stream
291+
.write_sctp(&Bytes::from_static(&[0]), ppi)
292+
.await?;
288293
0
289294
} else {
290-
let n = self.stream.write_sctp(data, ppi)?;
295+
let n = self.stream.write_sctp(data, ppi).await?;
291296
self.bytes_sent.fetch_add(n, Ordering::SeqCst);
292297
n
293298
};
@@ -300,7 +305,8 @@ impl DataChannel {
300305
let ack = Message::DataChannelAck(DataChannelAck {}).marshal()?;
301306
Ok(self
302307
.stream
303-
.write_sctp(&ack, PayloadProtocolIdentifier::Dcep)?)
308+
.write_sctp(&ack, PayloadProtocolIdentifier::Dcep)
309+
.await?)
304310
}
305311

306312
/// Close closes the DataChannel and the underlying SCTP stream.

sctp/CHANGELOG.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22

33
## Unreleased
44

5-
* Performance improvements
6-
* improve algorithm used to push to pending queue from O(n*log(n)) to O(log(n)) [#365](https://github.com/webrtc-rs/webrtc/pull/365)
7-
* reuse as many allocations as possible when marshaling [#364](https://github.com/webrtc-rs/webrtc/pull/364)
8-
* The lock for the internal association was contended badly because marshaling was done while still in a critical section and also tokio was scheduling tasks badly [#363](https://github.com/webrtc-rs/webrtc/pull/363)
5+
* Limit the bytes in the PendingQueue to avoid packets accumulating there uncontrollably [367](https://github.com/webrtc-rs/webrtc/pull/367)
6+
* Improve algorithm used to push to pending queue from O(n*log(n)) to O(log(n)) [#365](https://github.com/webrtc-rs/webrtc/pull/365)
7+
* Reuse as many allocations as possible when marshaling [#364](https://github.com/webrtc-rs/webrtc/pull/364)
8+
* The lock for the internal association was contended badly because marshaling was done while still in a critical section and also tokio was scheduling tasks badly [#363](https://github.com/webrtc-rs/webrtc/pull/363)
9+
10+
### Breaking
11+
12+
* Make `sctp::Stream::write` & `sctp::Stream::write_sctp` async again [367](https://github.com/webrtc-rs/webrtc/pull/367)
913

1014
## v0.7.0
1115

sctp/examples/ping.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ async fn main() -> Result<(), Error> {
8383
while ping_seq_num < 10 {
8484
let ping_msg = format!("ping {}", ping_seq_num);
8585
println!("sent: {}", ping_msg);
86-
stream_tx.write(&Bytes::from(ping_msg))?;
86+
stream_tx.write(&Bytes::from(ping_msg)).await?;
8787

8888
ping_seq_num += 1;
8989
}

sctp/examples/pong.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ async fn main() -> Result<(), Error> {
8686

8787
let pong_msg = format!("pong [{}]", ping_msg);
8888
println!("sent: {}", pong_msg);
89-
stream2.write(&Bytes::from(pong_msg))?;
89+
stream2.write(&Bytes::from(pong_msg)).await?;
9090

9191
tokio::time::sleep(Duration::from_secs(1)).await;
9292
}

sctp/examples/throughput.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ fn main() -> Result<(), Error> {
136136

137137
let mut now = tokio::time::Instant::now();
138138
let mut pkt_num = 0;
139-
while stream.write(&buf.clone().into()).is_ok() {
139+
while stream.write(&buf.clone().into()).await.is_ok() {
140140
pkt_num += 1;
141141
if now.elapsed().as_secs() == 1 {
142142
println!("Send {} pkts", pkt_num);

sctp/src/association/association_internal.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ impl AssociationInternal {
347347
) -> Vec<Packet> {
348348
// Pop unsent data chunks from the pending queue to send as much as
349349
// cwnd and rwnd allow.
350-
let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send();
350+
let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send().await;
351351
if !chunks.is_empty() {
352352
// Start timer. (noop if already started)
353353
log::trace!("[{}] T3-rtx timer start (pt1)", self.name);
@@ -1717,7 +1717,7 @@ impl AssociationInternal {
17171717
self.handle_peer_last_tsn_and_acknowledgement(false)
17181718
}
17191719

1720-
fn send_reset_request(&mut self, stream_identifier: u16) -> Result<()> {
1720+
async fn send_reset_request(&mut self, stream_identifier: u16) -> Result<()> {
17211721
let state = self.get_state();
17221722
if state != AssociationState::Established {
17231723
return Err(Error::ErrResetPacketInStateNotExist);
@@ -1733,7 +1733,7 @@ impl AssociationInternal {
17331733
..Default::default()
17341734
};
17351735

1736-
self.pending_queue.push(c);
1736+
self.pending_queue.push(c).await;
17371737
self.awake_write_loop();
17381738

17391739
Ok(())
@@ -1798,7 +1798,7 @@ impl AssociationInternal {
17981798
}
17991799

18001800
/// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.
1801-
fn move_pending_data_chunk_to_inflight_queue(
1801+
async fn move_pending_data_chunk_to_inflight_queue(
18021802
&mut self,
18031803
beginning_fragment: bool,
18041804
unordered: bool,
@@ -1840,7 +1840,7 @@ impl AssociationInternal {
18401840

18411841
/// pop_pending_data_chunks_to_send pops chunks from the pending queues as many as
18421842
/// the cwnd and rwnd allows to send.
1843-
fn pop_pending_data_chunks_to_send(&mut self) -> (Vec<ChunkPayloadData>, Vec<u16>) {
1843+
async fn pop_pending_data_chunks_to_send(&mut self) -> (Vec<ChunkPayloadData>, Vec<u16>) {
18441844
let mut chunks = vec![];
18451845
let mut sis_to_reset = vec![]; // stream identifiers to reset
18461846

@@ -1885,8 +1885,9 @@ impl AssociationInternal {
18851885

18861886
self.rwnd -= data_len as u32;
18871887

1888-
if let Some(chunk) =
1889-
self.move_pending_data_chunk_to_inflight_queue(beginning_fragment, unordered)
1888+
if let Some(chunk) = self
1889+
.move_pending_data_chunk_to_inflight_queue(beginning_fragment, unordered)
1890+
.await
18901891
{
18911892
chunks.push(chunk);
18921893
}
@@ -1898,8 +1899,9 @@ impl AssociationInternal {
18981899
if let Some(c) = self.pending_queue.peek() {
18991900
let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
19001901

1901-
if let Some(chunk) =
1902-
self.move_pending_data_chunk_to_inflight_queue(beginning_fragment, unordered)
1902+
if let Some(chunk) = self
1903+
.move_pending_data_chunk_to_inflight_queue(beginning_fragment, unordered)
1904+
.await
19031905
{
19041906
chunks.push(chunk);
19051907
}

sctp/src/association/association_internal/association_internal_test.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,8 @@ async fn test_assoc_handle_init() -> Result<()> {
437437
Ok(())
438438
}
439439

440-
#[test]
441-
fn test_assoc_max_message_size_default() -> Result<()> {
440+
#[tokio::test]
441+
async fn test_assoc_max_message_size_default() -> Result<()> {
442442
let mut a = create_association_internal(Config {
443443
net_conn: Arc::new(DumbConn {}),
444444
max_receive_buffer_size: 0,
@@ -458,7 +458,7 @@ fn test_assoc_max_message_size_default() -> Result<()> {
458458
let p = Bytes::from(vec![0u8; 65537]);
459459
let ppi = PayloadProtocolIdentifier::from(s.default_payload_type.load(Ordering::SeqCst));
460460

461-
if let Err(err) = s.write_sctp(&p.slice(..65536), ppi) {
461+
if let Err(err) = s.write_sctp(&p.slice(..65536), ppi).await {
462462
assert_ne!(
463463
Error::ErrOutboundPacketTooLarge,
464464
err,
@@ -468,7 +468,7 @@ fn test_assoc_max_message_size_default() -> Result<()> {
468468
assert!(false, "should be error");
469469
}
470470

471-
if let Err(err) = s.write_sctp(&p.slice(..65537), ppi) {
471+
if let Err(err) = s.write_sctp(&p.slice(..65537), ppi).await {
472472
assert_eq!(
473473
Error::ErrOutboundPacketTooLarge,
474474
err,
@@ -482,8 +482,8 @@ fn test_assoc_max_message_size_default() -> Result<()> {
482482
Ok(())
483483
}
484484

485-
#[test]
486-
fn test_assoc_max_message_size_explicit() -> Result<()> {
485+
#[tokio::test]
486+
async fn test_assoc_max_message_size_explicit() -> Result<()> {
487487
let mut a = create_association_internal(Config {
488488
net_conn: Arc::new(DumbConn {}),
489489
max_receive_buffer_size: 0,
@@ -504,7 +504,7 @@ fn test_assoc_max_message_size_explicit() -> Result<()> {
504504
let p = Bytes::from(vec![0u8; 30001]);
505505
let ppi = PayloadProtocolIdentifier::from(s.default_payload_type.load(Ordering::SeqCst));
506506

507-
if let Err(err) = s.write_sctp(&p.slice(..30000), ppi) {
507+
if let Err(err) = s.write_sctp(&p.slice(..30000), ppi).await {
508508
assert_ne!(
509509
Error::ErrOutboundPacketTooLarge,
510510
err,
@@ -514,7 +514,7 @@ fn test_assoc_max_message_size_explicit() -> Result<()> {
514514
assert!(false, "should be error");
515515
}
516516

517-
if let Err(err) = s.write_sctp(&p.slice(..30001), ppi) {
517+
if let Err(err) = s.write_sctp(&p.slice(..30001), ppi).await {
518518
assert_eq!(
519519
Error::ErrOutboundPacketTooLarge,
520520
err,

0 commit comments

Comments
 (0)