Skip to content

Commit

Permalink
Merge branch 'main' into update_deps2
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jan 5, 2025
2 parents b3653b3 + f9e208b commit 83c8c8d
Show file tree
Hide file tree
Showing 23 changed files with 166 additions and 141 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ reqwest = "0.12.0"
redis = { version = "0.24.0", features = ["tokio-comp", "cluster"] }
cdrs-tokio = "8.0"
cassandra-protocol = "3.0"
tracing = "0.1.15"
# https://docs.rs/tracing/latest/tracing/level_filters/index.html#compile-time-filters
# `trace` level is considered development only, and may contain sensitive data, do not include it in release builds.
tracing = { version = "0.1.15", features = ["release_max_level_debug"] }
tracing-subscriber = { version = "0.3.1", features = ["env-filter", "json"] }
tracing-appender = "0.2.0"
serde_json = "1.0"
Expand Down
3 changes: 1 addition & 2 deletions shotover/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,7 @@ impl Decoder for CassandraDecoder {
Err(CodecReadError::Parser(anyhow!(msg)))
}
err => Err(CodecReadError::Parser(anyhow!(
"Failed to parse frame {:?}",
err
"Failed to parse frame {err:?}"
))),
}
}
Expand Down
4 changes: 3 additions & 1 deletion shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ impl Encoder<Messages> for KafkaEncoder {
}) => {
dst.extend_from_slice(&body.auth_bytes);
}
_ => unreachable!("not expected {frame:?}"),
_ => unreachable!(
"Expected kafka sasl authenticate request or response but was not"
),
}
Ok(())
} else {
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ async fn reader_task<C: CodecBuilder + 'static, R: AsyncRead + Unpin + Send + 's
force_run_chain.notify_one();
}
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
if out_tx.send(messages).is_err() {
error!("Failed to send RespondAndThenCloseConnection message");
}
return Err(ConnectionError::ShotoverClosed);
}
Expand Down
20 changes: 20 additions & 0 deletions shotover/src/frame/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,26 @@ impl CassandraFrame {
}
}
}
pub(crate) fn operation_name(operation: &CassandraOperation) -> &'static str {
match operation {
CassandraOperation::Query { .. } => "Query",
CassandraOperation::Result(_) => "Result",
CassandraOperation::Error(_) => "Error",
CassandraOperation::Prepare(_) => "Prepare",
CassandraOperation::Execute(_) => "Execute",
CassandraOperation::Register(_) => "Register",
CassandraOperation::Event(_) => "Event",
CassandraOperation::Batch(_) => "Batch",
CassandraOperation::Startup(_) => "Startup",
CassandraOperation::Ready(_) => "Ready",
CassandraOperation::Authenticate(_) => "Authenticate",
CassandraOperation::Options(_) => "Options",
CassandraOperation::Supported(_) => "Supported",
CassandraOperation::AuthChallenge(_) => "AuthChallenge",
CassandraOperation::AuthResponse(_) => "AuthResponse",
CassandraOperation::AuthSuccess(_) => "AuthSuccess",
}
}

#[derive(PartialEq, Debug, Clone)]
pub enum CassandraOperation {
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Display for KafkaFrame {
header.unknown_tagged_fields
)?;
}
write!(f, " {:?}", body)?;
write!(f, " {body:?}")?;
}
KafkaFrame::Response {
version,
Expand All @@ -63,7 +63,7 @@ impl Display for KafkaFrame {
header.unknown_tagged_fields
)?;
}
write!(f, " {body:?}",)?;
write!(f, " {body:?}")?;
}
}
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions shotover/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ impl Display for Frame {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
#[cfg(feature = "cassandra")]
Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame),
Frame::Cassandra(frame) => write!(f, "Cassandra {frame}"),
#[cfg(feature = "valkey")]
Frame::Valkey(frame) => write!(f, "Valkey {:?}", frame),
Frame::Valkey(frame) => write!(f, "Valkey {frame:?}"),
#[cfg(feature = "kafka")]
Frame::Kafka(frame) => write!(f, "Kafka {}", frame),
Frame::Kafka(frame) => write!(f, "Kafka {frame}"),
Frame::Dummy => write!(f, "Shotover internal dummy message"),
#[cfg(feature = "opensearch")]
Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame),
Frame::OpenSearch(frame) => write!(f, "OpenSearch: {frame:?}"),
}
}
}
5 changes: 1 addition & 4 deletions shotover/src/frame/valkey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ pub fn valkey_query_name(frame: &ValkeyFrame) -> Option<String> {
return Some(query_type);
}
Err(err) => {
tracing::error!(
"Failed to convert valkey bulkstring to string, err: {:?}",
err
)
tracing::error!("Failed to convert valkey bulkstring to string, err: {err:?}")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ impl Message {
message_type,
}) = &self.inner
{
format!("Unparseable {:?} message {:?}", message_type, bytes)
format!("Unparseable {message_type:?} message {bytes:?}")
} else {
unreachable!("self.frame() failed so MessageInner must still be RawBytes")
}
Expand Down
20 changes: 11 additions & 9 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use tokio_tungstenite::tungstenite::{
protocol::Message as WsMessage,
};
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
use tracing::Instrument;
use tracing::{debug, error, warn};
use tracing::{trace, Instrument};

