Skip to content

Commit

Permalink
Upgrade localstack version and remove pinned dependencies (#107)
Browse files Browse the repository at this point in the history
# Upgrade localstack version and remove pinned dependencies
In commit bdfb0d6 we pinned AWS
dependencies, since we weren't sure if `eventq` was ready for upgrading
them. Also they dropped Ruby 2.5 support. Since we have moved to have no
pinned dependencies if possible, each environment can choose the best
and most up to date dependencies available, so we can keep the Ruby 2.5
support and also ensure using the latest AWS libraries in Ruby 3.

Also we removed the fixed localstack version, as in version 1.4.0 they
addressed the multiregion support.

# Refactor AWS integration specs
Upgrading our localstack version showed how brittle our integration
tests are, because newer localstack versions react a bit slower (read:
100+ ms to pick up a message instead of 10+ ms). Since we used `sleep`
statements a ton to wait "just the right amount of time" to exceed the
TTL, specs became flaky or consistently failed because the worker was
picking them up later than intended.

Looking deeper into it showed that our approach was flawed from the get
go. While we name the variable in specs `retry_attempt_count` it is
actually a `call_count`. Therefore all `sleeps` were off by one
iteration. In the past this created the following patterns in specs:
* 0.0s Boot worker
* 0.1s `sleep 1` to let worker pick up message
* 0.2s Worker processes message
* 1.1s Check message has been processed
* 1.1s `sleep 1` to wait for retry
* 1.2s Worker processes retry

Sometimes there was just 10 ms between assertion and worker processing
the message. Since the worker is on a thread, this could have failed
with older localstack versions as well. With the newer and slower
localstack version most specs looked like this:
* 0.0s Boot worker
* 0.1s `sleep 1` to let worker pick up message
* 0.8s Worker processes message
* 1.1s Check message has been processed
* 1.1s `sleep 1` to wait for retry
* 2.0s Worker processes retry
* 2.1s `sleep 1` to wait for 2nd retry
* 3.1s Check for 2nd retry fails as worker has not picked up 2nd retry yet

The flaw here is that `visibility_timeout` does not tell when the
message is picked up again, but for how long it is NOT PICKED UP again.
So while we need to _at least_ wait for the `visibility_timeout` in
specs, we cannot know when the worker actually comes around to pick the
message up again. If we were using bigger numbers, like minutes for the
`visibility_timeout` rather than seconds, the 0.1 - 1.9 seconds
sometimes needed to pick up a message would not matter, but also would
slow down specs to a halt.

The second flaw is that we are basically testing libraries here. We
should trust AWS to ensure messages are not picked up for the
`visibility_timeout` we set. Instead we should only test if the right
`visibility_timeout` numbers are calculated for the libraries to use.

As a result we have adjusted the specs to use a `visibility_timeout` of
zero whenever possible to speed up specs. However, we note down the
calculated number to ensure the right one is normally passed on to AWS.
This allows us to use the exponential backoff strategy, without waiting
expontentially long.

In order to remove the need for `sleep` in specs, as the operation to
test is run asynchroneously, we introduced a thread based queue to wait
for results on. This way, as soon as a result is pushed to the queue in
another thread, the thread waiting for results immediately picks it up
and continues running the spec and assertions. By adding a decorator
around a method that is invoked after each time a message was processed,
we can simply say `wait_for_message_processed` to ensure the spec wait
just long enough to continue.

To test `visibility_timeout` we can now use
`aws_visibility_timeout_queue`, which gets pushed the
`visibility_timeout` calculated for each call, e.g.
```
expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 5)
```

Using these techniques we sped up specs considerably, made them much
more reliable and still kept the integration part as real AWS libraries
are used. Also we can still integrate with a real AWS account instead of
using `localstack`.

# Set log level to FATAL during spec runs
Since the logger is printing messages to STDOUT, all DEBUG, INFO and
ERROR logs produced during tests are included in the test output. Since
we rely heavily on failing to process a message on purpose, we have
littered the output with ERROR logs and traces as well. By setting the
log level to FATAL during spec runs, we ensure that we only see the spec
output during test runs.

---------

Co-authored-by: Florian Pilz <[email protected]>
  • Loading branch information
guille-sage and florianpilz authored Jan 10, 2024
1 parent f47b8c7 commit 37bfbe3
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 185 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ jobs:
--health-timeout 5s
--health-retries 10
localstack:
# changes to multi region support after this version break tests
image: localstack/localstack:0.12.16
image: localstack/localstack:latest
env:
SERVICES: sqs,sns
HOSTNAME: localhost
HOSTNAME_EXTERNAL: localhost
SQS_ENDPOINT_STRATEGY: off
LOCALSTACK_HOST: localhost
ports:
- "8085:8080"
- "4566:4566"
Expand Down
7 changes: 3 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@ services:

localstack:
# changes to multi region support after this version break tests
image: localstack/localstack:0.12.16
image: localstack/localstack:latest
container_name: localstack
environment:
- SERVICES=sqs,sns
- HOSTNAME=localstack
- HOSTNAME_EXTERNAL=localstack
- SQS_ENDPOINT_STRATEGY=off
- LOCALSTACK_HOST=localstack
ports:
- "8085:8080"
- "4566:4566"
Expand Down
6 changes: 3 additions & 3 deletions eventq.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'shoulda-matchers'
spec.add_development_dependency 'simplecov', '< 0.18.0'

spec.add_dependency 'aws-sdk-sqs', '~> 1.65.0'
spec.add_dependency 'aws-sdk-sns', '~> 1.68.0'
spec.add_dependency 'aws-sdk-core', '3.187.0'
spec.add_dependency 'aws-sdk-core'
spec.add_dependency 'aws-sdk-sns'
spec.add_dependency 'aws-sdk-sqs'
spec.add_dependency 'bunny'
spec.add_dependency 'class_kit'
spec.add_dependency 'concurrent-ruby'
Expand Down
108 changes: 27 additions & 81 deletions spec/eventq_aws/integration/aws_queue_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
require 'spec_helper'

RSpec.describe EventQ::Amazon::QueueWorker, integration: true do
include_context 'mock_aws_visibility_timeout'
include_context 'aws_wait_for_message_processed_helper'

let(:queue_worker) { EventQ::QueueWorker.new }

let(:queue_client) do
Expand Down Expand Up @@ -116,9 +119,6 @@
received = false
context = nil

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, thread_count: 1, block_process: false, client: queue_client, wait: false) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
Expand All @@ -127,12 +127,12 @@
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)
wait_for_message_processed

