From 5c79d6194681b2166fd5c6f664db8faa5f03e5c8 Mon Sep 17 00:00:00 2001 From: Katharina Schmid Date: Thu, 14 Dec 2023 10:16:22 +0100 Subject: [PATCH] Refactor code --- .../flusswerk/framework/engine/Engine.java | 3 +- .../framework/rabbitmq/ChannelCommand.java | 7 + .../framework/rabbitmq/ChannelCommands.java | 81 +++++++ .../{engine => rabbitmq}/ChannelListener.java | 2 +- .../flusswerk/framework/rabbitmq/Queue.java | 3 +- .../framework/rabbitmq/RabbitClient.java | 214 +++--------------- 6 files changed, 118 insertions(+), 192 deletions(-) create mode 100644 framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelCommand.java create mode 100644 framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelCommands.java rename framework/src/main/java/com/github/dbmdz/flusswerk/framework/{engine => rabbitmq}/ChannelListener.java (70%) diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java index 2b144ba1..66d67774 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java @@ -1,6 +1,7 @@ package com.github.dbmdz.flusswerk.framework.engine; import com.github.dbmdz.flusswerk.framework.flow.Flow; +import com.github.dbmdz.flusswerk.framework.rabbitmq.ChannelListener; import com.github.dbmdz.flusswerk.framework.rabbitmq.RabbitClient; import java.io.IOException; import java.util.ArrayList; @@ -96,7 +97,7 @@ public void stop() { List remainingTasks = new ArrayList<>(); taskQueue.drainTo(remainingTasks); - // NACK and requeue all messages that have not be processed yet + // NACK and requeue all messages that have not been processed yet for (var task : remainingTasks) { long deliveryTag = task.getMessage().getEnvelope().getDeliveryTag(); try { diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelCommand.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelCommand.java new file mode 100644 index 00000000..200e8fba --- /dev/null +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelCommand.java @@ -0,0 +1,7 @@ +package com.github.dbmdz.flusswerk.framework.rabbitmq; + +import java.io.IOException; + +public interface ChannelCommand { + T execute() throws IOException; +} diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelCommands.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelCommands.java new file mode 100644 index 00000000..b2329b69 --- /dev/null +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelCommands.java @@ -0,0 +1,81 @@ +package com.github.dbmdz.flusswerk.framework.rabbitmq; + +import com.rabbitmq.client.*; +import java.util.Map; + +public class ChannelCommands { + + private final Channel channel; + + public ChannelCommands(Channel channel) { + this.channel = channel; + } + + public ChannelCommand basicPublish( + String exchange, String routingKey, AMQP.BasicProperties properties, byte[] data) { + return () -> { + channel.basicPublish(exchange, routingKey, properties, data); + return null; + }; + } + + public ChannelCommand basicAck(long deliveryTag, boolean multiple) { + return () -> { + channel.basicAck(deliveryTag, multiple); + return null; + }; + } + + public ChannelCommand basicReject(long deliveryTag, boolean requeue) { + return () -> { + channel.basicReject(deliveryTag, requeue); + return null; + }; + } + + public ChannelCommand basicGet(String queue, boolean autoAck) { + return () -> channel.basicGet(queue, autoAck); + } + + public ChannelCommand basicConsume(String queue, boolean autoAck, Consumer consumer) { + return () -> { + channel.basicConsume(queue, autoAck, consumer); + return null; + }; + } + + public ChannelCommand exchangeDeclare( + String exchange, BuiltinExchangeType type, boolean durable) { + return () -> { + channel.exchangeDeclare(exchange, type, durable); + return null; + }; + } + + public ChannelCommand queueDeclare( + String name, + boolean durable, + boolean exclusive, + boolean autoDelete, + Map args) { + return () -> { + channel.queueDeclare(name, durable, exclusive, autoDelete, args); + return null; + }; + } + + public ChannelCommand queueBind(String name, String exchange, String routingKey) { + return () -> { + channel.queueBind(name, exchange, routingKey); + return null; + }; + } + + public ChannelCommand messageCount(String queue) { + return () -> channel.messageCount(queue); + } + + public ChannelCommand queuePurge(String queue) { + return () -> channel.queuePurge(queue); + } +} diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/ChannelListener.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelListener.java similarity index 70% rename from framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/ChannelListener.java rename to framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelListener.java index d14bb4dc..4caacfab 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/ChannelListener.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/ChannelListener.java @@ -1,4 +1,4 @@ -package com.github.dbmdz.flusswerk.framework.engine; +package com.github.dbmdz.flusswerk.framework.rabbitmq; /** A ChannelListener receives notifications about channel recovery. */ public interface ChannelListener { diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Queue.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Queue.java index 72d45fb9..465fc319 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Queue.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/Queue.java @@ -4,7 +4,6 @@ import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException; import com.github.dbmdz.flusswerk.framework.model.Message; -import java.io.IOException; import java.util.Objects; import java.util.Optional; import org.slf4j.Logger; @@ -38,7 +37,7 @@ public int purge() { /** * @return the number of messages in this queue. */ - public long messageCount() throws IOException { + public long messageCount() { return rabbitClient.getMessageCount(this.name); } diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClient.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClient.java index 7caa6847..adac40fe 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClient.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/rabbitmq/RabbitClient.java @@ -1,7 +1,6 @@ package com.github.dbmdz.flusswerk.framework.rabbitmq; import com.fasterxml.jackson.core.JsonProcessingException; -import com.github.dbmdz.flusswerk.framework.engine.ChannelListener; import com.github.dbmdz.flusswerk.framework.engine.FlusswerkConsumer; import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException; import com.github.dbmdz.flusswerk.framework.jackson.FlusswerkObjectMapper; @@ -23,19 +22,19 @@ public class RabbitClient { private static final boolean DURABLE = true; - private static final boolean NO_AUTO_DELETE = false; + private static final boolean AUTO_DELETE = false; - private static final boolean NOT_EXCLUSIVE = false; + private static final boolean EXCLUSIVE = false; private static final Integer PERSISTENT = 2; - private static final boolean SINGLE_MESSAGE = false; + private static final boolean MULTIPLE_MESSAGES = false; private static final Logger log = LoggerFactory.getLogger(RabbitClient.class); private final RabbitConnection connection; private final Channel channel; - + private final ChannelCommands commands; private final Lock channelLock = new ReentrantLock(); private final Condition channelAvailableAgain = channelLock.newCondition(); private final FlusswerkObjectMapper objectMapper; @@ -75,6 +74,7 @@ public void handleRecoveryStarted(Recoverable recoverable) { } else { throw new RuntimeException("Flusswerk needs a recoverable connection to RabbitMQ"); } + commands = new ChannelCommands(channel); objectMapper = flusswerkObjectMapper; } @@ -94,34 +94,7 @@ void sendRaw(String exchange, String routingKey, byte[] data) { .deliveryMode(PERSISTENT) .build(); - // The channel might not be available or become unavailable due to a connection error. In this - // case, we wait until the connection becomes available again. - while (true) { - if (channelAvailable) { - try { - channel.basicPublish(exchange, routingKey, properties, data); - break; - } catch (IOException | AlreadyClosedException e) { - // Channel-level exceptions are not recoverable - if (e instanceof AlreadyClosedException && !((AlreadyClosedException) e).isHardError()) { - recoverChannel(); - } else { - log.warn( - "Failed to publish message to RabbitMQ: '{}', waiting for channel to become available again", - e.getMessage()); - channelAvailable = false; - } - } - } - // We loop here because the signal might be triggered due to what the JVM documentation calls - // a 'spurious wakeup', i.e. the signal is triggered even though no connection recovery has - // yet happened. - while (!channelAvailable) { - channelLock.lock(); - channelAvailableAgain.awaitUninterruptibly(); - channelLock.unlock(); - } - } + execute(commands.basicPublish(exchange, routingKey, properties, data)); } Message deserialize(String body) throws JsonProcessingException { @@ -137,102 +110,18 @@ public void ack(com.github.dbmdz.flusswerk.framework.model.Envelope envelope) { } public void ack(long deliveryTag) { - // The channel might not be available or become unavailable due to a connection error. In this - // case, we wait until the connection becomes available again. - while (true) { - if (channelAvailable) { - try { - channel.basicAck(deliveryTag, SINGLE_MESSAGE); - break; - } catch (IOException | AlreadyClosedException e) { - // Channel-level exceptions are not recoverable - if (e instanceof AlreadyClosedException && !((AlreadyClosedException) e).isHardError()) { - recoverChannel(); - } else { - log.warn( - "Failed to acknowledge message from RabbitMQ: '{}', waiting for channel to become available again", - e.getMessage()); - channelAvailable = false; - } - } - } - // We loop here because the signal might be triggered due to what the JVM documentation calls - // a 'spurious wakeup', i.e. the signal is triggered even though no connection recovery has - // yet happened. - while (!channelAvailable) { - channelLock.lock(); - channelAvailableAgain.awaitUninterruptibly(); - channelLock.unlock(); - } - } + execute(commands.basicAck(deliveryTag, MULTIPLE_MESSAGES)); } public void reject(Envelope envelope, boolean requeue) { - // The channel might not be available or become unavailable due to a connection error. In this - // case, we wait until the connection becomes available again. - while (true) { - if (channelAvailable) { - try { - channel.basicReject(envelope.getDeliveryTag(), requeue); - break; - } catch (IOException | AlreadyClosedException e) { - // Channel-level exceptions are not recoverable - if (e instanceof AlreadyClosedException && !((AlreadyClosedException) e).isHardError()) { - recoverChannel(); - } else { - log.warn( - "Failed to reject message from RabbitMQ: '{}', waiting for channel to become available again", - e.getMessage()); - channelAvailable = false; - } - } - } - // We loop here because the signal might be triggered due to what the JVM documentation calls - // a 'spurious wakeup', i.e. the signal is triggered even though no connection recovery has - // yet happened. - while (!channelAvailable) { - channelLock.lock(); - channelAvailableAgain.awaitUninterruptibly(); - channelLock.unlock(); - } - } + execute(commands.basicReject(envelope.getDeliveryTag(), requeue)); } public Message receive(String queueName, boolean autoAck) throws InvalidMessageException { - GetResponse response; - // The channel might not be available or become unavailable due to a connection error. In this - // case, we wait until the connection becomes available again. - while (true) { - if (channelAvailable) { - try { - response = channel.basicGet(queueName, autoAck); - break; - } catch (IOException | AlreadyClosedException e) { - // Channel-level exceptions are not recoverable - if (e instanceof AlreadyClosedException && !((AlreadyClosedException) e).isHardError()) { - recoverChannel(); - } else { - log.warn( - "Failed to get message from RabbitMQ: '{}', waiting for channel to become available again", - e.getMessage()); - channelAvailable = false; - } - } - } - // We loop here because the signal might be triggered due to what the JVM documentation calls - // a 'spurious wakeup', i.e. the signal is triggered even though no connection recovery has - // yet happened. - while (!channelAvailable) { - channelLock.lock(); - channelAvailableAgain.awaitUninterruptibly(); - channelLock.unlock(); - } - } - + GetResponse response = (GetResponse) execute(commands.basicGet(queueName, autoAck)); if (response == null) { return null; } - String body = new String(response.getBody(), StandardCharsets.UTF_8); try { Message message = deserialize(body); @@ -251,92 +140,37 @@ public Message receive(String queueName, boolean autoAck) throws InvalidMessageE } public void consume(FlusswerkConsumer consumer, boolean autoAck) { - // The channel might not be available or become unavailable due to a connection error. In this - // case, we wait until the connection becomes available again. - while (true) { - if (channelAvailable) { - try { - channel.basicConsume(consumer.getInputQueue(), autoAck, consumer); - break; - } catch (IOException | AlreadyClosedException e) { - // Channel-level exceptions are not recoverable - if (e instanceof AlreadyClosedException && !((AlreadyClosedException) e).isHardError()) { - recoverChannel(); - } else { - log.warn( - "Failed to start RabbitMQ consumer: '{}', waiting for channel to become available again", - e.getMessage()); - channelAvailable = false; - } - } - } - // We loop here because the signal might be triggered due to what the JVM documentation calls - // a 'spurious wakeup', i.e. the signal is triggered even though no connection recovery has - // yet happened. - while (!channelAvailable) { - channelLock.lock(); - channelAvailableAgain.awaitUninterruptibly(); - channelLock.unlock(); - } - } + execute(commands.basicConsume(consumer.getInputQueue(), autoAck, consumer)); } public void nack(long deliveryTag, boolean multiple, boolean requeue) throws IOException { - // The channel might not be available or become unavailable due to a connection error. In this - // case, we wait until the connection becomes available again. - while (true) { - if (channelAvailable) { - try { - channel.basicNack(deliveryTag, multiple, requeue); - break; - } catch (IOException | AlreadyClosedException e) { - // Channel-level exceptions are not recoverable - if (e instanceof AlreadyClosedException && !((AlreadyClosedException) e).isHardError()) { - recoverChannel(); - } else { - log.warn( - "Failed to start RabbitMQ consumer: '{}', waiting for channel to become available again", - e.getMessage()); - channelAvailable = false; - } - } - } - // We loop here because the signal might be triggered due to what the JVM documentation calls - // a 'spurious wakeup', i.e. the signal is triggered even though no connection recovery has - // yet happened. - while (!channelAvailable) { - channelLock.lock(); - channelAvailableAgain.awaitUninterruptibly(); - channelLock.unlock(); - } - } + channel.basicNack(deliveryTag, multiple, requeue); } public void cancel(String consumerTag) throws IOException { channel.basicCancel(consumerTag); } - public void provideExchange(String exchange) throws IOException { - channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, DURABLE); + public void provideExchange(String exchange) { + execute(commands.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, DURABLE)); } public void declareQueue( - String name, String exchange, String routingKey, Map args) - throws IOException { + String name, String exchange, String routingKey, Map args) { createQueue(name, args); bindQueue(name, exchange, routingKey); } - public void createQueue(String name, Map args) throws IOException { - channel.queueDeclare(name, DURABLE, NOT_EXCLUSIVE, NO_AUTO_DELETE, args); + public void createQueue(String name, Map args) { + execute(commands.queueDeclare(name, DURABLE, EXCLUSIVE, AUTO_DELETE, args)); } - public void bindQueue(String name, String exchange, String routingKey) throws IOException { - channel.queueBind(name, exchange, routingKey); + public void bindQueue(String name, String exchange, String routingKey) { + execute(commands.queueBind(name, exchange, routingKey)); } - public Long getMessageCount(String queue) throws IOException { - return channel.messageCount(queue); + public Long getMessageCount(String queue) { + return (Long) execute(commands.messageCount(queue)); } public boolean isChannelAvailable() { @@ -348,18 +182,22 @@ Channel getChannel() { } public AMQP.Queue.PurgeOk queuePurge(String name) { + return (AMQP.Queue.PurgeOk) execute(commands.queuePurge(name)); + } + + private Object execute(ChannelCommand channelCommand) { // The channel might not be available or become unavailable due to a connection error. In this // case, we wait until the connection becomes available again. while (true) { if (channelAvailable) { try { - return channel.queuePurge(name); + return channelCommand.execute(); } catch (IOException | AlreadyClosedException e) { if (e instanceof AlreadyClosedException && !((AlreadyClosedException) e).isHardError()) { recoverChannel(); } else { log.warn( - "Failed to purge queue from RabbitMQ: '{}', waiting for channel to become available again", + "Failed to communicate with RabbitMQ: '{}', waiting for channel to become available again", e.getMessage()); channelAvailable = false; }