Skip to content

Commit

Permalink
add rtp/rtcp_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Jan 22, 2024
1 parent ee8bc1d commit 5952b27
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 0 deletions.
6 changes: 6 additions & 0 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use sfu::handlers::demuxer::DemuxerHandler;
use sfu::handlers::dtls::DtlsHandler;
use sfu::handlers::exception::ExceptionHandler;
use sfu::handlers::gateway::GatewayHandler;
use sfu::handlers::rtcp::RtcpHandler;
use sfu::handlers::rtp::RtpHandler;
use sfu::handlers::sctp::SctpHandler;
use sfu::handlers::srtp::SrtpHandler;
use sfu::handlers::stun::StunHandler;
Expand Down Expand Up @@ -157,6 +159,8 @@ fn main() -> anyhow::Result<()> {
let data_channel_handler = DataChannelHandler::new();
// SRTP
let srtp_handler = SrtpHandler::new(Rc::clone(&server_states_moved));
let rtp_handler = RtpHandler::new();
let rtcp_handler = RtcpHandler::new();
// Gateway
let gateway_handler = GatewayHandler::new(Rc::clone(&server_states_moved));
let read_exception_handler = ExceptionHandler::new();
Expand All @@ -171,6 +175,8 @@ fn main() -> anyhow::Result<()> {
pipeline.add_back(data_channel_handler);
// SRTP
pipeline.add_back(srtp_handler);
pipeline.add_back(rtp_handler);
pipeline.add_back(rtcp_handler);
// Gateway
pipeline.add_back(gateway_handler);
pipeline.add_back(read_exception_handler);
Expand Down
68 changes: 68 additions & 0 deletions src/handlers/rtcp/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,69 @@
use crate::messages::{MessageEvent, RTPMessageEvent, TaggedMessageEvent};
use log::debug;
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};

#[derive(Default)]
struct RtcpInbound;
#[derive(Default)]
struct RtcpOutbound;
#[derive(Default)]
pub struct RtcpHandler {
rtcp_inbound: RtcpInbound,
rtcp_outbound: RtcpOutbound,
}

impl RtcpHandler {
pub fn new() -> Self {
Self::default()
}
}

impl InboundHandler for RtcpInbound {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;

fn read(&mut self, ctx: &InboundContext<Self::Rin, Self::Rout>, msg: Self::Rin) {
if let MessageEvent::RTP(RTPMessageEvent::RTCP(rtcp_messages)) = &msg.message {
if let Some(rtcp_message) = rtcp_messages.first() {
debug!("rtcp read {:?}", rtcp_message.header());
}
}

ctx.fire_read(msg);
}
}

impl OutboundHandler for RtcpOutbound {
type Win = TaggedMessageEvent;
type Wout = Self::Win;

fn write(&mut self, ctx: &OutboundContext<Self::Win, Self::Wout>, msg: Self::Win) {
if let MessageEvent::RTP(RTPMessageEvent::RTCP(rtcp_messages)) = &msg.message {
if let Some(rtcp_message) = rtcp_messages.first() {
debug!("rtcp write {:?}", rtcp_message.header());
}
}

ctx.fire_write(msg);
}
}

impl Handler for RtcpHandler {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;
type Win = TaggedMessageEvent;
type Wout = Self::Win;

fn name(&self) -> &str {
"RtcpHandler"
}

fn split(
self,
) -> (
Box<dyn InboundHandler<Rin = Self::Rin, Rout = Self::Rout>>,
Box<dyn OutboundHandler<Win = Self::Win, Wout = Self::Wout>>,
) {
(Box::new(self.rtcp_inbound), Box::new(self.rtcp_outbound))
}
}
117 changes: 117 additions & 0 deletions src/handlers/rtp/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,118 @@
use crate::messages::{MessageEvent, RTPMessageEvent, TaggedMessageEvent};
use log::{debug, error};
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};
use shared::{
marshal::{Marshal, Unmarshal},
util::is_rtcp,
};

#[derive(Default)]
struct RtpInbound;
#[derive(Default)]
struct RtpOutbound;
#[derive(Default)]
pub struct RtpHandler {
rtp_inbound: RtpInbound,
rtp_outbound: RtpOutbound,
}

impl RtpHandler {
pub fn new() -> Self {
Self::default()
}
}

impl InboundHandler for RtpInbound {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;

fn read(&mut self, ctx: &InboundContext<Self::Rin, Self::Rout>, mut msg: Self::Rin) {
debug!("rtp read {:?}", msg.transport.peer_addr);
if let MessageEvent::RTP(RTPMessageEvent::RAW(mut message)) = msg.message {
if is_rtcp(&message) {
match rtcp::packet::unmarshal(&mut message) {
Ok(rtcp_packet) => {
msg.message = MessageEvent::RTP(RTPMessageEvent::RTCP(rtcp_packet));
ctx.fire_read(msg);
}
Err(err) => {
error!("try_read with error {}", err);
ctx.fire_read_exception(Box::new(err))
}
}
} else {
match rtp::Packet::unmarshal(&mut message) {
Ok(rtp_packet) => {
msg.message = MessageEvent::RTP(RTPMessageEvent::RTP(rtp_packet));
ctx.fire_read(msg);
}
Err(err) => {
error!("try_read with error {}", err);
ctx.fire_read_exception(Box::new(err))
}
}
}
} else {
debug!("bypass rtp read {:?}", msg.transport.peer_addr);
ctx.fire_read(msg);
}
}
}

impl OutboundHandler for RtpOutbound {
type Win = TaggedMessageEvent;
type Wout = Self::Win;

fn write(&mut self, ctx: &OutboundContext<Self::Win, Self::Wout>, mut msg: Self::Win) {
debug!("rtp write {:?}", msg.transport.peer_addr);
match msg.message {
MessageEvent::RTP(RTPMessageEvent::RTCP(rtcp_message)) => {
match rtcp::packet::marshal(&rtcp_message) {
Ok(message) => {
msg.message = MessageEvent::RTP(RTPMessageEvent::RAW(message));
ctx.fire_write(msg);
}
Err(err) => {
error!("try_write with error {}", err);
ctx.fire_write_exception(Box::new(err))
}
}
}
MessageEvent::RTP(RTPMessageEvent::RTP(rtp_message)) => match rtp_message.marshal() {
Ok(message) => {
msg.message = MessageEvent::RTP(RTPMessageEvent::RAW(message));
ctx.fire_write(msg);
}
Err(err) => {
error!("try_write with error {}", err);
ctx.fire_write_exception(Box::new(err))
}
},
_ => {
// Bypass
debug!("Bypass rtp write {:?}", msg.transport.peer_addr);
ctx.fire_write(msg);
}
}
}
}

impl Handler for RtpHandler {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;
type Win = TaggedMessageEvent;
type Wout = Self::Win;

fn name(&self) -> &str {
"RtpHandler"
}

fn split(
self,
) -> (
Box<dyn InboundHandler<Rin = Self::Rin, Rout = Self::Rout>>,
Box<dyn OutboundHandler<Win = Self::Win, Wout = Self::Wout>>,
) {
(Box::new(self.rtp_inbound), Box::new(self.rtp_outbound))
}
}

0 comments on commit 5952b27

Please sign in to comment.