diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/config/FlusswerkConfiguration.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/config/FlusswerkConfiguration.java index 689425cd..ccccab85 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/config/FlusswerkConfiguration.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/config/FlusswerkConfiguration.java @@ -99,8 +99,8 @@ public RabbitMQ rabbitMQ( } @Bean - public MessageBroker messageBroker(RoutingProperties routingProperties, RabbitClient rabbitClient) - throws IOException { + public MessageBroker messageBroker( + RoutingProperties routingProperties, RabbitClient rabbitClient) { return new MessageBroker(routingProperties, rabbitClient); } diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Worker.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Worker.java index b56dbf1c..80dcb4f7 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Worker.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Worker.java @@ -10,7 +10,6 @@ import com.github.dbmdz.flusswerk.framework.rabbitmq.MessageBroker; import com.github.dbmdz.flusswerk.framework.reporting.ProcessReport; import com.github.dbmdz.flusswerk.framework.reporting.Tracing; -import java.io.IOException; import java.util.Collection; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -125,49 +124,34 @@ public void process(Message message) { } private void retryOrFail(Message receivedMessage, RuntimeException e) { - try { - messageBroker.ack(receivedMessage); - boolean isRejected = messageBroker.reject(receivedMessage); - if (isRejected) { - processReport.reportRetry(receivedMessage, e); - } else { - processReport.reportFailAfterMaxRetries(receivedMessage, e); - } - } catch (IOException fatalException) { - var body = receivedMessage.getEnvelope().getBody(); - LOGGER.error("Could not reject message" + body, fatalException); + messageBroker.ack(receivedMessage); + boolean isRejected = messageBroker.reject(receivedMessage); + if (isRejected) { + processReport.reportRetry(receivedMessage, e); + } else { + processReport.reportFailAfterMaxRetries(receivedMessage, e); } } private void complexRetry(Message receivedMessage, RetryProcessingException e) { - try { - messageBroker.ack(receivedMessage); - for (Message retryMessage : e.getMessagesToRetry()) { - Envelope envelope = retryMessage.getEnvelope(); - envelope.setRetries(receivedMessage.getEnvelope().getRetries()); - envelope.setSource(receivedMessage.getEnvelope().getSource()); - tracing.ensureFor(retryMessage); - messageBroker.reject(retryMessage); - } - // Send the messages that should be sent anyway - tracing.ensureFor(e.getMessagesToSend()); - messageBroker.send(e.getMessagesToSend()); - - processReport.reportRetry(receivedMessage, e); - } catch (IOException fatalException) { - var body = receivedMessage.getEnvelope().getBody(); - LOGGER.error("Complex retry failed" + body, fatalException); + messageBroker.ack(receivedMessage); + for (Message retryMessage : e.getMessagesToRetry()) { + Envelope envelope = retryMessage.getEnvelope(); + envelope.setRetries(receivedMessage.getEnvelope().getRetries()); + envelope.setSource(receivedMessage.getEnvelope().getSource()); + tracing.ensureFor(retryMessage); + messageBroker.reject(retryMessage); } + // Send the messages that should be sent anyway + tracing.ensureFor(e.getMessagesToSend()); + messageBroker.send(e.getMessagesToSend()); + + processReport.reportRetry(receivedMessage, e); } private void fail(Message message, StopProcessingException e) { - try { - processReport.reportFail(message, e); - messageBroker.fail(message); - } catch (IOException fatalException) { - var body = message.getEnvelope().getBody(); - LOGGER.error("Could not fail message" + body, fatalException); - } + processReport.reportFail(message, e); + messageBroker.fail(message); } /** diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.java index 6666ce0f..d3a00201 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.java @@ -4,7 +4,6 @@ import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException; import com.github.dbmdz.flusswerk.framework.model.Envelope; import com.github.dbmdz.flusswerk.framework.model.Message; -import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -27,7 +26,7 @@ public class MessageBroker { private final RabbitClient rabbitClient; - public MessageBroker(RoutingProperties routing, RabbitClient rabbitClient) throws IOException { + public MessageBroker(RoutingProperties routing, RabbitClient rabbitClient) { this.routingConfig = routing; this.rabbitClient = rabbitClient; @@ -40,11 +39,10 @@ public MessageBroker(RoutingProperties routing, RabbitClient rabbitClient) throw * Sends a message to the default output queue as JSON document. * * @param message the message to send. - * @throws IOException if sending the message fails. * @deprecated Use {@link Topic#send(Message)} instead */ @Deprecated - void send(Message message) throws IOException { + void send(Message message) { List topics = routingConfig.getOutgoing().get("default"); if (topics == null || topics.isEmpty()) { throw new RuntimeException("Cannot send message, no default queue specified"); @@ -58,11 +56,10 @@ void send(Message message) throws IOException { * Sends messages to the default output queue as JSON document. * * @param messages the message to send. - * @throws IOException if sending the message fails. * @deprecated Use {@link Topic#send(Message)} instead */ @Deprecated - public void send(Collection messages) throws IOException { + public void send(Collection messages) { List topics = routingConfig.getOutgoing().get("default"); if (topics == null || topics.isEmpty()) { throw new RuntimeException("Cannot send message, no default queue specified"); @@ -78,9 +75,8 @@ public void send(Collection messages) throws IOException { * @param routingKey the routing key for the queue to send the message to (usually the queue * name). * @param message the message to send. - * @throws IOException if sending the message fails. */ - void send(String routingKey, Message message) throws IOException { + void send(String routingKey, Message message) { rabbitClient.send(routingConfig.getExchange(routingKey), routingKey, message); } @@ -95,9 +91,8 @@ void sendRaw(String routingKey, byte[] message) { * @param routingKey the routing key for the queue to send the message to (usually the queue * name). * @param messages the messages to send. - * @throws IOException if sending a message fails. */ - void send(String routingKey, Collection messages) throws IOException { + void send(String routingKey, Collection messages) { for (Message message : messages) { send(routingKey, message); } @@ -164,7 +159,7 @@ private void failInvalidMessage(InvalidMessageException e) { } } - private void provideInputQueues() throws IOException { + private void provideInputQueues() { for (String inputQueue : routingConfig.getIncoming()) { FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue); final String exchange = routingConfig.getExchange(inputQueue); @@ -192,7 +187,7 @@ private void provideInputQueues() throws IOException { } } - private void provideOutputQueues() throws IOException { + private void provideOutputQueues() { for (String topic : routingConfig.allOutgoing()) { rabbitClient.declareQueue( topic, @@ -217,9 +212,8 @@ public void ack(Message message) { * * @param message the message to reject * @return true if retry, false if failed - * @throws IOException if communication with RabbitMQ failed */ - public boolean reject(Message message) throws IOException { + public boolean reject(Message message) { final Envelope envelope = message.getEnvelope(); final long maxRetries = routingConfig.getFailurePolicy(message).getRetries(); if (envelope.getRetries() < maxRetries) { @@ -232,7 +226,7 @@ public boolean reject(Message message) throws IOException { } } - void fail(Message message, boolean ackMessage) throws IOException { + void fail(Message message, boolean ackMessage) { if (ackMessage) { ack(message); } @@ -244,11 +238,11 @@ void fail(Message message, boolean ackMessage) throws IOException { } } - public void fail(Message message) throws IOException { + public void fail(Message message) { fail(message, true); } - public void retry(Message message) throws IOException { + public void retry(Message message) { LOGGER.debug("Send message to retry queue: " + message); FailurePolicy failurePolicy = routingConfig.getFailurePolicy(message); String retryRoutingKey = failurePolicy.getRetryRoutingKey(); @@ -259,7 +253,7 @@ public void retry(Message message) throws IOException { } } - private void provideExchanges() throws IOException { + private void provideExchanges() { for (String exchange : routingConfig.getExchanges()) { rabbitClient.provideExchange(exchange); } @@ -272,11 +266,10 @@ private void provideExchanges() throws IOException { * Returns the number of messages in known queues * * @return a map of queue names and the number of messages in these queues - * @throws IOException if communication with RabbitMQ fails * @deprecated Use {@link Queue#messageCount()} instead. */ @Deprecated - Map getMessageCounts() throws IOException { + Map getMessageCounts() { Map result = new HashMap<>(); for (String queue : routingConfig.getIncoming()) { result.put(queue, rabbitClient.getMessageCount(queue)); @@ -284,7 +277,7 @@ Map getMessageCounts() throws IOException { return result; } - Map getFailedMessageCounts() throws IOException { + Map getFailedMessageCounts() { Map result = new HashMap<>(); for (String inputQueue : routingConfig.getIncoming()) { FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue); @@ -296,7 +289,7 @@ Map getFailedMessageCounts() throws IOException { return result; } - public Map getRetryMessageCounts() throws IOException { + public Map getRetryMessageCounts() { Map result = new HashMap<>(); for (String inputQueue : routingConfig.getIncoming()) { FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue); diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClient.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClient.java index f5091df0..94e8fe9b 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClient.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClient.java @@ -82,7 +82,7 @@ public RabbitClient(IncomingMessageType incomingMessageType, RabbitConnection co this(new FlusswerkObjectMapper(incomingMessageType), connection); } - void send(String exchange, String routingKey, Message message) throws IOException { + void send(String exchange, String routingKey, Message message) { byte[] data = serialize(message); sendRaw(exchange, routingKey, data); } @@ -101,8 +101,12 @@ Message deserialize(String body) throws JsonProcessingException { return objectMapper.deserialize(body); } - byte[] serialize(Message message) throws IOException { - return objectMapper.writeValueAsBytes(message); + byte[] serialize(Message message) { + try { + return objectMapper.writeValueAsBytes(message); + } catch (JsonProcessingException e) { + throw new RuntimeException("Cannot serialize message", e); + } } public void ack(com.github.dbmdz.flusswerk.framework.model.Envelope envelope) { @@ -182,7 +186,7 @@ Channel getChannel() { } public AMQP.Queue.PurgeOk queuePurge(String name) { - return (AMQP.Queue.PurgeOk) execute(commands.queuePurge(name)); + return execute(commands.queuePurge(name)); } private T execute(ChannelCommand channelCommand) { diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Route.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Route.java index ae21dfed..aa251e7f 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Route.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Route.java @@ -1,7 +1,6 @@ package com.github.dbmdz.flusswerk.framework.rabbitmq; import com.github.dbmdz.flusswerk.framework.model.Message; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -30,11 +29,9 @@ public void addTopic(Topic topic) { * Sends a message to the topics on this route. * * @param message The message to send. - * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized - * to JSON. */ @Override - public void send(Message message) throws IOException { + public void send(Message message) { for (Topic topic : topics) { topic.send(message); } @@ -44,11 +41,9 @@ public void send(Message message) throws IOException { * Sends multiple messages to the topics on this route. * * @param messages The messages to send. - * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized - * to JSON. */ @Override - public void send(Collection messages) throws IOException { + public void send(Collection messages) { for (Topic topic : topics) { topic.send(messages); } @@ -58,11 +53,9 @@ public void send(Collection messages) throws IOException { * Convenience implementation, mostly for tests. * * @param messages The messages to send. - * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized - * to JSON. */ @Override - public void send(Message... messages) throws IOException { + public void send(Message... messages) { send(List.of(messages)); } diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Sender.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Sender.java index c4fbb8b5..dca9b589 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Sender.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Sender.java @@ -1,16 +1,15 @@ package com.github.dbmdz.flusswerk.framework.rabbitmq; import com.github.dbmdz.flusswerk.framework.model.Message; -import java.io.IOException; import java.util.Collection; /** High-level interface for sending messages to RabbitMQ. */ public interface Sender { - void send(Message message) throws IOException; + void send(Message message); - void send(Collection messages) throws IOException; + void send(Collection messages); - void send(Message... messages) throws IOException; + void send(Message... messages); void sendRaw(byte[] message); } diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Topic.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Topic.java index 4dab2095..c80b54ef 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Topic.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Topic.java @@ -4,7 +4,6 @@ import com.github.dbmdz.flusswerk.framework.model.Message; import com.github.dbmdz.flusswerk.framework.reporting.Tracing; -import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -31,11 +30,9 @@ public class Topic implements Sender { * case, every time you call this method it creates a new tracing path. * * @param message The message to send. - * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized - * to JSON. */ @Override - public void send(Message message) throws IOException { + public void send(Message message) { // Only set a tracing path if there is none yet if (message.getTracing() == null || message.getTracing().isEmpty()) { message.setTracing(getTracingPath()); @@ -47,11 +44,9 @@ public void send(Message message) throws IOException { * Sends multiple messages to this topic. * * @param messages The messages to send. - * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized - * to JSON. */ @Override - public void send(Collection messages) throws IOException { + public void send(Collection messages) { // Get a new tracing path in case one is needed final List tracingPath = getTracingPath(); messages.stream() @@ -64,11 +59,9 @@ public void send(Collection messages) throws IOException { * Convenience implementation, mostly for tests. * * @param messages The messages to send. - * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized - * to JSON. */ @Override - public void send(Message... messages) throws IOException { + public void send(Message... messages) { send(List.of(messages)); } diff --git a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/WorkerTest.java b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/WorkerTest.java index 3f3e2ab3..1cbe5ead 100644 --- a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/WorkerTest.java +++ b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/WorkerTest.java @@ -14,7 +14,6 @@ import com.github.dbmdz.flusswerk.framework.rabbitmq.MessageBroker; import com.github.dbmdz.flusswerk.framework.reporting.ProcessReport; import com.github.dbmdz.flusswerk.framework.reporting.Tracing; -import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; @@ -74,7 +73,7 @@ void registersTracingInformation() { @DisplayName("should fail message on StopProcessingException") @Test - void shouldFailMessageOnStopProcessingException() throws IOException { + void shouldFailMessageOnStopProcessingException() { when(flow.process(message)).thenThrow(new StopProcessingException("Intentional Exception")); worker.process(message); verify(messageBroker).fail(message); @@ -83,7 +82,7 @@ void shouldFailMessageOnStopProcessingException() throws IOException { @DisplayName("should retry message on RetryProcessingException") @ParameterizedTest @MethodSource("retryableExceptions") - void shouldRetryMessageOnRetryProcessingException(RuntimeException exception) throws IOException { + void shouldRetryMessageOnRetryProcessingException(RuntimeException exception) { when(flow.process(message)).thenThrow(exception); worker.process(message); verify(messageBroker).reject(message); @@ -116,7 +115,7 @@ void shouldLogFailure() { @DisplayName("should log retry on exception") @Test - void shouldLogRetry() throws IOException { + void shouldLogRetry() { Exception exception = new RetryProcessingException("intentional"); when(flow.process(message)).thenThrow(exception); when(messageBroker.reject(message)).thenReturn(true); // retry @@ -136,7 +135,7 @@ void shouldLogFailureAfterTooManyRetries() { @DisplayName("should send messages") @Test - void shouldSendMessages() throws IOException { + void shouldSendMessages() { when(flow.process(message)).thenReturn(List.of(message)); worker.process(message); verify(messageBroker).send(List.of(message)); @@ -144,9 +143,9 @@ void shouldSendMessages() throws IOException { @DisplayName("should fail processing when sending messages fails") @Test - void shouldFailProcessingWhenSendingMessagesFails() throws IOException { + void shouldFailProcessingWhenSendingMessagesFails() { when(flow.process(message)).thenReturn(List.of(message)); - doThrow(IOException.class).when(messageBroker).send(any()); + doThrow(RuntimeException.class).when(messageBroker).send(any()); worker.process(message); verify(processReport).reportFail(any(), any()); } @@ -174,7 +173,7 @@ void shouldRegisterActiveWorkers() { @DisplayName("should add tracing information to messages after skipping") @Test - void shouldAddTracingAfterSkipping() throws IOException { + void shouldAddTracingAfterSkipping() { List tracingPath = List.of("abcde", "1234567"); Message incomingMessage = new Message(); incomingMessage.setTracing(tracingPath); @@ -210,7 +209,7 @@ private Message unwrapOne(Collection actual) { @DisplayName("should perform complex retry sending messages") @Test - void shouldPerformComplexRetrySendingMessages() throws IOException { + void shouldPerformComplexRetrySendingMessages() { Message incomingMessage = new TestMessage("incoming"); Message outgoingMessage = new TestMessage("outgoing"); when(flow.process(incomingMessage)) @@ -221,7 +220,7 @@ void shouldPerformComplexRetrySendingMessages() throws IOException { @DisplayName("should perform complex retry with new messages") @Test - void shouldPerformComplexRetryWithNewMessages() throws IOException { + void shouldPerformComplexRetryWithNewMessages() { Message incomingMessage = new TestMessage("incoming"); List messagesToRetry = List.of(new TestMessage("retry1"), new TestMessage("retry2")); when(flow.process(incomingMessage)) diff --git a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBrokerTest.java b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBrokerTest.java index 9cf35b9e..24357b79 100644 --- a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBrokerTest.java +++ b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBrokerTest.java @@ -13,7 +13,6 @@ import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException; import com.github.dbmdz.flusswerk.framework.model.Envelope; import com.github.dbmdz.flusswerk.framework.model.Message; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -36,7 +35,7 @@ class MessageBrokerTest { private FailurePolicy failurePolicy; @BeforeEach - void setUp() throws IOException { + void setUp() { failurePolicy = new FailurePolicy("some.input.queue"); routing = RoutingProperties.minimal( @@ -51,7 +50,7 @@ void setUp() throws IOException { @Test @DisplayName("Send should use the specified routing key") - void sendShouldUseSpecifiedRoutingKey() throws IOException { + void sendShouldUseSpecifiedRoutingKey() { messageBroker.send("there", message); verify(rabbitClient).send(anyString(), eq("there"), eq(message)); } @@ -65,7 +64,7 @@ void ack() { @Test @DisplayName("Reject should count rejections") - void rejectShouldCountRejections() throws IOException { + void rejectShouldCountRejections() { int numberOfRejections = 3; for (int i = 0; i < numberOfRejections; i++) { messageBroker.reject(message); @@ -75,7 +74,7 @@ void rejectShouldCountRejections() throws IOException { @Test @DisplayName("Should route a message to the failed queue if it has been rejected to often") - void rejectShouldRouteToFailedQueueIfMessageIsRejectedTooOften() throws IOException { + void rejectShouldRouteToFailedQueueIfMessageIsRejectedTooOften() { int numberOfRejections = failurePolicy.getRetries() + 1; for (int i = 0; i < numberOfRejections; i++) { messageBroker.reject(message); @@ -86,14 +85,14 @@ void rejectShouldRouteToFailedQueueIfMessageIsRejectedTooOften() throws IOExcept @Test @DisplayName("Should send a message to the output queue") - void sendShouldRouteMessageToOutputQueue() throws IOException { + void sendShouldRouteMessageToOutputQueue() { messageBroker.send(new Message()); verify(rabbitClient).send(any(), eq(routing.getOutgoing().get("default").get(0)), any()); } @Test @DisplayName("Should send multiple messages to the specified queue") - void sendMultipleMessagesShouldRouteMessagesToSpecifiedQueue() throws IOException { + void sendMultipleMessagesShouldRouteMessagesToSpecifiedQueue() { String queue = "specified.queue"; List messages = Arrays.asList(new Message(), new Message(), new Message()); messageBroker.send(queue, messages); @@ -117,7 +116,7 @@ void defaultReceiveShouldPullTheInputQueue() throws InvalidMessageException { @Test @DisplayName("getMessageCount should return all message counts") - void getMessageCountsShouldGetAllMessageCounts() throws IOException { + void getMessageCountsShouldGetAllMessageCounts() { RoutingProperties routing = RoutingProperties.minimal(List.of("input1", "input2"), null); Map expected = new HashMap<>(); diff --git a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/QueueTest.java b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/QueueTest.java index 1af396f4..8b54c80e 100644 --- a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/QueueTest.java +++ b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/QueueTest.java @@ -7,7 +7,6 @@ import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException; import com.github.dbmdz.flusswerk.framework.model.Message; import com.rabbitmq.client.AMQP.Queue.PurgeOk; -import java.io.IOException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -34,7 +33,7 @@ void purge() { } @Test - void messageCount() throws IOException { + void messageCount() { var expected = 123123L; when(rabbitClient.getMessageCount(queue.getName())).thenReturn(expected); diff --git a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClientTest.java b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClientTest.java index fb24e532..90da5cf2 100644 --- a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClientTest.java +++ b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClientTest.java @@ -132,8 +132,7 @@ private Message createMessage(int retries, Instant created) { return messageToReceive; } - private GetResponse createResponse(long deliveryTag, Message message, RabbitClient rabbitClient) - throws IOException { + private GetResponse createResponse(long deliveryTag, Message message, RabbitClient rabbitClient) { com.rabbitmq.client.Envelope envelope = new com.rabbitmq.client.Envelope(deliveryTag, true, "workflow", "some.input.queue"); BasicProperties basicProperties = new BasicProperties.Builder().build(); diff --git a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RouteTest.java b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RouteTest.java index 4e577292..a86bf440 100644 --- a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RouteTest.java +++ b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RouteTest.java @@ -7,7 +7,6 @@ import com.github.dbmdz.flusswerk.framework.TestMessage; import com.github.dbmdz.flusswerk.framework.model.Message; -import java.io.IOException; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -26,7 +25,7 @@ void setUp() { @DisplayName("should send a single message") @Test - void shouldSendOneMessage() throws IOException { + void shouldSendOneMessage() { var message = new TestMessage("123"); route.send(message); for (Topic topic : topics) { @@ -36,7 +35,7 @@ void shouldSendOneMessage() throws IOException { @DisplayName("should send all messages") @Test - void shouldSendManyMessages() throws IOException { + void shouldSendManyMessages() { List messages = List.of(new TestMessage("1"), new TestMessage("2")); route.send(messages); for (Topic topic : topics) { diff --git a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/TopicTest.java b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/TopicTest.java index 46ef8e78..1896d91a 100644 --- a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/TopicTest.java +++ b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/rabbitmq/TopicTest.java @@ -9,7 +9,6 @@ import com.github.dbmdz.flusswerk.framework.TestMessage; import com.github.dbmdz.flusswerk.framework.model.Message; import com.github.dbmdz.flusswerk.framework.reporting.Tracing; -import java.io.IOException; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -31,7 +30,7 @@ void setUp() { @DisplayName("should send a single message") @Test - void shouldSendOneMessage() throws IOException { + void shouldSendOneMessage() { var message = new TestMessage("123"); topic.send(message); verify(messageBroker).send(any(), eq(message)); @@ -39,7 +38,7 @@ void shouldSendOneMessage() throws IOException { @DisplayName("should send all messages") @Test - void shouldSendManyMessages() throws IOException { + void shouldSendManyMessages() { List messages = List.of(new TestMessage("1"), new TestMessage("2")); topic.send(messages); verify(messageBroker).send(any(), eq(messages)); diff --git a/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/NoFlowTest.java b/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/NoFlowTest.java index 1b8be4f5..00da87d0 100644 --- a/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/NoFlowTest.java +++ b/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/NoFlowTest.java @@ -88,7 +88,7 @@ void stopEngine() { @DisplayName("then it still should send a message to a route") @Test - public void shouldSendMessageToRoute() throws IOException { + public void shouldSendMessageToRoute() { Message message = new TestMessage("123"); rabbitMQ.topic(rabbitUtil.output()).send(message); Message received = rabbitUtil.receive(); diff --git a/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtil.java b/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtil.java index 1256f68a..1167dbcb 100644 --- a/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtil.java +++ b/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtil.java @@ -6,7 +6,6 @@ import com.github.dbmdz.flusswerk.framework.rabbitmq.FailurePolicy; import com.github.dbmdz.flusswerk.framework.rabbitmq.Queue; import com.github.dbmdz.flusswerk.framework.rabbitmq.RabbitMQ; -import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -35,7 +34,7 @@ public String output() { return routing.getOutgoing().get("default").get(0); } - public void send(Message message) throws IOException { + public void send(Message message) { rabbitMQ.topic(firstInput()).send(message); } diff --git a/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtilAssert.java b/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtilAssert.java index b1ef302d..3572f9a5 100644 --- a/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtilAssert.java +++ b/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtilAssert.java @@ -1,7 +1,6 @@ package com.github.dbmdz.flusswerk.integration; import com.github.dbmdz.flusswerk.framework.rabbitmq.Queue; -import java.io.IOException; import org.assertj.core.api.AbstractAssert; import org.assertj.core.api.Assertions; @@ -21,18 +20,14 @@ public RabbitUtilAssert allQueuesAreEmpty() { .allQueues() .forEach( queue -> { - try { - Assertions.assertThat(getMessageCount(queue)) - .as("Queue " + queue.getName() + " is not empty") - .isZero(); - } catch (IOException e) { - throw new RuntimeException(e); - } + Assertions.assertThat(getMessageCount(queue)) + .as("Queue " + queue.getName() + " is not empty") + .isZero(); }); return this; } - private static long getMessageCount(Queue queue) throws IOException { + private static long getMessageCount(Queue queue) { return queue.messageCount(); } }