Skip to content

Commit

Permalink
Implement DLQ redrive actions (#987)
Browse files Browse the repository at this point in the history
* StartMessageMoveTask handling

* add some logging, test cases

* implement CancelMessageMoveTask

* fix missing ApproximateNumberOfMessagesMoved
refactor test

* ListMessageMoveTasksDirectives and error handling

* fix scala 2.12 build

* refactor ListMessageMoveTasksDirectives

* refactor QueueManagerActor

* refactor TaskId => TaskHandle

* make MessageMoveTaskTest support both SDKs

* fix build

* refactoring
  • Loading branch information
micossow authored Apr 10, 2024
1 parent bcd450b commit eda7e9f
Show file tree
Hide file tree
Showing 31 changed files with 1,252 additions and 36 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ val jclOverSlf4j = "org.slf4j" % "jcl-over-slf4j" % "2.0.12" // needed form amaz
val scalatest = "org.scalatest" %% "scalatest" % "3.2.18"
val awaitility = "org.awaitility" % "awaitility-scala" % "4.2.1"

val amazonJavaSdkSqs = "com.amazonaws" % "aws-java-sdk-sqs" % "1.12.472" exclude ("commons-logging", "commons-logging")
val amazonJavaSdkSqs = "com.amazonaws" % "aws-java-sdk-sqs" % "1.12.580" exclude ("commons-logging", "commons-logging")
val amazonJavaV2SdkSqs = "software.amazon.awssdk" % "sqs" % "2.21.43"

val pekkoVersion = "1.0.2"
Expand Down
33 changes: 18 additions & 15 deletions core/src/main/scala/org/elasticmq/ElasticMQError.scala
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
package org.elasticmq
import org.elasticmq.msg.MessageMoveTaskHandle

trait ElasticMQError {
sealed trait ElasticMQError {
val queueName: String
val code: String
val code: String // TODO: code should be handled in rest-sqs module
val message: String
}

class QueueAlreadyExists(val queueName: String) extends ElasticMQError {
final case class QueueAlreadyExists(val queueName: String) extends ElasticMQError {
val code = "QueueAlreadyExists"
val message = s"Queue already exists: $queueName"
}

case class QueueCreationError(queueName: String, reason: String) extends ElasticMQError {
val code = "QueueCreationError"
val message = s"Queue named $queueName could not be created because of $reason"
}

case class InvalidParameterValue(queueName: String, reason: String) extends ElasticMQError {
final case class InvalidParameterValue(queueName: String, reason: String) extends ElasticMQError {
val code = "InvalidParameterValue"
val message = reason
}

class MessageDoesNotExist(val queueName: String, messageId: MessageId) extends ElasticMQError {
val code = "MessageDoesNotExist"
val message = s"Message does not exist: $messageId in queue: $queueName"
}

class InvalidReceiptHandle(val queueName: String, receiptHandle: String) extends ElasticMQError {
final case class InvalidReceiptHandle(val queueName: String, receiptHandle: String) extends ElasticMQError {
val code = "ReceiptHandleIsInvalid"
val message = s"""The receipt handle "$receiptHandle" is not valid."""
}

final case class InvalidMessageMoveTaskHandle(val taskHandle: MessageMoveTaskHandle) extends ElasticMQError {
val code = "ResourceNotFoundException"
val message = s"""The task handle "$taskHandle" is not valid or does not exist"""

override val queueName: String = "invalid"
}

final case class MessageMoveTaskAlreadyRunning(val queueName: String) extends ElasticMQError {
val code = "AWS.SimpleQueueService.UnsupportedOperation"
val message = s"""A message move task is already running on queue "$queueName""""
}
26 changes: 22 additions & 4 deletions core/src/main/scala/org/elasticmq/actor/QueueManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@ import org.elasticmq.actor.reply._
import org.elasticmq.msg._
import org.elasticmq.util.{Logging, NowProvider}

import scala.collection.mutable
import scala.reflect._

class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventListener: Option[ActorRef])
extends ReplyingActor
with QueueManagerActorStorage
with QueueManagerMessageMoveOps
with Logging {

type M[X] = QueueManagerMsg[X]
val ev: ClassTag[QueueManagerMsg[Unit]] = classTag[M[Unit]]

case class ActorWithQueueData(actorRef: ActorRef, queueData: QueueData)
private val queues = collection.mutable.HashMap[String, ActorWithQueueData]()
val queues: mutable.Map[MessageMoveTaskHandle, ActorWithQueueData] = mutable.HashMap[String, ActorWithQueueData]()

def receiveAndReply[T](msg: QueueManagerMsg[T]): ReplyAction[T] =
// TODO: create *Ops class like in QueueActor
def receiveAndReply[T](msg: QueueManagerMsg[T]): ReplyAction[T] = {
val self = context.self
msg match {
case CreateQueue(request) =>
queues.get(request.name) match {
Expand Down Expand Up @@ -63,7 +68,19 @@ class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventList
queues.collect {
case (name, actor) if actor.queueData.deadLettersQueue.exists(_.name == queueName) => name
}.toList

case StartMessageMoveTask(
sourceQueue,
sourceArn,
destinationQueue,
destinationArn,
maxNumberOfMessagesPerSecond
) =>
startMessageMoveTask(sourceQueue, sourceArn, destinationQueue, destinationArn, maxNumberOfMessagesPerSecond)
case MessageMoveTaskFinished(taskHandle) => onMessageMoveTaskFinished(taskHandle)
case CancelMessageMoveTask(taskHandle) => cancelMessageMoveTask(taskHandle)
}
}

protected def createQueueActor(
nowProvider: NowProvider,
Expand All @@ -88,7 +105,8 @@ class QueueManagerActor(nowProvider: NowProvider, limits: Limits, queueEventList
moveMessagesToQueueActor,
queueEventListener
)
)
),
s"queue-${queueData.name}"
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.elasticmq.actor
import org.apache.pekko.actor.{ActorContext, ActorRef}
import org.apache.pekko.util.Timeout
import org.elasticmq.QueueData

import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

trait QueueManagerActorStorage {

def context: ActorContext

implicit lazy val ec: ExecutionContext = context.dispatcher
implicit lazy val timeout: Timeout = 5.seconds

case class ActorWithQueueData(actorRef: ActorRef, queueData: QueueData)
def queues: mutable.Map[String, ActorWithQueueData]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.elasticmq.actor
import org.apache.pekko.actor.ActorRef
import org.apache.pekko.util.Timeout
import org.elasticmq.actor.reply._
import org.elasticmq.msg.{CancelMovingMessages, GetQueueData, MessageMoveTaskHandle, StartMovingMessages}
import org.elasticmq.util.Logging
import org.elasticmq.{ElasticMQError, InvalidMessageMoveTaskHandle}

import scala.collection.mutable
import scala.concurrent.Future
import scala.util.{Failure, Success}

trait QueueManagerMessageMoveOps extends Logging {
this: QueueManagerActorStorage =>

private val messageMoveTasks = mutable.HashMap[MessageMoveTaskHandle, ActorRef]()

def startMessageMoveTask(
sourceQueue: ActorRef,
sourceArn: String,
destinationQueue: Option[ActorRef],
destinationArn: Option[String],
maxNumberOfMessagesPerSecond: Option[Int]
)(implicit timeout: Timeout): ReplyAction[Either[ElasticMQError, MessageMoveTaskHandle]] = {
val self = context.self
val replyTo = context.sender()
(for {
destinationQueueActorRef <- destinationQueue
.map(Future.successful)
.getOrElse(findDeadLetterQueueSource(sourceQueue))
result <- sourceQueue ? StartMovingMessages(
destinationQueueActorRef,
destinationArn,
sourceArn,
maxNumberOfMessagesPerSecond,
self
)
} yield (result, destinationQueueActorRef)).onComplete {
case Success((result, destinationQueueActorRef)) =>
result match {
case Right(taskHandle) =>
logger.debug("Message move task {} => {} created", sourceQueue, destinationQueueActorRef)
messageMoveTasks.put(taskHandle, sourceQueue)
replyTo ! Right(taskHandle)
case Left(error) =>
logger.error("Failed to start message move task: {}", error)
replyTo ! Left(error)
}
case Failure(ex) => logger.error("Failed to start message move task", ex)
}
DoNotReply()
}

def onMessageMoveTaskFinished(taskHandle: MessageMoveTaskHandle): ReplyAction[Unit] = {
logger.debug("Message move task {} finished", taskHandle)
messageMoveTasks.remove(taskHandle)
DoNotReply()
}

def cancelMessageMoveTask(taskHandle: MessageMoveTaskHandle): ReplyAction[Either[ElasticMQError, Long]] = {
logger.info("Cancelling message move task {}", taskHandle)
messageMoveTasks.get(taskHandle) match {
case Some(sourceQueue) =>
val replyTo = context.sender()
sourceQueue ? CancelMovingMessages() onComplete {
case Success(numMessageMoved) =>
logger.debug("Message move task {} cancelled", taskHandle)
messageMoveTasks.remove(taskHandle)
replyTo ! Right(numMessageMoved)
case Failure(ex) =>
logger.error("Failed to cancel message move task", ex)
replyTo ! Left(ex)
}
DoNotReply()
case None =>
ReplyWith(Left(new InvalidMessageMoveTaskHandle(taskHandle)))
}
}

private def findDeadLetterQueueSource(sourceQueue: ActorRef) = {
val queueDataF = sourceQueue ? GetQueueData()
queueDataF.map { queueData =>
queues
.filter { case (_, data) =>
data.queueData.deadLettersQueue.exists(dlqData => dlqData.name == queueData.name)
}
.head
._2
.actorRef
}
}
}
21 changes: 21 additions & 0 deletions core/src/main/scala/org/elasticmq/actor/queue/MessageQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@ sealed trait MessageQueue {
*/
def getById(id: String): Option[InternalMessage]

/** Remove the first message from the queue
*
* @return
* The message or None if not exists
*/
def pop: Option[InternalMessage]

/** Get all messages in queue
*
* @return
* All messages in queue
*/
def all: Iterable[InternalMessage]

def size: Long

/** Drop all messages on the queue
*/
def clear(): Unit
Expand Down Expand Up @@ -137,8 +146,20 @@ object MessageQueue {

override def getById(id: String): Option[InternalMessage] = messagesById.get(id)

override def pop: Option[InternalMessage] = {
if (messageQueue.isEmpty) {
None
} else {
val firstMessage = messageQueue.dequeue()
remove(firstMessage.id)
Some(firstMessage)
}
}

override def all: Iterable[InternalMessage] = messagesById.values

override def size: Long = messageQueue.size

override def clear(): Unit = {
messagesById.clear()
messageQueue.clear()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.elasticmq.actor.queue

import org.apache.pekko.actor.ActorRef
import org.apache.pekko.actor.{ActorLogging, ActorRef}
import org.elasticmq.QueueData
import org.elasticmq.actor.reply.ReplyingActor
import org.elasticmq.actor.reply.{ReplyAction, ReplyingActor}
import org.elasticmq.msg._
import org.elasticmq.util.{Logging, NowProvider}

Expand All @@ -24,7 +24,7 @@ class QueueActor(
type M[X] = QueueMsg[X]
val ev = classTag[M[Unit]]

def receiveAndReply[T](msg: QueueMsg[T]) =
def receiveAndReply[T](msg: QueueMsg[T]): ReplyAction[T] =
msg match {
case m: QueueQueueMsg[T] =>
val replyAction = receiveAndReplyQueueMsg(m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ trait QueueActorMessageOps
with DeleteMessageOps
with ReceiveMessageOps
with MoveMessageOps
with MoveMessagesAsyncOps
with Timers {
this: QueueActorStorage =>

Expand All @@ -38,6 +39,20 @@ trait QueueActorMessageOps
fifoMessagesHistory = fifoMessagesHistory.cleanOutdatedMessages(nowProvider)
DoNotReply()
case RestoreMessages(messages) => restoreMessages(messages)
case StartMovingMessages(
destinationQueue,
destinationArn,
sourceArn,
maxNumberOfMessagesPerSecond,
queueManager
) =>
startMovingMessages(destinationQueue, destinationArn, sourceArn, maxNumberOfMessagesPerSecond, queueManager)
case CancelMovingMessages() =>
cancelMovingMessages()
case MoveFirstMessage(destinationQueue, queueManager) =>
moveFirstMessage(destinationQueue, queueManager).send()
case GetMovingMessagesTasks() =>
getMovingMessagesTasks
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ trait QueueActorStorage {
notificationF.onComplete {
case Success(_) =>
result match {
case Some(r) => actualSender ! r
case Some(r) =>
logger.debug(s"Sending message $r from ${context.self} to $actualSender")
actualSender ! r
case None =>
}
case Failure(ex) => logger.error(s"Failed to notify queue event listener. The state may be inconsistent.", ex)
Expand Down
Loading

0 comments on commit eda7e9f

Please sign in to comment.