Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: support RR[>=2023.3] streamed responses #130

Open
wants to merge 4 commits into
base: 3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ baldinof_road_runner:
max_jobs_dispersion: 0.2
```

## StreamedResponse

`$callback` should now return `\Generator` to be really streamed. Replace all `echo` (etc) with `yield`. Fallback simply loads all content to memory!

## Events

Expand Down
124 changes: 124 additions & 0 deletions src/Helpers/StreamedJsonResponseHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

namespace Baldinof\RoadRunnerBundle\Helpers;

use Symfony\Component\HttpFoundation\StreamedJsonResponse;

// Basically copy of Symfony\Component\HttpFoundation\StreamedJsonResponse
// but adds `yield`ing, instead of `echo`s
class StreamedJsonResponseHelper
{
public static function toGenerator(StreamedJsonResponse $response): \Generator
{
$ref = new \ReflectionClass($response);

$encodingOptions = $ref->getProperty("encodingOptions")->getValue($response);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, I would prefere closure binding to help static analysis.

$data = $ref->getProperty("data")->getValue($response);
$placeholder = $ref->getConstant("PLACEHOLDER");

return self::stream($data, $encodingOptions, $placeholder);
}

private static function stream(iterable $data, int $encodingOptions, string $placeholder): \Generator
{
$jsonEncodingOptions = \JSON_THROW_ON_ERROR | $encodingOptions;
$keyEncodingOptions = $jsonEncodingOptions & ~\JSON_NUMERIC_CHECK;

return self::streamData($data, $jsonEncodingOptions, $keyEncodingOptions, $placeholder);
}

private static function streamData(mixed $data, int $jsonEncodingOptions, int $keyEncodingOptions, string $placeholder): \Generator
{
if (\is_array($data)) {
foreach (self::streamArray($data, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $item) {
yield $item;
}

return;
}

if (is_iterable($data) && !$data instanceof \JsonSerializable) {
foreach (self::streamIterable($data, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $item) {
yield $item;
}

return;
}

yield json_encode($data, $jsonEncodingOptions);
}

private static function streamArray(array $data, int $jsonEncodingOptions, int $keyEncodingOptions, string $placeholder): \Generator
{
$generators = [];

array_walk_recursive($data, function (&$item, $key) use (&$generators, $placeholder) {
if ($placeholder === $key) {
// if the placeholder is already in the structure it should be replaced with a new one that explode
// works like expected for the structure
$generators[] = $key;
}

// generators should be used but for better DX all kind of Traversable and objects are supported
if (\is_object($item)) {
$generators[] = $item;
$item = $placeholder;
} elseif ($placeholder === $item) {
// if the placeholder is already in the structure it should be replaced with a new one that explode
// works like expected for the structure
$generators[] = $item;
}
});

$jsonParts = explode('"' . $placeholder . '"', json_encode($data, $jsonEncodingOptions));

foreach ($generators as $index => $generator) {
// send first and between parts of the structure
yield $jsonParts[$index];

foreach (self::streamData($generator, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $child) {
yield $child;
}
}

// send last part of the structure
yield $jsonParts[array_key_last($jsonParts)];
}

private static function streamIterable(iterable $iterable, int $jsonEncodingOptions, int $keyEncodingOptions, string $placeholder): \Generator
{
$isFirstItem = true;
$startTag = '[';

foreach ($iterable as $key => $item) {
if ($isFirstItem) {
$isFirstItem = false;
// depending on the first elements key the generator is detected as a list or map
// we can not check for a whole list or map because that would hurt the performance
// of the streamed response which is the main goal of this response class
if (0 !== $key) {
$startTag = '{';
}

yield $startTag;
} else {
// if not first element of the generic, a separator is required between the elements
yield ',';
}

if ('{' === $startTag) {
yield json_encode((string)$key, $keyEncodingOptions) . ':';
}

foreach (self::streamData($item, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $child) {
yield $child;
}
}

if ($isFirstItem) { // indicates that the generator was empty
yield '[';
}

yield '[' === $startTag ? ']' : '}';
}
}
177 changes: 151 additions & 26 deletions src/RoadRunnerBridge/HttpFoundationWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@

namespace Baldinof\RoadRunnerBundle\RoadRunnerBridge;

use Baldinof\RoadRunnerBundle\Helpers\StreamedJsonResponseHelper;
use Spiral\RoadRunner\Http\Exception\StreamStoppedException;
use Spiral\RoadRunner\Http\HttpWorkerInterface;
use Spiral\RoadRunner\Http\Request as RoadRunnerRequest;
use Spiral\RoadRunner\WorkerInterface;
use Symfony\Component\HttpFoundation\BinaryFileResponse;
use Symfony\Component\HttpFoundation\File\UploadedFile;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Request as SymfonyRequest;
use Symfony\Component\HttpFoundation\RequestStack;
use Symfony\Component\HttpFoundation\Response as SymfonyResponse;
use Symfony\Component\HttpFoundation\StreamedJsonResponse;
use Symfony\Component\HttpFoundation\StreamedResponse;

final class HttpFoundationWorker implements HttpFoundationWorkerInterface
Expand All @@ -35,32 +40,18 @@ public function waitRequest(): ?SymfonyRequest
return $this->toSymfonyRequest($rrRequest);
}

public function respond(SymfonyResponse $symfonyResponse): void
public function respond(SymfonyResponse $response): void
{
if ($symfonyResponse instanceof BinaryFileResponse && !$symfonyResponse->headers->has('Content-Range')) {
$content = file_get_contents($symfonyResponse->getFile()->getPathname());
if ($content === false) {
throw new \RuntimeException(sprintf("Cannot read file '%s'", $symfonyResponse->getFile()->getPathname())); // TODO: custom error
}
} else {
if ($symfonyResponse instanceof StreamedResponse || $symfonyResponse instanceof BinaryFileResponse) {
$content = '';
ob_start(function ($buffer) use (&$content) {
$content .= $buffer;

return '';
});

$symfonyResponse->sendContent();
ob_end_clean();
} else {
$content = (string) $symfonyResponse->getContent();
}
}
$content = match (true) {
$response instanceof StreamedJsonResponse => $this->createStreamedJsonResponseGenerator($response),
$response instanceof StreamedResponse => $this->createStreamedResponseGenerator($response),
$response instanceof BinaryFileResponse => $this->createFileStreamGenerator($response),
default => $this->createDefaultContentGetter($response),
};

$headers = $this->stringifyHeaders($symfonyResponse->headers->all());
$headers = $this->stringifyHeaders($response->headers->all());

$this->httpWorker->respond($symfonyResponse->getStatusCode(), $content, $headers);
$this->httpWorker->respond($response->getStatusCode(), $content(), $headers);
}

public function getWorker(): WorkerInterface
Expand Down Expand Up @@ -112,7 +103,7 @@ private function configureServer(RoadRunnerRequest $request): array
$server['REQUEST_URI'] = $components['path'] ?? '';
if (isset($components['query']) && $components['query'] !== '') {
$server['QUERY_STRING'] = $components['query'];
$server['REQUEST_URI'] .= '?'.$components['query'];
$server['REQUEST_URI'] .= '?' . $components['query'];
}

if (isset($components['scheme']) && $components['scheme'] === 'https') {
Expand All @@ -131,7 +122,7 @@ private function configureServer(RoadRunnerRequest $request): array
if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) {
$server[$key] = implode(', ', $value);
} else {
$server['HTTP_'.$key] = implode(', ', $value);
$server['HTTP_' . $key] = implode(', ', $value);
}
}

Expand Down Expand Up @@ -188,7 +179,141 @@ private function timeFloat(): float
private function stringifyHeaders(array $headers): array
{
return array_map(static function ($headerValues) {
return array_map(static fn ($val) => (string) $val, (array) $headerValues);
return array_map(static fn($val) => (string)$val, (array)$headerValues);
}, $headers);
}

/**
* Basically a copy of BinaryFileResponse->sendContent()
* @param BinaryFileResponse $response
* @return \Closure
*/
private function createFileStreamGenerator(BinaryFileResponse $response): \Closure
{
return static function () use ($response) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the extra callback needed?

Copy link
Contributor Author

@FluffyDiscord FluffyDiscord Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's not, since every matched option returns generator anyway. Will fix

$ref = new \ReflectionClass($response);
$maxlen = $ref->getProperty("maxlen")->getValue($response);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible I'd like to avoid reflexion because it breaks static analysis.

Luckily PHPStan is awesome and understands closure binding, see https://phpstan.org/r/eaa5b9fe-836e-41e5-9681-d4b8b2d062a0

Which means that we should be able to do something like

[$maxlen, $offset, $chunkSize, $deleteFileAfterSend] = Closure::bind(fn(BinaryFileResponse $r) => [
    $r->maxlen, 
    $r->offset, 
    $r->chunkSize, 
    $r->deleteFileAfterSend
], null, BinaryFileResponse::class)($response);

I have no idea of the perf impact, maybe we should cache the result of Closure::bind() into a class variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The closure binding is something that I have yet to use, since I always reach for reflections. I will have to test them both and see if there's a major difference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some testing and found out that it does not matter at all if we use Reflection or Closure::bind(), at least in the context of PHP 8.2 that I used. The first instance is always slow, but the the PHP JIT cache kicks in and following ones are pretty fast and since we use workers, the PHP JIT cache will remaing through requests.

X = iteration
Y - time in MS

New reflection each response
ref

Cached reflection, only accessing the properties
ref_cached

New Closure bind each response
closure

Cached closure bind
closure_cached

$offset = $ref->getProperty("offset")->getValue($response);
$chunkSize = $ref->getProperty("chunkSize")->getValue($response);
$deleteFileAfterSend = $ref->getProperty("deleteFileAfterSend")->getValue($response);

try {
if (!$response->isSuccessful()) {
return;
}

$file = fopen($response->getFile()->getPathname(), "r");

ignore_user_abort(true);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the return value should be stored and send back to another ignore_user_abort() call in the finally block to restore it to the previous value.

(honestly I don't even think ignore_user_abort has an impact in RoadRunner context)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think either. It was simply copied along the rest of the code from the original BinaryFileResponse. I will remove it.


if ($maxlen === 0) {
return;
}

if ($offset !== 0) {
fseek($file, $offset);
}

$length = $maxlen;
while ($length && !feof($file)) {
$read = $length > $chunkSize || 0 > $length ? $chunkSize : $length;

if (false === $data = fread($file, $read)) {
break;
}

while ('' !== $data) {
try {
yield $data;
} catch (StreamStoppedException) {
break 2;
}

if (0 < $length) {
$length -= $read;
}
$data = substr($data, $read);
}
}

fclose($file);
} finally {
if ($deleteFileAfterSend && is_file($response->getFile()->getPathname())) {
unlink($response->getFile()->getPathname());
}
}
};
}

/**
* @param SymfonyResponse $response
* @return \Closure
*/
private function createDefaultContentGetter(SymfonyResponse $response): \Closure
{
return static function () use ($response) {
ob_start();
$response->sendContent();
return ob_get_clean();
};
}

/**
* StreamedResponse callback can now use `yield` to be really streamed
* @param StreamedResponse $response
* @return \Closure
*/
private function createStreamedResponseGenerator(StreamedResponse $response): \Closure
{
return function () use ($response): \Generator {
$kernelCallback = $response->getCallback();

$kernelCallbackRef = new \ReflectionFunction($kernelCallback);
$closureVars = $kernelCallbackRef->getClosureUsedVariables();

$ref = new \ReflectionFunction($closureVars["callback"]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like you have to have a very precise setup of callbacks used variables: $callback, $request and $requestStack. In addition to that there must be a specific instances and a provided callback must return some sort of iterable type.

Ideally, symfony's StreamedResponse callback should be compatible with response example at RR docs . But since symfony streamed response defines callback as userland function without any return - such implementation with hardcoded variable values adds coupling with RR as web runner.

There are multiple ways on how to create a Symfony's StreamedResponse, but I cannot think of possible implementation on how to transform them to generators in an elegant way without coupling and hardcoding precise structure of variables..

Copy link
Contributor Author

@FluffyDiscord FluffyDiscord Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This very specific setup is because while user passes callback/Closure to the StreamedResponse, Symfony's kernel wraps that up with it's own thing, and we need to know if the wrapped callback is generator or not. There is not a better way to make this more transparent to the user than this. You would need to create bundle specific streamed response. I want these PR changes to be as plug&play as possible.

The examples from that article do not make sense - why would you stream file using StreamedResponse when you can just pass it to BinaryFileResponse and you get the benefit of partial downloads too?

The easy way would be to simply pass the stream to the callback, read it by X about of bytes and yield them. Example from the article:

$stream = getMyStream();

return new StreamedResponse(
    function () use ($stream) {
        while (! feof($stream)) {
            // echo fread($stream, 1024);
            yield fread($stream, 1024); // thats it, nothing else to change
        }
        fclose($stream);
    },
    Response::HTTP_OK,
    [
        'Content-Transfer-Encoding', 'binary',
        'Content-Type' => 'image/jpeg',
        'Content-Disposition' => 'attachment; filename="attachment.jpg"',
        'Content-Length' => fstat($stream)['size'],
    ]
);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to write functional tests then? Symfony wont print yielded values from function.

Leaving echo together with yield is also not an option since it will store the data to memory (ob_start(), ob_get_clean())

Copy link
Contributor Author

@FluffyDiscord FluffyDiscord Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I understand your issue - what do you mean by Symfony wont print yielded values from function. Just consume the generator - foreach or one of the generator functions, generator is basically an Iterable

Copy link
Contributor

@rmikalkenas rmikalkenas Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that your provided example with yield fread($stream, 1024); won't work in a non RR environment. In other words you are breaking StreamedResponse callback's contract

You are coupling you code implementation to RoadRunner and loosing flexibility which runtime provides

Copy link
Contributor Author

@FluffyDiscord FluffyDiscord Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes. Still, your argument loses meaning:

I said before, you would either need to use custom StreamedRespose that this bundle would provide or use Symfony's one and change echo's to yields.

Both changes make you runtime dependant.

By using runtime of your choice, you are immediately being "held hostage" by it, no matter which one it is. Doesn't matter if it's the default one, RR, Swoole or other variants, you will always need to adjust your code in some way, if it's running in worker mode.

Rather than arguing with me about the issue (which I am aware of), what about providing us with a solution? :)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do agree that it add a little coupling with RR, but the user would be fully advised, as using yield in Symfony StreamedResponse don't work at all.

As soon as it's well documented, and how it should be reverted back when removing this bundle, I'm ok with it.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, maybe Symfony would accept a PR that allows to pass callback that returns generators to StreamedResponse, if you have time for another PR :)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just had some afterthoughts, in the end I think I would prefer to have a StreamedResponse in this bundle to handle streaming via generators.

By doing so, if a user removes this package, and was using this feature, static analysis will fail, saying class Baldinof\RoadRunnerBundle\Http\StreamedResponse does not exists. They will have a direct feedback on where they should change code to get it back to regular StreamedResponse.

Also It will be easier to maintains compatibility when not running with RR, we can just override sendContent() to consumes the generator and echo it.

Sorry for the back and forth 😅

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I will look at this tomorrow. Do you also want one for the rest of them, or just this one?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be able to find a solution with reflection otherwise it could starts to fail without notice due to symfony internal refactoring.

I think we can access the original callback with a listener on the kernel.response event. In the listener we would store in a weakmap, the callback associated to a response object, and here we would be able to retrieve it.

The listener could also do a check of RR_MODE and convert the callback to echo the generator, so the app can still be run with RR, or anything else (while the bundle is still in use).

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your idea isn't a bad solution, but the likelihood of them refactoring this part of code or changing listener priorities to mess up your idea of the implementation will be probably the same. Don't forget tho, that these events can be stopPropagation() and then we will have to fall back to some other method of getting the callback - also low likelihood but the possibility is still there. This is really up to you.

PS: why would RR_MODE matter? if it's anything else then http, the listener will not be triggered anyway AFAIK

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will depends on what we do here: #130 (comment)

If we add a custom StreamedResponse we can just extend the regular Response and Symfony will not call setCallback() to wrap it.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RR_MODE or any other var that indicate we are not running with RR and should consume the generator and echo it

if ($ref->isGenerator()) {
$request = $closureVars["request"];
assert($request instanceof Request);

$requestStack = $closureVars["requestStack"];
assert($requestStack instanceof RequestStack);

try {
$requestStack->push($request);

foreach ($closureVars["callback"]() as $output) {
try {
yield $output;
} catch (StreamStoppedException) {
break;
}
}
} finally {
$requestStack->pop();
}

return;
}

yield $this->createDefaultContentGetter($response)();
};
}

/**
* @param StreamedJsonResponse $response
* @return \Closure
*/
private function createStreamedJsonResponseGenerator(StreamedJsonResponse $response): \Closure
{
return static function () use ($response): \Generator {
foreach (StreamedJsonResponseHelper::toGenerator($response) as $item) {
try {
yield $item;
} catch (StreamStoppedException) {
break;
}
}
};
}
}