Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
schmika committed Jan 18, 2024
1 parent 2337994 commit 67886b9
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 16 deletions.
2 changes: 1 addition & 1 deletion framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.github.dbmdz.flusswerk</groupId>
<artifactId>flusswerk</artifactId>
<version>6.0.2-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,19 @@ public List<String> getIncoming() {
}

/**
* @return The queues to send to (optional).
* @return The queues to send to, organized by route (optional).
*/
public Map<String, List<String>> getOutgoing() {
return outgoing;
}

/**
* @return A list of all the queues to send to.
*/
public List<String> allOutgoing() {
return outgoing.values().stream().flatMap(List::stream).toList();
}

public FailurePolicy getFailurePolicy(String queue) {
return failurePolicies.get(queue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ private void provideInputQueues() throws IOException {
}

private void provideOutputQueues() throws IOException {
for (String topic :
routingConfig.getOutgoing().values().stream().flatMap(List::stream).toList()) {
for (String topic : routingConfig.allOutgoing()) {
rabbitClient.declareQueue(
topic,
routingConfig.getExchange(topic),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import java.util.Objects;

/** Flusswerk-specific abstraction, collection of topics/queues. No equivalent in RabbitMQ. */
public class Route {
private String name;
private List<Topic> topics;
public class Route implements Sender {
private final String name;
private final List<Topic> topics;

public Route(String name) {
this.name = name;
Expand All @@ -27,14 +27,13 @@ public void addTopic(Topic topic) {
}

/**
* Sends a message to the topics on this route. The message will have a tracing id either based on
* the incoming message or newly generated for applications that do not work on incoming messages.
* In that case, every time you call this method it creates a new tracing path.
* Sends a message to the topics on this route.
*
* @param message The message to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Message message) throws IOException {
for (Topic topic : topics) {
topic.send(message);
Expand All @@ -48,6 +47,7 @@ public void send(Message message) throws IOException {
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Collection<Message> messages) throws IOException {
for (Topic topic : topics) {
topic.send(messages);
Expand All @@ -61,6 +61,7 @@ public void send(Collection<Message> messages) throws IOException {
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Message... messages) throws IOException {
send(List.of(messages));
}
Expand All @@ -72,6 +73,7 @@ public void send(Message... messages) throws IOException {
*
* @param message The message serialized to bytes
*/
@Override
public void sendRaw(byte[] message) {
for (Topic topic : topics) {
topic.sendRaw(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import com.github.dbmdz.flusswerk.framework.model.Message;
import java.io.IOException;
import java.util.Collection;

/** High-level interface for sending messages to RabbitMQ. */
public interface Sender {
void send(Message message) throws IOException;

void send(Collection<Message> messages) throws IOException;

void send(Message... messages) throws IOException;

void sendRaw(byte[] message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* Represents a AMQP/RabbitMQ topic to send messages to. In many setups this is equal to the
* respective queue name.
*/
public class Topic {
public class Topic implements Sender {

private final String name;
private final MessageBroker messageBroker;
Expand All @@ -34,6 +34,7 @@ public class Topic {
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Message message) throws IOException {
// Only set a tracing path if there is none yet
if (message.getTracing() == null || message.getTracing().isEmpty()) {
Expand All @@ -49,6 +50,7 @@ public void send(Message message) throws IOException {
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Collection<Message> messages) throws IOException {
// Get a new tracing path in case one is needed
final List<String> tracingPath = getTracingPath();
Expand All @@ -65,6 +67,7 @@ public void send(Collection<Message> messages) throws IOException {
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
@Override
public void send(Message... messages) throws IOException {
send(List.of(messages));
}
Expand All @@ -76,6 +79,7 @@ public void send(Message... messages) throws IOException {
*
* @param message The message serialized to bytes
*/
@Override
public void sendRaw(byte[] message) {
messageBroker.sendRaw(name, message);
}
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.github.dbmdz.flusswerk</groupId>
<artifactId>flusswerk</artifactId>
<version>6.0.2-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public void purgeQueues() {

public List<Queue> allQueues() {
Stream<String> queueNames = routing.getIncoming().stream();
queueNames =
Stream.concat(queueNames, routing.getOutgoing().values().stream().flatMap(List::stream));
queueNames = Stream.concat(queueNames, routing.allOutgoing().stream());
queueNames =
Stream.concat(
queueNames,
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.dbmdz.flusswerk</groupId>
<artifactId>flusswerk</artifactId>
<version>6.0.2-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Flusswerk</name>
Expand Down Expand Up @@ -81,7 +81,7 @@
<dependency>
<groupId>com.github.dbmdz.flusswerk</groupId>
<artifactId>framework</artifactId>
<version>6.0.2-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
Expand Down

0 comments on commit 67886b9

Please sign in to comment.