Skip to content

Commit

Permalink
fix connection pooling for nonce manager
Browse files Browse the repository at this point in the history
  • Loading branch information
LyleDavis committed Jan 17, 2025
1 parent 78a824f commit 7c14af3
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 28 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ services:

localstack:
# changes to multi region support after this version break tests
image: localstack/localstack:latest
image: localstack/localstack:stable
container_name: localstack
environment:
- SQS_ENDPOINT_STRATEGY=off
Expand All @@ -60,7 +60,7 @@ services:
- "8085:8080"
- "4566:4566"
healthcheck:
test: if [ $$(curl -s -o /dev/null -w "%{http_code}" http://localhost:4566/health?reload) -ne 200 ]; then exit 1; fi
test: if [ $$(curl -s -o /dev/null -w "%{http_code}" http://localhost:4566/_localstack/health?reload) -ne 200 ]; then exit 1; fi
interval: 10s
timeout: 5s
retries: 10
2 changes: 2 additions & 0 deletions eventq.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rspec'
spec.add_development_dependency 'shoulda-matchers'
spec.add_development_dependency 'simplecov', '< 0.18.0'
spec.add_development_dependency 'debug'

spec.add_dependency 'aws-sdk-core'
spec.add_dependency 'aws-sdk-sns'
Expand All @@ -33,4 +34,5 @@ Gem::Specification.new do |spec|
spec.add_dependency 'oj'
spec.add_dependency 'openssl'
spec.add_dependency 'redlock'
spec.add_dependency 'connection_pool'
end
73 changes: 58 additions & 15 deletions lib/eventq/eventq_base/nonce_manager.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module EventQ
class NonceManager

def self.configure(server:,timeout:10000,lifespan:3600)
def self.configure(server:,timeout:10000,lifespan:3600, pool_size: 5, pool_timeout: 5)
@server_url = server
@timeout = timeout
@lifespan = lifespan
@pool_size = pool_size
@pool_timeout = pool_timeout
end

def self.server_url
Expand All @@ -19,39 +21,80 @@ def self.lifespan
@lifespan
end

def self.is_allowed?(nonce)
if @server_url == nil
return true
def self.pool_size
@pool_size
end

def self.pool_timeout
@pool_timeout
end

def self.lock(nonce)
# act as if successfully locked if not nonce manager configured - makes it a no-op
return true if not_configured?

successfully_locked = false
with_redis_connection do |conn|
successfully_locked = conn.set(nonce, 1, ex: lifespan, nx: true)
end

require 'redlock'
lock = Redlock::Client.new([ @server_url ]).lock(nonce, @timeout)
if lock == false
if !successfully_locked
EventQ.log(:info, "[#{self.class}] - Message has already been processed: #{nonce}")
return false
end

return true
successfully_locked
end

# if the message was successfully procesed, lock for another lifespan length
# so it isn't reprocessed
def self.complete(nonce)
if @server_url != nil
Redis.new(url: @server_url).expire(nonce, @lifespan)
return true if not_configured?

with_redis_connection do |conn|
conn.expire(nonce, lifespan)
end
return true

true
end

# if it failed, unlock immediately so that retries can kick in
def self.failed(nonce)
if @server_url != nil
Redis.new(url: @server_url).del(nonce)
return true if not_configured?

with_redis_connection do |conn|
conn.del(nonce)
end
return true

true
end

def self.reset
@server_url = nil
@timeout = nil
@lifespan = nil
end

private

def self.with_redis_connection
redis_pool.with do |conn|
yield conn
end
end

def self.redis_pool
@redis_pool ||= begin
require 'connection_pool'
require 'redis'

ConnectionPool.new(size: @pool_size, timeout: @pool_timeout) do
Redis.new(url: @server_url)
end
end
end

def self.not_configured?
@server_url.nil? || @server_url.empty?
end
end
end
2 changes: 1 addition & 1 deletion lib/eventq/queue_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def process_message(block, message, retry_attempts, acceptance_args)

EventQ.logger.debug("[#{self.class}] - Message received. Id: #{message.id}. Retry Attempts: #{retry_attempts}")

if (!EventQ::NonceManager.is_allowed?(message.id))
if (!EventQ::NonceManager.lock(message.id))
EventQ.logger.warn("[#{self.class}] - Duplicate Message received. Id: #{message.id}. Ignoring message.")
status = :duplicate
return status, message_args
Expand Down
31 changes: 21 additions & 10 deletions spec/eventq_base/nonce_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
let(:server_url) { 'redis://server:6379' }
let(:timeout) { 5000 }
let(:lifespan) { 20000}
let(:pool_size) { 1 }
let(:pool_timeout) { 1 }

context 'when all values are specified' do
it 'should set the configuration values correctly' do
described_class.configure(server: server_url, timeout: timeout, lifespan: lifespan)
described_class.configure(server: server_url, timeout: timeout, lifespan: lifespan, pool_size: pool_size, pool_timeout: pool_timeout)
expect(described_class.server_url).to eq server_url
expect(described_class.timeout).to eq timeout
expect(described_class.lifespan).to eq lifespan
expect(described_class.pool_size).to eq pool_size
expect(described_class.pool_timeout).to eq pool_timeout
end
end

Expand All @@ -22,6 +26,8 @@
expect(described_class.server_url).to eq server_url
expect(described_class.timeout).to eq 10000
expect(described_class.lifespan).to eq 3600
expect(described_class.pool_size).to eq 5
expect(described_class.pool_timeout).to eq 5
end
end

Expand All @@ -31,7 +37,7 @@

end

describe '#is_allowed?' do
describe '#lock' do
let(:nonce) { SecureRandom.uuid }

context 'when NonceManager has been configured' do
Expand All @@ -40,13 +46,13 @@
end
context 'when a nonce has NOT been used' do
it 'should return true' do
expect(described_class.is_allowed?(nonce)).to be true
expect(described_class.lock(nonce)).to be true
end
end
context 'when a nonce has already been used' do
it 'should return false' do
described_class.is_allowed?(nonce)
expect(described_class.is_allowed?(nonce)).to be false
described_class.lock(nonce)
expect(described_class.lock(nonce)).to be false
end
end
after do
Expand All @@ -60,15 +66,20 @@
end
context 'when a nonce has NOT been used' do
it 'should return true' do
expect(described_class.is_allowed?(nonce)).to be true
expect(described_class.lock(nonce)).to be true
end
end
context 'when a nonce has already been used' do
it 'should return false' do
described_class.is_allowed?(nonce)
expect(described_class.is_allowed?(nonce)).to be true
described_class.lock(nonce)
expect(described_class.lock(nonce)).to be true
end
end

it 'should not attempt to hit redis' do
expect_any_instance_of(Redis).not_to receive(:set)
described_class.lock(nonce)
end
end

end
Expand All @@ -78,7 +89,7 @@
context 'when NonceManager has been configured' do
before do
described_class.configure(server: ENV.fetch('REDIS_ENDPOINT', 'redis://redis:6379'))
described_class.is_allowed?(nonce)
described_class.lock(nonce)
end
it 'should extend the expiry of the nonce key' do
expect(described_class.complete(nonce)).to eq true
Expand All @@ -103,7 +114,7 @@
context 'when NonceManager has been configured' do
before do
described_class.configure(server: ENV.fetch('REDIS_ENDPOINT', 'redis://redis:6379'))
described_class.is_allowed?(nonce)
described_class.lock(nonce)
end
it 'should extend the expiry of the nonce key' do
expect(described_class.failed(nonce)).to eq true
Expand Down

0 comments on commit 7c14af3

Please sign in to comment.