diff --git a/framework/pom.xml b/framework/pom.xml index ddf84506..89fc8512 100644 --- a/framework/pom.xml +++ b/framework/pom.xml @@ -4,7 +4,7 @@ com.github.dbmdz.flusswerk flusswerk - 6.0.2-SNAPSHOT + 7.0.0-SNAPSHOT 4.0.0 diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/config/properties/RoutingProperties.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/config/properties/RoutingProperties.java index ab5c8e20..044b3dfe 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/config/properties/RoutingProperties.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/config/properties/RoutingProperties.java @@ -143,12 +143,19 @@ public List getIncoming() { } /** - * @return The queues to send to (optional). + * @return The queues to send to, organized by route (optional). */ public Map> getOutgoing() { return outgoing; } + /** + * @return A list of all the queues to send to. + */ + public List allOutgoing() { + return outgoing.values().stream().flatMap(List::stream).toList(); + } + public FailurePolicy getFailurePolicy(String queue) { return failurePolicies.get(queue); } diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.java index da36b492..6666ce0f 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.java @@ -193,8 +193,7 @@ private void provideInputQueues() throws IOException { } private void provideOutputQueues() throws IOException { - for (String topic : - routingConfig.getOutgoing().values().stream().flatMap(List::stream).toList()) { + for (String topic : routingConfig.allOutgoing()) { rabbitClient.declareQueue( topic, routingConfig.getExchange(topic), diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Route.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Route.java index 85ba7631..ae21dfed 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Route.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Route.java @@ -8,9 +8,9 @@ import java.util.Objects; /** Flusswerk-specific abstraction, collection of topics/queues. No equivalent in RabbitMQ. */ -public class Route { - private String name; - private List topics; +public class Route implements Sender { + private final String name; + private final List topics; public Route(String name) { this.name = name; @@ -27,14 +27,13 @@ public void addTopic(Topic topic) { } /** - * Sends a message to the topics on this route. The message will have a tracing id either based on - * the incoming message or newly generated for applications that do not work on incoming messages. - * In that case, every time you call this method it creates a new tracing path. + * Sends a message to the topics on this route. * * @param message The message to send. * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized * to JSON. */ + @Override public void send(Message message) throws IOException { for (Topic topic : topics) { topic.send(message); @@ -48,6 +47,7 @@ public void send(Message message) throws IOException { * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized * to JSON. */ + @Override public void send(Collection messages) throws IOException { for (Topic topic : topics) { topic.send(messages); @@ -61,6 +61,7 @@ public void send(Collection messages) throws IOException { * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized * to JSON. */ + @Override public void send(Message... messages) throws IOException { send(List.of(messages)); } @@ -72,6 +73,7 @@ public void send(Message... messages) throws IOException { * * @param message The message serialized to bytes */ + @Override public void sendRaw(byte[] message) { for (Topic topic : topics) { topic.sendRaw(message); diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Sender.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Sender.java new file mode 100644 index 00000000..c4fbb8b5 --- /dev/null +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Sender.java @@ -0,0 +1,16 @@ +package com.github.dbmdz.flusswerk.framework.rabbitmq; + +import com.github.dbmdz.flusswerk.framework.model.Message; +import java.io.IOException; +import java.util.Collection; + +/** High-level interface for sending messages to RabbitMQ. */ +public interface Sender { + void send(Message message) throws IOException; + + void send(Collection messages) throws IOException; + + void send(Message... messages) throws IOException; + + void sendRaw(byte[] message); +} diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Topic.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Topic.java index 0f369786..4dab2095 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Topic.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Topic.java @@ -13,7 +13,7 @@ * Represents a AMQP/RabbitMQ topic to send messages to. In many setups this is equal to the * respective queue name. */ -public class Topic { +public class Topic implements Sender { private final String name; private final MessageBroker messageBroker; @@ -34,6 +34,7 @@ public class Topic { * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized * to JSON. */ + @Override public void send(Message message) throws IOException { // Only set a tracing path if there is none yet if (message.getTracing() == null || message.getTracing().isEmpty()) { @@ -49,6 +50,7 @@ public void send(Message message) throws IOException { * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized * to JSON. */ + @Override public void send(Collection messages) throws IOException { // Get a new tracing path in case one is needed final List tracingPath = getTracingPath(); @@ -65,6 +67,7 @@ public void send(Collection messages) throws IOException { * @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized * to JSON. */ + @Override public void send(Message... messages) throws IOException { send(List.of(messages)); } @@ -76,6 +79,7 @@ public void send(Message... messages) throws IOException { * * @param message The message serialized to bytes */ + @Override public void sendRaw(byte[] message) { messageBroker.sendRaw(name, message); } diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index c84a5ea8..1d30ed39 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -3,7 +3,7 @@ com.github.dbmdz.flusswerk flusswerk - 6.0.2-SNAPSHOT + 7.0.0-SNAPSHOT 4.0.0 diff --git a/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtil.java b/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtil.java index f53ffaf8..1256f68a 100644 --- a/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtil.java +++ b/integration-tests/src/test/java/com/github/dbmdz/flusswerk/integration/RabbitUtil.java @@ -98,8 +98,7 @@ public void purgeQueues() { public List allQueues() { Stream queueNames = routing.getIncoming().stream(); - queueNames = - Stream.concat(queueNames, routing.getOutgoing().values().stream().flatMap(List::stream)); + queueNames = Stream.concat(queueNames, routing.allOutgoing().stream()); queueNames = Stream.concat( queueNames, diff --git a/pom.xml b/pom.xml index 14c389c2..d1b50306 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.dbmdz.flusswerk flusswerk - 6.0.2-SNAPSHOT + 7.0.0-SNAPSHOT pom Flusswerk @@ -81,7 +81,7 @@ com.github.dbmdz.flusswerk framework - 6.0.2-SNAPSHOT + 7.0.0-SNAPSHOT net.logstash.logback