Skip to content

Commit

Permalink
Merge branch 'main' into end-to-end-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jan 6, 2025
2 parents edd168a + ea0bf41 commit 369f5bd
Show file tree
Hide file tree
Showing 30 changed files with 488 additions and 501 deletions.
603 changes: 271 additions & 332 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ inherits = "release"
debug = true

[workspace.dependencies]
scylla = { version = "0.14.0", features = ["ssl"] }
scylla = { version = "0.15.0", features = ["ssl"] }
bytes = { version = "1.0.0", features = ["serde"] }
tokio = { version = "1.25.0", features = ["full"] }
tokio-util = { version = "0.7.7", features = ["codec"] }
tokio-openssl = "0.6.2"
itertools = "0.13.0"
itertools = "0.14.0"
openssl = { version = "0.10.36", features = ["vendored"] }
anyhow = "1.0.76"
serde = { version = "1.0.111", features = ["derive"] }
Expand All @@ -36,7 +36,9 @@ reqwest = "0.12.0"
redis = { version = "0.24.0", features = ["tokio-comp", "cluster"] }
cdrs-tokio = "8.0"
cassandra-protocol = "3.0"
tracing = "0.1.15"
# https://docs.rs/tracing/latest/tracing/level_filters/index.html#compile-time-filters
# `trace` level is considered development only, and may contain sensitive data, do not include it in release builds.
tracing = { version = "0.1.15", features = ["release_max_level_debug"] }
tracing-subscriber = { version = "0.3.1", features = ["env-filter", "json"] }
tracing-appender = "0.2.0"
serde_json = "1.0"
Expand Down
8 changes: 5 additions & 3 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
shotover = { path = "../shotover", default-features = false}
shotover = { path = "../shotover", default-features = false }

