diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index 8cb2899a..bd83dce8 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -2097,6 +2097,35 @@ class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers secondReceiveResult.getMessages shouldBe empty } + test("should return DeadLetterQueueSourceArn in receive message attributes") { + // given + val messageBody = "Message 1" + client.createQueue(new CreateQueueRequest("testDlq")).getQueueUrl + val redrivePolicy = RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString + + val createQueueResult = client + .createQueue( + new CreateQueueRequest("main") + .withAttributes( + Map(redrivePolicyAttribute -> redrivePolicy).asJava + ) + ) + .getQueueUrl + + // when + client.sendMessage(createQueueResult, messageBody) + val receiveResult = client.receiveMessage( + new ReceiveMessageRequest() + .withQueueUrl(createQueueResult) + .withAttributeNames("All") + ) + + // then + receiveResult.getMessages.asScala.toList.flatMap(_.getAttributes.asScala.toList) should contain( + ("DeadLetterQueueSourceArn", s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq") + ) + } + test("should list all source queues for a dlq") { // given val dlqUrl = client.createQueue(new CreateQueueRequest("testDlq")).getQueueUrl diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkV2TestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkV2TestSuite.scala index b8a8eea1..96e4f4fe 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkV2TestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkV2TestSuite.scala @@ -1,9 +1,12 @@ package org.elasticmq.rest.sqs +import org.elasticmq.rest.sqs.model.RedrivePolicy import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute} +import org.elasticmq.rest.sqs.model.RedrivePolicyJson.format import org.scalatest.matchers.should.Matchers import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.sqs.model.{GetQueueUrlRequest => AwsSdkGetQueueUrlRequest, _} +import spray.json.enrichAny import scala.collection.JavaConverters._ @@ -80,6 +83,30 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit ) } + test("should return DeadLetterQueueSourceArn in receive message attributes") { + // Given + clientV2.createQueue(CreateQueueRequest.builder().queueName("testDlq").build()) + val queue = clientV2.createQueue(CreateQueueRequest.builder() + .queueName("testQueue1") + .attributes(Map(QueueAttributeName.REDRIVE_POLICY -> RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString).asJava) + .build()) + + // When + clientV2.sendMessage(SendMessageRequest.builder() + .queueUrl(queue.queueUrl()) + .messageBody("test123") + .build()) + val receiveResult = clientV2.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queue.queueUrl()) + .attributeNamesWithStrings("All") + .build()) + + // Then + receiveResult.messages().asScala.toList.flatMap(_.attributes().asScala.toList) should contain( + (MessageSystemAttributeName.DEAD_LETTER_QUEUE_SOURCE_ARN, s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq") + ) + } + private def doTestSendAndReceiveMessage(content: String): Unit = { doTestSendAndReceiveMessageWithAttributes(content, Map(), List()) } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AwsConfiguration.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AwsConfiguration.scala new file mode 100644 index 00000000..186885e4 --- /dev/null +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AwsConfiguration.scala @@ -0,0 +1,10 @@ +package org.elasticmq.rest.sqs + +trait AwsConfiguration { + + def awsRegion: String + + def awsAccountId: String + + def getArn(queueName: String): String = s"arn:aws:sqs:$awsRegion:$awsAccountId:$queueName" +} diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala index 00af6486..42bcb2ba 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala @@ -17,13 +17,9 @@ import java.time.Duration import scala.async.Async.{async, await} import scala.concurrent.{ExecutionContext, Future} -trait QueueAttributesOps extends AttributesModule { +trait QueueAttributesOps extends AttributesModule with AwsConfiguration { this: Logging => - def awsRegion: String - - def awsAccountId: String - val attributeValuesCalculator = new AttributeValuesCalculator def getAllQueueAttributes(queueActor: ActorRef, queueData: QueueData)(implicit @@ -75,7 +71,7 @@ trait QueueAttributesOps extends AttributesModule { ), AttributeValuesCalculator.Rule( QueueArnAttribute, - () => Future.successful(s"arn:aws:sqs:$awsRegion:$awsAccountId:${queueData.name}") + () => Future.successful(getArn(queueData.name)) ) ) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index b15f124b..565a0732 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -16,7 +16,7 @@ import java.time.Duration import scala.xml.Elem trait ReceiveMessageDirectives { - this: ElasticMQDirectives with AttributesModule with SQSLimitsModule with ResponseMarshaller => + this: ElasticMQDirectives with AttributesModule with SQSLimitsModule with ResponseMarshaller with AwsConfiguration => object MessageReadeableAttributeNames { val SentTimestampAttribute = "SentTimestamp" val ApproximateReceiveCountAttribute = "ApproximateReceiveCount" @@ -31,10 +31,11 @@ trait ReceiveMessageDirectives { val MessageGroupIdAttribute = "MessageGroupId" val AWSTraceHeaderAttribute = "AWSTraceHeader" val SequenceNumberAttribute = "SequenceNumber" + val DeadLetterQueueSourceArn = "DeadLetterQueueSourceArn" val AllAttributeNames = SentTimestampAttribute :: ApproximateReceiveCountAttribute :: ApproximateFirstReceiveTimestampAttribute :: SenderIdAttribute :: MessageDeduplicationIdAttribute :: - MessageGroupIdAttribute :: AWSTraceHeaderAttribute :: SequenceNumberAttribute :: Nil + MessageGroupIdAttribute :: AWSTraceHeaderAttribute :: SequenceNumberAttribute :: DeadLetterQueueSourceArn :: Nil } def receiveMessage(p: RequestPayload)(implicit marshallerDependencies: MarshallerDependencies) = { @@ -112,7 +113,8 @@ trait ReceiveMessageDirectives { }).toString) ), Rule(AWSTraceHeaderAttribute, () => msg.tracingId.map(_.id)), - Rule(SequenceNumberAttribute, () => msg.sequenceNumber) + Rule(SequenceNumberAttribute, () => msg.sequenceNumber), + Rule(DeadLetterQueueSourceArn, () => queueData.deadLettersQueue.map(qd => getArn(qd.name))) ) }