Skip to content

Commit

Permalink
complete SenderReport Interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Feb 4, 2024
1 parent 21c5f48 commit a7f82b5
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 5 deletions.
24 changes: 20 additions & 4 deletions src/handler/srtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::server::states::ServerStates;
use bytes::BytesMut;
use log::{debug, error};
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};
use rtcp::compound_packet::CompoundPacket;
use shared::{
error::{Error, Result},
marshal::{Marshal, Unmarshal},
Expand Down Expand Up @@ -49,8 +50,15 @@ impl InboundHandler for SrtpInbound {
let mut remote_context = transport.remote_srtp_context();
if let Some(context) = remote_context.as_mut() {
let mut decrypted = context.decrypt_rtcp(&message)?;
let rtcp_packet = rtcp::packet::unmarshal(&mut decrypted)?;
Ok(MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packet)))
let rtcp_packets = rtcp::packet::unmarshal(&mut decrypted)?;
let rtcp_packets = if rtcp_packets.len() > 1 {
let compound_packet = CompoundPacket(rtcp_packets);
compound_packet.validate()?;
compound_packet.0
} else {
rtcp_packets
};
Ok(MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packets)))
} else {
Err(Error::Other(format!(
"remote_srtp_context is not set yet for four_tuple {:?}",
Expand Down Expand Up @@ -102,10 +110,18 @@ impl OutboundHandler for SrtpOutbound {
let transport = server_states.get_mut_transport(&four_tuple)?;

match message {
RTPMessageEvent::Rtcp(rtcp_message) => {
RTPMessageEvent::Rtcp(rtcp_packets) => {
let rtcp_packets = if rtcp_packets.len() > 1 {
let compound_packet = CompoundPacket(rtcp_packets);
compound_packet.validate()?;
compound_packet.0
} else {
rtcp_packets
};

let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
let packet = rtcp::packet::marshal(&rtcp_message)?;
let packet = rtcp::packet::marshal(&rtcp_packets)?;
context.encrypt_rtcp(&packet)
} else {
Err(Error::Other(format!(
Expand Down
46 changes: 45 additions & 1 deletion src/interceptor/report/sender_report.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::interceptor::report::ReportBuilder;
use crate::interceptor::{Interceptor, InterceptorEvent};
use crate::messages::TaggedMessageEvent;
use crate::messages::{MessageEvent, RTPMessageEvent, TaggedMessageEvent};
use rtcp::header::PacketType;
use rtcp::source_description::{SdesType, SourceDescription};
use std::time::Instant;

pub(crate) struct SenderReport {
Expand All @@ -24,6 +26,46 @@ impl Interceptor for SenderReport {

fn read(&mut self, msg: &mut TaggedMessageEvent) -> Vec<InterceptorEvent> {
let mut interceptor_events = vec![];

if let MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packets)) = &msg.message {
let mut inbound_rtcp_packets = vec![];

if rtcp_packets.len() > 1 {
let mut has_sender_report = false;
for rtcp_packet in rtcp_packets {
let packet_type = rtcp_packet.header().packet_type;
if packet_type == PacketType::SenderReport {
inbound_rtcp_packets.push(rtcp_packet.clone());
has_sender_report = true;
} else if has_sender_report && packet_type == PacketType::SourceDescription {
if let Some(e) = rtcp_packet.as_any().downcast_ref::<SourceDescription>() {
let mut has_cname = false;
for c in &e.chunks {
for it in &c.items {
if it.sdes_type == SdesType::SdesCname {
has_cname = true
}
}
}
if has_cname {
inbound_rtcp_packets.push(rtcp_packet.clone());
}
}
}
}
} else if !rtcp_packets.is_empty()
&& rtcp_packets[0].header().packet_type == PacketType::SenderReport
{
inbound_rtcp_packets.push(rtcp_packets[0].clone());
}

interceptor_events.push(InterceptorEvent::Inbound(TaggedMessageEvent {
now: msg.now,
transport: msg.transport,
message: MessageEvent::Rtp(RTPMessageEvent::Rtcp(inbound_rtcp_packets)),
}));
}

if let Some(next) = self.next.as_mut() {
let mut events = next.read(msg);
interceptor_events.append(&mut events);
Expand All @@ -32,6 +74,8 @@ impl Interceptor for SenderReport {
}

fn write(&mut self, msg: &mut TaggedMessageEvent) -> Vec<InterceptorEvent> {
// SenderReport in SFU does nothing, but just forwarding it to other Endpoints

let mut interceptor_events = vec![];
if let Some(next) = self.next.as_mut() {
let mut events = next.write(msg);
Expand Down

0 comments on commit a7f82b5

Please sign in to comment.