From 9ef88af2854e24486758d18382a71d355d111720 Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Sat, 29 Oct 2022 21:22:58 +0100 Subject: [PATCH 1/4] Update Kamon --- project/Dependencies.scala | 2 +- .../com/sky/kms/monitoring/KamonMonitoring.scala | 14 -------------- .../scala/com/sky/kms/monitoring/Monitoring.scala | 8 -------- 3 files changed, 1 insertion(+), 23 deletions(-) delete mode 100644 scheduler/src/main/scala/com/sky/kms/monitoring/KamonMonitoring.scala delete mode 100644 scheduler/src/main/scala/com/sky/kms/monitoring/Monitoring.scala diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 186f07db..33ee3b07 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -32,7 +32,7 @@ object Dependencies { } object Kamon { - private val version = "2.5.1" + private val version = "2.5.10" val core = "io.kamon" %% "kamon-core" % version val akka = "io.kamon" %% "kamon-akka" % version val prometheus = "io.kamon" %% "kamon-prometheus" % version diff --git a/scheduler/src/main/scala/com/sky/kms/monitoring/KamonMonitoring.scala b/scheduler/src/main/scala/com/sky/kms/monitoring/KamonMonitoring.scala deleted file mode 100644 index 4d5da81a..00000000 --- a/scheduler/src/main/scala/com/sky/kms/monitoring/KamonMonitoring.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.sky.kms.monitoring - -import kamon.Kamon - -class KamonMonitoring extends Monitoring { - - private val messages = Kamon.counter("scheduler-messages") - private val scheduledMessages = messages.withTag("status", "scheduled") - private val cancelledMessages = messages.withTag("status", "cancelled") - - override def scheduleReceived(): Unit = scheduledMessages.increment() - - override def scheduleDone(): Unit = cancelledMessages.increment() -} diff --git a/scheduler/src/main/scala/com/sky/kms/monitoring/Monitoring.scala b/scheduler/src/main/scala/com/sky/kms/monitoring/Monitoring.scala deleted file mode 100644 index 8219b3ab..00000000 --- a/scheduler/src/main/scala/com/sky/kms/monitoring/Monitoring.scala +++ /dev/null @@ -1,8 +0,0 @@ -package com.sky.kms.monitoring - -trait Monitoring { - - def scheduleReceived(): Unit - - def scheduleDone(): Unit -} From b10ae2d725c0cd7a69b8bbca63d4bf3c84a0395f Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Sat, 29 Oct 2022 21:24:02 +0100 Subject: [PATCH 2/4] Add ScheduleGauge --- .../main/scala/com/sky/kms/SchedulerApp.scala | 6 +++-- .../com/sky/kms/actors/PublisherActor.scala | 9 +++++--- .../com/sky/kms/actors/SchedulingActor.scala | 19 ++++++++++------ .../sky/kms/monitoring/ScheduleGauge.scala | 22 +++++++++++++++++++ 4 files changed, 44 insertions(+), 12 deletions(-) create mode 100644 scheduler/src/main/scala/com/sky/kms/monitoring/ScheduleGauge.scala diff --git a/scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala b/scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala index de65e2f1..45221125 100644 --- a/scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala +++ b/scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala @@ -4,6 +4,7 @@ import akka.actor.{ActorRef, ActorSystem} import akka.kafka.scaladsl.Consumer.Control import com.sky.kms.actors._ import com.sky.kms.config.Configured +import com.sky.kms.monitoring.ScheduleGauge import com.sky.kms.streams.{ScheduleReader, ScheduledMessagePublisher} import kamon.Kamon import kamon.jmx.collector.KamonJmxMetricCollector @@ -21,8 +22,9 @@ object SchedulerApp { case class Running(reader: ScheduleReader.Running[Future[Control]], publisher: ScheduledMessagePublisher.Running) def configure(implicit system: ActorSystem): Configured[SchedulerApp] = { - val publisherActor = PublisherActor.create - val schedulingActor = SchedulingActor.create(publisherActor) + val scheduleGauge = ScheduleGauge.kamon() + val publisherActor = PublisherActor.create(scheduleGauge) + val schedulingActor = SchedulingActor.create(publisherActor, scheduleGauge) TerminatorActor.create(schedulingActor, publisherActor) for { diff --git a/scheduler/src/main/scala/com/sky/kms/actors/PublisherActor.scala b/scheduler/src/main/scala/com/sky/kms/actors/PublisherActor.scala index dc23171a..0498a0a6 100644 --- a/scheduler/src/main/scala/com/sky/kms/actors/PublisherActor.scala +++ b/scheduler/src/main/scala/com/sky/kms/actors/PublisherActor.scala @@ -8,10 +8,11 @@ import com.sky.kms.Start import com.sky.kms.actors.PublisherActor.{DownstreamFailure, Init, ScheduleQueue, Trigger} import com.sky.kms.domain.PublishableMessage.ScheduledMessage import com.sky.kms.domain.{ScheduleEvent, ScheduleId, ScheduleQueueOfferResult} +import com.sky.kms.monitoring.ScheduleGauge import scala.util.{Failure, Success} -class PublisherActor extends Actor with ActorLogging { +class PublisherActor(scheduleGauge: ScheduleGauge) extends Actor with ActorLogging { implicit val ec = context.dispatcher @@ -25,8 +26,10 @@ class PublisherActor extends Actor with ActorLogging { private def receiveWithQueue(queue: ScheduleQueue): Receive = { case Trigger(scheduleId, schedule) => queue.offer((scheduleId, messageFrom(schedule))) onComplete { case Success(QueueOfferResult.Enqueued) => + scheduleGauge.onDelete() log.debug(ScheduleQueueOfferResult(scheduleId, QueueOfferResult.Enqueued).show) case Success(res) => + scheduleGauge.onDelete() log.warning(ScheduleQueueOfferResult(scheduleId, res).show) case Failure(t) => log.error(t, s"Failed to enqueue $scheduleId") @@ -53,8 +56,8 @@ object PublisherActor { case class DownstreamFailure(t: Throwable) - def create(implicit system: ActorSystem): ActorRef = - system.actorOf(Props[PublisherActor](), "publisher-actor") + def create(scheduleGauge: ScheduleGauge)(implicit system: ActorSystem): ActorRef = + system.actorOf(Props(new PublisherActor(scheduleGauge)), "publisher-actor") def init(queue: ScheduleQueue): Start[Unit] = Start(_.publisherActor ! Init(queue)) diff --git a/scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala b/scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala index c9e69fbe..b086c788 100644 --- a/scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala +++ b/scheduler/src/main/scala/com/sky/kms/actors/SchedulingActor.scala @@ -8,7 +8,7 @@ import monix.execution.{Cancelable, Scheduler => MonixScheduler} import scala.collection.mutable -class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monitoring: Monitoring) +class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, scheduleGauge: ScheduleGauge) extends Actor with ActorLogging { @@ -29,7 +29,7 @@ class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monit val finishInitialisation: Receive = { case Initialised => log.debug("State initialised - scheduling stored schedules") val scheduled = schedules.map { case (scheduleId, schedule) => - monitoring.scheduleReceived() + scheduleGauge.onUpdate() scheduleId -> scheduleOnce(scheduleId, schedule) } log.info("Reloaded state has been scheduled") @@ -45,19 +45,24 @@ class SchedulingActor(publisher: ActorRef, monixScheduler: MonixScheduler, monit val handleSchedulingMessage: Receive = { case CreateOrUpdate(scheduleId: ScheduleId, schedule: ScheduleEvent) => - scheduled.get(scheduleId).foreach(_.cancel()) + scheduled.get(scheduleId).foreach { schedule => + schedule.cancel() + scheduleGauge.onDelete() + log.info(s"Cancelled and updated $scheduleId") + } + val cancellable = scheduleOnce(scheduleId, schedule) log.info( s"Scheduled $scheduleId from ${schedule.inputTopic} to ${schedule.outputTopic} in ${schedule.delay.toMillis} millis" ) - monitoring.scheduleReceived() scheduled += (scheduleId -> cancellable) + scheduleGauge.onUpdate() case Cancel(scheduleId: String) => scheduled.get(scheduleId).foreach { schedule => schedule.cancel() - monitoring.scheduleDone() + scheduleGauge.onDelete() log.info(s"Cancelled $scheduleId") } scheduled -= scheduleId @@ -98,9 +103,9 @@ object SchedulingActor { case class UpstreamFailure(t: Throwable) - def create(publisherActor: ActorRef)(implicit system: ActorSystem): ActorRef = + def create(publisherActor: ActorRef, scheduleGauge: ScheduleGauge)(implicit system: ActorSystem): ActorRef = system.actorOf( - Props(new SchedulingActor(publisherActor, MonixScheduler(system.dispatcher), new KamonMonitoring())), + Props(new SchedulingActor(publisherActor, MonixScheduler(system.dispatcher), scheduleGauge)), "scheduling-actor" ) } diff --git a/scheduler/src/main/scala/com/sky/kms/monitoring/ScheduleGauge.scala b/scheduler/src/main/scala/com/sky/kms/monitoring/ScheduleGauge.scala new file mode 100644 index 00000000..0aecc438 --- /dev/null +++ b/scheduler/src/main/scala/com/sky/kms/monitoring/ScheduleGauge.scala @@ -0,0 +1,22 @@ +package com.sky.kms.monitoring + +import kamon.Kamon + +trait ScheduleGauge { + + def onUpdate(): Unit + + def onDelete(): Unit + +} + +object ScheduleGauge { + def kamon(): ScheduleGauge = new ScheduleGauge { + + private val gauge = Kamon.gauge("scheduler-messages").withTag("status", "scheduled") + + override def onUpdate(): Unit = gauge.increment() + + override def onDelete(): Unit = gauge.decrement() + } +} From db5dc1540ddbb660ec3c69e4f035dae6cafb3604 Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Sat, 29 Oct 2022 21:24:17 +0100 Subject: [PATCH 3/4] Test gauge in actors --- .../com/sky/kms/e2e/SchedulerIntSpec.scala | 33 ++++++++++++-- .../com/sky/kms/unit/PublisherActorSpec.scala | 30 ++++++++++++- .../sky/kms/unit/SchedulingActorSpec.scala | 45 ++++++++++++++----- .../scala/com/sky/kms/utils/MockGauge.scala | 15 +++++++ 4 files changed, 107 insertions(+), 16 deletions(-) create mode 100644 scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala index 91caa995..b92aaa25 100644 --- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala +++ b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala @@ -7,8 +7,10 @@ import com.sky.kms.domain._ import com.sky.kms.utils.TestDataUtils._ import eu.timepit.refined.auto._ import io.github.embeddedkafka.Codecs._ +import kamon.Kamon +import kamon.testkit.{InstrumentInspection, MetricInspection} -class SchedulerIntSpec extends SchedulerIntSpecBase { +class SchedulerIntSpec extends SchedulerIntSpecBase with MetricInspection.Syntax with InstrumentInspection.Syntax { "Scheduler stream" should { "schedule a message to be sent to Kafka and delete it after it has been emitted" in new TestContext { @@ -23,14 +25,39 @@ class SchedulerIntSpec extends SchedulerIntSpecBase { } } } + + "increment the internal store when a message is scheduled and remove it when it emits the scheduled message" in new TestContext { + withRunningKafka { + withSchedulerApp { + val schedules = createSchedules(1, forTopics = List(scheduleTopic), 10L) + + publish(schedules) + + eventually { + val g = Kamon.gauge("foobar").withoutTags() + gaugeInstrumentInspection(g) + + } + +// publish(schedules) +// .foreach(assertMessagesWrittenFrom(_, schedules)) + +// assertTombstoned(schedules) + } + } + } } private class TestContext { - def createSchedules(numSchedules: Int, forTopics: List[String]): List[(ScheduleId, ScheduleEvent)] = + def createSchedules( + numSchedules: Int, + forTopics: List[String], + expireTime: Long = 4 + ): List[(ScheduleId, ScheduleEvent)] = random[(ScheduleId, ScheduleEvent)](numSchedules).toList .zip(LazyList.continually(forTopics.to(LazyList)).flatten.take(numSchedules).toList) .map { case ((id, schedule), topic) => - id -> schedule.copy(inputTopic = topic).secondsFromNow(4) + id -> schedule.copy(inputTopic = topic).secondsFromNow(expireTime) } def publish: List[(ScheduleId, ScheduleEvent)] => List[OffsetDateTime] = _.map { case (id, scheduleEvent) => diff --git a/scheduler/src/test/scala/com/sky/kms/unit/PublisherActorSpec.scala b/scheduler/src/test/scala/com/sky/kms/unit/PublisherActorSpec.scala index f8ffeb75..a0dc6518 100644 --- a/scheduler/src/test/scala/com/sky/kms/unit/PublisherActorSpec.scala +++ b/scheduler/src/test/scala/com/sky/kms/unit/PublisherActorSpec.scala @@ -2,11 +2,13 @@ package com.sky.kms.unit import java.util.UUID +import akka.stream.QueueOfferResult import akka.testkit.TestActorRef import com.sky.kms.actors.PublisherActor import com.sky.kms.actors.PublisherActor._ import com.sky.kms.base.AkkaSpecBase import com.sky.kms.domain.{ScheduleEvent, ScheduleId} +import com.sky.kms.utils.MockGauge import com.sky.kms.utils.TestDataUtils._ import org.mockito.Mockito._ import org.scalatestplus.mockito.MockitoSugar @@ -53,11 +55,37 @@ class PublisherActorSpec extends AkkaSpecBase with MockitoSugar { expectTerminated(publisherActor) } + "decrement schedule counter when message is queued" in new TestContext { + val (scheduleId, schedule) = generateSchedule + + when(mockSourceQueue.offer((scheduleId, schedule.toScheduledMessage))) + .thenReturn(Future.successful(QueueOfferResult.Enqueued)) + + publisherActor ! Trigger(scheduleId, schedule) + + eventually { + mockGauge.counter.get() shouldBe -1L + } + } + + "decrement schedule counter when message is dropped" in new TestContext { + val (scheduleId, schedule) = generateSchedule + + when(mockSourceQueue.offer((scheduleId, schedule.toScheduledMessage))) + .thenReturn(Future.successful(QueueOfferResult.Dropped)) + + publisherActor ! Trigger(scheduleId, schedule) + + eventually { + mockGauge.counter.get() shouldBe -1L + } + } } private class TestContext { val mockSourceQueue = mock[ScheduleQueue] - val publisherActor = TestActorRef(new PublisherActor) + val mockGauge = new MockGauge() + val publisherActor = TestActorRef(new PublisherActor(mockGauge)) when(mockSourceQueue.watchCompletion()).thenReturn(Future.never) publisherActor ! Init(mockSourceQueue) diff --git a/scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala b/scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala index a504b315..926ab61f 100644 --- a/scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala +++ b/scheduler/src/test/scala/com/sky/kms/unit/SchedulingActorSpec.scala @@ -9,7 +9,7 @@ import com.sky.kms.actors.SchedulingActor import com.sky.kms.actors.SchedulingActor._ import com.sky.kms.base.AkkaSpecBase import com.sky.kms.domain._ -import com.sky.kms.utils.SimpleCounterMonitoring +import com.sky.kms.utils.MockGauge import com.sky.kms.utils.TestDataUtils._ import monix.execution.schedulers.TestScheduler import org.scalatest.concurrent.Eventually @@ -96,35 +96,60 @@ class SchedulingActorSpec extends AkkaSpecBase with ImplicitSender with MockitoS expectTerminated(schedulingActor) } - "update monitoring when new schedule is received" in new Initialised { + "increment schedule counter when new schedule is received" in new Initialised { val (scheduleId, schedule) = generateSchedule createSchedule(scheduleId, schedule) eventually { - scheduleReceivedCounter shouldBe 1L + mockGauge.counter.get() shouldBe 1L } } - "update monitoring when a cancel message is received" in new Initialised { + "keep the schedule counter the same when updating a previous schedule" in new Initialised { val (scheduleId, schedule) = generateSchedule createSchedule(scheduleId, schedule) + val updatedSchedule = schedule.copy(delay = schedule.delay + 5.minutes) + createSchedule(scheduleId, updatedSchedule) + + eventually { + mockGauge.counter.get() shouldBe 1L + } + + advanceToTimeFrom(schedule) + probe.expectNoMessage(NoMsgTimeout) + + advanceToTimeFrom(updatedSchedule) + probe.expectMsg(Trigger(scheduleId, updatedSchedule)) + + eventually { + mockGauge.counter.get() shouldBe 1L + } + } + + "decrement schedule counter when a cancel message is received" in new Initialised { + val (scheduleId, schedule) = generateSchedule + createSchedule(scheduleId, schedule) + + eventually { + mockGauge.counter.get() shouldBe 1L + } + cancelSchedule(scheduleId) eventually { - scheduleDoneCounter shouldBe 1L + mockGauge.counter.get() shouldBe 0L } } } private class TestContext { - val monitoring = new SimpleCounterMonitoring() + val mockGauge = new MockGauge() val testScheduler = TestScheduler() val probe = TestProbe() - val schedulingActor = TestActorRef(new SchedulingActor(probe.ref, testScheduler, monitoring)) - val now = System.currentTimeMillis() + val schedulingActor = TestActorRef(new SchedulingActor(probe.ref, testScheduler, mockGauge)) def advanceToTimeFrom(schedule: ScheduleEvent): Unit = testScheduler.tick(schedule.delay) @@ -138,10 +163,6 @@ class SchedulingActorSpec extends AkkaSpecBase with ImplicitSender with MockitoS schedulingActor ! Cancel(scheduleId) expectMsg(Ack) } - - def scheduleReceivedCounter: Long = monitoring.scheduleReceivedCounter.get() - - def scheduleDoneCounter: Long = monitoring.scheduleDoneCounter.get() } private class Initialised extends TestContext { diff --git a/scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala b/scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala new file mode 100644 index 00000000..7054571e --- /dev/null +++ b/scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala @@ -0,0 +1,15 @@ +package com.sky.kms.utils + +import java.util.concurrent.atomic.AtomicLong + +import com.sky.kms.monitoring.ScheduleGauge + +class MockGauge extends ScheduleGauge { + + val counter: AtomicLong = new AtomicLong() + + override def onUpdate(): Unit = counter.incrementAndGet() + + override def onDelete(): Unit = counter.decrementAndGet() + +} From 95175c7329867b9485b039d388ffadd8a9baede0 Mon Sep 17 00:00:00 2001 From: Ben Carter Date: Sat, 29 Oct 2022 21:31:51 +0100 Subject: [PATCH 4/4] Fix changed test --- .../sky/kms/monitoring/ScheduleGauge.scala | 44 +++++++++---------- .../com/sky/kms/e2e/SchedulerIntSpec.scala | 33 ++------------ .../scala/com/sky/kms/utils/MockGauge.scala | 26 +++++------ .../kms/utils/SimpleCounterMonitoring.scala | 16 ------- 4 files changed, 38 insertions(+), 81 deletions(-) delete mode 100644 scheduler/src/test/scala/com/sky/kms/utils/SimpleCounterMonitoring.scala diff --git a/scheduler/src/main/scala/com/sky/kms/monitoring/ScheduleGauge.scala b/scheduler/src/main/scala/com/sky/kms/monitoring/ScheduleGauge.scala index 0aecc438..e5a1cf78 100644 --- a/scheduler/src/main/scala/com/sky/kms/monitoring/ScheduleGauge.scala +++ b/scheduler/src/main/scala/com/sky/kms/monitoring/ScheduleGauge.scala @@ -1,22 +1,22 @@ -package com.sky.kms.monitoring - -import kamon.Kamon - -trait ScheduleGauge { - - def onUpdate(): Unit - - def onDelete(): Unit - -} - -object ScheduleGauge { - def kamon(): ScheduleGauge = new ScheduleGauge { - - private val gauge = Kamon.gauge("scheduler-messages").withTag("status", "scheduled") - - override def onUpdate(): Unit = gauge.increment() - - override def onDelete(): Unit = gauge.decrement() - } -} +package com.sky.kms.monitoring + +import kamon.Kamon + +trait ScheduleGauge { + + def onUpdate(): Unit + + def onDelete(): Unit + +} + +object ScheduleGauge { + def kamon(): ScheduleGauge = new ScheduleGauge { + + private val gauge = Kamon.gauge("scheduler-messages").withTag("status", "scheduled") + + override def onUpdate(): Unit = gauge.increment() + + override def onDelete(): Unit = gauge.decrement() + } +} diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala index b92aaa25..91caa995 100644 --- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala +++ b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala @@ -7,10 +7,8 @@ import com.sky.kms.domain._ import com.sky.kms.utils.TestDataUtils._ import eu.timepit.refined.auto._ import io.github.embeddedkafka.Codecs._ -import kamon.Kamon -import kamon.testkit.{InstrumentInspection, MetricInspection} -class SchedulerIntSpec extends SchedulerIntSpecBase with MetricInspection.Syntax with InstrumentInspection.Syntax { +class SchedulerIntSpec extends SchedulerIntSpecBase { "Scheduler stream" should { "schedule a message to be sent to Kafka and delete it after it has been emitted" in new TestContext { @@ -25,39 +23,14 @@ class SchedulerIntSpec extends SchedulerIntSpecBase with MetricInspection.Syntax } } } - - "increment the internal store when a message is scheduled and remove it when it emits the scheduled message" in new TestContext { - withRunningKafka { - withSchedulerApp { - val schedules = createSchedules(1, forTopics = List(scheduleTopic), 10L) - - publish(schedules) - - eventually { - val g = Kamon.gauge("foobar").withoutTags() - gaugeInstrumentInspection(g) - - } - -// publish(schedules) -// .foreach(assertMessagesWrittenFrom(_, schedules)) - -// assertTombstoned(schedules) - } - } - } } private class TestContext { - def createSchedules( - numSchedules: Int, - forTopics: List[String], - expireTime: Long = 4 - ): List[(ScheduleId, ScheduleEvent)] = + def createSchedules(numSchedules: Int, forTopics: List[String]): List[(ScheduleId, ScheduleEvent)] = random[(ScheduleId, ScheduleEvent)](numSchedules).toList .zip(LazyList.continually(forTopics.to(LazyList)).flatten.take(numSchedules).toList) .map { case ((id, schedule), topic) => - id -> schedule.copy(inputTopic = topic).secondsFromNow(expireTime) + id -> schedule.copy(inputTopic = topic).secondsFromNow(4) } def publish: List[(ScheduleId, ScheduleEvent)] => List[OffsetDateTime] = _.map { case (id, scheduleEvent) => diff --git a/scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala b/scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala index 7054571e..7bcaf55b 100644 --- a/scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala +++ b/scheduler/src/test/scala/com/sky/kms/utils/MockGauge.scala @@ -1,15 +1,15 @@ -package com.sky.kms.utils - +package com.sky.kms.utils + import java.util.concurrent.atomic.AtomicLong -import com.sky.kms.monitoring.ScheduleGauge - -class MockGauge extends ScheduleGauge { - - val counter: AtomicLong = new AtomicLong() - - override def onUpdate(): Unit = counter.incrementAndGet() - - override def onDelete(): Unit = counter.decrementAndGet() - -} +import com.sky.kms.monitoring.ScheduleGauge + +class MockGauge extends ScheduleGauge { + + val counter: AtomicLong = new AtomicLong() + + override def onUpdate(): Unit = counter.incrementAndGet() + + override def onDelete(): Unit = counter.decrementAndGet() + +} diff --git a/scheduler/src/test/scala/com/sky/kms/utils/SimpleCounterMonitoring.scala b/scheduler/src/test/scala/com/sky/kms/utils/SimpleCounterMonitoring.scala deleted file mode 100644 index d3520924..00000000 --- a/scheduler/src/test/scala/com/sky/kms/utils/SimpleCounterMonitoring.scala +++ /dev/null @@ -1,16 +0,0 @@ -package com.sky.kms.utils - -import java.util.concurrent.atomic.AtomicLong - -import com.sky.kms.monitoring.Monitoring - -class SimpleCounterMonitoring extends Monitoring { - val scheduleReceivedCounter = new AtomicLong(0) - - val scheduleDoneCounter = new AtomicLong(0) - - override def scheduleReceived(): Unit = scheduleReceivedCounter.incrementAndGet() - - override def scheduleDone(): Unit = scheduleDoneCounter.incrementAndGet() - -}