From eda7e9f844a91826adb5238588945f1e3b8f03e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ossowski?= <623488+micossow@users.noreply.github.com> Date: Wed, 10 Apr 2024 18:10:24 +0200 Subject: [PATCH] Implement DLQ redrive actions (#987) * StartMessageMoveTask handling * add some logging, test cases * implement CancelMessageMoveTask * fix missing ApproximateNumberOfMessagesMoved refactor test * ListMessageMoveTasksDirectives and error handling * fix scala 2.12 build * refactor ListMessageMoveTasksDirectives * refactor QueueManagerActor * refactor TaskId => TaskHandle * make MessageMoveTaskTest support both SDKs * fix build * refactoring --- build.sbt | 2 +- .../scala/org/elasticmq/ElasticMQError.scala | 33 +-- .../elasticmq/actor/QueueManagerActor.scala | 26 ++- .../actor/QueueManagerActorStorage.scala | 19 ++ .../actor/QueueManagerMessageMoveOps.scala | 92 ++++++++ .../elasticmq/actor/queue/MessageQueue.scala | 21 ++ .../elasticmq/actor/queue/QueueActor.scala | 6 +- .../actor/queue/QueueActorMessageOps.scala | 15 ++ .../actor/queue/QueueActorStorage.scala | 4 +- .../operations/MoveMessagesAsyncOps.scala | 159 +++++++++++++ .../org/elasticmq/msg/QueueManagerMsg.scala | 13 ++ .../scala/org/elasticmq/msg/QueueMsg.scala | 14 ++ .../scala/org/elasticmq/msg/package.scala | 5 + .../rest/sqs/AmazonJavaSdkTestSuite.scala | 1 - .../org/elasticmq/rest/sqs/AwsConfig.scala | 7 + .../rest/sqs/MessageMoveTaskTest.scala | 208 ++++++++++++++++++ .../sqs/SqsClientServerCommunication.scala | 9 +- ...qsClientServerWithSdkV2Communication.scala | 17 +- .../rest/sqs/client/AwsSdkV1SqsClient.scala | 114 ++++++++++ .../rest/sqs/client/AwsSdkV2SqsClient.scala | 124 +++++++++++ .../rest/sqs/client/HasSqsTestClient.scala | 6 + .../elasticmq/rest/sqs/client/SqsClient.scala | 12 + .../rest/sqs/client/SqsClientError.scala | 3 + .../elasticmq/rest/sqs/client/package.scala | 52 +++++ .../scala/org/elasticmq/rest/sqs/Action.scala | 3 + .../org/elasticmq/rest/sqs/ArnSupport.scala | 12 + .../sqs/CancelMessageMoveTaskDirectives.scala | 67 ++++++ .../sqs/ListMessageMoveTasksDirectives.scala | 111 ++++++++++ .../org/elasticmq/rest/sqs/SQSException.scala | 16 +- .../rest/sqs/SQSRestServerBuilder.scala | 14 +- .../sqs/StartMessageMoveTaskDirectives.scala | 103 +++++++++ 31 files changed, 1252 insertions(+), 36 deletions(-) create mode 100644 core/src/main/scala/org/elasticmq/actor/QueueManagerActorStorage.scala create mode 100644 core/src/main/scala/org/elasticmq/actor/QueueManagerMessageMoveOps.scala create mode 100644 core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessagesAsyncOps.scala create mode 100644 core/src/main/scala/org/elasticmq/msg/package.scala create mode 100644 rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AwsConfig.scala create mode 100644 rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/MessageMoveTaskTest.scala create mode 100644 rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala create mode 100644 rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala create mode 100644 rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/HasSqsTestClient.scala create mode 100644 rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala create mode 100644 rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClientError.scala create mode 100644 rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/package.scala create mode 100644 rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ArnSupport.scala create mode 100644 rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CancelMessageMoveTaskDirectives.scala create mode 100644 rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ListMessageMoveTasksDirectives.scala create mode 100644 rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/StartMessageMoveTaskDirectives.scala diff --git a/build.sbt b/build.sbt index faf11cd05..77f7b5135 100644 --- a/build.sbt +++ b/build.sbt @@ -38,7 +38,7 @@ val jclOverSlf4j = "org.slf4j" % "jcl-over-slf4j" % "2.0.12" // needed form amaz val scalatest = "org.scalatest" %% "scalatest" % "3.2.18" val awaitility = "org.awaitility" % "awaitility-scala" % "4.2.1" -val amazonJavaSdkSqs = "com.amazonaws" % "aws-java-sdk-sqs" % "1.12.472" exclude ("commons-logging", "commons-logging") +val amazonJavaSdkSqs = "com.amazonaws" % "aws-java-sdk-sqs" % "1.12.580" exclude ("commons-logging", "commons-logging") val amazonJavaV2SdkSqs = "software.amazon.awssdk" % "sqs" % "2.21.43" val pekkoVersion = "1.0.2" diff --git a/core/src/main/scala/org/elasticmq/ElasticMQError.scala b/core/src/main/scala/org/elasticmq/ElasticMQError.scala index 47571ffe8..a8afe4dcd 100644 --- a/core/src/main/scala/org/elasticmq/ElasticMQError.scala +++ b/core/src/main/scala/org/elasticmq/ElasticMQError.scala @@ -1,32 +1,35 @@ package org.elasticmq +import org.elasticmq.msg.MessageMoveTaskHandle -trait ElasticMQError { +sealed trait ElasticMQError { val queueName: String - val code: String + val code: String // TODO: code should be handled in rest-sqs module val message: String } -class QueueAlreadyExists(val queueName: String) extends ElasticMQError { +final case class QueueAlreadyExists(val queueName: String) extends ElasticMQError { val code = "QueueAlreadyExists" val message = s"Queue already exists: $queueName" } -case class QueueCreationError(queueName: String, reason: String) extends ElasticMQError { - val code = "QueueCreationError" - val message = s"Queue named $queueName could not be created because of $reason" -} - -case class InvalidParameterValue(queueName: String, reason: String) extends ElasticMQError { +final case class InvalidParameterValue(queueName: String, reason: String) extends ElasticMQError { val code = "InvalidParameterValue" val message = reason } -class MessageDoesNotExist(val queueName: String, messageId: MessageId) extends ElasticMQError { - val code = "MessageDoesNotExist" - val message = s"Message does not exist: $messageId in queue: $queueName" -} - -class InvalidReceiptHandle(val queueName: String, receiptHandle: String) extends ElasticMQError { +final case class InvalidReceiptHandle(val queueName: String, receiptHandle: String) extends ElasticMQError { val code = "ReceiptHandleIsInvalid" val message = s"""The receipt handle "$receiptHandle" is not valid.""" } + +final case class InvalidMessageMoveTaskHandle(val taskHandle: MessageMoveTaskHandle) extends ElasticMQError { + val code = "ResourceNotFoundException" + val message = s"""The task handle "$taskHandle" is not valid or does not exist""" + + override val queueName: String = "invalid" +} + +final case class MessageMoveTaskAlreadyRunning(val queueName: String) extends ElasticMQError { + val code = "AWS.SimpleQueueService.UnsupportedOperation" + val message = s"""A message move task is already running on queue "$queueName"""" +} diff --git a/core/src/main/scala/org/elasticmq/actor/QueueManagerActor.scala b/core/src/main/scala/org/elasticmq/actor/QueueManagerActor.scala index 69d7ca300..934706927 100644 --- a/core/src/main/scala/org/elasticmq/actor/QueueManagerActor.scala +++ b/core/src/main/scala/org/elasticmq/actor/QueueManagerActor.scala @@ -7,18 +7,23 @@ import org.elasticmq.actor.reply._ import org.elasticmq.msg._ import org.elasticmq.util.{Logging, NowProvider} +import scala.collection.mutable import scala.reflect._ class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventListener: Option[ActorRef]) extends ReplyingActor + with QueueManagerActorStorage + with QueueManagerMessageMoveOps with Logging { + type M[X] = QueueManagerMsg[X] val ev: ClassTag[QueueManagerMsg[Unit]] = classTag[M[Unit]] - case class ActorWithQueueData(actorRef: ActorRef, queueData: QueueData) - private val queues = collection.mutable.HashMap[String, ActorWithQueueData]() + val queues: mutable.Map[MessageMoveTaskHandle, ActorWithQueueData] = mutable.HashMap[String, ActorWithQueueData]() - def receiveAndReply[T](msg: QueueManagerMsg[T]): ReplyAction[T] = + // TODO: create *Ops class like in QueueActor + def receiveAndReply[T](msg: QueueManagerMsg[T]): ReplyAction[T] = { + val self = context.self msg match { case CreateQueue(request) => queues.get(request.name) match { @@ -63,7 +68,19 @@ class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventList queues.collect { case (name, actor) if actor.queueData.deadLettersQueue.exists(_.name == queueName) => name }.toList + + case StartMessageMoveTask( + sourceQueue, + sourceArn, + destinationQueue, + destinationArn, + maxNumberOfMessagesPerSecond + ) => + startMessageMoveTask(sourceQueue, sourceArn, destinationQueue, destinationArn, maxNumberOfMessagesPerSecond) + case MessageMoveTaskFinished(taskHandle) => onMessageMoveTaskFinished(taskHandle) + case CancelMessageMoveTask(taskHandle) => cancelMessageMoveTask(taskHandle) } + } protected def createQueueActor( nowProvider: NowProvider, @@ -88,7 +105,8 @@ class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventList moveMessagesToQueueActor, queueEventListener ) - ) + ), + s"queue-${queueData.name}" ) } diff --git a/core/src/main/scala/org/elasticmq/actor/QueueManagerActorStorage.scala b/core/src/main/scala/org/elasticmq/actor/QueueManagerActorStorage.scala new file mode 100644 index 000000000..2392e683c --- /dev/null +++ b/core/src/main/scala/org/elasticmq/actor/QueueManagerActorStorage.scala @@ -0,0 +1,19 @@ +package org.elasticmq.actor +import org.apache.pekko.actor.{ActorContext, ActorRef} +import org.apache.pekko.util.Timeout +import org.elasticmq.QueueData + +import scala.collection.mutable +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.DurationInt + +trait QueueManagerActorStorage { + + def context: ActorContext + + implicit lazy val ec: ExecutionContext = context.dispatcher + implicit lazy val timeout: Timeout = 5.seconds + + case class ActorWithQueueData(actorRef: ActorRef, queueData: QueueData) + def queues: mutable.Map[String, ActorWithQueueData] +} diff --git a/core/src/main/scala/org/elasticmq/actor/QueueManagerMessageMoveOps.scala b/core/src/main/scala/org/elasticmq/actor/QueueManagerMessageMoveOps.scala new file mode 100644 index 000000000..9bac64e8a --- /dev/null +++ b/core/src/main/scala/org/elasticmq/actor/QueueManagerMessageMoveOps.scala @@ -0,0 +1,92 @@ +package org.elasticmq.actor +import org.apache.pekko.actor.ActorRef +import org.apache.pekko.util.Timeout +import org.elasticmq.actor.reply._ +import org.elasticmq.msg.{CancelMovingMessages, GetQueueData, MessageMoveTaskHandle, StartMovingMessages} +import org.elasticmq.util.Logging +import org.elasticmq.{ElasticMQError, InvalidMessageMoveTaskHandle} + +import scala.collection.mutable +import scala.concurrent.Future +import scala.util.{Failure, Success} + +trait QueueManagerMessageMoveOps extends Logging { + this: QueueManagerActorStorage => + + private val messageMoveTasks = mutable.HashMap[MessageMoveTaskHandle, ActorRef]() + + def startMessageMoveTask( + sourceQueue: ActorRef, + sourceArn: String, + destinationQueue: Option[ActorRef], + destinationArn: Option[String], + maxNumberOfMessagesPerSecond: Option[Int] + )(implicit timeout: Timeout): ReplyAction[Either[ElasticMQError, MessageMoveTaskHandle]] = { + val self = context.self + val replyTo = context.sender() + (for { + destinationQueueActorRef <- destinationQueue + .map(Future.successful) + .getOrElse(findDeadLetterQueueSource(sourceQueue)) + result <- sourceQueue ? StartMovingMessages( + destinationQueueActorRef, + destinationArn, + sourceArn, + maxNumberOfMessagesPerSecond, + self + ) + } yield (result, destinationQueueActorRef)).onComplete { + case Success((result, destinationQueueActorRef)) => + result match { + case Right(taskHandle) => + logger.debug("Message move task {} => {} created", sourceQueue, destinationQueueActorRef) + messageMoveTasks.put(taskHandle, sourceQueue) + replyTo ! Right(taskHandle) + case Left(error) => + logger.error("Failed to start message move task: {}", error) + replyTo ! Left(error) + } + case Failure(ex) => logger.error("Failed to start message move task", ex) + } + DoNotReply() + } + + def onMessageMoveTaskFinished(taskHandle: MessageMoveTaskHandle): ReplyAction[Unit] = { + logger.debug("Message move task {} finished", taskHandle) + messageMoveTasks.remove(taskHandle) + DoNotReply() + } + + def cancelMessageMoveTask(taskHandle: MessageMoveTaskHandle): ReplyAction[Either[ElasticMQError, Long]] = { + logger.info("Cancelling message move task {}", taskHandle) + messageMoveTasks.get(taskHandle) match { + case Some(sourceQueue) => + val replyTo = context.sender() + sourceQueue ? CancelMovingMessages() onComplete { + case Success(numMessageMoved) => + logger.debug("Message move task {} cancelled", taskHandle) + messageMoveTasks.remove(taskHandle) + replyTo ! Right(numMessageMoved) + case Failure(ex) => + logger.error("Failed to cancel message move task", ex) + replyTo ! Left(ex) + } + DoNotReply() + case None => + ReplyWith(Left(new InvalidMessageMoveTaskHandle(taskHandle))) + } + } + + private def findDeadLetterQueueSource(sourceQueue: ActorRef) = { + val queueDataF = sourceQueue ? GetQueueData() + queueDataF.map { queueData => + queues + .filter { case (_, data) => + data.queueData.deadLettersQueue.exists(dlqData => dlqData.name == queueData.name) + } + .head + ._2 + .actorRef + } + } +} diff --git a/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala b/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala index 51521ac79..06dd070b1 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala @@ -22,6 +22,13 @@ sealed trait MessageQueue { */ def getById(id: String): Option[InternalMessage] + /** Remove the first message from the queue + * + * @return + * The message or None if not exists + */ + def pop: Option[InternalMessage] + /** Get all messages in queue * * @return @@ -29,6 +36,8 @@ sealed trait MessageQueue { */ def all: Iterable[InternalMessage] + def size: Long + /** Drop all messages on the queue */ def clear(): Unit @@ -137,8 +146,20 @@ object MessageQueue { override def getById(id: String): Option[InternalMessage] = messagesById.get(id) + override def pop: Option[InternalMessage] = { + if (messageQueue.isEmpty) { + None + } else { + val firstMessage = messageQueue.dequeue() + remove(firstMessage.id) + Some(firstMessage) + } + } + override def all: Iterable[InternalMessage] = messagesById.values + override def size: Long = messageQueue.size + override def clear(): Unit = { messagesById.clear() messageQueue.clear() diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala index e81d1a222..ae9f72632 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala @@ -1,8 +1,8 @@ package org.elasticmq.actor.queue -import org.apache.pekko.actor.ActorRef +import org.apache.pekko.actor.{ActorLogging, ActorRef} import org.elasticmq.QueueData -import org.elasticmq.actor.reply.ReplyingActor +import org.elasticmq.actor.reply.{ReplyAction, ReplyingActor} import org.elasticmq.msg._ import org.elasticmq.util.{Logging, NowProvider} @@ -24,7 +24,7 @@ class QueueActor( type M[X] = QueueMsg[X] val ev = classTag[M[Unit]] - def receiveAndReply[T](msg: QueueMsg[T]) = + def receiveAndReply[T](msg: QueueMsg[T]): ReplyAction[T] = msg match { case m: QueueQueueMsg[T] => val replyAction = receiveAndReplyQueueMsg(m) diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index 819a62717..1a8920798 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -15,6 +15,7 @@ trait QueueActorMessageOps with DeleteMessageOps with ReceiveMessageOps with MoveMessageOps + with MoveMessagesAsyncOps with Timers { this: QueueActorStorage => @@ -38,6 +39,20 @@ trait QueueActorMessageOps fifoMessagesHistory = fifoMessagesHistory.cleanOutdatedMessages(nowProvider) DoNotReply() case RestoreMessages(messages) => restoreMessages(messages) + case StartMovingMessages( + destinationQueue, + destinationArn, + sourceArn, + maxNumberOfMessagesPerSecond, + queueManager + ) => + startMovingMessages(destinationQueue, destinationArn, sourceArn, maxNumberOfMessagesPerSecond, queueManager) + case CancelMovingMessages() => + cancelMovingMessages() + case MoveFirstMessage(destinationQueue, queueManager) => + moveFirstMessage(destinationQueue, queueManager).send() + case GetMovingMessagesTasks() => + getMovingMessagesTasks } } } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala index f021ab15e..24df654e9 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala @@ -55,7 +55,9 @@ trait QueueActorStorage { notificationF.onComplete { case Success(_) => result match { - case Some(r) => actualSender ! r + case Some(r) => + logger.debug(s"Sending message $r from ${context.self} to $actualSender") + actualSender ! r case None => } case Failure(ex) => logger.error(s"Failed to notify queue event listener. The state may be inconsistent.", ex) diff --git a/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessagesAsyncOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessagesAsyncOps.scala new file mode 100644 index 000000000..30d8057fe --- /dev/null +++ b/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessagesAsyncOps.scala @@ -0,0 +1,159 @@ +package org.elasticmq.actor.queue.operations + +import org.apache.pekko.actor.ActorRef +import org.elasticmq.actor.queue.{QueueActorStorage, QueueEvent} +import org.elasticmq.msg.{MessageMoveTaskFinished, MessageMoveTaskHandle, MoveFirstMessage, SendMessage} +import org.elasticmq.util.Logging +import org.elasticmq.{ElasticMQError, MessageMoveTaskAlreadyRunning} + +import java.util.UUID +import scala.concurrent.duration.{DurationInt, FiniteDuration, NANOSECONDS} + +sealed trait MessageMoveTaskState +case object NotMovingMessages extends MessageMoveTaskState +case class MovingMessagesInProgress( + numberOfMessagesMoved: Long, + numberOfMessagesToMove: Long, + destinationArn: Option[String], + maxNumberOfMessagesPerSecond: Option[Int], + sourceArn: String, + startedTimestamp: Long, + taskHandle: MessageMoveTaskHandle +) extends MessageMoveTaskState + +case class MessageMoveTaskData( + numberOfMessagesMoved: Long, + numberOfMessagesToMove: Long, + destinationArn: Option[String], + maxNumberOfMessagesPerSecond: Option[Int], + sourceArn: String, + startedTimestamp: Long, + status: String, // RUNNING, COMPLETED, CANCELLING, CANCELLED, and FAILED + taskHandle: MessageMoveTaskHandle +) + +trait MoveMessagesAsyncOps extends Logging { + this: QueueActorStorage => + + private val prevMessageMoveTasks = collection.mutable.Buffer[MessageMoveTaskData]() + private var messageMoveTaskState: MessageMoveTaskState = NotMovingMessages + + def startMovingMessages( + destinationQueue: ActorRef, + destinationArn: Option[String], + sourceArn: String, + maxNumberOfMessagesPerSecond: Option[Int], + queueManager: ActorRef + ): Either[ElasticMQError, MessageMoveTaskHandle] = { + messageMoveTaskState match { + case NotMovingMessages => + val taskHandle = UUID.randomUUID().toString + logger.debug("Starting message move task to queue {} (task handle: {})", destinationQueue, taskHandle) + messageMoveTaskState = MovingMessagesInProgress( + 0, + messageQueue.size, + destinationArn, + maxNumberOfMessagesPerSecond, + sourceArn, + startedTimestamp = System.currentTimeMillis(), + taskHandle + ) + context.self ! MoveFirstMessage(destinationQueue, queueManager) + Right(taskHandle) + case _: MovingMessagesInProgress => + Left(new MessageMoveTaskAlreadyRunning(queueData.name)) + } + } + + def moveFirstMessage( + destinationQueue: ActorRef, + queueManager: ActorRef + ): ResultWithEvents[Unit] = { + messageMoveTaskState match { + case NotMovingMessages => + logger.debug("Not moving messages") + ResultWithEvents.empty + case mmInProgress @ MovingMessagesInProgress( + numberOfMessagesMoved, + _, + destinationArn, + maxNumberOfMessagesPerSecond, + sourceArn, + startedTimestamp, + taskHandle + ) => + logger.debug("Trying to move a single message to {} ({} messages left)", destinationQueue, messageQueue.size) + messageQueue.pop match { + case Some(internalMessage) => + messageMoveTaskState = mmInProgress.copy(numberOfMessagesMoved = numberOfMessagesMoved + 1) + destinationQueue ! SendMessage(internalMessage.toNewMessageData) + maxNumberOfMessagesPerSecond match { + case Some(v) => + val nanosInSecond = 1.second.toNanos.toDouble + val delayNanos = (nanosInSecond / v).toLong + val delay = FiniteDuration(delayNanos, NANOSECONDS) + context.system.scheduler.scheduleOnce( + delay, + context.self, + MoveFirstMessage(destinationQueue, queueManager) + ) + case None => + context.self ! MoveFirstMessage(destinationQueue, queueManager) + } + ResultWithEvents.onlyEvents(List(QueueEvent.MessageRemoved(queueData.name, internalMessage.id))) + case None => + logger.debug("No more messages to move") + prevMessageMoveTasks += MessageMoveTaskData( + numberOfMessagesMoved, + numberOfMessagesToMove = 0, + destinationArn, + maxNumberOfMessagesPerSecond, + sourceArn, + startedTimestamp, + status = "COMPLETED", + taskHandle + ) + messageMoveTaskState = NotMovingMessages + queueManager ! MessageMoveTaskFinished(taskHandle) + ResultWithEvents.empty + } + } + } + + def cancelMovingMessages(): Long = { + val numMessagesMoved = messageMoveTaskState match { + case NotMovingMessages => 0 + case mmInProgress: MovingMessagesInProgress => mmInProgress.numberOfMessagesMoved + } + messageMoveTaskState = NotMovingMessages + numMessagesMoved + } + + def getMovingMessagesTasks: List[MessageMoveTaskData] = { + val runningTaskAsList = messageMoveTaskState match { + case NotMovingMessages => List.empty + case MovingMessagesInProgress( + numberOfMessagesMoved, + numberOfMessagesToMove, + destinationArn, + maxNumberOfMessagesPerSecond, + sourceArn, + startedTimestamp, + taskHandle + ) => + List( + MessageMoveTaskData( + numberOfMessagesMoved, + numberOfMessagesToMove, + destinationArn, + maxNumberOfMessagesPerSecond, + sourceArn, + startedTimestamp, + status = "RUNNING", + taskHandle + ) + ) + } + (prevMessageMoveTasks.toList ++ runningTaskAsList).reverse + } +} diff --git a/core/src/main/scala/org/elasticmq/msg/QueueManagerMsg.scala b/core/src/main/scala/org/elasticmq/msg/QueueManagerMsg.scala index d11e5f371..bfcbe837a 100644 --- a/core/src/main/scala/org/elasticmq/msg/QueueManagerMsg.scala +++ b/core/src/main/scala/org/elasticmq/msg/QueueManagerMsg.scala @@ -11,3 +11,16 @@ case class DeleteQueue(queueName: String) extends QueueManagerMsg[Unit] case class LookupQueue(queueName: String) extends QueueManagerMsg[Option[ActorRef]] case class ListQueues() extends QueueManagerMsg[Seq[String]] case class ListDeadLetterSourceQueues(queueName: String) extends QueueManagerMsg[List[String]] +case class StartMessageMoveTask( + sourceQueue: ActorRef, + sourceArn: String, + destinationQueue: Option[ActorRef], + destinationArn: Option[String], + maxNumberOfMessagesPerSecond: Option[Int] +) extends QueueManagerMsg[Either[ElasticMQError, MessageMoveTaskHandle]] +case class MessageMoveTaskFinished( + taskHandle: MessageMoveTaskHandle +) extends QueueManagerMsg[Unit] +case class CancelMessageMoveTask( + taskHandle: MessageMoveTaskHandle +) extends QueueManagerMsg[Either[ElasticMQError, Long]] diff --git a/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala b/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala index 7e28e3dad..a8a8bf0f1 100644 --- a/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala +++ b/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala @@ -3,6 +3,7 @@ package org.elasticmq.msg import org.apache.pekko.actor.ActorRef import org.elasticmq._ import org.elasticmq.actor.queue.InternalMessage +import org.elasticmq.actor.queue.operations.MessageMoveTaskData import org.elasticmq.actor.reply.Replyable import java.time.Duration @@ -54,3 +55,16 @@ case class DeleteMessage(deliveryReceipt: DeliveryReceipt) extends QueueMessageM case class LookupMessage(messageId: MessageId) extends QueueMessageMsg[Option[MessageData]] case object DeduplicationIdsCleanup extends QueueMessageMsg[Unit] case class RestoreMessages(messages: List[InternalMessage]) extends QueueMessageMsg[Unit] +case class StartMovingMessages( + destinationQueue: ActorRef, + destinationArn: Option[String], + sourceArn: String, + maxNumberOfMessagesPerSecond: Option[Int], + queueManager: ActorRef +) extends QueueMessageMsg[Either[ElasticMQError, MessageMoveTaskHandle]] +case class CancelMovingMessages() extends QueueMessageMsg[Long] +case class MoveFirstMessage( + destinationQueue: ActorRef, + queueManager: ActorRef +) extends QueueMessageMsg[Unit] +case class GetMovingMessagesTasks() extends QueueMessageMsg[List[MessageMoveTaskData]] diff --git a/core/src/main/scala/org/elasticmq/msg/package.scala b/core/src/main/scala/org/elasticmq/msg/package.scala new file mode 100644 index 000000000..310c97d6c --- /dev/null +++ b/core/src/main/scala/org/elasticmq/msg/package.scala @@ -0,0 +1,5 @@ +package org.elasticmq + +package object msg { + type MessageMoveTaskHandle = String +} diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index bd83dce85..8700da0f0 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -24,7 +24,6 @@ import scala.util.control.Exception._ class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers { val visibilityTimeoutAttribute = "VisibilityTimeout" val defaultVisibilityTimeoutAttribute = "VisibilityTimeout" - val redrivePolicyAttribute = "RedrivePolicy" val delaySecondsAttribute = "DelaySeconds" val receiveMessageWaitTimeSecondsAttribute = "ReceiveMessageWaitTimeSeconds" diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AwsConfig.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AwsConfig.scala new file mode 100644 index 000000000..9bfdec729 --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AwsConfig.scala @@ -0,0 +1,7 @@ +package org.elasticmq.rest.sqs + +trait AwsConfig { + + def awsRegion: String + def awsAccountId: String +} diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/MessageMoveTaskTest.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/MessageMoveTaskTest.scala new file mode 100644 index 000000000..91ce88197 --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/MessageMoveTaskTest.scala @@ -0,0 +1,208 @@ +package org.elasticmq.rest.sqs + +import org.elasticmq.rest.sqs.client._ +import org.elasticmq.rest.sqs.model.RedrivePolicy +import org.elasticmq.rest.sqs.model.RedrivePolicyJson.format +import org.scalatest.concurrent.Eventually +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import spray.json.enrichAny + +import java.util.UUID +import scala.concurrent.duration.DurationInt + +abstract class MessageMoveTaskTest + extends AnyFunSuite + with HasSqsTestClient + with AwsConfig + with Matchers + with Eventually { + + private val NumMessages = 6 + private val DlqArn = s"arn:aws:sqs:$awsRegion:$awsAccountId:testQueue-dlq" + + test("should run message move task") { + // given + val (queue, dlq) = createQueuesAndPopulateDlq() + + // when: start message move task + testClient.startMessageMoveTask(DlqArn) + + // then: ensure that messages are moved back to the original queue + eventually(timeout(5.seconds), interval(100.millis)) { + fetchApproximateNumberOfMessages(queue) shouldEqual NumMessages + fetchApproximateNumberOfMessages(dlq) shouldEqual 0 + } + } + + test("should run message move task with max number of messages per second") { + // given + val (queue, dlq) = createQueuesAndPopulateDlq() + + // when: start message move task + testClient.startMessageMoveTask(DlqArn, maxNumberOfMessagesPerSecond = Some(1)) + + // then: ensure that not all messages were moved back to the original queue after 2 seconds + Thread.sleep(2000) + fetchApproximateNumberOfMessages(queue) should (be > 1 and be < 6) + fetchApproximateNumberOfMessages(dlq) should (be > 1 and be < 6) + } + + test("should not run two message move tasks in parallel") { + // given + val (queue, dlq) = createQueuesAndPopulateDlq() + + // when: start message move task + testClient.startMessageMoveTask(DlqArn, maxNumberOfMessagesPerSecond = Some(1)) + + // and: try to start another message move task + val result = testClient.startMessageMoveTask(DlqArn) + + // then + result shouldBe Left( + SqsClientError( + UnsupportedOperation, + "A message move task is already running on queue \"testQueue-dlq\"" + ) + ) + + // and: ensure that not all messages were moved back to the original queue after 2 seconds + Thread.sleep(2000) + fetchApproximateNumberOfMessages(queue) should (be > 1 and be < 6) + fetchApproximateNumberOfMessages(dlq) should (be > 1 and be < 6) + } + + test("should run message move task and list it") { + // given + val (queue, dlq) = createQueuesAndPopulateDlq() + + // when + val taskHandle = testClient.startMessageMoveTask(DlqArn, maxNumberOfMessagesPerSecond = Some(1)).right.get + + // and + val results = testClient.listMessageMoveTasks(DlqArn, maxResults = Some(10)).right.get + + // then + results.size shouldEqual 1 + results.head.taskHandle shouldBe taskHandle + results.head.status shouldBe "RUNNING" + } + + test("should run two message move task and list them in the correct order") { + // given + val (queue, dlq) = createQueuesAndPopulateDlq() + + // when + val firstTaskHandle = testClient.startMessageMoveTask(DlqArn).right.get + + // and + receiveAllMessagesTwice(queue) + + // and + val secondTaskHandle = testClient.startMessageMoveTask(DlqArn, maxNumberOfMessagesPerSecond = Some(1)).right.get + + // and + val results = testClient.listMessageMoveTasks(DlqArn, maxResults = Some(10)).right.get + + // then + results.size shouldEqual 2 + results(0).taskHandle shouldBe secondTaskHandle + results(0).status shouldBe "RUNNING" + results(1).taskHandle shouldBe firstTaskHandle + results(1).status shouldBe "COMPLETED" + } + + test("should fail to list tasks for non-existing source ARN") { + testClient.listMessageMoveTasks(s"arn:aws:sqs:$awsRegion:$awsAccountId:nonExistingQueue") shouldBe Left( + SqsClientError(QueueDoesNotExist, "The specified queue does not exist.") + ) + } + + test("should fail to list tasks for invalid ARN") { + testClient.listMessageMoveTasks(s"invalidArn") shouldBe Left( + SqsClientError(QueueDoesNotExist, "The specified queue does not exist.") + ) + } + + test("should run and cancel message move task") { + // given + val (queue, dlq) = createQueuesAndPopulateDlq() + + // when: start message move task + val taskHandle = testClient.startMessageMoveTask(DlqArn, maxNumberOfMessagesPerSecond = Some(1)).right.get + + // and: cancel the task after 2 seconds + Thread.sleep(2000) + val numMessagesMoved = testClient.cancelMessageMoveTask(taskHandle).right.get + + // and: fetch ApproximateNumberOfMessages + val numMessagesInMainQueue = fetchApproximateNumberOfMessages(queue) + val numMessagesInDlQueue = fetchApproximateNumberOfMessages(dlq) + + // then + numMessagesMoved shouldEqual numMessagesInMainQueue + + // then: ApproximateNumberOfMessages should not change after 2 seconds + Thread.sleep(2000) + fetchApproximateNumberOfMessages(queue) shouldEqual numMessagesInMainQueue + fetchApproximateNumberOfMessages(dlq) shouldEqual numMessagesInDlQueue + } + + test("should fail to cancel non-existing task") { + val randomTaskHandle = UUID.randomUUID().toString + testClient.cancelMessageMoveTask(randomTaskHandle) shouldBe Left( + SqsClientError(ResourceNotFound, s"The task handle ${'"'}$randomTaskHandle${'"'} is not valid or does not exist") + ) + } + + private def createQueuesAndPopulateDlq(): (QueueUrl, QueueUrl) = { + val dlq = testClient.createQueue("testQueue-dlq") + val redrivePolicy = RedrivePolicy("testQueue-dlq", awsRegion, awsAccountId, 1).toJson.compactPrint + val queue = + testClient.createQueue( + "testQueue", + attributes = Map( + RedrivePolicyAttributeName -> redrivePolicy, + VisibilityTimeoutAttributeName -> "1" + ) + ) + + // when: send messages + for (i <- 0 until NumMessages) { + testClient.sendMessage(queue, "Test message " + i) + } + + // and: receive messages twice to make them move to DLQ + receiveAllMessagesTwice(queue) + + // then: ensure that messages are in DLQ + eventually(timeout(2.seconds), interval(100.millis)) { + fetchApproximateNumberOfMessages(queue) shouldEqual 0 + fetchApproximateNumberOfMessages(dlq) shouldEqual NumMessages + } + + (queue, dlq) + } + + private def receiveAllMessagesTwice(queue: QueueUrl): Unit = { + for (_ <- 0 until NumMessages) { + testClient.receiveMessage(queue) + } + + Thread.sleep(1500) + for (_ <- 0 until NumMessages) { + testClient.receiveMessage(queue) + } + } + + private def fetchApproximateNumberOfMessages(queueUrl: String): Int = { + testClient + .getQueueAttributes(queueUrl, ApproximateNumberOfMessagesAttributeName)( + ApproximateNumberOfMessagesAttributeName.value + ) + .toInt + } +} + +class MessageMoveTaskSdkV1Test extends MessageMoveTaskTest with SqsClientServerCommunication +class MessageMoveTaskSdkV2Test extends MessageMoveTaskTest with SqsClientServerWithSdkV2Communication diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqsClientServerCommunication.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqsClientServerCommunication.scala index ae094cd89..3a404c57c 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqsClientServerCommunication.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqsClientServerCommunication.scala @@ -5,16 +5,19 @@ import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest} import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder} import org.apache.http.impl.client.{CloseableHttpClient, HttpClients} +import org.elasticmq.rest.sqs.client.{AwsSdkV1SqsClient, SqsClient} import org.elasticmq.util.Logging import org.elasticmq.{NodeAddress, RelaxedSQSLimits} -import org.scalatest.{Args, BeforeAndAfter, Status} import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.{Args, BeforeAndAfter, Status} import scala.collection.JavaConverters._ import scala.util.Try trait SqsClientServerCommunication extends AnyFunSuite with BeforeAndAfter with Logging { + var testClient: SqsClient = _ + var client: AmazonSQS = _ // strict server var relaxedClient: AmazonSQS = _ var httpClient: CloseableHttpClient = _ @@ -28,6 +31,8 @@ trait SqsClientServerCommunication extends AnyFunSuite with BeforeAndAfter with val ServiceEndpoint = "http://localhost:9321" + val redrivePolicyAttribute = "RedrivePolicy" + before { logger.info(s"\n---\nRunning test: $currentTestName\n---\n") @@ -53,6 +58,8 @@ trait SqsClientServerCommunication extends AnyFunSuite with BeforeAndAfter with .withEndpointConfiguration(new EndpointConfiguration(ServiceEndpoint, "us-east-1")) .build() + testClient = new AwsSdkV1SqsClient(client) + relaxedClient = AmazonSQSClientBuilder .standard() .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqsClientServerWithSdkV2Communication.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqsClientServerWithSdkV2Communication.scala index 217f0427c..9223e9eec 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqsClientServerWithSdkV2Communication.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqsClientServerWithSdkV2Communication.scala @@ -1,20 +1,23 @@ package org.elasticmq.rest.sqs +import org.elasticmq.rest.sqs.client.{AwsSdkV2SqsClient, SqsClient} import org.elasticmq.util.Logging import org.elasticmq.{NodeAddress, RelaxedSQSLimits} +import org.scalatest.BeforeAndAfter import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.{Args, BeforeAndAfter, Status} import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.services.sqs.{SqsClient, SqsClientBuilder} +import software.amazon.awssdk.services.sqs.{SqsClient => AwsSqsClient} import java.net.URI import scala.util.Try trait SqsClientServerWithSdkV2Communication extends AnyFunSuite with BeforeAndAfter with Logging { - var clientV2: SqsClient = _ // strict server - var relaxedClientV2: SqsClient = _ + var testClient: SqsClient = _ + + var clientV2: AwsSqsClient = _ // strict server + var relaxedClientV2: AwsSqsClient = _ var currentTestName: String = _ @@ -42,14 +45,16 @@ trait SqsClientServerWithSdkV2Communication extends AnyFunSuite with BeforeAndAf strictServer.waitUntilStarted() relaxedServer.waitUntilStarted() - clientV2 = SqsClient + clientV2 = AwsSqsClient .builder() .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))) .region(Region.EU_CENTRAL_1) .endpointOverride(new URI("http://localhost:9321")) .build() - relaxedClientV2 = SqsClient + testClient = new AwsSdkV2SqsClient(clientV2) + + relaxedClientV2 = AwsSqsClient .builder() .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))) .region(Region.EU_CENTRAL_1) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala new file mode 100644 index 000000000..c896f5f69 --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala @@ -0,0 +1,114 @@ +package org.elasticmq.rest.sqs.client + +import com.amazonaws.services.sqs.AmazonSQS +import com.amazonaws.services.sqs.model.{CancelMessageMoveTaskRequest, CreateQueueRequest, GetQueueAttributesRequest, ListMessageMoveTasksRequest, QueueDoesNotExistException, ReceiveMessageRequest, ResourceNotFoundException, SendMessageRequest, StartMessageMoveTaskRequest, UnsupportedOperationException} + +import scala.collection.JavaConverters._ + +class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { + + override def createQueue( + queueName: String, + attributes: Map[ + QueueAttributeName, + String + ] = Map.empty + ): QueueUrl = client + .createQueue( + new CreateQueueRequest() + .withQueueName(queueName) + .withAttributes(attributes.map { case (k, v) => (k.value, v) }.asJava) + ) + .getQueueUrl + + override def sendMessage( + queueUrl: QueueUrl, + messageBody: String + ): Unit = client.sendMessage( + new SendMessageRequest() + .withQueueUrl(queueUrl) + .withMessageBody(messageBody) + ) + + override def receiveMessage(queueUrl: QueueUrl): List[ReceivedMessage] = + client.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl)).getMessages.asScala.toList.map { msg => + ReceivedMessage(msg.getMessageId, msg.getReceiptHandle, msg.getBody) + } + + override def getQueueAttributes( + queueUrl: QueueUrl, + attributeNames: QueueAttributeName* + ): Map[String, String] = client + .getQueueAttributes( + new GetQueueAttributesRequest() + .withQueueUrl(queueUrl) + .withAttributeNames(attributeNames.toList.map(_.value).asJava) + ) + .getAttributes + .asScala + .toMap + + override def startMessageMoveTask( + sourceArn: Arn, + maxNumberOfMessagesPerSecond: Option[Int] + ): Either[SqsClientError, TaskHandle] = interceptErrors { + client + .startMessageMoveTask( + new StartMessageMoveTaskRequest() + .withSourceArn(sourceArn) + .withMaxNumberOfMessagesPerSecond(maxNumberOfMessagesPerSecond match { + case Some(value) => value + case None => null + }) + ) + .getTaskHandle + } + + override def listMessageMoveTasks( + sourceArn: Arn, + maxResults: Option[Int] + ): Either[SqsClientError, List[MessageMoveTask]] = interceptErrors { + client + .listMessageMoveTasks( + new ListMessageMoveTasksRequest() + .withSourceArn(sourceArn) + .withMaxResults(maxResults match { + case Some(value) => value + case None => null + }) + ) + .getResults + .asScala + .toList + .map { task => + MessageMoveTask( + task.getTaskHandle, + task.getSourceArn, + task.getStatus, + Option(task.getMaxNumberOfMessagesPerSecond).map(_.intValue()) + ) + } + } + + override def cancelMessageMoveTask( + taskHandle: TaskHandle + ): Either[SqsClientError, ApproximateNumberOfMessagesMoved] = interceptErrors { + client + .cancelMessageMoveTask(new CancelMessageMoveTaskRequest().withTaskHandle(taskHandle)) + .getApproximateNumberOfMessagesMoved + } + + private def interceptErrors[T](f: => T): Either[SqsClientError, T] = { + try { + Right(f) + } catch { + case e: UnsupportedOperationException => + Left(SqsClientError(UnsupportedOperation, e.getErrorMessage)) + case e: ResourceNotFoundException => + Left(SqsClientError(ResourceNotFound, e.getErrorMessage)) + case e: QueueDoesNotExistException => + Left(SqsClientError(QueueDoesNotExist, e.getErrorMessage)) + case e: Exception => Left(SqsClientError(UnknownSqsClientErrorType, e.getMessage)) + } + } +} diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala new file mode 100644 index 000000000..48fd325f9 --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala @@ -0,0 +1,124 @@ +package org.elasticmq.rest.sqs.client +import software.amazon.awssdk.services.sqs.model.{CancelMessageMoveTaskRequest, CreateQueueRequest, GetQueueAttributesRequest, ListMessageMoveTasksRequest, QueueDoesNotExistException, ReceiveMessageRequest, ResourceNotFoundException, SendMessageRequest, StartMessageMoveTaskRequest, UnsupportedOperationException, QueueAttributeName => AwsQueueAttributeName} + +import scala.collection.JavaConverters._ + +class AwsSdkV2SqsClient(client: software.amazon.awssdk.services.sqs.SqsClient) extends SqsClient { + + override def createQueue( + queueName: String, + attributes: Map[ + QueueAttributeName, + String + ] = Map.empty + ): QueueUrl = client + .createQueue( + CreateQueueRequest + .builder() + .queueName(queueName) + .attributes(attributes.map { case (k, v) => (AwsQueueAttributeName.fromValue(k.value), v) }.asJava) + .build() + ) + .queueUrl() + + override def sendMessage( + queueUrl: QueueUrl, + messageBody: String + ): Unit = client.sendMessage( + SendMessageRequest + .builder() + .queueUrl(queueUrl) + .messageBody(messageBody) + .build() + ) + + override def receiveMessage(queueUrl: QueueUrl): List[ReceivedMessage] = + client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages().asScala.toList.map { + msg => + ReceivedMessage(msg.messageId(), msg.receiptHandle(), msg.body()) + } + + override def getQueueAttributes( + queueUrl: QueueUrl, + attributeNames: QueueAttributeName* + ): Map[String, String] = client + .getQueueAttributes( + GetQueueAttributesRequest + .builder() + .queueUrl(queueUrl) + .attributeNames(attributeNames.toList.map(atr => AwsQueueAttributeName.fromValue(atr.value)).asJava) + .build() + ) + .attributes() + .asScala + .map { case (k, v) => (k.toString, v) } + .toMap + + override def startMessageMoveTask( + sourceArn: Arn, + maxNumberOfMessagesPerSecond: Option[Int] + ): Either[SqsClientError, TaskHandle] = interceptErrors { + client + .startMessageMoveTask( + StartMessageMoveTaskRequest + .builder() + .sourceArn(sourceArn) + .maxNumberOfMessagesPerSecond(maxNumberOfMessagesPerSecond match { + case Some(value) => value + case None => null + }) + .build() + ) + .taskHandle() + } + + override def listMessageMoveTasks( + sourceArn: Arn, + maxResults: Option[Int] + ): Either[SqsClientError, List[MessageMoveTask]] = interceptErrors { + client + .listMessageMoveTasks( + ListMessageMoveTasksRequest + .builder() + .sourceArn(sourceArn) + .maxResults(maxResults match { + case Some(value) => value + case None => null + }) + .build() + ) + .results() + .asScala + .toList + .map { task => + MessageMoveTask( + task.taskHandle(), + task.sourceArn(), + task.status().toString, + Option(task.maxNumberOfMessagesPerSecond()).map(_.intValue()) + ) + } + } + + override def cancelMessageMoveTask( + taskHandle: TaskHandle + ): Either[SqsClientError, ApproximateNumberOfMessagesMoved] = interceptErrors { + client + .cancelMessageMoveTask(CancelMessageMoveTaskRequest.builder().taskHandle(taskHandle).build()) + .approximateNumberOfMessagesMoved() + } + + private def interceptErrors[T](f: => T): Either[SqsClientError, T] = { + try { + Right(f) + } catch { + case e: UnsupportedOperationException => + Left(SqsClientError(UnsupportedOperation, e.awsErrorDetails().errorMessage())) + case e: ResourceNotFoundException => + Left(SqsClientError(ResourceNotFound, e.awsErrorDetails().errorMessage())) + case e: QueueDoesNotExistException => + Left(SqsClientError(QueueDoesNotExist, e.awsErrorDetails().errorMessage())) + case e: Exception => Left(SqsClientError(UnknownSqsClientErrorType, e.getMessage)) + } + } +} diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/HasSqsTestClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/HasSqsTestClient.scala new file mode 100644 index 000000000..6e9792974 --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/HasSqsTestClient.scala @@ -0,0 +1,6 @@ +package org.elasticmq.rest.sqs.client + +trait HasSqsTestClient { + + def testClient: SqsClient +} diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala new file mode 100644 index 000000000..7fb188b56 --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala @@ -0,0 +1,12 @@ +package org.elasticmq.rest.sqs.client + +trait SqsClient { + + def createQueue(queueName: String, attributes: Map[QueueAttributeName, String] = Map.empty): QueueUrl + def sendMessage(queueUrl: QueueUrl, messageBody: String): Unit + def receiveMessage(queueUrl: QueueUrl): List[ReceivedMessage] + def getQueueAttributes(queueUrl: QueueUrl, attributeNames: QueueAttributeName*): Map[String, String] + def startMessageMoveTask(sourceArn: Arn, maxNumberOfMessagesPerSecond: Option[Int] = None): Either[SqsClientError, TaskHandle] + def listMessageMoveTasks(sourceArn: Arn, maxResults: Option[Int] = None): Either[SqsClientError, List[MessageMoveTask]] + def cancelMessageMoveTask(taskHandle: TaskHandle): Either[SqsClientError, ApproximateNumberOfMessagesMoved] +} diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClientError.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClientError.scala new file mode 100644 index 000000000..7e6413b75 --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClientError.scala @@ -0,0 +1,3 @@ +package org.elasticmq.rest.sqs.client + +case class SqsClientError(errorType: SqsClientErrorType, message: String) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/package.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/package.scala new file mode 100644 index 000000000..6d67fc515 --- /dev/null +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/package.scala @@ -0,0 +1,52 @@ +package org.elasticmq.rest.sqs + +package object client { + type QueueUrl = String + type Arn = String + type TaskHandle = String + type MessageMoveTaskStatus = String + type ApproximateNumberOfMessagesMoved = Long +} + +package client { + sealed abstract class QueueAttributeName(val value: String) + case object AllAttributeNames extends QueueAttributeName("All") + case object PolicyAttributeName extends QueueAttributeName("Policy") + case object VisibilityTimeoutAttributeName extends QueueAttributeName("VisibilityTimeout") + case object MaximumMessageSizeAttributeName extends QueueAttributeName("MaximumMessageSize") + case object MessageRetentionPeriodAttributeName extends QueueAttributeName("MessageRetentionPeriod") + case object ApproximateNumberOfMessagesAttributeName extends QueueAttributeName("ApproximateNumberOfMessages") + case object ApproximateNumberOfMessagesNotVisibleAttributeName + extends QueueAttributeName("ApproximateNumberOfMessagesNotVisible") + case object CreatedTimestampAttributeName extends QueueAttributeName("CreatedTimestamp") + case object LastModifiedTimestampAttributeName extends QueueAttributeName("LastModifiedTimestamp") + case object QueueArnAttributeName extends QueueAttributeName("QueueArn") + case object ApproximateNumberOfMessagesDelayedAttributeName + extends QueueAttributeName("ApproximateNumberOfMessagesDelayed") + case object DelaySecondsAttributeName extends QueueAttributeName("DelaySeconds") + case object ReceiveMessageWaitTimeSecondsAttributeName extends QueueAttributeName("ReceiveMessageWaitTimeSeconds") + case object RedrivePolicyAttributeName extends QueueAttributeName("RedrivePolicy") + case object FifoQueueAttributeName extends QueueAttributeName("FifoQueue") + case object ContentBasedDeduplicationAttributeName extends QueueAttributeName("ContentBasedDeduplication") + case object KmsMasterKeyIdAttributeName extends QueueAttributeName("KmsMasterKeyId") + case object KmsDataKeyReusePeriodSecondsAttributeName extends QueueAttributeName("KmsDataKeyReusePeriodSeconds") + case object DeduplicationScopeAttributeName extends QueueAttributeName("DeduplicationScope") + case object FifoThroughputLimitAttributeName extends QueueAttributeName("FifoThroughputLimit") + case object RedriveAllowPolicyAttributeName extends QueueAttributeName("RedriveAllowPolicy") + case object SqsManagedSseEnabledAttributeName extends QueueAttributeName("SqsManagedSseEnabled") + + case class ReceivedMessage(messageId: String, receiptHandle: String, body: String) + + case class MessageMoveTask( + taskHandle: TaskHandle, + sourceArn: Arn, + status: MessageMoveTaskStatus, + maxNumberOfMessagesPerSecond: Option[Int] + ) + + sealed trait SqsClientErrorType + case object UnsupportedOperation extends SqsClientErrorType + case object ResourceNotFound extends SqsClientErrorType + case object QueueDoesNotExist extends SqsClientErrorType + case object UnknownSqsClientErrorType extends SqsClientErrorType +} diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/Action.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/Action.scala index c2570c360..5623b637a 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/Action.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/Action.scala @@ -21,4 +21,7 @@ object Action extends Enumeration { val SendMessage = Value("SendMessage") val TagQueue = Value("TagQueue") val UntagQueue = Value("UntagQueue") + val StartMessageMoveTask = Value("StartMessageMoveTask") + val CancelMessageMoveTask = Value("CancelMessageMoveTask") + val ListMessageMoveTasks = Value("ListMessageMoveTasks") } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ArnSupport.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ArnSupport.scala new file mode 100644 index 000000000..52fa8042b --- /dev/null +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ArnSupport.scala @@ -0,0 +1,12 @@ +package org.elasticmq.rest.sqs + +trait ArnSupport { + + private val ArnPattern = "(?:.+:(.+)?:(.+)?:)?(.+)".r + + def extractQueueName(arn: String): String = + arn match { + case ArnPattern(_, _, queueName) => queueName + case _ => throw new SQSException("InvalidParameterValue") + } +} diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CancelMessageMoveTaskDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CancelMessageMoveTaskDirectives.scala new file mode 100644 index 000000000..9d16ab01b --- /dev/null +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CancelMessageMoveTaskDirectives.scala @@ -0,0 +1,67 @@ +package org.elasticmq.rest.sqs + +import org.apache.pekko.http.scaladsl.server.Route +import org.elasticmq.ElasticMQError +import org.elasticmq.actor.reply._ +import org.elasticmq.msg.CancelMessageMoveTask +import org.elasticmq.rest.sqs.Action.{CancelMessageMoveTask => CancelMessageMoveTaskAction} +import org.elasticmq.rest.sqs.Constants._ +import org.elasticmq.rest.sqs.SQSException.ElasticMQErrorOps +import org.elasticmq.rest.sqs.directives.ElasticMQDirectives +import org.elasticmq.rest.sqs.model.RequestPayload +import spray.json.DefaultJsonProtocol._ +import spray.json.RootJsonFormat + +import scala.async.Async._ + +trait CancelMessageMoveTaskDirectives { this: ElasticMQDirectives with QueueURLModule with ResponseMarshaller => + + def cancelMessageMoveTask(p: RequestPayload)(implicit marshallerDependencies: MarshallerDependencies): Route = { + p.action(CancelMessageMoveTaskAction) { + val params = p.as[CancelMessageMoveTaskRequest] + async { + await( + queueManagerActor ? CancelMessageMoveTask(params.TaskHandle) + ) match { + case Left(e: ElasticMQError) => throw e.toSQSException + case Right(approximateNumberOfMessagesMoved) => + complete(CancelMessageMoveTaskResponse(approximateNumberOfMessagesMoved)) + } + } + } + } +} + +case class CancelMessageMoveTaskRequest( + TaskHandle: String +) + +object CancelMessageMoveTaskRequest { + implicit val requestJsonFormat: RootJsonFormat[CancelMessageMoveTaskRequest] = jsonFormat1( + CancelMessageMoveTaskRequest.apply + ) + + implicit val requestParamReader: FlatParamsReader[CancelMessageMoveTaskRequest] = + new FlatParamsReader[CancelMessageMoveTaskRequest] { + override def read(params: Map[String, String]): CancelMessageMoveTaskRequest = { + new CancelMessageMoveTaskRequest( + requiredParameter(params)(TaskHandleParameter) + ) + } + } +} + +case class CancelMessageMoveTaskResponse(ApproximateNumberOfMessagesMoved: Long) + +object CancelMessageMoveTaskResponse { + implicit val format: RootJsonFormat[CancelMessageMoveTaskResponse] = jsonFormat1(CancelMessageMoveTaskResponse.apply) + + implicit val xmlSerializer: XmlSerializer[CancelMessageMoveTaskResponse] = t => + + {t.ApproximateNumberOfMessagesMoved} + + + {EmptyRequestId} + + +} diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ListMessageMoveTasksDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ListMessageMoveTasksDirectives.scala new file mode 100644 index 000000000..81cd61b84 --- /dev/null +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ListMessageMoveTasksDirectives.scala @@ -0,0 +1,111 @@ +package org.elasticmq.rest.sqs + +import org.apache.pekko.http.scaladsl.server.Route +import org.elasticmq.actor.reply._ +import org.elasticmq.msg.GetMovingMessagesTasks +import org.elasticmq.rest.sqs.Action.{ListMessageMoveTasks => ListMessageMoveTasksAction} +import org.elasticmq.rest.sqs.Constants._ +import org.elasticmq.rest.sqs.directives.ElasticMQDirectives +import org.elasticmq.rest.sqs.model.RequestPayload +import spray.json.DefaultJsonProtocol._ +import spray.json.RootJsonFormat + +trait ListMessageMoveTasksDirectives extends ArnSupport { + this: ElasticMQDirectives with QueueURLModule with ResponseMarshaller => + + def listMessageMoveTasks(p: RequestPayload)(implicit marshallerDependencies: MarshallerDependencies): Route = { + p.action(ListMessageMoveTasksAction) { + val params = p.as[ListMessageMoveTasksRequest] + val sourceQueueName = extractQueueName(params.SourceArn) + queueActorAndDataFromQueueName(sourceQueueName) { (sourceQueue, _) => + for { + tasks <- sourceQueue ? GetMovingMessagesTasks() + } yield { + complete { + ListMessageMoveTasksResponse( + tasks.map(task => + MessageMoveTaskResponse( + task.numberOfMessagesMoved, + task.numberOfMessagesToMove, + task.destinationArn, + None, + task.maxNumberOfMessagesPerSecond, + task.sourceArn, + task.startedTimestamp, + task.status, + task.taskHandle + ) + ) + ) + } + } + } + } + } +} + +case class ListMessageMoveTasksRequest( + MaxResults: Option[Int], + SourceArn: String +) + +object ListMessageMoveTasksRequest { + implicit val requestJsonFormat: RootJsonFormat[ListMessageMoveTasksRequest] = jsonFormat2( + ListMessageMoveTasksRequest.apply + ) + + implicit val requestParamReader: FlatParamsReader[ListMessageMoveTasksRequest] = + new FlatParamsReader[ListMessageMoveTasksRequest] { + override def read(params: Map[String, String]): ListMessageMoveTasksRequest = { + new ListMessageMoveTasksRequest( + optionalParameter(params)(MaxResultsParameter).map(_.toInt), + requiredParameter(params)(SourceArnParameter) + ) + } + } +} + +case class MessageMoveTaskResponse( + ApproximateNumberOfMessagesMoved: Long, + ApproximateNumberOfMessagesToMove: Long, + DestinationArn: Option[String], + FailureReason: Option[String], + MaxNumberOfMessagesPerSecond: Option[Int], + SourceArn: String, + StartedTimestamp: Long, + Status: String, + TaskHandle: String +) +case class ListMessageMoveTasksResponse(Results: List[MessageMoveTaskResponse]) + +object ListMessageMoveTasksResponse { + implicit val format: RootJsonFormat[MessageMoveTaskResponse] = jsonFormat9(MessageMoveTaskResponse.apply) + implicit val formatList: RootJsonFormat[ListMessageMoveTasksResponse] = jsonFormat1( + ListMessageMoveTasksResponse.apply + ) + + implicit val xmlSerializer: XmlSerializer[ListMessageMoveTasksResponse] = t => + + {t.Results.map(taskToEntry)} + + + {EmptyRequestId} + + + + private def taskToEntry(task: MessageMoveTaskResponse) = + + {task.ApproximateNumberOfMessagesMoved} + {task.ApproximateNumberOfMessagesToMove} + {task.DestinationArn.map(arn => {arn}).getOrElse("")} + { + task.MaxNumberOfMessagesPerSecond + .map(v => {v}) + .getOrElse("") + } + {task.SourceArn} + {task.StartedTimestamp} + {task.Status} + {task.TaskHandle} + +} diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSException.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSException.scala index a9161e3b9..7ca1c7f8a 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSException.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSException.scala @@ -1,8 +1,9 @@ package org.elasticmq.rest.sqs -import scala.xml.Elem +import org.elasticmq.ElasticMQError +import org.elasticmq.rest.sqs.Constants._ -import Constants._ +import scala.xml.Elem class SQSException( val code: String, @@ -76,4 +77,15 @@ object SQSException { errorType = "com.amazonaws.sqs#QueueDoesNotExist", errorMessage = Some("The specified queue does not exist.") ) + + implicit class ElasticMQErrorOps(e: ElasticMQError) { + def toSQSException: SQSException = { + val errorType = e.code match { + case "AWS.SimpleQueueService.UnsupportedOperation" => Some("com.amazonaws.sqs#UnsupportedOperation") + case "ResourceNotFoundException" => Some("com.amazonaws.sqs#ResourceNotFoundException") + case _ => None + } + new SQSException(e.code, errorType = errorType.getOrElse("Sender"), errorMessage = Some(e.message)) + } + } } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala index 4ae9fcb74..445e76e4e 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala @@ -11,7 +11,7 @@ import org.elasticmq.actor.QueueManagerActor import org.elasticmq.metrics.QueuesMetrics import org.elasticmq.rest.sqs.Constants._ import org.elasticmq.rest.sqs.XmlNsVersion.extractXmlNs -import org.elasticmq.rest.sqs.directives.{AnyParamDirectives, AWSProtocolDirectives, ElasticMQDirectives, UnmatchedActionRoutes} +import org.elasticmq.rest.sqs.directives.{ AnyParamDirectives,AWSProtocolDirectives, ElasticMQDirectives, UnmatchedActionRoutes} import org.elasticmq.rest.sqs.model.RequestPayload import org.elasticmq.util.{Logging, NowProvider} @@ -165,7 +165,10 @@ case class TheSQSRestServerBuilder( with UnmatchedActionRoutes with ResponseMarshaller with QueueAttributesOps - with ListDeadLetterSourceQueuesDirectives { + with ListDeadLetterSourceQueuesDirectives + with StartMessageMoveTaskDirectives + with CancelMessageMoveTaskDirectives + with ListMessageMoveTasksDirectives { def serverAddress = currentServerAddress.get() @@ -206,6 +209,9 @@ case class TheSQSRestServerBuilder( untagQueue(p) ~ listQueueTags(p) ~ listDeadLetterSourceQueues(p) ~ + startMessageMoveTask(p) ~ + cancelMessageMoveTask(p) ~ + listMessageMoveTasks(p) ~ // 4. Unmatched action unmatchedAction(p) @@ -332,6 +338,10 @@ object Constants { val MaxResultsParameter = "MaxResults" val NextTokenParameter = "NextToken" val QueueNamePrefixParameter = "QueueNamePrefix" + val SourceArnParameter = "SourceArn" + val DestinationArnParameter = "DestinationArn" + val MaxNumberOfMessagesPerSecondParameter = "MaxNumberOfMessagesPerSecond" + val TaskHandleParameter = "TaskHandle" } object ParametersUtil { diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/StartMessageMoveTaskDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/StartMessageMoveTaskDirectives.scala new file mode 100644 index 000000000..178fe9a23 --- /dev/null +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/StartMessageMoveTaskDirectives.scala @@ -0,0 +1,103 @@ +package org.elasticmq.rest.sqs + +import org.apache.pekko.actor.ActorRef +import org.apache.pekko.http.scaladsl.server.Route +import org.elasticmq.ElasticMQError +import org.elasticmq.actor.reply._ +import org.elasticmq.msg.StartMessageMoveTask +import org.elasticmq.rest.sqs.Action.{StartMessageMoveTask => StartMessageMoveTaskAction} +import org.elasticmq.rest.sqs.Constants._ +import org.elasticmq.rest.sqs.SQSException.ElasticMQErrorOps +import org.elasticmq.rest.sqs.directives.ElasticMQDirectives +import org.elasticmq.rest.sqs.model.RequestPayload +import spray.json.DefaultJsonProtocol._ +import spray.json.RootJsonFormat + +trait StartMessageMoveTaskDirectives extends ArnSupport { + this: ElasticMQDirectives with QueueURLModule with ResponseMarshaller => + + def startMessageMoveTask(p: RequestPayload)(implicit marshallerDependencies: MarshallerDependencies): Route = { + p.action(StartMessageMoveTaskAction) { + val params = p.as[StartMessageMoveTaskActionRequest] + val sourceQueueName = extractQueueName(params.SourceArn) + queueActorAndDataFromQueueName(sourceQueueName) { (sourceQueue, _) => + params.DestinationArn match { + case Some(destinationQueueArn) => + val destinationQueueName = extractQueueName(destinationQueueArn) + queueActorAndDataFromQueueName(destinationQueueName) { (destinationQueue, _) => + startMessageMoveTask( + sourceQueue, + params.SourceArn, + Some(destinationQueue), + params.DestinationArn, + params.MaxNumberOfMessagesPerSecond + ) + } + case None => + startMessageMoveTask(sourceQueue, params.SourceArn, None, None, params.MaxNumberOfMessagesPerSecond) + } + } + } + } + + private def startMessageMoveTask( + sourceQueue: ActorRef, + sourceArn: String, + destinationQueue: Option[ActorRef], + destinationArn: Option[String], + maxNumberOfMessagesPerSecond: Option[Int] + )(implicit marshallerDependencies: MarshallerDependencies): Route = { + for { + res <- queueManagerActor ? StartMessageMoveTask( + sourceQueue, + sourceArn, + destinationQueue, + destinationArn, + maxNumberOfMessagesPerSecond + ) + } yield { + res match { + case Left(e: ElasticMQError) => throw e.toSQSException + case Right(taskHandle) => complete(StartMessageMoveTaskResponse(taskHandle)) + } + } + } +} + +case class StartMessageMoveTaskActionRequest( + SourceArn: String, + DestinationArn: Option[String], + MaxNumberOfMessagesPerSecond: Option[Int] +) + +object StartMessageMoveTaskActionRequest { + implicit val requestJsonFormat: RootJsonFormat[StartMessageMoveTaskActionRequest] = jsonFormat3( + StartMessageMoveTaskActionRequest.apply + ) + + implicit val requestParamReader: FlatParamsReader[StartMessageMoveTaskActionRequest] = + new FlatParamsReader[StartMessageMoveTaskActionRequest] { + override def read(params: Map[String, String]): StartMessageMoveTaskActionRequest = { + new StartMessageMoveTaskActionRequest( + requiredParameter(params)(SourceArnParameter), + optionalParameter(params)(DestinationArnParameter), + optionalParameter(params)(MaxNumberOfMessagesPerSecondParameter).map(_.toInt) + ) + } + } +} + +case class StartMessageMoveTaskResponse(TaskHandle: String) + +object StartMessageMoveTaskResponse { + implicit val format: RootJsonFormat[StartMessageMoveTaskResponse] = jsonFormat1(StartMessageMoveTaskResponse.apply) + + implicit val xmlSerializer: XmlSerializer[StartMessageMoveTaskResponse] = t => + + {t.TaskHandle} + + + {EmptyRequestId} + + +}