From 84ea3d5a56ec6e9127b9ed041f9077500abc7629 Mon Sep 17 00:00:00 2001 From: Ivan Klass Date: Tue, 14 Jan 2025 18:53:39 +0100 Subject: [PATCH] Cleanup code for server resources --- .../sttp/tapir/client/tests/HttpServer.scala | 31 +++++-------------- doc/tutorials/07_cats_effect.md | 3 +- .../streaming/ProxyHttp4sFs2Server.scala | 4 +-- .../StreamingHttp4sFs2ServerOrError.scala | 3 +- generated-doc/out/tutorials/07_cats_effect.md | 3 +- .../scala/sttp/tapir/perf/http4s/Http4s.scala | 8 ++--- .../server/http4s/Http4sServerTest.scala | 4 +-- .../ztapir/ZHttp4sTestServerInterpreter.scala | 5 +-- .../tapir/server/tests/CreateServerTest.scala | 4 ++- .../server/tests/TestServerInterpreter.scala | 10 ++++-- .../vertx/cats/VertxStubServerTest.scala | 2 +- .../vertx/cats/streams/Fs2StreamTest.scala | 2 +- .../scalajvm/sttp/tapir/tests/TestSuite.scala | 2 +- 13 files changed, 33 insertions(+), 48 deletions(-) diff --git a/client/testserver/src/main/scala/sttp/tapir/client/tests/HttpServer.scala b/client/testserver/src/main/scala/sttp/tapir/client/tests/HttpServer.scala index 13b7303794..0a924675f8 100644 --- a/client/testserver/src/main/scala/sttp/tapir/client/tests/HttpServer.scala +++ b/client/testserver/src/main/scala/sttp/tapir/client/tests/HttpServer.scala @@ -16,25 +16,22 @@ import org.http4s._ import org.slf4j.LoggerFactory import org.typelevel.ci.CIString import scodec.bits.ByteVector -import sttp.tapir.client.tests.HttpServer._ import scala.concurrent.ExecutionContext -object HttpServer { +object HttpServer extends ResourceApp.Forever { type Port = Int - def main(args: Array[String]): Unit = { + def run(args: List[String]): Resource[IO, Unit] = { val port = args.headOption.map(_.toInt).getOrElse(51823) - new HttpServer(port).start() + new HttpServer(port).build.void } } -class HttpServer(port: Port) { +class HttpServer(port: HttpServer.Port) { private val logger = LoggerFactory.getLogger(getClass) - private var stopServer: IO[Unit] = _ - // private object numParam extends QueryParamDecoderMatcher[Int]("num") @@ -75,7 +72,7 @@ class HttpServer(port: Port) { case r @ POST -> Root / "api" / "echo" / "multipart" => r.decode[multipart.Multipart[IO]] { mp => val parts: Vector[multipart.Part[IO]] = mp.parts - def toString(s: fs2.Stream[IO, Byte]): IO[String] = s.through(fs2.text.utf8Decode).compile.foldMonoid + def toString(s: fs2.Stream[IO, Byte]): IO[String] = s.through(fs2.text.utf8.decode).compile.foldMonoid def partToString(name: String): IO[String] = parts.find(_.name.contains(name)).map(p => toString(p.body)).getOrElse(IO.pure("")) partToString("fruit").product(partToString("amount")).flatMap { case (fruit, amount) => Ok(s"$fruit=$amount") @@ -212,23 +209,11 @@ class HttpServer(port: Port) { // - def start(): Unit = { - val (_, _stopServer) = BlazeServerBuilder[IO] + def build: Resource[IO, server.Server] = BlazeServerBuilder[IO] .withExecutionContext(ExecutionContext.global) .bindHttp(port) .withHttpWebSocketApp(app) .resource - .map(_.address.getPort) - .allocated - .unsafeRunSync() - - stopServer = _stopServer - - logger.info(s"Server on port $port started") - } - - def close(): Unit = { - stopServer.unsafeRunSync() - logger.info(s"Server on port $port stopped") - } + .evalTap(_ => IO(logger.info(s"Server on port $port started"))) + .onFinalize(IO(logger.info(s"Server on port $port stopped"))) } diff --git a/doc/tutorials/07_cats_effect.md b/doc/tutorials/07_cats_effect.md index 1c774134c0..2024e2bc7d 100644 --- a/doc/tutorials/07_cats_effect.md +++ b/doc/tutorials/07_cats_effect.md @@ -230,8 +230,7 @@ object HelloWorldTapir extends IOApp: .bindHttp(8080, "localhost") .withHttpApp(Router("/" -> allRoutes).orNotFound) .resource - .use(_ => IO.never) - .as(ExitCode.Success) + .useForever ``` Hence, we first generate endpoint descriptions, which correspond to exposing the Swagger UI (containing the generated diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/ProxyHttp4sFs2Server.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/ProxyHttp4sFs2Server.scala index fabda59b8d..f93d7b04c1 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/ProxyHttp4sFs2Server.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/ProxyHttp4sFs2Server.scala @@ -62,6 +62,4 @@ object ProxyHttp4sFs2Server extends IOApp: .bindHttp(8080, "localhost") .withHttpApp(Router("/" -> routes).orNotFound) .resource - } yield ()) - .use { _ => IO.never } - .as(ExitCode.Success) + } yield ()).useForever diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/StreamingHttp4sFs2ServerOrError.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/StreamingHttp4sFs2ServerOrError.scala index 536848c5bc..1adbbabb0c 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/StreamingHttp4sFs2ServerOrError.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/StreamingHttp4sFs2ServerOrError.scala @@ -56,5 +56,4 @@ object StreamingHttp4sFs2ServerOrError extends IOApp: .bindHttp(8080, "localhost") .withHttpApp(Router("/" -> userDataRoutes).orNotFound) .resource - .use { _ => IO.never } - .as(ExitCode.Success) + .useForever diff --git a/generated-doc/out/tutorials/07_cats_effect.md b/generated-doc/out/tutorials/07_cats_effect.md index 53f4599e41..0a9896751d 100644 --- a/generated-doc/out/tutorials/07_cats_effect.md +++ b/generated-doc/out/tutorials/07_cats_effect.md @@ -230,8 +230,7 @@ object HelloWorldTapir extends IOApp: .bindHttp(8080, "localhost") .withHttpApp(Router("/" -> allRoutes).orNotFound) .resource - .use(_ => IO.never) - .as(ExitCode.Success) + .useForever ``` Hence, we first generate endpoint descriptions, which correspond to exposing the Swagger UI (containing the generated diff --git a/perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala b/perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala index 68eab44f21..e6fd81ee39 100644 --- a/perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala +++ b/perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala @@ -115,11 +115,9 @@ object server { .withMaxConnections(maxConnections) .withConnectorPoolSize(connectorPoolSize) .resource - .allocated - .map(_._2) - .map(_.flatTap { _ => - IO.println("Http4s server closed.") - }) + .useForever + .start + .map(_.cancel *> IO.println("Http4s server closed.")) } object TapirServer extends ServerRunner { override def start = server.runServer(Tapir.router(1)) } diff --git a/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala b/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala index 4a69014f2c..755718cfb5 100644 --- a/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala +++ b/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala @@ -109,12 +109,12 @@ class Http4sServerTest[R >: Fs2Streams[IO] with WebSockets] extends TestSuite wi endpoint.out(streamBinaryBody(Fs2Streams[IO])(CodecFormat.OctetStream())), "streaming should send data according to producer stream rate" )((_: Unit) => - IO(Right(fs2.Stream.awakeEvery[IO](1.second).map(_.toString()).through(fs2.text.utf8Encode).interruptAfter(10.seconds))) + IO(Right(fs2.Stream.awakeEvery[IO](1.second).map(_.toString()).through(fs2.text.utf8.encode).interruptAfter(10.seconds))) ) { (backend, baseUri) => basicRequest .response( asStream(Fs2Streams[IO])(bs => { - bs.through(fs2.text.utf8Decode).mapAccumulate(0)((pings, currentTime) => (pings + 1, currentTime)).compile.last + bs.through(fs2.text.utf8.decode).mapAccumulate(0)((pings, currentTime) => (pings + 1, currentTime)).compile.last }) ) .get(baseUri) diff --git a/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala b/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala index 9b46b6a198..389613fbb5 100644 --- a/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala +++ b/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala @@ -15,7 +15,9 @@ import sttp.tapir.server.tests.TestServerInterpreter import sttp.tapir.tests._ import sttp.tapir.ztapir.ZServerEndpoint import zio.{Runtime, Task, Unsafe} +import zio.interop._ import zio.interop.catz._ +import zio.interop.catz.implicits._ import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration @@ -27,7 +29,6 @@ object ZHttp4sTestServerInterpreter { } class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStreams with WebSockets, ServerOptions, Routes] { - implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global override def route(es: List[ZServerEndpoint[Any, ZioStreams with WebSockets]], interceptors: Interceptors): Routes = { val serverOptions: ServerOptions = interceptors(Http4sServerOptions.customiseInterceptors[Task]).options @@ -49,7 +50,7 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStream .map(_.address.getPort) .mapK(new ~>[Task, IO] { // Converting a ZIO effect to an Cats Effect IO effect - def apply[B](fa: Task[B]): IO[B] = IO.fromFuture(Unsafe.unsafe(implicit u => IO(Runtime.default.unsafe.runToFuture(fa)))) + def apply[B](fa: Task[B]): IO[B] = fa.toEffect[IO] }) } } diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala index d1909c8185..aee0e69252 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala @@ -133,7 +133,9 @@ class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE]( Test(name)( resources .use { port => - runTest(backend, uri"http://localhost:$port").guarantee(IO(logger.info(s"Tests completed on port $port"))) + runTest(backend, uri"http://localhost:$port").guaranteeCase(exitCase => + IO(logger.info(s"Test on port $port: ${exitCase.getClass.getSimpleName}")) + ) } .unsafeToFuture() ) diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala index 8bb4548295..8a87c8c877 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala @@ -1,7 +1,7 @@ package sttp.tapir.server.tests import cats.data.NonEmptyList -import cats.effect.{IO, Resource} +import cats.effect.{Deferred, IO, Resource} import sttp.tapir.server.ServerEndpoint import sttp.tapir.server.interceptor.CustomiseInterceptors import sttp.tapir.tests._ @@ -21,8 +21,12 @@ trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] { def serverWithStop( routes: NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration] = None - ): Resource[IO, (Port, KillSwitch)] = - Resource.eval(server(routes, gracefulShutdownTimeout).allocated) + ): Resource[IO, (Port, KillSwitch)] = for { + stopSignal <- Resource.eval(Deferred[IO, Unit]) + portValue <- Resource.eval(Deferred[IO, Port]) + _ <- server(routes, gracefulShutdownTimeout).use(port => portValue.complete(port) *> stopSignal.get).background + port <- Resource.eval(portValue.get) + } yield (port, stopSignal.complete(()).void) def server(routes: NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, Port] } diff --git a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/VertxStubServerTest.scala b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/VertxStubServerTest.scala index bcea7b30a3..54c86129b8 100644 --- a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/VertxStubServerTest.scala +++ b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/VertxStubServerTest.scala @@ -12,7 +12,7 @@ import sttp.tapir.server.tests.{CreateServerStubTest, ServerStubStreamingTest, S import scala.concurrent.Future class VertxCatsCreateServerStubTest extends CreateServerStubTest[IO, VertxCatsServerOptions[IO]] { - private val (dispatcher, shutdownDispatcher) = Dispatcher[IO].allocated.unsafeRunSync() + private val (dispatcher, shutdownDispatcher) = Dispatcher.sequential[IO].allocated.unsafeRunSync() override def customiseInterceptors: CustomiseInterceptors[IO, VertxCatsServerOptions[IO]] = VertxCatsServerOptions.customiseInterceptors(dispatcher) diff --git a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/streams/Fs2StreamTest.scala b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/streams/Fs2StreamTest.scala index baadbe13f4..39966b12d2 100644 --- a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/streams/Fs2StreamTest.scala +++ b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/streams/Fs2StreamTest.scala @@ -18,7 +18,7 @@ import scala.concurrent.duration._ class Fs2StreamTest extends AsyncFlatSpec with Matchers with BeforeAndAfterAll { - private val (dispatcher, shutdownDispatcher) = Dispatcher[IO].allocated.unsafeRunSync() + private val (dispatcher, shutdownDispatcher) = Dispatcher.sequential[IO].allocated.unsafeRunSync() override protected def afterAll(): Unit = { shutdownDispatcher.unsafeRunSync() diff --git a/tests/src/main/scalajvm/sttp/tapir/tests/TestSuite.scala b/tests/src/main/scalajvm/sttp/tapir/tests/TestSuite.scala index a005d9bb08..7a5bf2ad65 100644 --- a/tests/src/main/scalajvm/sttp/tapir/tests/TestSuite.scala +++ b/tests/src/main/scalajvm/sttp/tapir/tests/TestSuite.scala @@ -11,7 +11,7 @@ trait TestSuite extends AsyncFunSuite with BeforeAndAfterAll { def tests: Resource[IO, List[Test]] def testNameFilter: Option[String] = None // define to run a single test (temporarily for debugging) - protected val (dispatcher, shutdownDispatcher) = Dispatcher[IO].allocated.unsafeRunSync() + protected val (dispatcher, shutdownDispatcher) = Dispatcher.sequential[IO].allocated.unsafeRunSync() // we need to register the tests when the class is constructed, as otherwise scalatest skips it val (allTests, doRelease) = tests.allocated.unsafeRunSync()