Skip to content

Commit

Permalink
merge rtp_handler into srtp_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Rusty Rain committed Feb 4, 2024
1 parent b721503 commit b2b1fd0
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 173 deletions.
8 changes: 3 additions & 5 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use retty::executor::LocalExecutorBuilder;
use retty::transport::{AsyncTransport, AsyncTransportWrite, TaggedBytesMut};
use sfu::{
DataChannelHandler, DemuxerHandler, DtlsHandler, ExceptionHandler, GatewayHandler,
RTCCertificate, RtcpHandler, RtpHandler, SctpHandler, ServerConfig, ServerStates, SrtpHandler,
InterceptorHandler, RTCCertificate, SctpHandler, ServerConfig, ServerStates, SrtpHandler,
StunHandler,
};
use std::cell::RefCell;
Expand Down Expand Up @@ -153,8 +153,7 @@ fn main() -> anyhow::Result<()> {
let data_channel_handler = DataChannelHandler::new();
// SRTP
let srtp_handler = SrtpHandler::new(Rc::clone(&server_states_moved));
let rtp_handler = RtpHandler::new();
let rtcp_handler = RtcpHandler::new();
let interceptor_handler = InterceptorHandler::new();
// Gateway
let gateway_handler = GatewayHandler::new(Rc::clone(&server_states_moved));
let read_exception_handler = ExceptionHandler::new();
Expand All @@ -169,8 +168,7 @@ fn main() -> anyhow::Result<()> {
pipeline.add_back(data_channel_handler);
// SRTP
pipeline.add_back(srtp_handler);
pipeline.add_back(rtp_handler);
pipeline.add_back(rtcp_handler);
pipeline.add_back(interceptor_handler);
// Gateway
pipeline.add_back(gateway_handler);
pipeline.add_back(read_exception_handler);
Expand Down
23 changes: 13 additions & 10 deletions src/handler/rtcp.rs → src/handler/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ use log::debug;
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};

#[derive(Default)]
struct RtcpInbound;
struct InterceptorInbound;
#[derive(Default)]
struct RtcpOutbound;
struct InterceptorOutbound;
#[derive(Default)]
pub struct RtcpHandler {
rtcp_inbound: RtcpInbound,
rtcp_outbound: RtcpOutbound,
pub struct InterceptorHandler {
interceptor_inbound: InterceptorInbound,
interceptor_outbound: InterceptorOutbound,
}

impl RtcpHandler {
impl InterceptorHandler {
pub fn new() -> Self {
Self::default()
}
}

impl InboundHandler for RtcpInbound {
impl InboundHandler for InterceptorInbound {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;

Expand All @@ -33,7 +33,7 @@ impl InboundHandler for RtcpInbound {
}
}

impl OutboundHandler for RtcpOutbound {
impl OutboundHandler for InterceptorOutbound {
type Win = TaggedMessageEvent;
type Wout = Self::Win;

Expand All @@ -48,7 +48,7 @@ impl OutboundHandler for RtcpOutbound {
}
}

impl Handler for RtcpHandler {
impl Handler for InterceptorHandler {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;
type Win = TaggedMessageEvent;
Expand All @@ -64,6 +64,9 @@ impl Handler for RtcpHandler {
Box<dyn InboundHandler<Rin = Self::Rin, Rout = Self::Rout>>,
Box<dyn OutboundHandler<Win = Self::Win, Wout = Self::Wout>>,
) {
(Box::new(self.rtcp_inbound), Box::new(self.rtcp_outbound))
(
Box::new(self.interceptor_inbound),
Box::new(self.interceptor_outbound),
)
}
}
3 changes: 1 addition & 2 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ pub(crate) mod demuxer;
pub(crate) mod dtls;
pub(crate) mod exception;
pub(crate) mod gateway;
pub(crate) mod rtcp;
pub(crate) mod rtp;
pub(crate) mod interceptor;
pub(crate) mod sctp;
pub(crate) mod srtp;
pub(crate) mod stun;
118 changes: 0 additions & 118 deletions src/handler/rtp.rs

This file was deleted.

80 changes: 43 additions & 37 deletions src/handler/srtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use log::{debug, error};
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};
use shared::{
error::{Error, Result},
marshal::{Marshal, Unmarshal},
util::is_rtcp,
};
use std::cell::RefCell;
Expand Down Expand Up @@ -37,22 +38,19 @@ impl InboundHandler for SrtpInbound {
type Rout = Self::Rin;

fn read(&mut self, ctx: &InboundContext<Self::Rin, Self::Rout>, mut msg: Self::Rin) {
if let MessageEvent::Rtp(RTPMessageEvent::Raw(rtp_message)) = msg.message {
if let MessageEvent::Rtp(RTPMessageEvent::Raw(message)) = msg.message {
debug!("srtp read {:?}", msg.transport.peer_addr);
let try_read = || -> Result<BytesMut> {
let try_read = || -> Result<MessageEvent> {
let four_tuple = (&msg.transport).into();
let mut server_states = self.server_states.borrow_mut();
let transport = match server_states.get_mut_transport(&four_tuple) {
Ok(transport) => transport,
Err(err) => {
return Err(err);
}
};
let transport = server_states.get_mut_transport(&four_tuple)?;

if is_rtcp(&rtp_message) {
if is_rtcp(&message) {
let mut remote_context = transport.remote_srtp_context();
if let Some(context) = remote_context.as_mut() {
context.decrypt_rtcp(&rtp_message)
let mut decrypted = context.decrypt_rtcp(&message)?;
let rtcp_packet = rtcp::packet::unmarshal(&mut decrypted)?;
Ok(MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packet)))
} else {
Err(Error::Other(format!(
"remote_srtp_context is not set yet for four_tuple {:?}",
Expand All @@ -62,7 +60,9 @@ impl InboundHandler for SrtpInbound {
} else {
let mut remote_context = transport.remote_srtp_context();
if let Some(context) = remote_context.as_mut() {
context.decrypt_rtp(&rtp_message)
let mut decrypted = context.decrypt_rtp(&message)?;
let rtp_packet = rtp::Packet::unmarshal(&mut decrypted)?;
Ok(MessageEvent::Rtp(RTPMessageEvent::Rtp(rtp_packet)))
} else {
Err(Error::Other(format!(
"remote_srtp_context is not set yet for four_tuple {:?}",
Expand All @@ -73,8 +73,8 @@ impl InboundHandler for SrtpInbound {
};

match try_read() {
Ok(decrypted) => {
msg.message = MessageEvent::Rtp(RTPMessageEvent::Raw(decrypted));
Ok(message) => {
msg.message = message;
ctx.fire_read(msg);
}
Err(err) => {
Expand All @@ -94,36 +94,42 @@ impl OutboundHandler for SrtpOutbound {
type Wout = Self::Win;

fn write(&mut self, ctx: &OutboundContext<Self::Win, Self::Wout>, mut msg: Self::Win) {
if let MessageEvent::Rtp(RTPMessageEvent::Raw(rtp_message)) = msg.message {
if let MessageEvent::Rtp(message) = msg.message {
debug!("srtp write {:?}", msg.transport.peer_addr);
let try_write = || -> Result<BytesMut> {
let four_tuple = (&msg.transport).into();
let mut server_states = self.server_states.borrow_mut();
let transport = match server_states.get_mut_transport(&four_tuple) {
Ok(transport) => transport,
Err(err) => {
return Err(err);
let transport = server_states.get_mut_transport(&four_tuple)?;

match message {
RTPMessageEvent::Rtcp(rtcp_message) => {
let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
let packet = rtcp::packet::marshal(&rtcp_message)?;
context.encrypt_rtcp(&packet)
} else {
Err(Error::Other(format!(
"local_srtp_context is not set yet for four_tuple {:?}",
four_tuple
)))
}
}
};
if is_rtcp(&rtp_message) {
let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
context.encrypt_rtcp(&rtp_message)
} else {
Err(Error::Other(format!(
"local_srtp_context is not set yet for four_tuple {:?}",
four_tuple
)))
RTPMessageEvent::Rtp(rtp_message) => {
let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
let packet = rtp_message.marshal()?;
context.encrypt_rtp(&packet)
} else {
Err(Error::Other(format!(
"local_srtp_context is not set yet for four_tuple {:?}",
four_tuple
)))
}
}
} else {
let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
context.encrypt_rtp(&rtp_message)
} else {
Err(Error::Other(format!(
"local_srtp_context is not set yet for four_tuple {:?}",
four_tuple
)))
RTPMessageEvent::Raw(raw_packet) => {
// Bypass
debug!("Bypass srtp write {:?}", msg.transport.peer_addr);
Ok(raw_packet)
}
}
};
Expand Down
3 changes: 3 additions & 0 deletions src/interceptor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(crate) mod nack;
pub(crate) mod report;
pub(crate) mod twcc;
1 change: 1 addition & 0 deletions src/interceptor/nack/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

2 changes: 2 additions & 0 deletions src/interceptor/report/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod receiver_report;
pub(crate) mod sender_report;
1 change: 1 addition & 0 deletions src/interceptor/report/receiver_report.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions src/interceptor/report/sender_report.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions src/interceptor/twcc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub(crate) mod description;
pub(crate) mod endpoint;
pub(crate) mod handler;
pub(crate) mod interceptor;
pub(crate) mod messages;
pub(crate) mod server;
pub(crate) mod session;
Expand All @@ -12,7 +13,7 @@ pub(crate) mod types;
pub use description::RTCSessionDescription;
pub use handler::{
data::DataChannelHandler, demuxer::DemuxerHandler, dtls::DtlsHandler,
exception::ExceptionHandler, gateway::GatewayHandler, rtcp::RtcpHandler, rtp::RtpHandler,
exception::ExceptionHandler, gateway::GatewayHandler, interceptor::InterceptorHandler,
sctp::SctpHandler, srtp::SrtpHandler, stun::StunHandler,
};
pub use server::{certificate::RTCCertificate, config::ServerConfig, states::ServerStates};

0 comments on commit b2b1fd0

Please sign in to comment.