Skip to content

[SCTP] Association lock contention #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 2, 2023
3 changes: 3 additions & 0 deletions sctp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

* Performance improvements
* The lock for the internal association was badly contended because marshaling was performed while still inside a critical section, and tokio was scheduling tasks poorly. [#363](https://github.com/webrtc-rs/webrtc/pull/363)

## v0.7.0

* Increased minimum support rust version to `1.60.0`.
Expand Down
151 changes: 151 additions & 0 deletions sctp/examples/throughput.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use clap::{App, AppSettings, Arg};
use std::io::Write;
use std::sync::Arc;
use tokio::net::UdpSocket;
use util::{conn::conn_disconnected_packet::DisconnectedPacketConn, Conn};
use webrtc_sctp::association::*;
use webrtc_sctp::chunk::chunk_payload_data::PayloadProtocolIdentifier;
use webrtc_sctp::stream::*;
use webrtc_sctp::Error;

/// SCTP throughput example: spawns a receiver and a sender thread, each
/// running its own tokio runtime, and prints per-second throughput stats.
fn main() -> Result<(), Error> {
    // Logger prints file:line, level, and a microsecond timestamp per record.
    env_logger::Builder::new()
        .format(|buf, record| {
            writeln!(
                buf,
                "{}:{} [{}] {} - {}",
                record.file().unwrap_or("unknown"),
                record.line().unwrap_or(0),
                record.level(),
                chrono::Local::now().format("%H:%M:%S.%6f"),
                record.args()
            )
        })
        .filter(None, log::LevelFilter::Warn)
        .init();

    let mut app = App::new("SCTP Throughput")
        .version("0.1.0")
        .about("An example measuring SCTP throughput")
        .setting(AppSettings::DeriveDisplayOrder)
        .setting(AppSettings::SubcommandsNegateReqs)
        .arg(
            Arg::with_name("FULLHELP")
                .help("Prints more detailed help information")
                .long("fullhelp"),
        )
        .arg(
            Arg::with_name("port")
                .required_unless("FULLHELP")
                .takes_value(true)
                .long("port")
                .help("use port ."),
        );

    let matches = app.clone().get_matches();

    if matches.is_present("FULLHELP") {
        app.print_long_help().unwrap();
        std::process::exit(0);
    }

    let port1 = matches.value_of("port").unwrap().to_owned();
    let port2 = port1.clone();

    // Receiver thread: binds the given UDP port, accepts one SCTP stream,
    // and reports receive throughput once per second.
    let receiver = std::thread::spawn(|| {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async move {
                let conn = DisconnectedPacketConn::new(Arc::new(
                    UdpSocket::bind(format!("127.0.0.1:{}", port1))
                        .await
                        .unwrap(),
                ));
                println!("listening {}...", conn.local_addr().unwrap());

                let config = Config {
                    net_conn: Arc::new(conn),
                    max_receive_buffer_size: 0,
                    max_message_size: 0,
                    name: "recver".to_owned(),
                };
                let a = Association::server(config).await?;
                println!("created a server");

                let stream = a.accept_stream().await.unwrap();
                println!("accepted a stream");

                // unordered = true; Rexmit with value 0 means a lost chunk is
                // never retransmitted (packets are simply dropped on loss).
                stream.set_reliability_params(true, ReliabilityType::Rexmit, 0);

                let mut buff = [0u8; 65535];
                let mut recv = 0;
                let mut pkt_num = 0;
                let mut loop_num = 0;
                let mut now = tokio::time::Instant::now();
                while let Ok(n) = stream.read(&mut buff).await {
                    recv += n;
                    if n != 0 {
                        pkt_num += 1;
                    }
                    loop_num += 1;
                    if now.elapsed().as_secs() == 1 {
                        println!(
                            "Throughput: {} Bytes/s, {} pkts, {} loops",
                            recv, pkt_num, loop_num
                        );
                        now = tokio::time::Instant::now();
                        recv = 0;
                        loop_num = 0;
                        pkt_num = 0;
                    }
                }
                Result::<(), Error>::Ok(())
            })
    });

    // Sender thread: connects to the receiver and writes fixed-size
    // payloads as fast as possible, reporting packets sent per second.
    let sender = std::thread::spawn(|| {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async move {
                let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
                conn.connect(format!("127.0.0.1:{}", port2)).await.unwrap();
                println!("connecting {}..", format!("127.0.0.1:{}", port2));

                let config = Config {
                    net_conn: conn,
                    max_receive_buffer_size: 0,
                    max_message_size: 0,
                    name: "sender".to_owned(),
                };
                let a = Association::client(config).await.unwrap();
                println!("created a client");

                let stream = a
                    .open_stream(0, PayloadProtocolIdentifier::Binary)
                    .await
                    .unwrap();
                println!("opened a stream");

                //const LEN: usize = 1200;
                const LEN: usize = 65535;
                // Zero-filled payload. The previous `Vec::set_len` on an
                // uninitialized buffer was undefined behavior (reading
                // uninitialized memory).
                let buf = vec![0u8; LEN];
                // Convert once, outside the loop: cloning the whole 64 KiB
                // Vec on every send would skew the throughput measurement.
                let payload = buf.into();

                let mut now = tokio::time::Instant::now();
                let mut pkt_num = 0;
                while stream.write(&payload).is_ok() {
                    pkt_num += 1;
                    if now.elapsed().as_secs() == 1 {
                        println!("Send {} pkts", pkt_num);
                        now = tokio::time::Instant::now();
                        pkt_num = 0;
                    }
                }
                Result::<(), Error>::Ok(())
            })
    });

    // Block on the worker threads instead of the previous `loop {}`, which
    // busy-spun one core at 100%. Propagate any association error.
    receiver.join().unwrap()?;
    sender.join().unwrap()?;
    Ok(())
}
128 changes: 35 additions & 93 deletions sctp/src/association/association_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,29 +326,21 @@ impl AssociationInternal {
}

