Skip to content

Commit

Permalink
Pass sourceQueueName via System Message Attribute for DeadLetterQueue…
Browse files Browse the repository at this point in the history
…SourceArn (#1086)

* Pass sourceQueueName via System Message Attribute for DeadLetterQueueSourceArn

* use dedicated field in Message instead of system attributes

---------

Co-authored-by: Sergii Kozlov <[email protected]>
Co-authored-by: Michał Ossowski <[email protected]>
  • Loading branch information
3 people authored Jan 6, 2025
1 parent 5918d86 commit 289fa90
Show file tree
Hide file tree
Showing 17 changed files with 96 additions and 66 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/elasticmq/MessageData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ case class MessageData(
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/elasticmq/NewMessageData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ case class NewMessageData(
id: Option[MessageId],
content: String,
messageAttributes: Map[String, MessageAttribute],
messageSystemAttributes: Map[String, MessageAttribute],
nextDelivery: NextDelivery,
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
orderIndex: Int,
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ case class InternalMessage(
var nextDelivery: Long,
content: String,
messageAttributes: Map[String, MessageAttribute],
messageSystemAttributes: Map[String, MessageAttribute],
created: OffsetDateTime,
orderIndex: Int,
var firstReceive: Received,
Expand All @@ -22,7 +21,8 @@ case class InternalMessage(
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
) extends Comparable[InternalMessage] {

// Priority queues have biggest elements first
Expand Down Expand Up @@ -67,21 +67,22 @@ case class InternalMessage(
messageGroupId,
messageDeduplicationId,
tracingId,
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)

def toNewMessageData =
NewMessageData(
Some(MessageId(id)),
content,
messageAttributes,
messageSystemAttributes,
MillisNextDelivery(nextDelivery),
messageGroupId,
messageDeduplicationId,
orderIndex,
tracingId,
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)

def deliverable(deliveryTime: Long): Boolean = nextDelivery <= deliveryTime
Expand All @@ -97,7 +98,6 @@ object InternalMessage {
newMessageData.nextDelivery.toMillis(now, queueData.delay.toMillis).millis,
newMessageData.content,
newMessageData.messageAttributes,
newMessageData.messageSystemAttributes,
OffsetDateTime.now(),
newMessageData.orderIndex,
NeverReceived,
Expand All @@ -106,7 +106,8 @@ object InternalMessage {
newMessageData.messageGroupId,
newMessageData.messageDeduplicationId,
newMessageData.tracingId,
newMessageData.sequenceNumber
newMessageData.sequenceNumber,
newMessageData.deadLetterSourceQueueName
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ trait QueueActorMessageOps
receiveMessages(visibilityTimeout, count, receiveRequestAttemptId).send()
case DeleteMessage(deliveryReceipt) =>
deleteMessage(deliveryReceipt).send()
case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData)
case MoveMessage(message, destination) => moveMessage(message, destination).send()
case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData)
case MoveMessage(message, destination, sourceQueueName) =>
moveMessage(message, destination, sourceQueueName).send()
case DeduplicationIdsCleanup =>
fifoMessagesHistory = fifoMessagesHistory.cleanOutdatedMessages(nowProvider)
DoNotReply()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,27 @@ import org.elasticmq.{DeduplicationId, MoveDestination, MoveToDLQ}
trait MoveMessageOps extends Logging {
this: QueueActorStorage =>

def moveMessage(message: InternalMessage, destination: MoveDestination): ResultWithEvents[Unit] = {
def moveMessage(
message: InternalMessage,
destination: MoveDestination,
sourceQueueName: String
): ResultWithEvents[Unit] = {

copyMessagesToActorRef.foreach { _ ! SendMessage(message.toNewMessageData) }
val messageWithSourceQueueName = message.copy(deadLetterSourceQueueName = Some(sourceQueueName))
copyMessagesToActorRef.foreach { _ ! SendMessage(messageWithSourceQueueName.toNewMessageData) }

destination match {
case MoveToDLQ =>
if (queueData.isFifo) {
CommonOperations.wasRegistered(message.toNewMessageData, fifoMessagesHistory) match {
CommonOperations.wasRegistered(messageWithSourceQueueName.toNewMessageData, fifoMessagesHistory) match {
case Some(_) => ResultWithEvents.empty
case None =>
logger.debug(s"Moved message (${message.id}) from FIFO queue to ${queueData.name}")
moveMessageToQueue(regenerateDeduplicationId(message))
logger.debug(s"Moved message (${messageWithSourceQueueName.id}) from FIFO queue to ${queueData.name}")
moveMessageToQueue(regenerateDeduplicationId(messageWithSourceQueueName))
}
} else {
logger.debug(s"Moved message (${message.id}) to ${queueData.name}")
moveMessageToQueue(message)
logger.debug(s"Moved message (${messageWithSourceQueueName.id}) to ${queueData.name}")
moveMessageToQueue(messageWithSourceQueueName)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ trait ReceiveMessageOps extends Logging {
messageQueue.dequeue(count, deliveryTime).map { internalMessage =>
if (queueData.deadLettersQueue.map(_.maxReceiveCount).exists(_ <= internalMessage.receiveCount)) {
logger.debug(s"${queueData.name}: send message $internalMessage to dead letters actor $deadLettersActorRef")
deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage, MoveToDLQ))
deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage, MoveToDLQ, queueData.name))
MessageToDelete(internalMessage)
} else {
MessageToReturn(internalMessage)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/elasticmq/msg/QueueMsg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class GetQueueStatistics(deliveryTime: Long) extends QueueQueueMsg[QueueSta
case class ClearQueue() extends QueueQueueMsg[Unit]

case class SendMessage(message: NewMessageData) extends QueueMessageMsg[MessageData]
case class MoveMessage(message: InternalMessage, moveDestination: MoveDestination) extends QueueMessageMsg[Unit]
case class MoveMessage(message: InternalMessage, moveDestination: MoveDestination, sourceQueueName: String) extends QueueMessageMsg[Unit]
case class UpdateVisibilityTimeout(deliveryReceipt: DeliveryReceipt, visibilityTimeout: VisibilityTimeout)
extends QueueMessageMsg[Either[InvalidReceiptHandle, Unit]]
case class ReceiveMessages(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers {
nextDelivery = 100L,
content = "",
messageAttributes = Map.empty,
messageSystemAttributes = Map.empty,
created = created,
orderIndex = 0,
firstReceive = NeverReceived,
Expand All @@ -116,7 +115,8 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = maybeDeduplicationId,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
nextDelivery = 0L,
content = "content",
messageAttributes = Map.empty,
messageSystemAttributes = Map.empty,
created = freezedDateTime,
orderIndex = 100,
firstReceive = NeverReceived,
Expand All @@ -27,7 +26,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

val second = first.copy(
Expand All @@ -46,7 +46,6 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
nextDelivery = 0L,
content = "content",
messageAttributes = Map.empty,
messageSystemAttributes = Map.empty,
created = freezedDateTime,
orderIndex = 100,
firstReceive = NeverReceived,
Expand All @@ -55,7 +54,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

val second = first.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
1L,
"content",
Map.empty,
Map.empty,
nowProvider.now,
orderIndex = 0,
NeverReceived,
Expand All @@ -32,7 +31,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)
val msg2 = msg1.copy(id = "id-2")
val msg3 = msg1.copy(id = "id-3")
Expand Down Expand Up @@ -74,7 +74,6 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
1L,
"content",
Map.empty,
Map.empty,
nowProvider.now,
orderIndex = 0,
NeverReceived,
Expand All @@ -83,7 +82,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)
val msg2 = msg1.copy(id = "id-2")
val messageQueue = MessageQueue(isFifo = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ trait DataCreationHelpers {
messageGroupId,
messageDeduplicationId,
tracingId,
None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

def createNewMessageData(
Expand All @@ -68,26 +69,26 @@ trait DataCreationHelpers {
Some(MessageId(id)),
content,
messageAttributes,
Map.empty,
nextDelivery,
messageGroupId,
messageDeduplicationId,
orderIndex = 0,
tracingId,
None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

def createNewMessageData(messageData: MessageData) =
NewMessageData(
Some(messageData.id),
messageData.content,
messageData.messageAttributes,
Map.empty,
messageData.nextDelivery,
messageData.messageGroupId,
messageData.messageDeduplicationId,
orderIndex = 0,
messageData.tracingId,
messageData.sequenceNumber
messageData.sequenceNumber,
messageData.deadLetterSourceQueueName
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ case class DBMessage(
groupId: Option[String],
deduplicationId: Option[String],
tracingId: Option[String],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
) {

def toInternalMessage: InternalMessage = {
Expand Down Expand Up @@ -48,7 +49,6 @@ case class DBMessage(
nextDelivery,
new String(content),
serializedAttrs,
Map.empty,
OffsetDateTimeUtil.ofEpochMilli(created),
orderIndex = 0,
firstReceive,
Expand All @@ -57,7 +57,8 @@ case class DBMessage(
groupId,
deduplicationId.map(id => DeduplicationId(id)),
tracingId.map(TracingId.apply),
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)
}
}
Expand All @@ -76,7 +77,8 @@ object DBMessage {
rs.stringOpt("group_id"),
rs.stringOpt("deduplication_id"),
rs.stringOpt("tracing_id"),
rs.stringOpt("sequence_number")
rs.stringOpt("sequence_number"),
rs.stringOpt("dead_letter_source_queue_name")
)

def from(message: InternalMessage): DBMessage = {
Expand Down Expand Up @@ -111,7 +113,8 @@ object DBMessage {
message.messageGroupId,
deduplicationId,
message.tracingId.map(_.id),
message.sequenceNumber
message.sequenceNumber,
message.deadLetterSourceQueueName
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
group_id varchar,
deduplication_id varchar,
tracing_id varchar,
sequence_number varchar
sequence_number varchar,
dead_letter_source_queue_name varchar
)""".execute.apply()

def drop(): Unit = {
Expand All @@ -50,7 +51,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
def add(internalMessage: InternalMessage): Int = {
val message = DBMessage.from(internalMessage)
sql"""insert into $tableName
(message_id, delivery_receipts, next_delivery, content, attributes, created, received, receive_count, group_id, deduplication_id, tracing_id, sequence_number)
(message_id, delivery_receipts, next_delivery, content, attributes, created, received, receive_count, group_id, deduplication_id, tracing_id, sequence_number, dead_letter_source_queue_name)
values (${message.messageId},
${message.deliveryReceipts},
${message.nextDelivery},
Expand All @@ -62,7 +63,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
${message.groupId},
${message.deduplicationId},
${message.tracingId},
${message.sequenceNumber})""".update.apply()
${message.sequenceNumber},
${message.deadLetterSourceQueueName})""".update.apply()
}

def update(internalMessage: InternalMessage): Int = {
Expand All @@ -74,7 +76,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
received = ${message.received},
receive_count = ${message.receiveCount},
tracing_id = ${message.tracingId},
sequence_number = ${message.sequenceNumber}
sequence_number = ${message.sequenceNumber},
dead_letter_source_queue_name = ${message.deadLetterSourceQueueName}
where message_id = ${message.messageId}""".update.apply()
}

Expand Down
Loading

0 comments on commit 289fa90

Please sign in to comment.