Skip to content

Commit

Permalink
return DeadLetterQueueSourceArn in receive message attributes (#985)
Browse files Browse the repository at this point in the history
  • Loading branch information
micossow authored Apr 3, 2024
1 parent 092e541 commit d9d8591
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2097,6 +2097,35 @@ class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers
secondReceiveResult.getMessages shouldBe empty
}

test("should return DeadLetterQueueSourceArn in receive message attributes") {
// given
val messageBody = "Message 1"
client.createQueue(new CreateQueueRequest("testDlq")).getQueueUrl
val redrivePolicy = RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString

val createQueueResult = client
.createQueue(
new CreateQueueRequest("main")
.withAttributes(
Map(redrivePolicyAttribute -> redrivePolicy).asJava
)
)
.getQueueUrl

// when
client.sendMessage(createQueueResult, messageBody)
val receiveResult = client.receiveMessage(
new ReceiveMessageRequest()
.withQueueUrl(createQueueResult)
.withAttributeNames("All")
)

// then
receiveResult.getMessages.asScala.toList.flatMap(_.getAttributes.asScala.toList) should contain(
("DeadLetterQueueSourceArn", s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq")
)
}

test("should list all source queues for a dlq") {
// given
val dlqUrl = client.createQueue(new CreateQueueRequest("testDlq")).getQueueUrl
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.elasticmq.rest.sqs

import org.elasticmq.rest.sqs.model.RedrivePolicy
import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute}
import org.elasticmq.rest.sqs.model.RedrivePolicyJson.format
import org.scalatest.matchers.should.Matchers
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.sqs.model.{GetQueueUrlRequest => AwsSdkGetQueueUrlRequest, _}
import spray.json.enrichAny

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -80,6 +83,30 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit
)
}

test("should return DeadLetterQueueSourceArn in receive message attributes") {
// Given
clientV2.createQueue(CreateQueueRequest.builder().queueName("testDlq").build())
val queue = clientV2.createQueue(CreateQueueRequest.builder()
.queueName("testQueue1")
.attributes(Map(QueueAttributeName.REDRIVE_POLICY -> RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString).asJava)
.build())

// When
clientV2.sendMessage(SendMessageRequest.builder()
.queueUrl(queue.queueUrl())
.messageBody("test123")
.build())
val receiveResult = clientV2.receiveMessage(ReceiveMessageRequest.builder()
.queueUrl(queue.queueUrl())
.attributeNamesWithStrings("All")
.build())

// Then
receiveResult.messages().asScala.toList.flatMap(_.attributes().asScala.toList) should contain(
(MessageSystemAttributeName.DEAD_LETTER_QUEUE_SOURCE_ARN, s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq")
)
}

private def doTestSendAndReceiveMessage(content: String): Unit = {
doTestSendAndReceiveMessageWithAttributes(content, Map(), List())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.elasticmq.rest.sqs

trait AwsConfiguration {

def awsRegion: String

def awsAccountId: String

def getArn(queueName: String): String = s"arn:aws:sqs:$awsRegion:$awsAccountId:$queueName"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ import java.time.Duration
import scala.async.Async.{async, await}
import scala.concurrent.{ExecutionContext, Future}

trait QueueAttributesOps extends AttributesModule {
trait QueueAttributesOps extends AttributesModule with AwsConfiguration {
this: Logging =>

def awsRegion: String

def awsAccountId: String

val attributeValuesCalculator = new AttributeValuesCalculator

def getAllQueueAttributes(queueActor: ActorRef, queueData: QueueData)(implicit
Expand Down Expand Up @@ -75,7 +71,7 @@ trait QueueAttributesOps extends AttributesModule {
),
AttributeValuesCalculator.Rule(
QueueArnAttribute,
() => Future.successful(s"arn:aws:sqs:$awsRegion:$awsAccountId:${queueData.name}")
() => Future.successful(getArn(queueData.name))
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import java.time.Duration
import scala.xml.Elem

trait ReceiveMessageDirectives {
this: ElasticMQDirectives with AttributesModule with SQSLimitsModule with ResponseMarshaller =>
this: ElasticMQDirectives with AttributesModule with SQSLimitsModule with ResponseMarshaller with AwsConfiguration =>
object MessageReadeableAttributeNames {
val SentTimestampAttribute = "SentTimestamp"
val ApproximateReceiveCountAttribute = "ApproximateReceiveCount"
Expand All @@ -31,10 +31,11 @@ trait ReceiveMessageDirectives {
val MessageGroupIdAttribute = "MessageGroupId"
val AWSTraceHeaderAttribute = "AWSTraceHeader"
val SequenceNumberAttribute = "SequenceNumber"
val DeadLetterQueueSourceArn = "DeadLetterQueueSourceArn"

val AllAttributeNames = SentTimestampAttribute :: ApproximateReceiveCountAttribute ::
ApproximateFirstReceiveTimestampAttribute :: SenderIdAttribute :: MessageDeduplicationIdAttribute ::
MessageGroupIdAttribute :: AWSTraceHeaderAttribute :: SequenceNumberAttribute :: Nil
MessageGroupIdAttribute :: AWSTraceHeaderAttribute :: SequenceNumberAttribute :: DeadLetterQueueSourceArn :: Nil
}

def receiveMessage(p: RequestPayload)(implicit marshallerDependencies: MarshallerDependencies) = {
Expand Down Expand Up @@ -112,7 +113,8 @@ trait ReceiveMessageDirectives {
}).toString)
),
Rule(AWSTraceHeaderAttribute, () => msg.tracingId.map(_.id)),
Rule(SequenceNumberAttribute, () => msg.sequenceNumber)
Rule(SequenceNumberAttribute, () => msg.sequenceNumber),
Rule(DeadLetterQueueSourceArn, () => queueData.deadLettersQueue.map(qd => getArn(qd.name)))
)
}

Expand Down

0 comments on commit d9d8591

Please sign in to comment.