diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala new file mode 100644 index 00000000..d9cc33ac --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala @@ -0,0 +1,16 @@ +package uk.sky.scheduler + +import cats.effect.* +import fs2.Stream + +object Main extends IOApp.Simple { + + def stream[F[_] : Concurrent]: Stream[F, Unit] = + for { + scheduler <- Stream.resource(Scheduler.live[F]) + message <- scheduler.stream + } yield message + + override def run: IO[Unit] = stream[IO].compile.drain + +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala index bd2a5b9a..3f95a9d2 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala @@ -1,6 +1,6 @@ package uk.sky.scheduler -import cats.effect.Concurrent +import cats.effect.* import fs2.Stream import uk.sky.scheduler.domain.ScheduleEvent import uk.sky.scheduler.message.Message @@ -23,3 +23,13 @@ class Scheduler[F[_] : Concurrent, O]( scheduleEvents.drain .merge(scheduleQueue.schedules.through(schedulePublisher.publish)) } + +object Scheduler { + def live[F[_] : Concurrent]: Resource[F, Scheduler[F, Unit]] = + for { + eventSubscriber <- Resource.pure(??? : EventSubscriber[F]) + scheduleQueue <- Resource.pure(??? : ScheduleQueue[F]) + schedulePublisher <- Resource.pure(??? : SchedulePublisher[F, Unit]) + } yield Scheduler(eventSubscriber, scheduleQueue, schedulePublisher) + +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala index ca3a2849..dc499c17 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala @@ -6,10 +6,12 @@ import org.apache.avro.Schema enum ScheduleError(val message: String, val cause: Throwable) extends Throwable(message, cause) { case InvalidAvroError(schema: Schema, error: Throwable) - extends ScheduleError(s"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error) + extends ScheduleError(show"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error) } object ScheduleError { given Eq[ScheduleError] = Eq.fromUniversalEquals given Show[ScheduleError] = _.message + + private given Show[Schema] = _.toString() } diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala index b9d64419..0943070d 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Message.scala @@ -5,15 +5,11 @@ import cats.{Eq, Functor, Show} import monocle.syntax.all.* final case class Message[V](key: String, source: String, value: V, metadata: Metadata) { - def map[B](f: V => B): Message[B] = this.copy(value = f(this.value)) + def transform[B](f: V => B): Message[B] = this.copy(value = f(this.value)) } object Message { - extension [T](message: Message[T]) { - def isExpired: Boolean = message.metadata.isExpired - def expire: Message[T] = - message.focus(_.metadata).modify(_.transform(_ + (Metadata.expiredKey -> Metadata.expiredValue))) - } + extension [T](message: Message[T]) def expire: Message[T] = message.focus(_.metadata).modify(_.expire) given [V : Eq]: Eq[Message[V]] = Eq.by { case Message(key, source, value, metadata) => (key, source, value, metadata) @@ -24,7 +20,6 @@ object Message { } given Functor[Message] = new Functor[Message] { - override def map[A, B](fa: Message[A])(f: A => B): Message[B] = - fa.map(f) + override def map[A, B](fa: Message[A])(f: A => B): Message[B] = fa.transform(f) } } diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala index 37cee1bc..05092a1a 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala @@ -1,5 +1,6 @@ package uk.sky.scheduler.message +import cats.syntax.all.* import cats.{Eq, Monoid, Show} import org.typelevel.ci.CIString @@ -11,6 +12,8 @@ object Metadata { case other => other.toMap } + val empty: Metadata = Map.empty[CIString, String] + extension (metadata: Metadata) { inline def value: Map[CIString, String] = metadata inline def toMap: Map[String, String] = metadata.map(_.toString -> _) @@ -18,18 +21,20 @@ object Metadata { inline def combine(other: Metadata): Metadata = metadata.concat(other) inline def isExpired: Boolean = metadata.get(expiredKey).exists(_.equalsIgnoreCase(expiredValue)) + inline def expire: Metadata = metadata + (expiredKey -> expiredValue) } - val expiredKey = CIString("schedule:expired") - val expiredValue = "true" - - val empty: Metadata = Map.empty[CIString, String] - given Monoid[Metadata] = new Monoid[Metadata] { override def empty: Metadata = Metadata.empty override def combine(x: Metadata, y: Metadata): Metadata = x.combine(y) } - given Show[Metadata] = Show.catsShowForMap[CIString, String] - given Eq[Metadata] = Eq.catsKernelEqForMap[CIString, String] + given Show[Metadata] = + _.map((k, v) => show"$k: $v") + .mkString("Metadata(", ", ", ")") + + given Eq[Metadata] = Eq.catsKernelEqForMap[CIString, String] + + private val expiredKey = CIString("schedule:expired") + private val expiredValue = "true" }