Skip to content

Commit

Permalink
fixup! WIP: Replace own redis queue implementation by omniqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Feb 26, 2024
1 parent 1424f02 commit 2ab5553
Showing 1 changed file with 29 additions and 43 deletions.
72 changes: 29 additions & 43 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,10 @@ impl ToRedisArgs for Direction {
}
}

fn from_redis_key(key: &str) -> (String, Arc<QueueTask>) {
fn task_from_redis_key(key: &str) -> Arc<QueueTask> {
// Get the first delimiter -> it has to have the |
let pos = key.find('|').unwrap();
let id = key[..pos].to_string();
let task = serde_json::from_str(&key[pos + 1..]).unwrap();
(id, task)
serde_json::from_str(&key[pos + 1..]).unwrap()
}

async fn migrate_v2_to_v3_queues(conn: &mut (impl ConnectionLike + Send + Sized)) -> Result<()> {
Expand Down Expand Up @@ -308,7 +306,7 @@ async fn migrate_list_to_stream(

let mut pipe = redis::pipe();
for key in legacy_keys {
let (_, task) = from_redis_key(&key);
let task = task_from_redis_key(&key);
let _ = pipe.xadd(
queue,
GENERATE_STREAM_ID,
Expand Down Expand Up @@ -382,25 +380,18 @@ async fn migrate_sset(

#[cfg(test)]
pub mod tests {

use std::{/* sync::Arc, */ time::Duration};
use std::time::Duration;

use bb8_redis::RedisMultiplexedConnectionManager;
//use chrono::Utc;
use chrono::Utc;
use redis::{streams::StreamReadReply, AsyncCommands as _};

use super::{
migrate_list, /* migrate_list_to_stream, */ migrate_sset,
new_pair_inner, /* to_redis_key, Direction, */
};
use super::{migrate_list, migrate_list_to_stream, migrate_sset, new_pair_inner, Direction};

use crate::{
cfg::{CacheType, Configuration},
core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
queue::{
/* Acker, */ MessageTask, QueueTask, TaskQueueConsumer,
/* TaskQueueDelivery, */ TaskQueueProducer,
},
queue::{MessageTask, QueueTask, TaskQueueConsumer, TaskQueueProducer},
redis::RedisPool,
};

Expand Down Expand Up @@ -579,7 +570,7 @@ pub mod tests {
.del(&[
"{test}_ack",
"{test}_ack_delayed",
"{test}_ack_delayed_lock",
"{test}_ack_delayed__lock",
])
.await
.unwrap();
Expand Down Expand Up @@ -713,10 +704,14 @@ pub mod tests {
recv1.ack().await.unwrap();
}

/* #[tokio::test]
fn to_redis_key(id: &str, task: &QueueTask) -> String {
format!("{id}|{}", serde_json::to_string(task).unwrap())
}

#[tokio::test]
async fn test_migrations() {
let cfg = crate::cfg::load().unwrap();
let pool = get_pool(cfg).await;
let pool = get_pool(cfg.clone()).await;

// Test queue name constants
let v1_main = "{test}_migrations_main_v1";
Expand All @@ -729,8 +724,7 @@ pub mod tests {

let v1_delayed = "{test}_migrations_delayed_v1";
let v2_delayed = "{test}_migrations_delayed_v2";
let v2_delayed_lock = "{test}_migrations_delayed_lock_v2";
// v3_delayed doesn not yet exist
// v3_delayed doesn't yet exist

{
let mut conn = pool.get().await.unwrap();
Expand Down Expand Up @@ -760,20 +754,16 @@ pub mod tests {
let _: () = conn
.rpush(
v1_main,
to_redis_key(&TaskQueueDelivery {
id: num.to_string(),
task: Arc::new(QueueTask::MessageV1(MessageTask {
to_redis_key(
&num.to_string(),
&QueueTask::MessageV1(MessageTask {
msg_id: MessageId(format!("TestMessageID{num}")),
app_id: ApplicationId("TestApplicationID".to_owned()),
endpoint_id: EndpointId("TestEndpointID".to_owned()),
trigger_type: MessageAttemptTriggerType::Manual,
attempt_count: 0,
})),
acker: Acker::Redis(Arc::new(RedisQueueInner {
pool: pool.clone(),
main_queue_name: v1_main.to_owned(),
})),
}),
}),
),
)
.await
.unwrap();
Expand All @@ -783,20 +773,16 @@ pub mod tests {
let _: () = conn
.zadd(
v1_delayed,
to_redis_key(&TaskQueueDelivery {
id: num.to_string(),
task: Arc::new(QueueTask::MessageV1(MessageTask {
to_redis_key(
&num.to_string(),
&QueueTask::MessageV1(MessageTask {
msg_id: MessageId(format!("TestMessageID{num}")),
app_id: ApplicationId("TestApplicationID".to_owned()),
endpoint_id: EndpointId("TestEndpointID".to_owned()),
trigger_type: MessageAttemptTriggerType::Manual,
attempt_count: 0,
})),
acker: Acker::Redis(Arc::new(RedisQueueInner {
pool: pool.clone(),
main_queue_name: v1_main.to_owned(),
})),
}),
}),
),
Utc::now().timestamp() + 2,
)
.await
Expand Down Expand Up @@ -834,13 +820,13 @@ pub mod tests {
}

// Read
let (_p, mut c) = new_pair_inner(
pool,
let (_p, mut c) = new_pair_inner::<RedisMultiplexedConnectionManager>(
cfg.redis_dsn.as_deref().unwrap(),
cfg.redis_pool_max_size,
Duration::from_secs(5),
"",
v3_main,
v2_delayed,
v2_delayed_lock,
)
.await;

Expand Down Expand Up @@ -889,5 +875,5 @@ pub mod tests {
);
recv.ack().await.unwrap();
}
} */
}
}

0 comments on commit 2ab5553

Please sign in to comment.