Skip to content

Commit

Permalink
Add redis sentinel support
Browse files Browse the repository at this point in the history
This adds support for using Redis Sentinel for queuing and caching.
Sentinel potentially requires a lot of additional configuration params,
so support for that that has been added here, although the `redis_dsn`
field has been reused and should point to a sentinel server if using the
`redissentinel` queue/cache types.
  • Loading branch information
jaymell authored and svix-james committed Oct 4, 2024
1 parent 5e4e33d commit d5d90f1
Show file tree
Hide file tree
Showing 11 changed files with 484 additions and 71 deletions.
62 changes: 52 additions & 10 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions server/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,16 @@ echo "*********** RUN 6 ***********"
${TEST_COMMAND} -- --ignored rabbitmq
fi
)

echo "*********** RUN 7 ***********"
(
export SVIX_QUEUE_TYPE="redissentinel"
export SVIX_CACHE_TYPE="redissentinel"
export SVIX_REDIS_DSN="redis://localhost:26379"
export SVIX_SENTINEL_SERVICE_NAME="master0"

${TEST_COMMAND} "$@"
if [[ -z "$@" ]]; then
${TEST_COMMAND} -- --ignored redis
fi
)
6 changes: 3 additions & 3 deletions server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ ed25519-compact = "2.1.1"
chrono = { version="0.4.26", features = ["serde"] }
reqwest = { version = "0.11.27", features = ["json", "rustls-tls", "hickory-resolver"], default-features = false }
bb8 = "0.8"
bb8-redis = "0.15.0"
redis = { version = "0.25.4", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay", "connection-manager"] }
bb8-redis = "0.16.0"
redis = { version = "0.26", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay", "connection-manager", "sentinel"] }
thiserror = "1.0.30"
bytes = "1.1.0"
blake2 = "0.10.4"
Expand All @@ -68,7 +68,7 @@ urlencoding = "2.1.2"
form_urlencoded = "1.1.0"
lapin = "2.1.1"
sentry = { version = "0.32.2", features = ["tracing"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "5ae22000e2ea214ba707cac81657f098e5785a76", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad338ac3702b2e911bacf8967ac58d8", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel"] }
# Not a well-known author, and no longer gets updates => pinned.
# Switch to hyper-http-proxy when upgrading hyper to 1.0.
hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }
Expand Down
93 changes: 93 additions & 0 deletions server/svix-server/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,16 @@ pub struct ConfigurationInner {
pub db_pool_max_size: u16,

/// The DSN for redis (can be left empty if not using redis)
/// Note that if using Redis Sentinel, this will be the the DSN
/// for a Sentinel instance.
pub redis_dsn: Option<String>,
/// The maximum number of connections for the Redis pool
#[validate(range(min = 10))]
pub redis_pool_max_size: u16,

#[serde(flatten, default)]
pub redis_sentinel_cfg: Option<SentinelConfig>,

/// What kind of message queue to use. Supported: memory, redis (must have redis_dsn or
/// queue_dsn configured).
pub queue_type: QueueType,
Expand Down Expand Up @@ -255,6 +260,27 @@ fn validate_config_complete(config: &ConfigurationInner) -> Result<(), Validatio
});
}
}
CacheType::RedisSentinel => {
if config.cache_dsn().is_none() {
return Err(ValidationError {
code: Cow::from("missing field"),
message: Some(Cow::from(
"The redis_dsn or cache_dsn field must be set if the cache_type is `redissentinel`"
)),
params: HashMap::new(),
});
}

if config.redis_sentinel_cfg.is_none() {
return Err(ValidationError {
code: Cow::from("missing field"),
message: Some(Cow::from(
"sentinel_service_name must be set if the cache_type is `redissentinel`",
)),
params: HashMap::new(),
});
}
}
}

match config.queue_type {
Expand All @@ -281,6 +307,27 @@ fn validate_config_complete(config: &ConfigurationInner) -> Result<(), Validatio
});
}
}
QueueType::RedisSentinel => {
if config.queue_dsn().is_none() {
return Err(ValidationError {
code: Cow::from("missing field"),
message: Some(Cow::from(
"The redis_dsn or queue_dsn field must be set if the queue_type is `redissentinel`"
)),
params: HashMap::new(),
});
}

if config.redis_sentinel_cfg.is_none() {
return Err(ValidationError {
code: Cow::from("missing field"),
message: Some(Cow::from(
"sentinel_service_name must be set if the queue_type is `redissentinel`",
)),
params: HashMap::new(),
});
}
}
}

