From 289fa9090f0fc5a1157d720ec0043e3ff1cf3d27 Mon Sep 17 00:00:00 2001 From: Sergii Kozlov Date: Mon, 6 Jan 2025 02:49:26 -0800 Subject: [PATCH] Pass sourceQueueName via System Message Attribute for DeadLetterQueueSourceArn (#1086) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Pass sourceQueueName via System Message Attribute for DeadLetterQueueSourceArn * use dedicated field in Message instead of system attributes --------- Co-authored-by: Sergii Kozlov Co-authored-by: MichaƂ Ossowski --- .../scala/org/elasticmq/MessageData.scala | 3 ++- .../scala/org/elasticmq/NewMessageData.scala | 4 +-- .../actor/queue/InternalMessage.scala | 15 ++++++----- .../actor/queue/QueueActorMessageOps.scala | 5 ++-- .../queue/operations/MoveMessageOps.scala | 19 +++++++++----- .../queue/operations/ReceiveMessageOps.scala | 2 +- .../scala/org/elasticmq/msg/QueueMsg.scala | 2 +- .../FifoDeduplicationIdsHistoryTest.scala | 4 +-- .../actor/queue/InternalMessageSpec.scala | 8 +++--- .../ReceiveRequestAttemptCacheTest.scala | 8 +++--- .../actor/test/DataCreationHelpers.scala | 11 ++++---- .../elasticmq/persistence/sql/DBMessage.scala | 13 ++++++---- .../persistence/sql/MessageRepository.scala | 11 +++++--- .../rest/sqs/AmazonJavaSdkTestSuite.scala | 26 ++++++++++++++----- .../sqs/aws/AmazonJavaSdkNewTestSuite.scala | 17 ++++++++---- .../rest/sqs/ReceiveMessageDirectives.scala | 2 +- .../rest/sqs/SendMessageDirectives.scala | 12 +++------ 17 files changed, 96 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/org/elasticmq/MessageData.scala b/core/src/main/scala/org/elasticmq/MessageData.scala index 206164e37..ac90b1e6f 100644 --- a/core/src/main/scala/org/elasticmq/MessageData.scala +++ b/core/src/main/scala/org/elasticmq/MessageData.scala @@ -13,5 +13,6 @@ case class MessageData( messageGroupId: Option[String], messageDeduplicationId: Option[DeduplicationId], tracingId: Option[TracingId], - sequenceNumber: Option[String] + sequenceNumber: Option[String], + deadLetterSourceQueueName: Option[String] ) diff --git a/core/src/main/scala/org/elasticmq/NewMessageData.scala b/core/src/main/scala/org/elasticmq/NewMessageData.scala index a38773cc6..8c04effeb 100644 --- a/core/src/main/scala/org/elasticmq/NewMessageData.scala +++ b/core/src/main/scala/org/elasticmq/NewMessageData.scala @@ -4,11 +4,11 @@ case class NewMessageData( id: Option[MessageId], content: String, messageAttributes: Map[String, MessageAttribute], - messageSystemAttributes: Map[String, MessageAttribute], nextDelivery: NextDelivery, messageGroupId: Option[String], messageDeduplicationId: Option[DeduplicationId], orderIndex: Int, tracingId: Option[TracingId], - sequenceNumber: Option[String] + sequenceNumber: Option[String], + deadLetterSourceQueueName: Option[String] ) diff --git a/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala b/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala index 3ad666ecf..9b4f799f5 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala @@ -13,7 +13,6 @@ case class InternalMessage( var nextDelivery: Long, content: String, messageAttributes: Map[String, MessageAttribute], - messageSystemAttributes: Map[String, MessageAttribute], created: OffsetDateTime, orderIndex: Int, var firstReceive: Received, @@ -22,7 +21,8 @@ case class InternalMessage( messageGroupId: Option[String], messageDeduplicationId: Option[DeduplicationId], tracingId: Option[TracingId], - sequenceNumber: Option[String] + sequenceNumber: Option[String], + deadLetterSourceQueueName: Option[String] ) extends Comparable[InternalMessage] { // Priority queues have biggest elements first @@ -67,7 +67,8 @@ case class InternalMessage( messageGroupId, messageDeduplicationId, tracingId, - sequenceNumber + sequenceNumber, + deadLetterSourceQueueName ) def toNewMessageData = @@ -75,13 +76,13 @@ case class InternalMessage( Some(MessageId(id)), content, messageAttributes, - messageSystemAttributes, MillisNextDelivery(nextDelivery), messageGroupId, messageDeduplicationId, orderIndex, tracingId, - sequenceNumber + sequenceNumber, + deadLetterSourceQueueName ) def deliverable(deliveryTime: Long): Boolean = nextDelivery <= deliveryTime @@ -97,7 +98,6 @@ object InternalMessage { newMessageData.nextDelivery.toMillis(now, queueData.delay.toMillis).millis, newMessageData.content, newMessageData.messageAttributes, - newMessageData.messageSystemAttributes, OffsetDateTime.now(), newMessageData.orderIndex, NeverReceived, @@ -106,7 +106,8 @@ object InternalMessage { newMessageData.messageGroupId, newMessageData.messageDeduplicationId, newMessageData.tracingId, - newMessageData.sequenceNumber + newMessageData.sequenceNumber, + newMessageData.deadLetterSourceQueueName ) } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index 1a8920798..31e313b37 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -33,8 +33,9 @@ trait QueueActorMessageOps receiveMessages(visibilityTimeout, count, receiveRequestAttemptId).send() case DeleteMessage(deliveryReceipt) => deleteMessage(deliveryReceipt).send() - case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData) - case MoveMessage(message, destination) => moveMessage(message, destination).send() + case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData) + case MoveMessage(message, destination, sourceQueueName) => + moveMessage(message, destination, sourceQueueName).send() case DeduplicationIdsCleanup => fifoMessagesHistory = fifoMessagesHistory.cleanOutdatedMessages(nowProvider) DoNotReply() diff --git a/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala index 5bffb6c2e..dddaaaf50 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala @@ -8,22 +8,27 @@ import org.elasticmq.{DeduplicationId, MoveDestination, MoveToDLQ} trait MoveMessageOps extends Logging { this: QueueActorStorage => - def moveMessage(message: InternalMessage, destination: MoveDestination): ResultWithEvents[Unit] = { + def moveMessage( + message: InternalMessage, + destination: MoveDestination, + sourceQueueName: String + ): ResultWithEvents[Unit] = { - copyMessagesToActorRef.foreach { _ ! SendMessage(message.toNewMessageData) } + val messageWithSourceQueueName = message.copy(deadLetterSourceQueueName = Some(sourceQueueName)) + copyMessagesToActorRef.foreach { _ ! SendMessage(messageWithSourceQueueName.toNewMessageData) } destination match { case MoveToDLQ => if (queueData.isFifo) { - CommonOperations.wasRegistered(message.toNewMessageData, fifoMessagesHistory) match { + CommonOperations.wasRegistered(messageWithSourceQueueName.toNewMessageData, fifoMessagesHistory) match { case Some(_) => ResultWithEvents.empty case None => - logger.debug(s"Moved message (${message.id}) from FIFO queue to ${queueData.name}") - moveMessageToQueue(regenerateDeduplicationId(message)) + logger.debug(s"Moved message (${messageWithSourceQueueName.id}) from FIFO queue to ${queueData.name}") + moveMessageToQueue(regenerateDeduplicationId(messageWithSourceQueueName)) } } else { - logger.debug(s"Moved message (${message.id}) to ${queueData.name}") - moveMessageToQueue(message) + logger.debug(s"Moved message (${messageWithSourceQueueName.id}) to ${queueData.name}") + moveMessageToQueue(messageWithSourceQueueName) } } } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/operations/ReceiveMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/operations/ReceiveMessageOps.scala index 6b9402016..6c82c7731 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/operations/ReceiveMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/operations/ReceiveMessageOps.scala @@ -85,7 +85,7 @@ trait ReceiveMessageOps extends Logging { messageQueue.dequeue(count, deliveryTime).map { internalMessage => if (queueData.deadLettersQueue.map(_.maxReceiveCount).exists(_ <= internalMessage.receiveCount)) { logger.debug(s"${queueData.name}: send message $internalMessage to dead letters actor $deadLettersActorRef") - deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage, MoveToDLQ)) + deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage, MoveToDLQ, queueData.name)) MessageToDelete(internalMessage) } else { MessageToReturn(internalMessage) diff --git a/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala b/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala index a8a8bf0f1..3d3dff360 100644 --- a/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala +++ b/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala @@ -42,7 +42,7 @@ case class GetQueueStatistics(deliveryTime: Long) extends QueueQueueMsg[QueueSta case class ClearQueue() extends QueueQueueMsg[Unit] case class SendMessage(message: NewMessageData) extends QueueMessageMsg[MessageData] -case class MoveMessage(message: InternalMessage, moveDestination: MoveDestination) extends QueueMessageMsg[Unit] +case class MoveMessage(message: InternalMessage, moveDestination: MoveDestination, sourceQueueName: String) extends QueueMessageMsg[Unit] case class UpdateVisibilityTimeout(deliveryReceipt: DeliveryReceipt, visibilityTimeout: VisibilityTimeout) extends QueueMessageMsg[Either[InvalidReceiptHandle, Unit]] case class ReceiveMessages( diff --git a/core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala b/core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala index e59028ba8..45e47406f 100644 --- a/core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala +++ b/core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala @@ -107,7 +107,6 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers { nextDelivery = 100L, content = "", messageAttributes = Map.empty, - messageSystemAttributes = Map.empty, created = created, orderIndex = 0, firstReceive = NeverReceived, @@ -116,7 +115,8 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers { messageGroupId = None, messageDeduplicationId = maybeDeduplicationId, tracingId = None, - sequenceNumber = None + sequenceNumber = None, + deadLetterSourceQueueName = None ) } diff --git a/core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala b/core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala index a7fc9eec3..13e55d322 100644 --- a/core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala +++ b/core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala @@ -18,7 +18,6 @@ class InternalMessageSpec extends AnyFunSuite with Matchers { nextDelivery = 0L, content = "content", messageAttributes = Map.empty, - messageSystemAttributes = Map.empty, created = freezedDateTime, orderIndex = 100, firstReceive = NeverReceived, @@ -27,7 +26,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers { messageGroupId = None, messageDeduplicationId = None, tracingId = None, - sequenceNumber = None + sequenceNumber = None, + deadLetterSourceQueueName = None ) val second = first.copy( @@ -46,7 +46,6 @@ class InternalMessageSpec extends AnyFunSuite with Matchers { nextDelivery = 0L, content = "content", messageAttributes = Map.empty, - messageSystemAttributes = Map.empty, created = freezedDateTime, orderIndex = 100, firstReceive = NeverReceived, @@ -55,7 +54,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers { messageGroupId = None, messageDeduplicationId = None, tracingId = None, - sequenceNumber = None + sequenceNumber = None, + deadLetterSourceQueueName = None ) val second = first.copy( diff --git a/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala b/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala index a1af5f878..f084f6cc9 100644 --- a/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala +++ b/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala @@ -23,7 +23,6 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers { 1L, "content", Map.empty, - Map.empty, nowProvider.now, orderIndex = 0, NeverReceived, @@ -32,7 +31,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers { messageGroupId = None, messageDeduplicationId = None, tracingId = None, - sequenceNumber = None + sequenceNumber = None, + deadLetterSourceQueueName = None ) val msg2 = msg1.copy(id = "id-2") val msg3 = msg1.copy(id = "id-3") @@ -74,7 +74,6 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers { 1L, "content", Map.empty, - Map.empty, nowProvider.now, orderIndex = 0, NeverReceived, @@ -83,7 +82,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers { messageGroupId = None, messageDeduplicationId = None, tracingId = None, - sequenceNumber = None + sequenceNumber = None, + deadLetterSourceQueueName = None ) val msg2 = msg1.copy(id = "id-2") val messageQueue = MessageQueue(isFifo = false) diff --git a/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala b/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala index 8eb125d49..8473b490d 100644 --- a/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala +++ b/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala @@ -52,7 +52,8 @@ trait DataCreationHelpers { messageGroupId, messageDeduplicationId, tracingId, - None + sequenceNumber = None, + deadLetterSourceQueueName = None ) def createNewMessageData( @@ -68,13 +69,13 @@ trait DataCreationHelpers { Some(MessageId(id)), content, messageAttributes, - Map.empty, nextDelivery, messageGroupId, messageDeduplicationId, orderIndex = 0, tracingId, - None + sequenceNumber = None, + deadLetterSourceQueueName = None ) def createNewMessageData(messageData: MessageData) = @@ -82,12 +83,12 @@ trait DataCreationHelpers { Some(messageData.id), messageData.content, messageData.messageAttributes, - Map.empty, messageData.nextDelivery, messageData.messageGroupId, messageData.messageDeduplicationId, orderIndex = 0, messageData.tracingId, - messageData.sequenceNumber + messageData.sequenceNumber, + messageData.deadLetterSourceQueueName ) } diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala index d2b02c7c4..d9b4365ba 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala @@ -19,7 +19,8 @@ case class DBMessage( groupId: Option[String], deduplicationId: Option[String], tracingId: Option[String], - sequenceNumber: Option[String] + sequenceNumber: Option[String], + deadLetterSourceQueueName: Option[String] ) { def toInternalMessage: InternalMessage = { @@ -48,7 +49,6 @@ case class DBMessage( nextDelivery, new String(content), serializedAttrs, - Map.empty, OffsetDateTimeUtil.ofEpochMilli(created), orderIndex = 0, firstReceive, @@ -57,7 +57,8 @@ case class DBMessage( groupId, deduplicationId.map(id => DeduplicationId(id)), tracingId.map(TracingId.apply), - sequenceNumber + sequenceNumber, + deadLetterSourceQueueName ) } } @@ -76,7 +77,8 @@ object DBMessage { rs.stringOpt("group_id"), rs.stringOpt("deduplication_id"), rs.stringOpt("tracing_id"), - rs.stringOpt("sequence_number") + rs.stringOpt("sequence_number"), + rs.stringOpt("dead_letter_source_queue_name") ) def from(message: InternalMessage): DBMessage = { @@ -111,7 +113,8 @@ object DBMessage { message.messageGroupId, deduplicationId, message.tracingId.map(_.id), - message.sequenceNumber + message.sequenceNumber, + message.deadLetterSourceQueueName ) } } diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala index ef2748c7e..386e57d6a 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala @@ -30,7 +30,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging { group_id varchar, deduplication_id varchar, tracing_id varchar, - sequence_number varchar + sequence_number varchar, + dead_letter_source_queue_name varchar )""".execute.apply() def drop(): Unit = { @@ -50,7 +51,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging { def add(internalMessage: InternalMessage): Int = { val message = DBMessage.from(internalMessage) sql"""insert into $tableName - (message_id, delivery_receipts, next_delivery, content, attributes, created, received, receive_count, group_id, deduplication_id, tracing_id, sequence_number) + (message_id, delivery_receipts, next_delivery, content, attributes, created, received, receive_count, group_id, deduplication_id, tracing_id, sequence_number, dead_letter_source_queue_name) values (${message.messageId}, ${message.deliveryReceipts}, ${message.nextDelivery}, @@ -62,7 +63,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging { ${message.groupId}, ${message.deduplicationId}, ${message.tracingId}, - ${message.sequenceNumber})""".update.apply() + ${message.sequenceNumber}, + ${message.deadLetterSourceQueueName})""".update.apply() } def update(internalMessage: InternalMessage): Int = { @@ -74,7 +76,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging { received = ${message.received}, receive_count = ${message.receiveCount}, tracing_id = ${message.tracingId}, - sequence_number = ${message.sequenceNumber} + sequence_number = ${message.sequenceNumber}, + dead_letter_source_queue_name = ${message.deadLetterSourceQueueName} where message_id = ${message.messageId}""".update.apply() } diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index f2d3f97da..9b10b4b94 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -2096,32 +2096,46 @@ class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers secondReceiveResult.getMessages shouldBe empty } - test("should return DeadLetterQueueSourceArn in receive message attributes") { + test("should return DeadLetterQueueSourceArn in message received from DLQ") { // given val messageBody = "Message 1" - client.createQueue(new CreateQueueRequest("testDlq")).getQueueUrl + val createDlqQueueResult = client.createQueue(new CreateQueueRequest("testDlq")).getQueueUrl val redrivePolicy = RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString val createQueueResult = client .createQueue( new CreateQueueRequest("main") .withAttributes( - Map(redrivePolicyAttribute -> redrivePolicy).asJava + Map(defaultVisibilityTimeoutAttribute -> "0", redrivePolicyAttribute -> redrivePolicy).asJava ) ) .getQueueUrl // when client.sendMessage(createQueueResult, messageBody) - val receiveResult = client.receiveMessage( + val firstReceiveResult = client.receiveMessage( new ReceiveMessageRequest() .withQueueUrl(createQueueResult) .withAttributeNames("All") ) + val secondReceiveResult = client.receiveMessage( + new ReceiveMessageRequest() + .withQueueUrl(createQueueResult) + .withAttributeNames("All") + ) + val receiveDlqResult = client.receiveMessage( + new ReceiveMessageRequest() + .withQueueUrl(createDlqQueueResult) + .withAttributeNames("All") + ) // then - receiveResult.getMessages.asScala.toList.flatMap(_.getAttributes.asScala.toList) should contain( - ("DeadLetterQueueSourceArn", s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq") + firstReceiveResult.getMessages.asScala.toList.flatMap(_.getAttributes.asScala.keys.toList) should not contain ( + "DeadLetterQueueSourceArn" + ) + secondReceiveResult.getMessages should be(empty) + receiveDlqResult.getMessages.asScala.toList.flatMap(_.getAttributes.asScala.toList) should contain( + ("DeadLetterQueueSourceArn", s"arn:aws:sqs:$awsRegion:$awsAccountId:main") ) } diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala index 3a0c0f371..6bbccb9a0 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala @@ -136,19 +136,26 @@ abstract class AmazonJavaSdkNewTestSuite test("should return DeadLetterQueueSourceArn in receive message attributes") { // given - testClient.createQueue("testDlq") + val dlQueue = testClient.createQueue("testDlq") val queue = testClient.createQueue( "testQueue1", - Map(RedrivePolicyAttributeName -> RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.compactPrint) + Map( + VisibilityTimeoutAttributeName -> "0", + RedrivePolicyAttributeName -> RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.compactPrint + ) ) // when testClient.sendMessage(queue, "test123") - val receiveResult = testClient.receiveMessage(queue, List("All")) + val firstReceiveResult = testClient.receiveMessage(queue, List("All")) + val secondReceiveResult = testClient.receiveMessage(queue, List("All")) + val dlqReceiveResult = testClient.receiveMessage(dlQueue, List("All")) // then - receiveResult.flatMap(_.attributes.toList) should contain( - (DeadLetterQueueSourceArn, s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq") + firstReceiveResult.flatMap(_.attributes.keys.toList) should not contain DeadLetterQueueSourceArn + secondReceiveResult shouldBe empty + dlqReceiveResult.flatMap(_.attributes.toList) should contain( + (DeadLetterQueueSourceArn, s"arn:aws:sqs:$awsRegion:$awsAccountId:testQueue1") ) } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index c5d45da7e..4d91b20b9 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -115,7 +115,7 @@ trait ReceiveMessageDirectives { ), Rule(AWSTraceHeaderAttribute, () => msg.tracingId.map(_.id)), Rule(SequenceNumberAttribute, () => msg.sequenceNumber), - Rule(DeadLetterQueueSourceArn, () => queueData.deadLettersQueue.map(qd => getArn(qd.name))) + Rule(DeadLetterQueueSourceArn, () => msg.deadLetterSourceQueueName.map(getArn)) ) } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala index da9b622f7..983678e93 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala @@ -163,13 +163,13 @@ trait SendMessageDirectives { None, body, messageAttributes, - messageSystemAttributes, nextDelivery, messageGroupId, messageDeduplicationId, orderIndex, maybeTracingId, - None + sequenceNumber = None, + deadLetterSourceQueueName = None ) } @@ -185,15 +185,9 @@ trait SendMessageDirectives { Some(md5AttributeDigest(message.messageAttributes)) } - val systemMessageAttributeDigest = if (message.messageSystemAttributes.isEmpty) { - None - } else { - Some(md5AttributeDigest(message.messageSystemAttributes)) - } - for { message <- queueActor ? SendMessage(message) - } yield MessageSendOutcome(message, digest, messageAttributeDigest, systemMessageAttributeDigest) + } yield MessageSendOutcome(message, digest, messageAttributeDigest, systemMessageAttributeDigest = None) } def verifyMessageNotTooLong(messageLength: Int): Unit =