Skip to content

Commit

Permalink
Upgrade omniqueue and related dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Jun 4, 2024
1 parent b8a39cb commit 152d6b0
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 49 deletions.
31 changes: 19 additions & 12 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions bridge/svix-bridge-plugin-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
omniqueue = "0.2.0"
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b" }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
svix-bridge-types = { path = "../svix-bridge-types" }
Expand All @@ -20,8 +20,8 @@ aws-config = "1.1.5"
aws-sdk-sqs = "1.13.0"
fastrand = "2.0.1"
google-cloud-googleapis = "0.12.0"
google-cloud-pubsub = "0.23.0"
google-cloud-pubsub = "0.24.0"
lapin = "2"
redis = { version = "0.24.0", features = ["tokio-comp", "streams"] }
redis = { version = "0.25.4", features = ["tokio-comp", "streams"] }
tracing-subscriber = "0.3"
wiremock = "0.5.18"
58 changes: 27 additions & 31 deletions bridge/svix-bridge-plugin-queue/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,19 @@ pub async fn consumer(cfg: &RedisInputOpts) -> Result<DynConsumer> {
.unwrap_or_else(|| format!("{}_delays", cfg.queue_key));
let delayed_lock_key = format!("{delayed_queue_key}_lock");

backends::RedisBackend::<backends::redis::RedisMultiplexedConnectionManager>::builder(
backends::RedisConfig {
dsn: cfg.dsn.clone(),
max_connections: cfg.max_connections,
reinsert_on_nack: cfg.reinsert_on_nack,
queue_key: cfg.queue_key.clone(),
delayed_queue_key,
delayed_lock_key,
consumer_group: cfg.consumer_group.clone(),
consumer_name: cfg.consumer_name.clone(),
// FIXME: expose in config?
payload_key: "payload".to_string(),
ack_deadline_ms: cfg.ack_deadline_ms,
},
)
backends::RedisBackend::builder(backends::RedisConfig {
dsn: cfg.dsn.clone(),
max_connections: cfg.max_connections,
reinsert_on_nack: cfg.reinsert_on_nack,
queue_key: cfg.queue_key.clone(),
delayed_queue_key,
delayed_lock_key,
consumer_group: cfg.consumer_group.clone(),
consumer_name: cfg.consumer_name.clone(),
// FIXME: expose in config?
payload_key: "payload".to_string(),
ack_deadline_ms: cfg.ack_deadline_ms,
})
.make_dynamic()
.build_consumer()
.await
Expand All @@ -69,22 +67,20 @@ pub async fn producer(cfg: &RedisOutputOpts) -> Result<DynProducer> {
.unwrap_or_else(|| format!("{}_delays", cfg.queue_key));
let delayed_lock_key = format!("{delayed_queue_key}_lock");

backends::RedisBackend::<backends::redis::RedisMultiplexedConnectionManager>::builder(
backends::RedisConfig {
dsn: cfg.dsn.clone(),
max_connections: cfg.max_connections,
queue_key: cfg.queue_key.clone(),
delayed_queue_key,
delayed_lock_key,
// FIXME: expose in config?
payload_key: "payload".to_string(),
// consumer stuff we don't care about.
reinsert_on_nack: false,
consumer_group: String::new(),
consumer_name: String::new(),
ack_deadline_ms: cfg.ack_deadline_ms,
},
)
backends::RedisBackend::builder(backends::RedisConfig {
dsn: cfg.dsn.clone(),
max_connections: cfg.max_connections,
queue_key: cfg.queue_key.clone(),
delayed_queue_key,
delayed_lock_key,
// FIXME: expose in config?
payload_key: "payload".to_string(),
// consumer stuff we don't care about.
reinsert_on_nack: false,
consumer_group: String::new(),
consumer_name: String::new(),
ack_deadline_ms: cfg.ack_deadline_ms,
})
.make_dynamic()
.build_producer()
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn create_test_stream(client: &Client) -> String {
.take(8)
.collect();

let mut conn = client.get_async_connection().await.unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();

let _: () = conn
.xgroup_create_mkstream(&name, "test_cg", 0i8)
Expand All @@ -70,12 +70,12 @@ async fn create_test_stream(client: &Client) -> String {
}

async fn delete_test_stream(client: &Client, key: &str) {
let mut conn = client.get_async_connection().await.unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
let _: () = conn.del(key).await.unwrap();
}

async fn publish(client: &Client, key: &str, payload: &str) {
let mut conn = client.get_async_connection().await.unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
// N.b. the redis code relies on the messages being json with a `payload` key in there.
// The `payload` key can be any valid JSON value.
let _: () = conn.xadd(key, "*", &[("payload", payload)]).await.unwrap();
Expand Down

0 comments on commit 152d6b0

Please sign in to comment.