From ef46b79735252082170fa2ef8aa49ccad12f5a46 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Wed, 28 Feb 2024 10:52:43 +0100 Subject: [PATCH] WIP: redis omniqueue --- server/Cargo.lock | 18 +- server/svix-server/Cargo.toml | 2 +- server/svix-server/src/queue/mod.rs | 34 +- server/svix-server/src/queue/redis.rs | 590 ++++++-------------------- 4 files changed, 153 insertions(+), 491 deletions(-) diff --git a/server/Cargo.lock b/server/Cargo.lock index e15aca8f3..954b02d51 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -2501,10 +2501,14 @@ version = "0.1.0" source = "git+https://github.com/svix/omniqueue-rs.git?rev=044fc688670759859f193bb646fac043bbfd08ba#044fc688670759859f193bb646fac043bbfd08ba" dependencies = [ "async-trait", + "bb8", + "bb8-redis", "futures-util", "lapin", + "redis", "serde", "serde_json", + "svix-ksuid 0.8.0", "thiserror", "tokio", "tracing", @@ -4252,6 +4256,18 @@ dependencies = [ "getrandom", ] +[[package]] +name = "svix-ksuid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66f014385b7fc154f59e9480770c2187b6e61037c2439895788a9a4d421d7859" +dependencies = [ + "base-encode", + "byteorder", + "getrandom", + "time", +] + [[package]] name = "svix-server" version = "1.20.0" @@ -4304,7 +4320,7 @@ dependencies = [ "serde_path_to_error", "sqlx", "svix", - "svix-ksuid", + "svix-ksuid 0.5.3", "svix-server_derive", "thiserror", "tikv-jemallocator", diff --git a/server/svix-server/Cargo.toml b/server/svix-server/Cargo.toml index 44aa11f2a..1365f77aa 100644 --- a/server/svix-server/Cargo.toml +++ b/server/svix-server/Cargo.toml @@ -72,7 +72,7 @@ urlencoding = "2.1.2" form_urlencoded = "1.1.0" lapin = "2.1.1" sentry = { version = "0.32.2", features = ["tracing"] } -omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "044fc688670759859f193bb646fac043bbfd08ba", default-features = false, features = ["in_memory", "rabbitmq"] } +omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "044fc688670759859f193bb646fac043bbfd08ba", default-features = false, features = ["in_memory", "rabbitmq", "redis_cluster"] } [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { version = "0.5", optional = true } diff --git a/server/svix-server/src/queue/mod.rs b/server/svix-server/src/queue/mod.rs index eae43f448..375d033ac 100644 --- a/server/svix-server/src/queue/mod.rs +++ b/server/svix-server/src/queue/mod.rs @@ -1,12 +1,11 @@ use std::{sync::Arc, time::Duration}; use axum::async_trait; -use chrono::{DateTime, Utc}; use omniqueue::{ - backends::InMemoryBackend, Delivery, DynConsumer, QueueConsumer, ScheduledQueueProducer, + backends::InMemoryBackend, Delivery, DynConsumer, DynScheduledQueueProducer, QueueConsumer, + ScheduledQueueProducer, }; use serde::{Deserialize, Serialize}; -use svix_ksuid::*; use crate::error::Traceable; use crate::{ @@ -18,8 +17,6 @@ use crate::{ error::{Error, ErrorType, Result}, }; -use self::redis::{RedisQueueConsumer, RedisQueueInner, RedisQueueProducer}; - pub mod rabbitmq; pub mod redis; @@ -38,13 +35,9 @@ pub async fn new_pair( prefix: Option<&str>, ) -> (TaskQueueProducer, TaskQueueConsumer) { match cfg.queue_backend() { - QueueBackend::Redis(dsn) => { - let pool = crate::redis::new_redis_pool(dsn, cfg).await; - redis::new_pair(pool, prefix).await - } + QueueBackend::Redis(dsn) => redis::new_pair(dsn, cfg.redis_pool_max_size, prefix).await, QueueBackend::RedisCluster(dsn) => { - let pool = crate::redis::new_redis_pool_clustered(dsn, cfg).await; - 
redis::new_pair(pool, prefix).await + redis::new_cluster_pair(dsn, cfg.redis_pool_max_size, prefix).await } QueueBackend::Memory => { let (producer, consumer) = InMemoryBackend::builder() @@ -143,8 +136,7 @@ impl QueueTask { #[derive(Clone)] pub enum TaskQueueProducer { - Redis(RedisQueueProducer), - Omni(Arc), + Omni(Arc), } impl TaskQueueProducer { @@ -153,7 +145,6 @@ impl TaskQueueProducer { run_with_retries( || async { match self { - TaskQueueProducer::Redis(q) => q.send(task.clone(), delay).await, TaskQueueProducer::Omni(q) => if let Some(delay) = delay { q.send_serde_json_scheduled(task.as_ref(), delay).await } else { @@ -170,14 +161,12 @@ impl TaskQueueProducer { } pub enum TaskQueueConsumer { - Redis(RedisQueueConsumer), Omni(DynConsumer), } impl TaskQueueConsumer { pub async fn receive_all(&mut self) -> Result> { match self { - TaskQueueConsumer::Redis(q) => q.receive_all().await.trace(), TaskQueueConsumer::Omni(q) => { const MAX_MESSAGES: usize = 128; // FIXME(onelson): need to figure out what deadline/duration to use here @@ -196,7 +185,6 @@ impl TaskQueueConsumer { /// Used by TaskQueueDeliveries to Ack/Nack itself #[derive(Debug)] enum Acker { - Redis(Arc), Omni(Delivery), } @@ -208,16 +196,6 @@ pub struct TaskQueueDelivery { } impl TaskQueueDelivery { - /// The `timestamp` is when this message will be delivered at - fn from_arc(task: Arc, timestamp: Option>, acker: Acker) -> Self { - let ksuid = KsuidMs::new(timestamp, None); - Self { - id: ksuid.to_string(), - task, - acker, - } - } - pub async fn ack(self) -> Result<()> { tracing::trace!("ack {}", self.id); @@ -230,7 +208,6 @@ impl TaskQueueDelivery { .as_ref() .expect("acker is always Some when trying to ack"); match acker_ref { - Acker::Redis(q) => q.ack(&self.id, &self.task).await.trace(), Acker::Omni(_) => match acker.take() { Some(Acker::Omni(delivery)) => { delivery.ack().await.map_err(|(e, delivery)| { @@ -263,7 +240,6 @@ impl TaskQueueDelivery { .as_ref() .expect("acker is always Some when trying to ack"); match acker_ref { - Acker::Redis(q) => q.nack(&self.id, &self.task).await.trace(), Acker::Omni(_) => match acker.take() { Some(Acker::Omni(delivery)) => { delivery diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index aeec686ea..88dde9861 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -29,25 +29,18 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; -use axum::async_trait; - -use chrono::Utc; -use redis::{ - streams::{StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply}, - AsyncCommands as _, Cmd, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, -}; -use tokio::time::sleep; - -use crate::{ - error::{Error, Result}, - queue::Acker, - redis::RedisPool, +use bb8_redis::RedisMultiplexedConnectionManager; +use omniqueue::{ + backends::{ + redis::{RedisClusterConnectionManager, RedisConnection}, + RedisBackend, RedisConfig, + }, + QueueConsumer as _, ScheduledQueueProducer as _, }; +use redis::{RedisResult, RedisWrite, ToRedisArgs}; -use super::{ - QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer, TaskQueueReceive, - TaskQueueSend, -}; +use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer}; +use crate::error::Result; /// This is the key of the main queue. As a KV store, redis places the entire stream under this key. /// Confusingly, each message in the queue may have any number of KV pairs. 
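For reference, the stream layout these comments describe maps onto plain redis-rs calls roughly as sketched below. This is illustrative only and not part of the patch: it reuses the constants defined in this module and assumes an already-established async connection. The omniqueue RedisConfig further down points at the same stream, consumer group, consumer name and payload key, presumably so queues written by the pre-omniqueue implementation keep working after the switch.

    use redis::{
        streams::{StreamReadOptions, StreamReadReply},
        AsyncCommands as _,
    };

    // Illustrative sketch of the queue layout: one stream entry per task, with a
    // single "data" field holding the serialized QueueTask.
    async fn stream_layout_sketch(
        conn: &mut redis::aio::MultiplexedConnection,
    ) -> redis::RedisResult<()> {
        let payload = "{}"; // stand-in for serde_json::to_string(&task)

        // Producer side: XADD with an auto-generated stream ID.
        let _id: String = conn
            .xadd(MAIN, GENERATE_STREAM_ID, &[(QUEUE_KV_KEY, payload)])
            .await?;

        // Consumer side: workers read new (">") entries through the consumer group,
        // so anything read but never XACKed stays pending and can be reclaimed after
        // the ack deadline.
        let _reply: StreamReadReply = conn
            .xread_options(
                &[MAIN],
                &[">"],
                &StreamReadOptions::default()
                    .group(WORKERS_GROUP, WORKER_CONSUMER)
                    .count(1),
            )
            .await?;

        Ok(())
    }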
@@ -79,26 +72,23 @@ const WORKER_CONSUMER: &str = "svix_workers_consumer"; /// Special ID for XADD command's which generates a stream ID automatically const GENERATE_STREAM_ID: &str = "*"; -/// Special ID for XREADGROUP commands which reads any new messages -const LISTEN_STREAM_ID: &str = ">"; /// Each queue item has a set of KV pairs associated with it, for simplicity a sing key, "data" is /// used with the entire [`QueueTask`] as the value in serialized JSON const QUEUE_KV_KEY: &str = "data"; -/// The maximum number of pending messages to reinsert into the queue after becoming stale per loop -const PENDING_BATCH_SIZE: i16 = 1000; - trait SendConnectionLike: redis::aio::ConnectionLike + Send {} impl SendConnectionLike for T {} /// Generates a [`TaskQueueProducer`] and a [`TaskQueueConsumer`] backed by Redis. pub async fn new_pair( - pool: RedisPool, + dsn: &str, + max_pool_size: u16, prefix: Option<&str>, ) -> (TaskQueueProducer, TaskQueueConsumer) { - new_pair_inner( - pool, + new_pair_inner::( + dsn, + max_pool_size, Duration::from_secs(45), prefix.unwrap_or_default(), MAIN, @@ -107,152 +97,22 @@ pub async fn new_pair( ) .await } - -struct StreamAutoclaimReply { - ids: Vec, -} - -impl FromRedisValue for StreamAutoclaimReply { - fn from_redis_value(v: &redis::Value) -> RedisResult { - // First try the two member array from before Redis 7.0 - match <((), StreamClaimReply)>::from_redis_value(v) { - Ok(res) => Ok(StreamAutoclaimReply { ids: res.1.ids }), - - // If it's a type error, then try the three member array from Redis 7.0 and after - Err(e) if e.kind() == redis::ErrorKind::TypeError => { - <((), StreamClaimReply, ())>::from_redis_value(v) - .map(|ok| StreamAutoclaimReply { ids: ok.1.ids }) - } - - // Any other error should be returned as is - Err(e) => Err(e), - } - } -} - -async fn background_task_pending( - pool: RedisPool, - main_queue_name: String, - pending_duration: i64, -) -> Result<()> { - let mut pool = pool.get().await?; - - // Every iteration checks whether the processing queue has items that should be picked back up, - // claiming them in the process - let mut cmd = redis::cmd("XAUTOCLAIM"); - cmd.arg(&main_queue_name) - .arg(WORKERS_GROUP) - .arg(WORKER_CONSUMER) - .arg(pending_duration) - .arg("-") - .arg("COUNT") - .arg(PENDING_BATCH_SIZE); - - let StreamAutoclaimReply { ids } = pool.query_async(cmd).await?; - - if !ids.is_empty() { - let mut pipe = redis::pipe(); - - // And reinsert the map of KV pairs into the MAIN qunue with a new stream ID - for StreamId { map, .. 
} in &ids { - let _ = pipe.xadd( - &main_queue_name, - GENERATE_STREAM_ID, - &map.iter() - .filter_map(|(k, v)| { - if let redis::Value::Data(data) = v { - Some((k.as_str(), data.as_slice())) - } else { - None - } - }) - .collect::>(), - ); - } - - let _: () = pool.query_async_pipeline(pipe).await?; - - // Acknowledge all the stale ones so the pending queue is cleared - let ids: Vec<_> = ids.iter().map(|wrapped| &wrapped.id).collect(); - - let mut pipe = redis::pipe(); - pipe.add_command(Cmd::xack(&main_queue_name, WORKERS_GROUP, &ids)); - pipe.add_command(Cmd::xdel(&main_queue_name, &ids)); - - let _: () = pool.query_async_pipeline(pipe).await?; - } else { - // Wait for half a second before attempting to fetch again if nothing was found - sleep(Duration::from_millis(500)).await; - } - - Ok(()) -} - -async fn background_task_delayed( - pool: RedisPool, - main_queue_name: String, - delayed_queue_name: String, - delayed_lock: &str, -) -> Result<()> { - let batch_size: isize = 50; - - let mut pool = pool.get().await?; - - // There is a lock on the delayed queue processing to avoid race conditions. So first try to - // acquire the lock should it not already exist. The lock expires after five seconds in case a - // worker crashes while holding the lock. - let mut cmd = redis::cmd("SET"); - cmd.arg(delayed_lock) - .arg(true) - .arg("NX") - .arg("PX") - .arg(5000); - // WIll be Some("OK") when set or None when not set - let resp: Option = pool.query_async(cmd).await?; - - if resp.as_deref() == Some("OK") { - // First look for delayed keys whose time is up and add them to the main qunue - let timestamp = Utc::now().timestamp(); - let keys: Vec = pool - .zrangebyscore_limit(&delayed_queue_name, 0isize, timestamp, 0isize, batch_size) - .await?; - - if !keys.is_empty() { - let tasks: Vec<&str> = keys - .iter() - // All information is stored in the key in which the ID and JSON formatted task - // are separated by a `|`. So, take the key, then take the part after the `|` - .map(|x| x.split('|').nth(1).expect("Improper key format")) - .collect(); - - // For each task, XADD them to the MAIN queue - let mut pipe = redis::pipe(); - for task in tasks { - let _ = pipe.xadd( - &main_queue_name, - GENERATE_STREAM_ID, - &[(QUEUE_KV_KEY, task)], - ); - } - let _: () = pool.query_async_pipeline(pipe).await?; - - // Then remove the tasks from the delayed queue so they aren't resent - let _: () = pool.zrem(&delayed_queue_name, keys).await?; - - // Make sure to release the lock after done processing - let _: () = pool.del(delayed_lock).await?; - } else { - // Make sure to release the lock before sleeping - let _: () = pool.del(delayed_lock).await?; - // Wait for half a second before attempting to fetch again if nothing was found - sleep(Duration::from_millis(500)).await; - } - } else { - // Also sleep half a second if the lock could not be fetched - sleep(Duration::from_millis(500)).await; - } - - Ok(()) +/// Generates a [`TaskQueueProducer`] and a [`TaskQueueConsumer`] backed by a Redis cluster. +pub async fn new_cluster_pair( + dsn: &str, + max_pool_size: u16, + prefix: Option<&str>, +) -> (TaskQueueProducer, TaskQueueConsumer) { + new_pair_inner::( + dsn, + max_pool_size, + Duration::from_secs(45), + prefix.unwrap_or_default(), + MAIN, + DELAYED, + DELAYED_LOCK, + ) + .await } /// Runs Redis queue migrations with the given delay schedule. 
Migrations are run on this schedule @@ -277,26 +137,30 @@ async fn run_migration_schedule(delays: &[Duration], conn: &mut impl SendConnect } /// An inner function allowing key constants to be variable for testing purposes -async fn new_pair_inner( - pool: RedisPool, +async fn new_pair_inner( + dsn: &str, + max_pool_size: u16, pending_duration: Duration, queue_prefix: &str, main_queue_name: &'static str, delayed_queue_name: &'static str, - delayed_lock: &'static str, + delayed_lock_name: &'static str, ) -> (TaskQueueProducer, TaskQueueConsumer) { + use redis::AsyncCommands as _; + let main_queue_name = format!("{queue_prefix}{main_queue_name}"); let delayed_queue_name = format!("{queue_prefix}{delayed_queue_name}"); - let delayed_lock = format!("{queue_prefix}{delayed_lock}"); + let delayed_lock_name = format!("{queue_prefix}{delayed_lock_name}"); + + let client = redis::Client::open(dsn).expect("Error creating redis client"); + let mut conn = client + .get_async_connection() + .await + .expect("Error establishing redis connection"); // Create the stream and consumer group for the MAIN queue should it not already exist. The // consumer is created automatically upon use so it does not have to be created here. { - let mut conn = pool - .get() - .await - .expect("Error retrieving connection from Redis pool"); - let consumer_group_resp: RedisResult<()> = conn .xgroup_create_mkstream(&main_queue_name, WORKERS_GROUP, 0i8) .await; @@ -325,89 +189,50 @@ async fn new_pair_inner( .try_into() .expect("Pending duration out of bounds"); - let worker_pool = pool.clone(); - let mqn = main_queue_name.clone(); - let dqn = delayed_queue_name.clone(); - // Migrate v1 queues to v2 and v2 queues to v3 on a loop with exponential backoff. - tokio::spawn({ - let pool = pool.clone(); - - async move { - let mut conn = pool - .get() - .await - .expect("Error retrieving connection from Redis pool"); - - let delays = [ - // 11.25 min - Duration::from_secs(60 * 11 + 15), - // 22.5 min - Duration::from_secs(60 * 22 + 30), - // 45 min - Duration::from_secs(60 * 45), - // 1.5 hours - Duration::from_secs(60 * 30 * 3), - // 3 hours - Duration::from_secs(60 * 60 * 3), - // 6 hours - Duration::from_secs(60 * 60 * 6), - // 12 hours - Duration::from_secs(60 * 60 * 12), - // 24 hours - Duration::from_secs(60 * 60 * 24), - ]; - - run_migration_schedule(&delays, &mut conn).await; - } + tokio::spawn(async move { + let delays = [ + // 11.25 min + Duration::from_secs(60 * 11 + 15), + // 22.5 min + Duration::from_secs(60 * 22 + 30), + // 45 min + Duration::from_secs(60 * 45), + // 1.5 hours + Duration::from_secs(60 * 30 * 3), + // 3 hours + Duration::from_secs(60 * 60 * 3), + // 6 hours + Duration::from_secs(60 * 60 * 6), + // 12 hours + Duration::from_secs(60 * 60 * 12), + // 24 hours + Duration::from_secs(60 * 60 * 24), + ]; + + run_migration_schedule(&delays, &mut conn).await; }); - tokio::spawn({ - let pool = pool.clone(); - let mqn = mqn.clone(); - async move { - loop { - if let Err(err) = - background_task_delayed(pool.clone(), mqn.clone(), dqn.clone(), &delayed_lock) - .await - { - tracing::error!("{}", err); - tokio::time::sleep(Duration::from_millis(500)).await; - continue; - }; - } - } - }); - - tokio::spawn({ - let pool = pool.clone(); - async move { - loop { - if let Err(err) = - background_task_pending(pool.clone(), mqn.clone(), pending_duration).await - { - tracing::error!("{}", err); - tokio::time::sleep(Duration::from_millis(500)).await; - continue; - } - } - } - }); + let (producer, consumer) = 
RedisBackend::::builder(RedisConfig { + dsn: dsn.to_owned(), + max_connections: max_pool_size, + reinsert_on_nack: false, // TODO + queue_key: main_queue_name, + delayed_queue_key: delayed_queue_name, + delayed_lock_key: delayed_lock_name, + consumer_group: WORKERS_GROUP.to_owned(), + consumer_name: WORKER_CONSUMER.to_owned(), + payload_key: QUEUE_KV_KEY.to_owned(), + ack_deadline_ms: pending_duration, + }) + .build_pair() + .await + .expect("Error initializing redis queue"); - let inner = Arc::new(RedisQueueInner { - pool: worker_pool, - main_queue_name, - }); + let producer = TaskQueueProducer::Omni(Arc::new(producer.into_dyn_scheduled())); + let consumer = TaskQueueConsumer::Omni(consumer.into_dyn()); - // Once the background thread has been started, simply return the [`TaskQueueProducer`] and - // [`TaskQueueConsumer`] - ( - TaskQueueProducer::Redis(RedisQueueProducer { - inner: inner.clone(), - delayed_queue_name: Arc::new(delayed_queue_name), - }), - TaskQueueConsumer::Redis(RedisQueueConsumer(inner)), - ) + (producer, consumer) } /// Enum for the LEFT | RIGHT args used by some commands @@ -429,166 +254,10 @@ impl ToRedisArgs for Direction { } } -#[derive(Debug)] -pub(super) struct RedisQueueInner { - pool: RedisPool, - main_queue_name: String, -} - -#[derive(Clone)] -pub struct RedisQueueProducer { - inner: Arc, - delayed_queue_name: Arc, -} - -fn to_redis_key(delivery: &TaskQueueDelivery) -> String { - format!( - "{}|{}", - delivery.id, - serde_json::to_string(&delivery.task).unwrap() - ) -} - -fn from_redis_key(key: &str) -> (String, Arc) { +fn task_from_redis_key(key: &str) -> Arc { // Get the first delimiter -> it has to have the | let pos = key.find('|').unwrap(); - let id = key[..pos].to_string(); - let task = serde_json::from_str(&key[pos + 1..]).unwrap(); - (id, task) -} - -impl RedisQueueInner { - async fn send_immediately(&self, task: Arc) -> Result<()> { - let mut pool = self.pool.get().await?; - - let _: () = pool - .xadd( - &self.main_queue_name, - GENERATE_STREAM_ID, - &[( - QUEUE_KV_KEY, - serde_json::to_string(&task) - .map_err(|e| Error::generic(format!("serialization error: {}", e)))?, - )], - ) - .await?; - - Ok(()) - } - - /// ACKing the delivery, XACKs the message in the queue so it will no longer be retried - pub(super) async fn ack(&self, delivery_id: &str, task: &QueueTask) -> Result<()> { - let mut pool = self.pool.get().await?; - - let mut pipe = redis::pipe(); - - pipe.add_command(Cmd::xack( - &self.main_queue_name, - WORKERS_GROUP, - &[delivery_id], - )) - .add_command(Cmd::xdel(&self.main_queue_name, &[delivery_id])); - - let (acked, deleted): (u8, u8) = pool.query_async_pipeline(pipe).await?; - if acked != 1 || deleted != 1 { - tracing::warn!( - "Expected to remove 1 from the list, acked {acked}, deleted {deleted},\ - for {delivery_id}|{}", - serde_json::to_string(task) - .map_err(|e| Error::generic(format!("serialization error: {e}")))? - ); - } - - Ok(()) - } - - pub(super) async fn nack(&self, delivery_id: &str, task: &Arc) -> Result<()> { - tracing::debug!("nack {delivery_id}"); - self.send_immediately(task.clone()).await?; - self.ack(delivery_id, task).await - } -} - -#[async_trait] -impl TaskQueueSend for RedisQueueProducer { - async fn send(&self, task: Arc, delay: Option) -> Result<()> { - let timestamp = if let Some(delay) = delay { - Utc::now() - + chrono::Duration::from_std(delay) - .map_err(|_| Error::generic("Duration out of bounds"))? 
- } else { - tracing::trace!("RedisQueue: event sent (no delay)"); - return self.inner.send_immediately(task).await; - }; - - // If there's a delay, add it to the DELAYED queue by ZADDING the Redis-key-ified - // delivery - let mut pool = self.inner.pool.get().await?; - let delivery = TaskQueueDelivery::from_arc( - task.clone(), - Some(timestamp), - Acker::Redis(self.inner.clone()), - ); - let key = to_redis_key(&delivery); - let delayed_queue_name: &str = &self.delayed_queue_name; - let _: () = pool - .zadd(delayed_queue_name, key, timestamp.timestamp()) - .await?; - - tracing::trace!("RedisQueue: event sent > (delay: {:?})", delay); - Ok(()) - } -} - -#[derive(Clone)] -pub struct RedisQueueConsumer(Arc); - -#[async_trait] -impl TaskQueueReceive for RedisQueueConsumer { - async fn receive_all(&mut self) -> Result> { - let consumer = self.clone(); - tokio::spawn(async move { - // TODO: Receive messages in batches so it's not always a Vec with one member - let mut pool = consumer.0.pool.get().await?; - - // There is no way to make it await a message for unbounded times, so simply block for a short - // amount of time (to avoid locking) and loop if no messages were retrieved - let resp: StreamReadReply = pool - .xread_options( - &[&consumer.0.main_queue_name], - &[LISTEN_STREAM_ID], - &StreamReadOptions::default() - .group(WORKERS_GROUP, WORKER_CONSUMER) - .count(1) - .block(10_000), - ) - .await?; - - if !resp.keys.is_empty() && !resp.keys[0].ids.is_empty() { - let element = &resp.keys[0].ids[0]; - let id = element.id.clone(); - let map = &element.map; - - let task: QueueTask = if let Some(redis::Value::Data(data)) = map.get("data") { - serde_json::from_slice(data).expect("Invalid QueueTask") - } else { - panic!("No QueueTask associated with key"); - }; - - tracing::trace!("RedisQueue: event recv <"); - - Ok(vec![TaskQueueDelivery { - id, - task: Arc::new(task), - acker: Acker::Redis(consumer.0.clone()), - }]) - } else { - Ok(Vec::new()) - } - }) - .await - .map_err(|e| Error::generic(format!("task join error {}", e)))? 
- } + serde_json::from_str(&key[pos + 1..]).unwrap() } async fn migrate_v2_to_v3_queues(conn: &mut impl SendConnectionLike) -> Result<()> { @@ -603,6 +272,8 @@ async fn migrate_list_to_stream( legacy_queue: &str, queue: &str, ) -> Result<()> { + use redis::AsyncCommands as _; + let batch_size = 1000; loop { let legacy_keys: Vec = conn @@ -619,7 +290,7 @@ async fn migrate_list_to_stream( let mut pipe = redis::pipe(); for key in legacy_keys { - let (_, task) = from_redis_key(&key); + let task = task_from_redis_key(&key); let _ = pipe.xadd( queue, GENERATE_STREAM_ID, @@ -644,6 +315,8 @@ async fn migrate_list( legacy_queue: &str, queue: &str, ) -> Result<()> { + use redis::AsyncCommands as _; + let batch_size = 1000; loop { // Checking for old messages from queue @@ -667,6 +340,8 @@ async fn migrate_sset( legacy_queue: &str, queue: &str, ) -> Result<()> { + use redis::AsyncCommands as _; + let batch_size = 1000; loop { // Checking for old messages from LEGACY_DELAYED @@ -689,27 +364,23 @@ async fn migrate_sset( #[cfg(test)] pub mod tests { - use std::{sync::Arc, time::Duration}; + use std::time::Duration; use assert_matches::assert_matches; + use bb8_redis::RedisMultiplexedConnectionManager; use chrono::Utc; use redis::{streams::StreamReadReply, AsyncCommands as _}; - use super::{ - migrate_list, migrate_list_to_stream, migrate_sset, new_pair_inner, to_redis_key, Direction, - }; + use super::{migrate_list, migrate_list_to_stream, migrate_sset, new_pair_inner, Direction}; use crate::{ cfg::{CacheType, Configuration}, core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId}, - queue::{ - redis::RedisQueueInner, Acker, MessageTask, QueueTask, TaskQueueConsumer, - TaskQueueDelivery, TaskQueueProducer, - }, + queue::{MessageTask, QueueTask, TaskQueueConsumer, TaskQueueProducer}, redis::RedisPool, }; - pub async fn get_pool(cfg: Configuration) -> RedisPool { + async fn get_pool(cfg: Configuration) -> RedisPool { match cfg.cache_type { CacheType::RedisCluster => { crate::redis::new_redis_pool_clustered( @@ -718,7 +389,7 @@ pub mod tests { ) .await } - _ => crate::redis::new_redis_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), &cfg).await, + _ => crate::redis::new_redis_pool(cfg.redis_dsn.as_deref().unwrap(), &cfg).await, } } @@ -806,10 +477,11 @@ pub mod tests { #[tokio::test] async fn test_idle_period() { let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(cfg).await; + let pool = get_pool(cfg.clone()).await; - let (p, mut c) = new_pair_inner( - pool.clone(), + let (p, mut c) = new_pair_inner::( + cfg.redis_dsn.as_deref().unwrap(), + cfg.redis_pool_max_size, Duration::from_millis(100), "", "{test}_idle_period", @@ -871,7 +543,7 @@ pub mod tests { #[tokio::test] async fn test_ack() { let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(cfg).await; + let pool = get_pool(cfg.clone()).await; // Delete the keys used in this test to ensure nothing pollutes the output let mut conn = pool @@ -887,8 +559,9 @@ pub mod tests { .await .unwrap(); - let (p, mut c) = new_pair_inner( - pool.clone(), + let (p, mut c) = new_pair_inner::( + cfg.redis_dsn.as_deref().unwrap(), + cfg.redis_pool_max_size, Duration::from_millis(5000), "", "{test}_ack", @@ -930,10 +603,10 @@ pub mod tests { #[tokio::test] async fn test_nack() { let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(cfg).await; - let (p, mut c) = new_pair_inner( - pool, + let (p, mut c) = new_pair_inner::( + cfg.redis_dsn.as_deref().unwrap(), + cfg.redis_pool_max_size, Duration::from_millis(500), "", 
"{test}_nack", @@ -973,10 +646,10 @@ pub mod tests { #[tokio::test] async fn test_delay() { let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(cfg).await; - let (p, mut c) = new_pair_inner( - pool, + let (p, mut c) = new_pair_inner::( + cfg.redis_dsn.as_deref().unwrap(), + cfg.redis_pool_max_size, Duration::from_millis(500), "", "{test}_delay", @@ -1018,10 +691,14 @@ pub mod tests { recv1.ack().await.unwrap(); } + fn to_redis_key(id: &str, task: &QueueTask) -> String { + format!("{id}|{}", serde_json::to_string(task).unwrap()) + } + #[tokio::test] async fn test_migrations() { let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(cfg).await; + let pool = get_pool(cfg.clone()).await; // Test queue name constants let v1_main = "{test}_migrations_main_v1"; @@ -1065,20 +742,16 @@ pub mod tests { let _: () = conn .rpush( v1_main, - to_redis_key(&TaskQueueDelivery { - id: num.to_string(), - task: Arc::new(QueueTask::MessageV1(MessageTask { + to_redis_key( + &num.to_string(), + &QueueTask::MessageV1(MessageTask { msg_id: MessageId(format!("TestMessageID{num}")), app_id: ApplicationId("TestApplicationID".to_owned()), endpoint_id: EndpointId("TestEndpointID".to_owned()), trigger_type: MessageAttemptTriggerType::Manual, attempt_count: 0, - })), - acker: Acker::Redis(Arc::new(RedisQueueInner { - pool: pool.clone(), - main_queue_name: v1_main.to_owned(), - })), - }), + }), + ), ) .await .unwrap(); @@ -1088,20 +761,16 @@ pub mod tests { let _: () = conn .zadd( v1_delayed, - to_redis_key(&TaskQueueDelivery { - id: num.to_string(), - task: Arc::new(QueueTask::MessageV1(MessageTask { + to_redis_key( + &num.to_string(), + &QueueTask::MessageV1(MessageTask { msg_id: MessageId(format!("TestMessageID{num}")), app_id: ApplicationId("TestApplicationID".to_owned()), endpoint_id: EndpointId("TestEndpointID".to_owned()), trigger_type: MessageAttemptTriggerType::Manual, attempt_count: 0, - })), - acker: Acker::Redis(Arc::new(RedisQueueInner { - pool: pool.clone(), - main_queue_name: v1_main.to_owned(), - })), - }), + }), + ), Utc::now().timestamp() + 2, ) .await @@ -1139,8 +808,9 @@ pub mod tests { } // Read - let (_p, mut c) = new_pair_inner( - pool, + let (_p, mut c) = new_pair_inner::( + cfg.redis_dsn.as_deref().unwrap(), + cfg.redis_pool_max_size, Duration::from_secs(5), "", v3_main,