diff --git a/server/Cargo.lock b/server/Cargo.lock index 6532bc7bf..ed4cd4c52 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1054,15 +1054,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" -[[package]] -name = "crossbeam-channel" -version = "0.5.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -2965,9 +2956,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" dependencies = [ "futures-core", "futures-sink", @@ -2975,14 +2966,13 @@ dependencies = [ "once_cell", "pin-project-lite", "thiserror", - "urlencoding", ] [[package]] name = "opentelemetry-http" -version = "0.11.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7690dc77bf776713848c4faa6501157469017eaf332baccd4eb1cea928743d94" +checksum = "b0ba633e55c5ea6f431875ba55e71664f2fa5d3a90bd34ec9302eecc41c865dd" dependencies = [ "async-trait", "bytes", @@ -2992,16 +2982,15 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb" +checksum = "a94c69209c05319cdf7460c6d4c055ed102be242a0a6245835d7bc42c6ec7f54" dependencies = [ "async-trait", "futures-core", "http 0.2.12", "opentelemetry", "opentelemetry-proto", - "opentelemetry-semantic-conventions", "opentelemetry_sdk", "prost", "thiserror", @@ -3011,9 +3000,9 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" +checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -3021,24 +3010,18 @@ dependencies = [ "tonic", ] -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" - [[package]] name = "opentelemetry_sdk" -version = "0.22.1" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" dependencies = [ "async-trait", - "crossbeam-channel", "futures-channel", "futures-executor", "futures-util", "glob", + "lazy_static", "once_cell", "opentelemetry", "ordered-float 4.2.1", @@ -4911,6 +4894,7 @@ dependencies = [ "figment", "form_urlencoded", "futures", + "hex", "hickory-resolver", "hmac-sha256", "http 0.2.12", @@ -5442,9 +5426,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.23.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9be14ba1bbe4ab79e9229f7f89fab8d120b865859f10527f31c033e599d2284" +checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" dependencies = [ "js-sys", "once_cell", diff --git a/server/svix-server/Cargo.toml b/server/svix-server/Cargo.toml index fdfd43e8e..c12227b19 100644 --- a/server/svix-server/Cargo.toml +++ b/server/svix-server/Cargo.toml @@ -33,11 +33,11 @@ once_cell = "1.18.0" figment = { version = "0.10", features = ["toml", "env", "test"] } tracing = "0.1.35" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } -tracing-opentelemetry = "0.23.0" -opentelemetry = "0.22.0" -opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] } -opentelemetry-http = "0.11.0" -opentelemetry-otlp = { version = "0.15.0" } +tracing-opentelemetry = "0.24.0" +opentelemetry = { version = "0.23.0", features = ["metrics"] } +opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"] } +opentelemetry-http = "0.12.0" +opentelemetry-otlp = { version = "0.16.0", features = ["metrics"] } validator = { version = "0.16.0", features = ["derive"] } jwt-simple = "0.11.6" ed25519-compact = "2.1.1" @@ -72,6 +72,7 @@ omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad33 # Not a well-known author, and no longer gets updates => pinned. # Switch to hyper-http-proxy when upgrading hyper to 1.0. hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] } +hex = "0.4.3" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { version = "0.5", optional = true } diff --git a/server/svix-server/src/lib.rs b/server/svix-server/src/lib.rs index 16af6dead..f71b18f3c 100644 --- a/server/svix-server/src/lib.rs +++ b/server/svix-server/src/lib.rs @@ -13,12 +13,14 @@ use std::{ use aide::axum::ApiRouter; use cfg::ConfigurationInner; +use once_cell::sync::Lazy; use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk::runtime::Tokio; +use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime::Tokio}; use queue::TaskQueueProducer; use redis::RedisManager; use sea_orm::DatabaseConnection; use sentry::integrations::tracing::EventFilter; +use svix_ksuid::{KsuidLike, KsuidMs}; use tower::layer::layer_fn; use tower_http::{ cors::{AllowHeaders, Any, CorsLayer}, @@ -44,6 +46,7 @@ pub mod core; pub mod db; pub mod error; pub mod expired_message_cleaner; +pub mod metrics; pub mod openapi; pub mod queue; pub mod redis; @@ -54,6 +57,9 @@ const CRATE_NAME: &str = env!("CARGO_CRATE_NAME"); pub static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false); +pub static INSTANCE_ID: Lazy = + Lazy::new(|| hex::encode(KsuidMs::new(None, None).to_string())); + async fn graceful_shutdown_handler() { let ctrl_c = async { tokio::signal::ctrl_c() @@ -83,6 +89,8 @@ async fn graceful_shutdown_handler() { #[tracing::instrument(name = "app_start", level = "trace", skip_all)] pub async fn run(cfg: Configuration, listener: Option) { + let _metrics = setup_metrics(&cfg); + run_with_prefix(None, cfg, listener).await } @@ -325,6 +333,32 @@ pub fn setup_tracing( (registry, sentry_guard) } +pub fn setup_metrics(cfg: &ConfigurationInner) -> Option { + cfg.opentelemetry_address.as_ref().map(|addr| { + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(addr); + + opentelemetry_otlp::new_pipeline() + .metrics(Tokio) + .with_delta_temporality() + .with_exporter(exporter) + .with_resource(opentelemetry_sdk::Resource::new(vec![ + opentelemetry::KeyValue::new( + "service.name", + cfg.opentelemetry_service_name.clone(), + ), + opentelemetry::KeyValue::new("instance_id", INSTANCE_ID.to_owned()), + opentelemetry::KeyValue::new( + "service.version", + option_env!("GITHUB_SHA").unwrap_or("unknown"), + ), + ])) + .build() + .unwrap() + }) +} + pub fn setup_tracing_for_tests() { use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; diff --git a/server/svix-server/src/metrics/mod.rs b/server/svix-server/src/metrics/mod.rs new file mode 100644 index 000000000..0f24e7e77 --- /dev/null +++ b/server/svix-server/src/metrics/mod.rs @@ -0,0 +1,13 @@ +mod redis; + +pub fn init_metric(result: Result) -> Option { + match result { + Ok(t) => Some(t), + Err(e) => { + tracing::error!(error = ?e, "Failed to initialize metric"); + None + } + } +} + +pub use self::redis::{RedisQueueMetrics, RedisQueueType}; diff --git a/server/svix-server/src/metrics/redis.rs b/server/svix-server/src/metrics/redis.rs new file mode 100644 index 000000000..bb514a144 --- /dev/null +++ b/server/svix-server/src/metrics/redis.rs @@ -0,0 +1,140 @@ +use opentelemetry::metrics::{Meter, ObservableGauge}; +use redis::{streams::StreamPendingReply, AsyncCommands as _}; + +use super::init_metric; +use crate::{ + error::{Error, Result}, + redis::RedisManager, +}; + +pub enum RedisQueueType<'a> { + Stream(&'a str), + StreamPending { stream: &'a str, group: &'a str }, + List(&'a str), + SortedSet(&'a str), +} + +impl<'a> RedisQueueType<'a> { + pub async fn queue_depth(&self, redis: &RedisManager) -> Result { + let mut conn = redis.get().await?; + match self { + RedisQueueType::Stream(q) => conn + .xlen(q) + .await + .map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))), + RedisQueueType::StreamPending { stream, group } => { + let reply: StreamPendingReply = conn.xpending(stream, group).await?; + Ok(reply.count() as _) + } + RedisQueueType::List(q) => conn + .llen(q) + .await + .map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))), + RedisQueueType::SortedSet(q) => conn + .zcard(q) + .await + .map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))), + } + } +} + +#[derive(Clone)] +pub struct RedisQueueMetrics { + main_queue: Option>, + pending_queue: Option>, + delayed_queue: Option>, + deadletter_queue: Option>, +} + +impl RedisQueueMetrics { + pub fn new(meter: &Meter) -> Self { + let main_queue = init_metric( + meter + .u64_observable_gauge("svix.queue.depth_main") + .try_init(), + ); + + let pending_queue = init_metric( + meter + .u64_observable_gauge("svix.queue.pending_msgs") + .try_init(), + ); + + let delayed_queue = init_metric( + meter + .u64_observable_gauge("svix.queue.depth_delayed") + .try_init(), + ); + + let deadletter_queue = init_metric( + meter + .u64_observable_gauge("svix.queue.depth_dlq") + .try_init(), + ); + + Self { + main_queue, + pending_queue, + delayed_queue, + deadletter_queue, + } + } + pub async fn record( + &self, + redis: &RedisManager, + main_queue: &RedisQueueType<'_>, + pending_queue: &RedisQueueType<'_>, + delayed_queue: &RedisQueueType<'_>, + deadletter_queue: &RedisQueueType<'_>, + ) { + main_queue + .queue_depth(redis) + .await + .map(|d| self.record_main_queue_depth(d)) + .unwrap_or_else(|e| { + tracing::warn!("Failed to record queue depth: {e}"); + }); + pending_queue + .queue_depth(redis) + .await + .map(|d| self.record_pending_queue_depth(d)) + .unwrap_or_else(|e| { + tracing::warn!("Failed to record queue depth: {e}"); + }); + delayed_queue + .queue_depth(redis) + .await + .map(|d| self.record_delayed_queue_depth(d)) + .unwrap_or_else(|e| { + tracing::warn!("Failed to record queue depth: {e}"); + }); + deadletter_queue + .queue_depth(redis) + .await + .map(|d| self.record_deadletter_queue_depth(d)) + .unwrap_or_else(|e| { + tracing::warn!("Failed to record queue depth: {e}"); + }); + } + + fn record_main_queue_depth(&self, value: u64) { + if let Some(recorder) = &self.main_queue { + recorder.observe(value, &[]); + } + } + fn record_pending_queue_depth(&self, value: u64) { + if let Some(recorder) = &self.pending_queue { + recorder.observe(value, &[]); + } + } + fn record_delayed_queue_depth(&self, value: u64) { + if let Some(recorder) = &self.delayed_queue { + recorder.observe(value, &[]); + } + } + fn record_deadletter_queue_depth(&self, value: u64) { + if let Some(recorder) = &self.deadletter_queue { + recorder.observe(value, &[]); + } + } +} diff --git a/server/svix-server/src/mod.rs b/server/svix-server/src/mod.rs new file mode 100644 index 000000000..c201d9d08 --- /dev/null +++ b/server/svix-server/src/mod.rs @@ -0,0 +1,9 @@ +mod redis; +mod worker; + +pub use svix_server_core::metrics::*; + +pub use self::{ + redis::{RedisQueueMetrics, RedisQueueType}, + worker::WorkerMetrics, +}; diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index c739e72a8..0c11af9a3 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -36,6 +36,7 @@ use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer}; use crate::{ cfg::{Configuration, QueueType}, error::Result, + metrics::RedisQueueType, redis::{RedisConnection, RedisManager}, }; @@ -133,6 +134,7 @@ async fn new_pair_inner( let main_queue_name = format!("{queue_prefix}{main_queue_name}"); let delayed_queue_name = format!("{queue_prefix}{delayed_queue_name}"); let delayed_lock_name = format!("{queue_prefix}{delayed_lock_name}"); + let dlq_name = format!("{queue_prefix}{dlq_name}"); // This fn is only called from // - `queue::new_pair` if the queue type is redis and a DSN is set @@ -177,27 +179,64 @@ async fn new_pair_inner( .expect("Pending duration out of bounds"); // Migrate v1 queues to v2 and v2 queues to v3 on a loop with exponential backoff. - tokio::spawn(async move { - let delays = [ - // 11.25 min - Duration::from_secs(60 * 11 + 15), - // 22.5 min - Duration::from_secs(60 * 22 + 30), - // 45 min - Duration::from_secs(60 * 45), - // 1.5 hours - Duration::from_secs(60 * 30 * 3), - // 3 hours - Duration::from_secs(60 * 60 * 3), - // 6 hours - Duration::from_secs(60 * 60 * 6), - // 12 hours - Duration::from_secs(60 * 60 * 12), - // 24 hours - Duration::from_secs(60 * 60 * 24), - ]; - - run_migration_schedule(&delays, pool).await; + tokio::spawn({ + let pool = pool.clone(); + + async move { + let delays = [ + // 11.25 min + Duration::from_secs(60 * 11 + 15), + // 22.5 min + Duration::from_secs(60 * 22 + 30), + // 45 min + Duration::from_secs(60 * 45), + // 1.5 hours + Duration::from_secs(60 * 30 * 3), + // 3 hours + Duration::from_secs(60 * 60 * 3), + // 6 hours + Duration::from_secs(60 * 60 * 6), + // 12 hours + Duration::from_secs(60 * 60 * 12), + // 24 hours + Duration::from_secs(60 * 60 * 24), + ]; + + run_migration_schedule(&delays, pool).await; + } + }); + + // Metrics task + tokio::spawn({ + let pool = pool.clone(); + let main_queue_name = main_queue_name.clone(); + let delayed_queue_name = delayed_queue_name.clone(); + let deadletter_queue_name = dlq_name.clone(); + + async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let main_queue = RedisQueueType::Stream(&main_queue_name); + let pending = RedisQueueType::StreamPending { + stream: &main_queue_name, + group: WORKERS_GROUP, + }; + let delayed_queue = RedisQueueType::SortedSet(&delayed_queue_name); + let deadletter_queue = RedisQueueType::List(&deadletter_queue_name); + let metrics = + crate::metrics::RedisQueueMetrics::new(&opentelemetry::global::meter("svix.com")); + loop { + interval.tick().await; + metrics + .record( + &pool, + &main_queue, + &pending, + &delayed_queue, + &deadletter_queue, + ) + .await; + } + } }); let config = RedisConfig {