diff --git a/src/server/endpoint/mod.rs b/src/server/endpoint/mod.rs index c13eaeb..81b5088 100644 --- a/src/server/endpoint/mod.rs +++ b/src/server/endpoint/mod.rs @@ -2,8 +2,9 @@ pub mod candidate; pub mod transport; use crate::server::endpoint::transport::Transport; +use crate::server::session::description::rtp_transceiver::RTCRtpTransceiver; use crate::server::session::Session; -use crate::types::{EndpointId, FourTuple}; +use crate::types::{EndpointId, FourTuple, Mid}; use std::cell::RefCell; use std::collections::HashMap; use std::rc::{Rc, Weak}; @@ -13,6 +14,7 @@ pub(crate) struct Endpoint { session: Weak, endpoint_id: EndpointId, transports: RefCell>>, + pub(crate) transceivers: RefCell>, } impl Endpoint { @@ -21,6 +23,7 @@ impl Endpoint { session, endpoint_id, transports: RefCell::new(HashMap::new()), + transceivers: RefCell::new(HashMap::new()), } } diff --git a/src/server/session/description/mod.rs b/src/server/session/description/mod.rs index 63d1618..700c49f 100644 --- a/src/server/session/description/mod.rs +++ b/src/server/session/description/mod.rs @@ -272,12 +272,9 @@ pub(crate) fn add_transceiver_sdp( ice_params: &RTCIceParameters, candidate: &SocketAddr, media_section: &MediaSection, - transceiver: Option<&RTCRtpTransceiver>, + transceiver: &RTCRtpTransceiver, params: AddTransceiverSdpParams, ) -> Result<(SessionDescription, bool)> { - if transceiver.is_none() { - return Err(Error::Other("ErrSDPZeroTransceivers".to_string())); - } let (should_add_candidates, mid_value, dtls_role, ice_gathering_state) = ( params.should_add_candidates, params.mid_value, @@ -285,19 +282,18 @@ pub(crate) fn add_transceiver_sdp( params.ice_gathering_state, ); - // Use the first transceiver to generate the section attributes - let t = &transceiver.as_ref().unwrap(); - let mut media = MediaDescription::new_jsep_media_description(t.kind.to_string(), vec![]) - .with_value_attribute(ATTR_KEY_CONNECTION_SETUP.to_owned(), dtls_role.to_string()) - .with_value_attribute(ATTR_KEY_MID.to_owned(), mid_value.clone()) - .with_ice_credentials( - ice_params.username_fragment.clone(), - ice_params.password.clone(), - ) - .with_property_attribute(ATTR_KEY_RTCPMUX.to_owned()) - .with_property_attribute(ATTR_KEY_RTCPRSIZE.to_owned()); - - let codecs = &t.codecs; + let mut media = + MediaDescription::new_jsep_media_description(transceiver.kind.to_string(), vec![]) + .with_value_attribute(ATTR_KEY_CONNECTION_SETUP.to_owned(), dtls_role.to_string()) + .with_value_attribute(ATTR_KEY_MID.to_owned(), mid_value.clone()) + .with_ice_credentials( + ice_params.username_fragment.clone(), + ice_params.password.clone(), + ) + .with_property_attribute(ATTR_KEY_RTCPMUX.to_owned()) + .with_property_attribute(ATTR_KEY_RTCPRSIZE.to_owned()); + + let codecs = &transceiver.codecs; for codec in codecs { let name = codec .capability @@ -325,14 +321,14 @@ pub(crate) fn add_transceiver_sdp( } if codecs.is_empty() { // If we are sender and we have no codecs throw an error early - if t.sender.track.is_some() { + if transceiver.sender.track.is_some() { return Err(Error::Other("ErrSenderWithNoCodecs".to_string())); } // Explicitly reject track if we don't have the codec d = d.with_media(MediaDescription { media_name: sdp::description::media::MediaName { - media: t.kind.to_string(), + media: transceiver.kind.to_string(), port: RangedPort { value: 0, range: None, @@ -391,7 +387,7 @@ pub(crate) fn add_transceiver_sdp( ); } - let sender = &t.sender; + let sender = &transceiver.sender; if let Some(track) = &sender.track { media = media.with_media_source( sender.ssrc, @@ -432,7 +428,7 @@ pub(crate) fn add_transceiver_sdp( let direction = match params.offered_direction { Some(offered_direction) => { use RTCRtpTransceiverDirection::*; - let transceiver_direction = t.direction; + let transceiver_direction = transceiver.direction; match offered_direction { Sendonly | Recvonly => { @@ -449,7 +445,7 @@ pub(crate) fn add_transceiver_sdp( // media or session level, in which case the stream is sendrecv by // default), the corresponding stream in the answer MAY be marked as // sendonly, recvonly, sendrecv, or inactive - Sendrecv | Unspecified => t.direction, + Sendrecv | Unspecified => transceiver.direction, // If an offered media // stream is listed as inactive, it MUST be marked as inactive in the // answer. @@ -464,7 +460,7 @@ pub(crate) fn add_transceiver_sdp( // // When creating offers, the transceiver direction is directly reflected // in the output, even for re-offers. - t.direction + transceiver.direction } }; media = media.with_property_attribute(direction.to_string()); @@ -541,7 +537,9 @@ pub(crate) fn populate_sdp( ice_params, candidate, m, - transceivers.get(&m.mid), + transceivers + .get(&m.mid) + .ok_or(Error::Other("ErrSDPZeroTransceivers".to_string()))?, params, )?; d = d1; diff --git a/src/server/session/description/rtp_sender.rs b/src/server/session/description/rtp_sender.rs index 49c156e..390c523 100644 --- a/src/server/session/description/rtp_sender.rs +++ b/src/server/session/description/rtp_sender.rs @@ -1,5 +1,5 @@ use crate::server::session::description::rtp_codec::RTPCodecType; -use crate::server::session::description::rtp_transceiver::{PayloadType, SSRC}; +use crate::server::session::description::rtp_transceiver::SSRC; #[derive(Debug, Clone)] pub(crate) struct TrackLocal { @@ -18,29 +18,37 @@ pub struct RTCRtpSender { //pub(crate) context: Mutex, //pub(crate) transport: Arc, - pub(crate) payload_type: PayloadType, + //pub(crate) payload_type: PayloadType, pub(crate) ssrc: SSRC, - pub(crate) receive_mtu: usize, - + //pub(crate) receive_mtu: usize, /// a transceiver sender since we can just check the /// transceiver negotiation status - pub(crate) negotiated: bool, + negotiated: bool, //pub(crate) media_engine: Arc, //pub(crate) interceptor: Arc, - pub(crate) id: String, - + //pub(crate) id: String, /// The id of the initial track, even if we later change to a different /// track id should be use when negotiating. pub(crate) initial_track_id: Option, /// AssociatedMediaStreamIds from the WebRTC specifications pub(crate) associated_media_stream_ids: Vec, - //pub(crate) rtp_transceiver: Option, - pub(crate) paused: bool, + paused: bool, } impl RTCRtpSender { + pub(crate) fn new() -> Self { + Self { + track: None, + ssrc: rand::random::(), + negotiated: false, + initial_track_id: None, + associated_media_stream_ids: vec![], + paused: false, + } + } + pub(crate) fn is_negotiated(&self) -> bool { self.negotiated } @@ -48,4 +56,12 @@ impl RTCRtpSender { pub(crate) fn set_negotiated(&mut self) { self.negotiated = true; } + + pub(crate) fn is_paused(&self) -> bool { + self.paused + } + + pub(crate) fn set_paused(&mut self, paused: bool) { + self.paused = paused; + } } diff --git a/src/server/session/description/rtp_transceiver.rs b/src/server/session/description/rtp_transceiver.rs index fe6d7de..70a7cbe 100644 --- a/src/server/session/description/rtp_transceiver.rs +++ b/src/server/session/description/rtp_transceiver.rs @@ -1,7 +1,8 @@ use crate::server::session::description::rtp_codec::{RTCRtpCodecParameters, RTPCodecType}; -use crate::server::session::description::rtp_receiver::RTCRtpReceiver; use crate::server::session::description::rtp_sender::RTCRtpSender; use crate::server::session::description::rtp_transceiver_direction::RTCRtpTransceiverDirection; +use log::{debug, trace}; +use shared::error::Result; /// SSRC represents a synchronization source /// A synchronization source is a randomly chosen @@ -49,17 +50,96 @@ pub struct RTCPFeedback { /// RTPTransceiver represents a combination of an RTPSender and an RTPReceiver that share a common mid. #[derive(Debug, Clone)] pub struct RTCRtpTransceiver { - pub(crate) mid: String, + //pub(crate) mid: String, pub(crate) sender: RTCRtpSender, - pub(crate) receiver: RTCRtpReceiver, + //pub(crate) receiver: RTCRtpReceiver, pub(crate) direction: RTCRtpTransceiverDirection, - pub(crate) current_direction: RTCRtpTransceiverDirection, - + current_direction: RTCRtpTransceiverDirection, pub(crate) codecs: Vec, // User provided codecs via set_codec_preferences - pub(crate) stopped: bool, pub(crate) kind: RTPCodecType, //media_engine: Arc, //trigger_negotiation_needed: Mutex, } + +impl RTCRtpTransceiver { + pub(crate) fn new( + sender: RTCRtpSender, + direction: RTCRtpTransceiverDirection, + codecs: Vec, + kind: RTPCodecType, + ) -> Self { + Self { + sender, + direction, + current_direction: RTCRtpTransceiverDirection::Unspecified, + codecs, + stopped: false, + kind, + } + } + + /// current_direction returns the RTPTransceiver's current direction as negotiated. + /// + /// If this transceiver has never been negotiated or if it's stopped this returns [`RTCRtpTransceiverDirection::Unspecified`]. + pub fn current_direction(&self) -> RTCRtpTransceiverDirection { + if self.stopped { + return RTCRtpTransceiverDirection::Unspecified; + } + + self.current_direction + } + + pub(crate) fn set_current_direction(&mut self, d: RTCRtpTransceiverDirection) { + let previous = self.current_direction; + self.current_direction = d; + if d != previous { + debug!( + "Changing current direction of transceiver from {} to {}", + previous, d, + ); + } + } + + /// Perform any subsequent actions after altering the transceiver's direction. + /// + /// After changing the transceiver's direction this method should be called to perform any + /// side-effects that results from the new direction, such as pausing/resuming the RTP receiver. + pub(crate) fn process_new_current_direction( + &mut self, + previous_direction: RTCRtpTransceiverDirection, + ) -> Result<()> { + if self.stopped { + return Ok(()); + } + + let current_direction = self.current_direction(); + if previous_direction != current_direction { + trace!( + "Processing transceiver direction change from {} to {}", + previous_direction, + current_direction + ); + } else { + // no change. + return Ok(()); + } + + /*{ + let receiver = self.receiver.lock().await; + let pause_receiver = !current_direction.has_recv(); + + if pause_receiver { + receiver.pause().await?; + } else { + receiver.resume().await?; + } + }*/ + + let pause_sender = !current_direction.has_send(); + self.sender.set_paused(pause_sender); + + Ok(()) + } +} diff --git a/src/server/session/mod.rs b/src/server/session/mod.rs index 2bd7762..3b33f81 100644 --- a/src/server/session/mod.rs +++ b/src/server/session/mod.rs @@ -1,3 +1,4 @@ +use log::debug; use retty::transport::TransportContext; use sdp::description::session::Origin; use sdp::util::ConnectionRole; @@ -14,6 +15,7 @@ use crate::server::endpoint::candidate::{Candidate, DTLSRole, RTCIceParameters}; use crate::server::endpoint::transport::Transport; use crate::server::endpoint::Endpoint; use crate::server::session::description::rtp_codec::RTPCodecType; +use crate::server::session::description::rtp_sender::RTCRtpSender; use crate::server::session::description::rtp_transceiver::RTCRtpTransceiver; use crate::server::session::description::rtp_transceiver_direction::RTCRtpTransceiverDirection; use crate::server::session::description::sdp_type::RTCSdpType; @@ -28,7 +30,6 @@ pub(crate) struct Session { session_config: SessionConfig, session_id: SessionId, endpoints: RefCell>>, - transceivers: RefCell>, } impl Session { @@ -37,7 +38,6 @@ impl Session { session_config, session_id, endpoints: RefCell::new(HashMap::new()), - transceivers: RefCell::new(HashMap::new()), } } @@ -77,6 +77,12 @@ impl Session { Rc::clone(candidate), )); endpoint.add_transport(Rc::clone(&transport)); + + { + let mut endpoints = self.endpoints.borrow_mut(); + endpoints.insert(endpoint_id, Rc::clone(&endpoint)); + } + Ok((false, endpoint, transport)) } } @@ -89,15 +95,125 @@ impl Session { self.endpoints.borrow().contains_key(endpoint_id) } + pub(crate) fn set_remote_description( + &self, + endpoint: &Rc, + remote_description: &RTCSessionDescription, + ) -> Result<()> { + let parsed = remote_description + .parsed + .as_ref() + .ok_or(Error::Other("Unparsed remote description".to_string()))?; + + let mut local_transceivers = endpoint.transceivers.borrow_mut(); + let we_offer = remote_description.sdp_type == RTCSdpType::Answer; + + if !we_offer { + // This is an offer from the remove + for media in &parsed.media_descriptions { + let mid_value = match get_mid_value(media) { + Some(m) => { + if m.is_empty() { + return Err(Error::Other( + "ErrPeerConnRemoteDescriptionWithoutMidValue".to_string(), + )); + } else { + m + } + } + None => continue, + }; + + if media.media_name.media == MEDIA_SECTION_APPLICATION { + continue; + } + + let kind = RTPCodecType::from(media.media_name.media.as_str()); + let direction = get_peer_direction(media); + if kind == RTPCodecType::Unspecified + || direction == RTCRtpTransceiverDirection::Unspecified + { + continue; + } + + if !local_transceivers.contains_key(mid_value) { + let local_direction = if direction == RTCRtpTransceiverDirection::Recvonly { + RTCRtpTransceiverDirection::Sendonly + } else { + RTCRtpTransceiverDirection::Recvonly + }; + + let sender = RTCRtpSender::new(); + let transceiver = RTCRtpTransceiver::new(sender, local_direction, vec![], kind); + local_transceivers.insert(mid_value.to_string(), transceiver); + debug!("local_transceivers {:?}", local_transceivers.keys()); + } + } + } else { + // WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/ + // 4.5.9.2 + // This is an answer from the remote. + for media in &parsed.media_descriptions { + let mid_value = match get_mid_value(media) { + Some(m) => { + if m.is_empty() { + return Err(Error::Other( + "ErrPeerConnRemoteDescriptionWithoutMidValue".to_string(), + )); + } else { + m + } + } + None => continue, + }; + + if media.media_name.media == MEDIA_SECTION_APPLICATION { + continue; + } + let kind = RTPCodecType::from(media.media_name.media.as_str()); + let direction = get_peer_direction(media); + if kind == RTPCodecType::Unspecified + || direction == RTCRtpTransceiverDirection::Unspecified + { + continue; + } + + if let Some(t) = local_transceivers.get_mut(mid_value) { + let previous_direction = t.current_direction(); + + // 4.5.9.2.9 + // Let direction be an RTCRtpTransceiverDirection value representing the direction + // from the media description, but with the send and receive directions reversed to + // represent this peer's point of view. If the media description is rejected, + // set direction to "inactive". + let reversed_direction = direction.reverse(); + + // 4.5.9.2.13.2 + // Set transceiver.[[CurrentDirection]] and transceiver.[[Direction]]s to direction. + t.set_current_direction(reversed_direction); + // TODO: According to the specification we should set + // transceiver.[[Direction]] here, however libWebrtc doesn't do this. + // NOTE: After raising this it seems like the specification might + // change to remove the setting of transceiver.[[Direction]]. + // See https://github.com/w3c/webrtc-pc/issues/2751#issuecomment-1185901962 + // t.set_direction_internal(reversed_direction); + t.process_new_current_direction(previous_direction)?; + } + } + } + + Ok(()) + } + pub(crate) fn create_answer( &self, - endpoint_id: EndpointId, + endpoint: &Option>, remote_description: &RTCSessionDescription, local_ice_params: &RTCIceParameters, ) -> Result { let use_identity = false; //TODO: self.config.idp_login_url.is_some(); let mut d = self.generate_matched_sdp( - endpoint_id, + endpoint, remote_description, local_ice_params, use_identity, @@ -123,7 +239,7 @@ impl Session { /// this is used everytime we have a remote_description pub(crate) fn generate_matched_sdp( &self, - _endpoint_id: EndpointId, + endpoint: &Option>, remote_description: &RTCSessionDescription, local_ice_params: &RTCIceParameters, use_identity: bool, @@ -131,13 +247,18 @@ impl Session { connection_role: ConnectionRole, ) -> Result { let d = SessionDescription::new_jsep_session_description(use_identity); + let empty_transceivers = RefCell::new(HashMap::new()); let media_sections = { - let mut local_transceivers = self.transceivers.borrow_mut(); + let mut local_transceivers = if let Some(endpoint) = endpoint.as_ref() { + endpoint.transceivers.borrow_mut() + } else { + empty_transceivers.borrow_mut() + }; let mut media_sections = vec![]; let mut already_have_application_media_section = false; - let mut matched = HashSet::new(); + let mut matched: HashSet = HashSet::new(); if let Some(parsed) = remote_description.parsed.as_ref() { for media in &parsed.media_descriptions { if let Some(mid_value) = get_mid_value(media) { @@ -167,7 +288,7 @@ impl Session { if let Some(t) = local_transceivers.get_mut(mid_value) { t.sender.set_negotiated(); - matched.insert(t.mid.clone()); + matched.insert(mid_value.to_string()); media_sections.push(MediaSection { mid: mid_value.to_owned(), @@ -185,10 +306,10 @@ impl Session { // If we are offering also include unmatched local transceivers if include_unmatched { for (mid, t) in local_transceivers.iter_mut() { - if !matched.contains(mid) { + if !matched.contains::(mid) { t.sender.set_negotiated(); media_sections.push(MediaSection { - mid: t.mid.clone(), + mid: mid.clone(), ..Default::default() }); } @@ -213,6 +334,12 @@ impl Session { return Err(Error::Other("ErrNonCertificate".to_string())); }; + let local_transceiver = if let Some(endpoint) = endpoint.as_ref() { + endpoint.transceivers.borrow() + } else { + empty_transceivers.borrow() + }; + populate_sdp( d, &dtls_fingerprints, @@ -220,7 +347,7 @@ impl Session { local_ice_params, connection_role, &media_sections, - &self.transceivers.borrow(), + &local_transceiver, true, ) } diff --git a/src/server/states.rs b/src/server/states.rs index 0747c11..5a7dcfe 100644 --- a/src/server/states.rs +++ b/src/server/states.rs @@ -70,9 +70,14 @@ impl ServerStates { ); let session = self.create_or_get_session(session_id); - let answer = session.create_answer(endpoint_id, &offer, &local_conn_cred.ice_params)?; + let endpoint = session.get_endpoint(&endpoint_id); - if !session.has_endpoint(&endpoint_id) { + if let Some(endpoint) = endpoint.as_ref() { + session.set_remote_description(endpoint, &offer)?; + } + let answer = session.create_answer(&endpoint, &offer, &local_conn_cred.ice_params)?; + + if endpoint.is_none() { self.add_candidate(Rc::new(Candidate::new( session.session_id(), endpoint_id,