From 127b91211c0d8d1b1126774af33f81e5f2e9ba58 Mon Sep 17 00:00:00 2001 From: Florian Pilz Date: Fri, 8 Dec 2023 14:46:36 +0100 Subject: [PATCH 1/4] Add option for exponential back off Our current implementation only has an algorithm that grows the back off linearly. For example if you have a 1 minute back off, the back off algorithm will grow the back off by 1 minute each time, i.e. 1 minute, 2 minutes, 3 minutes, etc. This is not ideal if you need your first retries within seconds, but want the last retry on the next day to allow for system recovery, and spread out retries nicely. This commit adds an option to use an exponential back off algorithm. For example if you have a 1 minute back off, the back off algorithm will grow the back off by 1 minute, 2 minutes, 4 minutes, 8 minutes, 16 minutes, etc. This way you can start with seconds and end with days. We already have the option `queue_max_retry_delay` to cap how big the exponential function is allowed to grow. --- README.md | 1 + .../aws_calculate_visibility_timeout.rb | 23 +++++--- lib/eventq/eventq_aws/aws_queue_worker.rb | 11 ++-- lib/eventq/eventq_base/queue.rb | 3 + .../eventq_rabbitmq/rabbitmq_queue_worker.rb | 7 ++- .../aws_calculate_visibility_timeout_spec.rb | 45 +++++++++++++- .../integration/aws_queue_worker_spec.rb | 59 +++++++++++++++++-- .../rabbitmq_queue_worker_spec.rb | 56 +++++++++++++++--- utilities/plot_visibility_timeout.rb | 49 +++++++-------- 9 files changed, 202 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index 690d4f8..0302a10 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ A subscription queue should be defined to receive any events raised for the subs - **allow_retry** [Bool] [Optional] [Default=false] This determines if the queue should allow processing failures to be retried. - **allow_retry_back_off** [Bool] [Optional] [Default=false] This is used to specify if failed messages that retry should incrementally backoff. + - **allow_exponential_back_off** [Bool] [Optional] [Default=false] This is used to specify if failed messages that retry should expontentially backoff. - **retry_back_off_grace** [Int] [Optional] [Default=0] This is the number of times to allow retries without applying retry back off if enabled. - **dlq** [EventQ::Queue] [Optional] [Default=nil] A queue that will receive the messages which were not successfully processed after maximum number of receives by consumers. This is created at the same time as the parent queue. - **max_retry_attempts** [Int] [Optional] [Default=5] This is used to specify the max number of times an event should be allowed to retry before failing. diff --git a/lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb b/lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb index ccf8cc8..039c99a 100644 --- a/lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb +++ b/lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb @@ -14,19 +14,21 @@ def initialize(max_timeout:, logger: EventQ.logger) # @param retry_attempts [Integer] Current retry # @param queue_settings [Hash] Queue settings # @option allow_retry_back_off [Bool] Enables/Disables backoff strategy + # @option allow_exponential_back_off [Bool] Enables/Disables exponential backoff strategy # @option max_retry_delay [Integer] Maximum amount of time a retry will take in ms # @option retry_back_off_grace [Integer] Amount of retries to wait before starting to backoff # @option retry_back_off_weight [Integer] Multiplier for the backoff retry # @option retry_delay [Integer] Amount of time to wait until retry in ms # @return [Integer] the calculated visibility timeout in seconds def call(retry_attempts:, queue_settings:) - @retry_attempts = retry_attempts + @retry_attempts = retry_attempts - @allow_retry_back_off = queue_settings.fetch(:allow_retry_back_off) - @max_retry_delay = queue_settings.fetch(:max_retry_delay) - @retry_back_off_grace = queue_settings.fetch(:retry_back_off_grace) - @retry_back_off_weight= queue_settings.fetch(:retry_back_off_weight) - @retry_delay = queue_settings.fetch(:retry_delay) + @allow_retry_back_off = queue_settings.fetch(:allow_retry_back_off) + @allow_exponential_back_off = queue_settings.fetch(:allow_exponential_back_off, false) + @max_retry_delay = queue_settings.fetch(:max_retry_delay) + @retry_back_off_grace = queue_settings.fetch(:retry_back_off_grace) + @retry_back_off_weight = queue_settings.fetch(:retry_back_off_weight) + @retry_delay = queue_settings.fetch(:retry_delay) if @allow_retry_back_off && retry_past_grace_period? visibility_timeout = timeout_with_back_off @@ -53,7 +55,12 @@ def timeout_without_back_off def timeout_with_back_off factor = @retry_attempts - @retry_back_off_grace - visibility_timeout = ms_to_seconds(@retry_delay * factor * @retry_back_off_weight) + visibility_timeout = if @allow_exponential_back_off + ms_to_seconds(@retry_delay * @retry_back_off_weight * 2 ** (factor - 1)) + else + ms_to_seconds(@retry_delay * @retry_back_off_weight * factor) + end + max_retry_delay = ms_to_seconds(@max_retry_delay) if visibility_timeout > max_retry_delay @@ -77,4 +84,4 @@ def check_for_max_timeout(visibility_timeout) end end end -end \ No newline at end of file +end diff --git a/lib/eventq/eventq_aws/aws_queue_worker.rb b/lib/eventq/eventq_aws/aws_queue_worker.rb index 385ae31..c1dc010 100644 --- a/lib/eventq/eventq_aws/aws_queue_worker.rb +++ b/lib/eventq/eventq_aws/aws_queue_worker.rb @@ -122,11 +122,12 @@ def reject_message(queue, poller, msg, retry_attempts, message, args) visibility_timeout = @calculate_visibility_timeout.call( retry_attempts: retry_attempts, queue_settings: { - allow_retry_back_off: queue.allow_retry_back_off, - max_retry_delay: queue.max_retry_delay, - retry_back_off_grace: queue.retry_back_off_grace, - retry_back_off_weight: queue.retry_back_off_weight, - retry_delay: queue.retry_delay + allow_retry_back_off: queue.allow_retry_back_off, + allow_exponential_back_off: queue.allow_exponential_back_off, + max_retry_delay: queue.max_retry_delay, + retry_back_off_grace: queue.retry_back_off_grace, + retry_back_off_weight: queue.retry_back_off_weight, + retry_delay: queue.retry_delay } ) diff --git a/lib/eventq/eventq_base/queue.rb b/lib/eventq/eventq_base/queue.rb index 085f0de..bb984ec 100644 --- a/lib/eventq/eventq_base/queue.rb +++ b/lib/eventq/eventq_base/queue.rb @@ -2,6 +2,7 @@ module EventQ class Queue attr_accessor :allow_retry attr_accessor :allow_retry_back_off + attr_accessor :allow_exponential_back_off attr_accessor :dlq attr_accessor :max_retry_attempts attr_accessor :max_retry_delay @@ -20,6 +21,8 @@ def initialize @allow_retry = false # Default retry back off settings @allow_retry_back_off = false + # Default exponential back off settings + @allow_exponential_back_off = false # Default max receive count is 30 @max_receive_count = 30 # Default max retry attempts is 5 diff --git a/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb b/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb index c8c9183..0da587d 100644 --- a/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb +++ b/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb @@ -80,7 +80,12 @@ def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) retry_attempts = 1 if retry_attempts < 1 if queue.allow_retry_back_off == true - message_ttl = retry_attempts * queue.retry_delay + message_ttl = if queue.allow_exponential_back_off + queue.retry_delay * 2 ** (retry_attempts - 1) + else + queue.retry_delay * retry_attempts + end + if (retry_attempts * queue.retry_delay) > queue.max_retry_delay EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." } message_ttl = queue.max_retry_delay diff --git a/spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb b/spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb index a9cc5b7..7ab8ec2 100644 --- a/spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb +++ b/spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb @@ -5,7 +5,7 @@ let(:retry_delay) { 30_000 } # 30s let(:max_retry_delay) { 100_000 } # 100s let(:retry_back_off_grace) { 1000 } # iterations before the backoff grace quicks in - let(:retry_back_off_weight) { 1 } # backoff multiplier + let(:retry_back_off_weight) { 1 } # backoff multiplier subject { described_class.new(max_timeout: max_timeout) } @@ -134,9 +134,50 @@ expect(result).to eq(ms_to_seconds(retry_delay) * retries_past_grace_period * retry_back_off_weight) end end + + context 'when exponential backoff is enabled' do + let(:max_retry_delay) { 40_000_000 } + let(:allow_exponential_back_off) { true } + + it 'grow the delay by 2 to the power of retries past grace period' do + retries_past_grace_period = 10 + + result = subject.call( + retry_attempts: retry_back_off_grace + retries_past_grace_period, + queue_settings: { + allow_retry_back_off: allow_retry_back_off, + allow_exponential_back_off: allow_exponential_back_off, + max_retry_delay: max_retry_delay, + retry_back_off_grace: retry_back_off_grace, + retry_delay: retry_delay, + retry_back_off_weight: retry_back_off_weight + } + ) + + expect(result).to eq(ms_to_seconds(retry_delay) * 2 ** (retries_past_grace_period - 1)) + end + + it 'still caps at max delay' do + retries_past_grace_period = 20 + + result = subject.call( + retry_attempts: retry_back_off_grace + retries_past_grace_period, + queue_settings: { + allow_retry_back_off: allow_retry_back_off, + allow_exponential_back_off: allow_exponential_back_off, + max_retry_delay: max_retry_delay, + retry_back_off_grace: retry_back_off_grace, + retry_delay: retry_delay, + retry_back_off_weight: retry_back_off_weight + } + ) + + expect(result).to eq(ms_to_seconds(max_retry_delay)) + end + end end def ms_to_seconds(value) value / 1000 end -end \ No newline at end of file +end diff --git a/spec/eventq_aws/integration/aws_queue_worker_spec.rb b/spec/eventq_aws/integration/aws_queue_worker_spec.rb index d8d81c0..1f352aa 100644 --- a/spec/eventq_aws/integration/aws_queue_worker_spec.rb +++ b/spec/eventq_aws/integration/aws_queue_worker_spec.rb @@ -312,17 +312,25 @@ end context 'queue.allow_retry_back_off = true' do + let(:retry_delay) { 1_000 } + let(:max_retry_delay) { 5_000 } + + let(:allow_retry) { true } + let(:allow_retry_back_off) { true } + let(:allow_exponential_back_off) { false } + before do - subscriber_queue.retry_delay = 1000 - subscriber_queue.allow_retry = true - subscriber_queue.allow_retry_back_off = true - subscriber_queue.max_retry_delay = 5000 - end + subscriber_queue.retry_delay = retry_delay + subscriber_queue.max_retry_delay = max_retry_delay + subscriber_queue.allow_retry = allow_retry + subscriber_queue.allow_retry_back_off = allow_retry_back_off + subscriber_queue.allow_exponential_back_off = allow_exponential_back_off - it 'should receive an event from the subscriber queue and retry it.' do subscription_manager.subscribe(event_type, subscriber_queue) eventq_client.raise_event(event_type, message) + end + it 'should receive an event from the subscriber queue and retry it' do retry_attempt_count = 0 # wait 1 second to allow the message to be sent and broadcast to the queue @@ -355,6 +363,45 @@ expect(queue_worker.is_running).to eq(false) end + + context 'queue.allow_exponential_back_off = true' do + let(:max_retry_delay) { 10_000 } + let(:allow_exponential_back_off) { true } + + it 'retries received event with an exponential waiting period' do + retry_attempt_count = 0 + + # wait 1 second to allow the message to be sent and broadcast to the queue + sleep(1) + + queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args| + expect(event).to eq(message) + expect(args).to be_a(EventQ::MessageArgs) + retry_attempt_count = args.retry_attempts + 1 + raise 'Fail on purpose to send event to retry queue.' + end + + sleep(1) + + expect(retry_attempt_count).to eq(1) + + sleep(2) + + expect(retry_attempt_count).to eq(2) + + sleep(4) + + expect(retry_attempt_count).to eq(3) + + sleep(8) + + expect(retry_attempt_count).to eq(4) + + queue_worker.stop + + expect(queue_worker.is_running).to eq(false) + end + end end def add_to_received_list(received_messages) diff --git a/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb b/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb index e518dfe..6dda3b3 100644 --- a/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb +++ b/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb @@ -448,18 +448,25 @@ end context 'queue.allow_retry_back_off = true' do - it 'should send messages that fail to process to the retry queue and then receive them again after the retry delay' do + let(:subscriber_queue) { EventQ::Queue.new } + let(:allow_retry) { true } + let(:allow_retry_back_off) { true } + let(:allow_exponential_back_off) { false } - event_type = SecureRandom.uuid - subscriber_queue = EventQ::Queue.new + before do subscriber_queue.name = SecureRandom.uuid - #set queue retry delay to 0.5 seconds + subscriber_queue.allow_retry = allow_retry + subscriber_queue.allow_retry_back_off = allow_retry_back_off + subscriber_queue.allow_exponential_back_off = allow_exponential_back_off + + # set queue retry delay to 0.5 seconds subscriber_queue.retry_delay = 500 - subscriber_queue.allow_retry = true - subscriber_queue.allow_retry_back_off = true - #set to max retry delay to 5 seconds + + # set to max retry delay to 5 seconds subscriber_queue.max_retry_delay = 5000 + event_type = SecureRandom.uuid + qm = EventQ::RabbitMq::QueueManager.new q = qm.get_queue(channel, subscriber_queue) q.delete @@ -471,7 +478,9 @@ eqclient = EventQ::RabbitMq::EventQClient.new({client: client, subscription_manager: subscription_manager}) eqclient.raise_event(event_type, message) + end + it 'should send messages that fail to process to the retry queue and then receive them again after the retry delay' do retry_attempt_count = 0 queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, :thread_count => 1, :sleep => 0.5, client: client}) do |event, args| @@ -498,8 +507,41 @@ queue_worker.stop expect(queue_worker.running?).to eq(false) + end + + context 'queue.allow_exponential_back_off = true' do + let(:allow_exponential_back_off) { true } + + it 'retries by exponential waiting period' do + retry_attempt_count = 0 + queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, :thread_count => 1, :sleep => 0.5, client: client}) do |event, args| + retry_attempt_count = args.retry_attempts + raise 'Fail on purpose to send event to retry queue.' + end + + sleep(0.6) + + expect(retry_attempt_count).to eq(1) + + sleep(1.1) + + expect(retry_attempt_count).to eq(2) + + sleep(2.1) + + expect(retry_attempt_count).to eq(3) + + sleep(4.1) + + expect(retry_attempt_count).to eq(4) + + queue_worker.stop + + expect(queue_worker.running?).to eq(false) + end end + end context 'retry block execution' do diff --git a/utilities/plot_visibility_timeout.rb b/utilities/plot_visibility_timeout.rb index ff949a2..5ab8775 100644 --- a/utilities/plot_visibility_timeout.rb +++ b/utilities/plot_visibility_timeout.rb @@ -9,16 +9,17 @@ class PlotVisibilityTimeout def plot(settings) setup - @plot_seconds = settings.fetch(:plot_seconds) - @plot_min_timeout = settings.fetch(:plot_min_timeout) - @plot_file_name = "#{PLOT_FOLDER}/plot_#{settings.values.join('__')}" - - @queue_allow_retry_back_off = settings.fetch(:queue_allow_retry_back_off) - @queue_retry_back_off_weight = settings.fetch(:queue_retry_back_off_weight) - @queue_max_retry_delay = settings.fetch(:queue_max_retry_delay) - @queue_max_timeout = settings.fetch(:queue_max_timeout) - @queue_retry_back_off_grace = settings.fetch(:queue_retry_back_off_grace) - @queue_retry_delay = settings.fetch(:queue_retry_delay) + @plot_seconds = settings.fetch(:plot_seconds) + @plot_min_timeout = settings.fetch(:plot_min_timeout) + @plot_file_name = "#{PLOT_FOLDER}/plot_#{settings.values.join('__')}" + + @queue_allow_retry_back_off = settings.fetch(:queue_allow_retry_back_off) + @queue_allow_exponential_back_off = settings.fetch(:queue_allow_exponential_back_off) + @queue_retry_back_off_weight = settings.fetch(:queue_retry_back_off_weight) + @queue_max_retry_delay = settings.fetch(:queue_max_retry_delay) + @queue_max_timeout = settings.fetch(:queue_max_timeout) + @queue_retry_back_off_grace = settings.fetch(:queue_retry_back_off_grace) + @queue_retry_delay = settings.fetch(:queue_retry_delay) logger = Logger.new(STDOUT) @@ -70,11 +71,12 @@ def calculate(retry_counter) calculator.call( retry_attempts: retry_counter, queue_settings: { - allow_retry_back_off: @queue_allow_retry_back_off, - max_retry_delay: @queue_max_retry_delay, - retry_back_off_grace: @queue_retry_back_off_grace, - retry_back_off_weight: @queue_retry_back_off_weight, - retry_delay: @queue_retry_delay, + allow_retry_back_off: @queue_allow_retry_back_off, + allow_exponential_back_off: @queue_allow_exponential_back_off, + max_retry_delay: @queue_max_retry_delay, + retry_back_off_grace: @queue_retry_back_off_grace, + retry_back_off_weight: @queue_retry_back_off_weight, + retry_delay: @queue_retry_delay, } ) end @@ -110,24 +112,25 @@ def print_output(retry_counter, max_visibility_timeout,total_elapsed_time) settings = { # Sometimes the calculated timeout is zero so we must default to a value # since in real life the is no zero second connections. - plot_min_timeout: 0.03, # 30ms which is the average connection time between worker and queue + plot_min_timeout: 0.03, # 30ms which is the average connection time between worker and queue # The amount of time we should plot for. - plot_seconds: 72*60*60, # simulate 72h - queue_allow_retry_back_off: true, # enables backoff strategy + plot_seconds: 72*60*60, # simulate 72h + queue_allow_retry_back_off: true, # enables backoff strategy + queue_allow_exponential_back_off: false, # disables exponential backoff strategy # The cap value for the queue retry - queue_max_retry_delay: 1_500_000, # will wait max 1500s - queue_max_timeout: 43_200, # 12h which AWS max message visibility timeout + queue_max_retry_delay: 1_500_000, # will wait max 1500s + queue_max_timeout: 43_200, # 12h which AWS max message visibility timeout # Waiting period before the backoff strategy kicks in. # Multiply with query_retry_delay and divide by 60 to see how many minutes it will wait. - queue_retry_back_off_grace: 30_000, # wait 15min: (queue_retry_back_off_grace * queue_retry_delay)/60 + queue_retry_back_off_grace: 30_000, # wait 15min: (queue_retry_back_off_grace * queue_retry_delay)/60 # Delay and retry for each queue iterations. The multiplier is necessary in case the calculated values # are insignificant between iterations. - queue_retry_back_off_weight: 100, # Backoff multiplier - queue_retry_delay: 30 # 30ms + queue_retry_back_off_weight: 100, # Backoff multiplier + queue_retry_delay: 30 # 30ms } PlotVisibilityTimeout.new.plot(settings) From 8df507ca6669d8608b70443a094fe7452165ba9f Mon Sep 17 00:00:00 2001 From: Florian Pilz Date: Mon, 11 Dec 2023 14:31:57 +0100 Subject: [PATCH 2/4] Remove default setting inside calc visibility timeout Because the default is already set in the base queue class, it's not necessary to have a fallback value when fetching the queue settings. Since the new setting is now mandatory for calling the visibility timeout class, adjust the specs to use a let-block for the queue settings throughout the spec file to simplify it and make it easy to add a new queue setting. --- .../aws_calculate_visibility_timeout.rb | 2 +- .../aws_calculate_visibility_timeout_spec.rb | 112 ++++++------------ 2 files changed, 38 insertions(+), 76 deletions(-) diff --git a/lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb b/lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb index 039c99a..ff9a97c 100644 --- a/lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb +++ b/lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb @@ -24,7 +24,7 @@ def call(retry_attempts:, queue_settings:) @retry_attempts = retry_attempts @allow_retry_back_off = queue_settings.fetch(:allow_retry_back_off) - @allow_exponential_back_off = queue_settings.fetch(:allow_exponential_back_off, false) + @allow_exponential_back_off = queue_settings.fetch(:allow_exponential_back_off) @max_retry_delay = queue_settings.fetch(:max_retry_delay) @retry_back_off_grace = queue_settings.fetch(:retry_back_off_grace) @retry_back_off_weight = queue_settings.fetch(:retry_back_off_weight) diff --git a/spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb b/spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb index 7ab8ec2..2720434 100644 --- a/spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb +++ b/spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb @@ -1,12 +1,26 @@ require 'spec_helper' RSpec.describe EventQ::Amazon::CalculateVisibilityTimeout do + let(:allow_retry_back_off) { false } + let(:allow_exponential_back_off) { false } + let(:max_timeout) { 43_200 } # 43_200s (12h) - let(:retry_delay) { 30_000 } # 30s let(:max_retry_delay) { 100_000 } # 100s + let(:retry_delay) { 30_000 } # 30s let(:retry_back_off_grace) { 1000 } # iterations before the backoff grace quicks in let(:retry_back_off_weight) { 1 } # backoff multiplier + let(:queue_settings) do + { + allow_retry_back_off: allow_retry_back_off, + allow_exponential_back_off: allow_exponential_back_off, + max_retry_delay: max_retry_delay, + retry_delay: retry_delay, + retry_back_off_grace: retry_back_off_grace, + retry_back_off_weight: retry_back_off_weight + } + end + subject { described_class.new(max_timeout: max_timeout) } context 'when retry backoff is disabled' do @@ -14,28 +28,16 @@ it 'does not introduces backoff' do result = subject.call( - retry_attempts: 1, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - max_retry_delay: max_retry_delay, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + retry_attempts: 1, + queue_settings: queue_settings ) expect(result).to eq(ms_to_seconds(retry_delay)) result = subject.call( - retry_attempts: retry_back_off_grace + 100, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - max_retry_delay: max_retry_delay, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + retry_attempts: retry_back_off_grace + 100, + queue_settings: queue_settings ) expect(result).to eq(ms_to_seconds(retry_delay)) @@ -48,14 +50,8 @@ context 'when the retry_attempts is lower than the retry_back_off_grace' do it 'does not introduce backoff' do result = subject.call( - retry_attempts: retry_back_off_grace - 1, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - max_retry_delay: max_retry_delay, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + retry_attempts: retry_back_off_grace - 1, + queue_settings: queue_settings ) expect(result).to eq(ms_to_seconds(retry_delay)) @@ -67,14 +63,8 @@ retries_past_grace_period = 2 result = subject.call( - retry_attempts: retry_back_off_grace + retries_past_grace_period, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - max_retry_delay: max_retry_delay, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + retry_attempts: retry_back_off_grace + retries_past_grace_period, + queue_settings: queue_settings ) expect(result).to eq(ms_to_seconds(retry_delay) * retries_past_grace_period) @@ -84,14 +74,8 @@ context 'when the visible_timeout exceeds the max_retry_delay' do it 'returns the max_retry_delay' do result = subject.call( - retry_attempts: retry_back_off_grace + 100_000, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - max_retry_delay: max_retry_delay, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + retry_attempts: retry_back_off_grace + 100_000, + queue_settings: queue_settings ) expect(result).to eq(ms_to_seconds(max_retry_delay)) @@ -99,16 +83,12 @@ end context 'when the visible_timeout is bigger than max_timeout' do + let(:max_retry_delay) { 50_000_000 } + it 'the visible_timeout is set to max_timeout' do result = subject.call( - retry_attempts: retry_back_off_grace + 100_000, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - max_retry_delay: 50_000_000, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + retry_attempts: retry_back_off_grace + 100_000, + queue_settings: queue_settings ) expect(result).to eq(max_timeout) @@ -116,19 +96,15 @@ end context 'when retry_back_off_weight is added' do + let(:retry_back_off_weight) { 2 } + let(:max_retry_delay) { 1_000_000 } + it 'the backoff is multiplied' do retries_past_grace_period = 2 - retry_back_off_weight = 2 result = subject.call( - retry_attempts: retry_back_off_grace + retries_past_grace_period, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - max_retry_delay: 1_000_000, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + retry_attempts: retry_back_off_grace + retries_past_grace_period, + queue_settings: queue_settings ) expect(result).to eq(ms_to_seconds(retry_delay) * retries_past_grace_period * retry_back_off_weight) @@ -143,15 +119,8 @@ retries_past_grace_period = 10 result = subject.call( - retry_attempts: retry_back_off_grace + retries_past_grace_period, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - allow_exponential_back_off: allow_exponential_back_off, - max_retry_delay: max_retry_delay, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + retry_attempts: retry_back_off_grace + retries_past_grace_period, + queue_settings: queue_settings ) expect(result).to eq(ms_to_seconds(retry_delay) * 2 ** (retries_past_grace_period - 1)) @@ -162,14 +131,7 @@ result = subject.call( retry_attempts: retry_back_off_grace + retries_past_grace_period, - queue_settings: { - allow_retry_back_off: allow_retry_back_off, - allow_exponential_back_off: allow_exponential_back_off, - max_retry_delay: max_retry_delay, - retry_back_off_grace: retry_back_off_grace, - retry_delay: retry_delay, - retry_back_off_weight: retry_back_off_weight - } + queue_settings: queue_settings ) expect(result).to eq(ms_to_seconds(max_retry_delay)) From 2f01947875514aea16a787eddfb37e1623b34108 Mon Sep 17 00:00:00 2001 From: Florian Pilz Date: Mon, 11 Dec 2023 14:34:36 +0100 Subject: [PATCH 3/4] Extract calculating RabbitMQ retry delay to private method Because the `reject_message` method became way too long, extract calculating the retry delay to it's separate method and call it. This refactoring also fixes that the exponential backoff in RabbitMQ was not capped, as the check did re-calculate the condition as `retry_attempts * queue.retry_delay` instead of using the calculated `message_ttl` and therefore did not include exponential backoff. --- .../eventq_rabbitmq/rabbitmq_queue_worker.rb | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb b/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb index 0da587d..b1f895e 100644 --- a/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb +++ b/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb @@ -78,21 +78,7 @@ def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) message.retry_attempts += 1 retry_attempts = message.retry_attempts - queue.retry_back_off_grace retry_attempts = 1 if retry_attempts < 1 - - if queue.allow_retry_back_off == true - message_ttl = if queue.allow_exponential_back_off - queue.retry_delay * 2 ** (retry_attempts - 1) - else - queue.retry_delay * retry_attempts - end - - if (retry_attempts * queue.retry_delay) > queue.max_retry_delay - EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." } - message_ttl = queue.max_retry_delay - end - else - message_ttl = queue.retry_delay - end + message_ttl = retry_delay(queue, retry_attempts) EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{message_ttl}" } retry_exchange.publish(serialize_message(message), :expiration => message_ttl) @@ -133,6 +119,23 @@ def process_message(payload, queue, channel, retry_exchange, delivery_tag, block raise "Unrecognized status: #{status}" end end + + def retry_delay(queue, retry_attempts) + return queue.retry_delay unless queue.allow_retry_back_off == true + + message_ttl = if queue.allow_exponential_back_off + queue.retry_delay * 2 ** (retry_attempts - 1) + else + queue.retry_delay * retry_attempts + end + + if message_ttl > queue.max_retry_delay + EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." } + message_ttl = queue.max_retry_delay + end + + message_ttl + end end end end From c659cf3f01ba61fcc7480ea16673060160df2691 Mon Sep 17 00:00:00 2001 From: Florian Pilz Date: Mon, 11 Dec 2023 14:56:45 +0100 Subject: [PATCH 4/4] Simplify guard logic to return default delay if backoff is disabled Co-authored-by: Guillermo Rodriguez --- lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb b/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb index b1f895e..496b541 100644 --- a/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb +++ b/lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb @@ -121,7 +121,7 @@ def process_message(payload, queue, channel, retry_exchange, delivery_tag, block end def retry_delay(queue, retry_attempts) - return queue.retry_delay unless queue.allow_retry_back_off == true + return queue.retry_delay unless queue.allow_retry_back_off message_ttl = if queue.allow_exponential_back_off queue.retry_delay * 2 ** (retry_attempts - 1)