[dev-dependencies]
prometheus-parse = "0.2.4"
Expand All @@ -17,7 +17,7 @@ scylla.workspace = true
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
rstest = "0.22.0"
rstest = "0.24.0"
rstest_reuse = "0.7.0"
test-helpers = { path = "../test-helpers" }
redis.workspace = true
Expand All @@ -44,7 +44,9 @@ rustls-pki-types = "1.1.0"
aws-throwaway.workspace = true
windsock = "0.2.0"
regex = "1.7.0"
opensearch = { version = "2.1.0", default-features = false, features = ["rustls-tls"] }
opensearch = { version = "2.1.0", default-features = false, features = [
"rustls-tls",
] }
serde_json = "1.0.103"
time = { version = "0.3.25" }
shell-quote.workspace = true
Expand Down
14 changes: 12 additions & 2 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use futures::Future;
use pretty_assertions::assert_eq;
use rstest::rstest;
use rstest_reuse::{self, *};
use scylla::transport::errors::{
ConnectionError, ConnectionPoolError, ConnectionSetupRequestError,
ConnectionSetupRequestErrorKind, DbError, NewSessionError,
};
use scylla::SessionBuilder;
use std::net::SocketAddr;
#[cfg(feature = "cassandra-cpp-driver-tests")]
Expand Down Expand Up @@ -141,10 +145,16 @@ async fn passthrough_cassandra_down() {
.await
.unwrap_err();
match err {
scylla::transport::errors::NewSessionError::IoError(err) => {
NewSessionError::ConnectionPoolError(ConnectionPoolError::Broken {
last_connection_error:
ConnectionError::ConnectionSetupRequestError(ConnectionSetupRequestError {
error: ConnectionSetupRequestErrorKind::DbError(DbError::ServerError, err),
..
}),
}) => {
assert_eq!(
format!("{err}"),
format!("No connections in the pool; last connection failed with: Database returned an error: Internal server error. This indicates a server-side bug, Error message: Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed.
format!("Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed.
Caused by:
0: CassandraSinkSingle transform failed
Expand Down
4 changes: 2 additions & 2 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ default = ["cassandra", "valkey", "kafka", "opensearch"]

[dependencies]
atomic_enum = "0.3.0"
axum = { version = "0.7", default-features = false, features = ["tokio", "tracing", "http1"] }
axum = { version = "0.8", default-features = false, features = ["tokio", "tracing", "http1"] }
pretty-hex = "0.4.0"
tokio-stream = "0.1.2"
derivative = "2.1.1"
Expand Down Expand Up @@ -76,7 +76,7 @@ csv = { workspace = true, optional = true }
hex = { workspace = true, optional = true }
async-trait.workspace = true
typetag.workspace = true
tokio-tungstenite = "0.24.0"
tokio-tungstenite = "0.26.0"

# Error handling
thiserror = "2.0"
Expand Down
3 changes: 1 addition & 2 deletions shotover/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,7 @@ impl Decoder for CassandraDecoder {
Err(CodecReadError::Parser(anyhow!(msg)))
}
err => Err(CodecReadError::Parser(anyhow!(
"Failed to parse frame {:?}",
err
"Failed to parse frame {err:?}"
))),
}
}
Expand Down
4 changes: 3 additions & 1 deletion shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ impl Encoder<Messages> for KafkaEncoder {
}) => {
dst.extend_from_slice(&body.auth_bytes);
}
_ => unreachable!("not expected {frame:?}"),
_ => unreachable!(
"Expected kafka sasl authenticate request or response but was not"
),
}
Ok(())
} else {
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ async fn reader_task<C: CodecBuilder + 'static, R: AsyncRead + Unpin + Send + 's
force_run_chain.notify_one();
}
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
if out_tx.send(messages).is_err() {
error!("Failed to send RespondAndThenCloseConnection message");
}
return Err(ConnectionError::ShotoverClosed);
}
Expand Down
20 changes: 20 additions & 0 deletions shotover/src/frame/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,26 @@ impl CassandraFrame {
}
}
}
pub(crate) fn operation_name(operation: &CassandraOperation) -> &'static str {
match operation {
CassandraOperation::Query { .. } => "Query",
CassandraOperation::Result(_) => "Result",
CassandraOperation::Error(_) => "Error",
CassandraOperation::Prepare(_) => "Prepare",
CassandraOperation::Execute(_) => "Execute",
CassandraOperation::Register(_) => "Register",
CassandraOperation::Event(_) => "Event",
CassandraOperation::Batch(_) => "Batch",
CassandraOperation::Startup(_) => "Startup",
CassandraOperation::Ready(_) => "Ready",
CassandraOperation::Authenticate(_) => "Authenticate",
CassandraOperation::Options(_) => "Options",
CassandraOperation::Supported(_) => "Supported",
CassandraOperation::AuthChallenge(_) => "AuthChallenge",
CassandraOperation::AuthResponse(_) => "AuthResponse",
CassandraOperation::AuthSuccess(_) => "AuthSuccess",
}
}

#[derive(PartialEq, Debug, Clone)]
pub enum CassandraOperation {
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Display for KafkaFrame {
header.unknown_tagged_fields
)?;
}
write!(f, " {:?}", body)?;
write!(f, " {body:?}")?;
}
KafkaFrame::Response {
version,
Expand All @@ -63,7 +63,7 @@ impl Display for KafkaFrame {
header.unknown_tagged_fields
)?;
}
write!(f, " {body:?}",)?;
write!(f, " {body:?}")?;
}
}
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions shotover/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ impl Display for Frame {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
#[cfg(feature = "cassandra")]
Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame),
Frame::Cassandra(frame) => write!(f, "Cassandra {frame}"),
#[cfg(feature = "valkey")]
Frame::Valkey(frame) => write!(f, "Valkey {:?}", frame),
Frame::Valkey(frame) => write!(f, "Valkey {frame:?}"),
#[cfg(feature = "kafka")]
Frame::Kafka(frame) => write!(f, "Kafka {}", frame),
Frame::Kafka(frame) => write!(f, "Kafka {frame}"),
Frame::Dummy => write!(f, "Shotover internal dummy message"),
#[cfg(feature = "opensearch")]
Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame),
Frame::OpenSearch(frame) => write!(f, "OpenSearch: {frame:?}"),
}
}
}
5 changes: 1 addition & 4 deletions shotover/src/frame/valkey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ pub fn valkey_query_name(frame: &ValkeyFrame) -> Option<String> {
return Some(query_type);
}
Err(err) => {
tracing::error!(
"Failed to convert valkey bulkstring to string, err: {:?}",
err
)
tracing::error!("Failed to convert valkey bulkstring to string, err: {err:?}")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ impl Message {
message_type,
}) = &self.inner
{
format!("Unparseable {:?} message {:?}", message_type, bytes)
format!("Unparseable {message_type:?} message {bytes:?}")
} else {
unreachable!("self.frame() failed so MessageInner must still be RawBytes")
}
Expand Down
22 changes: 12 additions & 10 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use tokio_tungstenite::tungstenite::{
protocol::Message as WsMessage,
};
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
use tracing::Instrument;
use tracing::{debug, error, warn};
use tracing::{trace, Instrument};

