From c7ea59995176528980754ad18de78eb969ff1c36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Wed, 7 Jun 2023 13:49:38 +0200 Subject: [PATCH] feat: query and write (#1) --- .circleci/config.yml | 74 +-- .github/workflows/linter.yml | 1 + README.md | 72 ++- checkstyle.xml | 3 +- examples/.gitignore | 38 ++ examples/README.md | 8 + examples/pom.xml | 81 ++++ .../main/java/com/influxdb/v3/IOxExample.java | 91 ++++ pom.xml | 148 +++++- .../v3/client/InfluxDBApiException.java | 47 ++ .../influxdb/v3/client/InfluxDBClient.java | 181 ++++++- .../client/config/InfluxDBClientConfigs.java | 319 +++++++++++++ .../v3/client/internal/Arguments.java | 165 +++++++ .../v3/client/internal/FlightSqlClient.java | 170 +++++++ .../client/internal/InfluxDBClientImpl.java | 236 +++++++++ .../client/internal/NanosecondConverter.java | 114 +++++ .../v3/client/internal/RestClient.java | 188 ++++++++ .../v3/client/query/QueryParameters.java | 99 ++++ .../influxdb/v3/client/query/QueryType.java} | 22 +- .../com/influxdb/v3/client/write/Point.java | 451 ++++++++++++++++++ .../v3/client/write/WriteParameters.java | 146 ++++++ .../v3/client/write/WritePrecision.java | 46 ++ .../v3/client/AbstractMockServerTest.java | 63 +++ .../com/influxdb/v3/client/ITQueryWrite.java | 108 +++++ .../v3/client/InfluxDBClientTest.java | 47 ++ .../v3/client/InfluxDBClientWriteTest.java | 217 +++++++++ .../config/InfluxDBClientConfigsTest.java | 58 +++ .../internal/ArgumentsDurationTest.java | 85 ++++ .../v3/client/internal/ArgumentsTest.java | 164 +++++++ .../v3/client/internal/RestClientTest.java | 222 +++++++++ .../influxdb/v3/client/write/PointTest.java | 375 +++++++++++++++ .../v3/client/write/WriteParametersTest.java | 72 +++ 32 files changed, 4026 insertions(+), 85 deletions(-) create mode 100644 examples/.gitignore create mode 100644 examples/README.md create mode 100644 examples/pom.xml create mode 100644 examples/src/main/java/com/influxdb/v3/IOxExample.java create mode 100644 src/main/java/com/influxdb/v3/client/InfluxDBApiException.java create mode 100644 src/main/java/com/influxdb/v3/client/config/InfluxDBClientConfigs.java create mode 100644 src/main/java/com/influxdb/v3/client/internal/Arguments.java create mode 100644 src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java create mode 100644 src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java create mode 100644 src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java create mode 100644 src/main/java/com/influxdb/v3/client/internal/RestClient.java create mode 100644 src/main/java/com/influxdb/v3/client/query/QueryParameters.java rename src/{test/java/com/influxdb/v3/client/DummyTest.java => main/java/com/influxdb/v3/client/query/QueryType.java} (83%) create mode 100644 src/main/java/com/influxdb/v3/client/write/Point.java create mode 100644 src/main/java/com/influxdb/v3/client/write/WriteParameters.java create mode 100644 src/main/java/com/influxdb/v3/client/write/WritePrecision.java create mode 100644 src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java create mode 100644 src/test/java/com/influxdb/v3/client/ITQueryWrite.java create mode 100644 src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java create mode 100644 src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java create mode 100644 src/test/java/com/influxdb/v3/client/config/InfluxDBClientConfigsTest.java create mode 100644 src/test/java/com/influxdb/v3/client/internal/ArgumentsDurationTest.java create mode 100644 src/test/java/com/influxdb/v3/client/internal/ArgumentsTest.java create mode 100644 src/test/java/com/influxdb/v3/client/internal/RestClientTest.java create mode 100644 src/test/java/com/influxdb/v3/client/write/PointTest.java create mode 100644 src/test/java/com/influxdb/v3/client/write/WriteParametersTest.java diff --git a/.circleci/config.yml b/.circleci/config.yml index 149cfad..da3f050 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,51 +1,4 @@ version: 2.1 -commands: - client-test: - parameters: - project: - type: string - default: "Client.Test" - steps: - - checkout - - run: - name: Install Dependencies - command: | - dotnet restore - dotnet build --no-restore - - run: - name: Create a temp directory for artifacts - command: | - mkdir -p /tmp/artifacts - mkdir test-results - - run: - name: Run tests - command: dotnet test << parameters.project >> --collect "Xplat Code Coverage" --logger "junit;LogFilePath=../test-results/test-result.xml" - - run: - name: Coverage Report - command: | - dotnet tool install --tool-path="./reportgenerator/" dotnet-reportgenerator-globaltool - ./reportgenerator/reportgenerator -reports:"<< parameters.project >>/TestResults/*/coverage.cobertura.xml" -targetdir:"report" -reporttypes:HtmlSummary "-sourcedirs:Client/" - mv report/summary.html /tmp/artifacts - cp test-results/test-result.xml /tmp/artifacts - when: always - - run: - name: Report test results to codecov - command: | - apt-get update - apt-get install gpg --yes - curl -Os https://uploader.codecov.io/latest/linux/codecov - curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM - curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM.sig - curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import - gpgv codecov.SHA256SUM.sig codecov.SHA256SUM - shasum -a 256 -c codecov.SHA256SUM - chmod +x ./codecov - ./codecov - - store_artifacts: - path: /tmp/artifacts - - store_test_results: - path: test-results - jobs: tests-java: @@ -53,20 +6,11 @@ jobs: maven-image: type: string default: &default-maven-image "cimg/openjdk:8.0" + arg-line: + type: string + default: "" docker: - image: << parameters.maven-image >> - - image: voltrondata/flight-sql:latest - environment: - FLIGHT_PASSWORD: "flight_password" - PRINT_QUERIES: "1" - - image: influxdb:latest - environment: - DOCKER_INFLUXDB_INIT_MODE: "setup" - DOCKER_INFLUXDB_INIT_USERNAME: "my-user" - DOCKER_INFLUXDB_INIT_PASSWORD: "my-password" - DOCKER_INFLUXDB_INIT_ORG: "my-org" - DOCKER_INFLUXDB_INIT_BUCKET: "my-bucket" - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: "my-token" steps: - checkout - restore_cache: @@ -77,7 +21,7 @@ jobs: - run: name: "Running tests" command: | - mvn -B -U clean install -Dbuild.env=CI + mvn -B -U clean install -DargLine="@{argLine} << parameters.arg-line >>" - save_cache: name: Saving Maven Cache key: *cache-key @@ -125,7 +69,7 @@ jobs: - maven-cache_v1-<< parameters.maven-image >>- - run: name: "Check dependency rules" - command: mvn enforcer:enforce -Drules=banDuplicatePomDependencyVersions,dependencyConvergence + command: mvn enforcer:enforce -Denforcer.rules=banDuplicatePomDependencyVersions,dependencyConvergence check-licenses: parameters: @@ -150,14 +94,14 @@ workflows: jobs: - check-dependencies - check-licenses - - tests-java: - name: jdk-8 - tests-java: name: jdk-11 maven-image: "cimg/openjdk:11.0" - tests-java: name: jdk-17 maven-image: "cimg/openjdk:17.0" + arg-line: "--add-opens=java.base/java.nio=ALL-UNNAMED" - tests-java: - name: jdk-19 - maven-image: "cimg/openjdk:19.0" + name: jdk-20 + maven-image: "cimg/openjdk:20.0" + arg-line: "--add-opens=java.base/java.nio=ALL-UNNAMED" diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 95ed7ff..e3beea2 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -33,4 +33,5 @@ jobs: VALIDATE_MARKDOWN: true VALIDATE_BASH: true VALIDATE_JAVA: true + JAVA_FILE_NAME: 'checkstyle.xml' VALIDATE_GITHUB_ACTIONS: true diff --git a/README.md b/README.md index b429035..9773348 100644 --- a/README.md +++ b/README.md @@ -28,11 +28,15 @@ The Java client that provides an easy and convenient way to interact with Influx This package supports both writing data to InfluxDB and querying data using the FlightSQL client, which allows you to execute SQL queries against InfluxDB IOx. +> :warning: This client requires Java 11 and is compatible up to and including Java 20. + ## Installation +> :warning: Some JDK internals must be exposed by adding `--add-opens=java.base/java.nio=ALL-UNNAMED` to your JVM arguments. + Add the latest version of the client to your project: -##### Maven dependency: +### Maven dependency ```xml @@ -42,7 +46,7 @@ Add the latest version of the client to your project: ``` -##### Or when using Gradle: +### Or when using Gradle ```groovy dependencies { @@ -55,19 +59,77 @@ dependencies { To start with the client, import the `com.influxdb.v3.client` package and create a `InfluxDBClient` by: ```java -// TBD +package com.influxdb.v3; + +import java.time.Instant; +import java.util.stream.Stream; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.query.QueryParameters; +import com.influxdb.v3.client.write.Point; + +public class IOxExample { + public static void main(String[] args) throws Exception { + String hostUrl = "https://us-east-1-1.aws.cloud2.influxdata.com"; + char[] authToken = "my-token".toCharArray(); + String database = "database"; + + try (InfluxDBClient client = InfluxDBClient.getInstance(hostUrl, authToken, database)) { + // ... + } + } +} ``` to insert data, you can use code like this: ```java -// TBD +// +// Write by Point +// +Point point = Point.measurement("temperature") + .addTag("location", "west") + .addField("value", 55.15) + .setTimestamp(Instant.now().minusSeconds(-10)); +client.writePoint(point); + +// +// Write by LineProtocol +// +String record = "temperature,location=north value=60.0"; +client.writeRecord(record); ``` to query your data, you can use code like this: ```java -// TBD +// +// Query by SQL +// +System.out.printf("--------------------------------------------------------%n"); +System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time"); +System.out.printf("--------------------------------------------------------%n"); + +String sql = "select time,location,value from temperature order by time desc limit 10"; +try (Stream stream = client.query(sql)) { + stream.forEach(row -> System.out.printf("| %-8s | %-8s | %-30s |%n", row[1], row[2], row[0])); +} + +System.out.printf("--------------------------------------------------------%n%n"); + +// +// Query by InfluxQL +// +System.out.printf("-----------------------------------------%n"); +System.out.printf("| %-16s | %-18s |%n", "time", "mean"); +System.out.printf("-----------------------------------------%n"); + +String influxQL = "select MEAN(value) from temperature group by time(1d) fill(none) order by time desc limit 10"; +try (Stream stream = client.query(influxQL, QueryParameters.INFLUX_QL)) { + stream.forEach(row -> System.out.printf("| %-16s | %-18s |%n", row[1], row[2])); +} + +System.out.printf("-----------------------------------------%n"); ``` ## Feedback diff --git a/checkstyle.xml b/checkstyle.xml index 5652256..402c7d6 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -86,7 +86,7 @@ + value="STANDARD_JAVA_PACKAGE###THIRD_PARTY_PACKAGE###SPECIAL_IMPORTS###STATIC"/> @@ -143,7 +143,6 @@ - diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..832bd08 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,8 @@ +# Examples + +> :warning: The examples depends on the "influxdb3-java" module and this module should be built first by running "mvn install" in the root directory. +> :warning: Some JDK internals must be exposed by adding `--add-opens=java.base/java.nio=ALL-UNNAMED` to your JVM arguments. + +## Basic + +- [IOxExample](src/main/java/com/influxdb/v3/IOxExample.java) - How to use write and query data from InfluxDB IOx diff --git a/examples/pom.xml b/examples/pom.xml new file mode 100644 index 0000000..aa145a7 --- /dev/null +++ b/examples/pom.xml @@ -0,0 +1,81 @@ + + + 4.0.0 + + com.influxdb.v3 + examples + 1.0-SNAPSHOT + jar + + + UTF-8 + + + + + com.influxdb + influxdb3-java + 1.0.0-SNAPSHOT + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.10.1 + + 11 + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.2.1 + + true + ../checkstyle.xml + true + false + + src/main/java + + + + + verify + + checkstyle + + + + + + + + diff --git a/examples/src/main/java/com/influxdb/v3/IOxExample.java b/examples/src/main/java/com/influxdb/v3/IOxExample.java new file mode 100644 index 0000000..698f397 --- /dev/null +++ b/examples/src/main/java/com/influxdb/v3/IOxExample.java @@ -0,0 +1,91 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3; + +import java.time.Instant; +import java.util.stream.Stream; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.query.QueryParameters; +import com.influxdb.v3.client.write.Point; + +/** + * The example depends on the "influxdb3-java" module and this module should be built first + * by running "mvn install" in the root directory. + */ +public final class IOxExample { + private IOxExample() { + } + + public static void main(final String[] args) throws Exception { + String hostUrl = "https://us-east-1-1.aws.cloud2.influxdata.com"; + char[] authToken = "my-token".toCharArray(); + String database = "database"; + + try (InfluxDBClient client = InfluxDBClient.getInstance(hostUrl, authToken, database)) { + + // + // Write by Point + // + Point point = Point.measurement("temperature") + .addTag("location", "west") + .addField("value", 55.15) + .setTimestamp(Instant.now().minusSeconds(-10)); + client.writePoint(point); + + // + // Write by LineProtocol + // + String record = "temperature,location=north value=60.0"; + client.writeRecord(record); + + // + // Query by SQL + // + System.out.printf("--------------------------------------------------------%n"); + System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time"); + System.out.printf("--------------------------------------------------------%n"); + + String sql = "select time,location,value from temperature order by time desc limit 10"; + try (Stream stream = client.query(sql)) { + stream.forEach(row -> System.out.printf("| %-8s | %-8s | %-30s |%n", row[1], row[2], row[0])); + } + + System.out.printf("--------------------------------------------------------%n%n"); + + // + // Query by InfluxQL + // + System.out.printf("-----------------------------------------%n"); + System.out.printf("| %-16s | %-18s |%n", "time", "mean"); + System.out.printf("-----------------------------------------%n"); + + String influxQL = + "select MEAN(value) from temperature group by time(1d) fill(none) order by time desc limit 10"; + try (Stream stream = client.query(influxQL, QueryParameters.INFLUX_QL)) { + stream.forEach(row -> System.out.printf("| %-16s | %-18s |%n", row[1], row[2])); + } + + System.out.printf("-----------------------------------------%n"); + } + } +} diff --git a/pom.xml b/pom.xml index 00b9bba..13aad8a 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,126 @@ + + org.apache.arrow + flight-grpc + 12.0.0 + + + org.slf4j + slf4j-api + + + io.netty + netty-tcnative-boringssl-static + + + io.netty + netty-codec-http2 + + + io.netty + netty-transport + + + io.netty + netty-common + + + io.netty + netty-codec-http + + + io.netty + netty-codec + + + io.netty + netty-handler + + + io.netty + netty-buffer + + + com.google.protobuf + protobuf-java + + + com.google.guava + guava + + + com.google.errorprone + error_prone_annotations + + + + + + io.netty + netty-handler + 4.1.92.Final + + + + io.netty + netty-buffer + 4.1.92.Final + + + + io.netty + netty-codec + 4.1.92.Final + + + io.netty + netty-buffer + + + + + + io.netty + netty-codec-http2 + 4.1.92.Final + + + io.netty + netty-codec + + + io.netty + netty-handler + + + + + + io.netty + netty-tcnative-boringssl-static + 2.0.61.Final + + + + com.google.protobuf + protobuf-java + 3.21.7 + + + + com.google.guava + guava + 30.1.1-jre + + + + org.slf4j + slf4j-api + 2.0.7 + + org.junit.jupiter junit-jupiter-engine @@ -111,6 +231,20 @@ test + + com.squareup.okhttp3 + mockwebserver + 4.11.0 + test + + + + org.slf4j + slf4j-simple + 2.0.7 + test + + @@ -149,8 +283,7 @@ maven-compiler-plugin 3.10.1 - 1.8 - 1.8 + 11 @@ -179,6 +312,7 @@ false src/main/java + src/test/java @@ -231,11 +365,21 @@ APPROVE The MIT License + + LICENSE_NAME + APPROVE + MIT License + LICENSE_NAME APPROVE Apache-2.0 + + LICENSE_NAME + APPROVE + BSD-3-Clause + LICENSE_URL APPROVE diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBApiException.java b/src/main/java/com/influxdb/v3/client/InfluxDBApiException.java new file mode 100644 index 0000000..70a4111 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/InfluxDBApiException.java @@ -0,0 +1,47 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import javax.annotation.Nullable; + +/** + * The InfluxDBApiException is thrown when an error occurs while interacting with InfluxDB. + */ +public class InfluxDBApiException extends RuntimeException { + /** + * Construct a new InfluxDBApiException with the specified detail message. + * + * @param message the detail message. + */ + public InfluxDBApiException(@Nullable final String message) { + super(message); + } + + /** + * Construct a new InfluxDBApiException with the specified cause. + * + * @param cause the cause. + */ + public InfluxDBApiException(@Nullable final Throwable cause) { + super(cause); + } +} diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index 1a71009..1a19493 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -21,12 +21,189 @@ */ package com.influxdb.v3.client; -import java.io.Closeable; +import java.util.List; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.arrow.vector.VectorSchemaRoot; + +import com.influxdb.v3.client.config.InfluxDBClientConfigs; +import com.influxdb.v3.client.internal.InfluxDBClientImpl; +import com.influxdb.v3.client.query.QueryParameters; +import com.influxdb.v3.client.write.Point; +import com.influxdb.v3.client.write.WriteParameters; /** * The InfluxDBClient interface provides a client for interact with InfluxDB 3. * This client supports both writing data to InfluxDB and querying data using the FlightSQL client, * which allows you to execute SQL queries against InfluxDB IOx. */ -public interface InfluxDBClient extends Closeable { +public interface InfluxDBClient extends AutoCloseable { + + /** + * Write a record specified in the InfluxDB Line Protocol to the InfluxDB server. + * + * @param record the record specified in the InfluxDB Line Protocol, can be null + */ + void writeRecord(@Nullable final String record); + + /** + * Write a record specified in the InfluxDB Line Protocol to the InfluxDB server. + * + * @param record the record specified in the InfluxDB Line Protocol, can be null + * @param parameters the parameters for writing data to InfluxDB + */ + void writeRecord(@Nullable final String record, @Nonnull final WriteParameters parameters); + + /** + * Write records specified in the InfluxDB Line Protocol to the InfluxDB server. + * + * @param records the records specified in the InfluxDB Line Protocol, cannot be null + */ + void writeRecords(@Nonnull final List records); + + /** + * Write records specified in the InfluxDB Line Protocol to the InfluxDB server. + * + * @param records the records specified in the InfluxDB Line Protocol, cannot be null + * @param parameters the parameters for writing data to InfluxDB + */ + void writeRecords(@Nonnull final List records, @Nonnull final WriteParameters parameters); + + /** + * Write a {@link Point} to the InfluxDB server. + * + * @param point the {@link Point} to write, can be null + */ + void writePoint(@Nullable final Point point); + + /** + * Write a {@link Point} to the InfluxDB server. + * + * @param point the {@link Point} to write, can be null + * @param parameters the parameters for writing data to InfluxDB + */ + void writePoint(@Nullable final Point point, @Nonnull final WriteParameters parameters); + + /** + * Write a list of {@link Point} to the InfluxDB server. + * + * @param points the list of {@link Point} to write, cannot be null + */ + void writePoints(@Nonnull final List points); + + /** + * Write a list of {@link Point} to the InfluxDB server. + * + * @param points the list of {@link Point} to write, cannot be null + * @param parameters the parameters for writing data to InfluxDB + */ + void writePoints(@Nonnull final List points, @Nonnull final WriteParameters parameters); + + /** + * Query data from InfluxDB IOx using FlightSQL. + *

+ * The result stream should be closed after use, you can use try-resource pattern to close it automatically: + *

+     * try (Stream<Object[]> rows = client.query("select * from cpu")) {
+     *      rows.forEach(row -> {
+     *          // process row
+     *      }
+     * });
+     * 
+ * + * @param query the SQL query string to execute, cannot be null + * @return Batches of rows returned by the query + */ + @Nonnull + Stream query(@Nonnull final String query); + + /** + * Query data from InfluxDB IOx using FlightSQL. + *

+ * The result stream should be closed after use, you can use try-resource pattern to close it automatically: + *

+     * try (Stream<Object[]> rows = client.query("select * from cpu", parameters)) {
+     *      rows.forEach(row -> {
+     *          // process row
+     *      }
+     * });
+     * 
+ * + * @param query the SQL query string to execute, cannot be null + * @param parameters the parameters for querying data from InfluxDB + * @return Batches of rows returned by the query + */ + @Nonnull + Stream query(@Nonnull final String query, @Nonnull final QueryParameters parameters); + + /** + * Query data from InfluxDB IOx using FlightSQL. + *

+ * The result stream should be closed after use, you can use try-resource pattern to close it automatically: + *

+     * try (Stream<VectorSchemaRoot> batches = client.queryBatches("select * from cpu")) {
+     *      batches.forEach(batch -> {
+     *          // process batch
+     *      }
+     * });
+     * 
+ * + * @param query the SQL query string to execute, cannot be null + * @return Batches of rows returned by the query + */ + @Nonnull + Stream queryBatches(@Nonnull final String query); + + /** + * Query data from InfluxDB IOx using FlightSQL. + *
+     * try (Stream<VectorSchemaRoot> batches = client.queryBatches("select * from cpu", parameters)) {
+     *      batches.forEach(batch -> {
+     *          // process batch
+     *      }
+     * });
+     * 
+ * + * @param query the SQL query string to execute, cannot be null + * @param parameters the parameters for querying data from InfluxDB + * @return Batches of rows returned by the query + */ + @Nonnull + Stream queryBatches(@Nonnull final String query, @Nonnull final QueryParameters parameters); + + /** + * Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying + * common operations such as writing, querying. + * + * @param hostUrl the hostname or IP address of the InfluxDB server + * @param authToken the authentication token for accessing the InfluxDB server, can be null + * @param database the database to be used for InfluxDB operations, can be null + * @return new instance of the {@link InfluxDBClient} + */ + @Nonnull + static InfluxDBClient getInstance(@Nonnull final String hostUrl, + @Nullable final char[] authToken, + @Nullable final String database) { + InfluxDBClientConfigs configs = new InfluxDBClientConfigs.Builder() + .hostUrl(hostUrl) + .authToken(authToken) + .database(database) + .build(); + + return getInstance(configs); + } + + /** + * Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying + * common operations such as writing, querying. + * + * @param configs the configuration for the InfluxDB client + * @return new instance of the {@link InfluxDBClient} + */ + @Nonnull + static InfluxDBClient getInstance(@Nonnull final InfluxDBClientConfigs configs) { + return new InfluxDBClientImpl(configs); + } } diff --git a/src/main/java/com/influxdb/v3/client/config/InfluxDBClientConfigs.java b/src/main/java/com/influxdb/v3/client/config/InfluxDBClientConfigs.java new file mode 100644 index 0000000..cff2546 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/config/InfluxDBClientConfigs.java @@ -0,0 +1,319 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.config; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Objects; +import java.util.StringJoiner; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import com.influxdb.v3.client.write.WritePrecision; + +/** + * Configuration properties for an {@code InfluxDBClient}. + *

+ * Immutable class. + */ +public final class InfluxDBClientConfigs { + + private final String hostUrl; + private final char[] authToken; + private final String organization; + private final String database; + private final WritePrecision writePrecision; + private final Duration responseTimeout; + private final Boolean allowHttpRedirects; + private final Boolean disableServerCertificateValidation; + + /** + * Gets hostname or IP address of the InfluxDB server. + * + * @return hostname or IP address of the InfluxDB server + */ + @Nonnull + public String getHostUrl() { + return hostUrl; + } + + /** + * Gets authentication token for accessing the InfluxDB server. + * + * @return authentication token for accessing the InfluxDB server, may be null + */ + @Nullable + public char[] getAuthToken() { + return authToken; + } + + /** + * Gets organization to be used for operations. + * + * @return organization to be used for operations, may be null + */ + @Nullable + public String getOrganization() { + return organization; + } + + /** + * Gets database to be used for InfluxDB operations. + * + * @return database to be used for InfluxDB operations, may be null + */ + @Nullable + public String getDatabase() { + return database; + } + + /** + * Gets the default precision to use for the timestamp of points. + * + * @return the default precision to use for the timestamp of points, may be null + */ + @Nullable + public WritePrecision getWritePrecision() { + return writePrecision; + } + + /** + * Gets the default response timeout to use for the API calls. Default to '10 seconds'. + * + * @return the default response timeout to use for the API calls + */ + @Nonnull + public Duration getResponseTimeout() { + return responseTimeout; + } + + /** + * Gets the automatically following HTTP 3xx redirects. Default to 'false'. + * + * @return the automatically following HTTP 3xx redirects + */ + @Nonnull + public Boolean getAllowHttpRedirects() { + return allowHttpRedirects; + } + + /** + * Gets the disable server SSL certificate validation. Default to 'false'. + * + * @return the disable server SSL certificate validation + */ + public Boolean getDisableServerCertificateValidation() { + return disableServerCertificateValidation; + } + + /** + * Validates the configuration properties. + */ + public void validate() { + if (hostUrl == null || hostUrl.isEmpty()) { + throw new IllegalArgumentException("The hostname or IP address of the InfluxDB server has to be defined."); + } + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InfluxDBClientConfigs that = (InfluxDBClientConfigs) o; + return Objects.equals(hostUrl, that.hostUrl) + && Arrays.equals(authToken, that.authToken) + && Objects.equals(organization, that.organization) + && Objects.equals(database, that.database) + && writePrecision == that.writePrecision + && Objects.equals(responseTimeout, that.responseTimeout) + && Objects.equals(allowHttpRedirects, that.allowHttpRedirects) + && Objects.equals(disableServerCertificateValidation, that.disableServerCertificateValidation); + } + + @Override + public int hashCode() { + return Objects.hash(hostUrl, Arrays.hashCode(authToken), organization, database, writePrecision, + responseTimeout, allowHttpRedirects, disableServerCertificateValidation); + } + + @Override + public String toString() { + return new StringJoiner(", ", InfluxDBClientConfigs.class.getSimpleName() + "InfluxDBClientConfigs[", "]") + .add("hostUrl='" + hostUrl + "'") + .add("organization='" + organization + "'") + .add("database='" + database + "'") + .add("writePrecision=" + writePrecision) + .add("responseTimeout=" + responseTimeout) + .add("allowHttpRedirects=" + allowHttpRedirects) + .add("disableServerCertificateValidation=" + disableServerCertificateValidation) + .toString(); + } + + /** + * A builder for {@code InfluxDBClientConfigs}. + *

+ * Mutable. + */ + public static final class Builder { + private String hostUrl; + private char[] authToken; + private String organization; + private String database; + private WritePrecision writePrecision; + private Duration responseTimeout; + private Boolean allowHttpRedirects; + private Boolean disableServerCertificateValidation; + + /** + * Sets the hostname or IP address of the InfluxDB server. + * + * @param hostUrl hostname or IP address of the InfluxDB server + * @return this + */ + @Nonnull + public Builder hostUrl(@Nonnull final String hostUrl) { + + this.hostUrl = hostUrl; + return this; + } + + /** + * Sets the authentication token for accessing the InfluxDB server. + * + * @param authToken authentication token for accessing the InfluxDB server + * @return this + */ + @Nonnull + public Builder authToken(@Nullable final char[] authToken) { + + this.authToken = authToken; + return this; + } + + /** + * Sets organization to be used for operations. + * + * @param organization organization to be used for operations + * @return this + */ + @Nonnull + public Builder organization(@Nullable final String organization) { + + this.organization = organization; + return this; + } + + /** + * Sets database to be used for InfluxDB operations. + * + * @param database database to be used for InfluxDB operations + * @return this + */ + @Nonnull + public Builder database(@Nullable final String database) { + + this.database = database; + return this; + } + + /** + * Sets the default precision to use for the timestamp of points + * if no precision is specified in the write API call. + * + * @param writePrecision default precision to use for the timestamp of points + * if no precision is specified in the write API call + * @return this + */ + @Nonnull + public Builder writePrecision(@Nullable final WritePrecision writePrecision) { + + this.writePrecision = writePrecision; + return this; + } + + /** + * Sets the default response timeout to use for the API calls. Default to '10 seconds'. + * + * @param responseTimeout default response timeout to use for the API calls. Default to '10 seconds'. + * @return this + */ + @Nonnull + public Builder responseTimeout(@Nullable final Duration responseTimeout) { + + this.responseTimeout = responseTimeout; + return this; + } + + /** + * Sets the automatically following HTTP 3xx redirects. Default to 'false'. + * + * @param allowHttpRedirects automatically following HTTP 3xx redirects. Default to 'false'. + * @return this + */ + @Nonnull + public Builder allowHttpRedirects(@Nullable final Boolean allowHttpRedirects) { + + this.allowHttpRedirects = allowHttpRedirects; + return this; + } + + /** + * Sets the disable server SSL certificate validation. Default to 'false'. + * + * @param disableServerCertificateValidation Disable server SSL certificate validation. Default to 'false'. + * @return this + */ + @Nonnull + public Builder disableServerCertificateValidation(@Nullable final Boolean disableServerCertificateValidation) { + + this.disableServerCertificateValidation = disableServerCertificateValidation; + return this; + } + + /** + * Build an instance of {@code InfluxDBClientConfigs}. + * + * @return the configuration for an {@code InfluxDBClient}. + */ + @Nonnull + public InfluxDBClientConfigs build() { + return new InfluxDBClientConfigs(this); + } + } + + @SuppressWarnings("MagicNumber") + private InfluxDBClientConfigs(@Nonnull final Builder builder) { + hostUrl = builder.hostUrl; + authToken = builder.authToken; + organization = builder.organization; + database = builder.database; + writePrecision = builder.writePrecision; + responseTimeout = builder.responseTimeout != null ? builder.responseTimeout : Duration.ofSeconds(10); + allowHttpRedirects = builder.allowHttpRedirects != null ? builder.allowHttpRedirects : false; + disableServerCertificateValidation = builder.disableServerCertificateValidation != null + ? builder.disableServerCertificateValidation : false; + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/Arguments.java b/src/main/java/com/influxdb/v3/client/internal/Arguments.java new file mode 100644 index 0000000..637b465 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/Arguments.java @@ -0,0 +1,165 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.time.temporal.ChronoUnit; +import java.util.EnumSet; +import java.util.Objects; +import java.util.regex.Pattern; +import javax.annotation.Nullable; + +/** + * Functions for parameter validation. + *

+ * Thanks + */ +public final class Arguments { + + private static final Pattern DURATION_PATTERN = Pattern.compile("([-+]?)([0-9]+(\\.[0-9]*)?[a-z]+)+|inf|-inf", + Pattern.CASE_INSENSITIVE); + + private static final String DURATION_MESSAGE = "Expecting a duration string for %s. But got: %s"; + + /** + * The precisions that are allowed to use in the writes. + */ + private static final EnumSet ALLOWED_PRECISION = EnumSet.of(ChronoUnit.NANOS, + ChronoUnit.MICROS, ChronoUnit.MILLIS, ChronoUnit.SECONDS); + + private Arguments() { + } + + /** + * Enforces that the string is {@linkplain String#isEmpty() not empty}. + * + * @param string the string to test + * @param name variable name for reporting + * @return {@code string} + * @throws IllegalArgumentException if the string is empty + */ + public static String checkNonEmpty(final String string, final String name) throws IllegalArgumentException { + if (string == null || string.isEmpty()) { + throw new IllegalArgumentException("Expecting a non-empty string for " + name); + } + return string; + } + + /** + * Enforces that the string has exactly one char. + * + * @param string the string to test + * @param name variable name for reporting + * @return {@code string} + * @throws IllegalArgumentException if the string has not one char + */ + public static String checkOneCharString(final String string, final String name) throws IllegalArgumentException { + if (string == null || string.length() != 1) { + throw new IllegalArgumentException("Expecting a one char string for " + name); + } + return string; + } + + /** + * Enforces that the string is duration literal. + * + * @param string the string to test + * @param name variable name for reporting + * @return {@code string} + * @throws IllegalArgumentException if the string is not duration literal + */ + public static String checkDuration(final String string, final String name) throws IllegalArgumentException { + if (string == null || string.isEmpty() || !DURATION_PATTERN.matcher(string).matches()) { + throw new IllegalArgumentException(String.format(DURATION_MESSAGE, name, string)); + } + + return string; + } + + /** + * Enforces that the string is duration literal. Empty or null strings are valid. + * + * @param string the string to test + * @param name variable name for reporting + * @return {@code string} + * @throws IllegalArgumentException if the string is not duration literal + */ + public static String checkDurationNotRequired(final String string, final String name) + throws IllegalArgumentException { + if (string != null && !string.isEmpty() && !DURATION_PATTERN.matcher(string).matches()) { + throw new IllegalArgumentException(String.format(DURATION_MESSAGE, name, string)); + } + + return string; + } + + /** + * Enforces that the number is larger than 0. + * + * @param number the number to test + * @param name variable name for reporting + * @throws IllegalArgumentException if the number is less or equal to 0 + */ + public static void checkPositiveNumber(final Number number, final String name) throws IllegalArgumentException { + if (number == null || number.doubleValue() <= 0) { + throw new IllegalArgumentException("Expecting a positive number for " + name); + } + } + + /** + * Enforces that the number is not negative. + * + * @param number the number to test + * @param name variable name for reporting + * @throws IllegalArgumentException if the number is less or equal to 0 + */ + public static void checkNotNegativeNumber(final Number number, final String name) throws IllegalArgumentException { + if (number == null || number.doubleValue() < 0) { + throw new IllegalArgumentException("Expecting a positive or zero number for " + name); + } + } + + /** + * Checks that the specified object reference is not {@code null}. + * + * @param obj the object reference to check for nullity + * @param name variable name for reporting + * @throws NullPointerException if the object is {@code null} + * @see Objects#requireNonNull(Object, String) + */ + public static void checkNotNull(final Object obj, final String name) throws NullPointerException { + + Objects.requireNonNull(obj, () -> "Expecting a not null reference for " + name); + } + + /** + * Checks that the precision reference to one of {@link Arguments#ALLOWED_PRECISION}. + * + * @param precision the precision to check + * @throws IllegalArgumentException if the object is not one of {@link Arguments#ALLOWED_PRECISION} + */ + public static void checkPrecision(@Nullable final ChronoUnit precision) throws IllegalArgumentException { + + if (!ALLOWED_PRECISION.contains(precision)) { + throw new IllegalArgumentException("Precision must be one of: " + ALLOWED_PRECISION); + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java new file mode 100644 index 0000000..8360df7 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -0,0 +1,170 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import javax.annotation.Nonnull; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.Metadata; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.HeaderCallOption; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.grpc.MetadataAdapter; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.VectorSchemaRoot; + +import com.influxdb.v3.client.config.InfluxDBClientConfigs; +import com.influxdb.v3.client.query.QueryType; + +final class FlightSqlClient implements AutoCloseable { + + private final HeaderCallOption headers; + private final FlightClient client; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + FlightSqlClient(@Nonnull final InfluxDBClientConfigs configs) { + Arguments.checkNotNull(configs, "configs"); + + MetadataAdapter metadata = new MetadataAdapter(new Metadata()); + if (configs.getAuthToken() != null && configs.getAuthToken().length > 0) { + metadata.insert("Authorization", "Bearer " + new String(configs.getAuthToken())); + } + + this.headers = new HeaderCallOption(metadata); + + Location location; + try { + URI uri = new URI(configs.getHostUrl()); + if ("https".equals(uri.getScheme())) { + location = Location.forGrpcTls(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 443); + } else { + location = Location.forGrpcInsecure(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 80); + } + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + client = FlightClient.builder() + .location(location) + .allocator(new RootAllocator(Long.MAX_VALUE)) + .verifyServer(!configs.getDisableServerCertificateValidation()) + .build(); + } + + @Nonnull + Stream execute(@Nonnull final String query, + @Nonnull final String database, + @Nonnull final QueryType queryType) { + + HashMap ticketData = new HashMap() {{ + put("database", database); + put("sql_query", query); + put("query_type", queryType.name().toLowerCase()); + }}; + + String json; + try { + json = objectMapper.writeValueAsString(ticketData); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); + FlightStream stream = client.getStream(ticket, headers); + FlightSqlIterator iterator = new FlightSqlIterator(stream); + + Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); + return StreamSupport.stream(spliterator, false).onClose(iterator::close); + } + + @Override + public void close() throws Exception { + client.close(); + } + + private static final class FlightSqlIterator implements Iterator, AutoCloseable { + + private final List autoCloseable = new ArrayList<>(); + + private final FlightStream flightStream; + private VectorSchemaRoot currentVectorSchemaRoot = null; + + private FlightSqlIterator(@Nonnull final FlightStream flightStream) { + this.flightStream = flightStream; + loadNext(); + } + + @Override + public boolean hasNext() { + return currentVectorSchemaRoot != null; + } + + @Override + public VectorSchemaRoot next() { + if (currentVectorSchemaRoot == null) { + throw new NoSuchElementException(); + } + + VectorSchemaRoot oldVectorSchemaRoot = currentVectorSchemaRoot; + loadNext(); + + return oldVectorSchemaRoot; + } + + @Override + public void close() { + try { + AutoCloseables.close(autoCloseable); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void loadNext() { + + if (flightStream != null && flightStream.next()) { + currentVectorSchemaRoot = flightStream.getRoot(); + autoCloseable.add(currentVectorSchemaRoot); + + } else { + currentVectorSchemaRoot = null; + } + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java new file mode 100644 index 0000000..dec0b9c --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -0,0 +1,236 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.config.InfluxDBClientConfigs; +import com.influxdb.v3.client.query.QueryParameters; +import com.influxdb.v3.client.write.Point; +import com.influxdb.v3.client.write.WriteParameters; +import com.influxdb.v3.client.write.WritePrecision; + +/** + * Implementation of the InfluxDBClient. It is thread-safe and can be safely shared between threads. + *

+ * Please use {@link InfluxDBClient} to create an instance. + */ +public final class InfluxDBClientImpl implements InfluxDBClient { + + private static final Logger LOG = Logger.getLogger(InfluxDBClientImpl.class.getName()); + + private static final String DATABASE_REQUIRED_MESSAGE = "Please specify the 'Database' as a method parameter " + + "or use default configuration at 'InfluxDBClientConfigs.database'."; + + private boolean closed = false; + private final InfluxDBClientConfigs configs; + + private final RestClient restClient; + private final FlightSqlClient flightSqlClient; + + /** + * Creates an instance using the specified configs. + *

+ * Please use {@link InfluxDBClient} to create an instance. + * + * @param configs the client configs. + */ + public InfluxDBClientImpl(@Nonnull final InfluxDBClientConfigs configs) { + Arguments.checkNotNull(configs, "configs"); + + configs.validate(); + + this.configs = configs; + this.restClient = new RestClient(configs); + this.flightSqlClient = new FlightSqlClient(configs); + } + + @Override + public void writeRecord(@Nullable final String record) { + writeRecord(record, WriteParameters.DEFAULTS); + } + + @Override + public void writeRecord(@Nullable final String record, @Nonnull final WriteParameters parameters) { + if (record == null) { + return; + } + + writeRecords(Collections.singletonList(record), parameters); + } + + @Override + public void writeRecords(@Nonnull final List records) { + writeRecords(records, WriteParameters.DEFAULTS); + } + + @Override + public void writeRecords(@Nonnull final List records, @Nonnull final WriteParameters parameters) { + writeData(records, parameters); + } + + @Override + public void writePoint(@Nullable final Point point) { + writePoint(point, WriteParameters.DEFAULTS); + } + + @Override + public void writePoint(@Nullable final Point point, @Nonnull final WriteParameters parameters) { + if (point == null) { + return; + } + + writePoints(Collections.singletonList(point), parameters); + } + + @Override + public void writePoints(@Nonnull final List points) { + writePoints(points, WriteParameters.DEFAULTS); + } + + @Override + public void writePoints(@Nonnull final List points, @Nonnull final WriteParameters parameters) { + writeData(points, parameters); + } + + @Nonnull + @Override + public Stream query(@Nonnull final String query) { + return query(query, QueryParameters.DEFAULTS); + } + + @Nonnull + @Override + public Stream query(@Nonnull final String query, @Nonnull final QueryParameters parameters) { + return queryData(query, parameters) + .flatMap(vector -> { + List fieldVectors = vector.getFieldVectors(); + return IntStream + .range(0, vector.getRowCount()) + .mapToObj(rowNumber -> { + + ArrayList row = new ArrayList<>(); + for (FieldVector fieldVector : fieldVectors) { + row.add(fieldVector.getObject(rowNumber)); + } + return row.toArray(); + }); + }); + } + + @Nonnull + @Override + public Stream queryBatches(@Nonnull final String query) { + return queryBatches(query, QueryParameters.DEFAULTS); + } + + @Nonnull + @Override + public Stream queryBatches(@Nonnull final String query, + @Nonnull final QueryParameters parameters) { + return queryData(query, parameters); + } + + @Override + public void close() throws Exception { + restClient.close(); + flightSqlClient.close(); + closed = true; + } + + private void writeData(@Nonnull final List data, @Nonnull final WriteParameters parameters) { + + Arguments.checkNotNull(data, "data"); + Arguments.checkNotNull(parameters, "parameters"); + + if (closed) { + throw new IllegalStateException("InfluxDBClient has been closed."); + } + + String database = parameters.databaseSafe(configs); + if (database == null || database.isEmpty()) { + throw new IllegalStateException("Please specify the 'Database' as a method parameter " + + "or use default configuration at 'InfluxDBClientConfigs.database'."); + } + + WritePrecision precision = parameters.precisionSafe(configs); + Map queryParams = new HashMap<>() {{ + put("bucket", database); + put("org", parameters.organizationSafe(configs)); + put("precision", precision.name().toLowerCase()); + }}; + + String lineProtocol = data.stream().map(item -> { + if (item == null) { + return null; + } else if (item instanceof Point) { + return ((Point) item).toLineProtocol(); + } else { + return item.toString(); + } + }) + .filter(it -> it != null && !it.isEmpty()) + .collect(Collectors.joining("\n")); + + if (lineProtocol.isEmpty()) { + + LOG.warning("No data to write, please check your input data."); + return; + } + + restClient.request("api/v2/write", HttpMethod.POST, lineProtocol, "text/plain; charset=utf-8", queryParams); + } + + @Nonnull + private Stream queryData(@Nonnull final String query, + @Nonnull final QueryParameters parameters) { + + Arguments.checkNonEmpty(query, "query"); + Arguments.checkNotNull(parameters, "parameters"); + + if (closed) { + throw new IllegalStateException("InfluxDBClient has been closed."); + } + + String database = parameters.databaseSafe(configs); + if (database == null || database.isEmpty()) { + throw new IllegalStateException(DATABASE_REQUIRED_MESSAGE); + } + + return flightSqlClient.execute(query, database, parameters.queryTypeSafe()); + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java new file mode 100644 index 0000000..f47c922 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -0,0 +1,114 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import javax.annotation.Nullable; + +import com.influxdb.v3.client.write.WritePrecision; + +import static java.util.function.Function.identity; + +/** + * Nanosecond converter. + *

+ * Utility class converting epoch nanoseconds to epoch with a given precision. + */ +public final class NanosecondConverter { + + private static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1000_000_000L); + private static final BigInteger MICRO_PER_NANOS = BigInteger.valueOf(1000L); + private static final BigInteger MILLIS_PER_NANOS = BigInteger.valueOf(1000_000L); + private static final BigInteger SECONDS_PER_NANOS = BigInteger.valueOf(1000_000_000L); + + private NanosecondConverter() { + } + + /** + * Timestamp calculation functions to add timestamp to records. + */ + private static final Map> FROM_NANOS = new HashMap<>(); + + static { + FROM_NANOS.put(WritePrecision.S, (timestamp) -> timestamp.divide(SECONDS_PER_NANOS)); + FROM_NANOS.put(WritePrecision.MS, (timestamp) -> timestamp.divide(MILLIS_PER_NANOS)); + FROM_NANOS.put(WritePrecision.US, (timestamp) -> timestamp.divide(MICRO_PER_NANOS)); + FROM_NANOS.put(WritePrecision.NS, identity()); + } + + /** + * Timestamp calculation functions from specified precision to nanos. + */ + private static final Map> TO_NANOS = new HashMap<>(); + + static { + TO_NANOS.put(WritePrecision.S, (timestamp) -> timestamp.multiply(SECONDS_PER_NANOS)); + TO_NANOS.put(WritePrecision.MS, (timestamp) -> timestamp.multiply(MILLIS_PER_NANOS)); + TO_NANOS.put(WritePrecision.US, (timestamp) -> timestamp.multiply(MICRO_PER_NANOS)); + TO_NANOS.put(WritePrecision.NS, identity()); + } + + /** + * Convert timestamp in a given precision to nanoseconds. + * + * @param timestamp epoch timestamp + * @param precision precision + * @return epoch timestamp in precision, can be null + */ + @Nullable + public static BigInteger convertToNanos(@Nullable final Number timestamp, final WritePrecision precision) { + if (timestamp == null) { + return null; + } + + BigInteger t; + if (timestamp instanceof BigDecimal) { + t = ((BigDecimal) timestamp).toBigInteger(); + } else if (timestamp instanceof BigInteger) { + t = (BigInteger) timestamp; + } else { + t = BigInteger.valueOf(timestamp.longValue()); + } + + return TO_NANOS.get(precision).apply(t); + } + + /** + * Convert {@link Instant} timestamp to a given precision. + * + * @param instant Instant timestamp + * @param precision precision + * @return epoch timestamp in precision + */ + public static BigInteger convert(final Instant instant, final WritePrecision precision) { + BigInteger nanos = BigInteger.valueOf(instant.getEpochSecond()) + .multiply(NANOS_PER_SECOND) + .add(BigInteger.valueOf(instant.getNano())); + + return FROM_NANOS.get(precision).apply(nanos); + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java new file mode 100644 index 0000000..8d4be4c --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -0,0 +1,188 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.security.SecureRandom; +import java.util.Map; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.QueryStringEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.influxdb.v3.client.InfluxDBApiException; +import com.influxdb.v3.client.config.InfluxDBClientConfigs; + +final class RestClient implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(RestClient.class); + + private static final TrustManager[] TRUST_ALL_CERTS = new TrustManager[]{ + new X509TrustManager() { + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return null; + } + + public void checkClientTrusted( + final java.security.cert.X509Certificate[] certs, final String authType) { + } + + public void checkServerTrusted( + final java.security.cert.X509Certificate[] certs, final String authType) { + } + } + }; + + final String baseUrl; + final String userAgent; + final HttpClient client; + + private final InfluxDBClientConfigs configs; + private final ObjectMapper objectMapper = new ObjectMapper(); + + RestClient(@Nonnull final InfluxDBClientConfigs configs) { + Arguments.checkNotNull(configs, "configs"); + + this.configs = configs; + + // user agent version + Package mainPackage = RestClient.class.getPackage(); + String version = mainPackage != null ? mainPackage.getImplementationVersion() : "unknown"; + this.userAgent = String.format("influxdb3-java/%s", version != null ? version : "unknown"); + + this.baseUrl = configs.getHostUrl().endsWith("/") + ? configs.getHostUrl() : String.format("%s/", configs.getHostUrl()); + + // timeout and redirects + HttpClient.Builder builder = HttpClient.newBuilder() + .connectTimeout(configs.getResponseTimeout()) + .followRedirects(configs.getAllowHttpRedirects() + ? HttpClient.Redirect.NORMAL : HttpClient.Redirect.NEVER); + + if (baseUrl.startsWith("https")) { + try { + if (configs.getDisableServerCertificateValidation()) { + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, TRUST_ALL_CERTS, new SecureRandom()); + builder.sslContext(sslContext); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + this.client = builder.build(); + } + + void request(@Nonnull final String path, + @Nonnull final HttpMethod method, + @Nullable final String data, + @Nullable final String contentType, + @Nullable final Map queryParams) { + + QueryStringEncoder uriEncoder = new QueryStringEncoder(String.format("%s%s", baseUrl, path)); + if (queryParams != null) { + queryParams.forEach((name, value) -> { + if (value != null && !value.isEmpty()) { + uriEncoder.addParam(name, value); + } + }); + } + + HttpRequest.Builder request = HttpRequest.newBuilder(); + + // uri + try { + request.uri(uriEncoder.toUri()); + } catch (URISyntaxException e) { + throw new InfluxDBApiException(e); + } + + // method and body + request.method(method.name(), data == null + ? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofString(data)); + + // headers + if (contentType != null) { + request.header("Content-Type", contentType); + } + request.header("User-Agent", userAgent); + if (configs.getAuthToken() != null && configs.getAuthToken().length > 0) { + request.header("Authorization", String.format("Token %s", new String(configs.getAuthToken()))); + } + + HttpResponse response; + try { + response = client.send(request.build(), HttpResponse.BodyHandlers.ofString()); + } catch (Exception e) { + throw new InfluxDBApiException(e); + } + + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + String reason = ""; + String body = response.body(); + if (!body.isEmpty()) { + try { + reason = objectMapper.readTree(body).get("message").asText(); + } catch (JsonProcessingException e) { + LOG.debug("Can't parse msg from response {}", response); + } + } + + if (reason.isEmpty()) { + reason = Stream.of("X-Platform-Error-Code", "X-Influx-Error", "X-InfluxDb-Error") + .map(name -> response.headers().firstValue(name).orElse(null)) + .filter(message -> message != null && !message.isEmpty()).findFirst() + .orElse(""); + } + + if (reason.isEmpty()) { + reason = body; + } + + if (reason.isEmpty()) { + reason = HttpResponseStatus.valueOf(statusCode).reasonPhrase(); + } + + String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason); + throw new InfluxDBApiException(message); + } + } + + @Override + public void close() { + } +} diff --git a/src/main/java/com/influxdb/v3/client/query/QueryParameters.java b/src/main/java/com/influxdb/v3/client/query/QueryParameters.java new file mode 100644 index 0000000..0a54d00 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/query/QueryParameters.java @@ -0,0 +1,99 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.query; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import com.influxdb.v3.client.config.InfluxDBClientConfigs; +import com.influxdb.v3.client.internal.Arguments; + +/** + * Query API parameters. + *

+ * Supports to specify: + *

    + *
  • database - specifies the database to be used for InfluxDB operations
  • + *
  • queryType - specifies the type of query sent to InfluxDB. Default to 'SQL'.
  • + *
+ */ +@ThreadSafe +@SuppressWarnings("ConstantConditions") +public final class QueryParameters { + + /** + * Default QueryAPI parameters. + */ + public static final QueryParameters DEFAULTS = new QueryParameters(null); + /** + * Default QueryAPI parameters for InfluxQL. + */ + public static final QueryParameters INFLUX_QL = new QueryParameters(null, QueryType.InfluxQL); + + private final String database; + private final QueryType queryType; + + /** + * Construct QueryAPI parameters. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link InfluxDBClientConfigs#getDatabase()}. + */ + public QueryParameters(@Nullable final String database) { + this(database, QueryType.SQL); + } + + /** + * Construct QueryAPI parameters. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link InfluxDBClientConfigs#getDatabase()}. + * @param queryType The type of query sent to InfluxDB. If it is not specified then use {@link QueryType#SQL}. + */ + public QueryParameters(@Nullable final String database, @Nullable final QueryType queryType) { + this.database = database; + this.queryType = queryType; + } + + /** + * @param configs with default value + * @return The destination database for writes. + */ + @Nullable + public String databaseSafe(@Nonnull final InfluxDBClientConfigs configs) { + Arguments.checkNotNull(configs, "configs"); + return isNotDefined(database) ? configs.getDatabase() : database; + } + + /** + * @return The type of query sent to InfluxDB, cannot be null. + */ + @Nonnull + public QueryType queryTypeSafe() { + return queryType == null ? QueryType.SQL : queryType; + } + + private boolean isNotDefined(final String option) { + return option == null || option.isEmpty(); + } +} diff --git a/src/test/java/com/influxdb/v3/client/DummyTest.java b/src/main/java/com/influxdb/v3/client/query/QueryType.java similarity index 83% rename from src/test/java/com/influxdb/v3/client/DummyTest.java rename to src/main/java/com/influxdb/v3/client/query/QueryType.java index 6107535..a47dfcd 100644 --- a/src/test/java/com/influxdb/v3/client/DummyTest.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryType.java @@ -19,16 +19,20 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.influxdb.v3.client; +package com.influxdb.v3.client.query; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.Test; - -class DummyTest { +/** + * Defines type of query sent to InfluxDB. + */ +public enum QueryType { - @Test - void mappingLinks() { + /** + * Query by SQL. + */ + SQL, - Assertions.assertThat(true).isTrue(); - } + /** + * Query by InfluxQL. + */ + InfluxQL } diff --git a/src/main/java/com/influxdb/v3/client/write/Point.java b/src/main/java/com/influxdb/v3/client/write/Point.java new file mode 100644 index 0000000..4c1304e --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/write/Point.java @@ -0,0 +1,451 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.write; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.text.NumberFormat; +import java.time.Instant; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import com.influxdb.v3.client.internal.Arguments; +import com.influxdb.v3.client.internal.NanosecondConverter; + + +/** + * Point defines the values that will be written to the database. + * See Go Implementation. + * + * @author Jakub Bednar (bednar@github) (11/10/2018 11:40) + */ +@NotThreadSafe +public final class Point { + + private static final int MAX_FRACTION_DIGITS = 340; + private static final ThreadLocal NUMBER_FORMATTER = + ThreadLocal.withInitial(() -> { + NumberFormat numberFormat = NumberFormat.getInstance(Locale.ENGLISH); + numberFormat.setMaximumFractionDigits(MAX_FRACTION_DIGITS); + numberFormat.setGroupingUsed(false); + numberFormat.setMinimumFractionDigits(1); + return numberFormat; + }); + + + private final String name; + private final Map tags = new TreeMap<>(); + private final Map fields = new TreeMap<>(); + private Number time; + + /** + * Create a new Point with specified a measurement name. + * + * @param measurementName the measurement name + */ + public Point(@Nonnull final String measurementName) { + + Arguments.checkNotNull(measurementName, "measurement"); + + this.name = measurementName; + } + + /** + * Create a new Point withe specified a measurement name. + * + * @param measurementName the measurement name + * @return new instance of {@link Point} + */ + @Nonnull + public static Point measurement(@Nonnull final String measurementName) { + + Arguments.checkNotNull(measurementName, "measurement"); + + return new Point(measurementName); + } + + /** + * Adds or replaces a tag value for this point. + * + * @param key the tag name + * @param value the tag value + * @return this + */ + @Nonnull + public Point addTag(@Nonnull final String key, @Nullable final String value) { + + Arguments.checkNotNull(key, "tagName"); + + tags.put(key, value); + + return this; + } + + /** + * Adds or replaces tags for this point. + * + * @param tagsToAdd the Map of tags to add + * @return this + */ + @Nonnull + public Point addTags(@Nonnull final Map tagsToAdd) { + + Arguments.checkNotNull(tagsToAdd, "tagsToAdd"); + + tagsToAdd.forEach(this::addTag); + + return this; + } + + /** + * Add {@link Boolean} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point addField(@Nonnull final String field, final boolean value) { + return putField(field, value); + } + + /** + * Add {@link Long} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + public Point addField(@Nonnull final String field, final long value) { + return putField(field, value); + } + + /** + * Add {@link Double} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point addField(@Nonnull final String field, final double value) { + return putField(field, value); + } + + /** + * Add {@link Number} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point addField(@Nonnull final String field, @Nullable final Number value) { + return putField(field, value); + } + + /** + * Add {@link String} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point addField(@Nonnull final String field, @Nullable final String value) { + return putField(field, value); + } + + /** + * Adds or replaces fields for this point. + * + * @param fieldsToAdd the Map of fields to add + * @return this + */ + @Nonnull + public Point addFields(@Nonnull final Map fieldsToAdd) { + + Arguments.checkNotNull(fieldsToAdd, "fieldsToAdd"); + + fieldsToAdd.forEach(this::putField); + + return this; + } + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @return this + */ + @Nonnull + public Point setTimestamp(@Nullable final Instant time) { + + if (time == null) { + return setTimestamp(null, WritePrecision.NS); + } + + BigInteger convertedTime = NanosecondConverter.convert(time, WritePrecision.NS); + + return setTimestamp(convertedTime, WritePrecision.NS); + } + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @param precision the timestamp precision + * @return this + */ + @Nonnull + public Point setTimestamp(@Nullable final Number time, @Nonnull final WritePrecision precision) { + + Arguments.checkNotNull(precision, "precision"); + + this.time = NanosecondConverter.convertToNanos(time, precision); + + return this; + } + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @param precision the timestamp precision + * @return this + */ + @Nonnull + public Point setTimestamp(@Nullable final Long time, @Nonnull final WritePrecision precision) { + + return setTimestamp((Number) time, precision); + } + + /** + * Has point any fields? + * + * @return true, if the point contains any fields, false otherwise. + */ + public boolean hasFields() { + return !fields.isEmpty(); + } + + /** + * Transform to Line Protocol with nanosecond precision. + * + * @return Line Protocol + */ + @Nonnull + public String toLineProtocol() { + return toLineProtocol(null); + } + + /** + * Transform to Line Protocol. + * + * @param precision required precision + * @return Line Protocol + */ + @Nonnull + public String toLineProtocol(@Nullable final WritePrecision precision) { + + StringBuilder sb = new StringBuilder(); + + escapeKey(sb, name, false); + appendTags(sb); + boolean appendedFields = appendFields(sb); + if (!appendedFields) { + return ""; + } + appendTime(sb, precision); + + return sb.toString(); + } + + @Nonnull + private Point putField(@Nonnull final String field, @Nullable final Object value) { + + Arguments.checkNonEmpty(field, "fieldName"); + + fields.put(field, value); + return this; + } + + private void appendTags(@Nonnull final StringBuilder sb) { + + for (Map.Entry tag : this.tags.entrySet()) { + + String key = tag.getKey(); + String value = tag.getValue(); + + if (key.isEmpty() || value == null || value.isEmpty()) { + continue; + } + + sb.append(','); + escapeKey(sb, key); + sb.append('='); + escapeKey(sb, value); + } + sb.append(' '); + } + + private boolean appendFields(@Nonnull final StringBuilder sb) { + + boolean appended = false; + for (Map.Entry field : this.fields.entrySet()) { + Object value = field.getValue(); + if (isNotDefined(value)) { + continue; + } + escapeKey(sb, field.getKey()); + sb.append('='); + if (value instanceof Number) { + if (value instanceof Double || value instanceof Float || value instanceof BigDecimal) { + sb.append(NUMBER_FORMATTER.get().format(value)); + } else { + sb.append(value).append('i'); + } + } else if (value instanceof String) { + String stringValue = (String) value; + sb.append('"'); + escapeValue(sb, stringValue); + sb.append('"'); + } else { + sb.append(value); + } + + sb.append(','); + + appended = true; + } + + // efficiently chop off the trailing comma + int lengthMinusOne = sb.length() - 1; + if (sb.charAt(lengthMinusOne) == ',') { + sb.setLength(lengthMinusOne); + } + + return appended; + } + + private void appendTime(@Nonnull final StringBuilder sb, @Nullable final WritePrecision precision) { + + if (this.time == null) { + return; + } + + sb.append(" "); + + WritePrecision precisionNotNull = precision != null ? precision : WriteParameters.DEFAULT_WRITE_PRECISION; + + if (WritePrecision.NS.equals(precisionNotNull)) { + if (this.time instanceof BigDecimal) { + sb.append(((BigDecimal) this.time).toBigInteger()); + } else if (this.time instanceof BigInteger) { + sb.append(this.time); + } else { + sb.append(this.time.longValue()); + } + } else { + long time; + if (this.time instanceof BigDecimal) { + time = ((BigDecimal) this.time).longValueExact(); + } else if (this.time instanceof BigInteger) { + time = ((BigInteger) this.time).longValueExact(); + } else { + time = this.time.longValue(); + } + sb.append(toTimeUnit(precisionNotNull).convert(time, toTimeUnit(WritePrecision.NS))); + } + } + + private void escapeKey(@Nonnull final StringBuilder sb, @Nonnull final String key) { + escapeKey(sb, key, true); + } + + private void escapeKey(@Nonnull final StringBuilder sb, @Nonnull final String key, final boolean escapeEqual) { + for (int i = 0; i < key.length(); i++) { + switch (key.charAt(i)) { + case '\n': + sb.append("\\n"); + continue; + case '\r': + sb.append("\\r"); + continue; + case '\t': + sb.append("\\t"); + continue; + case ' ': + case ',': + sb.append('\\'); + break; + case '=': + if (escapeEqual) { + sb.append('\\'); + } + break; + default: + } + + sb.append(key.charAt(i)); + } + } + + private void escapeValue(@Nonnull final StringBuilder sb, @Nonnull final String value) { + for (int i = 0; i < value.length(); i++) { + switch (value.charAt(i)) { + case '\\': + case '\"': + sb.append('\\'); + default: + sb.append(value.charAt(i)); + } + } + } + + private boolean isNotDefined(final Object value) { + return value == null + || (value instanceof Double && !Double.isFinite((Double) value)) + || (value instanceof Float && !Float.isFinite((Float) value)); + } + + @Nonnull + private TimeUnit toTimeUnit(@Nonnull final WritePrecision precision) { + switch (precision) { + case MS: + return TimeUnit.MILLISECONDS; + case S: + return TimeUnit.SECONDS; + case US: + return TimeUnit.MICROSECONDS; + case NS: + return TimeUnit.NANOSECONDS; + default: + throw new IllegalStateException("Unexpected value: " + precision); + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/write/WriteParameters.java b/src/main/java/com/influxdb/v3/client/write/WriteParameters.java new file mode 100644 index 0000000..9526468 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/write/WriteParameters.java @@ -0,0 +1,146 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.write; + +import java.util.Objects; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import com.influxdb.v3.client.config.InfluxDBClientConfigs; +import com.influxdb.v3.client.internal.Arguments; + +/** + * Write API parameters. + *

+ * Supports to specify: + *

    + *
  • database - specifies the database to be used for InfluxDB operations
  • + *
  • organization - specifies the organization to be used for InfluxDB operations
  • + *
  • precision - specified the precision to use for the timestamp of points
  • + *
+ */ +@ThreadSafe +@SuppressWarnings("ConstantConditions") +public final class WriteParameters { + + /** + * Default WritePrecision. + */ + public static final WritePrecision DEFAULT_WRITE_PRECISION = WritePrecision.NS; + /** + * Default WriteParameters. + */ + public static final WriteParameters DEFAULTS = new WriteParameters(null, null, null); + + private final String database; + private final String organization; + private final WritePrecision precision; + + /** + * Construct WriteAPI parameters. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link InfluxDBClientConfigs#getDatabase()}. + * @param organization The organization to be used for InfluxDB operations. + * If it is not specified then use {@link InfluxDBClientConfigs#getOrganization()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link WritePrecision#NS}. + */ + public WriteParameters(@Nullable final String database, + @Nullable final String organization, + @Nullable final WritePrecision precision) { + this.database = database; + this.organization = organization; + this.precision = precision; + } + + /** + * @param configs with default value + * @return The destination organization for writes. + */ + @Nullable + public String organizationSafe(@Nonnull final InfluxDBClientConfigs configs) { + Arguments.checkNotNull(configs, "configs"); + return isNotDefined(organization) ? configs.getOrganization() : organization; + } + + /** + * @param configs with default value + * @return The destination database for writes. + */ + @Nullable + public String databaseSafe(@Nonnull final InfluxDBClientConfigs configs) { + Arguments.checkNotNull(configs, "configs"); + return isNotDefined(database) ? configs.getDatabase() : database; + } + + /** + * @param configs with default value + * @return Precision for unix timestamps in the line protocol of the request payload. + */ + @Nonnull + public WritePrecision precisionSafe(@Nonnull final InfluxDBClientConfigs configs) { + Arguments.checkNotNull(configs, "configs"); + return precision != null ? precision + : (configs.getWritePrecision() != null ? configs.getWritePrecision() : DEFAULT_WRITE_PRECISION); + } + + /** + * Copy current parameters with new precision. + * + * @param precision new precision + * @param configs default values + * @return copied parameters + */ + @Nonnull + public WriteParameters copy(@Nonnull final WritePrecision precision, + @Nonnull final InfluxDBClientConfigs configs) { + + Arguments.checkNotNull(precision, "precision"); + Arguments.checkNotNull(configs, "configs"); + + return new WriteParameters(databaseSafe(configs), organizationSafe(configs), precision); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WriteParameters that = (WriteParameters) o; + return Objects.equals(database, that.database) && Objects.equals(organization, that.organization) + && precision == that.precision; + } + + @Override + public int hashCode() { + return Objects.hash(database, organization, precision); + } + + private boolean isNotDefined(final String option) { + return option == null || option.isEmpty(); + } +} diff --git a/src/main/java/com/influxdb/v3/client/write/WritePrecision.java b/src/main/java/com/influxdb/v3/client/write/WritePrecision.java new file mode 100644 index 0000000..0dde484 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/write/WritePrecision.java @@ -0,0 +1,46 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.write; + +/** + * Defines WritePrecision. + */ +public enum WritePrecision { + + /** + * Time precision in milliseconds. + */ + MS, + /** + * Time precision in seconds. + */ + S, + /** + * Time precision in microseconds. + */ + US, + /** + * Time precision in nanoseconds. + */ + NS +} + diff --git a/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java b/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java new file mode 100644 index 0000000..3140ce9 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/AbstractMockServerTest.java @@ -0,0 +1,63 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.io.IOException; +import javax.annotation.Nonnull; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +public abstract class AbstractMockServerTest { + + protected String baseURL; + protected MockWebServer mockServer; + + @BeforeEach + protected void startMockServer() { + + mockServer = new MockWebServer(); + try { + mockServer.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + baseURL = mockServer.url("/").url().toString(); + } + + @AfterEach + protected void shutdownMockServer() throws IOException { + mockServer.shutdown(); + } + + @Nonnull + protected MockResponse createResponse(final int responseCode) { + + return new MockResponse() + .setResponseCode(responseCode) + .setHeader("Content-Type", "text/csv; charset=utf-8") + .setHeader("Date", "Tue, 26 Jun 2018 13:15:01 GMT"); + } +} diff --git a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java new file mode 100644 index 0000000..f27fd76 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java @@ -0,0 +1,108 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.arrow.vector.VectorSchemaRoot; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; + +import com.influxdb.v3.client.query.QueryParameters; + +class ITQueryWrite { + + private InfluxDBClient client; + + @AfterEach + void closeClient() throws Exception { + if (client != null) { + client.close(); + } + } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + void queryWrite() { + client = getInstance(); + + String measurement = "integration_test"; + int testId = (int) System.currentTimeMillis(); + client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId); + + String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId); + try (Stream stream = client.query(sql)) { + + stream.forEach(row -> { + + Assertions.assertThat(row).hasSize(1); + Assertions.assertThat(row[0]).isEqualTo(123.0); + }); + } + + try (Stream stream = client.query(sql)) { + + List rows = stream.collect(Collectors.toList()); + + Assertions.assertThat(rows).hasSize(1); + } + + String influxQL = String.format("SELECT MEAN(value) FROM %s WHERE \"testId\"=%d " + + "group by time(1s) fill(none) order by time desc limit 1", measurement, testId); + try (Stream stream = client.query(influxQL, QueryParameters.INFLUX_QL)) { + + List rows = stream.collect(Collectors.toList()); + + Assertions.assertThat(rows).hasSize(1); + } + } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + void queryBatches() { + client = getInstance(); + + try (Stream batches = client.queryBatches("SELECT * FROM integration_test")) { + + List batchesAsList = batches.collect(Collectors.toList()); + + Assertions.assertThat(batchesAsList.size()).isGreaterThanOrEqualTo(1); + } + } + + @NotNull + private static InfluxDBClient getInstance() { + return InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE")); + } +} diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java new file mode 100644 index 0000000..f5d3f99 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -0,0 +1,47 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +class InfluxDBClientTest { + + @Test + void requiredHostUrl() { + + //noinspection DataFlowIssue + Assertions.assertThatThrownBy(() -> InfluxDBClient.getInstance(null, "my-token".toCharArray(), "my-database")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The hostname or IP address of the InfluxDB server has to be defined."); + } + + @Test + public void autoCloseable() throws Exception { + + try (InfluxDBClient client = InfluxDBClient.getInstance("http://localhost:8086", + "my-token".toCharArray(), "my-database")) { + + Assertions.assertThat(client).isNotNull(); + } + } +} diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java new file mode 100644 index 0000000..33eb9b6 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -0,0 +1,217 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.util.Arrays; +import java.util.Collections; + +import okhttp3.mockwebserver.RecordedRequest; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.influxdb.v3.client.write.Point; +import com.influxdb.v3.client.write.WriteParameters; +import com.influxdb.v3.client.write.WritePrecision; + +class InfluxDBClientWriteTest extends AbstractMockServerTest { + + private InfluxDBClient client; + + @BeforeEach + void initClient() { + client = InfluxDBClient.getInstance(baseURL, "my-token".toCharArray(), "my-database"); + } + + @AfterEach + void closeClient() throws Exception { + if (client != null) { + client.close(); + } + } + + @Test + void writeToClosedClient() throws Exception { + + client.close(); + Assertions.assertThatThrownBy(() -> client.writeRecord("mem,tag=one value=1.0")) + .isInstanceOf(IllegalStateException.class) + .hasMessage("InfluxDBClient has been closed."); + } + + @Test + void writeEmptyBatch() { + mockServer.enqueue(createResponse(200)); + + client.writeRecords(Collections.singletonList(null)); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(0); + } + + @Test + void databaseParameter() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0"); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().queryParameter("bucket")).isEqualTo("my-database"); + } + + @Test + void databaseParameterSpecified() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0", new WriteParameters("my-database-2", null, null)); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().queryParameter("bucket")).isEqualTo("my-database-2"); + } + + @Test + void databaseParameterRequired() throws Exception { + client.close(); + client = InfluxDBClient.getInstance(baseURL, null, null); + mockServer.enqueue(createResponse(200)); + + Assertions.assertThatThrownBy(() -> client.writeRecord("mem,tag=one value=1.0")) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Please specify the 'Database' as a method parameter or use " + + "default configuration at 'InfluxDBClientConfigs.database'."); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(0); + } + + + @Test + void precisionParameter() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0"); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("ns"); + } + + @Test + void precisionParameterSpecified() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0", new WriteParameters(null, null, WritePrecision.S)); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().queryParameter("precision")).isEqualTo("s"); + } + + @Test + void orgParameterSpecified() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writePoint( + Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 2), + new WriteParameters(null, "my-org", null) + ); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().queryParameter("org")).isEqualTo("my-org"); + } + + @Test + void orgParameterNotSpecified() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writePoint(Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + ); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getRequestUrl()).isNotNull(); + Assertions.assertThat(request.getRequestUrl().queryParameter("org")).isNull(); + } + + @Test + void contentType() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0"); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getHeader("Content-Type")).isEqualTo("text/plain; charset=utf-8"); + } + + @Test + void body() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0"); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getBody().readUtf8()).isEqualTo("mem,tag=one value=1.0"); + } + + @Test + void bodyConcat() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + Point point1 = Point.measurement("mem") + .addTag("tag", "one") + .addField("value", 1.0); + + Point point2 = Point.measurement("cpu") + .addTag("tag", "two") + .addField("value", 2.0); + + client.writePoints(Arrays.asList(point1, point2)); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request).isNotNull(); + Assertions.assertThat(request.getBody().readUtf8()).isEqualTo("mem,tag=one value=1.0\ncpu,tag=two value=2.0"); + } +} diff --git a/src/test/java/com/influxdb/v3/client/config/InfluxDBClientConfigsTest.java b/src/test/java/com/influxdb/v3/client/config/InfluxDBClientConfigsTest.java new file mode 100644 index 0000000..8dd7c78 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/config/InfluxDBClientConfigsTest.java @@ -0,0 +1,58 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.config; + +import java.time.Duration; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.influxdb.v3.client.write.WritePrecision; + +class InfluxDBClientConfigsTest { + + private final InfluxDBClientConfigs.Builder configsBuilder = new InfluxDBClientConfigs.Builder() + .hostUrl("http://localhost:9999") + .authToken("my-token".toCharArray()) + .organization("my-org") + .database("my-db") + .writePrecision(WritePrecision.NS) + .responseTimeout(Duration.ofSeconds(30)) + .allowHttpRedirects(true) + .disableServerCertificateValidation(true); + + @Test + void equal() { + InfluxDBClientConfigs configs = configsBuilder.build(); + + Assertions.assertThat(configs).isEqualTo(configsBuilder.build()); + Assertions.assertThat(configs).isNotEqualTo(configsBuilder.database("database").build()); + } + + @Test + void hash() { + InfluxDBClientConfigs configs = configsBuilder.build(); + + Assertions.assertThat(configs.hashCode()).isEqualTo(configsBuilder.build().hashCode()); + Assertions.assertThat(configs.hashCode()).isNotEqualTo(configsBuilder.database("database").build().hashCode()); + } +} diff --git a/src/test/java/com/influxdb/v3/client/internal/ArgumentsDurationTest.java b/src/test/java/com/influxdb/v3/client/internal/ArgumentsDurationTest.java new file mode 100644 index 0000000..cec704b --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/ArgumentsDurationTest.java @@ -0,0 +1,85 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * @author Jakub Bednar (bednar@github) (20/08/2018 12:31) + */ +@SuppressWarnings("ConstantConditions") +class ArgumentsDurationTest { + + @Test + void literals() { + + Arguments.checkDuration("1s", "duration"); + Arguments.checkDuration("10d", "duration"); + Arguments.checkDuration("1h15m", "duration"); + Arguments.checkDuration("5w", "duration"); + Arguments.checkDuration("1mo5d", "duration"); + Arguments.checkDuration("-1mo5d", "duration"); + Arguments.checkDuration("inf", "duration"); + Arguments.checkDuration("-inf", "duration"); + } + + @Test + void literalNull() { + + Assertions.assertThatThrownBy(() -> Arguments.checkDuration(null, "duration")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a duration string for duration. But got: null"); + } + + @Test + void literalEmpty() { + + Assertions.assertThatThrownBy(() -> Arguments.checkDuration("", "duration")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a duration string for duration. But got: "); + } + + @Test + void literalNotDuration() { + + Assertions.assertThatThrownBy(() -> Arguments.checkDuration("x", "duration")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a duration string for duration. But got: x"); + } + + @Test + void notRequiredValid() { + + Arguments.checkDurationNotRequired(null, "duration"); + Arguments.checkDurationNotRequired("", "duration"); + Arguments.checkDurationNotRequired("1s", "duration"); + } + + @Test + void notRequiredNotValid() { + + Assertions.assertThatThrownBy(() -> Arguments.checkDurationNotRequired("x", "duration")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a duration string for duration. But got: x"); + } +} diff --git a/src/test/java/com/influxdb/v3/client/internal/ArgumentsTest.java b/src/test/java/com/influxdb/v3/client/internal/ArgumentsTest.java new file mode 100644 index 0000000..fbb812a --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/ArgumentsTest.java @@ -0,0 +1,164 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.time.temporal.ChronoUnit; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * @author Jakub Bednar (bednar@github) (01/08/2018 15:29) + */ +@SuppressWarnings("ConstantConditions") +class ArgumentsTest { + + @Test + void checkNonEmptyString() { + + Arguments.checkNonEmpty("valid", "property"); + } + + @Test + void checkNonEmptyStringEmpty() { + + Assertions.assertThatThrownBy(() -> Arguments.checkNonEmpty("", "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a non-empty string for property"); + } + + @Test + void checkNonEmptyStringNull() { + + Assertions.assertThatThrownBy(() -> Arguments.checkNonEmpty(null, "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a non-empty string for property"); + } + + @Test + void checkPositiveNumber() { + + Arguments.checkPositiveNumber(10, "property"); + } + + @Test + void checkPositiveNumberNull() { + + Assertions.assertThatThrownBy(() -> Arguments.checkPositiveNumber(null, "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a positive number for property"); + } + + @Test + void checkPositiveNumberZero() { + + Assertions.assertThatThrownBy(() -> Arguments.checkPositiveNumber(0, "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a positive number for property"); + } + + @Test + void checkPositiveNumberZeroNegative() { + + Assertions.assertThatThrownBy(() -> Arguments.checkPositiveNumber(-12L, "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a positive number for property"); + } + + @Test + void checkNotNegativeNumber() { + + Arguments.checkNotNegativeNumber(0, "valid"); + } + + @Test + void checkNotNegativeNumberNull() { + + Assertions.assertThatThrownBy(() -> Arguments.checkNotNegativeNumber(null, "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a positive or zero number for property"); + } + + @Test + void checkNotNegativeNumberNegative() { + + Assertions.assertThatThrownBy(() -> Arguments.checkNotNegativeNumber(-12L, "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a positive or zero number for property"); + } + + @Test + void checkOneCharString() { + + Arguments.checkOneCharString("#", "valid"); + } + + @Test + void checkOneCharStringEmpty() { + + Assertions.assertThatThrownBy(() -> Arguments.checkOneCharString("", "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a one char string for property"); + } + + @Test + void checkOneCharStringNull() { + + Assertions.assertThatThrownBy(() -> Arguments.checkOneCharString(null, "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a one char string for property"); + } + + @Test + void checkOneCharStringLarge() { + + Assertions.assertThatThrownBy(() -> Arguments.checkOneCharString("##", "property")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expecting a one char string for property"); + } + + @Test + void checkNotNull() { + Arguments.checkNotNull("value", "property"); + } + + @Test + void checkNotNullFail() { + Assertions.assertThatThrownBy(() -> Arguments.checkNotNull(null, "property")) + .isInstanceOf(NullPointerException.class) + .hasMessage("Expecting a not null reference for property"); + } + + @Test + void checkPrecision() { + + Arguments.checkPrecision(ChronoUnit.SECONDS); + } + + @Test + void checkPrecisionNotSupported() { + + Assertions.assertThatThrownBy(() -> Arguments.checkPrecision(ChronoUnit.DAYS)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Precision must be one of: [Nanos, Micros, Millis, Seconds]"); + } +} diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java new file mode 100644 index 0000000..73ab7c7 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -0,0 +1,222 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.net.http.HttpClient; +import java.time.Duration; +import java.util.Optional; + +import io.netty.handler.codec.http.HttpMethod; +import okhttp3.mockwebserver.RecordedRequest; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import com.influxdb.v3.client.AbstractMockServerTest; +import com.influxdb.v3.client.InfluxDBApiException; +import com.influxdb.v3.client.config.InfluxDBClientConfigs; + +public class RestClientTest extends AbstractMockServerTest { + + private RestClient restClient; + + @AfterEach + void tearDown() { + if (restClient != null) { + restClient.close(); + } + } + + @Test + public void baseUrl() { + restClient = new RestClient(new InfluxDBClientConfigs.Builder().hostUrl("http://localhost:8086").build()); + Assertions + .assertThat(restClient.baseUrl) + .isEqualTo("http://localhost:8086/"); + } + + @Test + public void baseUrlSlashEnd() { + restClient = new RestClient(new InfluxDBClientConfigs.Builder().hostUrl("http://localhost:8086/").build()); + Assertions + .assertThat(restClient.baseUrl) + .isEqualTo("http://localhost:8086/"); + } + + @Test + public void responseTimeout() { + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl("http://localhost:8086") + .responseTimeout(Duration.ofSeconds(13)) + .build()); + + Optional connectTimeout = restClient.client.connectTimeout(); + + Assertions.assertThat(connectTimeout).isPresent(); + Assertions.assertThat(connectTimeout.get()).isEqualTo(Duration.ofSeconds(13)); + } + + @Test + public void allowHttpRedirectsDefaults() { + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl("http://localhost:8086") + .build()); + + HttpClient.Redirect redirect = restClient.client.followRedirects(); + Assertions.assertThat(redirect).isEqualTo(HttpClient.Redirect.NEVER); + } + + @Test + public void authenticationHeader() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl(baseURL) + .authToken("my-token".toCharArray()) + .build()); + + restClient.request("ping", HttpMethod.GET, null, null, null); + + RecordedRequest recordedRequest = mockServer.takeRequest(); + + String authorization = recordedRequest.getHeader("Authorization"); + Assertions.assertThat(authorization).isEqualTo("Token my-token"); + } + + @Test + public void authenticationHeaderNotDefined() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl(baseURL) + .build()); + + restClient.request("ping", HttpMethod.GET, null, null, null); + + RecordedRequest recordedRequest = mockServer.takeRequest(); + + String authorization = recordedRequest.getHeader("Authorization"); + Assertions.assertThat(authorization).isNull(); + } + + @Test + public void userAgent() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl(baseURL) + .build()); + + restClient.request("ping", HttpMethod.GET, null, null, null); + + RecordedRequest recordedRequest = mockServer.takeRequest(); + + String userAgent = recordedRequest.getHeader("User-Agent"); + Assertions.assertThat(userAgent).startsWith("influxdb3-java/"); + } + + @Test + public void uri() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl(baseURL) + .build()); + + restClient.request("ping", HttpMethod.GET, null, null, null); + + RecordedRequest recordedRequest = mockServer.takeRequest(); + + Assertions.assertThat(recordedRequest.getRequestUrl()).isNotNull(); + Assertions.assertThat(recordedRequest.getRequestUrl().toString()).isEqualTo(baseURL + "ping"); + } + + @Test + public void allowHttpRedirects() { + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl("http://localhost:8086") + .allowHttpRedirects(true) + .build()); + + HttpClient.Redirect redirect = restClient.client.followRedirects(); + Assertions.assertThat(redirect).isEqualTo(HttpClient.Redirect.NORMAL); + } + + @Test + public void error() { + mockServer.enqueue(createResponse(404)); + + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null)) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 404; Message: Not Found"); + } + + @Test + public void errorFromHeader() { + mockServer.enqueue(createResponse(500).setHeader("X-Influx-Error", "not used")); + + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null)) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 500; Message: not used"); + } + + @Test + public void errorFromBody() { + mockServer.enqueue(createResponse(401) + .setHeader("X-Influx-Error", "not used") + .setBody("{\"message\":\"token does not have sufficient permissions\"}")); + + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null)) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 401; Message: token does not have sufficient permissions"); + } + + @Test + public void errorFromBodyText() { + mockServer.enqueue(createResponse(402) + .setBody("token is over the limit")); + + restClient = new RestClient(new InfluxDBClientConfigs.Builder() + .hostUrl(baseURL) + .build()); + + Assertions.assertThatThrownBy( + () -> restClient.request("ping", HttpMethod.GET, null, null, null)) + .isInstanceOf(InfluxDBApiException.class) + .hasMessage("HTTP status code: 402; Message: token is over the limit"); + } +} diff --git a/src/test/java/com/influxdb/v3/client/write/PointTest.java b/src/test/java/com/influxdb/v3/client/write/PointTest.java new file mode 100644 index 0000000..9adf88e --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/write/PointTest.java @@ -0,0 +1,375 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.write; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; +import java.util.HashMap; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * @author Jakub Bednar (bednar@github) (11/10/2018 12:57) + */ +class PointTest { + + @Test + void measurementEscape() { + + Point point = Point.measurement("h2 o") + .addTag("location", "europe") + .addTag("", "warn") + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2\\ o,location=europe level=2i"); + + point = Point.measurement("h2=o") + .addTag("location", "europe") + .addTag("", "warn") + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2=o,location=europe level=2i"); + + point = Point.measurement("h2,o") + .addTag("location", "europe") + .addTag("", "warn") + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2\\,o,location=europe level=2i"); + } + + @Test + public void createByConstructor() { + Point point = new Point("h2o") + .addTag("location", "europe") + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); + } + + @SuppressWarnings("ConstantConditions") + @Test + public void createByConstructorMeasurementRequired() { + Assertions.assertThatThrownBy(() -> new Point(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("Expecting a not null reference for measurement"); + } + + @Test + void tagEmptyKey() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addTag("", "warn") + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); + } + + @Test + void tagEmptyValue() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addTag("log", "") + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); + } + + @Test + void tagNullValue() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addTag("log", null) + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); + } + + @Test + public void tagEscapingKeyAndValue() { + + Point point = Point.measurement("h\n2\ro\t_data") + .addTag("new\nline", "new\nline") + .addTag("carriage\rreturn", "carriage\rreturn") + .addTag("t\tab", "t\tab") + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()) + .isEqualTo("h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\rreturn,new\\nline=new\\nline," + + "t\\tab=t\\tab level=2i"); + } + + @Test + public void equalSignEscaping() { + + Point point = Point.measurement("h=2o") + .addTag("l=ocation", "e=urope") + .addField("l=evel", 2); + + Assertions.assertThat(point.toLineProtocol()) + .isEqualTo("h=2o,l\\=ocation=e\\=urope l\\=evel=2i"); + } + + @Test + void fieldTypes() { + + Point point = Point.measurement("h2o").addTag("location", "europe") + .addField("long", 1L) + .addField("double", 2D) + .addField("float", 3F) + .addField("longObject", Long.valueOf("4")) + .addField("doubleObject", Double.valueOf("5")) + .addField("floatObject", Float.valueOf("6")) + .addField("bigDecimal", new BigDecimal("33.45")) + .addField("integer", 7) + .addField("integerObject", Integer.valueOf("8")) + .addField("boolean", false) + .addField("booleanObject", Boolean.TRUE) + .addField("string", "string value"); + + String expected = "h2o,location=europe bigDecimal=33.45,boolean=false,booleanObject=true,double=2.0," + + "doubleObject=5.0,float=3.0,floatObject=6.0,integer=7i,integerObject=8i,long=1i,longObject=4i," + + "string=\"string value\""; + Assertions.assertThat(point.toLineProtocol()).isEqualTo(expected); + } + + @Test + void fieldNullValue() { + + Point point = Point.measurement("h2o").addTag("location", "europe").addField("level", 2) + .addField("warning", (String) null); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); + } + + @Test + void fieldEscape() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", "string esc\\ape value"); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe " + + "level=\"string esc\\\\ape value\""); + + point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", "string esc\"ape value"); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe " + + "level=\"string esc\\\"ape value\""); + } + + @Test + void time() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(123L, WritePrecision.S); + + Assertions.assertThat(point.toLineProtocol(WritePrecision.S)).isEqualTo("h2o,location=europe level=2i 123"); + } + + @Test + void timeBigInteger() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(new BigInteger("123"), WritePrecision.S); + + Assertions.assertThat(point.toLineProtocol(WritePrecision.S)).isEqualTo("h2o,location=europe level=2i 123"); + + // Friday, June 22, 3353 + point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(new BigInteger("43658216763800123456"), WritePrecision.NS); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 43658216763800123456"); + } + + @Test + void timeBigDecimal() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(new BigDecimal("123"), WritePrecision.S); + + Assertions.assertThat(point.toLineProtocol(WritePrecision.S)).isEqualTo("h2o,location=europe level=2i 123"); + + point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(new BigDecimal("1.23E+02"), WritePrecision.NS); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 123"); + + // Friday, June 22, 3353 + point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(new BigDecimal("43658216763800123456"), WritePrecision.NS); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 43658216763800123456"); + } + + @Test + void timeFloat() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(Float.valueOf("123"), WritePrecision.S); + + Assertions.assertThat(point.toLineProtocol(WritePrecision.S)).isEqualTo("h2o,location=europe level=2i 123"); + + point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(Float.valueOf("1.23"), WritePrecision.NS); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 1"); + } + + @Test + void timeInstantOver2262() { + + Instant time = Instant.parse("3353-06-22T10:26:03.800123456Z"); + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(time); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 43658216763800123456"); + } + + @Test + void timeInstantNull() { + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(null); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); + } + + @Test + void timeGetTime() { + + Instant time = Instant.parse("2022-06-12T10:26:03.800123456Z"); + + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .setTimestamp(time); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 1655029563800123456"); + } + + @Test + public void infinityValues() { + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("double-infinity-positive", Double.POSITIVE_INFINITY) + .addField("double-infinity-negative", Double.NEGATIVE_INFINITY) + .addField("double-nan", Double.NaN) + .addField("flout-infinity-positive", Float.POSITIVE_INFINITY) + .addField("flout-infinity-negative", Float.NEGATIVE_INFINITY) + .addField("flout-nan", Float.NaN) + .addField("level", 2); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); + } + + @Test + public void onlyInfinityValues() { + Point point = Point.measurement("h2o") + .addTag("location", "europe") + .addField("double-infinity-positive", Double.POSITIVE_INFINITY) + .addField("double-infinity-negative", Double.NEGATIVE_INFINITY) + .addField("double-nan", Double.NaN) + .addField("flout-infinity-positive", Float.POSITIVE_INFINITY) + .addField("flout-infinity-negative", Float.NEGATIVE_INFINITY) + .addField("flout-nan", Float.NaN); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo(""); + } + + @Test + void hasFields() { + + Assertions.assertThat(Point.measurement("h2o").hasFields()).isFalse(); + Assertions.assertThat(Point.measurement("h2o").addTag("location", "europe").hasFields()).isFalse(); + Assertions.assertThat(Point.measurement("h2o").addField("level", 2).hasFields()).isTrue(); + Assertions.assertThat( + Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 3) + .hasFields()) + .isTrue(); + } + + @Test + void addTags() { + + HashMap tags = new HashMap<>(); + tags.put("type", "production"); + tags.put("location", "europe"); + tags.put("expensive", ""); + + Point point = Point.measurement("h2o") + .addField("level", 2) + .addTags(tags); + + Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe,type=production level=2i"); + } + + @Test + void addFields() { + + HashMap fields = new HashMap<>(); + fields.put("level", 2); + fields.put("accepted", true); + fields.put("power", 2.56); + fields.put("clean", null); + + Point point = Point + .measurement("h2o") + .addTag("location", "europe") + .addFields(fields); + + Assertions + .assertThat(point.toLineProtocol()) + .isEqualTo("h2o,location=europe accepted=true,level=2i,power=2.56"); + } +} diff --git a/src/test/java/com/influxdb/v3/client/write/WriteParametersTest.java b/src/test/java/com/influxdb/v3/client/write/WriteParametersTest.java new file mode 100644 index 0000000..12842d9 --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/write/WriteParametersTest.java @@ -0,0 +1,72 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.write; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.influxdb.v3.client.config.InfluxDBClientConfigs; + +class WriteParametersTest { + + private InfluxDBClientConfigs.Builder optionsBuilder; + + @BeforeEach + void before() { + optionsBuilder = new InfluxDBClientConfigs.Builder() + .hostUrl("http://localhost:8086") + .authToken("my-token".toCharArray()); + } + + @Test + void optionParameters() { + InfluxDBClientConfigs options = optionsBuilder + .database("my-database") + .organization("my-org") + .writePrecision(WritePrecision.S) + .build(); + + WriteParameters parameters = new WriteParameters(null, null, null); + + Assertions.assertThat(parameters.databaseSafe(options)).isEqualTo("my-database"); + Assertions.assertThat(parameters.organizationSafe(options)).isEqualTo("my-org"); + Assertions.assertThat(parameters.precisionSafe(options)).isEqualTo(WritePrecision.S); + } + + @Test + void nullableParameters() { + InfluxDBClientConfigs options = optionsBuilder.database("my-database").organization("my-org").build(); + + WriteParameters parameters = new WriteParameters(null, null, null); + + Assertions.assertThat(parameters.precisionSafe(options)).isEqualTo(WritePrecision.NS); + } + + @Test + void npe() { + WriteParameters parameters = new WriteParameters(null, null, null); + + Assertions.assertThat(parameters.hashCode()).isNotNull(); + Assertions.assertThat(parameters).isEqualTo(new WriteParameters(null, null, null)); + } +}