From f97837195f7fca92aa7247ca22f0e8e9e01a6234 Mon Sep 17 00:00:00 2001 From: Dean Chen Date: Thu, 25 Dec 2014 23:37:42 -0800 Subject: [PATCH] Fix bug causing random partitions with partition key Fixes #22 Array[Byte] does not have the identical hashcode even given identical contents. List[Byte] has the desired hashcode property at the cost of some performance. --- src/main/scala/KafkaProducer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/KafkaProducer.scala b/src/main/scala/KafkaProducer.scala index 9c59ac5..f3b7c40 100644 --- a/src/main/scala/KafkaProducer.scala +++ b/src/main/scala/KafkaProducer.scala @@ -93,7 +93,7 @@ case class KafkaProducer( val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props)) - def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = { + def kafkaMesssage(message: List[Byte], partition: List[Byte]): KeyedMessage[AnyRef, AnyRef] = { if (partition == null) { new KeyedMessage(topic,message) } else { @@ -101,9 +101,9 @@ case class KafkaProducer( } } - def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8")) + def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8").toList, if (partition == null) null else partition.getBytes("UTF8").toList) - def send(message: Array[Byte], partition: Array[Byte]): Unit = { + def send(message: List[Byte], partition: List[Byte]): Unit = { try { producer.send(kafkaMesssage(message, partition)) } catch {