From 974e9ec3e9919e235abe7b152914fc03db535da8 Mon Sep 17 00:00:00 2001 From: Haakon Sporsheim Date: Fri, 2 Aug 2024 16:19:27 +0200 Subject: [PATCH] peer_connection: Initialize PeerConnectionInternal correctly PeerConnectionInternal was initialized with default transports and then mutated right after. This resulted in create_ice_transport() using the default DTLS transport instance which again led to state transition reading DTLS transport state from the wrong instance. --- .../peer_connection_internal.rs | 161 +++++++++--------- 1 file changed, 79 insertions(+), 82 deletions(-) diff --git a/webrtc/src/peer_connection/peer_connection_internal.rs b/webrtc/src/peer_connection/peer_connection_internal.rs index 7a89b3e62..6f877bb9f 100644 --- a/webrtc/src/peer_connection/peer_connection_internal.rs +++ b/webrtc/src/peer_connection/peer_connection_internal.rs @@ -77,7 +77,24 @@ impl PeerConnectionInternal { stats_interceptor: Arc, mut configuration: RTCConfiguration, ) -> Result<(Arc, RTCConfiguration)> { - let mut pc = PeerConnectionInternal { + // Create the ice gatherer + let ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions { + ice_servers: configuration.get_ice_servers(), + ice_gather_policy: configuration.ice_transport_policy, + })?); + + // Create the ICE transport + let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&ice_gatherer))); + + // Create the DTLS transport + let certificates = configuration.certificates.drain(..).collect(); + let dtls_transport = + Arc::new(api.new_dtls_transport(Arc::clone(&ice_transport), certificates)?); + + // Create the SCTP transport + let sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&dtls_transport))?); + + let pc = Arc::new(PeerConnectionInternal { greater_mid: AtomicIsize::new(-1), sdp_origin: Mutex::new(Default::default()), last_offer: Mutex::new("".to_owned()), @@ -89,16 +106,16 @@ impl PeerConnectionInternal { is_negotiation_needed: Arc::new(AtomicBool::new(false)), negotiation_needed_state: Arc::new(AtomicU8::new(NegotiationNeededState::Empty as u8)), signaling_state: Arc::new(AtomicU8::new(RTCSignalingState::Stable as u8)), - ice_transport: Arc::new(Default::default()), - dtls_transport: Arc::new(Default::default()), + ice_transport, + dtls_transport, ice_connection_state: Arc::new(AtomicU8::new(RTCIceConnectionState::New as u8)), - sctp_transport: Arc::new(Default::default()), + sctp_transport, rtp_transceivers: Arc::new(Default::default()), on_track_handler: Arc::new(ArcSwapOption::empty()), on_signaling_state_change_handler: ArcSwapOption::empty(), on_ice_connection_state_change_handler: Arc::new(ArcSwapOption::empty()), on_data_channel_handler: Arc::new(Default::default()), - ice_gatherer: Arc::new(Default::default()), + ice_gatherer, current_local_description: Arc::new(Default::default()), current_remote_description: Arc::new(Default::default()), pending_local_description: Arc::new(Default::default()), @@ -114,39 +131,77 @@ impl PeerConnectionInternal { stats_interceptor, on_peer_connection_state_change_handler: Arc::new(ArcSwapOption::empty()), pending_remote_description: Arc::new(Default::default()), - }; - - // Create the ice gatherer - pc.ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions { - ice_servers: configuration.get_ice_servers(), - ice_gather_policy: configuration.ice_transport_policy, - })?); - - // Create the ice transport - pc.ice_transport = pc.create_ice_transport(api).await; + }); - // Create the DTLS transport - let certificates = configuration.certificates.drain(..).collect(); - pc.dtls_transport = - Arc::new(api.new_dtls_transport(Arc::clone(&pc.ice_transport), certificates)?); + // Wire up the ice transport connection state change handler + let ice_connection_state = Arc::clone(&pc.ice_connection_state); + let peer_connection_state = Arc::clone(&pc.peer_connection_state); + let is_closed = Arc::clone(&pc.is_closed); + let dtls_transport = Arc::clone(&pc.dtls_transport); + let on_ice_connection_state_change_handler = + Arc::clone(&pc.on_ice_connection_state_change_handler); + let on_peer_connection_state_change_handler = + Arc::clone(&pc.on_peer_connection_state_change_handler); + + pc.ice_transport.on_connection_state_change(Box::new( + move |state: RTCIceTransportState| { + let cs = match state { + RTCIceTransportState::New => RTCIceConnectionState::New, + RTCIceTransportState::Checking => RTCIceConnectionState::Checking, + RTCIceTransportState::Connected => RTCIceConnectionState::Connected, + RTCIceTransportState::Completed => RTCIceConnectionState::Completed, + RTCIceTransportState::Failed => RTCIceConnectionState::Failed, + RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected, + RTCIceTransportState::Closed => RTCIceConnectionState::Closed, + _ => { + log::warn!("on_connection_state_change: unhandled ICE state: {}", state); + return Box::pin(async {}); + } + }; - // Create the SCTP transport - pc.sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&pc.dtls_transport))?); + let dtls_transport = Arc::clone(&dtls_transport); + let ice_connection_state = Arc::clone(&ice_connection_state); + let on_ice_connection_state_change_handler = + Arc::clone(&on_ice_connection_state_change_handler); + let on_peer_connection_state_change_handler = + Arc::clone(&on_peer_connection_state_change_handler); + let is_closed = Arc::clone(&is_closed); + let peer_connection_state = Arc::clone(&peer_connection_state); + Box::pin(async move { + RTCPeerConnection::do_ice_connection_state_change( + &on_ice_connection_state_change_handler, + &ice_connection_state, + cs, + ) + .await; + + let dtls_transport_state = dtls_transport.state(); + RTCPeerConnection::update_connection_state( + &on_peer_connection_state_change_handler, + &is_closed, + &peer_connection_state, + cs, + dtls_transport_state, + ) + .await; + }) + }, + )); // Wire up the on datachannel handler let on_data_channel_handler = Arc::clone(&pc.on_data_channel_handler); pc.sctp_transport .on_data_channel(Box::new(move |d: Arc| { - let on_data_channel_handler2 = Arc::clone(&on_data_channel_handler); + let on_data_channel_handler = Arc::clone(&on_data_channel_handler); Box::pin(async move { - if let Some(handler) = &*on_data_channel_handler2.load() { + if let Some(handler) = &*on_data_channel_handler.load() { let mut f = handler.lock().await; f(d).await; } }) })); - Ok((Arc::new(pc), configuration)) + Ok((pc, configuration)) } pub(super) async fn start_rtp( @@ -1141,64 +1196,6 @@ impl PeerConnectionInternal { } } - pub(super) async fn create_ice_transport(&self, api: &API) -> Arc { - let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&self.ice_gatherer))); - - let ice_connection_state = Arc::clone(&self.ice_connection_state); - let peer_connection_state = Arc::clone(&self.peer_connection_state); - let is_closed = Arc::clone(&self.is_closed); - let dtls_transport = Arc::clone(&self.dtls_transport); - let on_ice_connection_state_change_handler = - Arc::clone(&self.on_ice_connection_state_change_handler); - let on_peer_connection_state_change_handler = - Arc::clone(&self.on_peer_connection_state_change_handler); - - ice_transport.on_connection_state_change(Box::new(move |state: RTCIceTransportState| { - let cs = match state { - RTCIceTransportState::New => RTCIceConnectionState::New, - RTCIceTransportState::Checking => RTCIceConnectionState::Checking, - RTCIceTransportState::Connected => RTCIceConnectionState::Connected, - RTCIceTransportState::Completed => RTCIceConnectionState::Completed, - RTCIceTransportState::Failed => RTCIceConnectionState::Failed, - RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected, - RTCIceTransportState::Closed => RTCIceConnectionState::Closed, - _ => { - log::warn!("on_connection_state_change: unhandled ICE state: {}", state); - return Box::pin(async {}); - } - }; - - let dtls_transport = Arc::clone(&dtls_transport); - let ice_connection_state2 = Arc::clone(&ice_connection_state); - let on_ice_connection_state_change_handler2 = - Arc::clone(&on_ice_connection_state_change_handler); - let on_peer_connection_state_change_handler2 = - Arc::clone(&on_peer_connection_state_change_handler); - let is_closed2 = Arc::clone(&is_closed); - let peer_connection_state2 = Arc::clone(&peer_connection_state); - Box::pin(async move { - RTCPeerConnection::do_ice_connection_state_change( - &on_ice_connection_state_change_handler2, - &ice_connection_state2, - cs, - ) - .await; - - let dtls_transport_state = dtls_transport.state(); - RTCPeerConnection::update_connection_state( - &on_peer_connection_state_change_handler2, - &is_closed2, - &peer_connection_state2, - cs, - dtls_transport_state, - ) - .await; - }) - })); - - ice_transport - } - /// has_local_description_changed returns whether local media (rtp_transceivers) has changed /// caller of this method should hold `pc.mu` lock pub(super) async fn has_local_description_changed(&self, desc: &RTCSessionDescription) -> bool {