Skip to content

Commit

Permalink
fix(test): guard counter in parallel
Browse files Browse the repository at this point in the history
Signed-off-by: Nguyen Khac Thanh <[email protected]>
  • Loading branch information
magiskboy committed Jan 16, 2025
1 parent a42d543 commit 2eba1c5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
14 changes: 8 additions & 6 deletions notify/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
type Notifier struct {
conf *config.KafkaConfig
logger *slog.Logger
partition int
Partition int
partitionMutex sync.Mutex
sendFunc func(ctx context.Context, msgs ...ckafka.Message) error
}
Expand Down Expand Up @@ -80,9 +80,9 @@ func New(c *config.KafkaConfig, l *slog.Logger, sendFunc *func(ctx context.Conte
}

if c.NumberOfPartition > 0 {
n.partition = c.NumberOfPartition
n.Partition = c.NumberOfPartition
} else {
n.partition = 1
n.Partition = 1
}

n.sendFunc = func(ctx context.Context, msgs ...ckafka.Message) error {
Expand All @@ -94,14 +94,16 @@ func New(c *config.KafkaConfig, l *slog.Logger, sendFunc *func(ctx context.Conte

// GetPartitionIndex returns the current partition index.
func (n *Notifier) GetPartitionIndex() int {
return n.partition
n.partitionMutex.Lock()
defer n.partitionMutex.Unlock()
return n.Partition
}

// NextPartition returns the next partition index.
func (n *Notifier) NextPartition() {
n.partitionMutex.Lock()
n.partition = (n.partition + 1) % n.conf.NumberOfPartition
n.partitionMutex.Unlock()
defer n.partitionMutex.Unlock()
n.Partition = (n.Partition + 1) % n.conf.NumberOfPartition
}

// Notify implements the Notifier interface.
Expand Down
20 changes: 9 additions & 11 deletions notify/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import (
)

func TestKafkaNotify(t *testing.T) {
var counter int
sendFunc := func(ctx context.Context, msgs ...ckafka.Message) error {
counter++
return nil
}

Expand All @@ -47,13 +45,18 @@ func TestKafkaNotify(t *testing.T) {

notifier.Notify(context.Background(), &types.Alert{})

require.Equal(t, 1, counter)
require.Equal(t, 0, notifier.Partition)
}

func TestKafkaNotifyRoundRobin(t *testing.T) {
var counter int
var (
counter int
counterMutex sync.Mutex
)
partitions := 2
sendFunc := func(ctx context.Context, msgs ...ckafka.Message) error {
counterMutex.Lock()
defer counterMutex.Unlock()
require.Equal(t, counter%partitions, msgs[0].Partition)
counter++
return nil
Expand All @@ -73,10 +76,6 @@ func TestKafkaNotifyRoundRobin(t *testing.T) {

var wg sync.WaitGroup

notifier.Notify(context.Background(), &types.Alert{})
notifier.Notify(context.Background(), &types.Alert{})
notifier.Notify(context.Background(), &types.Alert{})

wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -89,8 +88,7 @@ func TestKafkaNotifyRoundRobin(t *testing.T) {
notifier.Notify(context.Background(), &types.Alert{})
}()

notifier.Notify(context.Background(), &types.Alert{})
notifier.Notify(context.Background(), &types.Alert{})

wg.Wait()

require.Equal(t, partitions, counter)
}

0 comments on commit 2eba1c5

Please sign in to comment.