diff --git a/mirror_maker.go b/mirror_maker.go index db6ffef..02780db 100644 --- a/mirror_maker.go +++ b/mirror_maker.go @@ -157,7 +157,8 @@ func (this *MirrorMaker) startConsumers() { if err != nil { panic(err) } - config.NumWorkers = 1 + //Let the NumWorkers be set via consumer configs + //config.NumWorkers = 1 config.AutoOffsetReset = SmallestOffset config.Coordinator = NewZookeeperCoordinator(zkConfig) config.WorkerFailureCallback = func(_ *WorkerManager) FailedDecision { diff --git a/producer.go b/producer.go index 5d5ce9c..b3d1dce 100644 --- a/producer.go +++ b/producer.go @@ -176,9 +176,12 @@ func (this *FixedPartitioner) Partition(key []byte, numPartitions int32) (int32, if key == nil { panic("FixedPartitioner does not work without keys.") } - partition, err := binary.ReadUvarint(bytes.NewBuffer(key)) - if err != nil { - return -1, err + + var partition int32 + buf := bytes.NewBuffer(key) + binary.Read(buf, binary.LittleEndian, &partition) + if (partition < 0) { + return -1, errors.New("Partition turned to be -1 (too big to be int32 little endian?)") } return int32(partition) % numPartitions, nil