diff --git a/server/svix-server/src/core/cache/redis.rs b/server/svix-server/src/core/cache/redis.rs index d6b045755..19261c192 100644 --- a/server/svix-server/src/core/cache/redis.rs +++ b/server/svix-server/src/core/cache/redis.rs @@ -81,7 +81,7 @@ mod tests { super::{kv_def, string_kv_def, CacheValue}, *, }; - use crate::cfg::CacheType; + use crate::cfg::Configuration; // Test structures @@ -112,14 +112,8 @@ mod tests { } } - async fn get_pool(redis_dsn: &str, cfg: &crate::cfg::Configuration) -> RedisManager { - match cfg.cache_type { - CacheType::RedisCluster => crate::redis::new_redis_clustered_unpooled(redis_dsn).await, - CacheType::Redis => crate::redis::new_redis_unpooled(redis_dsn).await, - _ => panic!( - "This test should only be run when redis is configured as the cache provider" - ), - } + async fn get_pool(cfg: &Configuration) -> RedisManager { + RedisManager::from_cache_backend(&cfg.cache_backend()).await } #[tokio::test] @@ -129,7 +123,7 @@ mod tests { dotenvy::dotenv().ok(); let cfg = crate::cfg::load().unwrap(); - let redis_pool = get_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await; + let redis_pool = get_pool(&cfg).await; let cache = super::new(redis_pool); let (first_key, first_val_a, first_val_b) = @@ -206,7 +200,7 @@ mod tests { dotenvy::dotenv().ok(); let cfg = crate::cfg::load().unwrap(); - let redis_pool = get_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await; + let redis_pool = get_pool(&cfg).await; let cache = super::new(redis_pool); let key = TestKeyA::new("key".to_owned()); @@ -225,7 +219,7 @@ mod tests { dotenvy::dotenv().ok(); let cfg = crate::cfg::load().unwrap(); - let redis_pool = get_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await; + let redis_pool = get_pool(&cfg).await; let cache = super::new(redis_pool); let key = TestKeyA::new("nx_status_test_key".to_owned()); diff --git a/server/svix-server/src/lib.rs b/server/svix-server/src/lib.rs index a85ba5879..6b20376b5 100644 --- a/server/svix-server/src/lib.rs +++ b/server/svix-server/src/lib.rs @@ -16,6 +16,7 @@ use cfg::ConfigurationInner; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::runtime::Tokio; use queue::TaskQueueProducer; +use redis::RedisManager; use sea_orm::DatabaseConnection; use sentry::integrations::tracing::EventFilter; use tower::layer::layer_fn; @@ -106,15 +107,12 @@ pub async fn run_with_prefix( tracing::debug!("DB: Started"); tracing::debug!("Cache: Initializing {:?}", cfg.cache_type); - let cache = match cfg.cache_backend() { + let cache_backend = cfg.cache_backend(); + let cache = match &cache_backend { CacheBackend::None => cache::none::new(), CacheBackend::Memory => cache::memory::new(), - CacheBackend::Redis(dsn) => { - let mgr = crate::redis::new_redis_unpooled(dsn).await; - cache::redis::new(mgr) - } - CacheBackend::RedisCluster(dsn) => { - let mgr = crate::redis::new_redis_clustered_unpooled(dsn).await; + CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => { + let mgr = RedisManager::from_cache_backend(&cache_backend).await; cache::redis::new(mgr) } }; diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index 39b044ea8..fda46d598 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -133,10 +133,8 @@ async fn new_pair_inner( // - `queue::new_pair` if the queue type is redis and a DSN is set // - redis tests that only makes sense to run with the DSN set let dsn = cfg.redis_dsn.as_deref().unwrap(); - let pool = match &cfg.queue_type { - QueueType::RedisCluster => crate::redis::new_redis_clustered_pooled(dsn, cfg).await, - _ => crate::redis::new_redis_pooled(dsn, cfg).await, - }; + let pool = + RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await; // Create the stream and consumer group for the MAIN queue should it not already exist. The // consumer is created automatically upon use so it does not have to be created here. @@ -355,25 +353,14 @@ pub mod tests { use super::{migrate_list, migrate_list_to_stream, migrate_sset, new_pair_inner}; use crate::{ - cfg::{Configuration, QueueType}, + cfg::Configuration, core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId}, queue::{MessageTask, QueueTask, TaskQueueConsumer, TaskQueueProducer}, redis::RedisManager, }; async fn get_pool(cfg: &Configuration) -> RedisManager { - match cfg.queue_type { - QueueType::RedisCluster => { - crate::redis::new_redis_clustered_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg) - .await - } - QueueType::Redis => { - crate::redis::new_redis_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await - } - _ => { - panic!("This test should only be run when redis is configured as the queue backend") - } - } + RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await } #[tokio::test] diff --git a/server/svix-server/src/redis/mod.rs b/server/svix-server/src/redis/mod.rs index 1be8a0d1a..87581aa4c 100644 --- a/server/svix-server/src/redis/mod.rs +++ b/server/svix-server/src/redis/mod.rs @@ -7,7 +7,7 @@ use bb8_redis::RedisConnectionManager; use redis::{FromRedisValue, RedisError, RedisResult}; pub use self::cluster::RedisClusterConnectionManager; -use crate::cfg::Configuration; +use crate::cfg::{CacheBackend, QueueBackend}; pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2); @@ -20,6 +20,73 @@ pub enum RedisManager { } impl RedisManager { + async fn new_pooled(dsn: &str, clustered: bool, max_conns: u16) -> Self { + if clustered { + let mgr = RedisClusterConnectionManager::new(dsn) + .expect("Error initializing redis cluster client"); + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await + .expect("Error initializing redis cluster connection pool"); + let pool = ClusteredRedisPool { pool }; + RedisManager::Clustered(pool) + } else { + let mgr = RedisConnectionManager::new(dsn).expect("Error initializing redis client"); + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await + .expect("Error initializing redis connection pool"); + let pool = NonClusteredRedisPool { pool }; + RedisManager::NonClustered(pool) + } + } + + async fn new_unpooled(dsn: &str, clustered: bool) -> Self { + if clustered { + let cli = redis::cluster::ClusterClient::builder(vec![dsn]) + .retries(1) + .connection_timeout(REDIS_CONN_TIMEOUT) + .build() + .expect("Error initializing redis-unpooled cluster client"); + let con = cli + .get_async_connection() + .await + .expect("Failed to get redis-cluster-unpooled connection"); + RedisManager::ClusteredUnpooled(ClusteredRedisUnpooled { con }) + } else { + let cli = redis::Client::open(dsn).expect("Error initializing redis unpooled client"); + let con = redis::aio::ConnectionManager::new_with_backoff_and_timeouts( + cli, + 2, + 100, + 1, + Duration::MAX, + REDIS_CONN_TIMEOUT, + ) + .await + .expect("Failed to get redis-unpooled connection manager"); + RedisManager::NonClusteredUnpooled(NonClusteredRedisUnpooled { con }) + } + } + + pub async fn from_cache_backend(cache_backend: &CacheBackend<'_>) -> Self { + match cache_backend { + CacheBackend::Redis(dsn) => Self::new_unpooled(dsn, false).await, + CacheBackend::RedisCluster(dsn) => Self::new_unpooled(dsn, true).await, + _ => panic!("Queue type not supported with redis"), + } + } + + pub async fn from_queue_backend(queue_backend: &QueueBackend<'_>, max_conns: u16) -> Self { + match queue_backend { + QueueBackend::Redis(dsn) => Self::new_pooled(dsn, false, max_conns).await, + QueueBackend::RedisCluster(dsn) => Self::new_pooled(dsn, true, max_conns).await, + _ => panic!("Queue type not supported with redis"), + } + } + pub async fn get(&self) -> Result, RunError> { match self { Self::Clustered(pool) => pool.get().await, @@ -230,93 +297,11 @@ impl ClusteredUnpooledConnection { } } -async fn new_redis_pool_helper( - redis_dsn: &str, - clustered: bool, - max_connections: u16, -) -> RedisManager { - if clustered { - let mgr = RedisClusterConnectionManager::new(redis_dsn) - .expect("Error initializing redis cluster client"); - let pool = bb8::Pool::builder() - .max_size(max_connections.into()) - .build(mgr) - .await - .expect("Error initializing redis cluster connection pool"); - let pool = ClusteredRedisPool { pool }; - RedisManager::Clustered(pool) - } else { - let mgr = RedisConnectionManager::new(redis_dsn).expect("Error initializing redis client"); - let pool = bb8::Pool::builder() - .max_size(max_connections.into()) - .build(mgr) - .await - .expect("Error initializing redis connection pool"); - let pool = NonClusteredRedisPool { pool }; - RedisManager::NonClustered(pool) - } -} - -async fn new_redis_unpooled_helper(redis_dsn: &str, clustered: bool) -> RedisManager { - if clustered { - let cli = redis::cluster::ClusterClient::builder(vec![redis_dsn]) - .retries(1) - .connection_timeout(REDIS_CONN_TIMEOUT) - .build() - .expect("Error initializing redis-unpooled cluster client"); - let con = cli - .get_async_connection() - .await - .expect("Failed to get redis-cluster-unpooled connection"); - RedisManager::ClusteredUnpooled(ClusteredRedisUnpooled { con }) - } else { - let cli = redis::Client::open(redis_dsn).expect("Error initializing redis unpooled client"); - let con = redis::aio::ConnectionManager::new_with_backoff_and_timeouts( - cli, - 2, - 100, - 1, - Duration::MAX, - REDIS_CONN_TIMEOUT, - ) - .await - .expect("Failed to get redis-unpooled connection manager"); - RedisManager::NonClusteredUnpooled(NonClusteredRedisUnpooled { con }) - } -} - -pub async fn new_redis_clustered_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisManager { - new_redis_pool_helper(redis_dsn, true, cfg.redis_pool_max_size).await -} - -pub async fn new_redis_clustered_unpooled(redis_dsn: &str) -> RedisManager { - new_redis_unpooled_helper(redis_dsn, true).await -} - -pub async fn new_redis_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisManager { - new_redis_pool_helper(redis_dsn, false, cfg.redis_pool_max_size).await -} - -pub async fn new_redis_unpooled(redis_dsn: &str) -> RedisManager { - new_redis_unpooled_helper(redis_dsn, false).await -} - #[cfg(test)] mod tests { use redis::AsyncCommands; use super::RedisManager; - use crate::cfg::{CacheType, Configuration}; - - async fn get_pool(redis_dsn: &str, cfg: &Configuration) -> RedisManager { - match cfg.cache_type { - CacheType::RedisCluster => super::new_redis_clustered_unpooled(redis_dsn).await, - CacheType::Redis => super::new_redis_unpooled(redis_dsn).await, - _ => panic!( - "This test should only be run when redis is configured as the cache provider" - ), - } - } // Ensure basic set/get works -- should test sharding as well: #[tokio::test] @@ -326,8 +311,8 @@ mod tests { dotenvy::dotenv().ok(); let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await; - let mut conn = pool.get().await.unwrap(); + let mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await; + let mut conn = mgr.get().await.unwrap(); for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() { let key = key.to_string(); diff --git a/server/svix-server/tests/it/message_app.rs b/server/svix-server/tests/it/message_app.rs index 62a3df13d..808033fbf 100644 --- a/server/svix-server/tests/it/message_app.rs +++ b/server/svix-server/tests/it/message_app.rs @@ -12,7 +12,7 @@ use svix_server::{ message_app::AppEndpointKey, types::{BaseId, OrganizationId}, }, - redis::{new_redis_clustered_unpooled, new_redis_unpooled}, + redis::RedisManager, }; use crate::utils::{ @@ -65,12 +65,8 @@ async fn test_app_deletion() { // Delete the cached [`CreateMessageApp`] here instead of waiting 30s for it to expire let cache = match cfg.cache_backend() { CacheBackend::None => cache::none::new(), - CacheBackend::Redis(dsn) => { - let mgr = new_redis_unpooled(dsn).await; - cache::redis::new(mgr) - } - CacheBackend::RedisCluster(dsn) => { - let mgr = new_redis_clustered_unpooled(dsn).await; + CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => { + let mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await; cache::redis::new(mgr) } @@ -149,12 +145,8 @@ async fn test_endp_deletion() { // Delete the cached [`CreateMessageApp`] here instead of waiting 30s for it to expire let cache = match cfg.cache_backend() { CacheBackend::None => cache::none::new(), - CacheBackend::Redis(dsn) => { - let mgr = new_redis_unpooled(dsn).await; - cache::redis::new(mgr) - } - CacheBackend::RedisCluster(dsn) => { - let mgr = new_redis_clustered_unpooled(dsn).await; + CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => { + let mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await; cache::redis::new(mgr) } diff --git a/server/svix-server/tests/it/redis_queue.rs b/server/svix-server/tests/it/redis_queue.rs index 2297e9670..084a97277 100644 --- a/server/svix-server/tests/it/redis_queue.rs +++ b/server/svix-server/tests/it/redis_queue.rs @@ -8,25 +8,17 @@ use std::{str::FromStr, time::Duration}; use redis::AsyncCommands as _; use svix_server::{ - cfg::{Configuration, QueueType}, + cfg::Configuration, core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId}, queue::{ new_pair, MessageTask, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer, }, - redis::{new_redis_clustered_pooled, new_redis_pooled, RedisManager}, + redis::RedisManager, }; // TODO: Don't copy this from the Redis queue test directly, place the fn somewhere both can access async fn get_pool(cfg: &Configuration) -> RedisManager { - match cfg.queue_type { - QueueType::RedisCluster => { - new_redis_clustered_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await - } - QueueType::Redis => new_redis_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await, - _ => { - panic!("This test should only be run when redis is configured as the queue backend") - } - } + RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await } fn task_queue_delivery_to_u16(tqd: &TaskQueueDelivery) -> u16 {