Skip to content

Commit

Permalink
WIP: redis omniqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Feb 28, 2024
1 parent d2331f7 commit ef46b79
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 491 deletions.
18 changes: 17 additions & 1 deletion server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ urlencoding = "2.1.2"
form_urlencoded = "1.1.0"
lapin = "2.1.1"
sentry = { version = "0.32.2", features = ["tracing"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "044fc688670759859f193bb646fac043bbfd08ba", default-features = false, features = ["in_memory", "rabbitmq"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "044fc688670759859f193bb646fac043bbfd08ba", default-features = false, features = ["in_memory", "rabbitmq", "redis_cluster"] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5", optional = true }
Expand Down
34 changes: 5 additions & 29 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::{sync::Arc, time::Duration};

use axum::async_trait;
use chrono::{DateTime, Utc};
use omniqueue::{
backends::InMemoryBackend, Delivery, DynConsumer, QueueConsumer, ScheduledQueueProducer,
backends::InMemoryBackend, Delivery, DynConsumer, DynScheduledQueueProducer, QueueConsumer,
ScheduledQueueProducer,
};
use serde::{Deserialize, Serialize};
use svix_ksuid::*;

use crate::error::Traceable;
use crate::{
Expand All @@ -18,8 +17,6 @@ use crate::{
error::{Error, ErrorType, Result},
};

use self::redis::{RedisQueueConsumer, RedisQueueInner, RedisQueueProducer};

pub mod rabbitmq;
pub mod redis;

Expand All @@ -38,13 +35,9 @@ pub async fn new_pair(
prefix: Option<&str>,
) -> (TaskQueueProducer, TaskQueueConsumer) {
match cfg.queue_backend() {
QueueBackend::Redis(dsn) => {
let pool = crate::redis::new_redis_pool(dsn, cfg).await;
redis::new_pair(pool, prefix).await
}
QueueBackend::Redis(dsn) => redis::new_pair(dsn, cfg.redis_pool_max_size, prefix).await,
QueueBackend::RedisCluster(dsn) => {
let pool = crate::redis::new_redis_pool_clustered(dsn, cfg).await;
redis::new_pair(pool, prefix).await
redis::new_cluster_pair(dsn, cfg.redis_pool_max_size, prefix).await
}
QueueBackend::Memory => {
let (producer, consumer) = InMemoryBackend::builder()
Expand Down Expand Up @@ -143,8 +136,7 @@ impl QueueTask {

#[derive(Clone)]
pub enum TaskQueueProducer {
Redis(RedisQueueProducer),
Omni(Arc<omniqueue::DynScheduledQueueProducer>),
Omni(Arc<DynScheduledQueueProducer>),
}

impl TaskQueueProducer {
Expand All @@ -153,7 +145,6 @@ impl TaskQueueProducer {
run_with_retries(
|| async {
match self {
TaskQueueProducer::Redis(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::Omni(q) => if let Some(delay) = delay {
q.send_serde_json_scheduled(task.as_ref(), delay).await
} else {
Expand All @@ -170,14 +161,12 @@ impl TaskQueueProducer {
}

pub enum TaskQueueConsumer {
Redis(RedisQueueConsumer),
Omni(DynConsumer),
}

impl TaskQueueConsumer {
pub async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
match self {
TaskQueueConsumer::Redis(q) => q.receive_all().await.trace(),
TaskQueueConsumer::Omni(q) => {
const MAX_MESSAGES: usize = 128;
// FIXME(onelson): need to figure out what deadline/duration to use here
Expand All @@ -196,7 +185,6 @@ impl TaskQueueConsumer {
/// Used by TaskQueueDeliveries to Ack/Nack itself
#[derive(Debug)]
enum Acker {
Redis(Arc<RedisQueueInner>),
Omni(Delivery),
}

Expand All @@ -208,16 +196,6 @@ pub struct TaskQueueDelivery {
}

impl TaskQueueDelivery {
/// The `timestamp` is when this message will be delivered at
fn from_arc(task: Arc<QueueTask>, timestamp: Option<DateTime<Utc>>, acker: Acker) -> Self {
let ksuid = KsuidMs::new(timestamp, None);
Self {
id: ksuid.to_string(),
task,
acker,
}
}

pub async fn ack(self) -> Result<()> {
tracing::trace!("ack {}", self.id);

Expand All @@ -230,7 +208,6 @@ impl TaskQueueDelivery {
.as_ref()
.expect("acker is always Some when trying to ack");
match acker_ref {
Acker::Redis(q) => q.ack(&self.id, &self.task).await.trace(),
Acker::Omni(_) => match acker.take() {
Some(Acker::Omni(delivery)) => {
delivery.ack().await.map_err(|(e, delivery)| {
Expand Down Expand Up @@ -263,7 +240,6 @@ impl TaskQueueDelivery {
.as_ref()
.expect("acker is always Some when trying to ack");
match acker_ref {
Acker::Redis(q) => q.nack(&self.id, &self.task).await.trace(),
Acker::Omni(_) => match acker.take() {
Some(Acker::Omni(delivery)) => {
delivery
Expand Down
Loading

0 comments on commit ef46b79

Please sign in to comment.