pub struct TcpCodecListener<C: CodecBuilder> {
chain_builder: TransformChainBuilder,
Expand Down Expand Up @@ -343,7 +343,7 @@ async fn spawn_websocket_read_write_tasks<
Ok(WsMessage::Binary(ws_message_data)) => {
// Entire message is reallocated and copied here due to
// incompatibility between tokio codecs and tungstenite.
let message = decoder.decode(&mut BytesMut::from(ws_message_data.as_slice()));
let message = decoder.decode(&mut BytesMut::from(ws_message_data.as_ref()));
match message {
Ok(Some(message)) => {
if in_tx.send(message).await.is_err() {
Expand All @@ -358,13 +358,13 @@ async fn spawn_websocket_read_write_tasks<
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
// TODO we need to send a close message to the client
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
error!("Failed to send RespondAndThenCloseConnection message: {err}");
}
return;
}
Err(CodecReadError::Parser(err)) => {
// TODO we need to send a close message to the client, protocol error
warn!("failed to decode message: {:?}", err);
warn!("failed to decode message: {err:?}");
return;
}
Err(CodecReadError::Io(_err)) => {
Expand Down Expand Up @@ -480,12 +480,12 @@ pub fn spawn_read_write_tasks<
}
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
error!("Failed to send RespondAndThenCloseConnection message: {err}");
}
return;
}
Err(CodecReadError::Parser(err)) => {
warn!("failed to decode message: {:?}", err);
warn!("failed to decode message: {err:?}");
return;
}
Err(CodecReadError::Io(err)) => {
Expand All @@ -494,7 +494,7 @@ pub fn spawn_read_write_tasks<
// We shouldnt report that as a warning because its common for clients to do
// that for performance reasons.
if !matches!(err.kind(), ErrorKind::UnexpectedEof) {
warn!("failed to receive message on tcp stream: {:?}", err);
warn!("failed to receive message on tcp stream: {err:?}");
}
return;
}
Expand Down Expand Up @@ -699,7 +699,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
debug!("running transform chain because a transform in the chain requested that a chain run occur");
if let Some(close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(close_reason)
}
Expand All @@ -710,7 +710,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
debug!("running transform chain because requests received from client");
if let Some(close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(close_reason)
}
Expand All @@ -731,6 +731,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
out_tx: &mpsc::UnboundedSender<Messages>,
requests: Messages,
) -> Result<Option<CloseReason>> {
trace!("running transform chain with requests: {requests:?}");
let mut wrapper = ChainState::new_with_addr(requests, local_addr);

self.pending_requests.process_requests(&wrapper.requests);
Expand All @@ -748,7 +749,8 @@ impl<C: CodecBuilder + 'static> Handler<C> {

// send the result of the process up stream
if !responses.is_empty() {
debug!("sending response to client: {:?}", responses);
debug!("sending {} responses to client", responses.len());
trace!("sending response to client: {responses:?}");
if out_tx.send(responses).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(Some(CloseReason::ClientClosed));
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ impl CassandraSinkCluster {
}
Err(GetReplicaErr::NoPreparedMetadata) => {
let id = execute.id.clone();
tracing::info!("forcing re-prepare on {:?}", id);
tracing::info!("forcing re-prepare on {id:?}");
// this shotover node doesn't have the metadata.
// send an unprepared error in response to force
// the client to reprepare the query
Expand Down Expand Up @@ -632,7 +632,7 @@ impl CassandraSinkCluster {
self.set_control_connection(connection, address);
}
tracing::info!(
"Control connection finalized against node at: {:?}",
"Control connection finalized against node at: {}",
self.control_connection_address.unwrap()
);

Expand Down
Loading

0 comments on commit 369f5bd

Please sign in to comment.