Skip to content

Commit

Permalink
Add redis OTEL metrics
Browse files Browse the repository at this point in the history
This adds support for exporting OTEL metrics and adds some
basic monitoring of the various Redis queues.
  • Loading branch information
jaymell committed Oct 8, 2024
1 parent 1b7834c commit bf75afe
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 57 deletions.
44 changes: 14 additions & 30 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ once_cell = "1.18.0"
figment = { version = "0.10", features = ["toml", "env", "test"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-opentelemetry = "0.23.0"
opentelemetry = "0.22.0"
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
opentelemetry-http = "0.11.0"
opentelemetry-otlp = { version = "0.15.0" }
tracing-opentelemetry = "0.24.0"
opentelemetry = { version = "0.23.0", features = ["metrics"] }
opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"] }
opentelemetry-http = "0.12.0"
opentelemetry-otlp = { version = "0.16.0", features = ["metrics"] }
validator = { version = "0.16.0", features = ["derive"] }
jwt-simple = "0.11.6"
ed25519-compact = "2.1.1"
Expand Down Expand Up @@ -72,6 +72,7 @@ omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad33
# Not a well-known author, and no longer gets updates => pinned.
# Switch to hyper-http-proxy when upgrading hyper to 1.0.
hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }
hex = "0.4.3"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5", optional = true }
Expand Down
36 changes: 35 additions & 1 deletion server/svix-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use std::{

use aide::axum::ApiRouter;
use cfg::ConfigurationInner;
use once_cell::sync::Lazy;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime::Tokio};
use queue::TaskQueueProducer;
use redis::RedisManager;
use sea_orm::DatabaseConnection;
use sentry::integrations::tracing::EventFilter;
use svix_ksuid::{KsuidLike, KsuidMs};
use tower::layer::layer_fn;
use tower_http::{
cors::{AllowHeaders, Any, CorsLayer},
Expand All @@ -44,6 +46,7 @@ pub mod core;
pub mod db;
pub mod error;
pub mod expired_message_cleaner;
pub mod metrics;
pub mod openapi;
pub mod queue;
pub mod redis;
Expand All @@ -54,6 +57,9 @@ const CRATE_NAME: &str = env!("CARGO_CRATE_NAME");

pub static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);

/// Identifier for this running process, computed once on first access.
///
/// The value is the hex encoding of the string form of a freshly created
/// `KsuidMs`. It is attached to exported metrics as the `instance_id`
/// resource attribute by `setup_metrics`.
// NOTE(review): the two `None` arguments presumably select the current time
// and a random payload — confirm against the svix_ksuid documentation.
pub static INSTANCE_ID: Lazy<String> =
Lazy::new(|| hex::encode(KsuidMs::new(None, None).to_string()));

async fn graceful_shutdown_handler() {
let ctrl_c = async {
tokio::signal::ctrl_c()
Expand Down Expand Up @@ -83,6 +89,8 @@ async fn graceful_shutdown_handler() {

/// Entry point: sets up metrics export from `cfg`, then runs the server via
/// `run_with_prefix` with no prefix.
#[tracing::instrument(name = "app_start", level = "trace", skip_all)]
pub async fn run(cfg: Configuration, listener: Option<TcpListener>) {
// Bound to a named variable (`_metrics`, not `_`) so the value returned by
// `setup_metrics` is kept until this function returns instead of being
// dropped immediately.
let _metrics = setup_metrics(&cfg);

run_with_prefix(None, cfg, listener).await
}

Expand Down Expand Up @@ -325,6 +333,32 @@ pub fn setup_tracing(
(registry, sentry_guard)
}

/// Builds the OTLP metrics pipeline when an OpenTelemetry address is configured.
///
/// Returns `None` when `cfg.opentelemetry_address` is unset; otherwise returns
/// the meter provider produced by the pipeline. The provider's resource carries
/// three attributes: `service.name` (from the configuration), `instance_id`
/// (from [`INSTANCE_ID`]) and `service.version` (the `GITHUB_SHA` environment
/// variable captured at compile time, or `"unknown"`).
///
/// # Panics
///
/// Panics if the pipeline fails to build.
pub fn setup_metrics(cfg: &ConfigurationInner) -> Option<SdkMeterProvider> {
    // Nothing to export to: metrics stay disabled.
    let addr = cfg.opentelemetry_address.as_ref()?;

    let exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(addr);

    let service_name =
        opentelemetry::KeyValue::new("service.name", cfg.opentelemetry_service_name.clone());
    let instance_id = opentelemetry::KeyValue::new("instance_id", INSTANCE_ID.to_owned());
    let service_version = opentelemetry::KeyValue::new(
        "service.version",
        option_env!("GITHUB_SHA").unwrap_or("unknown"),
    );
    let resource =
        opentelemetry_sdk::Resource::new(vec![service_name, instance_id, service_version]);

    let provider = opentelemetry_otlp::new_pipeline()
        .metrics(Tokio)
        .with_delta_temporality()
        .with_exporter(exporter)
        .with_resource(resource)
        .build()
        .unwrap();

    Some(provider)
}

pub fn setup_tracing_for_tests() {
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand Down
13 changes: 13 additions & 0 deletions server/svix-server/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
mod redis;

/// Converts the outcome of creating a metric instrument into an `Option`.
///
/// On `Ok` the instrument is returned as `Some`. On `Err` the error is logged
/// at error level and `None` is returned, so a failed instrument does not stop
/// the caller.
pub fn init_metric<T, E: std::fmt::Debug>(result: Result<T, E>) -> Option<T> {
    result
        .map_err(|e| {
            tracing::error!(error = ?e, "Failed to initialize metric");
        })
        .ok()
}

pub use self::redis::{RedisQueueMetrics, RedisQueueType};
140 changes: 140 additions & 0 deletions server/svix-server/src/metrics/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use opentelemetry::metrics::{Meter, ObservableGauge};
use redis::{streams::StreamPendingReply, AsyncCommands as _};

use super::init_metric;
use crate::{
error::{Error, Result},
redis::RedisManager,
};

/// Identifies a Redis structure used as a queue, together with the key(s)
/// needed to query it.
///
/// The variant selects which Redis command `queue_depth` issues to measure
/// the queue. All payloads are borrowed string slices, so the type is cheap to
/// copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedisQueueType<'a> {
    /// A stream, identified by its key; depth is the stream length (`XLEN`).
    Stream(&'a str),
    /// The pending entries of `group` on `stream`; depth is the pending count
    /// reported by `XPENDING`.
    StreamPending { stream: &'a str, group: &'a str },
    /// A list, identified by its key; depth is the list length (`LLEN`).
    List(&'a str),
    /// A sorted set, identified by its key; depth is its cardinality (`ZCARD`).
    SortedSet(&'a str),
}

impl<'a> RedisQueueType<'a> {
pub async fn queue_depth(&self, redis: &RedisManager) -> Result<u64> {
let mut conn = redis.get().await?;
match self {
RedisQueueType::Stream(q) => conn
.xlen(q)
.await
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
RedisQueueType::StreamPending { stream, group } => {
let reply: StreamPendingReply = conn.xpending(stream, group).await?;
Ok(reply.count() as _)
}
RedisQueueType::List(q) => conn
.llen(q)
.await
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
RedisQueueType::SortedSet(q) => conn
.zcard(q)
.await
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
}
}
}

/// Observable gauges reporting the depth of each Redis-backed queue.
///
/// Every gauge is optional: `new` stores `None` for an instrument that failed
/// to initialize, and recording into a missing gauge is skipped.
#[derive(Clone)]
pub struct RedisQueueMetrics {
/// Gauge registered as `svix.queue.depth_main`.
main_queue: Option<ObservableGauge<u64>>,
/// Gauge registered as `svix.queue.pending_msgs`.
pending_queue: Option<ObservableGauge<u64>>,
/// Gauge registered as `svix.queue.depth_delayed`.
delayed_queue: Option<ObservableGauge<u64>>,
/// Gauge registered as `svix.queue.depth_dlq`.
deadletter_queue: Option<ObservableGauge<u64>>,
}

impl RedisQueueMetrics {
pub fn new(meter: &Meter) -> Self {
let main_queue = init_metric(
meter
.u64_observable_gauge("svix.queue.depth_main")
.try_init(),
);

let pending_queue = init_metric(
meter
.u64_observable_gauge("svix.queue.pending_msgs")
.try_init(),
);

let delayed_queue = init_metric(
meter
.u64_observable_gauge("svix.queue.depth_delayed")
.try_init(),
);

let deadletter_queue = init_metric(
meter
.u64_observable_gauge("svix.queue.depth_dlq")
.try_init(),
);

Self {
main_queue,
pending_queue,
delayed_queue,
deadletter_queue,
}
}
pub async fn record(
&self,
redis: &RedisManager,
main_queue: &RedisQueueType<'_>,
pending_queue: &RedisQueueType<'_>,
delayed_queue: &RedisQueueType<'_>,
deadletter_queue: &RedisQueueType<'_>,
) {
main_queue
.queue_depth(redis)
.await
.map(|d| self.record_main_queue_depth(d))
.unwrap_or_else(|e| {
tracing::warn!("Failed to record queue depth: {e}");
});
pending_queue
.queue_depth(redis)
.await
.map(|d| self.record_pending_queue_depth(d))
.unwrap_or_else(|e| {
tracing::warn!("Failed to record queue depth: {e}");
});
delayed_queue
.queue_depth(redis)
.await
.map(|d| self.record_delayed_queue_depth(d))
.unwrap_or_else(|e| {
tracing::warn!("Failed to record queue depth: {e}");
});
deadletter_queue
.queue_depth(redis)
.await
.map(|d| self.record_deadletter_queue_depth(d))
.unwrap_or_else(|e| {
tracing::warn!("Failed to record queue depth: {e}");
});
}

fn record_main_queue_depth(&self, value: u64) {
if let Some(recorder) = &self.main_queue {
recorder.observe(value, &[]);
}
}
fn record_pending_queue_depth(&self, value: u64) {
if let Some(recorder) = &self.pending_queue {
recorder.observe(value, &[]);
}
}
fn record_delayed_queue_depth(&self, value: u64) {
if let Some(recorder) = &self.delayed_queue {
recorder.observe(value, &[]);
}
}
fn record_deadletter_queue_depth(&self, value: u64) {
if let Some(recorder) = &self.deadletter_queue {
recorder.observe(value, &[]);
}
}
}
9 changes: 9 additions & 0 deletions server/svix-server/src/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mod redis;
mod worker;

pub use svix_server_core::metrics::*;

pub use self::{
redis::{RedisQueueMetrics, RedisQueueType},
worker::WorkerMetrics,
};
Loading

0 comments on commit bf75afe

Please sign in to comment.