Refactor redis helper functions into methods
This is slightly less error-prone, since the caller no longer has
to specify whether to create a pooled or unpooled manager, and it's
also less repetitive.
jaymell authored and svix-james committed Jun 5, 2024
1 parent d89d063 commit 333f591
Showing 6 changed files with 93 additions and 145 deletions.
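The upshot for callers: instead of picking one of the four removed free functions (new_redis_pooled, new_redis_unpooled, and their clustered variants), they hand the configured backend to a RedisManager constructor, which selects the right variant. A minimal caller-side sketch, using only names that appear in the diff below; make_managers itself is a hypothetical wrapper for illustration, not code from this commit:

use svix_server::{cfg::Configuration, redis::RedisManager};

// Hypothetical wrapper: builds the (unpooled) cache manager and the (pooled) queue
// manager from a loaded svix-server Configuration. Both constructors panic if the
// configured backend is not Redis or Redis-cluster.
async fn make_managers(cfg: &Configuration) -> (RedisManager, RedisManager) {
    // The backend enums decide clustered vs. non-clustered, so the caller no longer
    // chooses between new_redis_unpooled / new_redis_clustered_unpooled and friends.
    let cache_mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await;
    let queue_mgr =
        RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await;
    (cache_mgr, queue_mgr)
}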
18 changes: 6 additions & 12 deletions server/svix-server/src/core/cache/redis.rs
@@ -81,7 +81,7 @@ mod tests {
super::{kv_def, string_kv_def, CacheValue},
*,
};
-    use crate::cfg::CacheType;
+    use crate::cfg::Configuration;

// Test structures

@@ -112,14 +112,8 @@ mod tests {
}
}

-    async fn get_pool(redis_dsn: &str, cfg: &crate::cfg::Configuration) -> RedisManager {
-        match cfg.cache_type {
-            CacheType::RedisCluster => crate::redis::new_redis_clustered_unpooled(redis_dsn).await,
-            CacheType::Redis => crate::redis::new_redis_unpooled(redis_dsn).await,
-            _ => panic!(
-                "This test should only be run when redis is configured as the cache provider"
-            ),
-        }
+    async fn get_pool(cfg: &Configuration) -> RedisManager {
+        RedisManager::from_cache_backend(&cfg.cache_backend()).await
}

#[tokio::test]
Expand All @@ -129,7 +123,7 @@ mod tests {
dotenvy::dotenv().ok();
let cfg = crate::cfg::load().unwrap();

-        let redis_pool = get_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await;
+        let redis_pool = get_pool(&cfg).await;
let cache = super::new(redis_pool);

let (first_key, first_val_a, first_val_b) =
@@ -206,7 +200,7 @@ mod tests {
dotenvy::dotenv().ok();
let cfg = crate::cfg::load().unwrap();

-        let redis_pool = get_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await;
+        let redis_pool = get_pool(&cfg).await;
let cache = super::new(redis_pool);

let key = TestKeyA::new("key".to_owned());
Expand All @@ -225,7 +219,7 @@ mod tests {
dotenvy::dotenv().ok();
let cfg = crate::cfg::load().unwrap();

-        let redis_pool = get_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await;
+        let redis_pool = get_pool(&cfg).await;
let cache = super::new(redis_pool);

let key = TestKeyA::new("nx_status_test_key".to_owned());
12 changes: 5 additions & 7 deletions server/svix-server/src/lib.rs
@@ -16,6 +16,7 @@ use cfg::ConfigurationInner;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::runtime::Tokio;
use queue::TaskQueueProducer;
use redis::RedisManager;
use sea_orm::DatabaseConnection;
use sentry::integrations::tracing::EventFilter;
use tower::layer::layer_fn;
@@ -106,15 +107,12 @@ pub async fn run_with_prefix(
tracing::debug!("DB: Started");

tracing::debug!("Cache: Initializing {:?}", cfg.cache_type);
-    let cache = match cfg.cache_backend() {
+    let cache_backend = cfg.cache_backend();
+    let cache = match &cache_backend {
        CacheBackend::None => cache::none::new(),
        CacheBackend::Memory => cache::memory::new(),
-        CacheBackend::Redis(dsn) => {
-            let mgr = crate::redis::new_redis_unpooled(dsn).await;
-            cache::redis::new(mgr)
-        }
-        CacheBackend::RedisCluster(dsn) => {
-            let mgr = crate::redis::new_redis_clustered_unpooled(dsn).await;
+        CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => {
+            let mgr = RedisManager::from_cache_backend(&cache_backend).await;
cache::redis::new(mgr)
}
};
21 changes: 4 additions & 17 deletions server/svix-server/src/queue/redis.rs
@@ -133,10 +133,8 @@ async fn new_pair_inner(
// - `queue::new_pair` if the queue type is redis and a DSN is set
// - redis tests that only makes sense to run with the DSN set
let dsn = cfg.redis_dsn.as_deref().unwrap();
-    let pool = match &cfg.queue_type {
-        QueueType::RedisCluster => crate::redis::new_redis_clustered_pooled(dsn, cfg).await,
-        _ => crate::redis::new_redis_pooled(dsn, cfg).await,
-    };
+    let pool =
+        RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await;

// Create the stream and consumer group for the MAIN queue should it not already exist. The
// consumer is created automatically upon use so it does not have to be created here.
@@ -355,25 +353,14 @@ pub mod tests {

use super::{migrate_list, migrate_list_to_stream, migrate_sset, new_pair_inner};
use crate::{
-        cfg::{Configuration, QueueType},
+        cfg::Configuration,
core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
queue::{MessageTask, QueueTask, TaskQueueConsumer, TaskQueueProducer},
redis::RedisManager,
};

async fn get_pool(cfg: &Configuration) -> RedisManager {
-        match cfg.queue_type {
-            QueueType::RedisCluster => {
-                crate::redis::new_redis_clustered_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg)
-                    .await
-            }
-            QueueType::Redis => {
-                crate::redis::new_redis_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await
-            }
-            _ => {
-                panic!("This test should only be run when redis is configured as the queue backend")
-            }
-        }
+        RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await
}

#[tokio::test]
155 changes: 70 additions & 85 deletions server/svix-server/src/redis/mod.rs
@@ -7,7 +7,7 @@ use bb8_redis::RedisConnectionManager;
use redis::{FromRedisValue, RedisError, RedisResult};

pub use self::cluster::RedisClusterConnectionManager;
-use crate::cfg::Configuration;
+use crate::cfg::{CacheBackend, QueueBackend};

pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2);

@@ -20,6 +20,73 @@ pub enum RedisManager {
}

impl RedisManager {
async fn new_pooled(dsn: &str, clustered: bool, max_conns: u16) -> Self {
if clustered {
let mgr = RedisClusterConnectionManager::new(dsn)
.expect("Error initializing redis cluster client");
let pool = bb8::Pool::builder()
.max_size(max_conns.into())
.build(mgr)
.await
.expect("Error initializing redis cluster connection pool");
let pool = ClusteredRedisPool { pool };
RedisManager::Clustered(pool)
} else {
let mgr = RedisConnectionManager::new(dsn).expect("Error initializing redis client");
let pool = bb8::Pool::builder()
.max_size(max_conns.into())
.build(mgr)
.await
.expect("Error initializing redis connection pool");
let pool = NonClusteredRedisPool { pool };
RedisManager::NonClustered(pool)
}
}

async fn new_unpooled(dsn: &str, clustered: bool) -> Self {
if clustered {
let cli = redis::cluster::ClusterClient::builder(vec![dsn])
.retries(1)
.connection_timeout(REDIS_CONN_TIMEOUT)
.build()
.expect("Error initializing redis-unpooled cluster client");
let con = cli
.get_async_connection()
.await
.expect("Failed to get redis-cluster-unpooled connection");
RedisManager::ClusteredUnpooled(ClusteredRedisUnpooled { con })
} else {
let cli = redis::Client::open(dsn).expect("Error initializing redis unpooled client");
let con = redis::aio::ConnectionManager::new_with_backoff_and_timeouts(
cli,
2,
100,
1,
Duration::MAX,
REDIS_CONN_TIMEOUT,
)
.await
.expect("Failed to get redis-unpooled connection manager");
RedisManager::NonClusteredUnpooled(NonClusteredRedisUnpooled { con })
}
}

pub async fn from_cache_backend(cache_backend: &CacheBackend<'_>) -> Self {
match cache_backend {
CacheBackend::Redis(dsn) => Self::new_unpooled(dsn, false).await,
CacheBackend::RedisCluster(dsn) => Self::new_unpooled(dsn, true).await,
_ => panic!("Queue type not supported with redis"),
}
}

pub async fn from_queue_backend(queue_backend: &QueueBackend<'_>, max_conns: u16) -> Self {
match queue_backend {
QueueBackend::Redis(dsn) => Self::new_pooled(dsn, false, max_conns).await,
QueueBackend::RedisCluster(dsn) => Self::new_pooled(dsn, true, max_conns).await,
_ => panic!("Queue type not supported with redis"),
}
}

pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
match self {
Self::Clustered(pool) => pool.get().await,
@@ -230,93 +297,11 @@ impl ClusteredUnpooledConnection {
}
}

async fn new_redis_pool_helper(
redis_dsn: &str,
clustered: bool,
max_connections: u16,
) -> RedisManager {
if clustered {
let mgr = RedisClusterConnectionManager::new(redis_dsn)
.expect("Error initializing redis cluster client");
let pool = bb8::Pool::builder()
.max_size(max_connections.into())
.build(mgr)
.await
.expect("Error initializing redis cluster connection pool");
let pool = ClusteredRedisPool { pool };
RedisManager::Clustered(pool)
} else {
let mgr = RedisConnectionManager::new(redis_dsn).expect("Error initializing redis client");
let pool = bb8::Pool::builder()
.max_size(max_connections.into())
.build(mgr)
.await
.expect("Error initializing redis connection pool");
let pool = NonClusteredRedisPool { pool };
RedisManager::NonClustered(pool)
}
}

async fn new_redis_unpooled_helper(redis_dsn: &str, clustered: bool) -> RedisManager {
if clustered {
let cli = redis::cluster::ClusterClient::builder(vec![redis_dsn])
.retries(1)
.connection_timeout(REDIS_CONN_TIMEOUT)
.build()
.expect("Error initializing redis-unpooled cluster client");
let con = cli
.get_async_connection()
.await
.expect("Failed to get redis-cluster-unpooled connection");
RedisManager::ClusteredUnpooled(ClusteredRedisUnpooled { con })
} else {
let cli = redis::Client::open(redis_dsn).expect("Error initializing redis unpooled client");
let con = redis::aio::ConnectionManager::new_with_backoff_and_timeouts(
cli,
2,
100,
1,
Duration::MAX,
REDIS_CONN_TIMEOUT,
)
.await
.expect("Failed to get redis-unpooled connection manager");
RedisManager::NonClusteredUnpooled(NonClusteredRedisUnpooled { con })
}
}

pub async fn new_redis_clustered_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisManager {
new_redis_pool_helper(redis_dsn, true, cfg.redis_pool_max_size).await
}

pub async fn new_redis_clustered_unpooled(redis_dsn: &str) -> RedisManager {
new_redis_unpooled_helper(redis_dsn, true).await
}

pub async fn new_redis_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisManager {
new_redis_pool_helper(redis_dsn, false, cfg.redis_pool_max_size).await
}

pub async fn new_redis_unpooled(redis_dsn: &str) -> RedisManager {
new_redis_unpooled_helper(redis_dsn, false).await
}

#[cfg(test)]
mod tests {
use redis::AsyncCommands;

use super::RedisManager;
use crate::cfg::{CacheType, Configuration};

async fn get_pool(redis_dsn: &str, cfg: &Configuration) -> RedisManager {
match cfg.cache_type {
CacheType::RedisCluster => super::new_redis_clustered_unpooled(redis_dsn).await,
CacheType::Redis => super::new_redis_unpooled(redis_dsn).await,
_ => panic!(
"This test should only be run when redis is configured as the cache provider"
),
}
}

// Ensure basic set/get works -- should test sharding as well:
#[tokio::test]
@@ -326,8 +311,8 @@ mod tests {
dotenvy::dotenv().ok();
let cfg = crate::cfg::load().unwrap();

-        let pool = get_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await;
-        let mut conn = pool.get().await.unwrap();
+        let mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await;
+        let mut conn = mgr.get().await.unwrap();

for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() {
let key = key.to_string();
18 changes: 5 additions & 13 deletions server/svix-server/tests/it/message_app.rs
@@ -12,7 +12,7 @@ use svix_server::{
message_app::AppEndpointKey,
types::{BaseId, OrganizationId},
},
-    redis::{new_redis_clustered_unpooled, new_redis_unpooled},
+    redis::RedisManager,
};

use crate::utils::{
@@ -65,12 +65,8 @@ async fn test_app_deletion() {
// Delete the cached [`CreateMessageApp`] here instead of waiting 30s for it to expire
let cache = match cfg.cache_backend() {
CacheBackend::None => cache::none::new(),
-        CacheBackend::Redis(dsn) => {
-            let mgr = new_redis_unpooled(dsn).await;
-            cache::redis::new(mgr)
-        }
-        CacheBackend::RedisCluster(dsn) => {
-            let mgr = new_redis_clustered_unpooled(dsn).await;
+        CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => {
+            let mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await;
cache::redis::new(mgr)
}

@@ -149,12 +145,8 @@ async fn test_endp_deletion() {
// Delete the cached [`CreateMessageApp`] here instead of waiting 30s for it to expire
let cache = match cfg.cache_backend() {
CacheBackend::None => cache::none::new(),
-        CacheBackend::Redis(dsn) => {
-            let mgr = new_redis_unpooled(dsn).await;
-            cache::redis::new(mgr)
-        }
-        CacheBackend::RedisCluster(dsn) => {
-            let mgr = new_redis_clustered_unpooled(dsn).await;
+        CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => {
+            let mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await;
cache::redis::new(mgr)
}

14 changes: 3 additions & 11 deletions server/svix-server/tests/it/redis_queue.rs
@@ -8,25 +8,17 @@ use std::{str::FromStr, time::Duration};

use redis::AsyncCommands as _;
use svix_server::{
-    cfg::{Configuration, QueueType},
+    cfg::Configuration,
core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
queue::{
new_pair, MessageTask, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer,
},
-    redis::{new_redis_clustered_pooled, new_redis_pooled, RedisManager},
+    redis::RedisManager,
};

// TODO: Don't copy this from the Redis queue test directly, place the fn somewhere both can access
async fn get_pool(cfg: &Configuration) -> RedisManager {
-    match cfg.queue_type {
-        QueueType::RedisCluster => {
-            new_redis_clustered_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await
-        }
-        QueueType::Redis => new_redis_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await,
-        _ => {
-            panic!("This test should only be run when redis is configured as the queue backend")
-        }
-    }
+    RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await
}

fn task_queue_delivery_to_u16(tqd: &TaskQueueDelivery) -> u16 {
