diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index 6fadb37a5..9901a3a51 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -270,12 +270,10 @@ impl ToRedisArgs for Direction { } } -fn from_redis_key(key: &str) -> (String, Arc) { +fn task_from_redis_key(key: &str) -> Arc { // Get the first delimiter -> it has to have the | let pos = key.find('|').unwrap(); - let id = key[..pos].to_string(); - let task = serde_json::from_str(&key[pos + 1..]).unwrap(); - (id, task) + serde_json::from_str(&key[pos + 1..]).unwrap() } async fn migrate_v2_to_v3_queues(conn: &mut (impl ConnectionLike + Send + Sized)) -> Result<()> { @@ -308,7 +306,7 @@ async fn migrate_list_to_stream( let mut pipe = redis::pipe(); for key in legacy_keys { - let (_, task) = from_redis_key(&key); + let task = task_from_redis_key(&key); let _ = pipe.xadd( queue, GENERATE_STREAM_ID, @@ -382,25 +380,18 @@ async fn migrate_sset( #[cfg(test)] pub mod tests { - - use std::{/* sync::Arc, */ time::Duration}; + use std::time::Duration; use bb8_redis::RedisMultiplexedConnectionManager; - //use chrono::Utc; + use chrono::Utc; use redis::{streams::StreamReadReply, AsyncCommands as _}; - use super::{ - migrate_list, /* migrate_list_to_stream, */ migrate_sset, - new_pair_inner, /* to_redis_key, Direction, */ - }; + use super::{migrate_list, migrate_list_to_stream, migrate_sset, new_pair_inner, Direction}; use crate::{ cfg::{CacheType, Configuration}, core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId}, - queue::{ - /* Acker, */ MessageTask, QueueTask, TaskQueueConsumer, - /* TaskQueueDelivery, */ TaskQueueProducer, - }, + queue::{MessageTask, QueueTask, TaskQueueConsumer, TaskQueueProducer}, redis::RedisPool, }; @@ -579,7 +570,7 @@ pub mod tests { .del(&[ "{test}_ack", "{test}_ack_delayed", - "{test}_ack_delayed_lock", + "{test}_ack_delayed__lock", ]) .await .unwrap(); @@ -713,10 +704,14 @@ pub mod tests { recv1.ack().await.unwrap(); } - /* #[tokio::test] + fn to_redis_key(id: &str, task: &QueueTask) -> String { + format!("{id}|{}", serde_json::to_string(task).unwrap()) + } + + #[tokio::test] async fn test_migrations() { let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(cfg).await; + let pool = get_pool(cfg.clone()).await; // Test queue name constants let v1_main = "{test}_migrations_main_v1"; @@ -729,8 +724,7 @@ pub mod tests { let v1_delayed = "{test}_migrations_delayed_v1"; let v2_delayed = "{test}_migrations_delayed_v2"; - let v2_delayed_lock = "{test}_migrations_delayed_lock_v2"; - // v3_delayed doesn not yet exist + // v3_delayed doesn't yet exist { let mut conn = pool.get().await.unwrap(); @@ -760,20 +754,16 @@ pub mod tests { let _: () = conn .rpush( v1_main, - to_redis_key(&TaskQueueDelivery { - id: num.to_string(), - task: Arc::new(QueueTask::MessageV1(MessageTask { + to_redis_key( + &num.to_string(), + &QueueTask::MessageV1(MessageTask { msg_id: MessageId(format!("TestMessageID{num}")), app_id: ApplicationId("TestApplicationID".to_owned()), endpoint_id: EndpointId("TestEndpointID".to_owned()), trigger_type: MessageAttemptTriggerType::Manual, attempt_count: 0, - })), - acker: Acker::Redis(Arc::new(RedisQueueInner { - pool: pool.clone(), - main_queue_name: v1_main.to_owned(), - })), - }), + }), + ), ) .await .unwrap(); @@ -783,20 +773,16 @@ pub mod tests { let _: () = conn .zadd( v1_delayed, - to_redis_key(&TaskQueueDelivery { - id: num.to_string(), - task: Arc::new(QueueTask::MessageV1(MessageTask { + to_redis_key( + &num.to_string(), + &QueueTask::MessageV1(MessageTask { msg_id: MessageId(format!("TestMessageID{num}")), app_id: ApplicationId("TestApplicationID".to_owned()), endpoint_id: EndpointId("TestEndpointID".to_owned()), trigger_type: MessageAttemptTriggerType::Manual, attempt_count: 0, - })), - acker: Acker::Redis(Arc::new(RedisQueueInner { - pool: pool.clone(), - main_queue_name: v1_main.to_owned(), - })), - }), + }), + ), Utc::now().timestamp() + 2, ) .await @@ -834,13 +820,13 @@ pub mod tests { } // Read - let (_p, mut c) = new_pair_inner( - pool, + let (_p, mut c) = new_pair_inner::( + cfg.redis_dsn.as_deref().unwrap(), + cfg.redis_pool_max_size, Duration::from_secs(5), "", v3_main, v2_delayed, - v2_delayed_lock, ) .await; @@ -889,5 +875,5 @@ pub mod tests { ); recv.ack().await.unwrap(); } - } */ + } }