From 4cb7800b82c331337d3710daf91c6388b4f95144 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Thu, 11 Jul 2024 18:13:07 +0200 Subject: [PATCH 1/9] wip --- .../cats/internal/NettyCatsRequestBody.scala | 69 ++++++++++++++- .../netty/cats/NettyCatsServerTest.scala | 45 +++++++++- .../internal/NettyFutureRequestBody.scala | 12 ++- .../netty/internal/NettyRequestBody.scala | 87 ++++++++++++++++++- .../sync/internal/NettySyncRequestBody.scala | 11 +++ .../zio/internal/NettyZioRequestBody.scala | 15 +++- 6 files changed, 229 insertions(+), 10 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala index 98b0742b74..27a69b13a0 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala @@ -1,18 +1,25 @@ package sttp.tapir.server.netty.cats.internal import cats.effect.Async +import cats.effect.kernel.Sync import cats.syntax.all._ import fs2.Chunk +import fs2.interop.reactivestreams.StreamSubscriber import fs2.io.file.{Files, Path} -import io.netty.handler.codec.http.HttpContent +import io.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestDecoder } +import io.netty.handler.codec.http.{HttpContent, LastHttpContent} +import org.playframework.netty.http.StreamedHttpRequest import org.reactivestreams.Publisher import sttp.capabilities.fs2.Fs2Streams +import sttp.model.Part import sttp.monad.MonadError -import sttp.tapir.TapirFile import sttp.tapir.integ.cats.effect.CatsMonadError import sttp.tapir.model.ServerRequest +import sttp.tapir.server.interpreter.RawValue import sttp.tapir.server.netty.internal.{NettyStreamingRequestBody, StreamCompatible} -import sttp.capabilities.WebSockets +import sttp.tapir.{RawBodyType, RawPart, TapirFile} + +import java.io.File private[cats] class NettyCatsRequestBody[F[_]: Async]( val createFile: ServerRequest => F[TapirFile], @@ -21,6 +28,53 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( override implicit val monad: MonadError[F] = new CatsMonadError() + // TODO handle maxBytes + def publisherToMultipart( + nettyRequest: StreamedHttpRequest, + serverRequest: ServerRequest, + m: RawBodyType.MultipartBody + ): F[RawValue[Seq[RawPart]]] = { + fs2.Stream + .eval(StreamSubscriber[F, HttpContent](bufferSize = 1)) + .flatMap(s => s.sub.stream(Sync[F].delay(nettyRequest.subscribe(s)))) + .evalMapAccumulate({ + // initialize the stream's "state" - a mutable, stateful HttpPostRequestDecoder + new HttpPostRequestDecoder(NettyCatsRequestBody.multiPartDataFactory, nettyRequest) + // + })({ case (decoder, httpContent) => + if (httpContent.isInstanceOf[LastHttpContent]) { + monad.eval { + decoder.destroy() + (decoder, Vector.empty) + } + } else + monad + .blocking { + // this operation is the one that does potential I/O (writing files) + // TODO not thread-safe? (visibility of internal state changes?) + decoder.offer(httpContent) + val parts = Stream + .continually(if (decoder.hasNext) decoder.next() else null) + .takeWhile(_ != null) + .toVector + ( + decoder, + parts + ) + } + .onError { case _ => + monad.eval(decoder.destroy()) + } + }) + .map(_._2) + .map(_.flatMap(p => m.partType(p.getName()).map((p, _)).toList)) + .evalMap(_.traverse { case (data, partType) => toRawPart(serverRequest, data, partType) }) + .compile + .toVector + .map(_.flatten) + .map(RawValue.fromParts(_)) + } + override def publisherToBytes(publisher: Publisher[HttpContent], contentLength: Option[Long], maxBytes: Option[Long]): F[Array[Byte]] = streamCompatible.fromPublisher(publisher, maxBytes).compile.to(Chunk).map(_.toArray[Byte]) @@ -32,4 +86,13 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( ) .compile .drain + + override def writeBytesToFile(bytes: Array[Byte], file: File): F[Unit] = + fs2.Stream.emits(bytes).through(Files.forAsync[F].writeAll(Path.fromNioPath(file.toPath))).compile.drain + +} + +private[cats] object NettyCatsRequestBody { + val multiPartDataFactory = + new DefaultHttpDataFactory() // writes to memory, then switches to disk if exceeds MINSIZE (16kB), check other constructors. } diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 73053ed362..5b8b5fcee5 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -1,6 +1,7 @@ package sttp.tapir.server.netty.cats import cats.effect.{IO, Resource} +import cats.syntax.all._ import io.netty.channel.nio.NioEventLoopGroup import org.scalatest.EitherValues import sttp.capabilities.fs2.Fs2Streams @@ -12,8 +13,12 @@ import sttp.tapir.tests.{Test, TestSuite} import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration +import sttp.model.Part +import sttp.model.StatusCode +import org.scalatest.matchers.should.Matchers +import sttp.tapir.tests.data.FruitAmount -class NettyCatsServerTest extends TestSuite with EitherValues { +class NettyCatsServerTest extends TestSuite with EitherValues with Matchers { override def tests: Resource[IO, List[Test]] = backendResource.flatMap { backend => @@ -30,6 +35,10 @@ class NettyCatsServerTest extends TestSuite with EitherValues { def drainFs2(stream: Fs2Streams[IO]#BinaryStream): IO[Unit] = stream.compile.drain.void + import createServerTest._ + import sttp.tapir.tests.Multipart._ + import sttp.tapir.tests.TestUtil.{readFromFile, writeToFile} + import sttp.client3.{multipartFile, _} val tests = new AllServerTests( createServerTest, interpreter, @@ -50,7 +59,39 @@ class NettyCatsServerTest extends TestSuite with EitherValues { ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty - }.tests() + }.tests() ++ List( + testServer(in_raw_multipart_out_string, "multi1")((parts: Seq[Part[Array[Byte]]]) => + pureResult( + Right(parts.map(part => s"${part.name}:${new String(part.body)}").mkString("\n")) + ) + ) { (backend, baseUri) => + val file1 = writeToFile("peach mario") + val file2 = writeToFile("daisy luigi") + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .multipartBody( + multipartFile("file1", file1).fileName("file1.txt"), + multipartFile("file2", file2).fileName("file2.txt") + ) + .send(backend) + .map { r => + r.code shouldBe StatusCode.Ok + r.body should include("file1:peach mario") + r.body should include("file2:daisy luigi") + } + }, + testServer(in_simple_multipart_out_string, "multi2")((fa: FruitAmount) => + pureResult(fa.toString.asRight[Unit]) + ) { (backend, baseUri) => + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .multipartBody(multipart("fruit", "pineapple"), multipart("amount", "120"), multipart("shape", "regular")) + .send(backend) + .map { r => + r.body shouldBe "FruitAmount(pineapple,120)" + } + } + ) IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala index 2316217275..57a4b83962 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala @@ -5,16 +5,26 @@ import org.playframework.netty.http.StreamedHttpRequest import org.reactivestreams.Publisher import sttp.capabilities import sttp.monad.{FutureMonad, MonadError} -import sttp.tapir.TapirFile import sttp.tapir.capabilities.NoStreams import sttp.tapir.model.ServerRequest +import sttp.tapir.server.interpreter.RawValue import sttp.tapir.server.netty.internal.reactivestreams._ +import sttp.tapir.{RawBodyType, RawPart, TapirFile} +import java.io.File import scala.concurrent.{ExecutionContext, Future} private[netty] class NettyFutureRequestBody(val createFile: ServerRequest => Future[TapirFile])(implicit ec: ExecutionContext) extends NettyRequestBody[Future, NoStreams] { + override def publisherToMultipart( + nettyRequest: StreamedHttpRequest, + serverRequest: ServerRequest, + m: RawBodyType.MultipartBody + ): Future[RawValue[Seq[RawPart]]] = Future.failed(new UnsupportedOperationException("Multipart requests not supported.")) + + override def writeBytesToFile(bytes: Array[Byte], file: File): Future[Unit] = Future.failed(new UnsupportedOperationException) + override val streams: capabilities.Streams[NoStreams] = NoStreams override implicit val monad: MonadError[Future] = new FutureMonad() diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala index bcae1a9e39..9e69fefab1 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala @@ -16,6 +16,14 @@ import sttp.tapir.{FileRange, InputStreamRange, RawBodyType, TapirFile} import java.io.InputStream import java.nio.ByteBuffer +import scala.collection.JavaConverters._ +import sttp.tapir.RawPart +import io.netty.handler.codec.http.multipart.InterfaceHttpData +import sttp.model.Part +import io.netty.handler.codec.http.multipart.HttpData +import io.netty.handler.codec.http.multipart.FileUpload +import java.io.ByteArrayInputStream +import java.io.File /** Common logic for processing request body in all Netty backends. It requires particular backends to implement a few operations. */ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody[F, S] { @@ -37,6 +45,11 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody */ def publisherToBytes(publisher: Publisher[HttpContent], contentLength: Option[Long], maxBytes: Option[Long]): F[Array[Byte]] + /** + * Reads the reactive stream emitting HttpData into a vector of parts. Implementation-specific, as file manipulations and stream processing logic can be different for different backends. + */ + def publisherToMultipart(nettyRequest: StreamedHttpRequest, serverRequest: ServerRequest, m: RawBodyType.MultipartBody): F[RawValue[Seq[RawPart]]] + /** Backend-specific way to process all elements emitted by a Publisher[HttpContent] and write their bytes into a file. * * @param serverRequest @@ -50,6 +63,8 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody */ def writeToFile(serverRequest: ServerRequest, file: TapirFile, maxBytes: Option[Long]): F[Unit] + def writeBytesToFile(bytes: Array[Byte], file: File): F[Unit] + override def toRaw[RAW]( serverRequest: ServerRequest, bodyType: RawBodyType[RAW], @@ -70,8 +85,8 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody file <- createFile(serverRequest) _ <- writeToFile(serverRequest, file, maxBytes) } yield RawValue(FileRange(file), Seq(FileRange(file))) - case _: RawBodyType.MultipartBody => - monad.error(new UnsupportedOperationException) + case m: RawBodyType.MultipartBody => + publisherToMultipart(serverRequest.underlying.asInstanceOf[StreamedHttpRequest], serverRequest, m) } private def readAllBytes(serverRequest: ServerRequest, maxBytes: Option[Long]): F[Array[Byte]] = @@ -96,4 +111,72 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody throw new UnsupportedOperationException(s"Unexpected Netty request of type ${other.getClass.getName}") } } + + protected def toRawPart[R]( + serverRequest: ServerRequest, + data: InterfaceHttpData, + partType: RawBodyType[R] + ): F[Part[R]] = { + val partName = data.getName() + data match { + case httpData: HttpData => + // TODO filename* attribute is not used by netty. Non-ascii filenames like https://github.com/http4s/http4s/issues/5809 are unsupported. + toRawPartHttpData(partName, serverRequest, httpData, partType) + case unsupportedDataType => + monad.error(new UnsupportedOperationException(s"Unsupported multipart data type: $unsupportedDataType in part $partName")) + } + } + + private def toRawPartHttpData[R]( + partName: String, + serverRequest: ServerRequest, + httpData: HttpData, + partType: RawBodyType[R] + ): F[Part[R]] = { + val fileName = httpData match { + case fileUpload: FileUpload => Option(fileUpload.getFilename()) + case _ => None + } + partType match { + case RawBodyType.StringBody(defaultCharset) => + // TODO otherDispositionParams not supported. They are normally a part of the content-disposition part header, but this header is not directly accessible, they are extracted internally by the decoder. + val charset = if (httpData.getCharset() != null) httpData.getCharset() else defaultCharset + readHttpData(httpData, _.getString(charset)).map(body => Part(partName, body, fileName = fileName)) + case RawBodyType.ByteArrayBody => + readHttpData(httpData, _.get()).map(body => Part(partName, body, fileName = fileName)) + case RawBodyType.ByteBufferBody => + readHttpData(httpData, _.get()).map(body => Part(partName, ByteBuffer.wrap(body), fileName = fileName)) + case RawBodyType.InputStreamBody => + (if (httpData.isInMemory()) + monad.unit(new ByteArrayInputStream(httpData.get())) + else { + monad.blocking(java.nio.file.Files.newInputStream(httpData.getFile().toPath())) + }).map(body => Part(partName, body, fileName = fileName)) + case RawBodyType.InputStreamRangeBody => + val body = () => { + if (httpData.isInMemory()) + new ByteArrayInputStream(httpData.get()) + else + java.nio.file.Files.newInputStream(httpData.getFile().toPath()) + } + monad.unit(Part(partName, InputStreamRange(body), fileName = fileName)) + case RawBodyType.FileBody => + val fileF: F[File] = + if (httpData.isInMemory()) + (for { + file <- createFile(serverRequest) + _ <- writeBytesToFile(httpData.get(), file) + } yield file) + else monad.unit(httpData.getFile()) + fileF.map(file => Part(partName, FileRange(file), fileName = fileName)) + case _: RawBodyType.MultipartBody => + monad.error(new UnsupportedOperationException(s"Nested multipart not supported, part name = $partName")) + } + } + + private def readHttpData[T](httpData: HttpData, f: HttpData => T): F[T] = + if (httpData.isInMemory()) + monad.unit(f(httpData)) + else + monad.blocking(f(httpData)) } diff --git a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/NettySyncRequestBody.scala b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/NettySyncRequestBody.scala index d6cc30a3f3..eaeefd7616 100644 --- a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/NettySyncRequestBody.scala +++ b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/NettySyncRequestBody.scala @@ -11,6 +11,10 @@ import sttp.tapir.model.ServerRequest import sttp.tapir.server.netty.internal.NettyRequestBody import sttp.tapir.server.netty.internal.reactivestreams.{FileWriterSubscriber, SimpleSubscriber} import sttp.tapir.server.netty.sync.* +import sttp.tapir.RawBodyType +import sttp.tapir.server.interpreter.RawValue +import sttp.tapir.RawPart +import java.io.File private[sync] class NettySyncRequestBody(val createFile: ServerRequest => TapirFile) extends NettyRequestBody[Identity, OxStreams]: @@ -20,6 +24,13 @@ private[sync] class NettySyncRequestBody(val createFile: ServerRequest => TapirF override def publisherToBytes(publisher: Publisher[HttpContent], contentLength: Option[Long], maxBytes: Option[Long]): Array[Byte] = SimpleSubscriber.processAllBlocking(publisher, contentLength, maxBytes) + override def publisherToMultipart( + nettyRequest: StreamedHttpRequest, + serverRequest: ServerRequest, + m: RawBodyType.MultipartBody + ): RawValue[Seq[RawPart]] = throw new UnsupportedOperationException("Multipart requests not supported.") + override def writeBytesToFile(bytes: Array[Byte], file: File) = throw new UnsupportedOperationException() + override def writeToFile(serverRequest: ServerRequest, file: TapirFile, maxBytes: Option[Long]): Unit = serverRequest.underlying match case r: StreamedHttpRequest => FileWriterSubscriber.processAllBlocking(r, file.toPath, maxBytes) diff --git a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/internal/NettyZioRequestBody.scala b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/internal/NettyZioRequestBody.scala index 3cb9b9ab21..ce582e4463 100644 --- a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/internal/NettyZioRequestBody.scala +++ b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/internal/NettyZioRequestBody.scala @@ -1,15 +1,19 @@ package sttp.tapir.server.netty.zio.internal import io.netty.handler.codec.http.HttpContent +import org.playframework.netty.http.StreamedHttpRequest import org.reactivestreams.Publisher import sttp.capabilities.zio.ZioStreams import sttp.monad.MonadError -import sttp.tapir.TapirFile import sttp.tapir.model.ServerRequest +import sttp.tapir.server.interpreter.RawValue import sttp.tapir.server.netty.internal.{NettyStreamingRequestBody, StreamCompatible} import sttp.tapir.ztapir.RIOMonadError -import zio.RIO +import sttp.tapir.{RawBodyType, RawPart, TapirFile} import zio.stream._ +import zio.{RIO, ZIO} + +import java.io.File private[zio] class NettyZioRequestBody[Env]( val createFile: ServerRequest => RIO[Env, TapirFile], @@ -19,6 +23,13 @@ private[zio] class NettyZioRequestBody[Env]( override val streams: ZioStreams = ZioStreams override implicit val monad: MonadError[RIO[Env, *]] = new RIOMonadError[Env] + override def publisherToMultipart( + nettyRequest: StreamedHttpRequest, + serverRequest: ServerRequest, + m: RawBodyType.MultipartBody + ): RIO[Env, RawValue[Seq[RawPart]]] = ZIO.die(new UnsupportedOperationException("Multipart requests not supported.")) + + override def writeBytesToFile(bytes: Array[Byte], file: File): RIO[Env, Unit] = ZIO.die(new UnsupportedOperationException) override def publisherToBytes( publisher: Publisher[HttpContent], contentLength: Option[Long], From 4720157294ea2066c309b87caad3c38e7ad2a35d Mon Sep 17 00:00:00 2001 From: kciesielski Date: Fri, 12 Jul 2024 17:32:12 +0200 Subject: [PATCH 2/9] Split multipart tests into request / response --- .../cats/internal/NettyCatsRequestBody.scala | 3 +- .../netty/cats/NettyCatsServerTest.scala | 47 +----- .../server/tests/ServerMultipartTests.scala | 142 +++++++++--------- 3 files changed, 81 insertions(+), 111 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala index 27a69b13a0..76c4461987 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala @@ -6,7 +6,7 @@ import cats.syntax.all._ import fs2.Chunk import fs2.interop.reactivestreams.StreamSubscriber import fs2.io.file.{Files, Path} -import io.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestDecoder } +import io.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestDecoder} import io.netty.handler.codec.http.{HttpContent, LastHttpContent} import org.playframework.netty.http.StreamedHttpRequest import org.reactivestreams.Publisher @@ -40,7 +40,6 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( .evalMapAccumulate({ // initialize the stream's "state" - a mutable, stateful HttpPostRequestDecoder new HttpPostRequestDecoder(NettyCatsRequestBody.multiPartDataFactory, nettyRequest) - // })({ case (decoder, httpContent) => if (httpContent.isInstanceOf[LastHttpContent]) { monad.eval { diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 5b8b5fcee5..256e689956 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -13,10 +13,7 @@ import sttp.tapir.tests.{Test, TestSuite} import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import sttp.model.Part -import sttp.model.StatusCode import org.scalatest.matchers.should.Matchers -import sttp.tapir.tests.data.FruitAmount class NettyCatsServerTest extends TestSuite with EitherValues with Matchers { @@ -35,10 +32,6 @@ class NettyCatsServerTest extends TestSuite with EitherValues with Matchers { def drainFs2(stream: Fs2Streams[IO]#BinaryStream): IO[Unit] = stream.compile.drain.void - import createServerTest._ - import sttp.tapir.tests.Multipart._ - import sttp.tapir.tests.TestUtil.{readFromFile, writeToFile} - import sttp.client3.{multipartFile, _} val tests = new AllServerTests( createServerTest, interpreter, @@ -50,6 +43,12 @@ class NettyCatsServerTest extends TestSuite with EitherValues with Matchers { new ServerCancellationTests(createServerTest)(m, IO.asyncForIO).tests() ++ new NettyFs2StreamingCancellationTest(createServerTest).tests() ++ new ServerGracefulShutdownTests(createServerTest, ioSleeper).tests() ++ + new ServerMultipartTests( + createServerTest, + partContentTypeHeaderSupport = false, + partOtherHeaderSupport = false, + multipartResponsesSupport = false + ).tests() ++ new ServerWebSocketTests( createServerTest, Fs2Streams[IO], @@ -59,39 +58,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues with Matchers { ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty - }.tests() ++ List( - testServer(in_raw_multipart_out_string, "multi1")((parts: Seq[Part[Array[Byte]]]) => - pureResult( - Right(parts.map(part => s"${part.name}:${new String(part.body)}").mkString("\n")) - ) - ) { (backend, baseUri) => - val file1 = writeToFile("peach mario") - val file2 = writeToFile("daisy luigi") - basicStringRequest - .post(uri"$baseUri/api/echo/multipart") - .multipartBody( - multipartFile("file1", file1).fileName("file1.txt"), - multipartFile("file2", file2).fileName("file2.txt") - ) - .send(backend) - .map { r => - r.code shouldBe StatusCode.Ok - r.body should include("file1:peach mario") - r.body should include("file2:daisy luigi") - } - }, - testServer(in_simple_multipart_out_string, "multi2")((fa: FruitAmount) => - pureResult(fa.toString.asRight[Unit]) - ) { (backend, baseUri) => - basicStringRequest - .post(uri"$baseUri/api/echo/multipart") - .multipartBody(multipart("fruit", "pineapple"), multipart("amount", "120"), multipart("shape", "regular")) - .send(backend) - .map { r => - r.body shouldBe "FruitAmount(pineapple,120)" - } - } - ) + }.tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala index 157b27cce3..b59b49a406 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala @@ -26,13 +26,14 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( createServerTest: CreateServerTest[F, Any, OPTIONS, ROUTE], partContentTypeHeaderSupport: Boolean = true, partOtherHeaderSupport: Boolean = true, - maxContentLengthSupport: Boolean = true + maxContentLengthSupport: Boolean = true, + multipartResponsesSupport: Boolean = true )(implicit m: MonadError[F]) { import createServerTest._ def tests(): List[Test] = basicTests() ++ (if (partContentTypeHeaderSupport) contentTypeHeaderTests() else Nil) ++ - (if (maxContentLengthSupport) maxContentLengthTests() else Nil) + (if (maxContentLengthSupport) maxContentLengthTests() else Nil) ++ (if (multipartResponsesSupport) multipartResponsesTests() else Nil) def maxContentLengthTests(): List[Test] = List( testServer( @@ -61,18 +62,6 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( def basicTests(): List[Test] = { List( - testServer(in_simple_multipart_out_multipart)((fa: FruitAmount) => - pureResult(FruitAmount(fa.fruit + " apple", fa.amount * 2).asRight[Unit]) - ) { (backend, baseUri) => - basicStringRequest - .post(uri"$baseUri/api/echo/multipart") - .multipartBody(multipart("fruit", "pineapple"), multipart("amount", "120")) - .send(backend) - .map { r => - r.body should include regex "name=\"fruit\"[\\s\\S]*pineapple apple" - r.body should include regex "name=\"amount\"[\\s\\S]*240" - } - }, testServer(in_simple_multipart_out_string, "discard unknown parts")((fa: FruitAmount) => pureResult(fa.toString.asRight[Unit])) { (backend, baseUri) => basicStringRequest @@ -83,61 +72,6 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( r.body shouldBe "FruitAmount(pineapple,120)" } }, - testServer(in_file_multipart_out_multipart)((fd: FruitData) => - pureResult( - data - .FruitData( - Part("", writeToFile(Await.result(readFromFile(fd.data.body), 3.seconds).reverse), fd.data.otherDispositionParams, Nil) - .header("X-Auth", fd.data.headers.find(_.is("X-Auth")).map(_.value).toString) - ) - .asRight[Unit] - ) - ) { (backend, baseUri) => - val file = writeToFile("peach mario") - basicStringRequest - .post(uri"$baseUri/api/echo/multipart") - .multipartBody(multipartFile("data", file).fileName("fruit-data.txt").header("X-Auth", "12Aa")) - .send(backend) - .map { r => - r.code shouldBe StatusCode.Ok - if (partOtherHeaderSupport) r.body should include regex "((?i)X-Auth):[ ]?Some\\(12Aa\\)" - r.body should include regex "name=\"data\"[\\s\\S]*oiram hcaep" - } - }, - testServer(in_file_list_multipart_out_multipart) { (mfu: MultipleFileUpload) => - val files = mfu.files.map { part => - Part( - part.name, - writeToFile(Await.result(readFromFile(part.body), 3.seconds) + " result"), - part.otherDispositionParams, - Nil - ).header("X-Auth", part.headers.find(_.is("X-Auth")).map(_.value + "x").getOrElse("")) - } - pureResult(MultipleFileUpload(files).asRight[Unit]) - } { (backend, baseUri) => - val file1 = writeToFile("peach mario 1") - val file2 = writeToFile("peach mario 2") - val file3 = writeToFile("peach mario 3") - basicStringRequest - .post(uri"$baseUri/api/echo/multipart") - .multipartBody( - multipartFile("files", file1).fileName("fruit-data-1.txt").header("X-Auth", "12Aa"), - multipartFile("files", file2).fileName("fruit-data-2.txt").header("X-Auth", "12Ab"), - multipartFile("files", file3).fileName("fruit-data-3.txt").header("X-Auth", "12Ac") - ) - .send(backend) - .map { r => - r.code shouldBe StatusCode.Ok - if (partOtherHeaderSupport) { - r.body should include regex "((?i)X-Auth):[ ]?12Aax" - r.body should include regex "((?i)X-Auth):[ ]?12Abx" - r.body should include regex "((?i)X-Auth):[ ]?12Acx" - } - r.body should include("peach mario 1 result") - r.body should include("peach mario 2 result") - r.body should include("peach mario 3 result") - } - }, testServer(in_raw_multipart_out_string)((parts: Seq[Part[Array[Byte]]]) => pureResult( parts.map(part => s"${part.name}:${new String(part.body)}").mkString("\n").asRight[Unit] @@ -188,6 +122,76 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( ) } + def multipartResponsesTests() = List( + testServer(in_simple_multipart_out_multipart)((fa: FruitAmount) => + pureResult(FruitAmount(fa.fruit + " apple", fa.amount * 2).asRight[Unit]) + ) { (backend, baseUri) => + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .multipartBody(multipart("fruit", "pineapple"), multipart("amount", "120")) + .send(backend) + .map { r => + r.body should include regex "name=\"fruit\"[\\s\\S]*pineapple apple" + r.body should include regex "name=\"amount\"[\\s\\S]*240" + } + }, + testServer(in_file_multipart_out_multipart)((fd: FruitData) => + pureResult( + data + .FruitData( + Part("", writeToFile(Await.result(readFromFile(fd.data.body), 3.seconds).reverse), fd.data.otherDispositionParams, Nil) + .header("X-Auth", fd.data.headers.find(_.is("X-Auth")).map(_.value).toString) + ) + .asRight[Unit] + ) + ) { (backend, baseUri) => + val file = writeToFile("peach mario") + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .multipartBody(multipartFile("data", file).fileName("fruit-data.txt").header("X-Auth", "12Aa")) + .send(backend) + .map { r => + r.code shouldBe StatusCode.Ok + if (partOtherHeaderSupport) r.body should include regex "((?i)X-Auth):[ ]?Some\\(12Aa\\)" + r.body should include regex "name=\"data\"[\\s\\S]*oiram hcaep" + } + }, + testServer(in_file_list_multipart_out_multipart) { (mfu: MultipleFileUpload) => + val files = mfu.files.map { part => + Part( + part.name, + writeToFile(Await.result(readFromFile(part.body), 3.seconds) + " result"), + part.otherDispositionParams, + Nil + ).header("X-Auth", part.headers.find(_.is("X-Auth")).map(_.value + "x").getOrElse("")) + } + pureResult(MultipleFileUpload(files).asRight[Unit]) + } { (backend, baseUri) => + val file1 = writeToFile("peach mario 1") + val file2 = writeToFile("peach mario 2") + val file3 = writeToFile("peach mario 3") + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .multipartBody( + multipartFile("files", file1).fileName("fruit-data-1.txt").header("X-Auth", "12Aa"), + multipartFile("files", file2).fileName("fruit-data-2.txt").header("X-Auth", "12Ab"), + multipartFile("files", file3).fileName("fruit-data-3.txt").header("X-Auth", "12Ac") + ) + .send(backend) + .map { r => + r.code shouldBe StatusCode.Ok + if (partOtherHeaderSupport) { + r.body should include regex "((?i)X-Auth):[ ]?12Aax" + r.body should include regex "((?i)X-Auth):[ ]?12Abx" + r.body should include regex "((?i)X-Auth):[ ]?12Acx" + } + r.body should include("peach mario 1 result") + r.body should include("peach mario 2 result") + r.body should include("peach mario 3 result") + } + } + ) + def contentTypeHeaderTests(): List[Test] = List( testServer(in_file_multipart_out_multipart, "with part content type header")((fd: FruitData) => pureResult( From 33affa7634922a34cb3a96fac78a5ed88d10f70a Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 15 Jul 2024 11:41:34 +0200 Subject: [PATCH 3/9] Manage decoder as a Resource --- .../cats/internal/NettyCatsRequestBody.scala | 72 ++++++++++--------- .../netty/internal/NettyRequestBody.scala | 17 +++-- 2 files changed, 48 insertions(+), 41 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala index 76c4461987..959586b0b6 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala @@ -1,13 +1,13 @@ package sttp.tapir.server.netty.cats.internal import cats.effect.Async -import cats.effect.kernel.Sync +import cats.effect.kernel.{Resource, Sync} import cats.syntax.all._ import fs2.Chunk import fs2.interop.reactivestreams.StreamSubscriber import fs2.io.file.{Files, Path} import io.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestDecoder} -import io.netty.handler.codec.http.{HttpContent, LastHttpContent} +import io.netty.handler.codec.http.HttpContent import org.playframework.netty.http.StreamedHttpRequest import org.reactivestreams.Publisher import sttp.capabilities.fs2.Fs2Streams @@ -27,6 +27,7 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( ) extends NettyStreamingRequestBody[F, Fs2Streams[F]] { override implicit val monad: MonadError[F] = new CatsMonadError() + import io.netty.handler.codec.http.multipart.HttpData // TODO handle maxBytes def publisherToMultipart( @@ -35,39 +36,40 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( m: RawBodyType.MultipartBody ): F[RawValue[Seq[RawPart]]] = { fs2.Stream - .eval(StreamSubscriber[F, HttpContent](bufferSize = 1)) - .flatMap(s => s.sub.stream(Sync[F].delay(nettyRequest.subscribe(s)))) - .evalMapAccumulate({ - // initialize the stream's "state" - a mutable, stateful HttpPostRequestDecoder - new HttpPostRequestDecoder(NettyCatsRequestBody.multiPartDataFactory, nettyRequest) - })({ case (decoder, httpContent) => - if (httpContent.isInstanceOf[LastHttpContent]) { - monad.eval { - decoder.destroy() - (decoder, Vector.empty) - } - } else - monad - .blocking { - // this operation is the one that does potential I/O (writing files) - // TODO not thread-safe? (visibility of internal state changes?) - decoder.offer(httpContent) - val parts = Stream - .continually(if (decoder.hasNext) decoder.next() else null) - .takeWhile(_ != null) - .toVector - ( - decoder, - parts - ) - } - .onError { case _ => - monad.eval(decoder.destroy()) - } - }) - .map(_._2) - .map(_.flatMap(p => m.partType(p.getName()).map((p, _)).toList)) - .evalMap(_.traverse { case (data, partType) => toRawPart(serverRequest, data, partType) }) + .resource( + Resource.make(Sync[F].delay(new HttpPostRequestDecoder(NettyCatsRequestBody.multiPartDataFactory, nettyRequest)))(d => + Sync[F].blocking(d.destroy()) + ) + ) + .flatMap { decoder => + fs2.Stream + .eval(StreamSubscriber[F, HttpContent](bufferSize = 1)) + .flatMap(s => s.sub.stream(Sync[F].delay(nettyRequest.subscribe(s)))) + .evalMapAccumulate({ + decoder + })({ case (decoder, httpContent) => + monad + .blocking { + // this operation is the one that does potential I/O (writing files) + // TODO not thread-safe? (visibility of internal state changes?) + decoder.offer(httpContent) + val parts = Stream + .continually(if (decoder.hasNext) decoder.next() else null) + .takeWhile(_ != null) + .toVector + ( + decoder, + parts + ) + } + .onError { case _ => + monad.eval(decoder.destroy()) + } + }) + .map(_._2) + .map(_.flatMap(p => m.partType(p.getName()).map((p, _)).toList)) + .evalMap(_.traverse { case (data, partType) => toRawPart(serverRequest, data, partType) }) + } .compile .toVector .map(_.flatten) diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala index 9e69fefab1..f208364d66 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala @@ -24,6 +24,7 @@ import io.netty.handler.codec.http.multipart.HttpData import io.netty.handler.codec.http.multipart.FileUpload import java.io.ByteArrayInputStream import java.io.File + /** Common logic for processing request body in all Netty backends. It requires particular backends to implement a few operations. */ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody[F, S] { @@ -45,10 +46,14 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody */ def publisherToBytes(publisher: Publisher[HttpContent], contentLength: Option[Long], maxBytes: Option[Long]): F[Array[Byte]] - /** - * Reads the reactive stream emitting HttpData into a vector of parts. Implementation-specific, as file manipulations and stream processing logic can be different for different backends. - */ - def publisherToMultipart(nettyRequest: StreamedHttpRequest, serverRequest: ServerRequest, m: RawBodyType.MultipartBody): F[RawValue[Seq[RawPart]]] + /** Reads the reactive stream emitting HttpData into a vector of parts. Implementation-specific, as file manipulations and stream + * processing logic can be different for different backends. + */ + def publisherToMultipart( + nettyRequest: StreamedHttpRequest, + serverRequest: ServerRequest, + m: RawBodyType.MultipartBody + ): F[RawValue[Seq[RawPart]]] /** Backend-specific way to process all elements emitted by a Publisher[HttpContent] and write their bytes into a file. * @@ -111,7 +116,7 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody throw new UnsupportedOperationException(s"Unexpected Netty request of type ${other.getClass.getName}") } } - + protected def toRawPart[R]( serverRequest: ServerRequest, data: InterfaceHttpData, @@ -127,7 +132,7 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody } } - private def toRawPartHttpData[R]( + private def toRawPartHttpData[R]( partName: String, serverRequest: ServerRequest, httpData: HttpData, From 1b843c3830e1f701ff5e43f8e449666a74d3290a Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 15 Jul 2024 12:50:16 +0200 Subject: [PATCH 4/9] No need for onError --- .../server/netty/cats/internal/NettyCatsRequestBody.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala index 959586b0b6..4c6f724546 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala @@ -27,7 +27,6 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( ) extends NettyStreamingRequestBody[F, Fs2Streams[F]] { override implicit val monad: MonadError[F] = new CatsMonadError() - import io.netty.handler.codec.http.multipart.HttpData // TODO handle maxBytes def publisherToMultipart( @@ -53,6 +52,7 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( // this operation is the one that does potential I/O (writing files) // TODO not thread-safe? (visibility of internal state changes?) decoder.offer(httpContent) + val parts = Stream .continually(if (decoder.hasNext) decoder.next() else null) .takeWhile(_ != null) @@ -62,9 +62,6 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( parts ) } - .onError { case _ => - monad.eval(decoder.destroy()) - } }) .map(_._2) .map(_.flatMap(p => m.partType(p.getName()).map((p, _)).toList)) From b4b7be52859950e7521e584737adf15b1c20e4ea Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 15 Jul 2024 13:32:40 +0200 Subject: [PATCH 5/9] Handle max content length --- .../cats/internal/NettyCatsRequestBody.scala | 25 +++++++++++++------ .../internal/NettyFutureRequestBody.scala | 5 ++-- .../netty/internal/NettyRequestBody.scala | 5 ++-- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala index 4c6f724546..8480c0de7d 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala @@ -6,10 +6,11 @@ import cats.syntax.all._ import fs2.Chunk import fs2.interop.reactivestreams.StreamSubscriber import fs2.io.file.{Files, Path} -import io.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestDecoder} import io.netty.handler.codec.http.HttpContent +import io.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpData, HttpPostRequestDecoder} import org.playframework.netty.http.StreamedHttpRequest import org.reactivestreams.Publisher +import sttp.capabilities.StreamMaxLengthExceededException import sttp.capabilities.fs2.Fs2Streams import sttp.model.Part import sttp.monad.MonadError @@ -28,11 +29,11 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( override implicit val monad: MonadError[F] = new CatsMonadError() - // TODO handle maxBytes def publisherToMultipart( nettyRequest: StreamedHttpRequest, serverRequest: ServerRequest, - m: RawBodyType.MultipartBody + m: RawBodyType.MultipartBody, + maxBytes: Option[Long] ): F[RawValue[Seq[RawPart]]] = { fs2.Stream .resource( @@ -45,20 +46,30 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( .eval(StreamSubscriber[F, HttpContent](bufferSize = 1)) .flatMap(s => s.sub.stream(Sync[F].delay(nettyRequest.subscribe(s)))) .evalMapAccumulate({ - decoder - })({ case (decoder, httpContent) => + (decoder, 0L) + })({ case ((decoder, processedBytesNum), httpContent) => monad .blocking { // this operation is the one that does potential I/O (writing files) // TODO not thread-safe? (visibility of internal state changes?) decoder.offer(httpContent) + var processedBytesAndContentBytes = processedBytesNum val parts = Stream - .continually(if (decoder.hasNext) decoder.next() else null) + .continually(if (decoder.hasNext) { + val next = decoder.next() + processedBytesAndContentBytes = processedBytesAndContentBytes + next.asInstanceOf[HttpData].length() + maxBytes.foreach { max => + if (max < processedBytesAndContentBytes) { + throw new StreamMaxLengthExceededException(max) + } + } + next + } else null) .takeWhile(_ != null) .toVector ( - decoder, + (decoder, processedBytesAndContentBytes), parts ) } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala index 57a4b83962..a6c2e69dd7 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala @@ -20,9 +20,10 @@ private[netty] class NettyFutureRequestBody(val createFile: ServerRequest => Fut override def publisherToMultipart( nettyRequest: StreamedHttpRequest, serverRequest: ServerRequest, - m: RawBodyType.MultipartBody + m: RawBodyType.MultipartBody, + maxBytes: Option[Long] ): Future[RawValue[Seq[RawPart]]] = Future.failed(new UnsupportedOperationException("Multipart requests not supported.")) - + override def writeBytesToFile(bytes: Array[Byte], file: File): Future[Unit] = Future.failed(new UnsupportedOperationException) override val streams: capabilities.Streams[NoStreams] = NoStreams diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala index f208364d66..0a19395c67 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyRequestBody.scala @@ -52,7 +52,8 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody def publisherToMultipart( nettyRequest: StreamedHttpRequest, serverRequest: ServerRequest, - m: RawBodyType.MultipartBody + m: RawBodyType.MultipartBody, + maxBytes: Option[Long] ): F[RawValue[Seq[RawPart]]] /** Backend-specific way to process all elements emitted by a Publisher[HttpContent] and write their bytes into a file. @@ -91,7 +92,7 @@ private[netty] trait NettyRequestBody[F[_], S <: Streams[S]] extends RequestBody _ <- writeToFile(serverRequest, file, maxBytes) } yield RawValue(FileRange(file), Seq(FileRange(file))) case m: RawBodyType.MultipartBody => - publisherToMultipart(serverRequest.underlying.asInstanceOf[StreamedHttpRequest], serverRequest, m) + publisherToMultipart(serverRequest.underlying.asInstanceOf[StreamedHttpRequest], serverRequest, m, maxBytes) } private def readAllBytes(serverRequest: ServerRequest, maxBytes: Option[Long]): F[Array[Byte]] = From eb5bc1ae888c2dce13be39013a561867c185ea18 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Mon, 15 Jul 2024 13:47:13 +0200 Subject: [PATCH 6/9] Fix compilation issues --- .../server/netty/cats/internal/NettyCatsRequestBody.scala | 4 ++-- .../sttp/tapir/server/netty/cats/NettyCatsServerTest.scala | 1 - .../server/netty/sync/internal/NettySyncRequestBody.scala | 5 +++-- .../server/netty/zio/internal/NettyZioRequestBody.scala | 3 ++- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala index 8480c0de7d..854f73c596 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala @@ -38,7 +38,7 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( fs2.Stream .resource( Resource.make(Sync[F].delay(new HttpPostRequestDecoder(NettyCatsRequestBody.multiPartDataFactory, nettyRequest)))(d => - Sync[F].blocking(d.destroy()) + Sync[F].blocking(d.destroy()) // after the stream finishes or fails, decoder data has to be cleaned up ) ) .flatMap { decoder => @@ -76,7 +76,7 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( }) .map(_._2) .map(_.flatMap(p => m.partType(p.getName()).map((p, _)).toList)) - .evalMap(_.traverse { case (data, partType) => toRawPart(serverRequest, data, partType) }) + .evalMap(_.traverse { case (data, partType) => toRawPart(serverRequest, data, partType).map(_.asInstanceOf[Part[Any]]) }) } .compile .toVector diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 256e689956..62ba9777b1 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -1,7 +1,6 @@ package sttp.tapir.server.netty.cats import cats.effect.{IO, Resource} -import cats.syntax.all._ import io.netty.channel.nio.NioEventLoopGroup import org.scalatest.EitherValues import sttp.capabilities.fs2.Fs2Streams diff --git a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/NettySyncRequestBody.scala b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/NettySyncRequestBody.scala index eaeefd7616..0f0d542ab9 100644 --- a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/NettySyncRequestBody.scala +++ b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/NettySyncRequestBody.scala @@ -27,10 +27,11 @@ private[sync] class NettySyncRequestBody(val createFile: ServerRequest => TapirF override def publisherToMultipart( nettyRequest: StreamedHttpRequest, serverRequest: ServerRequest, - m: RawBodyType.MultipartBody + m: RawBodyType.MultipartBody, + maxBytes: Option[Long] ): RawValue[Seq[RawPart]] = throw new UnsupportedOperationException("Multipart requests not supported.") override def writeBytesToFile(bytes: Array[Byte], file: File) = throw new UnsupportedOperationException() - + override def writeToFile(serverRequest: ServerRequest, file: TapirFile, maxBytes: Option[Long]): Unit = serverRequest.underlying match case r: StreamedHttpRequest => FileWriterSubscriber.processAllBlocking(r, file.toPath, maxBytes) diff --git a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/internal/NettyZioRequestBody.scala b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/internal/NettyZioRequestBody.scala index ce582e4463..d32853f0f7 100644 --- a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/internal/NettyZioRequestBody.scala +++ b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/internal/NettyZioRequestBody.scala @@ -26,7 +26,8 @@ private[zio] class NettyZioRequestBody[Env]( override def publisherToMultipart( nettyRequest: StreamedHttpRequest, serverRequest: ServerRequest, - m: RawBodyType.MultipartBody + m: RawBodyType.MultipartBody, + maxBytes: Option[Long] ): RIO[Env, RawValue[Seq[RawPart]]] = ZIO.die(new UnsupportedOperationException("Multipart requests not supported.")) override def writeBytesToFile(bytes: Array[Byte], file: File): RIO[Env, Unit] = ZIO.die(new UnsupportedOperationException) From 86d3b2502076e2828989e204b745545319d241e1 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Tue, 16 Jul 2024 14:16:33 +0200 Subject: [PATCH 7/9] Test chunked multipart requests --- .../server/akkahttp/AkkaHttpServerTest.scala | 4 ++- .../cats/internal/NettyCatsRequestBody.scala | 8 ++--- .../pekkohttp/PekkoHttpServerTest.scala | 3 +- .../tapir/server/play/PlayServerTest.scala | 3 +- .../tapir/server/play/PlayServerTest.scala | 3 +- .../server/tests/ServerMultipartTests.scala | 35 ++++++++++++++++++- 6 files changed, 47 insertions(+), 9 deletions(-) diff --git a/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala b/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala index db589bce84..2bec61bae1 100644 --- a/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala +++ b/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala @@ -155,7 +155,9 @@ class AkkaHttpServerTest extends TestSuite with EitherValues { def drainAkka(stream: AkkaStreams.BinaryStream): Future[Unit] = stream.runWith(Sink.ignore).map(_ => ()) - new AllServerTests(createServerTest, interpreter, backend).tests() ++ + new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests() ++ + new ServerMultipartTests(createServerTest, chunkingSupport = false) + .tests() ++ // chunking disabled, akka-http rejects content-length with transfer-encoding new ServerStreamingTests(createServerTest).tests(AkkaStreams)(drainAkka) ++ new ServerWebSocketTests( createServerTest, diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala index 854f73c596..bf6db38e0e 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala @@ -22,6 +22,7 @@ import sttp.tapir.{RawBodyType, RawPart, TapirFile} import java.io.File + private[cats] class NettyCatsRequestBody[F[_]: Async]( val createFile: ServerRequest => F[TapirFile], val streamCompatible: StreamCompatible[Fs2Streams[F]] @@ -51,13 +52,12 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( monad .blocking { // this operation is the one that does potential I/O (writing files) - // TODO not thread-safe? (visibility of internal state changes?) decoder.offer(httpContent) var processedBytesAndContentBytes = processedBytesNum - + val parts = Stream - .continually(if (decoder.hasNext) { - val next = decoder.next() + .continually(if (decoder.hasNext) { + val next = decoder.next() processedBytesAndContentBytes = processedBytesAndContentBytes + next.asInstanceOf[HttpData].length() maxBytes.foreach { max => if (max < processedBytesAndContentBytes) { diff --git a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala index edf58cb4db..2690a45dc7 100644 --- a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala +++ b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala @@ -103,7 +103,8 @@ class PekkoHttpServerTest extends TestSuite with EitherValues { def drainPekko(stream: PekkoStreams.BinaryStream): Future[Unit] = stream.runWith(Sink.ignore).map(_ => ()) - new AllServerTests(createServerTest, interpreter, backend).tests() ++ + new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests() ++ + new ServerMultipartTests(createServerTest, chunkingSupport = false).tests() ++ // chunking disabled, pekko-http rejects content-length with transfer-encoding new ServerStreamingTests(createServerTest).tests(PekkoStreams)(drainPekko) ++ new ServerWebSocketTests( createServerTest, diff --git a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 7ef2dc432d..8569a33649 100644 --- a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -111,7 +111,8 @@ class PlayServerTest extends TestSuite { inputStreamSupport = false, invulnerableToUnsanitizedHeaders = false ).tests() ++ - new ServerMultipartTests(createServerTest, partOtherHeaderSupport = false).tests() ++ + // chunking disabled, akka-http rejects content-length with transfer-encoding + new ServerMultipartTests(createServerTest, partOtherHeaderSupport = false, chunkingSupport = false).tests() ++ new AllServerTests( createServerTest, interpreter, diff --git a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index c25bd41529..a15e9e4411 100644 --- a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -110,7 +110,8 @@ class PlayServerTest extends TestSuite { inputStreamSupport = false, invulnerableToUnsanitizedHeaders = false ).tests() ++ - new ServerMultipartTests(createServerTest, partOtherHeaderSupport = false).tests() ++ + // chunking disabled, akka-http rejects content-length with transfer-encoding + new ServerMultipartTests(createServerTest, partOtherHeaderSupport = false, chunkingSupport = false).tests() ++ new AllServerTests(createServerTest, interpreter, backend, basic = false, multipart = false, options = false).tests() ++ new ServerStreamingTests(createServerTest).tests(AkkaStreams)(drainAkka) ++ new PlayServerWithContextTest(backend).tests() ++ diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala index b59b49a406..047392d387 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala @@ -27,13 +27,16 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( partContentTypeHeaderSupport: Boolean = true, partOtherHeaderSupport: Boolean = true, maxContentLengthSupport: Boolean = true, + chunkingSupport: Boolean = true, multipartResponsesSupport: Boolean = true )(implicit m: MonadError[F]) { import createServerTest._ def tests(): List[Test] = basicTests() ++ (if (partContentTypeHeaderSupport) contentTypeHeaderTests() else Nil) ++ - (if (maxContentLengthSupport) maxContentLengthTests() else Nil) ++ (if (multipartResponsesSupport) multipartResponsesTests() else Nil) + (if (maxContentLengthSupport) maxContentLengthTests() else Nil) ++ (if (multipartResponsesSupport) multipartResponsesTests() + else + Nil) ++ (if (chunkingSupport) chunkedMultipartTests() else Nil) def maxContentLengthTests(): List[Test] = List( testServer( @@ -122,6 +125,36 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( ) } + def chunkedMultipartTests() = List( + testServer(in_raw_multipart_out_string, "chunked multipart attribute")((parts: Seq[Part[Array[Byte]]]) => + pureResult( + parts.map(part => s"${part.name}:${new String(part.body)}").mkString("\n__\n").asRight[Unit] + ) + ) { (backend, baseUri) => + val testBody = "61\r\n--boundary123\r\n" + + "Content-Disposition: form-data; name=\"attr1\"\r\n" + + "Content-Type: text/plain\r\n" + + "\r\nValue1\r\n" + + "\r\n47\r\n--boundary123\r\n" + // 15 + "Content-Disposition: form-data; name=\"attr2\"\r\n" + // 46 + "\r\nPart1 of\r\n" + // 10 + "1E\r\n Attr2 Value\r\n" + + "--boundary123--\r\n\r\n" + + "0\r\n\r\n" + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .header("Content-Type", "multipart/form-data; boundary=boundary123") + .header("Transfer-Encoding", "chunked") + .body(testBody) + .send(backend) + .map { r => + r.code shouldBe StatusCode.Ok + println(r.body) + r.body should be("attr1:Value1\n__\nattr2:Part1 of Attr2 Value") + } + } + ) + def multipartResponsesTests() = List( testServer(in_simple_multipart_out_multipart)((fa: FruitAmount) => pureResult(FruitAmount(fa.fruit + " apple", fa.amount * 2).asRight[Unit]) From 6e9b395b68f129251f48e1062e077891d0875e68 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Tue, 16 Jul 2024 16:37:32 +0200 Subject: [PATCH 8/9] Count bytes before sending data to decoder --- .../cats/internal/NettyCatsRequestBody.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala index bf6db38e0e..e9d177bfff 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/internal/NettyCatsRequestBody.scala @@ -51,25 +51,28 @@ private[cats] class NettyCatsRequestBody[F[_]: Async]( })({ case ((decoder, processedBytesNum), httpContent) => monad .blocking { + val newProcessedBytes = if (httpContent.content() != null) { + val processedBytesAndContentBytes = processedBytesNum + httpContent.content().readableBytes() + maxBytes.foreach { max => + if (max < processedBytesAndContentBytes) { + throw new StreamMaxLengthExceededException(max) + } + } + processedBytesAndContentBytes + } else processedBytesNum + // this operation is the one that does potential I/O (writing files) decoder.offer(httpContent) - var processedBytesAndContentBytes = processedBytesNum - val parts = Stream .continually(if (decoder.hasNext) { val next = decoder.next() - processedBytesAndContentBytes = processedBytesAndContentBytes + next.asInstanceOf[HttpData].length() - maxBytes.foreach { max => - if (max < processedBytesAndContentBytes) { - throw new StreamMaxLengthExceededException(max) - } - } next } else null) .takeWhile(_ != null) .toVector + ( - (decoder, processedBytesAndContentBytes), + (decoder, newProcessedBytes), parts ) } From 4567780eeb19b502a625278457da55648617afeb Mon Sep 17 00:00:00 2001 From: kciesielski Date: Tue, 16 Jul 2024 16:38:25 +0200 Subject: [PATCH 9/9] Update tests --- .../armeria/cats/ArmeriaCatsServerTest.scala | 5 +- .../armeria/ArmeriaFutureServerTest.scala | 5 +- .../armeria/zio/ArmeriaZioServerTest.scala | 5 +- .../server/jdkhttp/JdkHttpServerTest.scala | 4 +- .../tapir/server/play/PlayServerTest.scala | 2 +- .../tapir/server/play/PlayServerTest.scala | 2 +- .../server/tests/ServerMultipartTests.scala | 67 ++++++++++++++++--- .../scala/sttp/tapir/tests/Multipart.scala | 3 + 8 files changed, 76 insertions(+), 17 deletions(-) diff --git a/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerTest.scala b/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerTest.scala index 4181406515..0eada6fffc 100644 --- a/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerTest.scala +++ b/server/armeria-server/cats/src/test/scala/sttp/tapir/server/armeria/cats/ArmeriaCatsServerTest.scala @@ -16,7 +16,10 @@ class ArmeriaCatsServerTest extends TestSuite { def drainFs2(stream: Fs2Streams[IO]#BinaryStream): IO[Unit] = stream.compile.drain.void - new AllServerTests(createServerTest, interpreter, backend, basic = false, options = false, maxContentLength = false).tests() ++ + new AllServerTests(createServerTest, interpreter, backend, basic = false, options = false, maxContentLength = false, multipart = false) + .tests() ++ + new ServerMultipartTests(createServerTest, chunkingSupport = false) + .tests() ++ // chunking disabled, Armeria rejects content-length with transfer-encoding new ServerBasicTests(createServerTest, interpreter, supportsUrlEncodedPathSegments = false, maxContentLength = false).tests() ++ new ServerStreamingTests(createServerTest).tests(Fs2Streams[IO])(drainFs2) } diff --git a/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaFutureServerTest.scala b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaFutureServerTest.scala index 3c7d99de62..f6c43cbfa5 100644 --- a/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaFutureServerTest.scala +++ b/server/armeria-server/src/test/scala/sttp/tapir/server/armeria/ArmeriaFutureServerTest.scala @@ -15,7 +15,10 @@ class ArmeriaFutureServerTest extends TestSuite { val interpreter = new ArmeriaTestFutureServerInterpreter() val createServerTest = new DefaultCreateServerTest(backend, interpreter) - new AllServerTests(createServerTest, interpreter, backend, basic = false, options = false, maxContentLength = false).tests() ++ + new AllServerTests(createServerTest, interpreter, backend, basic = false, options = false, maxContentLength = false, multipart = false) + .tests() ++ + new ServerMultipartTests(createServerTest, chunkingSupport = false) + .tests() ++ // chunking disabled, Armeria rejects content-length with transfer-encoding new ServerBasicTests(createServerTest, interpreter, supportsUrlEncodedPathSegments = false, maxContentLength = false).tests() ++ new ServerStreamingTests(createServerTest, maxLengthSupported = false).tests(ArmeriaStreams)(_ => Future.unit) } diff --git a/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerTest.scala b/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerTest.scala index 1b00ad09ef..61031e63b0 100644 --- a/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerTest.scala +++ b/server/armeria-server/zio/src/test/scala/sttp/tapir/server/armeria/zio/ArmeriaZioServerTest.scala @@ -20,7 +20,10 @@ class ArmeriaZioServerTest extends TestSuite { def drainZStream(zStream: ZioStreams.BinaryStream): Task[Unit] = zStream.run(ZSink.drain) - new AllServerTests(createServerTest, interpreter, backend, basic = false, options = false, maxContentLength = false).tests() ++ + new AllServerTests(createServerTest, interpreter, backend, basic = false, options = false, maxContentLength = false, multipart = false) + .tests() ++ + new ServerMultipartTests(createServerTest, chunkingSupport = false) + .tests() ++ // chunking disabled, Armeria rejects content-length with transfer-encoding new ServerBasicTests(createServerTest, interpreter, supportsUrlEncodedPathSegments = false, maxContentLength = false).tests() ++ new ServerStreamingTests(createServerTest).tests(ZioStreams)(drainZStream) } diff --git a/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpServerTest.scala b/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpServerTest.scala index 5e3f42cfd4..258c3775d9 100644 --- a/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpServerTest.scala +++ b/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpServerTest.scala @@ -15,7 +15,9 @@ class JdkHttpServerTest extends TestSuite with EitherValues { val createServerTest = new DefaultCreateServerTest(backend, interpreter) new ServerBasicTests(createServerTest, interpreter, invulnerableToUnsanitizedHeaders = false).tests() ++ - new AllServerTests(createServerTest, interpreter, backend, basic = false).tests() + new ServerMultipartTests(createServerTest, chunkingSupport = false) + .tests() ++ // chunking disabled, backend rejects content-length with transfer-encoding + new AllServerTests(createServerTest, interpreter, backend, basic = false, multipart = false).tests() }) } } diff --git a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 8569a33649..e9f897e9e0 100644 --- a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -111,7 +111,7 @@ class PlayServerTest extends TestSuite { inputStreamSupport = false, invulnerableToUnsanitizedHeaders = false ).tests() ++ - // chunking disabled, akka-http rejects content-length with transfer-encoding + // chunking disabled, Play rejects content-length with transfer-encoding new ServerMultipartTests(createServerTest, partOtherHeaderSupport = false, chunkingSupport = false).tests() ++ new AllServerTests( createServerTest, diff --git a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index a15e9e4411..f2a1921cbd 100644 --- a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -110,7 +110,7 @@ class PlayServerTest extends TestSuite { inputStreamSupport = false, invulnerableToUnsanitizedHeaders = false ).tests() ++ - // chunking disabled, akka-http rejects content-length with transfer-encoding + // chunking disabled, Play rejects content-length with transfer-encoding new ServerMultipartTests(createServerTest, partOtherHeaderSupport = false, chunkingSupport = false).tests() ++ new AllServerTests(createServerTest, interpreter, backend, basic = false, multipart = false, options = false).tests() ++ new ServerStreamingTests(createServerTest).tests(AkkaStreams)(drainAkka) ++ diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala index 047392d387..f2a15133b8 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala @@ -7,13 +7,7 @@ import sttp.model.{Part, StatusCode} import sttp.monad.MonadError import sttp.tapir._ import sttp.tapir.generic.auto._ -import sttp.tapir.tests.Multipart.{ - in_file_list_multipart_out_multipart, - in_file_multipart_out_multipart, - in_raw_multipart_out_string, - in_simple_multipart_out_multipart, - in_simple_multipart_out_string -} +import sttp.tapir.tests.Multipart._ import sttp.tapir.tests.TestUtil.{readFromFile, writeToFile} import sttp.tapir.tests.data.{DoubleFruit, FruitAmount, FruitData} import sttp.tapir.tests.{MultipleFileUpload, Test, data} @@ -21,6 +15,10 @@ import sttp.tapir.server.model.EndpointExtensions._ import scala.concurrent.Await import scala.concurrent.duration.DurationInt +import java.io.File +import fs2.io.file.Files +import cats.effect.IO +import fs2.io.file.Path class ServerMultipartTests[F[_], OPTIONS, ROUTE]( createServerTest: CreateServerTest[F, Any, OPTIONS, ROUTE], @@ -121,6 +119,54 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( r.code shouldBe StatusCode.Ok r.body should be("firstPart:BODYONE\r\n--AA\n__\nsecondPart:BODYTWO") } + }, + testServer(in_file_multipart_out_string, "simple file multipart body")((fd: FruitData) => { + val content = Await.result(readFromFile(fd.data.body), 3.seconds) + pureResult(content.reverse.asRight[Unit]) + }) { (backend, baseUri) => + val file = writeToFile("peach2 mario2") + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .multipartBody(multipartFile("data", file).fileName("fruit-data7.txt").header("X-Auth", "12Aa")) + .send(backend) + .map { r => + r.code shouldBe StatusCode.Ok + r.body shouldBe "2oiram 2hcaep" + } + }, + testServer(in_file_multipart_out_string, "large file multipart body")((fd: FruitData) => { + val fileSize = fd.data.body.length() // FIXME is 0, because decoder.destroy() removes the file + pureResult(fileSize.toString.asRight[Unit]) + }) { (backend, baseUri) => + val file = File.createTempFile("test", "tapir") + file.deleteOnExit() + fs2.Stream + .constant[IO, Byte]('x') + .take(5 * 1024 * 1024) + .through(Files.forAsync[IO].writeAll(Path.fromNioPath(file.toPath))) + .compile + .drain >> + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .multipartBody(multipartFile("data", file).fileName("fruit-data8.txt")) + .send(backend) + .map { r => + r.code shouldBe StatusCode.Ok + r.body shouldBe "5242880" + } + }, + testServer(in_file_multipart_out_string, "file from a multipart attribute")((fd: FruitData) => { + val content = Await.result(readFromFile(fd.data.body), 3.seconds) + pureResult(content.reverse.asRight[Unit]) + }) { (backend, baseUri) => + basicStringRequest + .post(uri"$baseUri/api/echo/multipart") + .multipartBody(multipart("data", "peach3 mario3")) + .send(backend) + .map { r => + // r.code shouldBe StatusCode.Ok + r.body shouldBe "3oiram 3hcaep" + } } ) } @@ -135,9 +181,9 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( "Content-Disposition: form-data; name=\"attr1\"\r\n" + "Content-Type: text/plain\r\n" + "\r\nValue1\r\n" + - "\r\n47\r\n--boundary123\r\n" + // 15 - "Content-Disposition: form-data; name=\"attr2\"\r\n" + // 46 - "\r\nPart1 of\r\n" + // 10 + "\r\n47\r\n--boundary123\r\n" + + "Content-Disposition: form-data; name=\"attr2\"\r\n" + + "\r\nPart1 of\r\n" + "1E\r\n Attr2 Value\r\n" + "--boundary123--\r\n\r\n" + "0\r\n\r\n" @@ -149,7 +195,6 @@ class ServerMultipartTests[F[_], OPTIONS, ROUTE]( .send(backend) .map { r => r.code shouldBe StatusCode.Ok - println(r.body) r.body should be("attr1:Value1\n__\nattr2:Part1 of Attr2 Value") } } diff --git a/tests/src/main/scala/sttp/tapir/tests/Multipart.scala b/tests/src/main/scala/sttp/tapir/tests/Multipart.scala index 7d41320cd2..38cd060379 100644 --- a/tests/src/main/scala/sttp/tapir/tests/Multipart.scala +++ b/tests/src/main/scala/sttp/tapir/tests/Multipart.scala @@ -23,6 +23,9 @@ object Multipart { val in_file_multipart_out_multipart: PublicEndpoint[FruitData, Unit, FruitData, Any] = endpoint.post.in("api" / "echo" / "multipart").in(multipartBody[FruitData]).out(multipartBody[FruitData]).name("echo file") + val in_file_multipart_out_string: PublicEndpoint[FruitData, Unit, String, Any] = + endpoint.post.in("api" / "echo" / "multipart").in(multipartBody[FruitData]).out(stringBody) + val in_file_list_multipart_out_multipart: PublicEndpoint[MultipleFileUpload, Unit, MultipleFileUpload, Any] = endpoint.post .in("api" / "echo" / "multipart")