queue_worker.stop
expect(received).to eq(true)
expect(context).to eq message_context

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -143,9 +143,6 @@
received = false
context = nil

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(
subscriber_queue,
worker_adapter: subject,
Expand All @@ -161,12 +158,12 @@
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)
wait_for_message_processed

queue_worker.stop
expect(received).to eq(true)
expect(context).to eq message_context

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -185,22 +182,18 @@

received = false

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
received = true
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)

queue_worker.stop
wait_for_message_processed

expect(received).to eq(true)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end
end
Expand All @@ -216,22 +209,18 @@

received = false

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
received = true
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)

queue_worker.stop
wait_for_message_processed

expect(received).to eq(true)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end
end
Expand All @@ -248,9 +237,6 @@
received_count = 0
received_attribute = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
Expand All @@ -261,13 +247,13 @@
args.abort = true if received_count != 2
end

sleep(4)

queue_worker.stop
2.times { wait_for_message_processed }

expect(received).to eq(true)
expect(received_count).to eq(2)
expect(received_attribute).to eq(1)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand Down Expand Up @@ -296,7 +282,7 @@
end
end

sleep(5)
10.times { wait_for_message_processed }

expect(message_count).to eq(10)
expect(received_messages.length).to eq(5)
Expand All @@ -307,7 +293,6 @@
expect(received_messages[4][:events]).to be >= 1

queue_worker.stop

expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -333,71 +318,40 @@
end

it 'should receive an event from the subscriber queue and retry it' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(1)

expect(retry_attempt_count).to eq(1)

sleep(2)

expect(retry_attempt_count).to eq(2)

sleep(3)

expect(retry_attempt_count).to eq(3)

sleep(4)

expect(retry_attempt_count).to eq(4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1)
expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 3)
expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 5)

queue_worker.stop

expect(queue_worker.is_running).to eq(false)
end

context 'queue.allow_exponential_back_off = true' do
let(:max_retry_delay) { 10_000 }
let(:max_retry_delay) { 20_000 }
let(:allow_exponential_back_off) { true }

it 'retries received event with an exponential waiting period' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(1)

expect(retry_attempt_count).to eq(1)

sleep(2)

expect(retry_attempt_count).to eq(2)

sleep(4)

expect(retry_attempt_count).to eq(3)

sleep(8)

expect(retry_attempt_count).to eq(4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1)
expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 8)
expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 16)

queue_worker.stop

Expand All @@ -416,21 +370,13 @@
end

it 'retries after half the retry delay has passed' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 0.5, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(3)

expect(retry_attempt_count).to eq(2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 2)

queue_worker.stop

Expand Down
Loading

0 comments on commit 37bfbe3

Please sign in to comment.