Skip to content

Commit

Permalink
bridge: More config cleanup (#1330)
Browse files Browse the repository at this point in the history
Follow-up to #1327 / further cleanup before adding kafka because the
things were still plenty confusing.
  • Loading branch information
svix-jplatte authored Jun 6, 2024
2 parents 333f591 + 61c22cd commit ef04e51
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 164 deletions.
104 changes: 48 additions & 56 deletions bridge/svix-bridge-plugin-queue/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,38 @@ pub use crate::{
sqs::{SqsInputOpts, SqsOutputOpts},
};

#[derive(Deserialize)]
pub struct QueueSenderConfig {
pub name: String,
pub input: SenderInputOpts,
#[serde(default)]
pub transformation: Option<TransformationConfig>,
pub output: SenderOutputOpts,
}

impl QueueSenderConfig {
pub fn into_sender_input(self) -> Result<Box<dyn SenderInput>, &'static str> {
// FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think?
if matches!(self.input, SenderInputOpts::Redis(_))
&& self
.transformation
.as_ref()
.map(|t| t.format() != TransformerInputFormat::Json)
.unwrap_or_default()
{
return Err("redis only supports json formatted transformations");
}

Ok(Box::new(QueueSender::new(
self.name,
self.input,
self.transformation,
self.output,
)))
pub fn into_sender_input(
name: String,
input_opts: QueueInputOpts,
transformation: Option<TransformationConfig>,
output: SenderOutputOpts,
) -> Result<Box<dyn SenderInput>, &'static str> {
// FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think?
if matches!(input_opts, QueueInputOpts::Redis(_))
&& transformation
.as_ref()
.map(|t| t.format() != TransformerInputFormat::Json)
.unwrap_or_default()
{
return Err("redis only supports json formatted transformations");
}

Ok(Box::new(QueueSender::new(
name,
input_opts,
transformation,
output,
)))
}

pub async fn into_receiver_output(
name: String,
opts: ReceiverOutputOpts,
opts: QueueOutputOpts,
// Annoying to have to pass this, but certain backends (redis) only work with certain transformations (json).
transformation: Option<&TransformationConfig>,
) -> Result<Box<dyn ReceiverOutput>, crate::Error> {
// FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think?
if matches!(opts, ReceiverOutputOpts::Redis(_))
if matches!(opts, QueueOutputOpts::Redis(_))
&& transformation
.as_ref()
.map(|t| t.format() != TransformerInputFormat::Json)
Expand All @@ -68,7 +61,7 @@ pub async fn into_receiver_output(
// TODO: feature flag the variants, thread the features down through to generic-queue
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum SenderInputOpts {
pub enum QueueInputOpts {
#[serde(rename = "gcp-pubsub")]
GCPPubSub(GCPPubSubInputOpts),
RabbitMQ(RabbitMqInputOpts),
Expand All @@ -78,7 +71,7 @@ pub enum SenderInputOpts {

#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum ReceiverOutputOpts {
pub enum QueueOutputOpts {
#[serde(rename = "gcp-pubsub")]
GCPPubSub(GCPPubSubOutputOpts),
RabbitMQ(RabbitMqOutputOpts),
Expand All @@ -92,51 +85,50 @@ mod tests {
SenderOutputOpts, SvixSenderOutputOpts, TransformationConfig, TransformerInputFormat,
};

use super::{into_receiver_output, QueueSenderConfig};
use super::{into_receiver_output, into_sender_input};
use crate::{
config::{ReceiverOutputOpts, SenderInputOpts},
config::{QueueInputOpts, QueueOutputOpts},
redis::{RedisInputOpts, RedisOutputOpts},
};

// FIXME: can't support raw payload access for redis because it requires JSON internally.
// Revisit after `omniqueue` adoption.
#[test]
fn redis_sender_with_string_transformation_is_err() {
let cfg = QueueSenderConfig {
name: "redis-with-string-transformation".to_string(),
input: SenderInputOpts::Redis(RedisInputOpts {
dsn: "".to_string(),
max_connections: 0,
reinsert_on_nack: false,
queue_key: "".to_string(),
delayed_queue_key: None,
consumer_group: "".to_string(),
consumer_name: "".to_string(),
ack_deadline_ms: 2_000,
}),
transformation: Some(TransformationConfig::Explicit {
let input_opts = QueueInputOpts::Redis(RedisInputOpts {
dsn: "".to_string(),
max_connections: 0,
reinsert_on_nack: false,
queue_key: "".to_string(),
delayed_queue_key: None,
consumer_group: "".to_string(),
consumer_name: "".to_string(),
ack_deadline_ms: 2_000,
});

let err = into_sender_input(
"redis-with-string-transformation".to_owned(),
input_opts,
Some(TransformationConfig::Explicit {
format: TransformerInputFormat::String,
src: String::new(),
}),
output: SenderOutputOpts::Svix(SvixSenderOutputOpts {
SenderOutputOpts::Svix(SvixSenderOutputOpts {
token: "".to_string(),
options: None,
}),
};

assert_eq!(
cfg.into_sender_input()
.err()
.expect("invalid config didn't result in error"),
"redis only supports json formatted transformations"
)
.err()
.expect("invalid config didn't result in error");

assert_eq!(err, "redis only supports json formatted transformations")
}

// FIXME: can't support raw payload access for redis because it requires JSON internally.
// Revisit after `omniqueue` adoption.
#[tokio::test]
async fn test_redis_receiver_string_transform_is_err() {
let redis_out = ReceiverOutputOpts::Redis(RedisOutputOpts {
let redis_out = QueueOutputOpts::Redis(RedisOutputOpts {
dsn: "".to_string(),
max_connections: 0,
queue_key: "".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion bridge/svix-bridge-plugin-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ mod redis;
pub mod sender_input;
mod sqs;

use error::Error;
pub use self::config::{into_receiver_output, into_sender_input};
use self::error::Error;

/// Newtype for [`omniqueue::queue::Delivery`].
///
Expand Down
12 changes: 6 additions & 6 deletions bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use omniqueue::DynProducer;
use svix_bridge_types::{async_trait, ForwardRequest, ReceiverOutput};

use crate::{config::ReceiverOutputOpts, error::Result};
use crate::{config::QueueOutputOpts, error::Result};

#[derive(Clone)]
pub struct QueueForwarder {
Expand All @@ -16,13 +16,13 @@ pub struct QueueForwarder {
impl QueueForwarder {
pub async fn from_receiver_output_opts(
name: String,
opts: ReceiverOutputOpts,
opts: QueueOutputOpts,
) -> Result<QueueForwarder> {
let sender = match opts {
ReceiverOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?,
ReceiverOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?,
ReceiverOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?,
ReceiverOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?,
QueueOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?,
QueueOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?,
QueueOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?,
QueueOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?,
};
Ok(QueueForwarder {
name,
Expand Down
36 changes: 17 additions & 19 deletions bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ use svix_bridge_types::{
TransformerTx,
};

use crate::{
config::SenderInputOpts, error::Error, gcp_pubsub, rabbitmq, run_inner, sqs, Consumer,
};
use crate::{config::QueueInputOpts, error::Error, gcp_pubsub, rabbitmq, run_inner, sqs, Consumer};

pub struct QueueSender {
name: String,
source: String,
system: String,
input_opts: SenderInputOpts,
input_opts: QueueInputOpts,
transformation: Option<TransformationConfig>,
transformer_tx: Option<TransformerTx>,
svix_client: Svix,
Expand All @@ -24,28 +22,28 @@ impl std::fmt::Debug for QueueSender {
}
}

fn system_name(opts: &SenderInputOpts) -> &'static str {
fn system_name(opts: &QueueInputOpts) -> &'static str {
match opts {
SenderInputOpts::GCPPubSub(_) => "gcp-pubsub",
SenderInputOpts::RabbitMQ(_) => "rabbitmq",
SenderInputOpts::Redis(_) => "redis",
SenderInputOpts::SQS(_) => "sqs",
QueueInputOpts::GCPPubSub(_) => "gcp-pubsub",
QueueInputOpts::RabbitMQ(_) => "rabbitmq",
QueueInputOpts::Redis(_) => "redis",
QueueInputOpts::SQS(_) => "sqs",
}
}

fn source_name(opts: &SenderInputOpts) -> &str {
fn source_name(opts: &QueueInputOpts) -> &str {
match opts {
SenderInputOpts::GCPPubSub(opts) => &opts.subscription_id,
SenderInputOpts::RabbitMQ(opts) => &opts.queue_name,
SenderInputOpts::Redis(opts) => &opts.queue_key,
SenderInputOpts::SQS(opts) => &opts.queue_dsn,
QueueInputOpts::GCPPubSub(opts) => &opts.subscription_id,
QueueInputOpts::RabbitMQ(opts) => &opts.queue_name,
QueueInputOpts::Redis(opts) => &opts.queue_key,
QueueInputOpts::SQS(opts) => &opts.queue_dsn,
}
}

impl QueueSender {
pub fn new(
name: String,
input: SenderInputOpts,
input: QueueInputOpts,
transformation: Option<TransformationConfig>,
output: SenderOutputOpts,
) -> Self {
Expand Down Expand Up @@ -89,10 +87,10 @@ impl Consumer for QueueSender {

async fn consumer(&self) -> std::io::Result<DynConsumer> {
Ok(match &self.input_opts {
SenderInputOpts::GCPPubSub(cfg) => gcp_pubsub::consumer(cfg).await,
SenderInputOpts::RabbitMQ(cfg) => rabbitmq::consumer(cfg).await,
SenderInputOpts::Redis(cfg) => crate::redis::consumer(cfg).await,
SenderInputOpts::SQS(cfg) => sqs::consumer(cfg).await,
QueueInputOpts::GCPPubSub(cfg) => gcp_pubsub::consumer(cfg).await,
QueueInputOpts::RabbitMQ(cfg) => rabbitmq::consumer(cfg).await,
QueueInputOpts::Redis(cfg) => crate::redis::consumer(cfg).await,
QueueInputOpts::SQS(cfg) => sqs::consumer(cfg).await,
}
.map_err(Error::from)?)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use google_cloud_pubsub::{
};
use serde_json::json;
use svix_bridge_plugin_queue::{
config::{GCPPubSubInputOpts, SenderInputOpts},
config::{GCPPubSubInputOpts, QueueInputOpts},
sender_input::QueueSender,
};
use svix_bridge_types::{
Expand All @@ -35,7 +35,7 @@ fn get_test_plugin(
) -> QueueSender {
QueueSender::new(
"test".into(),
SenderInputOpts::GCPPubSub(GCPPubSubInputOpts {
QueueInputOpts::GCPPubSub(GCPPubSubInputOpts {
subscription_id,
credentials_file: None,
}),
Expand Down
4 changes: 2 additions & 2 deletions bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use lapin::{
};
use serde_json::json;
use svix_bridge_plugin_queue::{
config::{RabbitMqInputOpts, SenderInputOpts},
config::{QueueInputOpts, RabbitMqInputOpts},
sender_input::QueueSender,
};
use svix_bridge_types::{
Expand All @@ -31,7 +31,7 @@ fn get_test_plugin(
) -> QueueSender {
QueueSender::new(
"test".into(),
SenderInputOpts::RabbitMQ(RabbitMqInputOpts {
QueueInputOpts::RabbitMQ(RabbitMqInputOpts {
uri: mq_uri.to_string(),
queue_name: queue_name.to_string(),
consumer_tag: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
use redis::{AsyncCommands, Client};
use serde_json::json;
use svix_bridge_plugin_queue::{
config::{RedisInputOpts, SenderInputOpts},
config::{QueueInputOpts, RedisInputOpts},
sender_input::QueueSender,
};
use svix_bridge_types::{
Expand All @@ -26,7 +26,7 @@ fn get_test_plugin(
) -> QueueSender {
QueueSender::new(
"test".into(),
SenderInputOpts::Redis(RedisInputOpts {
QueueInputOpts::Redis(RedisInputOpts {
dsn: "redis://localhost/".to_owned(),
max_connections: 8,
reinsert_on_nack: false,
Expand Down
4 changes: 2 additions & 2 deletions bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::time::Duration;
use aws_sdk_sqs::Client;
use serde_json::json;
use svix_bridge_plugin_queue::{
config::{SenderInputOpts, SqsInputOpts},
config::{QueueInputOpts, SqsInputOpts},
sender_input::QueueSender,
};
use svix_bridge_types::{
Expand All @@ -35,7 +35,7 @@ fn get_test_plugin(
) -> QueueSender {
QueueSender::new(
"test".into(),
SenderInputOpts::SQS(SqsInputOpts {
QueueInputOpts::SQS(SqsInputOpts {
queue_dsn,
override_endpoint: true,
}),
Expand Down
10 changes: 2 additions & 8 deletions bridge/svix-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"
svix-ksuid = "0.7.0"
svix-bridge-plugin-queue = { optional=true, path = "../svix-bridge-plugin-queue" }
svix-bridge-plugin-queue = { path = "../svix-bridge-plugin-queue" }
svix-bridge-types = { path = "../svix-bridge-types" }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
Expand All @@ -41,11 +41,5 @@ chrono = "0.4"
tower = "0.4"

[features]
default = ["gcp-pubsub", "rabbitmq", "redis", "sqs", "jemalloc"]

gcp-pubsub = ["generic-queue"]
generic-queue = ["dep:svix-bridge-plugin-queue"]
rabbitmq = ["generic-queue"]
redis = ["generic-queue"]
sqs = ["generic-queue"]
default = ["jemalloc"]
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
Loading

0 comments on commit ef04e51

Please sign in to comment.