Skip to content

Commit

Permalink
impl handle_rtp_message/handle_rtcp_message
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Jan 25, 2024
1 parent e78e805 commit 21665e0
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 9 deletions.
74 changes: 65 additions & 9 deletions src/handlers/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use log::{debug, info, warn};
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};
use retty::transport::TransportContext;
use shared::error::{Error, Result};
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::Instant;
use stun::attributes::{
Expand Down Expand Up @@ -232,22 +233,52 @@ impl GatewayInbound {

fn handle_rtp_message(
&mut self,
_now: Instant,
_transport_context: TransportContext,
mut _rtp_packet: rtp::packet::Packet,
now: Instant,
transport_context: TransportContext,
rtp_packet: rtp::packet::Packet,
) -> Result<Vec<TaggedMessageEvent>> {
//TODO: Selective Forwarding RTP Packets
Ok(vec![])
let peer_addrs = self.get_other_peer_addrs(&transport_context)?;

let mut outgoing_messages = Vec::with_capacity(peer_addrs.len());
for peer_addr in peer_addrs {
outgoing_messages.push(TaggedMessageEvent {
now,
transport: TransportContext {
local_addr: transport_context.local_addr,
peer_addr,
ecn: transport_context.ecn,
},
message: MessageEvent::RTP(RTPMessageEvent::RTP(rtp_packet.clone())),
});
}

Ok(outgoing_messages)
}

fn handle_rtcp_message(
&mut self,
_now: Instant,
_transport_context: TransportContext,
mut _rtcp_packets: Vec<Box<dyn rtcp::packet::Packet>>,
now: Instant,
transport_context: TransportContext,
rtcp_packets: Vec<Box<dyn rtcp::packet::Packet>>,
) -> Result<Vec<TaggedMessageEvent>> {
//TODO: Selective Forwarding RTCP Packets
Ok(vec![])
let peer_addrs = self.get_other_peer_addrs(&transport_context)?;

let mut outgoing_messages = Vec::with_capacity(peer_addrs.len());
for peer_addr in peer_addrs {
outgoing_messages.push(TaggedMessageEvent {
now,
transport: TransportContext {
local_addr: transport_context.local_addr,
peer_addr,
ecn: transport_context.ecn,
},
message: MessageEvent::RTP(RTPMessageEvent::RTCP(rtcp_packets.clone())),
});
}

Ok(outgoing_messages)
}

fn check_stun_message(
Expand Down Expand Up @@ -301,6 +332,32 @@ impl GatewayInbound {
}
}

fn get_other_peer_addrs(
&self,
transport_context: &TransportContext,
) -> Result<Vec<SocketAddr>> {
let four_tuple = transport_context.into();
let endpoint = self
.server_states
.find_endpoint(&four_tuple)
.ok_or(Error::ErrClientTransportNotSet)?;
let session = endpoint.session().upgrade().ok_or(Error::SessionEof)?;
let endpoint_id = endpoint.endpoint_id();

let mut peer_addrs = vec![];
let endpoints = session.endpoints().borrow();
for (&other_endpoint_id, other_endpoint) in &*endpoints {
if other_endpoint_id != endpoint_id {
let transports = other_endpoint.transports().borrow();
let four_tuples = transports.keys();
for other_four_tuple in four_tuples {
peer_addrs.push(other_four_tuple.peer_addr);
}
}
}
Ok(peer_addrs)
}

fn send_server_reflective_address(
&mut self,
now: Instant,
Expand Down Expand Up @@ -329,7 +386,6 @@ impl GatewayInbound {
})
}

#[allow(clippy::type_complexity)]
fn add_endpoint(
&mut self,
request: &stun::message::Message,
Expand Down
4 changes: 4 additions & 0 deletions src/server/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl Session {
self.endpoints.borrow().contains_key(endpoint_id)
}

pub(crate) fn endpoints(&self) -> &RefCell<HashMap<EndpointId, Rc<Endpoint>>> {
&self.endpoints
}

pub(crate) fn set_remote_description(
&self,
endpoint: &Rc<Endpoint>,
Expand Down

0 comments on commit 21665e0

Please sign in to comment.