diff --git a/pkg/rewardsCalculatorQueue/handler.go b/pkg/rewardsCalculatorQueue/handler.go index 05ab9350..a62eb8c4 100644 --- a/pkg/rewardsCalculatorQueue/handler.go +++ b/pkg/rewardsCalculatorQueue/handler.go @@ -10,12 +10,23 @@ func (rcq *RewardsCalculatorQueue) Process() { return case msg := <-rcq.queue: rcq.logger.Sugar().Infow("Processing rewards calculation message", "data", msg.Data) - rcq.processMessage(msg) + response := rcq.processMessage(msg) + + if msg.ResponseChan != nil { + select { + case msg.ResponseChan <- response: + rcq.logger.Sugar().Infow("Sent rewards calculation response", "data", msg.Data) + default: + rcq.logger.Sugar().Infow("No receiver for response, dropping", "data", msg.Data) + } + } else { + rcq.logger.Sugar().Infow("No response channel, dropping response", "data", msg.Data) + } } } } -func (rcq *RewardsCalculatorQueue) processMessage(msg *RewardsCalculationMessage) { +func (rcq *RewardsCalculatorQueue) processMessage(msg *RewardsCalculationMessage) *RewardsCalculatorResponse { response := &RewardsCalculatorResponse{} cutoffDate := msg.Data.CutoffDate @@ -42,9 +53,5 @@ func (rcq *RewardsCalculatorQueue) processMessage(msg *RewardsCalculationMessage default: response.Error = fmt.Errorf("unknown calculation type %s", msg.Data.CalculationType) } - if msg.ResponseChan == nil { - rcq.logger.Sugar().Errorw("No response channel for rewards calculation message", "data", msg.Data) - return - } - msg.ResponseChan <- response + return response } diff --git a/pkg/rewardsCalculatorQueue/queue.go b/pkg/rewardsCalculatorQueue/queue.go index eff015bf..b4416855 100644 --- a/pkg/rewardsCalculatorQueue/queue.go +++ b/pkg/rewardsCalculatorQueue/queue.go @@ -13,6 +13,7 @@ func NewRewardsCalculatorQueue(rc *rewards.RewardsCalculator, logger *zap.Logger rewardsCalculator: rc, // allow the queue to buffer up to 100 messages queue: make(chan *RewardsCalculationMessage, 100), + done: make(chan struct{}), } return queue } @@ -25,7 +26,7 @@ func (rcq *RewardsCalculatorQueue) Enqueue(payload *RewardsCalculationMessage) { // EnqueueAndWait adds a new message to the queue and waits for a response or returns if the context is done func (rcq *RewardsCalculatorQueue) EnqueueAndWait(ctx context.Context, data RewardsCalculationData) (*RewardsCalculatorResponseData, error) { - responseChan := make(chan *RewardsCalculatorResponse) + responseChan := make(chan *RewardsCalculatorResponse, 1) payload := &RewardsCalculationMessage{ Data: data,