diff --git a/consumer.go b/consumer.go index eb84bc2..14a2e72 100644 --- a/consumer.go +++ b/consumer.go @@ -55,6 +55,7 @@ type Consumer struct { stopStreams chan bool close chan bool stopCleanup chan struct{} + wg sync.WaitGroup topicCount TopicsToNumStreams metrics *ConsumerMetrics @@ -869,7 +870,10 @@ func (c *Consumer) reflectPartitionOwnershipDecision(partitionOwnershipDecision successfullyOwnedPartitions := make([]*TopicAndPartition, 0) successChan := make(chan TopicAndPartition) + go func() { + c.wg.Add(1) + defer c.wg.Done() for tp := range successChan { successfullyOwnedPartitions = append(successfullyOwnedPartitions, &tp) } @@ -881,6 +885,7 @@ func (c *Consumer) reflectPartitionOwnershipDecision(partitionOwnershipDecision pool.Stop() close(successChan) + c.wg.Wait() if len(partitionOwnershipDecision) > len(successfullyOwnedPartitions) { if Logger.IsAllowed(WarnLevel) {