Skip to content

Commit

Permalink
add track_remote_rx await
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Feb 19, 2024
1 parent bf893bc commit cc726cf
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 33 deletions.
22 changes: 18 additions & 4 deletions src/description/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,9 +608,8 @@ pub(crate) fn get_msid(media: &MediaDescription) -> Option<MediaStreamId> {
None
}

pub(crate) fn get_ssrc_groups(media: &MediaDescription) -> Result<(Vec<SsrcGroup>, HashSet<SSRC>)> {
pub(crate) fn get_ssrc_groups(media: &MediaDescription) -> Result<Vec<SsrcGroup>> {
let mut ssrc_groups = vec![];
let mut ssrc_set = HashSet::new();

for a in &media.attributes {
if a.key == "ssrc-group" {
Expand All @@ -621,7 +620,6 @@ pub(crate) fn get_ssrc_groups(media: &MediaDescription) -> Result<(Vec<SsrcGroup
for field in fields.iter().skip(1) {
let ssrc = field.parse::<u32>()?;
ssrcs.push(ssrc);
ssrc_set.insert(ssrc);
}
ssrc_groups.push(SsrcGroup {
name: fields[0].to_string(),
Expand All @@ -632,7 +630,23 @@ pub(crate) fn get_ssrc_groups(media: &MediaDescription) -> Result<(Vec<SsrcGroup
}
}

Ok((ssrc_groups, ssrc_set))
Ok(ssrc_groups)
}

pub(crate) fn get_ssrcs(media: &MediaDescription) -> Result<HashSet<SSRC>> {
let mut ssrc_set = HashSet::new();
for a in &media.attributes {
if a.key == "ssrc" {
if let Some(value) = a.value.as_ref() {
let fields: Vec<&str> = value.split_whitespace().collect();
if fields.len() >= 1 {
let ssrc = fields[0].parse::<u32>()?;
ssrc_set.insert(ssrc);
}
}
}
}
Ok(ssrc_set)
}

pub(crate) fn extract_fingerprint(desc: &SessionDescription) -> Result<(String, String)> {
Expand Down
5 changes: 3 additions & 2 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) mod config;

use crate::description::{
codecs_from_media_description, get_cname, get_mid_value, get_msid, get_peer_direction,
get_rids, get_ssrc_groups, populate_sdp, rtp_extensions_from_media_description,
get_rids, get_ssrc_groups, get_ssrcs, populate_sdp, rtp_extensions_from_media_description,
update_sdp_origin, MediaSection, RTCSessionDescription, MEDIA_SECTION_APPLICATION,
};
use crate::description::{
Expand Down Expand Up @@ -155,7 +155,8 @@ impl Session {
if !has_mid_value {
let cname = get_cname(media);
let msid = get_msid(media);
let (ssrc_groups, ssrcs) = get_ssrc_groups(media)?;
let ssrc_groups = get_ssrc_groups(media)?;
let ssrcs = get_ssrcs(media)?;
let codecs = codecs_from_media_description(media)?;
let header_extensions = rtp_extensions_from_media_description(media)?;
let rtp_params = RTCRtpParameters {
Expand Down
42 changes: 34 additions & 8 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use anyhow::Result;
use hyper::{Body, Client, Method, Request};
use log::LevelFilter::Debug;
use log::{error, info};
use rand::random;
use sfu::{EndpointId, SessionId};
use std::io::Write;
use std::sync::Arc;
Expand All @@ -24,10 +23,12 @@ use webrtc::peer_connection::sdp::sdp_type::RTCSdpType;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use webrtc::rtp_transceiver::rtp_sender::RTCRtpSender;
use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
use webrtc::rtp_transceiver::{RTCRtpTransceiver, RTCRtpTransceiverInit};
use webrtc::rtp_transceiver::RTCRtpTransceiverInit;
use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample;
use webrtc::track::track_local::TrackLocal;
use webrtc::track::track_remote::TrackRemote;

pub const HOST: &'static str = "127.0.0.1";
pub const SIGNAL_PORT: u16 = 8080;
Expand All @@ -39,6 +40,7 @@ fn pretty_sdp(input: &str) -> String {

pub async fn setup_peer_connection(
config: RTCConfiguration,
endpoint_id: EndpointId,
) -> Result<(EndpointId, Arc<RTCPeerConnection>)> {
let _ = env_logger::Builder::new()
.format(|buf, record| {
Expand All @@ -55,8 +57,6 @@ pub async fn setup_peer_connection(
.filter(None, Debug)
.try_init();

let endpoint_id = random::<u64>();

// some setup code, like creating required files/directories, starting
// servers, etc.
info!("setup_peer_connection {}", endpoint_id);
Expand Down Expand Up @@ -106,11 +106,12 @@ pub async fn setup_peer_connection(

pub async fn setup_peer_connections(
configs: Vec<RTCConfiguration>,
endpoint_ids: Vec<EndpointId>,
) -> Result<Vec<(EndpointId, Arc<RTCPeerConnection>)>> {
let mut peer_connections = Vec::with_capacity(configs.len());

for config in configs {
let (endpoint_id, peer_connection) = setup_peer_connection(config).await?;
for (config, endpoint_id) in configs.into_iter().zip(endpoint_ids) {
let (endpoint_id, peer_connection) = setup_peer_connection(config, endpoint_id).await?;
peer_connections.push((endpoint_id, peer_connection));
}

Expand Down Expand Up @@ -276,6 +277,14 @@ pub async fn connect(
endpoint_id,
pretty_sdp(&answer_str)
);

// Sets the LocalDescription, and starts our UDP listeners
if let Err(err) = pc.set_local_description(answer).await {
error!("create_answer error {:?}", err);
assert!(false);
return;
}

if let Err(err) = dc.send_text(answer_str).await {
error!("data channel send answer error {:?}", err);
assert!(false);
Expand Down Expand Up @@ -340,7 +349,7 @@ pub async fn add_track(
mime_type: &str,
track_id: &str,
direction: RTCRtpTransceiverDirection,
) -> Result<Arc<RTCRtpTransceiver>> {
) -> Result<(Arc<RTCRtpSender>, Arc<TrackLocalStaticSample>)> {
// Create a video track
let track = Arc::new(TrackLocalStaticSample::new(
RTCRtpCodecCapability {
Expand All @@ -362,5 +371,22 @@ pub async fn add_track(
)
.await?;

Ok(rtp_transceiver)
Ok((rtp_transceiver.sender().await, track))
}

pub async fn on_track(
peer_connection: &Arc<RTCPeerConnection>,
) -> Result<UnboundedReceiver<Arc<TrackRemote>>> {
let (track_tx, track_rx) = tokio::sync::mpsc::unbounded_channel::<Arc<TrackRemote>>();
peer_connection.on_track(Box::new(move |track, _, _| {
let tx = track_tx.clone();
Box::pin(async move {
if let Err(err) = tx.send(track) {
error!("track_tx send error {}", err);
assert!(false);
}
})
}));

Ok(track_rx)
}
22 changes: 13 additions & 9 deletions tests/data_channels_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn test_data_channel() -> anyhow::Result<()> {
..Default::default()
};

let (endpoint_id, peer_connection) = match common::setup_peer_connection(config).await {
let (endpoint_id, peer_connection) = match common::setup_peer_connection(config, 0).await {
Ok(ok) => ok,
Err(err) => {
error!("error: {}", err);
Expand Down Expand Up @@ -58,14 +58,18 @@ async fn test_data_channels() -> anyhow::Result<()> {
..Default::default()
};

let peer_connections =
match common::setup_peer_connections(vec![config.clone(), config.clone(), config]).await {
Ok(ok) => ok,
Err(err) => {
error!("{}: error {}", session_id, err);
return Err(err.into());
}
};
let peer_connections = match common::setup_peer_connections(
vec![config.clone(), config.clone(), config],
vec![0, 1, 2],
)
.await
{
Ok(ok) => ok,
Err(err) => {
error!("{}: error {}", session_id, err);
return Err(err.into());
}
};

for (endpoint_id, peer_connection) in peer_connections.iter() {
match common::connect(HOST, SIGNAL_PORT, session_id, *endpoint_id, peer_connection).await {
Expand Down
39 changes: 29 additions & 10 deletions tests/play_from_disk_vpx_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::common::{HOST, SIGNAL_PORT};
use log::{error, info};
use rand::random;
use sfu::SessionId;
//use shared::error::Error;
use webrtc::api::media_engine::MIME_TYPE_VP8;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::peer_connection::configuration::RTCConfiguration;
Expand All @@ -23,14 +24,14 @@ async fn test_play_from_disk_vpx_1to1() -> anyhow::Result<()> {
..Default::default()
};

let peer_connections = match common::setup_peer_connections(vec![config.clone(), config]).await
{
Ok(ok) => ok,
Err(err) => {
error!("{}: error {}", session_id, err);
return Err(err.into());
}
};
let peer_connections =
match common::setup_peer_connections(vec![config.clone(), config], vec![0, 1]).await {
Ok(ok) => ok,
Err(err) => {
error!("{}: error {}", session_id, err);
return Err(err.into());
}
};

let mut data_channels = vec![];
for (endpoint_id, peer_connection) in peer_connections.iter() {
Expand All @@ -47,7 +48,7 @@ async fn test_play_from_disk_vpx_1to1() -> anyhow::Result<()> {
data_channels.push((data_channel_tx, data_channel_rx));
}

let rtp_transceiver = match common::add_track(
let (rtp_sender, _track_local) = match common::add_track(
&peer_connections[0].1,
MIME_TYPE_VP8,
"video_track",
Expand All @@ -66,13 +67,20 @@ async fn test_play_from_disk_vpx_1to1() -> anyhow::Result<()> {
// Before these packets are returned they are processed by interceptors. For things
// like NACK this needs to be called.
tokio::spawn(async move {
let rtp_sender = rtp_transceiver.sender().await;
while let Ok((rtcp_packets, _)) = rtp_sender.read_rtcp().await {
info!("received RTCP packets {:?}", rtcp_packets);
//TODO: check RTCP report and handle cancel
}
});

let _track_remote_rx = match common::on_track(&peer_connections[1].1).await {
Ok(ok) => ok,
Err(err) => {
error!("{}/{}: error {}", session_id, peer_connections[1].0, err);
return Err(err.into());
}
};

match common::renegotiate(
HOST,
SIGNAL_PORT,
Expand Down Expand Up @@ -106,6 +114,17 @@ async fn test_play_from_disk_vpx_1to1() -> anyhow::Result<()> {
assert!(false);
}

// waiting for track_remote for endpoint 1
/*let _track_remote = match track_remote_rx.recv().await {
Some(track_remote) => track_remote,
None => {
assert!(false);
return Err(Error::Other("track remote rx close".to_string()).into());
}
};*/

// Verify track_local and track_remote match

match common::teardown_peer_connections(peer_connections).await {
Ok(ok) => ok,
Err(err) => {
Expand Down

0 comments on commit cc726cf

Please sign in to comment.