Skip to content

Commit

Permalink
implement test_datachannel
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Feb 17, 2024
1 parent ae25e2a commit 0d3624e
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose
run: cargo test --verbose --lib

rustfmt_and_clippy:
name: Check rustfmt style && run clippy
Expand Down
5 changes: 5 additions & 0 deletions examples/signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ async fn remote_handler(
endpoint_id: _,
reason,
} => {
error!(
"SignalingProtocolMessage::Err {}",
String::from_utf8(reason.to_vec())
.unwrap_or("Unknown Error".to_string()),
);
let mut response = Response::new(Body::from(reason));
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return Ok(response);
Expand Down
2 changes: 1 addition & 1 deletion rtc
Submodule rtc updated from 22defe to 5260d3
4 changes: 2 additions & 2 deletions src/description/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,12 +639,12 @@ pub(crate) fn extract_fingerprint(desc: &SessionDescription) -> Result<(String,
let mut fingerprints = vec![];

if let Some(fingerprint) = desc.attribute("fingerprint") {
fingerprints.push(fingerprint.clone());
fingerprints.push(fingerprint.to_string());
}

for m in &desc.media_descriptions {
if let Some(fingerprint) = m.attribute("fingerprint").and_then(|o| o) {
fingerprints.push(fingerprint.to_owned());
fingerprints.push(fingerprint.to_string());
}
}

Expand Down
17 changes: 10 additions & 7 deletions src/endpoint/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,16 @@ impl ConnectionCredentials {
.ok_or(Error::ErrAttributeNotFound)?
.ok_or(Error::ErrAttributeNotFound)?
.to_string();
let fingerprint = sdp
.media_descriptions
.iter()
.find_map(|m| m.attribute("fingerprint"))
.ok_or(Error::ErrAttributeNotFound)?
.ok_or(Error::ErrAttributeNotFound)?
.try_into()?;
let fingerprint = if let Some(fingerprint) = sdp.attribute("fingerprint") {
fingerprint.try_into()?
} else {
sdp.media_descriptions
.iter()
.find_map(|m| m.attribute("fingerprint"))
.ok_or(Error::ErrAttributeNotFound)?
.ok_or(Error::ErrAttributeNotFound)?
.try_into()?
};
let role = DTLSRole::from(sdp);

Ok(Self {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ pub use handler::{
sctp::SctpHandler, srtp::SrtpHandler, stun::StunHandler,
};
pub use server::{certificate::RTCCertificate, config::ServerConfig, states::ServerStates};
pub use types::{EndpointId, SessionId};
180 changes: 178 additions & 2 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
use log::info;
#![allow(dead_code)]

use anyhow::Result;
use hyper::{Body, Client, Method, Request};
use log::LevelFilter::Debug;
use log::{error, info};
use rand::random;
use sfu::{EndpointId, SessionId};
use std::io::Write;
use std::sync::Arc;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::data_channel::data_channel_message::DataChannelMessage;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::sdp_type::RTCSdpType;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::peer_connection::RTCPeerConnection;

pub fn setup() -> anyhow::Result<()> {
pub async fn setup(config: RTCConfiguration) -> Result<(Arc<RTCPeerConnection>, EndpointId)> {
env_logger::Builder::new()
.format(|buf, record| {
writeln!(
Expand All @@ -22,5 +39,164 @@ pub fn setup() -> anyhow::Result<()> {
// servers, etc.
info!("common setup");

// Create a MediaEngine object to configure the supported codec
let mut m = MediaEngine::default();

// Register default codecs
m.register_default_codecs()?;

// Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
// This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
// this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
// for each PeerConnection.
let mut registry = Registry::new();

// Use the default set of Interceptors
registry = register_default_interceptors(registry, &mut m)?;

// Create the API object with the MediaEngine
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();

// Create a new RTCPeerConnection
let peer_connection = Arc::new(api.new_peer_connection(config).await?);

// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
info!("Peer Connection State has changed: {s}");

if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
error!("Peer Connection has gone to failed exiting");
assert!(false);
}

Box::pin(async {})
}));

Ok((peer_connection, random::<u64>()))
}

pub async fn teardown(pc: Arc<RTCPeerConnection>) -> Result<()> {
pc.close().await?;

Ok(())
}

async fn signaling(
host: String,
signal_port: u16,
session_id: SessionId,
endpoint_id: EndpointId,
offer_payload: String,
) -> Result<RTCSessionDescription> {
let req = Request::builder()
.method(Method::POST)
.uri(format!(
"http://{host}:{signal_port}/offer/{session_id}/{endpoint_id}"
))
.header("content-type", "application/json; charset=utf-8")
.body(Body::from(offer_payload))?;

let resp = Client::new().request(req).await?;
let answer_payload =
std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)?.to_string();
let answer = serde_json::from_str::<RTCSessionDescription>(&answer_payload)?;

Ok(answer)
}

pub async fn connect(
host: String,
signal_port: u16,
session_id: SessionId,
endpoint_id: EndpointId,
peer_connection: &Arc<RTCPeerConnection>,
) -> Result<()> {
// Create a datachannel with label 'data'
let data_channel = peer_connection.create_data_channel("data", None).await?;

// Register SDP message handling
let peer_connection_clone = peer_connection.clone();
let data_channel_clone = data_channel.clone();
data_channel.on_message(Box::new(move |msg: DataChannelMessage| {
let sdp_str = String::from_utf8(msg.data.to_vec()).unwrap();
info!("SDP from DataChannel: '{sdp_str}'");
let sdp = match serde_json::from_str::<RTCSessionDescription>(&sdp_str) {
Ok(sdp) => sdp,
Err(err) => {
error!("deserialize sdp str failed: {}", err);
assert!(false);
return Box::pin(async {});
}
};
let pc = peer_connection_clone.clone();
let dc = data_channel_clone.clone();
Box::pin(async move {
match sdp.sdp_type {
RTCSdpType::Offer => {
if let Err(err) = pc.set_remote_description(sdp).await {
error!("set_remote_description offer error {:?}", err);
assert!(false);
return;
}

// Create an answer to send to the other process
let answer = match pc.create_answer(None).await {
Ok(a) => a,
Err(err) => {
error!("create_answer error {:?}", err);
assert!(false);
return;
}
};

let answer_str = match serde_json::to_string(&answer) {
Ok(a) => a,
Err(err) => {
error!("serialize answer error {:?}", err);
assert!(false);
return;
}
};
if let Err(err) = dc.send_text(answer_str).await {
error!("data channel send answer error {:?}", err);
assert!(false);
return;
}
}
RTCSdpType::Answer => {
if let Err(err) = pc.set_remote_description(sdp).await {
error!("set_remote_description answer error {:?}", err);
assert!(false);
return;
}
}
_ => {
error!("Unsupported SDP type {}", sdp.sdp_type);
assert!(false);
}
}
})
}));

// Create an offer to send to the other process
let offer = peer_connection.create_offer(None).await?;

// Send our offer to the HTTP server listening in the other process
let offer_payload = serde_json::to_string(&offer)?;

// Sets the LocalDescription, and starts our UDP listeners
// Note: this will start the gathering of ICE candidates
peer_connection.set_local_description(offer).await?;

let answer = signaling(host, signal_port, session_id, endpoint_id, offer_payload).await?;
peer_connection.set_remote_description(answer).await?;

Ok(())
}
27 changes: 23 additions & 4 deletions tests/datachannel_test.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
use rand::random;
use sfu::SessionId;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::peer_connection::configuration::RTCConfiguration;

// importing common module.
mod common;

#[test]
fn test_datachannel() -> anyhow::Result<()> {
// using common code.
common::setup()?;
#[tokio::test]
async fn test_datachannel() -> anyhow::Result<()> {
// Prepare the configuration
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
let host = "127.0.0.1".to_string();
let signal_port = 8080u16;
let session_id: SessionId = random::<u64>();

let (peer_connection, endpoint_id) = common::setup(config).await?;

common::connect(host, signal_port, session_id, endpoint_id, &peer_connection).await?;

common::teardown(peer_connection).await?;
Ok(())
}
8 changes: 4 additions & 4 deletions tests/rtcp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
mod common;

#[ignore]
#[test]
fn test_rtcp() -> anyhow::Result<()> {
// using common code.
common::setup()?;
#[tokio::test]
async fn test_rtcp() -> anyhow::Result<()> {
/*let (pc, _done_rx) = common::setup().await?;
common::teardown(pc).await?;*/
Ok(())
}
8 changes: 4 additions & 4 deletions tests/rtp_test.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// importing common module.
mod common;

#[test]
fn test_rtp() -> anyhow::Result<()> {
// using common code.
common::setup()?;
#[tokio::test]
async fn test_rtp() -> anyhow::Result<()> {
/*let (pc, _done_rx) = common::setup().await?;
common::teardown(pc).await?;*/
Ok(())
}
8 changes: 4 additions & 4 deletions tests/signaling_test.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// importing common module.
mod common;

#[test]
fn test_signaling() -> anyhow::Result<()> {
// using common code.
common::setup()?;
#[tokio::test]
async fn test_signaling() -> anyhow::Result<()> {
/*let (pc, _done_rx) = common::setup().await?;
common::teardown(pc).await?;*/
Ok(())
}

0 comments on commit 0d3624e

Please sign in to comment.