Skip to content

[SCTP] improve payload queue push performance #365

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 2, 2023
1 change: 1 addition & 0 deletions sctp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

* Performance improvements
* improve algorithm used to push to pending queue from O(n*log(n)) to O(log(n)) [#365](https://github.com/webrtc-rs/webrtc/pull/365)
* reuse as many allocations as possible when marshaling [#364](https://github.com/webrtc-rs/webrtc/pull/364)
* The lock for the internal association was contended badly because marshaling was done while still in a critical section and also tokio was scheduling tasks badly [#363](https://github.com/webrtc-rs/webrtc/pull/363)

Expand Down
54 changes: 30 additions & 24 deletions sctp/src/queue/payload_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use crate::chunk::chunk_payload_data::ChunkPayloadData;
use crate::chunk::chunk_selective_ack::GapAckBlock;
use crate::util::*;

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Default, Debug)]
pub(crate) struct PayloadQueue {
pub(crate) length: Arc<AtomicUsize>,
pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>,
pub(crate) sorted: Vec<u32>,
pub(crate) sorted: VecDeque<u32>,
pub(crate) dup_tsn: Vec<u32>,
pub(crate) n_bytes: usize,
}
Expand All @@ -24,26 +24,37 @@ impl PayloadQueue {
}
}

pub(crate) fn update_sorted_keys(&mut self) {
self.sorted.sort_by(|a, b| {
if sna32lt(*a, *b) {
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Greater
}
});
}

pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool {
!(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn))
}

pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) {
let tsn = p.tsn;
self.n_bytes += p.user_data.len();
self.sorted.push(p.tsn);
self.chunk_map.insert(p.tsn, p);
self.chunk_map.insert(tsn, p);
self.length.fetch_add(1, Ordering::SeqCst);
self.update_sorted_keys();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂️


if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) {
self.sorted.push_back(tsn);
} else if sna32lt(tsn, *self.sorted.front().unwrap()) {
self.sorted.push_front(tsn);
} else {
fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering {
if sna32lt(a, b) {
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Greater
}
}
let pos = match self
.sorted
.binary_search_by(|element| compare_tsn(*element, tsn))
{
Ok(pos) => pos,
Err(pos) => pos,
};
self.sorted.insert(pos, tsn);
}
}

/// push pushes a payload data. If the payload data is already in our queue or
Expand All @@ -57,19 +68,14 @@ impl PayloadQueue {
return false;
}

self.n_bytes += p.user_data.len();
self.sorted.push(p.tsn);
self.chunk_map.insert(p.tsn, p);
self.length.fetch_add(1, Ordering::SeqCst);
self.update_sorted_keys();

self.push_no_check(p);
true
}

/// pop pops only if the oldest chunk's TSN matches the given TSN.
pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> {
if !self.sorted.is_empty() && tsn == self.sorted[0] {
self.sorted.remove(0);
if Some(&tsn) == self.sorted.front() {
self.sorted.pop_front();
if let Some(c) = self.chunk_map.remove(&tsn) {
self.length.fetch_sub(1, Ordering::SeqCst);
self.n_bytes -= c.user_data.len();
Expand Down Expand Up @@ -149,7 +155,7 @@ impl PayloadQueue {
}

pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> {
self.sorted.last()
self.sorted.back()
}

pub(crate) fn mark_all_to_retrasmit(&mut self) {
Expand Down