Skip to content

Commit

Permalink
Handle errors during message serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
schmika committed Jan 23, 2024
1 parent 67886b9 commit b5fd3b6
Show file tree
Hide file tree
Showing 16 changed files with 82 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ public RabbitMQ rabbitMQ(
}

@Bean
public MessageBroker messageBroker(RoutingProperties routingProperties, RabbitClient rabbitClient)
throws IOException {
public MessageBroker messageBroker(
RoutingProperties routingProperties, RabbitClient rabbitClient) {
return new MessageBroker(routingProperties, rabbitClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.github.dbmdz.flusswerk.framework.rabbitmq.MessageBroker;
import com.github.dbmdz.flusswerk.framework.reporting.ProcessReport;
import com.github.dbmdz.flusswerk.framework.reporting.Tracing;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -125,49 +124,34 @@ public void process(Message message) {
}

private void retryOrFail(Message receivedMessage, RuntimeException e) {
try {
messageBroker.ack(receivedMessage);
boolean isRejected = messageBroker.reject(receivedMessage);
if (isRejected) {
processReport.reportRetry(receivedMessage, e);
} else {
processReport.reportFailAfterMaxRetries(receivedMessage, e);
}
} catch (IOException fatalException) {
var body = receivedMessage.getEnvelope().getBody();
LOGGER.error("Could not reject message" + body, fatalException);
messageBroker.ack(receivedMessage);
boolean isRejected = messageBroker.reject(receivedMessage);
if (isRejected) {
processReport.reportRetry(receivedMessage, e);
} else {
processReport.reportFailAfterMaxRetries(receivedMessage, e);
}
}

private void complexRetry(Message receivedMessage, RetryProcessingException e) {
try {
messageBroker.ack(receivedMessage);
for (Message retryMessage : e.getMessagesToRetry()) {
Envelope envelope = retryMessage.getEnvelope();
envelope.setRetries(receivedMessage.getEnvelope().getRetries());
envelope.setSource(receivedMessage.getEnvelope().getSource());
tracing.ensureFor(retryMessage);
messageBroker.reject(retryMessage);
}
// Send the messages that should be sent anyway
tracing.ensureFor(e.getMessagesToSend());
messageBroker.send(e.getMessagesToSend());

processReport.reportRetry(receivedMessage, e);
} catch (IOException fatalException) {
var body = receivedMessage.getEnvelope().getBody();
LOGGER.error("Complex retry failed" + body, fatalException);
messageBroker.ack(receivedMessage);
for (Message retryMessage : e.getMessagesToRetry()) {
Envelope envelope = retryMessage.getEnvelope();
envelope.setRetries(receivedMessage.getEnvelope().getRetries());
envelope.setSource(receivedMessage.getEnvelope().getSource());
tracing.ensureFor(retryMessage);
messageBroker.reject(retryMessage);
}
// Send the messages that should be sent anyway
tracing.ensureFor(e.getMessagesToSend());
messageBroker.send(e.getMessagesToSend());

processReport.reportRetry(receivedMessage, e);
}

private void fail(Message message, StopProcessingException e) {
try {
processReport.reportFail(message, e);
messageBroker.fail(message);
} catch (IOException fatalException) {
var body = message.getEnvelope().getBody();
LOGGER.error("Could not fail message" + body, fatalException);
}
processReport.reportFail(message, e);
messageBroker.fail(message);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException;
import com.github.dbmdz.flusswerk.framework.model.Envelope;
import com.github.dbmdz.flusswerk.framework.model.Message;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -27,7 +26,7 @@ public class MessageBroker {

private final RabbitClient rabbitClient;

public MessageBroker(RoutingProperties routing, RabbitClient rabbitClient) throws IOException {
public MessageBroker(RoutingProperties routing, RabbitClient rabbitClient) {
this.routingConfig = routing;
this.rabbitClient = rabbitClient;

Expand All @@ -40,11 +39,10 @@ public MessageBroker(RoutingProperties routing, RabbitClient rabbitClient) throw
* Sends a message to the default output queue as JSON document.
*
* @param message the message to send.
* @throws IOException if sending the message fails.
* @deprecated Use {@link Topic#send(Message)} instead
*/
@Deprecated
void send(Message message) throws IOException {
void send(Message message) {
List<String> topics = routingConfig.getOutgoing().get("default");
if (topics == null || topics.isEmpty()) {
throw new RuntimeException("Cannot send message, no default queue specified");
Expand All @@ -58,11 +56,10 @@ void send(Message message) throws IOException {
* Sends messages to the default output queue as JSON document.
*
* @param messages the message to send.
* @throws IOException if sending the message fails.
* @deprecated Use {@link Topic#send(Message)} instead
*/
@Deprecated
public void send(Collection<? extends Message> messages) throws IOException {
public void send(Collection<? extends Message> messages) {
List<String> topics = routingConfig.getOutgoing().get("default");
if (topics == null || topics.isEmpty()) {
throw new RuntimeException("Cannot send message, no default queue specified");
Expand All @@ -78,9 +75,8 @@ public void send(Collection<? extends Message> messages) throws IOException {
* @param routingKey the routing key for the queue to send the message to (usually the queue
* name).
* @param message the message to send.
* @throws IOException if sending the message fails.
*/
void send(String routingKey, Message message) throws IOException {
void send(String routingKey, Message message) {
rabbitClient.send(routingConfig.getExchange(routingKey), routingKey, message);
}

Expand All @@ -95,9 +91,8 @@ void sendRaw(String routingKey, byte[] message) {
* @param routingKey the routing key for the queue to send the message to (usually the queue
* name).
* @param messages the messages to send.
* @throws IOException if sending a message fails.
*/
void send(String routingKey, Collection<? extends Message> messages) throws IOException {
void send(String routingKey, Collection<? extends Message> messages) {
for (Message message : messages) {
send(routingKey, message);
}
Expand Down Expand Up @@ -164,7 +159,7 @@ private void failInvalidMessage(InvalidMessageException e) {
}
}

private void provideInputQueues() throws IOException {
private void provideInputQueues() {
for (String inputQueue : routingConfig.getIncoming()) {
FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue);
final String exchange = routingConfig.getExchange(inputQueue);
Expand Down Expand Up @@ -192,7 +187,7 @@ private void provideInputQueues() throws IOException {
}
}

private void provideOutputQueues() throws IOException {
private void provideOutputQueues() {
for (String topic : routingConfig.allOutgoing()) {
rabbitClient.declareQueue(
topic,
Expand All @@ -217,9 +212,8 @@ public void ack(Message message) {
*
* @param message the message to reject
* @return true if retry, false if failed
* @throws IOException if communication with RabbitMQ failed
*/
public boolean reject(Message message) throws IOException {
public boolean reject(Message message) {
final Envelope envelope = message.getEnvelope();
final long maxRetries = routingConfig.getFailurePolicy(message).getRetries();
if (envelope.getRetries() < maxRetries) {
Expand All @@ -232,7 +226,7 @@ public boolean reject(Message message) throws IOException {
}
}

void fail(Message message, boolean ackMessage) throws IOException {
void fail(Message message, boolean ackMessage) {
if (ackMessage) {
ack(message);
}
Expand All @@ -244,11 +238,11 @@ void fail(Message message, boolean ackMessage) throws IOException {
}
}

public void fail(Message message) throws IOException {
public void fail(Message message) {
fail(message, true);
}

public void retry(Message message) throws IOException {
public void retry(Message message) {
LOGGER.debug("Send message to retry queue: " + message);
FailurePolicy failurePolicy = routingConfig.getFailurePolicy(message);
String retryRoutingKey = failurePolicy.getRetryRoutingKey();
Expand All @@ -259,7 +253,7 @@ public void retry(Message message) throws IOException {
}
}

private void provideExchanges() throws IOException {
private void provideExchanges() {
for (String exchange : routingConfig.getExchanges()) {
rabbitClient.provideExchange(exchange);
}
Expand All @@ -272,19 +266,18 @@ private void provideExchanges() throws IOException {
* Returns the number of messages in known queues
*
* @return a map of queue names and the number of messages in these queues
* @throws IOException if communication with RabbitMQ fails
* @deprecated Use {@link Queue#messageCount()} instead.
*/
@Deprecated
Map<String, Long> getMessageCounts() throws IOException {
Map<String, Long> getMessageCounts() {
Map<String, Long> result = new HashMap<>();
for (String queue : routingConfig.getIncoming()) {
result.put(queue, rabbitClient.getMessageCount(queue));
}
return result;
}

Map<String, Long> getFailedMessageCounts() throws IOException {
Map<String, Long> getFailedMessageCounts() {
Map<String, Long> result = new HashMap<>();
for (String inputQueue : routingConfig.getIncoming()) {
FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue);
Expand All @@ -296,7 +289,7 @@ Map<String, Long> getFailedMessageCounts() throws IOException {
return result;
}

public Map<String, Long> getRetryMessageCounts() throws IOException {
public Map<String, Long> getRetryMessageCounts() {
Map<String, Long> result = new HashMap<>();
for (String inputQueue : routingConfig.getIncoming()) {
FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public RabbitClient(IncomingMessageType incomingMessageType, RabbitConnection co
this(new FlusswerkObjectMapper(incomingMessageType), connection);
}

void send(String exchange, String routingKey, Message message) throws IOException {
void send(String exchange, String routingKey, Message message) {
byte[] data = serialize(message);
sendRaw(exchange, routingKey, data);
}
Expand All @@ -101,8 +101,12 @@ Message deserialize(String body) throws JsonProcessingException {
return objectMapper.deserialize(body);
}

byte[] serialize(Message message) throws IOException {
return objectMapper.writeValueAsBytes(message);
byte[] serialize(Message message) {
try {
return objectMapper.writeValueAsBytes(message);
} catch (JsonProcessingException e) {
throw new RuntimeException("Cannot serialize message", e);
}
}

public void ack(com.github.dbmdz.flusswerk.framework.model.Envelope envelope) {
Expand Down Expand Up @@ -182,7 +186,7 @@ Channel getChannel() {
}

public AMQP.Queue.PurgeOk queuePurge(String name) {
return (AMQP.Queue.PurgeOk) execute(commands.queuePurge(name));
return execute(commands.queuePurge(name));
}

private <T> T execute(ChannelCommand<T> channelCommand) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import com.github.dbmdz.flusswerk.framework.model.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -30,11 +29,9 @@ public void addTopic(Topic topic) {
* Sends a message to the topics on this route.
*
* @param message The message to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Message message) throws IOException {
public void send(Message message) {
for (Topic topic : topics) {
topic.send(message);
}
Expand All @@ -44,11 +41,9 @@ public void send(Message message) throws IOException {
* Sends multiple messages to the topics on this route.
*
* @param messages The messages to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Collection<Message> messages) throws IOException {
public void send(Collection<Message> messages) {
for (Topic topic : topics) {
topic.send(messages);
}
Expand All @@ -58,11 +53,9 @@ public void send(Collection<Message> messages) throws IOException {
* Convenience implementation, mostly for tests.
*
* @param messages The messages to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Message... messages) throws IOException {
public void send(Message... messages) {
send(List.of(messages));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import com.github.dbmdz.flusswerk.framework.model.Message;
import java.io.IOException;
import java.util.Collection;

/** High-level interface for sending messages to RabbitMQ. */
public interface Sender {
void send(Message message) throws IOException;
void send(Message message);

void send(Collection<Message> messages) throws IOException;
void send(Collection<Message> messages);

void send(Message... messages) throws IOException;
void send(Message... messages);

void sendRaw(byte[] message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import com.github.dbmdz.flusswerk.framework.model.Message;
import com.github.dbmdz.flusswerk.framework.reporting.Tracing;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
Expand All @@ -31,11 +30,9 @@ public class Topic implements Sender {
* case, every time you call this method it creates a new tracing path.
*
* @param message The message to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Message message) throws IOException {
public void send(Message message) {
// Only set a tracing path if there is none yet
if (message.getTracing() == null || message.getTracing().isEmpty()) {
message.setTracing(getTracingPath());
Expand All @@ -47,11 +44,9 @@ public void send(Message message) throws IOException {
* Sends multiple messages to this topic.
*
* @param messages The messages to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Collection<Message> messages) throws IOException {
public void send(Collection<Message> messages) {
// Get a new tracing path in case one is needed
final List<String> tracingPath = getTracingPath();
messages.stream()
Expand All @@ -64,11 +59,9 @@ public void send(Collection<Message> messages) throws IOException {
* Convenience implementation, mostly for tests.
*
* @param messages The messages to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Message... messages) throws IOException {
public void send(Message... messages) {
send(List.of(messages));
}

Expand Down
Loading

0 comments on commit b5fd3b6

Please sign in to comment.