Upgrade dependencies & separate common version literals into val + etc #739

Status: Open. Wants to merge 1 commit into base: master.
app/kafka/manager/actor/cluster/KafkaStateActor.scala (22 changes: 10 additions & 12 deletions)
@@ -130,7 +130,7 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
val options = new DescribeConsumerGroupsOptions
options.timeoutMs(1000)
client.describeConsumerGroups(groupList.asJava, options).all().whenComplete {
(mapGroupDescription, error) => mapGroupDescription.asScala.foreach {
(mapGroupDescription, _) => mapGroupDescription.asScala.foreach {
case (group, desc) =>
enqueue.offer(group -> desc.members().asScala.map(m => MemberMetadata.from(group, desc, m)).toList)
}
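Most of the edits in this file repeat the cleanup shown in this hunk: a lambda parameter that is never read becomes `_`, which silences unused-parameter warnings and signals intent. A minimal self-contained sketch of the idiom (hypothetical names, not CMAK code):

```scala
// Sketch: `_` marks a callback argument as intentionally ignored.
val whenDone: (String, Throwable) => Unit =
  (result, _) => println(s"completed with result=$result") // error slot unused here
whenDone("ok", null)
```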
@@ -438,7 +438,7 @@ object ConsumerInstanceSubscriptions extends Logging {
import org.json4s.jackson.JsonMethods.parse
import org.json4s.scalaz.JsonScalaz.field
val json = parse(jsonString)
val subs: Map[String, Int] = field[Map[String,Int]]("subscription")(json).fold({ e =>
val subs: Map[String, Int] = field[Map[String,Int]]("subscription")(json).fold({ _ =>
error(s"[consumer=$consumer] Failed to parse consumer instance subscriptions : $id : $jsonString"); Map.empty}, identity)
new ConsumerInstanceSubscriptions(id, subs)
}
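The `fold({ _ => ... }, identity)` shape above handles a json4s-scalaz parse result: the first function logs the failure and falls back to an empty map, the second passes a successful value through unchanged. The same shape in plain Scala with `Either` (a sketch of the pattern, not the project's exact types):

```scala
// Sketch: fold-with-fallback, analogous to field[...](json).fold(onError, identity).
val parsed: Either[String, Map[String, Int]] = Right(Map("topicA" -> 3))
val subs: Map[String, Int] = parsed.fold(
  err => { println(s"Failed to parse subscriptions: $err"); Map.empty }, // log, fall back
  identity                                                               // keep the value
)
```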
@@ -483,10 +483,8 @@ trait OffsetCache extends Logging {
// Get partition leader broker information
val optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getTopicPartitionLeaders(topic)

val clientId = "partitionOffsetGetter"
val time = -1
val nOffsets = 1
val simpleConsumerBufferSize = 256 * 1024
val currentActiveBrokerSet:Set[String] = getBrokerList().list.map(_.host).toSet

val partitionsByBroker = optPartitionsWithLeaders.map {
@@ -672,27 +670,27 @@ trait OffsetCache extends Logging {
val (partitionOffsets, partitionOwners) = consumerType match {
case ZKManagedConsumer =>
val partitionOffsets = for {
td <- optTopic
_ <- optTopic
tpi <- optTpi
} yield {
readConsumerOffsetByTopicPartition(consumer, topic, tpi)
}
val partitionOwners = for {
td <- optTopic
_ <- optTopic
tpi <- optTpi
} yield {
readConsumerOwnerByTopicPartition(consumer, topic, tpi)
}
(partitionOffsets, partitionOwners)
case KafkaManagedConsumer =>
val partitionOffsets = for {
td <- optTopic
_ <- optTopic
tpi <- optTpi
} yield {
readKafkaManagedConsumerOffsetByTopicPartition(consumer, topic, tpi)
}
val partitionOwners = for {
td <- optTopic
_ <- optTopic
tpi <- optTpi
} yield {
readKafkaManagedConsumerOwnerByTopicPartition(consumer, topic, tpi)
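The four parallel edits in this hunk replace the unused generator binding `td` with `_`. A `_ <- opt` generator still short-circuits the comprehension on `None`, so the topic must exist even though its value is discarded. A minimal sketch:

```scala
// Sketch: `_ <-` keeps an Option as a guard without naming its value.
val optTopic: Option[String] = Some("demo-topic")
val optTpi: Option[Int]      = Some(3)
val partitions: Option[Int] = for {
  _   <- optTopic // must be Some(...), but the value itself is never read
  tpi <- optTpi
} yield tpi
```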
@@ -818,7 +816,7 @@ case class OffsetCacheActive(curator: CuratorFramework
IndexedSeq.empty[ConsumerNameAndType]
} { data: java.util.Map[String, ChildData] =>
data.asScala.filter{
case (consumer, childData) =>
case (_, childData) =>
if (clusterContext.config.filterConsumers)
// Defining "inactive consumer" as a consumer that is missing one of three children ids/ offsets/ or owners/
childData.getStat.getNumChildren > 2
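The same idea applies to pattern matches: when only one component of a tuple matters, the other becomes `_`. A sketch of the shape used in this hunk:

```scala
// Sketch: wildcard for the unused key when filtering by value.
val children = Map("group-a" -> 3, "group-b" -> 1)
val active   = children.filter { case (_, numChildren) => numChildren > 2 }
```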
@@ -1203,7 +1201,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
states.map(_.map{case (part, state) =>
val partition = part.toInt
val descJson = parse(state)
val leaderID = field[Int]("leader")(descJson).fold({ e =>
val leaderID = field[Int]("leader")(descJson).fold({ _ =>
log.error(s"[topic=$topic] Failed to get partitions from topic json $state"); 0}, identity)
val leader = targetBrokers.find(_.id == leaderID)
(partition, leader)
@@ -1444,7 +1442,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
if (shutdown) {
return
}
var optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getPartitionLeaders(topic)
val optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getPartitionLeaders(topic)
optPartitionsWithLeaders match {
case Some(leaders) =>
leaders.foreach(leader => {
Expand Down Expand Up @@ -1492,7 +1490,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
try {
kafkaConsumer = Option(new KafkaConsumer(consumerProperties))
val request = tpList.map(f => new TopicPartition(f._1.topic(), f._1.partition()))
var tpOffsetMapOption = kafkaConsumer.map(_.endOffsets(request.asJavaCollection).asScala)
val tpOffsetMapOption = kafkaConsumer.map(_.endOffsets(request.asJavaCollection).asScala)

var topicOffsetMap: Map[Int, Long] = null
tpOffsetMapOption.foreach(tpOffsetMap => tpOffsetMap.keys.foreach(tp => {
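The last two hunks in this file change `var` to `val` for locals that are never reassigned, letting the compiler enforce the immutability the code already assumes. Illustration (hypothetical values):

```scala
// Sketch: with val, accidental reassignment no longer compiles.
val optOffsets: Option[Map[Int, Long]] = Some(Map(0 -> 42L))
// optOffsets = None  // compile error: reassignment to val
```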
app/kafka/manager/model/ActorModel.scala (14 changes: 7 additions & 7 deletions)
@@ -267,7 +267,7 @@ object ActorModel {
val portResult = fieldExtended[Int]("port")(json)
val jmxPortResult = fieldExtended[Int]("jmx_port")(json)
val hostPortResult: JsonScalaz.Result[(String, Map[SecurityProtocol, Int])] = json.findField(_._1 == "endpoints").map(_ => fieldExtended[List[String]]("endpoints")(json))
.fold((hostResult |@| portResult |@| DEFAULT_SECURE)((a, b, c) => (a, Map(PLAINTEXT.asInstanceOf[SecurityProtocol] -> b)))){
.fold((hostResult |@| portResult |@| DEFAULT_SECURE)((a, b, _) => (a, Map(PLAINTEXT.asInstanceOf[SecurityProtocol] -> b)))){
r =>
r.flatMap {
endpointList =>
@@ -296,19 +296,19 @@ object ActorModel {
}
result
} else {
(hostResult |@| portResult |@| DEFAULT_SECURE)((a, b, c) => (a, Map(PLAINTEXT.asInstanceOf[SecurityProtocol] -> b)))
(hostResult |@| portResult |@| DEFAULT_SECURE)((a, b, _) => (a, Map(PLAINTEXT.asInstanceOf[SecurityProtocol] -> b)))
}
}
}
for {
tpl <- hostPortResult
host = tpl._1
port = tpl._2
secure = (tpl._2.contains(PLAINTEXT) && tpl._2.size > 1) || (!tpl._2.contains(PLAINTEXT) && tpl._2.nonEmpty)
nonSecure = tpl._2.contains(PLAINTEXT)
secure = (port.contains(PLAINTEXT) && port.size > 1) || (!port.contains(PLAINTEXT) && port.nonEmpty)
nonSecure = port.contains(PLAINTEXT)
jmxPort <- jmxPortResult
} yield {
BrokerIdentity(brokerId, host, jmxPort, secure, nonSecure, tpl._2)
BrokerIdentity(brokerId, host, jmxPort, secure, nonSecure, port)
}
}
}
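This hunk binds `port = tpl._2` once and reuses the name instead of repeating `tpl._2`. Value definitions inside a for-comprehension make that cheap, and destructuring goes one step further (a sketch with hypothetical data, not the BrokerIdentity code):

```scala
// Sketch: name tuple components once inside the comprehension.
val broker: Option[(String, Int)] = Some(("broker-1", 9092))
val summary: Option[String] = for {
  tpl <- broker
  (host, port) = tpl // destructure once; no tpl._1 / tpl._2 below
} yield s"$host:$port"
```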
@@ -494,7 +494,7 @@ import scala.language.reflectiveCalls
private[this] def getPartitionReplicaMap(td: TopicDescription) : Map[String, List[Int]] = {
// Get the topic description information
val descJson = parse(td.description._2)
field[Map[String,List[Int]]]("partitions")(descJson).fold({ e =>
field[Map[String,List[Int]]]("partitions")(descJson).fold({ _ =>
logger.error(s"[topic=${td.topic}] Failed to get partitions from topic json ${td.description._2}")
Map.empty
}, identity)
@@ -550,7 +550,7 @@ import scala.language.reflectiveCalls
try {
val resultOption: Option[(Int,Map[String, String])] = td.config.map { configString =>
val configJson = parse(configString._2)
val configMap : Map[String, String] = field[Map[String,String]]("config")(configJson).fold({ e =>
val configMap : Map[String, String] = field[Map[String,String]]("config")(configJson).fold({ _ =>
logger.error(s"Failed to parse topic config ${configString._2}")
Map.empty
}, identity)
app/kafka/manager/utils/two40/GroupMetadataManager.scala (29 changes: 1 addition & 28 deletions)
@@ -133,12 +133,6 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs
}

object GroupMetadata {
private val validPreviousStates: Map[GroupState, Set[GroupState]] =
Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
CompletingRebalance -> Set(PreparingRebalance),
Stable -> Set(CompletingRebalance),
PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
Empty -> Set(PreparingRebalance))

def loadGroup(groupId: String,
initialState: GroupState,
@@ -179,22 +173,18 @@ class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) e

private[two40] val lock = new ReentrantLock

private var state: GroupState = initialState
private val state: GroupState = initialState
var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
var protocolType: Option[String] = None
var generationId = 0
private var leaderId: Option[String] = None
private var protocol: Option[String] = None

private val members = new mutable.HashMap[String, MemberMetadata]
private val pendingMembers = new mutable.HashSet[String]
private var numMembersAwaitingJoin = 0
private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
private var receivedTransactionalOffsetCommits = false
private var receivedConsumerOffsetCommits = false

var newMemberAdded: Boolean = false

@@ -828,23 +818,6 @@ object GroupMetadataManager {
}
}

private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer): (Option[String], Option[String]) = {
val groupId = offsetKey.key.group
val topicPartition = offsetKey.key.topicPartition
val keyString = s"offset_commit::group=$groupId,partition=$topicPartition"

val offset = GroupMetadataManager.readOffsetMessageValue(payload)
val valueString = if (offset == null) {
"<DELETE>"
} else {
if (offset.metadata.isEmpty)
s"offset=${offset.offset}"
else
s"offset=${offset.offset},metadata=${offset.metadata}"
}

(Some(keyString), Some(valueString))
}
}

case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
build.sbt (47 changes: 26 additions & 21 deletions)
@@ -21,35 +21,40 @@ assemblyMergeStrategy in assembly := {
case other => (assemblyMergeStrategy in assembly).value(other)
}

val akkaVersion = "2.6.2"
val curatorVersion = "4.2.0"
val json4sVersion = "3.6.7"
val kafkaVersion = "2.4.0"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.19",
"com.typesafe.akka" %% "akka-slf4j" % "2.5.19",
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.google.code.findbugs" % "jsr305" % "3.0.2",
"org.webjars" %% "webjars-play" % "2.6.3",
"org.webjars" % "bootstrap" % "4.3.1",
"org.webjars" % "jquery" % "3.3.1-2",
"org.webjars" %% "webjars-play" % "2.8.0",
"org.webjars" % "bootstrap" % "4.4.1",
"org.webjars" % "jquery" % "3.4.0",
"org.webjars" % "backbonejs" % "1.3.3",
"org.webjars" % "underscorejs" % "1.9.0",
"org.webjars" % "dustjs-linkedin" % "2.7.2",
"org.webjars" % "octicons" % "4.3.0",
"org.apache.curator" % "curator-framework" % "2.12.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.curator" % "curator-recipes" % "2.12.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.json4s" %% "json4s-jackson" % "3.6.5",
"org.json4s" %% "json4s-scalaz" % "3.6.5",
"org.slf4j" % "log4j-over-slf4j" % "1.7.25",
"com.adrianhurt" %% "play-bootstrap" % "1.4-P26-B4" exclude("com.typesafe.play", "*"),
"org.apache.curator" % "curator-framework" % curatorVersion exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.curator" % "curator-recipes" % curatorVersion exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.json4s" %% "json4s-jackson" % json4sVersion,
"org.json4s" %% "json4s-scalaz" % json4sVersion,
"org.slf4j" % "log4j-over-slf4j" % "1.7.30",
"com.adrianhurt" %% "play-bootstrap" % "1.5.1-P27-B3" exclude("com.typesafe.play", "*"),
"org.clapper" %% "grizzled-slf4j" % "1.3.3",
"org.apache.kafka" %% "kafka" % "2.4.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.kafka" % "kafka-streams" % "2.2.0",
"com.beachape" %% "enumeratum" % "1.5.13",
"com.github.ben-manes.caffeine" % "caffeine" % "2.6.2",
"com.typesafe.play" %% "play-logback" % "2.6.21",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.scalatestplus.play" %% "scalatestplus-play" % "3.1.2" % "test",
"org.apache.curator" % "curator-test" % "2.12.0" % "test",
"org.mockito" % "mockito-core" % "1.10.19" % "test",
"org.apache.kafka" %% "kafka" % kafkaVersion exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.kafka" % "kafka-streams" % kafkaVersion,
"com.beachape" %% "enumeratum" % "1.5.14",
"com.github.ben-manes.caffeine" % "caffeine" % "2.8.0",
"com.typesafe.play" %% "play-logback" % "2.8.0",
"org.scalatest" %% "scalatest" % "3.1.0" % "test",
"org.scalatestplus.play" %% "scalatestplus-play" % "4.0.3" % "test",
"org.apache.curator" % "curator-test" % curatorVersion % "test",
"org.mockito" % "mockito-core" % "3.2.4" % "test",
"com.yammer.metrics" % "metrics-core" % "2.2.0" force(),
"com.unboundid" % "unboundid-ldapsdk" % "4.0.9"
"com.unboundid" % "unboundid-ldapsdk" % "4.0.14"
)

routesGenerator := InjectedRoutesGenerator
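Hoisting each repeated version literal into a single val is the core of this build change: one line to edit per dependency family, and the co-versioned artifacts can no longer drift apart. The same pattern could extend to other families, for example a hypothetical playVersion (a sketch, not part of this PR):

```scala
// build.sbt sketch (hypothetical): group artifacts that must move together.
val playVersion = "2.8.0"

libraryDependencies ++= Seq(
  "com.typesafe.play" %% "play-logback" % playVersion,
  "org.webjars"       %% "webjars-play" % playVersion
)
```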