Skip to content

Commit

Permalink
Merge pull request #429 from kolserdav/stop-pc-bug
Browse files Browse the repository at this point in the history
Stop pc bug
  • Loading branch information
yngrtc authored Jun 4, 2023
2 parents c919e83 + 59c75bd commit f8a8b6f
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 54 deletions.
14 changes: 7 additions & 7 deletions webrtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ impl RTCPeerConnection {
None => return true, // doesn't contain a single a=msid line
};

let sender = t.sender();
let sender = t.sender().await;
// (...)or the number of MSIDs from the a=msid lines in this m= section,
// or the MSID values themselves, differ from what is in
// transceiver.sender.[[AssociatedMediaStreamIds]], return true.
Expand Down Expand Up @@ -1595,7 +1595,7 @@ impl RTCPeerConnection {
pub(crate) async fn start_rtp_senders(&self) -> Result<()> {
let current_transceivers = self.internal.rtp_transceivers.lock().await;
for transceiver in &*current_transceivers {
let sender = transceiver.sender();
let sender = transceiver.sender().await;
if sender.is_negotiated() && !sender.has_sent() {
sender.send(&sender.get_parameters().await).await?;
}
Expand Down Expand Up @@ -1653,7 +1653,7 @@ impl RTCPeerConnection {
let mut senders = vec![];
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
for transceiver in &*rtp_transceivers {
let sender = transceiver.sender();
let sender = transceiver.sender().await;
senders.push(sender);
}
senders
Expand All @@ -1664,7 +1664,7 @@ impl RTCPeerConnection {
let mut receivers = vec![];
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
for transceiver in &*rtp_transceivers {
receivers.push(transceiver.receiver());
receivers.push(transceiver.receiver().await);
}
receivers
}
Expand All @@ -1688,7 +1688,7 @@ impl RTCPeerConnection {
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
for t in &*rtp_transceivers {
if !t.stopped.load(Ordering::SeqCst) && t.kind == track.kind() {
let sender = t.sender();
let sender = t.sender().await;
if sender.track().await.is_none() {
if let Err(err) = sender.replace_track(Some(track)).await {
let _ = sender.stop().await;
Expand All @@ -1715,7 +1715,7 @@ impl RTCPeerConnection {
.add_rtp_transceiver(Arc::clone(&transceiver))
.await;

Ok(transceiver.sender())
Ok(transceiver.sender().await)
}

/// remove_track removes a Track from the PeerConnection
Expand All @@ -1728,7 +1728,7 @@ impl RTCPeerConnection {
{
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
for t in &*rtp_transceivers {
if t.sender().id == sender.id {
if t.sender().await.id == sender.id {
if sender.track().await.is_none() {
return Ok(());
}
Expand Down
22 changes: 11 additions & 11 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl PeerConnectionInternal {
self.undeclared_media_processor();
} else {
for t in &current_transceivers {
let receiver = t.receiver();
let receiver = t.receiver().await;
let tracks = receiver.tracks().await;
if tracks.is_empty() {
continue;
Expand Down Expand Up @@ -217,7 +217,7 @@ impl PeerConnectionInternal {
Arc::clone(&self.media_engine),
interceptor,
));
t.set_receiver(receiver);
t.set_receiver(receiver).await;
}
}

Expand Down Expand Up @@ -338,7 +338,7 @@ impl PeerConnectionInternal {
for incoming_track in incoming_tracks {
// If we already have a TrackRemote for a given SSRC don't handle it again
for t in local_transceivers {
let receiver = t.receiver();
let receiver = t.receiver().await;
for track in receiver.tracks().await {
for ssrc in &incoming_track.ssrcs {
if *ssrc == track.ssrc() {
Expand All @@ -364,7 +364,7 @@ impl PeerConnectionInternal {
continue;
}

let receiver = t.receiver();
let receiver = t.receiver().await;
if receiver.have_received().await {
continue;
}
Expand Down Expand Up @@ -667,7 +667,7 @@ impl PeerConnectionInternal {
}

// TODO: This is dubious because of rollbacks.
t.sender().set_negotiated();
t.sender().await.set_negotiated();
media_sections.push(MediaSection {
id: t.mid().unwrap().0.to_string(),
transceivers: vec![Arc::clone(t)],
Expand Down Expand Up @@ -756,7 +756,7 @@ impl PeerConnectionInternal {
}

if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
t.sender().set_negotiated();
t.sender().await.set_negotiated();
let media_transceivers = vec![t];

// NB: The below could use `then_some`, but with our current MSRV
Expand All @@ -781,7 +781,7 @@ impl PeerConnectionInternal {
// If we are offering also include unmatched local transceivers
if include_unmatched {
for t in &local_transceivers {
t.sender().set_negotiated();
t.sender().await.set_negotiated();
media_sections.push(MediaSection {
id: t.mid().unwrap().0.to_string(),
transceivers: vec![Arc::clone(t)],
Expand Down Expand Up @@ -887,7 +887,7 @@ impl PeerConnectionInternal {
)
.await?;

let receiver = t.receiver();
let receiver = t.receiver().await;
PeerConnectionInternal::start_receiver(
self.setting_engine.get_receive_mtu(),
&incoming,
Expand Down Expand Up @@ -1008,7 +1008,7 @@ impl PeerConnectionInternal {
continue;
}

let receiver = t.receiver();
let receiver = t.receiver().await;

if !rsid.is_empty() {
return receiver
Expand Down Expand Up @@ -1210,7 +1210,7 @@ impl PeerConnectionInternal {
}
let mut track_infos = vec![];
for transeiver in transceivers {
let receiver = transeiver.receiver();
let receiver = transeiver.receiver().await;

if let Some(mid) = transeiver.mid() {
let tracks = receiver.tracks().await;
Expand Down Expand Up @@ -1335,7 +1335,7 @@ impl PeerConnectionInternal {
}
let mut track_infos = vec![];
for transceiver in transceivers {
let sender = transceiver.sender();
let sender = transceiver.sender().await;

let mid = match transceiver.mid() {
Some(mid) => mid,
Expand Down
48 changes: 48 additions & 0 deletions webrtc/src/peer_connection/peer_connection_test.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use super::*;

use crate::api::interceptor_registry::register_default_interceptors;
use crate::api::media_engine::MediaEngine;
use crate::api::media_engine::MIME_TYPE_VP8;
use crate::api::APIBuilder;
use crate::ice_transport::ice_candidate_pair::RTCIceCandidatePair;
use crate::ice_transport::ice_server::RTCIceServer;
use crate::peer_connection::configuration::RTCConfiguration;
use crate::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use crate::stats::StatsReportType;
use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample;
use crate::Error;
use interceptor::registry::Registry;

use bytes::Bytes;
use media::Sample;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use tokio::time::Duration;
use util::vnet::net::{Net, NetConfig};
use util::vnet::router::{Router, RouterConfig};
Expand Down Expand Up @@ -374,3 +382,43 @@ async fn test_get_stats() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_peer_connection_close_is_send() -> Result<()> {
let handle = tokio::spawn(async move { peer().await });
tokio::join!(handle).0.unwrap()
}

async fn peer() -> Result<()> {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();

let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};

let peer_connection = Arc::new(api.new_peer_connection(config).await?);

let offer = peer_connection.create_offer(None).await?;
let mut gather_complete = peer_connection.gathering_complete_promise().await;
peer_connection.set_local_description(offer).await?;
let _ = gather_complete.recv().await;

if peer_connection.local_description().await.is_some() {
//TODO?
}

peer_connection.close().await?;

Ok(())
}
4 changes: 2 additions & 2 deletions webrtc/src/peer_connection/sdp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ pub(crate) async fn add_transceiver_sdp(
}
if codecs.is_empty() {
// If we are sender and we have no codecs throw an error early
if t.sender().track().await.is_some() {
if t.sender().await.track().await.is_some() {
return Err(Error::ErrSenderWithNoCodecs);
}

Expand Down Expand Up @@ -530,7 +530,7 @@ pub(crate) async fn add_transceiver_sdp(
}

for mt in transceivers {
let sender = mt.sender();
let sender = mt.sender().await;
if let Some(track) = sender.track().await {
media = media.with_media_source(
sender.ssrc,
Expand Down
24 changes: 13 additions & 11 deletions webrtc/src/peer_connection/sdp/sdp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,17 +642,19 @@ async fn test_media_description_fingerprints() -> Result<()> {
"video".to_owned(),
"webrtc-rs".to_owned(),
));
media[i].transceivers[0].set_sender(Arc::new(
RTCRtpSender::new(
api.setting_engine.get_receive_mtu(),
Some(track),
Arc::new(RTCDtlsTransport::default()),
Arc::clone(&api.media_engine),
Arc::clone(&interceptor),
false,
)
.await,
));
media[i].transceivers[0]
.set_sender(Arc::new(
RTCRtpSender::new(
api.setting_engine.get_receive_mtu(),
Some(track),
Arc::new(RTCDtlsTransport::default()),
Arc::clone(&api.media_engine),
Arc::clone(&interceptor),
false,
)
.await,
))
.await;
media[i].transceivers[0].set_direction_internal(RTCRtpTransceiverDirection::Sendonly);
}

Expand Down
Loading

0 comments on commit f8a8b6f

Please sign in to comment.