Skip to content

Commit 4b638fa

Browse files
authored
add PendingQueue::append (#345)
which appends chunks to the back of the pending queue. Consider the following two simultaneous calls to `send_payload_data`: ``` 1. [beginning_fragment1, data2, data3, ending_fragment4] 2. [beginning_fragment5, data6, data7, ending_fragment8] ``` If `push` is used, due to individual locks, it's possible to end up with: ``` [beginning_fragment1, data2, beginning_fragment5, data3, data6, ending_fragment4, data6, data7, ending_fragment8] ``` This will result in wrong data received by the remote.
1 parent 137f11f commit 4b638fa

File tree

5 files changed

+59
-21
lines changed

5 files changed

+59
-21
lines changed

sctp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* Increased minimum support rust version to `1.60.0`.
66
* Do not loose data in `PollStream::poll_write` [#341](https://github.com/webrtc-rs/webrtc/pull/341).
77
* `PollStream::poll_shutdown`: make sure to flush any writes before shutting down [#340](https://github.com/webrtc-rs/webrtc/pull/340)
8+
* Fixed a possible bug when adding chunks to pending queue [#345](https://github.com/webrtc-rs/webrtc/pull/345)
89

910
## v0.6.1
1011

sctp/src/association/association_internal.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,22 +1996,6 @@ impl AssociationInternal {
19961996
packets
19971997
}
19981998

1999-
/// send_payload_data sends the data chunks.
2000-
async fn send_payload_data(&mut self, chunks: Vec<ChunkPayloadData>) -> Result<()> {
2001-
let state = self.get_state();
2002-
if state != AssociationState::Established {
2003-
return Err(Error::ErrPayloadDataStateNotExist);
2004-
}
2005-
2006-
// Push the chunks into the pending queue first.
2007-
for c in chunks {
2008-
self.pending_queue.push(c).await;
2009-
}
2010-
2011-
self.awake_write_loop();
2012-
Ok(())
2013-
}
2014-
20151999
fn check_partial_reliability_status(&self, c: &ChunkPayloadData) {
20162000
if !self.use_forward_tsn {
20172001
return;

sctp/src/queue/pending_queue.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,58 @@ impl PendingQueue {
2525
PendingQueue::default()
2626
}
2727

28+
/// Appends a chunk to the back of the pending queue.
2829
pub(crate) async fn push(&self, c: ChunkPayloadData) {
29-
self.n_bytes.fetch_add(c.user_data.len(), Ordering::SeqCst);
30+
let user_data_len = c.user_data.len();
31+
3032
if c.unordered {
3133
let mut unordered_queue = self.unordered_queue.lock().await;
3234
unordered_queue.push_back(c);
3335
} else {
3436
let mut ordered_queue = self.ordered_queue.lock().await;
3537
ordered_queue.push_back(c);
3638
}
39+
40+
self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
3741
self.queue_len.fetch_add(1, Ordering::SeqCst);
3842
}
3943

44+
/// Appends chunks to the back of the pending queue.
45+
///
46+
/// # Panics
47+
///
48+
/// If it's a mix of unordered and ordered chunks.
49+
pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) {
50+
if chunks.is_empty() {
51+
return;
52+
}
53+
54+
let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len());
55+
let chunks_len = chunks.len();
56+
57+
let unordered = chunks
58+
.first()
59+
.expect("chunks to not be empty because of the above check")
60+
.unordered;
61+
if unordered {
62+
let mut unordered_queue = self.unordered_queue.lock().await;
63+
for c in chunks {
64+
assert!(c.unordered, "expected all chunks to be unordered");
65+
unordered_queue.push_back(c);
66+
}
67+
} else {
68+
let mut ordered_queue = self.ordered_queue.lock().await;
69+
for c in chunks {
70+
assert!(!c.unordered, "expected all chunks to be ordered");
71+
ordered_queue.push_back(c);
72+
}
73+
}
74+
75+
self.n_bytes
76+
.fetch_add(total_user_data_len, Ordering::SeqCst);
77+
self.queue_len.fetch_add(chunks_len, Ordering::SeqCst);
78+
}
79+
4080
pub(crate) async fn peek(&self) -> Option<ChunkPayloadData> {
4181
if self.selected.load(Ordering::SeqCst) {
4282
if self.unordered_is_selected.load(Ordering::SeqCst) {

sctp/src/queue/queue_test.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,21 @@ async fn test_pending_queue_selection_persistence() -> Result<()> {
425425
Ok(())
426426
}
427427

428+
#[tokio::test]
429+
async fn test_pending_queue_append() -> Result<()> {
430+
let pq = PendingQueue::new();
431+
pq.append(vec![
432+
make_data_chunk(0, false, NO_FRAGMENT),
433+
make_data_chunk(1, false, NO_FRAGMENT),
434+
make_data_chunk(3, false, NO_FRAGMENT),
435+
])
436+
.await;
437+
assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch");
438+
assert_eq!(3, pq.len(), "len mismatch");
439+
440+
Ok(())
441+
}
442+
428443
///////////////////////////////////////////////////////////////////
429444
//reassembly_queue_test
430445
///////////////////////////////////////////////////////////////////

sctp/src/stream/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -488,10 +488,8 @@ impl Stream {
488488
return Err(Error::ErrPayloadDataStateNotExist);
489489
}
490490

491-
// Push the chunks into the pending queue first.
492-
for c in chunks {
493-
self.pending_queue.push(c).await;
494-
}
491+
// NOTE: append is used here instead of push in order to prevent chunks interlacing.
492+
self.pending_queue.append(chunks).await;
495493

496494
self.awake_write_loop();
497495
Ok(())

0 commit comments

Comments
 (0)