diff --git a/src/handler/srtp.rs b/src/handler/srtp.rs index 9aa89ab..bdc663a 100644 --- a/src/handler/srtp.rs +++ b/src/handler/srtp.rs @@ -3,6 +3,7 @@ use crate::server::states::ServerStates; use bytes::BytesMut; use log::{debug, error}; use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler}; +use rtcp::compound_packet::CompoundPacket; use shared::{ error::{Error, Result}, marshal::{Marshal, Unmarshal}, @@ -49,8 +50,15 @@ impl InboundHandler for SrtpInbound { let mut remote_context = transport.remote_srtp_context(); if let Some(context) = remote_context.as_mut() { let mut decrypted = context.decrypt_rtcp(&message)?; - let rtcp_packet = rtcp::packet::unmarshal(&mut decrypted)?; - Ok(MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packet))) + let rtcp_packets = rtcp::packet::unmarshal(&mut decrypted)?; + let rtcp_packets = if rtcp_packets.len() > 1 { + let compound_packet = CompoundPacket(rtcp_packets); + compound_packet.validate()?; + compound_packet.0 + } else { + rtcp_packets + }; + Ok(MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packets))) } else { Err(Error::Other(format!( "remote_srtp_context is not set yet for four_tuple {:?}", @@ -102,10 +110,18 @@ impl OutboundHandler for SrtpOutbound { let transport = server_states.get_mut_transport(&four_tuple)?; match message { - RTPMessageEvent::Rtcp(rtcp_message) => { + RTPMessageEvent::Rtcp(rtcp_packets) => { + let rtcp_packets = if rtcp_packets.len() > 1 { + let compound_packet = CompoundPacket(rtcp_packets); + compound_packet.validate()?; + compound_packet.0 + } else { + rtcp_packets + }; + let mut local_context = transport.local_srtp_context(); if let Some(context) = local_context.as_mut() { - let packet = rtcp::packet::marshal(&rtcp_message)?; + let packet = rtcp::packet::marshal(&rtcp_packets)?; context.encrypt_rtcp(&packet) } else { Err(Error::Other(format!( diff --git a/src/interceptor/report/sender_report.rs b/src/interceptor/report/sender_report.rs index a08a775..5d2a718 100644 --- a/src/interceptor/report/sender_report.rs +++ b/src/interceptor/report/sender_report.rs @@ -1,6 +1,8 @@ use crate::interceptor::report::ReportBuilder; use crate::interceptor::{Interceptor, InterceptorEvent}; -use crate::messages::TaggedMessageEvent; +use crate::messages::{MessageEvent, RTPMessageEvent, TaggedMessageEvent}; +use rtcp::header::PacketType; +use rtcp::source_description::{SdesType, SourceDescription}; use std::time::Instant; pub(crate) struct SenderReport { @@ -24,6 +26,46 @@ impl Interceptor for SenderReport { fn read(&mut self, msg: &mut TaggedMessageEvent) -> Vec { let mut interceptor_events = vec![]; + + if let MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packets)) = &msg.message { + let mut inbound_rtcp_packets = vec![]; + + if rtcp_packets.len() > 1 { + let mut has_sender_report = false; + for rtcp_packet in rtcp_packets { + let packet_type = rtcp_packet.header().packet_type; + if packet_type == PacketType::SenderReport { + inbound_rtcp_packets.push(rtcp_packet.clone()); + has_sender_report = true; + } else if has_sender_report && packet_type == PacketType::SourceDescription { + if let Some(e) = rtcp_packet.as_any().downcast_ref::() { + let mut has_cname = false; + for c in &e.chunks { + for it in &c.items { + if it.sdes_type == SdesType::SdesCname { + has_cname = true + } + } + } + if has_cname { + inbound_rtcp_packets.push(rtcp_packet.clone()); + } + } + } + } + } else if !rtcp_packets.is_empty() + && rtcp_packets[0].header().packet_type == PacketType::SenderReport + { + inbound_rtcp_packets.push(rtcp_packets[0].clone()); + } + + interceptor_events.push(InterceptorEvent::Inbound(TaggedMessageEvent { + now: msg.now, + transport: msg.transport, + message: MessageEvent::Rtp(RTPMessageEvent::Rtcp(inbound_rtcp_packets)), + })); + } + if let Some(next) = self.next.as_mut() { let mut events = next.read(msg); interceptor_events.append(&mut events); @@ -32,6 +74,8 @@ impl Interceptor for SenderReport { } fn write(&mut self, msg: &mut TaggedMessageEvent) -> Vec { + // SenderReport in SFU does nothing, but just forwarding it to other Endpoints + let mut interceptor_events = vec![]; if let Some(next) = self.next.as_mut() { let mut events = next.write(msg);