self.handle_chunk_end();

Ok(())
}

fn gather_data_packets_to_retransmit(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
for p in &self.get_data_packets_to_retransmit() {
if let Ok(raw) = p.marshal() {
raw_packets.push(raw);
} else {
log::warn!(
"[{}] failed to serialize a DATA packet to be retransmitted",
self.name
);
}
fn gather_data_packets_to_retransmit(&mut self, mut raw_packets: Vec<Packet>) -> Vec<Packet> {
for p in self.get_data_packets_to_retransmit() {
raw_packets.push(p);
}

raw_packets
}

async fn gather_outbound_data_and_reconfig_packets(
&mut self,
mut raw_packets: Vec<Bytes>,
) -> Vec<Bytes> {
mut raw_packets: Vec<Packet>,
) -> Vec<Packet> {
// Pop unsent data chunks from the pending queue to send as much as
// cwnd and rwnd allow.
let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send();
Expand All @@ -358,12 +350,8 @@ impl AssociationInternal {
if let Some(t3rtx) = &self.t3rtx {
t3rtx.start(self.rto_mgr.get_rto()).await;
}
for p in &self.bundle_data_chunks_into_packets(chunks) {
if let Ok(raw) = p.marshal() {
raw_packets.push(raw);
} else {
log::warn!("[{}] failed to serialize a DATA packet", self.name);
}
for p in self.bundle_data_chunks_into_packets(chunks) {
raw_packets.push(p);
}
}

Expand All @@ -377,14 +365,7 @@ impl AssociationInternal {
);
for c in self.reconfigs.values() {
let p = self.create_packet(vec![Box::new(c.clone())]);
if let Ok(raw) = p.marshal() {
raw_packets.push(raw);
} else {
log::warn!(
"[{}] failed to serialize a RECONFIG packet to be retransmitted",
self.name,
);
}
raw_packets.push(p);
}
}

Expand All @@ -411,14 +392,7 @@ impl AssociationInternal {
self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission

let p = self.create_packet(vec![Box::new(c)]);
if let Ok(raw) = p.marshal() {
raw_packets.push(raw);
} else {
log::warn!(
"[{}] failed to serialize a RECONFIG packet to be transmitted",
self.name
);
}
raw_packets.push(p);
}

if !self.reconfigs.is_empty() {
Expand All @@ -433,8 +407,8 @@ impl AssociationInternal {

fn gather_outbound_fast_retransmission_packets(
&mut self,
mut raw_packets: Vec<Bytes>,
) -> Vec<Bytes> {
mut raw_packets: Vec<Packet>,
) -> Vec<Packet> {
if self.will_retransmit_fast {
self.will_retransmit_fast = false;

Expand Down Expand Up @@ -487,36 +461,27 @@ impl AssociationInternal {
}

if !to_fast_retrans.is_empty() {
if let Ok(raw) = self.create_packet(to_fast_retrans).marshal() {
raw_packets.push(raw);
} else {
log::warn!(
"[{}] failed to serialize a DATA packet to be fast-retransmitted",
self.name
);
}
let p = self.create_packet(to_fast_retrans);
raw_packets.push(p);
}
}

raw_packets
}

async fn gather_outbound_sack_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
async fn gather_outbound_sack_packets(&mut self, mut raw_packets: Vec<Packet>) -> Vec<Packet> {
if self.ack_state == AckState::Immediate {
self.ack_state = AckState::Idle;
let sack = self.create_selective_ack_chunk().await;
log::debug!("[{}] sending SACK: {}", self.name, sack);
if let Ok(raw) = self.create_packet(vec![Box::new(sack)]).marshal() {
raw_packets.push(raw);
} else {
log::warn!("[{}] failed to serialize a SACK packet", self.name);
}
let p = self.create_packet(vec![Box::new(sack)]);
raw_packets.push(p);
}

raw_packets
}

fn gather_outbound_forward_tsn_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
fn gather_outbound_forward_tsn_packets(&mut self, mut raw_packets: Vec<Packet>) -> Vec<Packet> {
/*log::debug!(
"[{}] gatherOutboundForwardTSNPackets {}",
self.name,
Expand All @@ -529,11 +494,8 @@ impl AssociationInternal {
self.cumulative_tsn_ack_point,
) {
let fwd_tsn = self.create_forward_tsn();
if let Ok(raw) = self.create_packet(vec![Box::new(fwd_tsn)]).marshal() {
raw_packets.push(raw);
} else {
log::warn!("[{}] failed to serialize a Forward TSN packet", self.name);
}
let p = self.create_packet(vec![Box::new(fwd_tsn)]);
raw_packets.push(p);
}
}

Expand All @@ -542,8 +504,8 @@ impl AssociationInternal {

async fn gather_outbound_shutdown_packets(
&mut self,
mut raw_packets: Vec<Bytes>,
) -> (Vec<Bytes>, bool) {
mut raw_packets: Vec<Packet>,
) -> (Vec<Packet>, bool) {
let mut ok = true;

if self.will_send_shutdown.load(Ordering::SeqCst) {
Expand All @@ -553,62 +515,42 @@ impl AssociationInternal {
cumulative_tsn_ack: self.cumulative_tsn_ack_point,
};

if let Ok(raw) = self.create_packet(vec![Box::new(shutdown)]).marshal() {
if let Some(t2shutdown) = &self.t2shutdown {
t2shutdown.start(self.rto_mgr.get_rto()).await;
}
raw_packets.push(raw);
} else {
log::warn!("[{}] failed to serialize a Shutdown packet", self.name);
let p = self.create_packet(vec![Box::new(shutdown)]);
if let Some(t2shutdown) = &self.t2shutdown {
t2shutdown.start(self.rto_mgr.get_rto()).await;
}
raw_packets.push(p);
} else if self.will_send_shutdown_ack {
self.will_send_shutdown_ack = false;

let shutdown_ack = ChunkShutdownAck {};

if let Ok(raw) = self.create_packet(vec![Box::new(shutdown_ack)]).marshal() {
if let Some(t2shutdown) = &self.t2shutdown {
t2shutdown.start(self.rto_mgr.get_rto()).await;
}
raw_packets.push(raw);
} else {
log::warn!("[{}] failed to serialize a ShutdownAck packet", self.name);
let p = self.create_packet(vec![Box::new(shutdown_ack)]);
if let Some(t2shutdown) = &self.t2shutdown {
t2shutdown.start(self.rto_mgr.get_rto()).await;
}
raw_packets.push(p);
} else if self.will_send_shutdown_complete {
self.will_send_shutdown_complete = false;

let shutdown_complete = ChunkShutdownComplete {};
ok = false;
let p = self.create_packet(vec![Box::new(shutdown_complete)]);

if let Ok(raw) = self
.create_packet(vec![Box::new(shutdown_complete)])
.marshal()
{
raw_packets.push(raw);
ok = false;
} else {
log::warn!(
"[{}] failed to serialize a ShutdownComplete packet",
self.name
);
}
raw_packets.push(p);
}

(raw_packets, ok)
}

/// gather_outbound gathers outgoing packets. The returned bool value set to
/// false means the association should be closed down after the final send.
pub(crate) async fn gather_outbound(&mut self) -> (Vec<Bytes>, bool) {
let mut raw_packets = vec![];
pub(crate) async fn gather_outbound(&mut self) -> (Vec<Packet>, bool) {
let mut raw_packets = Vec::with_capacity(16);

if !self.control_queue.is_empty() {
for p in self.control_queue.drain(..) {
if let Ok(raw) = p.marshal() {
raw_packets.push(raw);
} else {
log::warn!("[{}] failed to serialize a control packet", self.name);
continue;
}
raw_packets.push(p);
}
}

Expand Down
Loading