diff --git a/README.md b/README.md index d833838..0473484 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ baldinof_road_runner: kernel_reboot: strategy: on_exception allowed_exceptions: + - Spiral\RoadRunner\Http\Exception\StreamStoppedException - Symfony\Component\HttpKernel\Exception\HttpExceptionInterface - Symfony\Component\Serializer\Exception\ExceptionInterface - App\Exception\YourDomainException @@ -112,6 +113,11 @@ baldinof_road_runner: max_jobs_dispersion: 0.2 ``` +## StreamedResponse + +Replace your `StreamedResponse`s with `Baldinof\RoadRunnerBundle\Http\Response\StreamedResponse`. The only difference is +that the `$callback` should be a `\Generator`. If you don't, the Symfony's Streamed Response will be loaded completely to memory +before it's sent! ## Events diff --git a/src/Helpers/StreamedJsonResponseHelper.php b/src/Helpers/StreamedJsonResponseHelper.php new file mode 100644 index 0000000..ae3de00 --- /dev/null +++ b/src/Helpers/StreamedJsonResponseHelper.php @@ -0,0 +1,131 @@ + $generator) { + // send first and between parts of the structure + yield $jsonParts[$index]; + + foreach (self::streamData($generator, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $child) { + yield $child; + } + } + + // send last part of the structure + yield $jsonParts[array_key_last($jsonParts)]; + } + + private static function streamIterable(iterable $iterable, int $jsonEncodingOptions, int $keyEncodingOptions, string $placeholder): \Generator + { + $isFirstItem = true; + $startTag = '['; + + foreach ($iterable as $key => $item) { + if ($isFirstItem) { + $isFirstItem = false; + // depending on the first elements key the generator is detected as a list or map + // we can not check for a whole list or map because that would hurt the performance + // of the streamed response which is the main goal of this response class + if (0 !== $key) { + $startTag = '{'; + } + + yield $startTag; + } else { + // if not first element of the generic, a separator is required between the elements + yield ','; + } + + if ('{' === $startTag) { + yield json_encode((string)$key, $keyEncodingOptions) . ':'; + } + + foreach (self::streamData($item, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $child) { + yield $child; + } + } + + if ($isFirstItem) { // indicates that the generator was empty + yield '['; + } + + yield '[' === $startTag ? ']' : '}'; + } + + private static function getStreamedJsonResponseParameterExtractor(): \Closure + { + return self::$streamedJsonResponseParameterExtractor ?? (self::$streamedJsonResponseParameterExtractor = \Closure::bind(static fn(StreamedJsonResponse $binaryFileResponse) => [ + $binaryFileResponse->data, + $binaryFileResponse->encodingOptions, + ], null, StreamedJsonResponse::class)); + } +} \ No newline at end of file diff --git a/src/Http/Response/StreamedResponse.php b/src/Http/Response/StreamedResponse.php new file mode 100644 index 0000000..6f4c844 --- /dev/null +++ b/src/Http/Response/StreamedResponse.php @@ -0,0 +1,95 @@ +setCallback($callback); + } + } + + public function setCallback(\Closure|\Generator $callback): static + { + $this->callback = $callback(...); + + return $this; + } + + public function getCallback(): ?\Closure + { + if (!isset($this->callback)) { + return null; + } + + return ($this->callback)(...); + } + + public function sendHeaders(int $statusCode = null): static + { + if ($this->headersSent) { + return $this; + } + + if ($statusCode < 100 || $statusCode >= 200) { + $this->headersSent = true; + } + + return parent::sendHeaders($statusCode); + } + + public function sendContent(): static + { + if ($this->streamed) { + return $this; + } + + $this->streamed = true; + + if (!isset($this->callback)) { + throw new \LogicException('The Response callback must be set.'); + } + + $callback = ($this->callback)(); + if($callback instanceof \Generator) { + foreach ($callback as $value) { + echo $value; + } + } else { + echo $callback; + } + + return $this; + } + + public function setContent(?string $content): static + { + if (null !== $content) { + throw new \LogicException('The content cannot be set on a StreamedResponse instance.'); + } + + $this->streamed = true; + + return $this; + } + + public function getContent(): string|false + { + return false; + } +} diff --git a/src/RoadRunnerBridge/HttpFoundationWorker.php b/src/RoadRunnerBridge/HttpFoundationWorker.php index 148493e..ba2f5e8 100644 --- a/src/RoadRunnerBridge/HttpFoundationWorker.php +++ b/src/RoadRunnerBridge/HttpFoundationWorker.php @@ -4,6 +4,9 @@ namespace Baldinof\RoadRunnerBundle\RoadRunnerBridge; +use Baldinof\RoadRunnerBundle\Helpers\StreamedJsonResponseHelper; +use Baldinof\RoadRunnerBundle\Http\Response\StreamedResponse; +use Spiral\RoadRunner\Http\Exception\StreamStoppedException; use Spiral\RoadRunner\Http\HttpWorkerInterface; use Spiral\RoadRunner\Http\Request as RoadRunnerRequest; use Spiral\RoadRunner\WorkerInterface; @@ -11,17 +14,25 @@ use Symfony\Component\HttpFoundation\File\UploadedFile; use Symfony\Component\HttpFoundation\Request as SymfonyRequest; use Symfony\Component\HttpFoundation\Response as SymfonyResponse; -use Symfony\Component\HttpFoundation\StreamedResponse; +use Symfony\Component\HttpFoundation\StreamedJsonResponse; final class HttpFoundationWorker implements HttpFoundationWorkerInterface { private HttpWorkerInterface $httpWorker; private array $originalServer; + private \Closure $binaryFileResponseParameterExtractor; public function __construct(HttpWorkerInterface $httpWorker) { $this->httpWorker = $httpWorker; $this->originalServer = $_SERVER; + + $this->binaryFileResponseParameterExtractor = \Closure::bind(static fn(BinaryFileResponse $binaryFileResponse) => [ + $binaryFileResponse->maxlen, + $binaryFileResponse->offset, + $binaryFileResponse->chunkSize, + $binaryFileResponse->deleteFileAfterSend + ], null, BinaryFileResponse::class); } public function waitRequest(): ?SymfonyRequest @@ -35,32 +46,22 @@ public function waitRequest(): ?SymfonyRequest return $this->toSymfonyRequest($rrRequest); } - public function respond(SymfonyResponse $symfonyResponse): void + public function respond(SymfonyResponse $response): void { - if ($symfonyResponse instanceof BinaryFileResponse && !$symfonyResponse->headers->has('Content-Range')) { - $content = file_get_contents($symfonyResponse->getFile()->getPathname()); - if ($content === false) { - throw new \RuntimeException(sprintf("Cannot read file '%s'", $symfonyResponse->getFile()->getPathname())); // TODO: custom error - } - } else { - if ($symfonyResponse instanceof StreamedResponse || $symfonyResponse instanceof BinaryFileResponse) { - $content = ''; - ob_start(function ($buffer) use (&$content) { - $content .= $buffer; + $content = match (true) { + $response instanceof StreamedJsonResponse => StreamedJsonResponseHelper::toGenerator($response), + $response instanceof StreamedResponse => $response->getCallback(), + $response instanceof BinaryFileResponse => $this->createFileStreamGenerator($response), + default => $this->createDefaultContentGetter($response), + }; - return ''; - }); - - $symfonyResponse->sendContent(); - ob_end_clean(); - } else { - $content = (string) $symfonyResponse->getContent(); - } - } + $headers = $this->stringifyHeaders($response->headers->all()); - $headers = $this->stringifyHeaders($symfonyResponse->headers->all()); + try { + $this->httpWorker->respond($response->getStatusCode(), $content, $headers); + } catch (StreamStoppedException){ - $this->httpWorker->respond($symfonyResponse->getStatusCode(), $content, $headers); + } } public function getWorker(): WorkerInterface @@ -112,7 +113,7 @@ private function configureServer(RoadRunnerRequest $request): array $server['REQUEST_URI'] = $components['path'] ?? ''; if (isset($components['query']) && $components['query'] !== '') { $server['QUERY_STRING'] = $components['query']; - $server['REQUEST_URI'] .= '?'.$components['query']; + $server['REQUEST_URI'] .= '?' . $components['query']; } if (isset($components['scheme']) && $components['scheme'] === 'https') { @@ -131,7 +132,7 @@ private function configureServer(RoadRunnerRequest $request): array if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { $server[$key] = implode(', ', $value); } else { - $server['HTTP_'.$key] = implode(', ', $value); + $server['HTTP_' . $key] = implode(', ', $value); } } @@ -188,7 +189,74 @@ private function timeFloat(): float private function stringifyHeaders(array $headers): array { return array_map(static function ($headerValues) { - return array_map(static fn ($val) => (string) $val, (array) $headerValues); + return array_map(static fn($val) => (string)$val, (array)$headerValues); }, $headers); } + + /** + * Basically a copy of BinaryFileResponse->sendContent() + * @param BinaryFileResponse $response + * @return \Generator + */ + private function createFileStreamGenerator(BinaryFileResponse $response): \Generator + { + $extractor = $this->binaryFileResponseParameterExtractor; + + [$maxlen, $offset, $chunkSize, $deleteFileAfterSend] = $extractor($response); + + try { + if (!$response->isSuccessful()) { + return; + } + + $file = fopen($response->getFile()->getPathname(), "r"); + + if ($maxlen === 0) { + return; + } + + if ($offset !== 0) { + fseek($file, $offset); + } + + $length = $maxlen; + while ($length && !feof($file)) { + $read = $length > $chunkSize || 0 > $length ? $chunkSize : $length; + + if (false === $data = fread($file, $read)) { + break; + } + + while ('' !== $data) { + try { + yield $data; + } catch (StreamStoppedException) { + break 2; + } + + if (0 < $length) { + $length -= $read; + } + $data = substr($data, $read); + } + } + + fclose($file); + } finally { + if ($deleteFileAfterSend && is_file($response->getFile()->getPathname())) { + unlink($response->getFile()->getPathname()); + } + } + } + + /** + * @param SymfonyResponse $response + * @return \Generator + */ + private function createDefaultContentGetter(SymfonyResponse $response): \Generator + { + ob_start(); + $response->sendContent(); + yield ob_get_clean(); + } }