diff --git a/docker-compose.yml b/docker-compose.yml index c67cfd3..48645bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,7 +51,7 @@ services: localstack: # changes to multi region support after this version break tests - image: localstack/localstack:latest + image: localstack/localstack:stable container_name: localstack environment: - SQS_ENDPOINT_STRATEGY=off @@ -60,7 +60,7 @@ services: - "8085:8080" - "4566:4566" healthcheck: - test: if [ $$(curl -s -o /dev/null -w "%{http_code}" http://localhost:4566/health?reload) -ne 200 ]; then exit 1; fi + test: if [ $$(curl -s -o /dev/null -w "%{http_code}" http://localhost:4566/_localstack/health?reload) -ne 200 ]; then exit 1; fi interval: 10s timeout: 5s retries: 10 diff --git a/eventq.gemspec b/eventq.gemspec index f0750e3..9175746 100644 --- a/eventq.gemspec +++ b/eventq.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rspec' spec.add_development_dependency 'shoulda-matchers' spec.add_development_dependency 'simplecov', '< 0.18.0' + spec.add_development_dependency 'debug' spec.add_dependency 'aws-sdk-core' spec.add_dependency 'aws-sdk-sns' @@ -33,4 +34,5 @@ Gem::Specification.new do |spec| spec.add_dependency 'oj' spec.add_dependency 'openssl' spec.add_dependency 'redlock' + spec.add_dependency 'connection_pool' end diff --git a/lib/eventq/eventq_base/nonce_manager.rb b/lib/eventq/eventq_base/nonce_manager.rb index 12c807c..7c9b0e9 100644 --- a/lib/eventq/eventq_base/nonce_manager.rb +++ b/lib/eventq/eventq_base/nonce_manager.rb @@ -1,10 +1,12 @@ module EventQ class NonceManager - def self.configure(server:,timeout:10000,lifespan:3600) + def self.configure(server:,timeout:10000,lifespan:3600, pool_size: 5, pool_timeout: 5) @server_url = server @timeout = timeout @lifespan = lifespan + @pool_size = pool_size + @pool_timeout = pool_timeout end def self.server_url @@ -19,33 +21,51 @@ def self.lifespan @lifespan end - def self.is_allowed?(nonce) - if @server_url == nil - return true + def self.pool_size + @pool_size + end + + def self.pool_timeout + @pool_timeout + end + + def self.lock(nonce) + # act as if successfully locked if not nonce manager configured - makes it a no-op + return true if not_configured? + + successfully_locked = false + with_redis_connection do |conn| + successfully_locked = conn.set(nonce, 1, ex: lifespan, nx: true) end - require 'redlock' - lock = Redlock::Client.new([ @server_url ]).lock(nonce, @timeout) - if lock == false + if !successfully_locked EventQ.log(:info, "[#{self.class}] - Message has already been processed: #{nonce}") - return false end - return true + successfully_locked end + # if the message was successfully procesed, lock for another lifespan length + # so it isn't reprocessed def self.complete(nonce) - if @server_url != nil - Redis.new(url: @server_url).expire(nonce, @lifespan) + return true if not_configured? + + with_redis_connection do |conn| + conn.expire(nonce, lifespan) end - return true + + true end + # if it failed, unlock immediately so that retries can kick in def self.failed(nonce) - if @server_url != nil - Redis.new(url: @server_url).del(nonce) + return true if not_configured? + + with_redis_connection do |conn| + conn.del(nonce) end - return true + + true end def self.reset @@ -53,5 +73,28 @@ def self.reset @timeout = nil @lifespan = nil end + + private + + def self.with_redis_connection + redis_pool.with do |conn| + yield conn + end + end + + def self.redis_pool + @redis_pool ||= begin + require 'connection_pool' + require 'redis' + + ConnectionPool.new(size: @pool_size, timeout: @pool_timeout) do + Redis.new(url: @server_url) + end + end + end + + def self.not_configured? + @server_url.nil? || @server_url.empty? + end end end diff --git a/lib/eventq/queue_worker.rb b/lib/eventq/queue_worker.rb index a61b730..1f27f14 100644 --- a/lib/eventq/queue_worker.rb +++ b/lib/eventq/queue_worker.rb @@ -127,7 +127,7 @@ def process_message(block, message, retry_attempts, acceptance_args) EventQ.logger.debug("[#{self.class}] - Message received. Id: #{message.id}. Retry Attempts: #{retry_attempts}") - if (!EventQ::NonceManager.is_allowed?(message.id)) + if (!EventQ::NonceManager.lock(message.id)) EventQ.logger.warn("[#{self.class}] - Duplicate Message received. Id: #{message.id}. Ignoring message.") status = :duplicate return status, message_args diff --git a/spec/eventq_base/nonce_manager_spec.rb b/spec/eventq_base/nonce_manager_spec.rb index 02409b2..17fdc5d 100644 --- a/spec/eventq_base/nonce_manager_spec.rb +++ b/spec/eventq_base/nonce_manager_spec.rb @@ -6,13 +6,17 @@ let(:server_url) { 'redis://server:6379' } let(:timeout) { 5000 } let(:lifespan) { 20000} + let(:pool_size) { 1 } + let(:pool_timeout) { 1 } context 'when all values are specified' do it 'should set the configuration values correctly' do - described_class.configure(server: server_url, timeout: timeout, lifespan: lifespan) + described_class.configure(server: server_url, timeout: timeout, lifespan: lifespan, pool_size: pool_size, pool_timeout: pool_timeout) expect(described_class.server_url).to eq server_url expect(described_class.timeout).to eq timeout expect(described_class.lifespan).to eq lifespan + expect(described_class.pool_size).to eq pool_size + expect(described_class.pool_timeout).to eq pool_timeout end end @@ -22,6 +26,8 @@ expect(described_class.server_url).to eq server_url expect(described_class.timeout).to eq 10000 expect(described_class.lifespan).to eq 3600 + expect(described_class.pool_size).to eq 5 + expect(described_class.pool_timeout).to eq 5 end end @@ -31,7 +37,7 @@ end - describe '#is_allowed?' do + describe '#lock' do let(:nonce) { SecureRandom.uuid } context 'when NonceManager has been configured' do @@ -40,13 +46,13 @@ end context 'when a nonce has NOT been used' do it 'should return true' do - expect(described_class.is_allowed?(nonce)).to be true + expect(described_class.lock(nonce)).to be true end end context 'when a nonce has already been used' do it 'should return false' do - described_class.is_allowed?(nonce) - expect(described_class.is_allowed?(nonce)).to be false + described_class.lock(nonce) + expect(described_class.lock(nonce)).to be false end end after do @@ -60,15 +66,20 @@ end context 'when a nonce has NOT been used' do it 'should return true' do - expect(described_class.is_allowed?(nonce)).to be true + expect(described_class.lock(nonce)).to be true end end context 'when a nonce has already been used' do it 'should return false' do - described_class.is_allowed?(nonce) - expect(described_class.is_allowed?(nonce)).to be true + described_class.lock(nonce) + expect(described_class.lock(nonce)).to be true end end + + it 'should not attempt to hit redis' do + expect_any_instance_of(Redis).not_to receive(:set) + described_class.lock(nonce) + end end end @@ -78,7 +89,7 @@ context 'when NonceManager has been configured' do before do described_class.configure(server: ENV.fetch('REDIS_ENDPOINT', 'redis://redis:6379')) - described_class.is_allowed?(nonce) + described_class.lock(nonce) end it 'should extend the expiry of the nonce key' do expect(described_class.complete(nonce)).to eq true @@ -103,7 +114,7 @@ context 'when NonceManager has been configured' do before do described_class.configure(server: ENV.fetch('REDIS_ENDPOINT', 'redis://redis:6379')) - described_class.is_allowed?(nonce) + described_class.lock(nonce) end it 'should extend the expiry of the nonce key' do expect(described_class.failed(nonce)).to eq true