Skip to content

Commit

Permalink
fix: refactor queue handler to prevent blocking (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary authored Dec 20, 2024
2 parents 331a8b2 + 17b31ea commit f90bc85
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
21 changes: 14 additions & 7 deletions pkg/rewardsCalculatorQueue/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,23 @@ func (rcq *RewardsCalculatorQueue) Process() {
return
case msg := <-rcq.queue:
rcq.logger.Sugar().Infow("Processing rewards calculation message", "data", msg.Data)
rcq.processMessage(msg)
response := rcq.processMessage(msg)

if msg.ResponseChan != nil {
select {
case msg.ResponseChan <- response:
rcq.logger.Sugar().Infow("Sent rewards calculation response", "data", msg.Data)
default:
rcq.logger.Sugar().Infow("No receiver for response, dropping", "data", msg.Data)
}
} else {
rcq.logger.Sugar().Infow("No response channel, dropping response", "data", msg.Data)
}
}
}
}

func (rcq *RewardsCalculatorQueue) processMessage(msg *RewardsCalculationMessage) {
func (rcq *RewardsCalculatorQueue) processMessage(msg *RewardsCalculationMessage) *RewardsCalculatorResponse {
response := &RewardsCalculatorResponse{}
cutoffDate := msg.Data.CutoffDate

Expand All @@ -42,9 +53,5 @@ func (rcq *RewardsCalculatorQueue) processMessage(msg *RewardsCalculationMessage
default:
response.Error = fmt.Errorf("unknown calculation type %s", msg.Data.CalculationType)
}
if msg.ResponseChan == nil {
rcq.logger.Sugar().Errorw("No response channel for rewards calculation message", "data", msg.Data)
return
}
msg.ResponseChan <- response
return response
}
3 changes: 2 additions & 1 deletion pkg/rewardsCalculatorQueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func NewRewardsCalculatorQueue(rc *rewards.RewardsCalculator, logger *zap.Logger
rewardsCalculator: rc,
// allow the queue to buffer up to 100 messages
queue: make(chan *RewardsCalculationMessage, 100),
done: make(chan struct{}),
}
return queue
}
Expand All @@ -25,7 +26,7 @@ func (rcq *RewardsCalculatorQueue) Enqueue(payload *RewardsCalculationMessage) {

// EnqueueAndWait adds a new message to the queue and waits for a response or returns if the context is done
func (rcq *RewardsCalculatorQueue) EnqueueAndWait(ctx context.Context, data RewardsCalculationData) (*RewardsCalculatorResponseData, error) {
responseChan := make(chan *RewardsCalculatorResponse)
responseChan := make(chan *RewardsCalculatorResponse, 1)

payload := &RewardsCalculationMessage{
Data: data,
Expand Down

0 comments on commit f90bc85

Please sign in to comment.