Ok(())
Expand All @@ -304,6 +351,10 @@ impl ConfigurationInner {
QueueType::Memory => QueueBackend::Memory,
QueueType::Redis => QueueBackend::Redis(self.queue_dsn().expect(err)),
QueueType::RedisCluster => QueueBackend::RedisCluster(self.queue_dsn().expect(err)),
QueueType::RedisSentinel => QueueBackend::RedisSentinel(
self.queue_dsn().expect(err),
self.redis_sentinel_cfg.as_ref().expect(err),
),
QueueType::RabbitMQ => QueueBackend::RabbitMq(self.rabbit_dsn.as_ref().expect(err)),
}
}
Expand All @@ -318,6 +369,10 @@ impl ConfigurationInner {
CacheType::Memory => CacheBackend::Memory,
CacheType::Redis => CacheBackend::Redis(self.cache_dsn().expect(err)),
CacheType::RedisCluster => CacheBackend::RedisCluster(self.cache_dsn().expect(err)),
CacheType::RedisSentinel => CacheBackend::RedisSentinel(
self.cache_dsn().expect(err),
self.redis_sentinel_cfg.as_ref().expect(err),
),
}
}
}
Expand Down Expand Up @@ -346,6 +401,7 @@ pub enum QueueBackend<'a> {
Memory,
Redis(&'a str),
RedisCluster(&'a str),
RedisSentinel(&'a str, &'a SentinelConfig),
RabbitMq(&'a str),
}

Expand All @@ -355,6 +411,7 @@ pub enum CacheBackend<'a> {
Memory,
Redis(&'a str),
RedisCluster(&'a str),
RedisSentinel(&'a str, &'a SentinelConfig),
}

#[derive(Clone, Debug, Deserialize)]
Expand All @@ -378,6 +435,7 @@ pub enum QueueType {
Memory,
Redis,
RedisCluster,
RedisSentinel,
RabbitMQ,
}

Expand All @@ -387,6 +445,7 @@ pub enum CacheType {
Memory,
Redis,
RedisCluster,
RedisSentinel,
None,
}

Expand Down Expand Up @@ -430,6 +489,40 @@ impl fmt::Display for LogLevel {
}
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct SentinelConfig {
#[serde(rename = "sentinel_service_name")]
pub service_name: String,
#[serde(default)]
pub redis_tls_mode_secure: bool,
pub redis_db: Option<i64>,
pub redis_username: Option<String>,
pub redis_password: Option<String>,
#[serde(default)]
pub redis_use_resp3: bool,
}

impl From<SentinelConfig> for omniqueue::backends::redis::SentinelConfig {
fn from(val: SentinelConfig) -> Self {
let SentinelConfig {
service_name,
redis_tls_mode_secure,
redis_db,
redis_username,
redis_password,
redis_use_resp3,
} = val;
omniqueue::backends::redis::SentinelConfig {
service_name,
redis_tls_mode_secure,
redis_db,
redis_username,
redis_password,
redis_use_resp3,
}
}
}

pub fn load() -> Result<Arc<ConfigurationInner>> {
if let Ok(db_url) = std::env::var("DATABASE_URL") {
// If we have DATABASE_URL set, we should potentially use it.
Expand Down
4 changes: 3 additions & 1 deletion server/svix-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ pub async fn run_with_prefix(
let cache = match &cache_backend {
CacheBackend::None => cache::none::new(),
CacheBackend::Memory => cache::memory::new(),
CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => {
CacheBackend::Redis(_)
| CacheBackend::RedisCluster(_)
| CacheBackend::RedisSentinel(_, _) => {
let mgr = RedisManager::from_cache_backend(&cache_backend).await;
cache::redis::new(mgr)
}
Expand Down
6 changes: 3 additions & 3 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ pub async fn new_pair(
prefix: Option<&str>,
) -> (TaskQueueProducer, TaskQueueConsumer) {
match cfg.queue_backend() {
QueueBackend::Redis(_) | QueueBackend::RedisCluster(_) => {
redis::new_pair(cfg, prefix).await
}
QueueBackend::Redis(_)
| QueueBackend::RedisCluster(_)
| QueueBackend::RedisSentinel(_, _) => redis::new_pair(cfg, prefix).await,
QueueBackend::Memory => {
let (producer, consumer) = InMemoryBackend::builder()
.build_pair()
Expand Down
15 changes: 14 additions & 1 deletion server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ async fn new_pair_inner(
consumer_name: WORKER_CONSUMER.to_owned(),
payload_key: QUEUE_KV_KEY.to_owned(),
ack_deadline_ms: pending_duration,
dlq_config: None,
sentinel_config: cfg.redis_sentinel_cfg.clone().map(|c| c.into()),
};

match &cfg.queue_type {
Expand All @@ -219,7 +221,17 @@ async fn new_pair_inner(
let consumer = TaskQueueConsumer::new(consumer);
(producer, consumer)
}
_ => {
QueueType::RedisSentinel => {
let (producer, consumer) = RedisBackend::sentinel_builder(config)
.build_pair()
.await
.expect("Error initializing redis-cluster queue");

let producer = TaskQueueProducer::new(producer);
let consumer = TaskQueueConsumer::new(consumer);
(producer, consumer)
}
QueueType::Redis => {
let (producer, consumer) = RedisBackend::builder(config)
.build_pair()
.await
Expand All @@ -229,6 +241,7 @@ async fn new_pair_inner(
let consumer = TaskQueueConsumer::new(consumer);
(producer, consumer)
}
_ => panic!("Unsupported backend!"),
}
}

Expand Down
Loading

0 comments on commit d5d90f1

Please sign in to comment.