Skip to content

Commit

Permalink
Add main, clean up some domain classes
Browse files Browse the repository at this point in the history
  • Loading branch information
bcarter97 committed Jan 11, 2025
1 parent 11ab9e2 commit c54da58
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 17 deletions.
16 changes: 16 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package uk.sky.scheduler

import cats.effect.*
import fs2.Stream

object Main extends IOApp.Simple {

def stream[F[_] : Concurrent]: Stream[F, Unit] =
for {
scheduler <- Stream.resource(Scheduler.live[F])
message <- scheduler.stream
} yield message

override def run: IO[Unit] = stream[IO].compile.drain

}
12 changes: 11 additions & 1 deletion scheduler-3/src/main/scala/uk/sky/scheduler/Scheduler.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package uk.sky.scheduler

import cats.effect.Concurrent
import cats.effect.*
import fs2.Stream
import uk.sky.scheduler.domain.ScheduleEvent
import uk.sky.scheduler.message.Message
Expand All @@ -23,3 +23,13 @@ class Scheduler[F[_] : Concurrent, O](
scheduleEvents.drain
.merge(scheduleQueue.schedules.through(schedulePublisher.publish))
}

object Scheduler {
def live[F[_] : Concurrent]: Resource[F, Scheduler[F, Unit]] =
for {
eventSubscriber <- Resource.pure(??? : EventSubscriber[F])
scheduleQueue <- Resource.pure(??? : ScheduleQueue[F])
schedulePublisher <- Resource.pure(??? : SchedulePublisher[F, Unit])
} yield Scheduler(eventSubscriber, scheduleQueue, schedulePublisher)

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import org.apache.avro.Schema

enum ScheduleError(val message: String, val cause: Throwable) extends Throwable(message, cause) {
case InvalidAvroError(schema: Schema, error: Throwable)
extends ScheduleError(s"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error)
extends ScheduleError(show"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error)
}

object ScheduleError {
given Eq[ScheduleError] = Eq.fromUniversalEquals
given Show[ScheduleError] = _.message

private given Show[Schema] = _.toString()
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@ import cats.{Eq, Functor, Show}
import monocle.syntax.all.*

final case class Message[V](key: String, source: String, value: V, metadata: Metadata) {
def map[B](f: V => B): Message[B] = this.copy(value = f(this.value))
def transform[B](f: V => B): Message[B] = this.copy(value = f(this.value))
}

object Message {
extension [T](message: Message[T]) {
def isExpired: Boolean = message.metadata.isExpired
def expire: Message[T] =
message.focus(_.metadata).modify(_.transform(_ + (Metadata.expiredKey -> Metadata.expiredValue)))
}
extension [T](message: Message[T]) def expire: Message[T] = message.focus(_.metadata).modify(_.expire)

given [V : Eq]: Eq[Message[V]] = Eq.by { case Message(key, source, value, metadata) =>
(key, source, value, metadata)
Expand All @@ -24,7 +20,6 @@ object Message {
}

given Functor[Message] = new Functor[Message] {
override def map[A, B](fa: Message[A])(f: A => B): Message[B] =
fa.map(f)
override def map[A, B](fa: Message[A])(f: A => B): Message[B] = fa.transform(f)
}
}
19 changes: 12 additions & 7 deletions scheduler-3/src/main/scala/uk/sky/scheduler/message/Metadata.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.sky.scheduler.message

import cats.syntax.all.*
import cats.{Eq, Monoid, Show}
import org.typelevel.ci.CIString

Expand All @@ -11,25 +12,29 @@ object Metadata {
case other => other.toMap
}

val empty: Metadata = Map.empty[CIString, String]

extension (metadata: Metadata) {
inline def value: Map[CIString, String] = metadata
inline def toMap: Map[String, String] = metadata.map(_.toString -> _)
inline def transform(f: Map[CIString, String] => Map[CIString, String]): Metadata = f(metadata)
inline def combine(other: Metadata): Metadata = metadata.concat(other)

inline def isExpired: Boolean = metadata.get(expiredKey).exists(_.equalsIgnoreCase(expiredValue))
inline def expire: Metadata = metadata + (expiredKey -> expiredValue)
}

val expiredKey = CIString("schedule:expired")
val expiredValue = "true"

val empty: Metadata = Map.empty[CIString, String]

given Monoid[Metadata] = new Monoid[Metadata] {
override def empty: Metadata = Metadata.empty
override def combine(x: Metadata, y: Metadata): Metadata = x.combine(y)
}

given Show[Metadata] = Show.catsShowForMap[CIString, String]
given Eq[Metadata] = Eq.catsKernelEqForMap[CIString, String]
given Show[Metadata] =
_.map((k, v) => show"$k: $v")
.mkString("Metadata(", ", ", ")")

given Eq[Metadata] = Eq.catsKernelEqForMap[CIString, String]

private val expiredKey = CIString("schedule:expired")
private val expiredValue = "true"
}

0 comments on commit c54da58

Please sign in to comment.