Skip to content

Commit

Permalink
bridge: Add Kafka as an input
Browse files Browse the repository at this point in the history
… that is, support converting Kafka messages into Svix API calls.
  • Loading branch information
svix-jplatte committed Jun 11, 2024
1 parent 7fb1958 commit 217a64b
Show file tree
Hide file tree
Showing 9 changed files with 446 additions and 12 deletions.
80 changes: 74 additions & 6 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"svix-bridge-types",
"svix-bridge",
"svix-bridge-plugin-queue",
"svix-bridge-plugin-kafka",
]

[profile.dev.package]
Expand Down
13 changes: 13 additions & 0 deletions bridge/svix-bridge-plugin-kafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "svix-bridge-plugin-kafka"
version = "0.1.0"
edition = "2021"

[dependencies]
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.117"
svix-bridge-types = { path = "../svix-bridge-types" }
thiserror = "1.0.61"
tokio = { version = "1.28.1", features = ["time"] }
tracing = "0.1.40"
58 changes: 58 additions & 0 deletions bridge/svix-bridge-plugin-kafka/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use serde::Deserialize;
use svix_bridge_types::{SenderInput, SenderOutputOpts, TransformationConfig};

use crate::{input::KafkaConsumer, Result};

#[derive(Deserialize)]
pub struct KafkaInputOpts {
/// Comma-separated list of addresses.
///
/// Example: `localhost:9094`
#[serde(rename = "kafka_bootstrap_brokers")]
pub(crate) bootstrap_brokers: String,

/// The consumer group ID, used to track the stream offset between restarts
/// (due to host maintenance, upgrades, crashes, etc.).
#[serde(rename = "kafka_group_id")]
pub(crate) group_id: String,

/// The topic to listen to.
#[serde(rename = "kafka_topic")]
pub(crate) topic: String,

/// The value for 'security.protocol' in the kafka config.
#[serde(flatten)]
pub(crate) security_protocol: KafkaSecurityProtocol,

/// The 'debug' config value for rdkafka - enables more verbose logging
/// for the selected 'contexts'
#[serde(rename = "kafka_debug_contexts")]
pub(crate) debug_contexts: Option<String>,
}

#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "kafka_security_protocol", rename_all = "snake_case")]
pub enum KafkaSecurityProtocol {
Plaintext,
Ssl,
SaslSsl {
#[serde(rename = "kafka_sasl_username")]
sasl_username: String,
#[serde(rename = "kafka_sasl_password")]
sasl_password: String,
},
}

pub fn into_sender_input(
name: String,
opts: KafkaInputOpts,
transformation: Option<TransformationConfig>,
output: SenderOutputOpts,
) -> Result<Box<dyn SenderInput>> {
Ok(Box::new(KafkaConsumer::new(
name,
opts,
transformation,
output,
)?))
}
35 changes: 35 additions & 0 deletions bridge/svix-bridge-plugin-kafka/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::str;

use rdkafka::error::KafkaError;
use svix_bridge_types::svix::error::Error as SvixClientError;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("kafka error")]
Kafka(#[from] KafkaError),

#[error("svix client error")]
SvixClient(#[from] SvixClientError),

#[error("JSON deserialization failed")]
Deserialization(#[source] serde_json::Error),

#[error("non-UTF8 payload")]
NonUtf8Payload(#[source] str::Utf8Error),

#[error("kafka message is missing payload")]
MissingPayload,

#[error("transformation error: {error}")]
Transformation { error: String },
}

impl Error {
pub(crate) fn transformation(error: impl Into<String>) -> Self {
Self::Transformation {
error: error.into(),
}
}
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Loading

0 comments on commit 217a64b

Please sign in to comment.