From c464ebcdb7b56885ca99ede04495a5bfc538c636 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Wed, 15 Jan 2025 16:20:03 +0100 Subject: [PATCH] feat: add `Client::insertPayload()` --- composer.json | 1 + src/Client/ClickHouseClient.php | 17 ++++++++ src/Client/Http/RequestFactory.php | 27 ++++++++---- src/Client/Http/RequestOptions.php | 13 +----- src/Client/Http/RequestSettings.php | 22 ++++++++++ src/Client/PsrClickHouseAsyncClient.php | 11 +++-- src/Client/PsrClickHouseClient.php | 53 +++++++++++++++++++++-- src/Format/RowBinary.php | 28 ++++++++++++ tests/Client/Http/RequestFactoryTest.php | 15 ++++--- tests/Client/Http/RequestOptionsTest.php | 27 ------------ tests/Client/Http/RequestSettingsTest.php | 25 +++++++++++ tests/Client/InsertTest.php | 51 ++++++++++++++++++++++ 12 files changed, 230 insertions(+), 60 deletions(-) create mode 100644 src/Client/Http/RequestSettings.php create mode 100644 src/Format/RowBinary.php delete mode 100644 tests/Client/Http/RequestOptionsTest.php create mode 100644 tests/Client/Http/RequestSettingsTest.php diff --git a/composer.json b/composer.json index bdefceb..9b1c9cf 100644 --- a/composer.json +++ b/composer.json @@ -39,6 +39,7 @@ "require-dev": { "cdn77/coding-standard": "^7.0", "infection/infection": "^0.29.0", + "kafkiansky/phpclick": "dev-master", "nyholm/psr7": "^1.2", "php-http/message-factory": "^1.1", "phpstan/extension-installer": "^1.1", diff --git a/src/Client/ClickHouseClient.php b/src/Client/ClickHouseClient.php index 7e8abcc..559062e 100644 --- a/src/Client/ClickHouseClient.php +++ b/src/Client/ClickHouseClient.php @@ -5,6 +5,7 @@ namespace SimPod\ClickHouseClient\Client; use Psr\Http\Client\ClientExceptionInterface; +use Psr\Http\Message\StreamInterface; use SimPod\ClickHouseClient\Exception\CannotInsert; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Exception\UnsupportedParamType; @@ -85,4 +86,20 @@ public function insert(string $table, array $values, array|null $columns = null, * @template O of Output */ public function insertWithFormat(string $table, Format $inputFormat, string $data, array $settings = []): void; + + /** + * @param array $settings + * @param list $columns + * @param Format> $inputFormat + * + * @throws ClientExceptionInterface + * @throws ServerError + */ + public function insertPayload( + string $table, + Format $inputFormat, + StreamInterface $payload, + array $columns = [], + array $settings = [], + ): void; } diff --git a/src/Client/Http/RequestFactory.php b/src/Client/Http/RequestFactory.php index 4c74502..0c50559 100644 --- a/src/Client/Http/RequestFactory.php +++ b/src/Client/Http/RequestFactory.php @@ -49,11 +49,13 @@ public function __construct( $this->uri = $uri; } - /** @throws UnsupportedParamType */ - public function prepareRequest(RequestOptions $requestOptions): RequestInterface - { + /** @param array $additionalOptions */ + public function initRequest( + RequestSettings $requestSettings, + array $additionalOptions = [], + ): RequestInterface { $query = http_build_query( - $requestOptions->settings, + $requestSettings->settings + $additionalOptions, '', '&', PHP_QUERY_RFC3986, @@ -70,11 +72,20 @@ public function prepareRequest(RequestOptions $requestOptions): RequestInterface } } - $request = $this->requestFactory->createRequest('POST', $uri); + return $this->requestFactory->createRequest('POST', $uri); + } - preg_match_all('~\{([a-zA-Z\d]+):([a-zA-Z\d ]+(\(.+\))?)}~', $requestOptions->sql, $matches); + /** @throws UnsupportedParamType */ + public function prepareSqlRequest( + string $sql, + RequestSettings $requestSettings, + RequestOptions $requestOptions, + ): RequestInterface { + $request = $this->initRequest($requestSettings); + + preg_match_all('~\{([a-zA-Z\d]+):([a-zA-Z\d ]+(\(.+\))?)}~', $sql, $matches); if ($matches[0] === []) { - $body = $this->streamFactory->createStream($requestOptions->sql); + $body = $this->streamFactory->createStream($sql); try { return $request->withBody($body); } catch (InvalidArgumentException) { @@ -93,7 +104,7 @@ static function (array $acc, string|int $k) use ($matches) { [], ); - $streamElements = [['name' => 'query', 'contents' => $requestOptions->sql]]; + $streamElements = [['name' => 'query', 'contents' => $sql]]; foreach ($requestOptions->params as $name => $value) { $type = $paramToType[$name] ?? null; if ($type === null) { diff --git a/src/Client/Http/RequestOptions.php b/src/Client/Http/RequestOptions.php index 1f5e12b..772ffd5 100644 --- a/src/Client/Http/RequestOptions.php +++ b/src/Client/Http/RequestOptions.php @@ -6,20 +6,9 @@ final class RequestOptions { - /** @var array */ - public array $settings; - - /** - * @param array $params - * @param array $defaultSettings - * @param array $querySettings - */ + /** @param array $params */ public function __construct( - public string $sql, public array $params, - array $defaultSettings, - array $querySettings, ) { - $this->settings = $querySettings + $defaultSettings; } } diff --git a/src/Client/Http/RequestSettings.php b/src/Client/Http/RequestSettings.php new file mode 100644 index 0000000..0f99893 --- /dev/null +++ b/src/Client/Http/RequestSettings.php @@ -0,0 +1,22 @@ + */ + public array $settings; + + /** + * @param array $defaultSettings + * @param array $querySettings + */ + public function __construct( + array $defaultSettings, + array $querySettings, + ) { + $this->settings = $querySettings + $defaultSettings; + } +} diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index d1a1007..2cce723 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -12,6 +12,7 @@ use Psr\Http\Message\ResponseInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; +use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; @@ -83,13 +84,15 @@ private function executeRequest( array $settings = [], callable|null $processResponse = null, ): PromiseInterface { - $request = $this->requestFactory->prepareRequest( - new RequestOptions( - $sql, - $params, + $request = $this->requestFactory->prepareSqlRequest( + $sql, + new RequestSettings( $this->defaultSettings, $settings, ), + new RequestOptions( + $params, + ), ); return Create::promiseFor( diff --git a/src/Client/PsrClickHouseClient.php b/src/Client/PsrClickHouseClient.php index e274529..6541a6b 100644 --- a/src/Client/PsrClickHouseClient.php +++ b/src/Client/PsrClickHouseClient.php @@ -5,11 +5,14 @@ namespace SimPod\ClickHouseClient\Client; use DateTimeZone; +use InvalidArgumentException; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\StreamInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; +use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Exception\CannotInsert; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Exception\UnsupportedParamType; @@ -198,6 +201,46 @@ public function insertWithFormat(string $table, Format $inputFormat, string $dat } } + public function insertPayload( + string $table, + Format $inputFormat, + StreamInterface $payload, + array $columns = [], + array $settings = [], + ): void { + $formatSql = $inputFormat::toSql(); + + $table = Escaper::quoteIdentifier($table); + + $columnsSql = $columns === [] ? '' : sprintf('(%s)', implode(',', $columns)); + + $sql = <<requestFactory->initRequest( + new RequestSettings( + $this->defaultSettings, + $settings, + ), + ['query' => $sql], + ); + + try { + $request = $request->withBody($payload); + } catch (InvalidArgumentException) { + absurd(); + } + + $response = $this->client->sendRequest($request); + + if ($response->getStatusCode() !== 200) { + throw ServerError::fromResponse($response); + } + + return; + } + /** * @param array $params * @param array $settings @@ -208,13 +251,15 @@ public function insertWithFormat(string $table, Format $inputFormat, string $dat */ private function executeRequest(string $sql, array $params, array $settings): ResponseInterface { - $request = $this->requestFactory->prepareRequest( - new RequestOptions( - $sql, - $params, + $request = $this->requestFactory->prepareSqlRequest( + $sql, + new RequestSettings( $this->defaultSettings, $settings, ), + new RequestOptions( + $params, + ), ); $response = $this->client->sendRequest($request); diff --git a/src/Format/RowBinary.php b/src/Format/RowBinary.php new file mode 100644 index 0000000..ff3eb55 --- /dev/null +++ b/src/Format/RowBinary.php @@ -0,0 +1,28 @@ +> + */ +final class RowBinary implements Format +{ + public static function output(string $contents): Output + { + /** @var Basic $output */ + $output = new Basic($contents); + + return $output; + } + + public static function toSql(): string + { + return 'FORMAT RowBinary'; + } +} diff --git a/tests/Client/Http/RequestFactoryTest.php b/tests/Client/Http/RequestFactoryTest.php index 04bc6a3..2bbb916 100644 --- a/tests/Client/Http/RequestFactoryTest.php +++ b/tests/Client/Http/RequestFactoryTest.php @@ -10,6 +10,7 @@ use PHPUnit\Framework\Attributes\DataProvider; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; +use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use SimPod\ClickHouseClient\Tests\TestCaseBase; @@ -28,12 +29,16 @@ public function testPrepareRequest(string $uri, string $expectedUri): void $uri, ); - $request = $requestFactory->prepareRequest(new RequestOptions( + $request = $requestFactory->prepareSqlRequest( 'SELECT 1', - [], - ['max_block_size' => 1], - ['database' => 'database'], - )); + new RequestSettings( + ['max_block_size' => 1], + ['database' => 'database'], + ), + new RequestOptions( + [], + ), + ); self::assertSame('POST', $request->getMethod()); self::assertSame( diff --git a/tests/Client/Http/RequestOptionsTest.php b/tests/Client/Http/RequestOptionsTest.php deleted file mode 100644 index 3eb7018..0000000 --- a/tests/Client/Http/RequestOptionsTest.php +++ /dev/null @@ -1,27 +0,0 @@ - 'foo', 'a' => 1], - ['database' => 'bar', 'b' => 2], - ); - - self::assertSame('bar', $requestOptions->settings['database']); - self::assertSame(1, $requestOptions->settings['a']); - self::assertSame(2, $requestOptions->settings['b']); - } -} diff --git a/tests/Client/Http/RequestSettingsTest.php b/tests/Client/Http/RequestSettingsTest.php new file mode 100644 index 0000000..532874a --- /dev/null +++ b/tests/Client/Http/RequestSettingsTest.php @@ -0,0 +1,25 @@ + 'foo', 'a' => 1], + ['database' => 'bar', 'b' => 2], + ); + + self::assertSame('bar', $requestSettings->settings['database']); + self::assertSame(1, $requestSettings->settings['a']); + self::assertSame(2, $requestSettings->settings['b']); + } +} diff --git a/tests/Client/InsertTest.php b/tests/Client/InsertTest.php index 76415d9..8499b75 100644 --- a/tests/Client/InsertTest.php +++ b/tests/Client/InsertTest.php @@ -4,6 +4,8 @@ namespace SimPod\ClickHouseClient\Tests\Client; +use Kafkiansky\PHPClick; +use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\Attributes\DataProvider; use SimPod\ClickHouseClient\Client\Http\RequestFactory; @@ -12,6 +14,7 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\JsonCompact; use SimPod\ClickHouseClient\Format\JsonEachRow; +use SimPod\ClickHouseClient\Format\RowBinary; use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; @@ -107,6 +110,54 @@ public function testInsertUseColumnsWithTypes(string $tableSql): void self::assertSame($expectedData, $output->data); } + #[DataProvider('providerInsert')] + public function testInsertPayload(string $tableSql): void + { + $data = [ + ['PageViews' => 5, 'UserID' => 4324182021466249494, 'Duration' => 146, 'Sign' => -1], + ['PageViews' => 6, 'UserID' => 4324182021466249494, 'Duration' => 185, 'Sign' => 1], + ]; + + $x = PHPClick\Batch::fromRows( + PHPClick\Row::columns( + PHPClick\Column::uint32(5), + PHPClick\Column::uint64(4324182021466249494), + PHPClick\Column::uint32(146), + PHPClick\Column::int8(-1), + ), + PHPClick\Row::columns( + PHPClick\Column::uint32(6), + PHPClick\Column::uint64(4324182021466249494), + PHPClick\Column::uint32(185), + PHPClick\Column::int8(1), + ), + ); + + $a = $x->content->getContent()->read(); + + self::$client->executeQuery($tableSql); + + $psr17Factory = new Psr17Factory(); + + self::$client->insertPayload( + 'UserActivity', + new RowBinary(), + $psr17Factory->createStream($a), + ['PageViews', 'UserID', 'Duration', 'Sign'], + ); + + $output = self::$client->select( + <<<'CLICKHOUSE' +SELECT * FROM UserActivity +CLICKHOUSE, + new JsonEachRow(), + ); + + $data[0]['UserID'] = (string) $data[0]['UserID']; + $data[1]['UserID'] = (string) $data[1]['UserID']; + self::assertSame($data, $output->data); + } + public function testInsertEscaping(): void { self::$client->executeQuery(