diff --git a/lib/amqp/helper.ex b/lib/amqp/helper.ex index 1dcbc40..a79901e 100644 --- a/lib/amqp/helper.ex +++ b/lib/amqp/helper.ex @@ -5,21 +5,52 @@ defmodule Amqpx.Helper do require Logger - alias Amqpx.{Exchange, Queue} - + alias Amqpx.{Basic, Channel, Exchange, Queue} + + @dead_letter_queue_defaults [durable: true] + + # Supervisor.module_spec() has been introduced with elixir 1.16 + # we can remove this when we update the minimum supported version + @type module_spec :: {module, arg :: any} + + @type exchange_spec :: %{ + name: Basic.exchange(), + type: atom, + routing_keys: [String.t()], + opts: Keyword.t() + } + + @type queue_spec :: %{ + :queue => Basic.queue(), + :exchanges => [exchange_spec], + optional(:opts) => Keyword.t() + } + + @type dead_letter_queue_spec :: %{ + :queue => Basic.queue(), + :exchange => Basic.exchange(), + :routing_key => String.t(), + optional(:original_routing_keys) => [String.t() | [String.t()]], + optional(:queue_opts) => Keyword.t() + } + + @spec manager_supervisor_configuration(Keyword.t()) :: module_spec def manager_supervisor_configuration(config) do {Amqpx.Gen.ConnectionManager, %{connection_params: encrypt_password(config)}} end + @spec consumers_supervisor_configuration([handler_conf :: map]) :: [Supervisor.child_spec()] def consumers_supervisor_configuration(handlers_conf) do amqp_signal_handler() ++ Enum.map(handlers_conf, &Supervisor.child_spec({Amqpx.Gen.Consumer, &1}, id: UUID.uuid1())) end + @spec producer_supervisor_configuration(producer_conf :: map) :: module_spec def producer_supervisor_configuration(producer_conf) do {Amqpx.Gen.Producer, producer_conf} end + @spec encrypt_password(Keyword.t()) :: Keyword.t() def encrypt_password(config) do case Keyword.get(config, :obfuscate_password, true) do true -> @@ -30,6 +61,7 @@ defmodule Amqpx.Helper do end end + @spec get_password(Keyword.t(), Keyword.t() | nil) :: Keyword.value() def get_password(config, nil) do case Keyword.get(config, :obfuscate_password, true) do true -> @@ -50,6 +82,7 @@ defmodule Amqpx.Helper do end end + @spec declare(Channel.t(), queue_spec) :: :ok | no_return def declare( channel, %{ @@ -58,10 +91,12 @@ defmodule Amqpx.Helper do exchanges: exchanges } = queue ) do - case Enum.find(opts[:arguments], &match?({"x-dead-letter-exchange", :longstr, _}, &1)) do + arguments = Keyword.get(opts, :arguments, []) + + case Enum.find(arguments, &match?({"x-dead-letter-exchange", :longstr, _}, &1)) do {_, _, dle} -> {dlr_config_key, dlr_config_value} = - case Enum.find(opts[:arguments], &match?({"x-dead-letter-routing-key", :longstr, _}, &1)) do + case Enum.find(arguments, &match?({"x-dead-letter-routing-key", :longstr, _}, &1)) do {_, _, dlrk} -> {:routing_key, dlrk} @@ -73,7 +108,8 @@ defmodule Amqpx.Helper do setup_dead_lettering(channel, %{ dlr_config_key => dlr_config_value, queue: "#{qname}_errored", - exchange: dle + exchange: dle, + queue_opts: set_dead_letter_queue_type(@dead_letter_queue_defaults, arguments) }) nil -> @@ -87,10 +123,11 @@ defmodule Amqpx.Helper do setup_queue(channel, queue) end - def setup_dead_lettering(channel, %{queue: dlq, exchange: "", routing_key: dlq}) do + @spec setup_dead_lettering(Channel.t(), dead_letter_queue_spec) :: :ok | {:ok, map} | Basic.error() + def setup_dead_lettering(channel, %{queue: dlq, exchange: "", routing_key: dlq} = spec) do # DLX will work through [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default) # since `x-dead-letter-routing-key` matches the queue name - Queue.declare(channel, dlq, durable: true) + Queue.declare(channel, dlq, dead_letter_queue_opts(spec)) end def setup_dead_lettering(_channel, %{queue: dlq, exchange: "", routing_key: bad_dlq}) do @@ -104,15 +141,18 @@ defmodule Amqpx.Helper do end end - def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, routing_key: routing_key}) do + def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, routing_key: routing_key} = spec) do Exchange.declare(channel, exchange, :topic, durable: true) - Queue.declare(channel, dlq, durable: true) + Queue.declare(channel, dlq, dead_letter_queue_opts(spec)) Queue.bind(channel, dlq, exchange, routing_key: routing_key) end - def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, original_routing_keys: original_routing_keys}) do + def setup_dead_lettering( + channel, + %{queue: dlq, exchange: exchange, original_routing_keys: original_routing_keys} = spec + ) do Exchange.declare(channel, exchange, :topic, durable: true) - Queue.declare(channel, dlq, durable: true) + Queue.declare(channel, dlq, dead_letter_queue_opts(spec)) original_routing_keys |> List.flatten() @@ -122,6 +162,7 @@ defmodule Amqpx.Helper do end) end + @spec setup_queue(Channel.t(), queue_spec) :: :ok | no_return def setup_queue(channel, %{ queue: queue, exchanges: exchanges, @@ -141,6 +182,7 @@ defmodule Amqpx.Helper do Enum.each(exchanges, &setup_exchange(channel, queue, &1)) end + @spec setup_exchange(Channel.t(), Basic.queue(), exchange_spec) :: :ok | Basic.error() | no_return def setup_exchange(channel, queue, %{ name: name, type: type, @@ -190,6 +232,23 @@ defmodule Amqpx.Helper do Exchange.declare(channel, name, type) end + @spec dead_letter_queue_opts(dead_letter_queue_spec) :: Keyword.t() + defp dead_letter_queue_opts(spec) do + Map.get(spec, :queue_opts, @dead_letter_queue_defaults) + end + + @spec set_dead_letter_queue_type(Keyword.t(), [{String.t(), atom, any}]) :: Keyword.t() + defp set_dead_letter_queue_type(dlq_opts, queue_args) do + case Enum.find(queue_args, &match?({"x-queue-type", :longstr, _}, &1)) do + nil -> + dlq_opts + + queue_type -> + Keyword.update(dlq_opts, :arguments, [queue_type], &[queue_type | &1]) + end + end + + @spec amqp_signal_handler() :: [Supervisor.child_spec()] defp amqp_signal_handler, do: [ %{ diff --git a/test/helper_test.exs b/test/helper_test.exs index a29086b..252481d 100644 --- a/test/helper_test.exs +++ b/test/helper_test.exs @@ -147,6 +147,46 @@ defmodule HelperTest do Application.put_env(:amqpx, :skip_dead_letter_routing_key_check_for, []) end + test "declare/2 propagates x-queue-type to dead letter queue declaration", + meta do + queue_name = rand_name() + routing_key_name = rand_name() + exchange_name = rand_name() + dead_letter_queue = "#{queue_name}_errored" + + assert :ok == + Helper.declare(meta[:chan], %{ + exchanges: [ + %{name: exchange_name, opts: [durable: true], routing_keys: [routing_key_name], type: :topic} + ], + opts: [ + durable: true, + arguments: [ + {"x-dead-letter-exchange", :longstr, ""}, + {"x-dead-letter-routing-key", :longstr, dead_letter_queue}, + {"x-queue-type", :longstr, "quorum"} + ] + ], + queue: queue_name + }) + + rabbit_manager = Application.get_env(:amqpx, :rabbit_manager_url).rabbit + amqp_conn = Application.get_env(:amqpx, :amqp_connection) + credentials = Base.encode64("#{amqp_conn[:username]}:#{amqp_conn[:password]}") + headers = [{~c"Authorization", "Basic #{credentials}"}] + + assert {:ok, {{_, 200, ~c"OK"}, _headers, body}} = + :httpc.request(:get, {"http://#{rabbit_manager}/api/queues", headers}, [], []) + + assert {:ok, queues} = Jason.decode(body) + + assert %{"durable" => true, "arguments" => %{"x-queue-type" => "quorum"}} = + Enum.find(queues, fn q -> match?(%{"name" => ^queue_name}, q) end) + + assert %{"durable" => true, "arguments" => %{"x-queue-type" => "quorum"}} = + Enum.find(queues, fn q -> match?(%{"name" => ^dead_letter_queue}, q) end) + end + defp rand_name do :crypto.strong_rand_bytes(8) |> Base.encode64() end