diff --git a/Cargo.toml b/Cargo.toml index 2885161..7270d1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ documentation = "https://docs.rs/sfu" repository = "https://github.com/webrtc-rs/sfu" homepage = "https://sfu.rs" keywords = ["networking", "protocols"] -categories = [ "network-programming", "asynchronous" ] +categories = ["network-programming", "asynchronous"] [dependencies] retty = "0.24.0" @@ -25,17 +25,18 @@ sha2 = "0.10.8" rustls = "0.21.7" url = { version = "2", features = [] } hex = { version = "0.4.3", features = [] } +opentelemetry = { version = "0.21", features = ["metrics"] } # RTC protocols -shared = { path = "rtc/shared"} -sdp = { path = "rtc/sdp"} -stun = { path = "rtc/stun"} -rtp = { path = "rtc/rtp"} -rtcp = { path = "rtc/rtcp"} -srtp = { path = "rtc/srtp"} -dtls = { path = "rtc/dtls"} -sctp = { path = "rtc/sctp"} -data = { path = "rtc/data"} +shared = { path = "rtc/shared" } +sdp = { path = "rtc/sdp" } +stun = { path = "rtc/stun" } +rtp = { path = "rtc/rtp" } +rtcp = { path = "rtc/rtcp" } +srtp = { path = "rtc/srtp" } +dtls = { path = "rtc/dtls" } +sctp = { path = "rtc/sctp" } +data = { path = "rtc/data" } [dev-dependencies] webrtc = { path = "webrtc/webrtc" } @@ -54,6 +55,9 @@ waitgroup = "0.1.2" thiserror = "1.0.53" core_affinity = "0.8.1" num_cpus = "1.16.0" +opentelemetry_sdk = { version = "0.21", features = ["metrics", "rt-tokio-current-thread"] } +opentelemetry-stdout = { version = "0.2.0", features = ["metrics"] } + [[example]] name = "chat" diff --git a/examples/chat.rs b/examples/chat.rs index 1b31f84..9453a8f 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -1,29 +1,36 @@ -use crate::signal::{handle_signaling_message, SignalingMessage, SignalingServer}; -use async_broadcast::broadcast; +extern crate num_cpus; + +use std::cell::RefCell; +use std::collections::HashMap; +use std::io::Write; +use std::net::SocketAddr; +use std::rc::Rc; +use std::str::FromStr; +use std::sync::Arc; + +use async_broadcast::{broadcast, Receiver}; use clap::Parser; use dtls::extension::extension_use_srtp::SrtpProtectionProfile; use log::{error, info}; +use opentelemetry::{/*global,*/ metrics::MeterProvider as _, KeyValue}; +use opentelemetry_sdk::metrics::{MeterProvider, PeriodicReader}; +use opentelemetry_sdk::{runtime, Resource}; +use opentelemetry_stdout::MetricsExporterBuilder; use retty::bootstrap::BootstrapUdpServer; use retty::channel::Pipeline; use retty::executor::LocalExecutorBuilder; use retty::transport::{AsyncTransport, AsyncTransportWrite, TaggedBytesMut}; +use waitgroup::{WaitGroup, Worker}; + use sfu::{ DataChannelHandler, DemuxerHandler, DtlsHandler, ExceptionHandler, GatewayHandler, InterceptorHandler, RTCCertificate, SctpHandler, ServerConfig, ServerStates, SrtpHandler, StunHandler, }; -use std::cell::RefCell; -use std::collections::HashMap; -use std::io::Write; -use std::net::SocketAddr; -use std::rc::Rc; -use std::str::FromStr; -use std::sync::Arc; -use waitgroup::WaitGroup; -mod signal; +use crate::signal::{handle_signaling_message, SignalingMessage, SignalingServer}; -extern crate num_cpus; +mod signal; #[derive(Default, Debug, Copy, Clone, clap::ValueEnum)] enum Level { @@ -69,6 +76,40 @@ struct Cli { level: Level, } +fn init_meter_provider(mut stop_rx: Receiver<()>, worker: Worker) -> MeterProvider { + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + + rt.block_on(async move { + let _worker = worker; + let exporter = MetricsExporterBuilder::default() + .with_encoder(|writer, data| { + Ok(serde_json::to_writer_pretty(writer, &data).unwrap()) + }) + .build(); + let reader = PeriodicReader::builder(exporter, runtime::TokioCurrentThread).build(); + let meter_provider = MeterProvider::builder() + .with_reader(reader) + .with_resource(Resource::new(vec![KeyValue::new("chat", "metrics")])) + .build(); + let _ = tx.send(meter_provider.clone()); + + let _ = stop_rx.recv().await; + let _ = meter_provider.shutdown(); + info!("meter provider is gracefully down"); + }); + }); + + let meter_provider = rx.recv().unwrap(); + meter_provider +} + fn main() -> anyhow::Result<()> { let cli = Cli::parse(); if cli.debug { @@ -103,10 +144,12 @@ fn main() -> anyhow::Result<()> { )?])); let core_num = num_cpus::get(); let wait_group = WaitGroup::new(); + let meter_provider = init_meter_provider(stop_rx.clone(), wait_group.worker()); for port in media_ports { let worker = wait_group.worker(); let host = cli.host.clone(); + let meter_provider = meter_provider.clone(); let mut stop_rx = stop_rx.clone(); let (signaling_tx, signaling_rx) = smol::channel::unbounded::(); media_port_thread_map.insert(port, signaling_tx); @@ -130,7 +173,8 @@ fn main() -> anyhow::Result<()> { let sctp_endpoint_config = sctp::EndpointConfig::default(); let server_states = Rc::new(RefCell::new(ServerStates::new(server_config, - SocketAddr::from_str(&format!("{}:{}", host, port)).unwrap()).unwrap())); + SocketAddr::from_str(&format!("{}:{}", host, port)).unwrap(), + meter_provider.meter(format!("{}:{}", host, port))).unwrap())); info!("listening {}:{}...", host, port); diff --git a/src/description/mod.rs b/src/description/mod.rs index 937fc0e..b1ea13d 100644 --- a/src/description/mod.rs +++ b/src/description/mod.rs @@ -639,7 +639,7 @@ pub(crate) fn get_ssrcs(media: &MediaDescription) -> Result> { if a.key == "ssrc" { if let Some(value) = a.value.as_ref() { let fields: Vec<&str> = value.split_whitespace().collect(); - if fields.len() >= 1 { + if !fields.is_empty() { let ssrc = fields[0].parse::()?; ssrc_set.insert(ssrc); } diff --git a/src/handler/srtp.rs b/src/handler/srtp.rs index c20c059..46b2c85 100644 --- a/src/handler/srtp.rs +++ b/src/handler/srtp.rs @@ -10,6 +10,7 @@ use shared::{ }; use std::cell::RefCell; use std::rc::Rc; +use std::time::Instant; struct SrtpInbound { server_states: Rc>, // for remote_srtp_context @@ -53,8 +54,13 @@ impl InboundHandler for SrtpInbound { if rtcp_packets.is_empty() { return Err(Error::Other("empty rtcp_packets".to_string())); } + + server_states.metrics().record_rtcp_packet_in_count(1, &[]); Ok(MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packets))) } else { + server_states + .metrics() + .record_remote_srtp_context_not_set_count(1, &[]); Err(Error::Other(format!( "remote_srtp_context is not set yet for four_tuple {:?}", four_tuple @@ -65,8 +71,13 @@ impl InboundHandler for SrtpInbound { if let Some(context) = remote_context.as_mut() { let mut decrypted = context.decrypt_rtp(&message)?; let rtp_packet = rtp::Packet::unmarshal(&mut decrypted)?; + + server_states.metrics().record_rtp_packet_in_count(1, &[]); Ok(MessageEvent::Rtp(RTPMessageEvent::Rtp(rtp_packet))) } else { + server_states + .metrics() + .record_remote_srtp_context_not_set_count(1, &[]); Err(Error::Other(format!( "remote_srtp_context is not set yet for four_tuple {:?}", four_tuple @@ -113,8 +124,18 @@ impl OutboundHandler for SrtpOutbound { let mut local_context = transport.local_srtp_context(); if let Some(context) = local_context.as_mut() { let packet = rtcp::packet::marshal(&rtcp_packets)?; - context.encrypt_rtcp(&packet) + let rtcp_packet = context.encrypt_rtcp(&packet); + + server_states.metrics().record_rtcp_packet_out_count(1, &[]); + rtcp_packet } else { + let metrics = server_states.metrics(); + metrics.record_local_srtp_context_not_set_count(1, &[]); + metrics.record_rtcp_packet_processing_time( + Instant::now().duration_since(msg.now).as_micros() as u64, + &[], + ); + Err(Error::Other(format!( "local_srtp_context is not set yet for four_tuple {:?}", four_tuple @@ -125,8 +146,18 @@ impl OutboundHandler for SrtpOutbound { let mut local_context = transport.local_srtp_context(); if let Some(context) = local_context.as_mut() { let packet = rtp_message.marshal()?; - context.encrypt_rtp(&packet) + let rtp_packet = context.encrypt_rtp(&packet); + + server_states.metrics().record_rtp_packet_out_count(1, &[]); + rtp_packet } else { + let metrics = server_states.metrics(); + metrics.record_local_srtp_context_not_set_count(1, &[]); + metrics.record_rtp_packet_processing_time( + Instant::now().duration_since(msg.now).as_micros() as u64, + &[], + ); + Err(Error::Other(format!( "local_srtp_context is not set yet for four_tuple {:?}", four_tuple diff --git a/src/lib.rs b/src/lib.rs index 3cc7f28..5c3746b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ pub(crate) mod endpoint; pub(crate) mod handler; pub(crate) mod interceptor; pub(crate) mod messages; +pub(crate) mod metrics; pub(crate) mod server; pub(crate) mod session; pub(crate) mod types; diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..3391f18 --- /dev/null +++ b/src/metrics/mod.rs @@ -0,0 +1,89 @@ +use opentelemetry::metrics::{Counter, Meter, ObservableGauge, Unit}; +use opentelemetry::KeyValue; + +pub(crate) struct Metrics { + rtp_packet_in_count: Counter, + rtp_packet_out_count: Counter, + rtcp_packet_in_count: Counter, + rtcp_packet_out_count: Counter, + remote_srtp_context_not_set_count: Counter, + local_srtp_context_not_set_count: Counter, + rtp_packet_processing_time: ObservableGauge, + rtcp_packet_processing_time: ObservableGauge, +} + +impl Metrics { + pub(crate) fn new(meter: Meter) -> Self { + Self { + rtp_packet_in_count: meter.u64_counter("rtp_packet_in_count").init(), + rtp_packet_out_count: meter.u64_counter("rtp_packet_out_count").init(), + rtcp_packet_in_count: meter.u64_counter("rtcp_packet_in_count").init(), + rtcp_packet_out_count: meter.u64_counter("rtcp_packet_out_count").init(), + remote_srtp_context_not_set_count: meter + .u64_counter("remote_srtp_context_not_set_count") + .init(), + local_srtp_context_not_set_count: meter + .u64_counter("local_srtp_context_not_set_count") + .init(), + rtp_packet_processing_time: meter + .u64_observable_gauge("rtp_packet_processing_time") + .with_unit(Unit::new("us")) + .init(), + rtcp_packet_processing_time: meter + .u64_observable_gauge("rtcp_packet_processing_time") + .with_unit(Unit::new("us")) + .init(), + } + } + + pub(crate) fn record_rtp_packet_in_count(&self, value: u64, attributes: &[KeyValue]) { + self.rtp_packet_in_count.add(value, attributes); + } + + pub(crate) fn record_rtp_packet_out_count(&self, value: u64, attributes: &[KeyValue]) { + self.rtp_packet_out_count.add(value, attributes); + } + + pub(crate) fn record_rtcp_packet_in_count(&self, value: u64, attributes: &[KeyValue]) { + self.rtcp_packet_in_count.add(value, attributes); + } + + pub(crate) fn record_rtcp_packet_out_count(&self, value: u64, attributes: &[KeyValue]) { + self.rtcp_packet_out_count.add(value, attributes); + } + + pub(crate) fn record_remote_srtp_context_not_set_count( + &self, + value: u64, + attributes: &[KeyValue], + ) { + self.remote_srtp_context_not_set_count + .add(value, attributes); + } + + pub(crate) fn record_local_srtp_context_not_set_count( + &self, + value: u64, + attributes: &[KeyValue], + ) { + self.local_srtp_context_not_set_count.add(value, attributes); + } + + pub(crate) fn record_rtp_packet_processing_time( + &self, + measurement: u64, + attributes: &[KeyValue], + ) { + self.rtp_packet_processing_time + .observe(measurement, attributes); + } + + pub(crate) fn record_rtcp_packet_processing_time( + &self, + measurement: u64, + attributes: &[KeyValue], + ) { + self.rtcp_packet_processing_time + .observe(measurement, attributes); + } +} diff --git a/src/server/states.rs b/src/server/states.rs index b87f7da..4e63155 100644 --- a/src/server/states.rs +++ b/src/server/states.rs @@ -1,25 +1,30 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::rc::Rc; +use std::sync::Arc; +use std::time::Instant; + +use opentelemetry::metrics::Meter; +use sctp::{Association, AssociationHandle}; +use shared::error::{Error, Result}; + use crate::description::RTCSessionDescription; use crate::endpoint::{ candidate::{Candidate, ConnectionCredentials}, transport::Transport, Endpoint, }; +use crate::metrics::Metrics; use crate::server::config::ServerConfig; use crate::session::{config::SessionConfig, Session}; use crate::types::{EndpointId, FourTuple, SessionId, UserName}; -use sctp::{Association, AssociationHandle}; -use shared::error::{Error, Result}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::rc::Rc; -use std::sync::Arc; -use std::time::Instant; pub struct ServerStates { server_config: Arc, local_addr: SocketAddr, sessions: HashMap, + metrics: Metrics, //TODO: add idle timeout cleanup logic to remove idle endpoint and candidates candidates: HashMap>, @@ -29,7 +34,11 @@ pub struct ServerStates { impl ServerStates { /// create new server states - pub fn new(server_config: Arc, local_addr: SocketAddr) -> Result { + pub fn new( + server_config: Arc, + local_addr: SocketAddr, + meter: Meter, + ) -> Result { let _ = server_config .certificates .first() @@ -42,6 +51,7 @@ impl ServerStates { server_config, local_addr, sessions: HashMap::new(), + metrics: Metrics::new(meter), candidates: HashMap::new(), endpoints: HashMap::new(), @@ -110,6 +120,10 @@ impl ServerStates { Ok(answer) } + pub(crate) fn metrics(&self) -> &Metrics { + &self.metrics + } + pub(crate) fn accept_answer( &mut self, session_id: SessionId,