From dde904c9b4565f24c77a969f301ad5cbcf27d894 Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Tue, 31 Dec 2024 16:32:54 +0100 Subject: [PATCH] Revert "wip" This reverts commit 02ba4b01925ef4a60064330838de4f0c414d2a88. --- .../streaming/longLastingClient.scala | 34 +++--- .../tapir/examples/streaming/playServer.scala | 111 +++++++++--------- 2 files changed, 67 insertions(+), 78 deletions(-) diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala index 89f99739e8..8ae1764872 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala @@ -1,10 +1,8 @@ //> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 -//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-cats:1.11.11 //> using dep org.apache.pekko::pekko-stream:1.1.2 //> using dep org.typelevel::cats-effect:3.5.7 -//> using dep com.softwaremill.sttp.client3::core:3.10.2 +//> using dep com.softwaremill.sttp.client3::core:3.10.1 //> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1 -//> using dep com.softwaremill.sttp.client3::fs2:3.10.2 package sttp.tapir.examples.streaming @@ -23,31 +21,27 @@ import pekko.stream.scaladsl.{Flow, Source} import pekko.util.ByteString import cats.effect.* import cats.syntax.all.* -import sttp.client3.httpclient.fs2.HttpClientFs2Backend + import scala.concurrent.duration.* import scala.concurrent.duration.FiniteDuration -import sttp.capabilities.fs2.Fs2Streams -import fs2.{Chunk, Stream} object longLastingClient extends IOApp: - private def makeRequest(backend: SttpBackend[IO, Fs2Streams[IO] & WebSockets]): IO[Response[Either[String, String]]] = - def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = { - val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) - val initialChunks = Stream.chunk(chunk) - val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk) - initialChunks ++ delayedChunk + implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient") + + private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] = + val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem => + println(s"$elem ${java.time.LocalTime.now()}"); elem } - val stream = createStream(100, 2.seconds) - + basicRequest .post(uri"http://localhost:9000/chunks") .header(Header(HeaderNames.ContentLength, "10000")) - .streamBody(Fs2Streams[IO])(stream) + .streamBody(PekkoStreams)(stream) .send(backend) - + override def run(args: List[String]): IO[ExitCode] = - HttpClientFs2Backend.resource[IO]().use { backend => - makeRequest(backend).flatMap { response => + val backend = PekkoHttpBackend.usingActorSystem(actorSystem) + val responseIO: IO[Response[Either[String, String]]] = IO.fromFuture(IO(makeRequest(backend))) + responseIO.flatMap { response => IO(println(response.body)) - } - }.as(ExitCode.Success) \ No newline at end of file + }.as(ExitCode.Success) \ No newline at end of file diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala index cf2e992a8b..c3df7ce830 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala @@ -1,11 +1,12 @@ //> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 //> using dep com.softwaremill.sttp.tapir::tapir-play-server:1.11.11 -//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-cats:1.11.11 //> using dep org.playframework::play-netty-server:3.0.6 -//> using dep com.softwaremill.sttp.client3::core:3.10.2 +//> using dep com.softwaremill.sttp.client3::core:3.10.1 package sttp.tapir.examples.streaming +import play.core.server.* +import play.api.routing.Router.Routes import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.Materializer import sttp.capabilities.pekko.PekkoStreams @@ -17,23 +18,11 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import sttp.model.{HeaderNames, MediaType, Part, StatusCode} import sttp.tapir.* - import scala.concurrent.{ExecutionContext, Future} import scala.util.* import org.apache.pekko import pekko.stream.scaladsl.{Flow, Source} import pekko.util.ByteString -import sttp.client3.UriContext -import sttp.tapir.server.netty.cats.NettyCatsServer -import sttp.tapir.server.netty.NettyConfig -import scala.concurrent.duration.DurationInt -import cats.effect.{IO, Resource} -import cats.effect.std.Dispatcher -import scala.concurrent.duration.FiniteDuration -import fs2.{Chunk, Stream} -import cats.effect.unsafe.implicits.global - -import sttp.capabilities.fs2.Fs2Streams given ExecutionContext = ExecutionContext.global @@ -41,37 +30,30 @@ type ErrorInfo = String implicit val actorSystem: ActorSystem = ActorSystem("playServer") -val givenRequestTimeout = 2.seconds -val chunkSize = 100 -val beforeSendingSecondChunk: FiniteDuration = 2.second - -def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): fs2.Stream[IO, Byte] = { - val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) - val initialChunks = fs2.Stream.chunk(chunk) - val delayedChunk = fs2.Stream.sleep[IO](beforeSendingSecondChunk) >> fs2.Stream.chunk(chunk) - initialChunks ++ delayedChunk +def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] = + f.transform { + case Success(v) => Success(Right(v)) + case Failure(e) => + println(s"Exception when running endpoint logic: $e") + Success(Left(e.getMessage)) + } + +def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = { + val (length, stream) = s + println(s"Received $length bytes, ${stream.map(_.length)} bytes in total") + Future.successful((length, stream)) } -val inputStream = createStream(chunkSize, beforeSendingSecondChunk) - val e = endpoint.post .in("chunks") .in(header[Long](HeaderNames.ContentLength)) - .in(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) + .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) .out(header[Long](HeaderNames.ContentLength)) - .out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) - .serverLogicSuccess[IO] { case (length, stream) => - IO((length, stream)) - } + .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .errorOut(plainBody[ErrorInfo]) + .serverLogic((logic _).andThen(handleErrors)) -val config = - NettyConfig.default - .host("0.0.0.0") - .port(9000) - .requestTimeout(givenRequestTimeout) - - -//val routes = PlayServerInterpreter().toRoutes(e) +val routes = PlayServerInterpreter().toRoutes(e) @main def playServer(): Unit = import play.api.Configuration @@ -81,23 +63,36 @@ val config = import java.io.File import java.util.Properties - println(s"Server is starting...") - - NettyCatsServer - .io(config) - .use { server => - for { - binding <- server - .addEndpoint(e) - .start() - result <- IO - .blocking { - val port = binding.port - val host = binding.hostName - println(s"Server started at port = ${binding.port}") - } - .guarantee(binding.stop()) - } yield result - }.unsafeRunSync() - - println(s"Server started at port ???") \ No newline at end of file + val customConfig = Configuration( + "play.server.http.idleTimeout" -> "75 seconds", + "play.server.https.idleTimeout" -> "75 seconds", + "play.server.https.wantClientAuth" -> false, + "play.server.https.needClientAuth" -> false, + "play.server.netty.server-header" -> null, + "play.server.netty.shutdownQuietPeriod" -> "2 seconds", + "play.server.netty.maxInitialLineLength" -> "4096", + "play.server.netty.maxChunkSize" -> "8192", + "play.server.netty.eventLoopThreads" -> "0", + "play.server.netty.transport" -> "jdk", + "play.server.max-header-size" -> "8k", + "play.server.waitBeforeTermination" -> "0", + "play.server.deferBodyParsing" -> false, + "play.server.websocket.frame.maxLength" -> "64k", + "play.server.websocket.periodic-keep-alive-mode" -> "ping", + "play.server.websocket.periodic-keep-alive-max-idle" -> "infinite", + "play.server.max-content-length" -> "infinite", + "play.server.netty.log.wire" -> true, + "play.server.netty.option.child.tcpNoDelay" -> true, + "play.server.pekko.requestTimeout" -> "5 seconds", + ) + val serverConfig = ServerConfig( + rootDir = new File("."), + port = Some(9000), + sslPort = Some(9443), + address = "0.0.0.0", + mode = Mode.Dev, + properties = System.getProperties, + configuration = customConfig + ) + + NettyServer.fromRouterWithComponents(serverConfig) { components => routes } \ No newline at end of file