Skip to content

Commit

Permalink
Cleanup code for server resources
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-klass committed Jan 15, 2025
1 parent b0bed91 commit 84ea3d5
Show file tree
Hide file tree
Showing 13 changed files with 33 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,22 @@ import org.http4s._
import org.slf4j.LoggerFactory
import org.typelevel.ci.CIString
import scodec.bits.ByteVector
import sttp.tapir.client.tests.HttpServer._

import scala.concurrent.ExecutionContext

object HttpServer {
object HttpServer extends ResourceApp.Forever {
type Port = Int

def main(args: Array[String]): Unit = {
def run(args: List[String]): Resource[IO, Unit] = {
val port = args.headOption.map(_.toInt).getOrElse(51823)
new HttpServer(port).start()
new HttpServer(port).build.void
}
}

class HttpServer(port: Port) {
class HttpServer(port: HttpServer.Port) {

private val logger = LoggerFactory.getLogger(getClass)

private var stopServer: IO[Unit] = _

//

private object numParam extends QueryParamDecoderMatcher[Int]("num")
Expand Down Expand Up @@ -75,7 +72,7 @@ class HttpServer(port: Port) {
case r @ POST -> Root / "api" / "echo" / "multipart" =>
r.decode[multipart.Multipart[IO]] { mp =>
val parts: Vector[multipart.Part[IO]] = mp.parts
def toString(s: fs2.Stream[IO, Byte]): IO[String] = s.through(fs2.text.utf8Decode).compile.foldMonoid
def toString(s: fs2.Stream[IO, Byte]): IO[String] = s.through(fs2.text.utf8.decode).compile.foldMonoid
def partToString(name: String): IO[String] = parts.find(_.name.contains(name)).map(p => toString(p.body)).getOrElse(IO.pure(""))
partToString("fruit").product(partToString("amount")).flatMap { case (fruit, amount) =>
Ok(s"$fruit=$amount")
Expand Down Expand Up @@ -212,23 +209,11 @@ class HttpServer(port: Port) {

//

def start(): Unit = {
val (_, _stopServer) = BlazeServerBuilder[IO]
def build: Resource[IO, server.Server] = BlazeServerBuilder[IO]
.withExecutionContext(ExecutionContext.global)
.bindHttp(port)
.withHttpWebSocketApp(app)
.resource
.map(_.address.getPort)
.allocated
.unsafeRunSync()

stopServer = _stopServer

logger.info(s"Server on port $port started")
}

def close(): Unit = {
stopServer.unsafeRunSync()
logger.info(s"Server on port $port stopped")
}
.evalTap(_ => IO(logger.info(s"Server on port $port started")))
.onFinalize(IO(logger.info(s"Server on port $port stopped")))
}
3 changes: 1 addition & 2 deletions doc/tutorials/07_cats_effect.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ object HelloWorldTapir extends IOApp:
.bindHttp(8080, "localhost")
.withHttpApp(Router("/" -> allRoutes).orNotFound)
.resource
.use(_ => IO.never)
.as(ExitCode.Success)
.useForever
```

Hence, we first generate endpoint descriptions, which correspond to exposing the Swagger UI (containing the generated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,4 @@ object ProxyHttp4sFs2Server extends IOApp:
.bindHttp(8080, "localhost")
.withHttpApp(Router("/" -> routes).orNotFound)
.resource
} yield ())
.use { _ => IO.never }
.as(ExitCode.Success)
} yield ()).useForever
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,4 @@ object StreamingHttp4sFs2ServerOrError extends IOApp:
.bindHttp(8080, "localhost")
.withHttpApp(Router("/" -> userDataRoutes).orNotFound)
.resource
.use { _ => IO.never }
.as(ExitCode.Success)
.useForever
3 changes: 1 addition & 2 deletions generated-doc/out/tutorials/07_cats_effect.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ object HelloWorldTapir extends IOApp:
.bindHttp(8080, "localhost")
.withHttpApp(Router("/" -> allRoutes).orNotFound)
.resource
.use(_ => IO.never)
.as(ExitCode.Success)
.useForever
```

Hence, we first generate endpoint descriptions, which correspond to exposing the Swagger UI (containing the generated
Expand Down
8 changes: 3 additions & 5 deletions perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,9 @@ object server {
.withMaxConnections(maxConnections)
.withConnectorPoolSize(connectorPoolSize)
.resource
.allocated
.map(_._2)
.map(_.flatTap { _ =>
IO.println("Http4s server closed.")
})
.useForever
.start
.map(_.cancel *> IO.println("Http4s server closed."))
}

object TapirServer extends ServerRunner { override def start = server.runServer(Tapir.router(1)) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ class Http4sServerTest[R >: Fs2Streams[IO] with WebSockets] extends TestSuite wi
endpoint.out(streamBinaryBody(Fs2Streams[IO])(CodecFormat.OctetStream())),
"streaming should send data according to producer stream rate"
)((_: Unit) =>
IO(Right(fs2.Stream.awakeEvery[IO](1.second).map(_.toString()).through(fs2.text.utf8Encode).interruptAfter(10.seconds)))
IO(Right(fs2.Stream.awakeEvery[IO](1.second).map(_.toString()).through(fs2.text.utf8.encode).interruptAfter(10.seconds)))
) { (backend, baseUri) =>
basicRequest
.response(
asStream(Fs2Streams[IO])(bs => {
bs.through(fs2.text.utf8Decode).mapAccumulate(0)((pings, currentTime) => (pings + 1, currentTime)).compile.last
bs.through(fs2.text.utf8.decode).mapAccumulate(0)((pings, currentTime) => (pings + 1, currentTime)).compile.last
})
)
.get(baseUri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.tests._
import sttp.tapir.ztapir.ZServerEndpoint
import zio.{Runtime, Task, Unsafe}
import zio.interop._
import zio.interop.catz._
import zio.interop.catz.implicits._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
Expand All @@ -27,7 +29,6 @@ object ZHttp4sTestServerInterpreter {
}

class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStreams with WebSockets, ServerOptions, Routes] {
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

override def route(es: List[ZServerEndpoint[Any, ZioStreams with WebSockets]], interceptors: Interceptors): Routes = {
val serverOptions: ServerOptions = interceptors(Http4sServerOptions.customiseInterceptors[Task]).options
Expand All @@ -49,7 +50,7 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStream
.map(_.address.getPort)
.mapK(new ~>[Task, IO] {
// Converting a ZIO effect to an Cats Effect IO effect
def apply[B](fa: Task[B]): IO[B] = IO.fromFuture(Unsafe.unsafe(implicit u => IO(Runtime.default.unsafe.runToFuture(fa))))
def apply[B](fa: Task[B]): IO[B] = fa.toEffect[IO]
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE](
Test(name)(
resources
.use { port =>
runTest(backend, uri"http://localhost:$port").guarantee(IO(logger.info(s"Tests completed on port $port")))
runTest(backend, uri"http://localhost:$port").guaranteeCase(exitCase =>
IO(logger.info(s"Test on port $port: ${exitCase.getClass.getSimpleName}"))
)
}
.unsafeToFuture()
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sttp.tapir.server.tests

import cats.data.NonEmptyList
import cats.effect.{IO, Resource}
import cats.effect.{Deferred, IO, Resource}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.CustomiseInterceptors
import sttp.tapir.tests._
Expand All @@ -21,8 +21,12 @@ trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] {
def serverWithStop(
routes: NonEmptyList[ROUTE],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, KillSwitch)] =
Resource.eval(server(routes, gracefulShutdownTimeout).allocated)
): Resource[IO, (Port, KillSwitch)] = for {
stopSignal <- Resource.eval(Deferred[IO, Unit])
portValue <- Resource.eval(Deferred[IO, Port])
_ <- server(routes, gracefulShutdownTimeout).use(port => portValue.complete(port) *> stopSignal.get).background
port <- Resource.eval(portValue.get)
} yield (port, stopSignal.complete(()).void)

def server(routes: NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, Port]
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import sttp.tapir.server.tests.{CreateServerStubTest, ServerStubStreamingTest, S
import scala.concurrent.Future

class VertxCatsCreateServerStubTest extends CreateServerStubTest[IO, VertxCatsServerOptions[IO]] {
private val (dispatcher, shutdownDispatcher) = Dispatcher[IO].allocated.unsafeRunSync()
private val (dispatcher, shutdownDispatcher) = Dispatcher.sequential[IO].allocated.unsafeRunSync()

override def customiseInterceptors: CustomiseInterceptors[IO, VertxCatsServerOptions[IO]] =
VertxCatsServerOptions.customiseInterceptors(dispatcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import scala.concurrent.duration._

class Fs2StreamTest extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {

private val (dispatcher, shutdownDispatcher) = Dispatcher[IO].allocated.unsafeRunSync()
private val (dispatcher, shutdownDispatcher) = Dispatcher.sequential[IO].allocated.unsafeRunSync()

override protected def afterAll(): Unit = {
shutdownDispatcher.unsafeRunSync()
Expand Down
2 changes: 1 addition & 1 deletion tests/src/main/scalajvm/sttp/tapir/tests/TestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait TestSuite extends AsyncFunSuite with BeforeAndAfterAll {
def tests: Resource[IO, List[Test]]
def testNameFilter: Option[String] = None // define to run a single test (temporarily for debugging)

protected val (dispatcher, shutdownDispatcher) = Dispatcher[IO].allocated.unsafeRunSync()
protected val (dispatcher, shutdownDispatcher) = Dispatcher.sequential[IO].allocated.unsafeRunSync()

// we need to register the tests when the class is constructed, as otherwise scalatest skips it
val (allTests, doRelease) = tests.allocated.unsafeRunSync()
Expand Down

0 comments on commit 84ea3d5

Please sign in to comment.