From b18a5c39574528d2b6c5bafca1cd63f1facda1fd Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 6 Jun 2024 12:01:21 +0200 Subject: [PATCH 1/3] bridge: Rename sender and receiver config and add some docs --- bridge/svix-bridge/src/config/mod.rs | 26 +++++++++++-------- bridge/svix-bridge/src/config/tests.rs | 10 +++---- .../src/webhook_receiver/config.rs | 4 +-- .../svix-bridge/src/webhook_receiver/mod.rs | 4 +-- .../svix-bridge/src/webhook_receiver/types.rs | 4 +-- 5 files changed, 26 insertions(+), 22 deletions(-) diff --git a/bridge/svix-bridge/src/config/mod.rs b/bridge/svix-bridge/src/config/mod.rs index 2b934dd51..c62d640ac 100644 --- a/bridge/svix-bridge/src/config/mod.rs +++ b/bridge/svix-bridge/src/config/mod.rs @@ -19,10 +19,12 @@ use tracing::Level; #[derive(Deserialize)] #[serde(deny_unknown_fields)] pub struct Config { + /// Config for reading messages from plugins and forwarding to Svix. #[serde(default)] - pub senders: Vec, + pub senders: Vec, + /// Config for receiving webhooks and forwarding them to plugins. #[serde(default)] - pub receivers: Vec, + pub receivers: Vec, /// The log level to run the service with. Supported: info, debug, trace #[serde(default)] pub log_level: LogLevel, @@ -141,9 +143,10 @@ pub enum LogFormat { Json, } +/// Config for reading messages from plugins and forwarding to Svix. #[derive(Deserialize)] #[serde(untagged)] -pub enum SenderConfig { +pub enum WebhookSenderConfig { #[cfg(any( feature = "gcp-pubsub", feature = "rabbitmq", @@ -153,22 +156,22 @@ pub enum SenderConfig { Queue(QueueSenderConfig), } -impl SenderConfig { +impl WebhookSenderConfig { pub fn name(&self) -> &str { match self { - SenderConfig::Queue(cfg) => &cfg.name, + WebhookSenderConfig::Queue(cfg) => &cfg.name, } } pub fn transformation(&self) -> Option<&TransformationConfig> { match self { - SenderConfig::Queue(cfg) => cfg.transformation.as_ref(), + WebhookSenderConfig::Queue(cfg) => cfg.transformation.as_ref(), } } } -impl TryFrom for Box { +impl TryFrom for Box { type Error = &'static str; - fn try_from(value: SenderConfig) -> Result { + fn try_from(value: WebhookSenderConfig) -> Result { match value { #[cfg(any( feature = "gcp-pubsub", @@ -176,13 +179,14 @@ impl TryFrom for Box { feature = "redis", feature = "sqs" ))] - SenderConfig::Queue(backend) => backend.into_sender_input(), + WebhookSenderConfig::Queue(backend) => backend.into_sender_input(), } } } +/// Config for receiving webhooks and forwarding them to plugins. #[derive(Deserialize)] -pub struct ReceiverConfig { +pub struct WebhookReceiverConfig { pub name: String, pub input: ReceiverInputOpts, #[serde(default)] @@ -202,7 +206,7 @@ pub enum ReceiverOut { Queue(QueueOutOpts), } -impl ReceiverConfig { +impl WebhookReceiverConfig { pub async fn into_receiver_output(self) -> std::io::Result> { match self.output { ReceiverOut::Queue(x) => { diff --git a/bridge/svix-bridge/src/config/tests.rs b/bridge/svix-bridge/src/config/tests.rs index 027650c34..7145c545e 100644 --- a/bridge/svix-bridge/src/config/tests.rs +++ b/bridge/svix-bridge/src/config/tests.rs @@ -4,7 +4,7 @@ use svix_bridge_plugin_queue::config::{QueueSenderConfig, RabbitMqInputOpts, Sen use svix_bridge_types::{SenderOutputOpts, SvixSenderOutputOpts}; use super::Config; -use crate::config::{LogFormat, LogLevel, SenderConfig}; +use crate::config::{LogFormat, LogLevel, WebhookSenderConfig}; /// This is meant to be a kitchen sink config, hitting as many possible /// configuration options as possible to ensure they parse correctly. @@ -241,7 +241,7 @@ receivers: #[test] fn test_sender_parses_ok() { - let conf: Result = serde_yaml::from_str( + let conf: Result = serde_yaml::from_str( r#" name: "from-rabbit-local-to-svix" input: @@ -260,7 +260,7 @@ output: #[test] fn test_senders_parses_ok() { - let conf: Result, _> = serde_yaml::from_str( + let conf: Result, _> = serde_yaml::from_str( r#" - name: "from-rabbit-local-to-svix" @@ -455,7 +455,7 @@ fn test_variable_substitution_repeated_lookups() { vars.insert(String::from("SVIX_TOKEN"), String::from("x")); let cfg = Config::from_src(src, Some(&vars)).unwrap(); - if let SenderConfig::Queue(QueueSenderConfig { + if let WebhookSenderConfig::Queue(QueueSenderConfig { input: SenderInputOpts::RabbitMQ(RabbitMqInputOpts { uri, queue_name, .. @@ -471,7 +471,7 @@ fn test_variable_substitution_repeated_lookups() { panic!("sender did not match expected pattern"); } - if let SenderConfig::Queue(QueueSenderConfig { + if let WebhookSenderConfig::Queue(QueueSenderConfig { input: SenderInputOpts::RabbitMQ(RabbitMqInputOpts { uri, queue_name, .. diff --git a/bridge/svix-bridge/src/webhook_receiver/config.rs b/bridge/svix-bridge/src/webhook_receiver/config.rs index d2d2b03c7..6e4ff5e90 100644 --- a/bridge/svix-bridge/src/webhook_receiver/config.rs +++ b/bridge/svix-bridge/src/webhook_receiver/config.rs @@ -1,7 +1,7 @@ use serde::Deserialize; use svix_bridge_types::{TransformationConfig, WebhookVerifier}; -use crate::config::ReceiverConfig; +use crate::config::WebhookReceiverConfig; /// The [`IntegrationConfig`] is the struct associated with a given [`IntegrationId`]. When the route /// associated with an [`IntegrationId`] receives a webhook, or any other HTTP request, then it will @@ -11,7 +11,7 @@ use crate::config::ReceiverConfig; #[derive(Deserialize)] #[allow(dead_code)] pub struct IntegrationConfig { - pub receiver_cfg: ReceiverConfig, + pub receiver_cfg: WebhookReceiverConfig, pub verification: WebhookVerifier, #[serde(default)] pub transformation: Option, diff --git a/bridge/svix-bridge/src/webhook_receiver/mod.rs b/bridge/svix-bridge/src/webhook_receiver/mod.rs index 6d48188d9..c93dae0c3 100644 --- a/bridge/svix-bridge/src/webhook_receiver/mod.rs +++ b/bridge/svix-bridge/src/webhook_receiver/mod.rs @@ -13,7 +13,7 @@ use svix_bridge_types::{ use tracing::instrument; use types::{IntegrationId, IntegrationState, InternalState, SerializableRequest, Unvalidated}; -use crate::{config::ReceiverConfig, webhook_receiver::types::SerializablePayload}; +use crate::{config::WebhookReceiverConfig, webhook_receiver::types::SerializablePayload}; mod config; mod types; @@ -33,7 +33,7 @@ fn router() -> Router { pub async fn run( listen_addr: SocketAddr, - routes: Vec, + routes: Vec, transformer_tx: TransformerTx, ) -> std::io::Result<()> { let state = InternalState::from_receiver_configs(routes, transformer_tx) diff --git a/bridge/svix-bridge/src/webhook_receiver/types.rs b/bridge/svix-bridge/src/webhook_receiver/types.rs index 4d430fb71..424b360f9 100644 --- a/bridge/svix-bridge/src/webhook_receiver/types.rs +++ b/bridge/svix-bridge/src/webhook_receiver/types.rs @@ -14,7 +14,7 @@ use svix_bridge_types::{ }; use super::verification::{NoVerifier, SvixVerifier, VerificationMethod, Verifier}; -use crate::config::ReceiverConfig; +use crate::config::WebhookReceiverConfig; #[derive(Clone)] /// The [`InternalState`] is passed to the Axum route and is used to map the "IntegrationId" in the @@ -50,7 +50,7 @@ impl InternalState { } pub async fn from_receiver_configs( - routes: Vec, + routes: Vec, transformer_tx: TransformerTx, ) -> Result { let mut state_map = HashMap::new(); From 84269420240d640266776a3169f69721834ddfda Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 6 Jun 2024 12:03:48 +0200 Subject: [PATCH 2/3] bridge: Remove incomplete conditional compilation for queue plugin There were a lot of cfg attributes missing for the codebase to compile without one of the queue features, and they didn't actually independently toggle different queue support either. --- bridge/svix-bridge/Cargo.toml | 10 ++-------- bridge/svix-bridge/src/config/mod.rs | 18 ------------------ 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/bridge/svix-bridge/Cargo.toml b/bridge/svix-bridge/Cargo.toml index b258a79ae..726a801f5 100644 --- a/bridge/svix-bridge/Cargo.toml +++ b/bridge/svix-bridge/Cargo.toml @@ -19,7 +19,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml = "0.9" svix-ksuid = "0.7.0" -svix-bridge-plugin-queue = { optional=true, path = "../svix-bridge-plugin-queue" } +svix-bridge-plugin-queue = { path = "../svix-bridge-plugin-queue" } svix-bridge-types = { path = "../svix-bridge-types" } tokio = { version = "1", features = ["full"] } tracing = "0.1" @@ -41,11 +41,5 @@ chrono = "0.4" tower = "0.4" [features] -default = ["gcp-pubsub", "rabbitmq", "redis", "sqs", "jemalloc"] - -gcp-pubsub = ["generic-queue"] -generic-queue = ["dep:svix-bridge-plugin-queue"] -rabbitmq = ["generic-queue"] -redis = ["generic-queue"] -sqs = ["generic-queue"] +default = ["jemalloc"] jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] diff --git a/bridge/svix-bridge/src/config/mod.rs b/bridge/svix-bridge/src/config/mod.rs index c62d640ac..be29e9be9 100644 --- a/bridge/svix-bridge/src/config/mod.rs +++ b/bridge/svix-bridge/src/config/mod.rs @@ -147,12 +147,6 @@ pub enum LogFormat { #[derive(Deserialize)] #[serde(untagged)] pub enum WebhookSenderConfig { - #[cfg(any( - feature = "gcp-pubsub", - feature = "rabbitmq", - feature = "redis", - feature = "sqs" - ))] Queue(QueueSenderConfig), } @@ -173,12 +167,6 @@ impl TryFrom for Box { type Error = &'static str; fn try_from(value: WebhookSenderConfig) -> Result { match value { - #[cfg(any( - feature = "gcp-pubsub", - feature = "rabbitmq", - feature = "redis", - feature = "sqs" - ))] WebhookSenderConfig::Queue(backend) => backend.into_sender_input(), } } @@ -197,12 +185,6 @@ pub struct WebhookReceiverConfig { #[derive(Deserialize)] #[serde(untagged)] pub enum ReceiverOut { - #[cfg(any( - feature = "gcp-pubsub", - feature = "rabbitmq", - feature = "redis", - feature = "sqs" - ))] Queue(QueueOutOpts), } From 61c22cd84e8479295d5a6b08195c70f98983a440 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 6 Jun 2024 14:09:20 +0200 Subject: [PATCH 3/3] bridge: Normalize config type names and structure across senders, receivers --- bridge/svix-bridge-plugin-queue/src/config.rs | 104 ++++++++---------- bridge/svix-bridge-plugin-queue/src/lib.rs | 3 +- .../src/receiver_output/mod.rs | 12 +- .../src/sender_input/mod.rs | 36 +++--- .../tests/it/gcp_pubsub_consumer.rs | 4 +- .../tests/it/rabbitmq_consumer.rs | 4 +- .../tests/it/redis_stream_consumer.rs | 4 +- .../tests/it/sqs_consumer.rs | 4 +- bridge/svix-bridge/src/config/mod.rs | 64 +++++++---- bridge/svix-bridge/src/config/tests.rs | 20 ++-- 10 files changed, 133 insertions(+), 122 deletions(-) diff --git a/bridge/svix-bridge-plugin-queue/src/config.rs b/bridge/svix-bridge-plugin-queue/src/config.rs index 216f5d0e5..ca9f34f87 100644 --- a/bridge/svix-bridge-plugin-queue/src/config.rs +++ b/bridge/svix-bridge-plugin-queue/src/config.rs @@ -12,45 +12,38 @@ pub use crate::{ sqs::{SqsInputOpts, SqsOutputOpts}, }; -#[derive(Deserialize)] -pub struct QueueSenderConfig { - pub name: String, - pub input: SenderInputOpts, - #[serde(default)] - pub transformation: Option, - pub output: SenderOutputOpts, -} - -impl QueueSenderConfig { - pub fn into_sender_input(self) -> Result, &'static str> { - // FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think? - if matches!(self.input, SenderInputOpts::Redis(_)) - && self - .transformation - .as_ref() - .map(|t| t.format() != TransformerInputFormat::Json) - .unwrap_or_default() - { - return Err("redis only supports json formatted transformations"); - } - - Ok(Box::new(QueueSender::new( - self.name, - self.input, - self.transformation, - self.output, - ))) +pub fn into_sender_input( + name: String, + input_opts: QueueInputOpts, + transformation: Option, + output: SenderOutputOpts, +) -> Result, &'static str> { + // FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think? + if matches!(input_opts, QueueInputOpts::Redis(_)) + && transformation + .as_ref() + .map(|t| t.format() != TransformerInputFormat::Json) + .unwrap_or_default() + { + return Err("redis only supports json formatted transformations"); } + + Ok(Box::new(QueueSender::new( + name, + input_opts, + transformation, + output, + ))) } pub async fn into_receiver_output( name: String, - opts: ReceiverOutputOpts, + opts: QueueOutputOpts, // Annoying to have to pass this, but certain backends (redis) only work with certain transformations (json). transformation: Option<&TransformationConfig>, ) -> Result, crate::Error> { // FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think? - if matches!(opts, ReceiverOutputOpts::Redis(_)) + if matches!(opts, QueueOutputOpts::Redis(_)) && transformation .as_ref() .map(|t| t.format() != TransformerInputFormat::Json) @@ -68,7 +61,7 @@ pub async fn into_receiver_output( // TODO: feature flag the variants, thread the features down through to generic-queue #[derive(Debug, Deserialize)] #[serde(tag = "type", rename_all = "lowercase")] -pub enum SenderInputOpts { +pub enum QueueInputOpts { #[serde(rename = "gcp-pubsub")] GCPPubSub(GCPPubSubInputOpts), RabbitMQ(RabbitMqInputOpts), @@ -78,7 +71,7 @@ pub enum SenderInputOpts { #[derive(Clone, Debug, Deserialize)] #[serde(tag = "type", rename_all = "lowercase")] -pub enum ReceiverOutputOpts { +pub enum QueueOutputOpts { #[serde(rename = "gcp-pubsub")] GCPPubSub(GCPPubSubOutputOpts), RabbitMQ(RabbitMqOutputOpts), @@ -92,9 +85,9 @@ mod tests { SenderOutputOpts, SvixSenderOutputOpts, TransformationConfig, TransformerInputFormat, }; - use super::{into_receiver_output, QueueSenderConfig}; + use super::{into_receiver_output, into_sender_input}; use crate::{ - config::{ReceiverOutputOpts, SenderInputOpts}, + config::{QueueInputOpts, QueueOutputOpts}, redis::{RedisInputOpts, RedisOutputOpts}, }; @@ -102,41 +95,40 @@ mod tests { // Revisit after `omniqueue` adoption. #[test] fn redis_sender_with_string_transformation_is_err() { - let cfg = QueueSenderConfig { - name: "redis-with-string-transformation".to_string(), - input: SenderInputOpts::Redis(RedisInputOpts { - dsn: "".to_string(), - max_connections: 0, - reinsert_on_nack: false, - queue_key: "".to_string(), - delayed_queue_key: None, - consumer_group: "".to_string(), - consumer_name: "".to_string(), - ack_deadline_ms: 2_000, - }), - transformation: Some(TransformationConfig::Explicit { + let input_opts = QueueInputOpts::Redis(RedisInputOpts { + dsn: "".to_string(), + max_connections: 0, + reinsert_on_nack: false, + queue_key: "".to_string(), + delayed_queue_key: None, + consumer_group: "".to_string(), + consumer_name: "".to_string(), + ack_deadline_ms: 2_000, + }); + + let err = into_sender_input( + "redis-with-string-transformation".to_owned(), + input_opts, + Some(TransformationConfig::Explicit { format: TransformerInputFormat::String, src: String::new(), }), - output: SenderOutputOpts::Svix(SvixSenderOutputOpts { + SenderOutputOpts::Svix(SvixSenderOutputOpts { token: "".to_string(), options: None, }), - }; - - assert_eq!( - cfg.into_sender_input() - .err() - .expect("invalid config didn't result in error"), - "redis only supports json formatted transformations" ) + .err() + .expect("invalid config didn't result in error"); + + assert_eq!(err, "redis only supports json formatted transformations") } // FIXME: can't support raw payload access for redis because it requires JSON internally. // Revisit after `omniqueue` adoption. #[tokio::test] async fn test_redis_receiver_string_transform_is_err() { - let redis_out = ReceiverOutputOpts::Redis(RedisOutputOpts { + let redis_out = QueueOutputOpts::Redis(RedisOutputOpts { dsn: "".to_string(), max_connections: 0, queue_key: "".to_string(), diff --git a/bridge/svix-bridge-plugin-queue/src/lib.rs b/bridge/svix-bridge-plugin-queue/src/lib.rs index d7f6d394e..6e047d265 100644 --- a/bridge/svix-bridge-plugin-queue/src/lib.rs +++ b/bridge/svix-bridge-plugin-queue/src/lib.rs @@ -18,7 +18,8 @@ mod redis; pub mod sender_input; mod sqs; -use error::Error; +pub use self::config::{into_receiver_output, into_sender_input}; +use self::error::Error; /// Newtype for [`omniqueue::queue::Delivery`]. /// diff --git a/bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs b/bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs index dc889722e..99bdc5726 100644 --- a/bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs +++ b/bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use omniqueue::DynProducer; use svix_bridge_types::{async_trait, ForwardRequest, ReceiverOutput}; -use crate::{config::ReceiverOutputOpts, error::Result}; +use crate::{config::QueueOutputOpts, error::Result}; #[derive(Clone)] pub struct QueueForwarder { @@ -16,13 +16,13 @@ pub struct QueueForwarder { impl QueueForwarder { pub async fn from_receiver_output_opts( name: String, - opts: ReceiverOutputOpts, + opts: QueueOutputOpts, ) -> Result { let sender = match opts { - ReceiverOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?, - ReceiverOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?, - ReceiverOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?, - ReceiverOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?, + QueueOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?, + QueueOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?, + QueueOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?, + QueueOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?, }; Ok(QueueForwarder { name, diff --git a/bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs b/bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs index b56062208..54050f69e 100644 --- a/bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs +++ b/bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs @@ -4,15 +4,13 @@ use svix_bridge_types::{ TransformerTx, }; -use crate::{ - config::SenderInputOpts, error::Error, gcp_pubsub, rabbitmq, run_inner, sqs, Consumer, -}; +use crate::{config::QueueInputOpts, error::Error, gcp_pubsub, rabbitmq, run_inner, sqs, Consumer}; pub struct QueueSender { name: String, source: String, system: String, - input_opts: SenderInputOpts, + input_opts: QueueInputOpts, transformation: Option, transformer_tx: Option, svix_client: Svix, @@ -24,28 +22,28 @@ impl std::fmt::Debug for QueueSender { } } -fn system_name(opts: &SenderInputOpts) -> &'static str { +fn system_name(opts: &QueueInputOpts) -> &'static str { match opts { - SenderInputOpts::GCPPubSub(_) => "gcp-pubsub", - SenderInputOpts::RabbitMQ(_) => "rabbitmq", - SenderInputOpts::Redis(_) => "redis", - SenderInputOpts::SQS(_) => "sqs", + QueueInputOpts::GCPPubSub(_) => "gcp-pubsub", + QueueInputOpts::RabbitMQ(_) => "rabbitmq", + QueueInputOpts::Redis(_) => "redis", + QueueInputOpts::SQS(_) => "sqs", } } -fn source_name(opts: &SenderInputOpts) -> &str { +fn source_name(opts: &QueueInputOpts) -> &str { match opts { - SenderInputOpts::GCPPubSub(opts) => &opts.subscription_id, - SenderInputOpts::RabbitMQ(opts) => &opts.queue_name, - SenderInputOpts::Redis(opts) => &opts.queue_key, - SenderInputOpts::SQS(opts) => &opts.queue_dsn, + QueueInputOpts::GCPPubSub(opts) => &opts.subscription_id, + QueueInputOpts::RabbitMQ(opts) => &opts.queue_name, + QueueInputOpts::Redis(opts) => &opts.queue_key, + QueueInputOpts::SQS(opts) => &opts.queue_dsn, } } impl QueueSender { pub fn new( name: String, - input: SenderInputOpts, + input: QueueInputOpts, transformation: Option, output: SenderOutputOpts, ) -> Self { @@ -89,10 +87,10 @@ impl Consumer for QueueSender { async fn consumer(&self) -> std::io::Result { Ok(match &self.input_opts { - SenderInputOpts::GCPPubSub(cfg) => gcp_pubsub::consumer(cfg).await, - SenderInputOpts::RabbitMQ(cfg) => rabbitmq::consumer(cfg).await, - SenderInputOpts::Redis(cfg) => crate::redis::consumer(cfg).await, - SenderInputOpts::SQS(cfg) => sqs::consumer(cfg).await, + QueueInputOpts::GCPPubSub(cfg) => gcp_pubsub::consumer(cfg).await, + QueueInputOpts::RabbitMQ(cfg) => rabbitmq::consumer(cfg).await, + QueueInputOpts::Redis(cfg) => crate::redis::consumer(cfg).await, + QueueInputOpts::SQS(cfg) => sqs::consumer(cfg).await, } .map_err(Error::from)?) } diff --git a/bridge/svix-bridge-plugin-queue/tests/it/gcp_pubsub_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/gcp_pubsub_consumer.rs index 3926adb82..01fd3ab62 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/gcp_pubsub_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/gcp_pubsub_consumer.rs @@ -13,7 +13,7 @@ use google_cloud_pubsub::{ }; use serde_json::json; use svix_bridge_plugin_queue::{ - config::{GCPPubSubInputOpts, SenderInputOpts}, + config::{GCPPubSubInputOpts, QueueInputOpts}, sender_input::QueueSender, }; use svix_bridge_types::{ @@ -35,7 +35,7 @@ fn get_test_plugin( ) -> QueueSender { QueueSender::new( "test".into(), - SenderInputOpts::GCPPubSub(GCPPubSubInputOpts { + QueueInputOpts::GCPPubSub(GCPPubSubInputOpts { subscription_id, credentials_file: None, }), diff --git a/bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs index 3f889efaa..7c2cd6d16 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs @@ -10,7 +10,7 @@ use lapin::{ }; use serde_json::json; use svix_bridge_plugin_queue::{ - config::{RabbitMqInputOpts, SenderInputOpts}, + config::{QueueInputOpts, RabbitMqInputOpts}, sender_input::QueueSender, }; use svix_bridge_types::{ @@ -31,7 +31,7 @@ fn get_test_plugin( ) -> QueueSender { QueueSender::new( "test".into(), - SenderInputOpts::RabbitMQ(RabbitMqInputOpts { + QueueInputOpts::RabbitMQ(RabbitMqInputOpts { uri: mq_uri.to_string(), queue_name: queue_name.to_string(), consumer_tag: None, diff --git a/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs index d4ff4b5c7..eea291774 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs @@ -6,7 +6,7 @@ use std::time::Duration; use redis::{AsyncCommands, Client}; use serde_json::json; use svix_bridge_plugin_queue::{ - config::{RedisInputOpts, SenderInputOpts}, + config::{QueueInputOpts, RedisInputOpts}, sender_input::QueueSender, }; use svix_bridge_types::{ @@ -26,7 +26,7 @@ fn get_test_plugin( ) -> QueueSender { QueueSender::new( "test".into(), - SenderInputOpts::Redis(RedisInputOpts { + QueueInputOpts::Redis(RedisInputOpts { dsn: "redis://localhost/".to_owned(), max_connections: 8, reinsert_on_nack: false, diff --git a/bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs index 69e3c0135..9343cb414 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs @@ -8,7 +8,7 @@ use std::time::Duration; use aws_sdk_sqs::Client; use serde_json::json; use svix_bridge_plugin_queue::{ - config::{SenderInputOpts, SqsInputOpts}, + config::{QueueInputOpts, SqsInputOpts}, sender_input::QueueSender, }; use svix_bridge_types::{ @@ -35,7 +35,7 @@ fn get_test_plugin( ) -> QueueSender { QueueSender::new( "test".into(), - SenderInputOpts::SQS(SqsInputOpts { + QueueInputOpts::SQS(SqsInputOpts { queue_dsn, override_endpoint: true, }), diff --git a/bridge/svix-bridge/src/config/mod.rs b/bridge/svix-bridge/src/config/mod.rs index be29e9be9..009449ebc 100644 --- a/bridge/svix-bridge/src/config/mod.rs +++ b/bridge/svix-bridge/src/config/mod.rs @@ -10,10 +10,10 @@ use std::{ use serde::Deserialize; use shellexpand::LookupError; -use svix_bridge_plugin_queue::config::{ - into_receiver_output, QueueSenderConfig, ReceiverOutputOpts as QueueOutOpts, +use svix_bridge_plugin_queue::config::{QueueInputOpts, QueueOutputOpts}; +use svix_bridge_types::{ + ReceiverInputOpts, ReceiverOutput, SenderInput, SenderOutputOpts, TransformationConfig, }; -use svix_bridge_types::{ReceiverInputOpts, ReceiverOutput, SenderInput, TransformationConfig}; use tracing::Level; #[derive(Deserialize)] @@ -144,31 +144,49 @@ pub enum LogFormat { } /// Config for reading messages from plugins and forwarding to Svix. +#[derive(Deserialize)] +pub struct WebhookSenderConfig { + pub name: String, + pub input: SenderInputOpts, + #[serde(default)] + pub transformation: Option, + pub output: SenderOutputOpts, +} + #[derive(Deserialize)] #[serde(untagged)] -pub enum WebhookSenderConfig { - Queue(QueueSenderConfig), +pub enum SenderInputOpts { + Queue(QueueInputOpts), } impl WebhookSenderConfig { - pub fn name(&self) -> &str { - match self { - WebhookSenderConfig::Queue(cfg) => &cfg.name, + pub fn into_sender_input(self) -> Result, &'static str> { + match self.input { + SenderInputOpts::Queue(input_opts) => svix_bridge_plugin_queue::into_sender_input( + self.name, + input_opts, + self.transformation, + self.output, + ), } } +} + +impl WebhookSenderConfig { + pub fn name(&self) -> &str { + &self.name + } + pub fn transformation(&self) -> Option<&TransformationConfig> { - match self { - WebhookSenderConfig::Queue(cfg) => cfg.transformation.as_ref(), - } + self.transformation.as_ref() } } impl TryFrom for Box { type Error = &'static str; + fn try_from(value: WebhookSenderConfig) -> Result { - match value { - WebhookSenderConfig::Queue(backend) => backend.into_sender_input(), - } + value.into_sender_input() } } @@ -179,23 +197,25 @@ pub struct WebhookReceiverConfig { pub input: ReceiverInputOpts, #[serde(default)] pub transformation: Option, - pub output: ReceiverOut, + pub output: ReceiverOutputOpts, } #[derive(Deserialize)] #[serde(untagged)] -pub enum ReceiverOut { - Queue(QueueOutOpts), +pub enum ReceiverOutputOpts { + Queue(QueueOutputOpts), } impl WebhookReceiverConfig { pub async fn into_receiver_output(self) -> std::io::Result> { match self.output { - ReceiverOut::Queue(x) => { - into_receiver_output(self.name.clone(), x, self.transformation.as_ref()) - .await - .map_err(Into::into) - } + ReceiverOutputOpts::Queue(x) => svix_bridge_plugin_queue::into_receiver_output( + self.name.clone(), + x, + self.transformation.as_ref(), + ) + .await + .map_err(Into::into), } } } diff --git a/bridge/svix-bridge/src/config/tests.rs b/bridge/svix-bridge/src/config/tests.rs index 7145c545e..c294d0614 100644 --- a/bridge/svix-bridge/src/config/tests.rs +++ b/bridge/svix-bridge/src/config/tests.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; -use svix_bridge_plugin_queue::config::{QueueSenderConfig, RabbitMqInputOpts, SenderInputOpts}; +use svix_bridge_plugin_queue::config::{QueueInputOpts, RabbitMqInputOpts}; use svix_bridge_types::{SenderOutputOpts, SvixSenderOutputOpts}; -use super::Config; +use super::{Config, SenderInputOpts}; use crate::config::{LogFormat, LogLevel, WebhookSenderConfig}; /// This is meant to be a kitchen sink config, hitting as many possible @@ -455,14 +455,14 @@ fn test_variable_substitution_repeated_lookups() { vars.insert(String::from("SVIX_TOKEN"), String::from("x")); let cfg = Config::from_src(src, Some(&vars)).unwrap(); - if let WebhookSenderConfig::Queue(QueueSenderConfig { + if let WebhookSenderConfig { input: - SenderInputOpts::RabbitMQ(RabbitMqInputOpts { + SenderInputOpts::Queue(QueueInputOpts::RabbitMQ(RabbitMqInputOpts { uri, queue_name, .. - }), + })), output: SenderOutputOpts::Svix(SvixSenderOutputOpts { token, .. }), .. - }) = &cfg.senders[0] + } = &cfg.senders[0] { assert_eq!(uri, "amqp://guest:guest@localhost:5672/%2f"); assert_eq!(queue_name, "one"); @@ -471,14 +471,14 @@ fn test_variable_substitution_repeated_lookups() { panic!("sender did not match expected pattern"); } - if let WebhookSenderConfig::Queue(QueueSenderConfig { + if let WebhookSenderConfig { input: - SenderInputOpts::RabbitMQ(RabbitMqInputOpts { + SenderInputOpts::Queue(QueueInputOpts::RabbitMQ(RabbitMqInputOpts { uri, queue_name, .. - }), + })), output: SenderOutputOpts::Svix(SvixSenderOutputOpts { token, .. }), .. - }) = &cfg.senders[1] + } = &cfg.senders[1] { assert_eq!(uri, "amqp://guest:guest@localhost:5672/%2f"); assert_eq!(queue_name, "two");