Skip to content

Commit

Permalink
Rename RedisPool to RedisManager
Browse files Browse the repository at this point in the history
Now that our redis backends are not always pooled,
having `Pool` in the name is sort of confusing, so
rename the type to something a bit more generic.
  • Loading branch information
jaymell authored and svix-james committed Jun 5, 2024
1 parent 5f88c1c commit d89d063
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 24 deletions.
8 changes: 4 additions & 4 deletions server/svix-server/src/core/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use axum::async_trait;
use redis::AsyncCommands as _;

use super::{Cache, CacheBehavior, CacheKey, Error, Result};
use crate::redis::RedisPool;
use crate::redis::RedisManager;

pub fn new(redis: RedisPool) -> Cache {
pub fn new(redis: RedisManager) -> Cache {
RedisCache { redis }.into()
}

#[derive(Clone)]
pub struct RedisCache {
redis: RedisPool,
redis: RedisManager,
}

#[async_trait]
Expand Down Expand Up @@ -112,7 +112,7 @@ mod tests {
}
}

async fn get_pool(redis_dsn: &str, cfg: &crate::cfg::Configuration) -> RedisPool {
async fn get_pool(redis_dsn: &str, cfg: &crate::cfg::Configuration) -> RedisManager {
match cfg.cache_type {
CacheType::RedisCluster => crate::redis::new_redis_clustered_unpooled(redis_dsn).await,
CacheType::Redis => crate::redis::new_redis_unpooled(redis_dsn).await,
Expand Down
8 changes: 4 additions & 4 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer};
use crate::{
cfg::{Configuration, QueueType},
error::Result,
redis::{PooledConnection, RedisPool},
redis::{PooledConnection, RedisManager},
};

/// This is the key of the main queue. As a KV store, redis places the entire stream under this key.
Expand Down Expand Up @@ -93,7 +93,7 @@ pub async fn new_pair(
/// Runs Redis queue migrations with the given delay schedule. Migrations are run on this schedule
/// such that if an old instance of the server is online after the migrations are made, that no data
/// will be lost assuming the old server is taken offline before the last scheduled delay.
async fn run_migration_schedule(delays: &[Duration], pool: RedisPool) {
async fn run_migration_schedule(delays: &[Duration], pool: RedisManager) {
let mut conn = pool
.get()
.await
Expand Down Expand Up @@ -358,10 +358,10 @@ pub mod tests {
cfg::{Configuration, QueueType},
core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
queue::{MessageTask, QueueTask, TaskQueueConsumer, TaskQueueProducer},
redis::RedisPool,
redis::RedisManager,
};

async fn get_pool(cfg: &Configuration) -> RedisPool {
async fn get_pool(cfg: &Configuration) -> RedisManager {
match cfg.queue_type {
QueueType::RedisCluster => {
crate::redis::new_redis_clustered_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg)
Expand Down
28 changes: 14 additions & 14 deletions server/svix-server/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use crate::cfg::Configuration;
pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2);

#[derive(Clone, Debug)]
pub enum RedisPool {
pub enum RedisManager {
Clustered(ClusteredRedisPool),
ClusteredUnpooled(ClusteredRedisUnpooled),
NonClustered(NonClusteredRedisPool),
NonClusteredUnpooled(NonClusteredRedisUnpooled),
}

impl RedisPool {
impl RedisManager {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
match self {
Self::Clustered(pool) => pool.get().await,
Expand Down Expand Up @@ -234,7 +234,7 @@ async fn new_redis_pool_helper(
redis_dsn: &str,
clustered: bool,
max_connections: u16,
) -> RedisPool {
) -> RedisManager {
if clustered {
let mgr = RedisClusterConnectionManager::new(redis_dsn)
.expect("Error initializing redis cluster client");
Expand All @@ -244,7 +244,7 @@ async fn new_redis_pool_helper(
.await
.expect("Error initializing redis cluster connection pool");
let pool = ClusteredRedisPool { pool };
RedisPool::Clustered(pool)
RedisManager::Clustered(pool)
} else {
let mgr = RedisConnectionManager::new(redis_dsn).expect("Error initializing redis client");
let pool = bb8::Pool::builder()
Expand All @@ -253,11 +253,11 @@ async fn new_redis_pool_helper(
.await
.expect("Error initializing redis connection pool");
let pool = NonClusteredRedisPool { pool };
RedisPool::NonClustered(pool)
RedisManager::NonClustered(pool)
}
}

async fn new_redis_unpooled_helper(redis_dsn: &str, clustered: bool) -> RedisPool {
async fn new_redis_unpooled_helper(redis_dsn: &str, clustered: bool) -> RedisManager {
if clustered {
let cli = redis::cluster::ClusterClient::builder(vec![redis_dsn])
.retries(1)
Expand All @@ -268,7 +268,7 @@ async fn new_redis_unpooled_helper(redis_dsn: &str, clustered: bool) -> RedisPoo
.get_async_connection()
.await
.expect("Failed to get redis-cluster-unpooled connection");
RedisPool::ClusteredUnpooled(ClusteredRedisUnpooled { con })
RedisManager::ClusteredUnpooled(ClusteredRedisUnpooled { con })
} else {
let cli = redis::Client::open(redis_dsn).expect("Error initializing redis unpooled client");
let con = redis::aio::ConnectionManager::new_with_backoff_and_timeouts(
Expand All @@ -281,34 +281,34 @@ async fn new_redis_unpooled_helper(redis_dsn: &str, clustered: bool) -> RedisPoo
)
.await
.expect("Failed to get redis-unpooled connection manager");
RedisPool::NonClusteredUnpooled(NonClusteredRedisUnpooled { con })
RedisManager::NonClusteredUnpooled(NonClusteredRedisUnpooled { con })
}
}

pub async fn new_redis_clustered_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
pub async fn new_redis_clustered_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisManager {
new_redis_pool_helper(redis_dsn, true, cfg.redis_pool_max_size).await
}

pub async fn new_redis_clustered_unpooled(redis_dsn: &str) -> RedisPool {
pub async fn new_redis_clustered_unpooled(redis_dsn: &str) -> RedisManager {
new_redis_unpooled_helper(redis_dsn, true).await
}

pub async fn new_redis_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
pub async fn new_redis_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisManager {
new_redis_pool_helper(redis_dsn, false, cfg.redis_pool_max_size).await
}

pub async fn new_redis_unpooled(redis_dsn: &str) -> RedisPool {
pub async fn new_redis_unpooled(redis_dsn: &str) -> RedisManager {
new_redis_unpooled_helper(redis_dsn, false).await
}

#[cfg(test)]
mod tests {
use redis::AsyncCommands;

use super::RedisPool;
use super::RedisManager;
use crate::cfg::{CacheType, Configuration};

async fn get_pool(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
async fn get_pool(redis_dsn: &str, cfg: &Configuration) -> RedisManager {
match cfg.cache_type {
CacheType::RedisCluster => super::new_redis_clustered_unpooled(redis_dsn).await,
CacheType::Redis => super::new_redis_unpooled(redis_dsn).await,
Expand Down
4 changes: 2 additions & 2 deletions server/svix-server/tests/it/redis_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use svix_server::{
queue::{
new_pair, MessageTask, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer,
},
redis::{new_redis_clustered_pooled, new_redis_pooled, RedisPool},
redis::{new_redis_clustered_pooled, new_redis_pooled, RedisManager},
};

// TODO: Don't copy this from the Redis queue test directly, place the fn somewhere both can access
async fn get_pool(cfg: &Configuration) -> RedisPool {
async fn get_pool(cfg: &Configuration) -> RedisManager {
match cfg.queue_type {
QueueType::RedisCluster => {
new_redis_clustered_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await
Expand Down

0 comments on commit d89d063

Please sign in to comment.