Skip to content

Commit

Permalink
server: Upgrade omniqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Feb 29, 2024
1 parent 1d1698a commit 09bdb29
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 23 deletions.
10 changes: 5 additions & 5 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ urlencoding = "2.1.2"
form_urlencoded = "1.1.0"
lapin = "2.1.1"
sentry = { version = "0.32.2", features = ["tracing"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "648f43f0778d02115df2d91b1a961519a62f42a7", default-features = false, features = ["memory_queue", "rabbitmq"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "044fc688670759859f193bb646fac043bbfd08ba", default-features = false, features = ["in_memory", "rabbitmq"] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5", optional = true }
Expand Down
16 changes: 5 additions & 11 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@ use std::{sync::Arc, time::Duration};
use axum::async_trait;
use chrono::{DateTime, Utc};
use omniqueue::{
backends::memory_queue::MemoryQueueBackend,
queue::{
consumer::{DynConsumer, QueueConsumer},
producer::QueueProducer,
Delivery, QueueBackend as _,
},
scheduled::ScheduledProducer,
backends::InMemoryBackend, Delivery, DynConsumer, QueueConsumer, ScheduledQueueProducer,
};
use serde::{Deserialize, Serialize};
use svix_ksuid::*;
Expand Down Expand Up @@ -53,14 +47,14 @@ pub async fn new_pair(
redis::new_pair(pool, prefix).await
}
QueueBackend::Memory => {
let (producer, consumer) = MemoryQueueBackend::builder(())
let (producer, consumer) = InMemoryBackend::builder()
.build_pair()
.await
.expect("building in-memory queue can't fail");

(
TaskQueueProducer::Omni(Arc::new(producer.into_dyn_scheduled(Default::default()))),
TaskQueueConsumer::Omni(consumer.into_dyn(Default::default())),
TaskQueueProducer::Omni(Arc::new(producer.into_dyn_scheduled())),
TaskQueueConsumer::Omni(consumer.into_dyn()),
)
}
QueueBackend::RabbitMq(dsn) => {
Expand Down Expand Up @@ -150,7 +144,7 @@ impl QueueTask {
#[derive(Clone)]
pub enum TaskQueueProducer {
Redis(RedisQueueProducer),
Omni(Arc<omniqueue::scheduled::DynScheduledProducer>),
Omni(Arc<omniqueue::DynScheduledQueueProducer>),
}

impl TaskQueueProducer {
Expand Down
10 changes: 4 additions & 6 deletions server/svix-server/src/queue/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use lapin::{
ConnectionProperties,
};
use omniqueue::{
backends::rabbitmq::{RabbitMqBackend, RabbitMqConfig},
queue::{consumer::QueueConsumer, QueueBackend},
scheduled::ScheduledProducer,
backends::{RabbitMqBackend, RabbitMqConfig},
QueueConsumer, ScheduledQueueProducer,
};
use svix_ksuid::{KsuidLike, KsuidMs};

Expand Down Expand Up @@ -86,9 +85,8 @@ pub async fn new_pair(
.await
.expect("Error initializing rabbitmq queue");

let producer =
TaskQueueProducer::Omni(Arc::new(producer.into_dyn_scheduled(Default::default())));
let consumer = TaskQueueConsumer::Omni(consumer.into_dyn(Default::default()));
let producer = TaskQueueProducer::Omni(Arc::new(producer.into_dyn_scheduled()));
let consumer = TaskQueueConsumer::Omni(consumer.into_dyn());

Ok((producer, consumer))
}
Expand Down

0 comments on commit 09bdb29

Please sign in to comment.