diff --git a/sctp/CHANGELOG.md b/sctp/CHANGELOG.md
index 4507d2822..f02ebbf3c 100644
--- a/sctp/CHANGELOG.md
+++ b/sctp/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## Unreleased
 
 * Performance improvements
+  * improve algorithm used to push to pending queue from O(n*log(n)) to O(log(n)) [#365](https://github.com/webrtc-rs/webrtc/pull/365)
   * reuse as many allocations as possible when marshaling [#364](https://github.com/webrtc-rs/webrtc/pull/364)
   * The lock for the internal association was contended badly because marshaling was done while still in a critical section and also tokio was scheduling tasks badly [#363](https://github.com/webrtc-rs/webrtc/pull/363)
 
diff --git a/sctp/src/queue/payload_queue.rs b/sctp/src/queue/payload_queue.rs
index c055abc57..54e1edc71 100644
--- a/sctp/src/queue/payload_queue.rs
+++ b/sctp/src/queue/payload_queue.rs
@@ -2,7 +2,7 @@
 use crate::chunk::chunk_payload_data::ChunkPayloadData;
 use crate::chunk::chunk_selective_ack::GapAckBlock;
 use crate::util::*;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
@@ -10,7 +10,7 @@
 pub(crate) struct PayloadQueue {
     pub(crate) length: Arc<AtomicUsize>,
     pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>,
-    pub(crate) sorted: Vec<u32>,
+    pub(crate) sorted: VecDeque<u32>,
     pub(crate) dup_tsn: Vec<u32>,
     pub(crate) n_bytes: usize,
 }
@@ -24,26 +24,37 @@ impl PayloadQueue {
         }
     }
 
-    pub(crate) fn update_sorted_keys(&mut self) {
-        self.sorted.sort_by(|a, b| {
-            if sna32lt(*a, *b) {
-                std::cmp::Ordering::Less
-            } else {
-                std::cmp::Ordering::Greater
-            }
-        });
-    }
-
     pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool {
         !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn))
     }
 
     pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) {
+        let tsn = p.tsn;
         self.n_bytes += p.user_data.len();
-        self.sorted.push(p.tsn);
-        self.chunk_map.insert(p.tsn, p);
+        self.chunk_map.insert(tsn, p);
         self.length.fetch_add(1, Ordering::SeqCst);
-        self.update_sorted_keys();
+
+        if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) {
+            self.sorted.push_back(tsn);
+        } else if sna32lt(tsn, *self.sorted.front().unwrap()) {
+            self.sorted.push_front(tsn);
+        } else {
+            fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering {
+                if sna32lt(a, b) {
+                    std::cmp::Ordering::Less
+                } else {
+                    std::cmp::Ordering::Greater
+                }
+            }
+            let pos = match self
+                .sorted
+                .binary_search_by(|element| compare_tsn(*element, tsn))
+            {
+                Ok(pos) => pos,
+                Err(pos) => pos,
+            };
+            self.sorted.insert(pos, tsn);
+        }
     }
 
     /// push pushes a payload data. If the payload data is already in our queue or
@@ -57,19 +68,14 @@ impl PayloadQueue {
             return false;
         }
 
-        self.n_bytes += p.user_data.len();
-        self.sorted.push(p.tsn);
-        self.chunk_map.insert(p.tsn, p);
-        self.length.fetch_add(1, Ordering::SeqCst);
-        self.update_sorted_keys();
-
+        self.push_no_check(p);
         true
     }
 
     /// pop pops only if the oldest chunk's TSN matches the given TSN.
     pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> {
-        if !self.sorted.is_empty() && tsn == self.sorted[0] {
-            self.sorted.remove(0);
+        if Some(&tsn) == self.sorted.front() {
+            self.sorted.pop_front();
             if let Some(c) = self.chunk_map.remove(&tsn) {
                 self.length.fetch_sub(1, Ordering::SeqCst);
                 self.n_bytes -= c.user_data.len();
@@ -149,7 +155,7 @@ impl PayloadQueue {
     }
 
     pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> {
-        self.sorted.last()
+        self.sorted.back()
     }
 
     pub(crate) fn mark_all_to_retrasmit(&mut self) {
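
Note on the new insertion path: the rewritten `push_no_check` keeps `sorted` ordered at insertion time instead of re-sorting the whole list after every push. A TSN newer than everything queued takes the O(1) `push_back` fast path (the common in-order case), one older than everything takes `push_front`, and only a TSN landing in the middle falls through to a binary search. Switching to `VecDeque` also makes `pop`'s removal of the oldest TSN an O(1) `pop_front`, where `Vec::remove(0)` shifted every remaining element. Below is a self-contained sketch of the same technique; the `sna32lt`/`sna32gt` helpers here are simplified stand-ins assuming the RFC 1982 serial-number semantics of the real ones in `crate::util`, not the crate's actual code.

```rust
use std::collections::VecDeque;

/// Serial-number "less than" (RFC 1982) for 32-bit TSNs: `a` precedes `b`
/// when the forward (wrapping) distance from `a` to `b` is in (0, 2^31).
/// Simplified stand-in for the crate's `sna32lt`.
fn sna32lt(a: u32, b: u32) -> bool {
    let d = b.wrapping_sub(a);
    d != 0 && d < (1 << 31)
}

/// Stand-in for the crate's `sna32gt`.
fn sna32gt(a: u32, b: u32) -> bool {
    sna32lt(b, a)
}

/// Push `tsn` while keeping `sorted` ordered by serial-number arithmetic.
/// In-order arrivals are O(1); an out-of-order arrival costs an O(log n)
/// binary search plus the element shift done by `VecDeque::insert`.
fn push_sorted(sorted: &mut VecDeque<u32>, tsn: u32) {
    if sorted.is_empty() || sna32gt(tsn, *sorted.back().unwrap()) {
        sorted.push_back(tsn);
    } else if sna32lt(tsn, *sorted.front().unwrap()) {
        sorted.push_front(tsn);
    } else {
        // The comparator never returns Equal, so the search always yields
        // Err(pos) carrying the insertion point; duplicates are assumed to
        // be filtered beforehand, as `can_push` does in the real queue.
        let pos = sorted
            .binary_search_by(|&probe| {
                if sna32lt(probe, tsn) {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Greater
                }
            })
            .unwrap_or_else(|pos| pos);
        sorted.insert(pos, tsn);
    }
}

fn main() {
    let mut sorted = VecDeque::new();
    // Out-of-order arrivals straddling the 32-bit wrap point: plain `u32`
    // ordering would put 0 and 1 before u32::MAX, which is wrong for TSNs.
    for tsn in [u32::MAX - 1, u32::MAX, 1, 0] {
        push_sorted(&mut sorted, tsn);
    }
    assert_eq!(sorted, [u32::MAX - 1, u32::MAX, 0, 1]);
    println!("{sorted:?}");
}
```

The wrap-around case in `main` is why the diff searches with `sna32lt` rather than relying on `u32`'s `Ord`: TSNs are compared on a circle, so a freshly wrapped small TSN must sort after `u32::MAX`.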