Skip to content

Commit

Permalink
Revert "wip"
Browse files Browse the repository at this point in the history
This reverts commit 02ba4b0.
  • Loading branch information
sergiuszkierat committed Dec 31, 2024
1 parent 02ba4b0 commit dde904c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11
//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-cats:1.11.11
//> using dep org.apache.pekko::pekko-stream:1.1.2
//> using dep org.typelevel::cats-effect:3.5.7
//> using dep com.softwaremill.sttp.client3::core:3.10.2
//> using dep com.softwaremill.sttp.client3::core:3.10.1
//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1
//> using dep com.softwaremill.sttp.client3::fs2:3.10.2

package sttp.tapir.examples.streaming

Expand All @@ -23,31 +21,27 @@ import pekko.stream.scaladsl.{Flow, Source}
import pekko.util.ByteString
import cats.effect.*
import cats.syntax.all.*
import sttp.client3.httpclient.fs2.HttpClientFs2Backend

import scala.concurrent.duration.*
import scala.concurrent.duration.FiniteDuration
import sttp.capabilities.fs2.Fs2Streams
import fs2.{Chunk, Stream}

object longLastingClient extends IOApp:
private def makeRequest(backend: SttpBackend[IO, Fs2Streams[IO] & WebSockets]): IO[Response[Either[String, String]]] =
def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = {
val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte))
val initialChunks = Stream.chunk(chunk)
val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk)
initialChunks ++ delayedChunk
implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient")

private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] =
val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem =>
println(s"$elem ${java.time.LocalTime.now()}"); elem
}
val stream = createStream(100, 2.seconds)


basicRequest
.post(uri"http://localhost:9000/chunks")
.header(Header(HeaderNames.ContentLength, "10000"))
.streamBody(Fs2Streams[IO])(stream)
.streamBody(PekkoStreams)(stream)
.send(backend)

override def run(args: List[String]): IO[ExitCode] =
HttpClientFs2Backend.resource[IO]().use { backend =>
makeRequest(backend).flatMap { response =>
val backend = PekkoHttpBackend.usingActorSystem(actorSystem)
val responseIO: IO[Response[Either[String, String]]] = IO.fromFuture(IO(makeRequest(backend)))
responseIO.flatMap { response =>
IO(println(response.body))
}
}.as(ExitCode.Success)
}.as(ExitCode.Success)
111 changes: 53 additions & 58 deletions examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11
//> using dep com.softwaremill.sttp.tapir::tapir-play-server:1.11.11
//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-cats:1.11.11
//> using dep org.playframework::play-netty-server:3.0.6
//> using dep com.softwaremill.sttp.client3::core:3.10.2
//> using dep com.softwaremill.sttp.client3::core:3.10.1

package sttp.tapir.examples.streaming

import play.core.server.*
import play.api.routing.Router.Routes
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
import sttp.capabilities.pekko.PekkoStreams
Expand All @@ -17,61 +18,42 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import sttp.model.{HeaderNames, MediaType, Part, StatusCode}
import sttp.tapir.*

import scala.concurrent.{ExecutionContext, Future}
import scala.util.*
import org.apache.pekko
import pekko.stream.scaladsl.{Flow, Source}
import pekko.util.ByteString
import sttp.client3.UriContext
import sttp.tapir.server.netty.cats.NettyCatsServer
import sttp.tapir.server.netty.NettyConfig
import scala.concurrent.duration.DurationInt
import cats.effect.{IO, Resource}
import cats.effect.std.Dispatcher
import scala.concurrent.duration.FiniteDuration
import fs2.{Chunk, Stream}
import cats.effect.unsafe.implicits.global

import sttp.capabilities.fs2.Fs2Streams

given ExecutionContext = ExecutionContext.global

type ErrorInfo = String

implicit val actorSystem: ActorSystem = ActorSystem("playServer")

val givenRequestTimeout = 2.seconds
val chunkSize = 100
val beforeSendingSecondChunk: FiniteDuration = 2.second

def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): fs2.Stream[IO, Byte] = {
val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte))
val initialChunks = fs2.Stream.chunk(chunk)
val delayedChunk = fs2.Stream.sleep[IO](beforeSendingSecondChunk) >> fs2.Stream.chunk(chunk)
initialChunks ++ delayedChunk
def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] =
f.transform {
case Success(v) => Success(Right(v))
case Failure(e) =>
println(s"Exception when running endpoint logic: $e")
Success(Left(e.getMessage))
}

def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = {
val (length, stream) = s
println(s"Received $length bytes, ${stream.map(_.length)} bytes in total")
Future.successful((length, stream))
}

val inputStream = createStream(chunkSize, beforeSendingSecondChunk)

val e = endpoint.post
.in("chunks")
.in(header[Long](HeaderNames.ContentLength))
.in(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain()))
.in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
.out(header[Long](HeaderNames.ContentLength))
.out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain()))
.serverLogicSuccess[IO] { case (length, stream) =>
IO((length, stream))
}
.out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
.errorOut(plainBody[ErrorInfo])
.serverLogic((logic _).andThen(handleErrors))

val config =
NettyConfig.default
.host("0.0.0.0")
.port(9000)
.requestTimeout(givenRequestTimeout)


//val routes = PlayServerInterpreter().toRoutes(e)
val routes = PlayServerInterpreter().toRoutes(e)

@main def playServer(): Unit =
import play.api.Configuration
Expand All @@ -81,23 +63,36 @@ val config =
import java.io.File
import java.util.Properties

println(s"Server is starting...")

NettyCatsServer
.io(config)
.use { server =>
for {
binding <- server
.addEndpoint(e)
.start()
result <- IO
.blocking {
val port = binding.port
val host = binding.hostName
println(s"Server started at port = ${binding.port}")
}
.guarantee(binding.stop())
} yield result
}.unsafeRunSync()

println(s"Server started at port ???")
val customConfig = Configuration(
"play.server.http.idleTimeout" -> "75 seconds",
"play.server.https.idleTimeout" -> "75 seconds",
"play.server.https.wantClientAuth" -> false,
"play.server.https.needClientAuth" -> false,
"play.server.netty.server-header" -> null,
"play.server.netty.shutdownQuietPeriod" -> "2 seconds",
"play.server.netty.maxInitialLineLength" -> "4096",
"play.server.netty.maxChunkSize" -> "8192",
"play.server.netty.eventLoopThreads" -> "0",
"play.server.netty.transport" -> "jdk",
"play.server.max-header-size" -> "8k",
"play.server.waitBeforeTermination" -> "0",
"play.server.deferBodyParsing" -> false,
"play.server.websocket.frame.maxLength" -> "64k",
"play.server.websocket.periodic-keep-alive-mode" -> "ping",
"play.server.websocket.periodic-keep-alive-max-idle" -> "infinite",
"play.server.max-content-length" -> "infinite",
"play.server.netty.log.wire" -> true,
"play.server.netty.option.child.tcpNoDelay" -> true,
"play.server.pekko.requestTimeout" -> "5 seconds",
)
val serverConfig = ServerConfig(
rootDir = new File("."),
port = Some(9000),
sslPort = Some(9443),
address = "0.0.0.0",
mode = Mode.Dev,
properties = System.getProperties,
configuration = customConfig
)

NettyServer.fromRouterWithComponents(serverConfig) { components => routes }

0 comments on commit dde904c

Please sign in to comment.