pub struct TcpCodecListener<C: CodecBuilder> {
chain_builder: TransformChainBuilder,
Expand Down Expand Up @@ -358,13 +358,13 @@ async fn spawn_websocket_read_write_tasks<
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
// TODO we need to send a close message to the client
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
error!("Failed to send RespondAndThenCloseConnection message: {err}");
}
return;
}
Err(CodecReadError::Parser(err)) => {
// TODO we need to send a close message to the client, protocol error
warn!("failed to decode message: {:?}", err);
warn!("failed to decode message: {err:?}");
return;
}
Err(CodecReadError::Io(_err)) => {
Expand Down Expand Up @@ -480,12 +480,12 @@ pub fn spawn_read_write_tasks<
}
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
error!("Failed to send RespondAndThenCloseConnection message: {err}");
}
return;
}
Err(CodecReadError::Parser(err)) => {
warn!("failed to decode message: {:?}", err);
warn!("failed to decode message: {err:?}");
return;
}
Err(CodecReadError::Io(err)) => {
Expand All @@ -494,7 +494,7 @@ pub fn spawn_read_write_tasks<
// We shouldnt report that as a warning because its common for clients to do
// that for performance reasons.
if !matches!(err.kind(), ErrorKind::UnexpectedEof) {
warn!("failed to receive message on tcp stream: {:?}", err);
warn!("failed to receive message on tcp stream: {err:?}");
}
return;
}
Expand Down Expand Up @@ -699,7 +699,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
debug!("running transform chain because a transform in the chain requested that a chain run occur");
if let Some(close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(close_reason)
}
Expand All @@ -710,7 +710,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
debug!("running transform chain because requests received from client");
if let Some(close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(close_reason)
}
Expand All @@ -731,6 +731,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
out_tx: &mpsc::UnboundedSender<Messages>,
requests: Messages,
) -> Result<Option<CloseReason>> {
trace!("running transform chain with requests: {requests:?}");
let mut wrapper = ChainState::new_with_addr(requests, local_addr);

self.pending_requests.process_requests(&wrapper.requests);
Expand All @@ -748,7 +749,8 @@ impl<C: CodecBuilder + 'static> Handler<C> {

// send the result of the process up stream
if !responses.is_empty() {
debug!("sending response to client: {:?}", responses);
debug!("sending {} responses to client", responses.len());
trace!("sending response to client: {responses:?}");
if out_tx.send(responses).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(Some(CloseReason::ClientClosed));
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ impl CassandraSinkCluster {
}
Err(GetReplicaErr::NoPreparedMetadata) => {
let id = execute.id.clone();
tracing::info!("forcing re-prepare on {:?}", id);
tracing::info!("forcing re-prepare on {id:?}");
// this shotover node doesn't have the metadata.
// send an unprepared error in response to force
// the client to reprepare the query
Expand Down Expand Up @@ -632,7 +632,7 @@ impl CassandraSinkCluster {
self.set_control_connection(connection, address);
}
tracing::info!(
"Control connection finalized against node at: {:?}",
"Control connection finalized against node at: {}",
self.control_connection_address.unwrap()
);

Expand Down
41 changes: 23 additions & 18 deletions shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::node::ConnectionFactory;
use super::node_pool::NodePool;
use super::ShotoverNode;
use crate::frame::cassandra::operation_name;
use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, MessageIdMap, Messages};
use crate::{
Expand Down Expand Up @@ -350,9 +351,16 @@ impl MessageRewriter {
CassandraOperation::Error(_),
..
})) => None,
other => {
tracing::error!("Response to Prepare query was not a Prepared, was instead: {other:?}");
warnings.push(format!("Shotover: Response to Prepare query was not a Prepared, was instead: {other:?}"));
Some(Frame::Cassandra(CassandraFrame { operation, .. })) => {
let operation_name = operation_name(operation);
tracing::error!("Response to Prepare query was not a Prepared, was instead: {operation_name}");
warnings.push(format!("Shotover: Response to Prepare query was not a Prepared, was instead: {operation_name}"));
None
}
Some(_) => unreachable!("Response to prepare was not cassandra message"),
None => {
tracing::error!("Response to Prepare query was not parseable");
warnings.push("Shotover: Response to Prepare query was not parseable".to_owned());
None
}
})
Expand All @@ -366,12 +374,15 @@ impl MessageRewriter {
output
});

tracing::error!(
"Nodes did not return the same response to PREPARE statement {err_str}"
tracing::warn!(
"Nodes did not return the same response to PREPARE statement"
);
tracing::trace!(
"Nodes did not return the same response to PREPARE statement:{err_str}"
);
warnings.push(format!(
"Shotover: Nodes did not return the same response to PREPARE statement {err_str}"
));
"Shotover: Nodes did not return the same response to PREPARE statement {err_str}"
));
}
}

Expand Down Expand Up @@ -544,10 +555,7 @@ impl MessageRewriter {
}
Ok(())
} else {
Err(anyhow!(
"Failed to parse system.local response {:?}",
peers_response
))
Err(anyhow!("Failed to parse system.local response"))
}
}

Expand Down Expand Up @@ -670,10 +678,7 @@ impl MessageRewriter {
}
Ok(())
} else {
Err(anyhow!(
"Failed to parse system.local response {:?}",
local_response
))
Err(anyhow!("Failed to parse system.local response"))
}
}
}
Expand Down Expand Up @@ -831,13 +836,13 @@ fn parse_system_nodes(mut response: Message) -> Result<Vec<NodeInfo>, MessagePar
"system.local returned error: {error:?}",
))),
operation => Err(MessageParseError::ParseFailure(anyhow!(
"system.local returned unexpected cassandra operation: {operation:?}",
"system.local returned unexpected cassandra operation: {:?}",
operation_name(operation)
))),
}
} else {
Err(MessageParseError::ParseFailure(anyhow!(
"Failed to parse system.local response {:?}",
response
"Failed to parse system.local response"
)))
}
}
Loading

0 comments on commit 83c8c8d

Please sign in to comment.