queryBatches(@Nonnull final String query, @Nonnull final QueryParameters parameters);
+
+ /**
+ * Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying
+ * common operations such as writing, querying.
+ *
+ * @param hostUrl the hostname or IP address of the InfluxDB server
+ * @param authToken the authentication token for accessing the InfluxDB server, can be null
+ * @param database the database to be used for InfluxDB operations, can be null
+ * @return new instance of the {@link InfluxDBClient}
+ */
+ @Nonnull
+ static InfluxDBClient getInstance(@Nonnull final String hostUrl,
+ @Nullable final char[] authToken,
+ @Nullable final String database) {
+ InfluxDBClientConfigs configs = new InfluxDBClientConfigs.Builder()
+ .hostUrl(hostUrl)
+ .authToken(authToken)
+ .database(database)
+ .build();
+
+ return getInstance(configs);
+ }
+
+ /**
+ * Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying
+ * common operations such as writing, querying.
+ *
+ * @param configs the configuration for the InfluxDB client
+ * @return new instance of the {@link InfluxDBClient}
+ */
+ @Nonnull
+ static InfluxDBClient getInstance(@Nonnull final InfluxDBClientConfigs configs) {
+ return new InfluxDBClientImpl(configs);
+ }
}
diff --git a/src/main/java/com/influxdb/v3/client/config/InfluxDBClientConfigs.java b/src/main/java/com/influxdb/v3/client/config/InfluxDBClientConfigs.java
new file mode 100644
index 0000000..cff2546
--- /dev/null
+++ b/src/main/java/com/influxdb/v3/client/config/InfluxDBClientConfigs.java
@@ -0,0 +1,319 @@
+/*
+ * The MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package com.influxdb.v3.client.config;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.StringJoiner;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.influxdb.v3.client.write.WritePrecision;
+
+/**
+ * Configuration properties for an {@code InfluxDBClient}.
+ *
+ * Immutable class.
+ */
+public final class InfluxDBClientConfigs {
+
+ private final String hostUrl;
+ private final char[] authToken;
+ private final String organization;
+ private final String database;
+ private final WritePrecision writePrecision;
+ private final Duration responseTimeout;
+ private final Boolean allowHttpRedirects;
+ private final Boolean disableServerCertificateValidation;
+
+ /**
+ * Gets hostname or IP address of the InfluxDB server.
+ *
+ * @return hostname or IP address of the InfluxDB server
+ */
+ @Nonnull
+ public String getHostUrl() {
+ return hostUrl;
+ }
+
+ /**
+ * Gets authentication token for accessing the InfluxDB server.
+ *
+ * @return authentication token for accessing the InfluxDB server, may be null
+ */
+ @Nullable
+ public char[] getAuthToken() {
+ return authToken;
+ }
+
+ /**
+ * Gets organization to be used for operations.
+ *
+ * @return organization to be used for operations, may be null
+ */
+ @Nullable
+ public String getOrganization() {
+ return organization;
+ }
+
+ /**
+ * Gets database to be used for InfluxDB operations.
+ *
+ * @return database to be used for InfluxDB operations, may be null
+ */
+ @Nullable
+ public String getDatabase() {
+ return database;
+ }
+
+ /**
+ * Gets the default precision to use for the timestamp of points.
+ *
+ * @return the default precision to use for the timestamp of points, may be null
+ */
+ @Nullable
+ public WritePrecision getWritePrecision() {
+ return writePrecision;
+ }
+
+ /**
+ * Gets the default response timeout to use for the API calls. Default to '10 seconds'.
+ *
+ * @return the default response timeout to use for the API calls
+ */
+ @Nonnull
+ public Duration getResponseTimeout() {
+ return responseTimeout;
+ }
+
+ /**
+ * Gets the automatically following HTTP 3xx redirects. Default to 'false'.
+ *
+ * @return the automatically following HTTP 3xx redirects
+ */
+ @Nonnull
+ public Boolean getAllowHttpRedirects() {
+ return allowHttpRedirects;
+ }
+
+ /**
+ * Gets the disable server SSL certificate validation. Default to 'false'.
+ *
+ * @return the disable server SSL certificate validation
+ */
+ public Boolean getDisableServerCertificateValidation() {
+ return disableServerCertificateValidation;
+ }
+
+ /**
+ * Validates the configuration properties.
+ */
+ public void validate() {
+ if (hostUrl == null || hostUrl.isEmpty()) {
+ throw new IllegalArgumentException("The hostname or IP address of the InfluxDB server has to be defined.");
+ }
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InfluxDBClientConfigs that = (InfluxDBClientConfigs) o;
+ return Objects.equals(hostUrl, that.hostUrl)
+ && Arrays.equals(authToken, that.authToken)
+ && Objects.equals(organization, that.organization)
+ && Objects.equals(database, that.database)
+ && writePrecision == that.writePrecision
+ && Objects.equals(responseTimeout, that.responseTimeout)
+ && Objects.equals(allowHttpRedirects, that.allowHttpRedirects)
+ && Objects.equals(disableServerCertificateValidation, that.disableServerCertificateValidation);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(hostUrl, Arrays.hashCode(authToken), organization, database, writePrecision,
+ responseTimeout, allowHttpRedirects, disableServerCertificateValidation);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", InfluxDBClientConfigs.class.getSimpleName() + "InfluxDBClientConfigs[", "]")
+ .add("hostUrl='" + hostUrl + "'")
+ .add("organization='" + organization + "'")
+ .add("database='" + database + "'")
+ .add("writePrecision=" + writePrecision)
+ .add("responseTimeout=" + responseTimeout)
+ .add("allowHttpRedirects=" + allowHttpRedirects)
+ .add("disableServerCertificateValidation=" + disableServerCertificateValidation)
+ .toString();
+ }
+
+ /**
+ * A builder for {@code InfluxDBClientConfigs}.
+ *
+ * Mutable.
+ */
+ public static final class Builder {
+ private String hostUrl;
+ private char[] authToken;
+ private String organization;
+ private String database;
+ private WritePrecision writePrecision;
+ private Duration responseTimeout;
+ private Boolean allowHttpRedirects;
+ private Boolean disableServerCertificateValidation;
+
+ /**
+ * Sets the hostname or IP address of the InfluxDB server.
+ *
+ * @param hostUrl hostname or IP address of the InfluxDB server
+ * @return this
+ */
+ @Nonnull
+ public Builder hostUrl(@Nonnull final String hostUrl) {
+
+ this.hostUrl = hostUrl;
+ return this;
+ }
+
+ /**
+ * Sets the authentication token for accessing the InfluxDB server.
+ *
+ * @param authToken authentication token for accessing the InfluxDB server
+ * @return this
+ */
+ @Nonnull
+ public Builder authToken(@Nullable final char[] authToken) {
+
+ this.authToken = authToken;
+ return this;
+ }
+
+ /**
+ * Sets organization to be used for operations.
+ *
+ * @param organization organization to be used for operations
+ * @return this
+ */
+ @Nonnull
+ public Builder organization(@Nullable final String organization) {
+
+ this.organization = organization;
+ return this;
+ }
+
+ /**
+ * Sets database to be used for InfluxDB operations.
+ *
+ * @param database database to be used for InfluxDB operations
+ * @return this
+ */
+ @Nonnull
+ public Builder database(@Nullable final String database) {
+
+ this.database = database;
+ return this;
+ }
+
+ /**
+ * Sets the default precision to use for the timestamp of points
+ * if no precision is specified in the write API call.
+ *
+ * @param writePrecision default precision to use for the timestamp of points
+ * if no precision is specified in the write API call
+ * @return this
+ */
+ @Nonnull
+ public Builder writePrecision(@Nullable final WritePrecision writePrecision) {
+
+ this.writePrecision = writePrecision;
+ return this;
+ }
+
+ /**
+ * Sets the default response timeout to use for the API calls. Default to '10 seconds'.
+ *
+ * @param responseTimeout default response timeout to use for the API calls. Default to '10 seconds'.
+ * @return this
+ */
+ @Nonnull
+ public Builder responseTimeout(@Nullable final Duration responseTimeout) {
+
+ this.responseTimeout = responseTimeout;
+ return this;
+ }
+
+ /**
+ * Sets the automatically following HTTP 3xx redirects. Default to 'false'.
+ *
+ * @param allowHttpRedirects automatically following HTTP 3xx redirects. Default to 'false'.
+ * @return this
+ */
+ @Nonnull
+ public Builder allowHttpRedirects(@Nullable final Boolean allowHttpRedirects) {
+
+ this.allowHttpRedirects = allowHttpRedirects;
+ return this;
+ }
+
+ /**
+ * Sets the disable server SSL certificate validation. Default to 'false'.
+ *
+ * @param disableServerCertificateValidation Disable server SSL certificate validation. Default to 'false'.
+ * @return this
+ */
+ @Nonnull
+ public Builder disableServerCertificateValidation(@Nullable final Boolean disableServerCertificateValidation) {
+
+ this.disableServerCertificateValidation = disableServerCertificateValidation;
+ return this;
+ }
+
+ /**
+ * Build an instance of {@code InfluxDBClientConfigs}.
+ *
+ * @return the configuration for an {@code InfluxDBClient}.
+ */
+ @Nonnull
+ public InfluxDBClientConfigs build() {
+ return new InfluxDBClientConfigs(this);
+ }
+ }
+
+ @SuppressWarnings("MagicNumber")
+ private InfluxDBClientConfigs(@Nonnull final Builder builder) {
+ hostUrl = builder.hostUrl;
+ authToken = builder.authToken;
+ organization = builder.organization;
+ database = builder.database;
+ writePrecision = builder.writePrecision;
+ responseTimeout = builder.responseTimeout != null ? builder.responseTimeout : Duration.ofSeconds(10);
+ allowHttpRedirects = builder.allowHttpRedirects != null ? builder.allowHttpRedirects : false;
+ disableServerCertificateValidation = builder.disableServerCertificateValidation != null
+ ? builder.disableServerCertificateValidation : false;
+ }
+}
diff --git a/src/main/java/com/influxdb/v3/client/internal/Arguments.java b/src/main/java/com/influxdb/v3/client/internal/Arguments.java
new file mode 100644
index 0000000..637b465
--- /dev/null
+++ b/src/main/java/com/influxdb/v3/client/internal/Arguments.java
@@ -0,0 +1,165 @@
+/*
+ * The MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package com.influxdb.v3.client.internal;
+
+import java.time.temporal.ChronoUnit;
+import java.util.EnumSet;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+
+/**
+ * Functions for parameter validation.
+ *
+ * Thanks
+ */
+public final class Arguments {
+
+ private static final Pattern DURATION_PATTERN = Pattern.compile("([-+]?)([0-9]+(\\.[0-9]*)?[a-z]+)+|inf|-inf",
+ Pattern.CASE_INSENSITIVE);
+
+ private static final String DURATION_MESSAGE = "Expecting a duration string for %s. But got: %s";
+
+ /**
+ * The precisions that are allowed to use in the writes.
+ */
+ private static final EnumSet ALLOWED_PRECISION = EnumSet.of(ChronoUnit.NANOS,
+ ChronoUnit.MICROS, ChronoUnit.MILLIS, ChronoUnit.SECONDS);
+
+ private Arguments() {
+ }
+
+ /**
+ * Enforces that the string is {@linkplain String#isEmpty() not empty}.
+ *
+ * @param string the string to test
+ * @param name variable name for reporting
+ * @return {@code string}
+ * @throws IllegalArgumentException if the string is empty
+ */
+ public static String checkNonEmpty(final String string, final String name) throws IllegalArgumentException {
+ if (string == null || string.isEmpty()) {
+ throw new IllegalArgumentException("Expecting a non-empty string for " + name);
+ }
+ return string;
+ }
+
+ /**
+ * Enforces that the string has exactly one char.
+ *
+ * @param string the string to test
+ * @param name variable name for reporting
+ * @return {@code string}
+ * @throws IllegalArgumentException if the string has not one char
+ */
+ public static String checkOneCharString(final String string, final String name) throws IllegalArgumentException {
+ if (string == null || string.length() != 1) {
+ throw new IllegalArgumentException("Expecting a one char string for " + name);
+ }
+ return string;
+ }
+
+ /**
+ * Enforces that the string is duration literal.
+ *
+ * @param string the string to test
+ * @param name variable name for reporting
+ * @return {@code string}
+ * @throws IllegalArgumentException if the string is not duration literal
+ */
+ public static String checkDuration(final String string, final String name) throws IllegalArgumentException {
+ if (string == null || string.isEmpty() || !DURATION_PATTERN.matcher(string).matches()) {
+ throw new IllegalArgumentException(String.format(DURATION_MESSAGE, name, string));
+ }
+
+ return string;
+ }
+
+ /**
+ * Enforces that the string is duration literal. Empty or null strings are valid.
+ *
+ * @param string the string to test
+ * @param name variable name for reporting
+ * @return {@code string}
+ * @throws IllegalArgumentException if the string is not duration literal
+ */
+ public static String checkDurationNotRequired(final String string, final String name)
+ throws IllegalArgumentException {
+ if (string != null && !string.isEmpty() && !DURATION_PATTERN.matcher(string).matches()) {
+ throw new IllegalArgumentException(String.format(DURATION_MESSAGE, name, string));
+ }
+
+ return string;
+ }
+
+ /**
+ * Enforces that the number is larger than 0.
+ *
+ * @param number the number to test
+ * @param name variable name for reporting
+ * @throws IllegalArgumentException if the number is less or equal to 0
+ */
+ public static void checkPositiveNumber(final Number number, final String name) throws IllegalArgumentException {
+ if (number == null || number.doubleValue() <= 0) {
+ throw new IllegalArgumentException("Expecting a positive number for " + name);
+ }
+ }
+
+ /**
+ * Enforces that the number is not negative.
+ *
+ * @param number the number to test
+ * @param name variable name for reporting
+ * @throws IllegalArgumentException if the number is less or equal to 0
+ */
+ public static void checkNotNegativeNumber(final Number number, final String name) throws IllegalArgumentException {
+ if (number == null || number.doubleValue() < 0) {
+ throw new IllegalArgumentException("Expecting a positive or zero number for " + name);
+ }
+ }
+
+ /**
+ * Checks that the specified object reference is not {@code null}.
+ *
+ * @param obj the object reference to check for nullity
+ * @param name variable name for reporting
+ * @throws NullPointerException if the object is {@code null}
+ * @see Objects#requireNonNull(Object, String)
+ */
+ public static void checkNotNull(final Object obj, final String name) throws NullPointerException {
+
+ Objects.requireNonNull(obj, () -> "Expecting a not null reference for " + name);
+ }
+
+ /**
+ * Checks that the precision reference to one of {@link Arguments#ALLOWED_PRECISION}.
+ *
+ * @param precision the precision to check
+ * @throws IllegalArgumentException if the object is not one of {@link Arguments#ALLOWED_PRECISION}
+ */
+ public static void checkPrecision(@Nullable final ChronoUnit precision) throws IllegalArgumentException {
+
+ if (!ALLOWED_PRECISION.contains(precision)) {
+ throw new IllegalArgumentException("Precision must be one of: " + ALLOWED_PRECISION);
+ }
+ }
+}
diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java
new file mode 100644
index 0000000..8360df7
--- /dev/null
+++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java
@@ -0,0 +1,170 @@
+/*
+ * The MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package com.influxdb.v3.client.internal;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nonnull;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.Metadata;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.HeaderCallOption;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.grpc.MetadataAdapter;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+import com.influxdb.v3.client.config.InfluxDBClientConfigs;
+import com.influxdb.v3.client.query.QueryType;
+
+final class FlightSqlClient implements AutoCloseable {
+
+ private final HeaderCallOption headers;
+ private final FlightClient client;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ FlightSqlClient(@Nonnull final InfluxDBClientConfigs configs) {
+ Arguments.checkNotNull(configs, "configs");
+
+ MetadataAdapter metadata = new MetadataAdapter(new Metadata());
+ if (configs.getAuthToken() != null && configs.getAuthToken().length > 0) {
+ metadata.insert("Authorization", "Bearer " + new String(configs.getAuthToken()));
+ }
+
+ this.headers = new HeaderCallOption(metadata);
+
+ Location location;
+ try {
+ URI uri = new URI(configs.getHostUrl());
+ if ("https".equals(uri.getScheme())) {
+ location = Location.forGrpcTls(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 443);
+ } else {
+ location = Location.forGrpcInsecure(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 80);
+ }
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ client = FlightClient.builder()
+ .location(location)
+ .allocator(new RootAllocator(Long.MAX_VALUE))
+ .verifyServer(!configs.getDisableServerCertificateValidation())
+ .build();
+ }
+
+ @Nonnull
+ Stream execute(@Nonnull final String query,
+ @Nonnull final String database,
+ @Nonnull final QueryType queryType) {
+
+ HashMap ticketData = new HashMap() {{
+ put("database", database);
+ put("sql_query", query);
+ put("query_type", queryType.name().toLowerCase());
+ }};
+
+ String json;
+ try {
+ json = objectMapper.writeValueAsString(ticketData);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+
+ Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8));
+ FlightStream stream = client.getStream(ticket, headers);
+ FlightSqlIterator iterator = new FlightSqlIterator(stream);
+
+ Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL);
+ return StreamSupport.stream(spliterator, false).onClose(iterator::close);
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ }
+
+ private static final class FlightSqlIterator implements Iterator, AutoCloseable {
+
+ private final List autoCloseable = new ArrayList<>();
+
+ private final FlightStream flightStream;
+ private VectorSchemaRoot currentVectorSchemaRoot = null;
+
+ private FlightSqlIterator(@Nonnull final FlightStream flightStream) {
+ this.flightStream = flightStream;
+ loadNext();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentVectorSchemaRoot != null;
+ }
+
+ @Override
+ public VectorSchemaRoot next() {
+ if (currentVectorSchemaRoot == null) {
+ throw new NoSuchElementException();
+ }
+
+ VectorSchemaRoot oldVectorSchemaRoot = currentVectorSchemaRoot;
+ loadNext();
+
+ return oldVectorSchemaRoot;
+ }
+
+ @Override
+ public void close() {
+ try {
+ AutoCloseables.close(autoCloseable);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void loadNext() {
+
+ if (flightStream != null && flightStream.next()) {
+ currentVectorSchemaRoot = flightStream.getRoot();
+ autoCloseable.add(currentVectorSchemaRoot);
+
+ } else {
+ currentVectorSchemaRoot = null;
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java
new file mode 100644
index 0000000..dec0b9c
--- /dev/null
+++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java
@@ -0,0 +1,236 @@
+/*
+ * The MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package com.influxdb.v3.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+import com.influxdb.v3.client.InfluxDBClient;
+import com.influxdb.v3.client.config.InfluxDBClientConfigs;
+import com.influxdb.v3.client.query.QueryParameters;
+import com.influxdb.v3.client.write.Point;
+import com.influxdb.v3.client.write.WriteParameters;
+import com.influxdb.v3.client.write.WritePrecision;
+
+/**
+ * Implementation of the InfluxDBClient. It is thread-safe and can be safely shared between threads.
+ *
+ * Please use {@link InfluxDBClient} to create an instance.
+ */
+public final class InfluxDBClientImpl implements InfluxDBClient {
+
+ private static final Logger LOG = Logger.getLogger(InfluxDBClientImpl.class.getName());
+
+ private static final String DATABASE_REQUIRED_MESSAGE = "Please specify the 'Database' as a method parameter "
+ + "or use default configuration at 'InfluxDBClientConfigs.database'.";
+
+ private boolean closed = false;
+ private final InfluxDBClientConfigs configs;
+
+ private final RestClient restClient;
+ private final FlightSqlClient flightSqlClient;
+
+ /**
+ * Creates an instance using the specified configs.
+ *
+ * Please use {@link InfluxDBClient} to create an instance.
+ *
+ * @param configs the client configs.
+ */
+ public InfluxDBClientImpl(@Nonnull final InfluxDBClientConfigs configs) {
+ Arguments.checkNotNull(configs, "configs");
+
+ configs.validate();
+
+ this.configs = configs;
+ this.restClient = new RestClient(configs);
+ this.flightSqlClient = new FlightSqlClient(configs);
+ }
+
+ @Override
+ public void writeRecord(@Nullable final String record) {
+ writeRecord(record, WriteParameters.DEFAULTS);
+ }
+
+ @Override
+ public void writeRecord(@Nullable final String record, @Nonnull final WriteParameters parameters) {
+ if (record == null) {
+ return;
+ }
+
+ writeRecords(Collections.singletonList(record), parameters);
+ }
+
+ @Override
+ public void writeRecords(@Nonnull final List records) {
+ writeRecords(records, WriteParameters.DEFAULTS);
+ }
+
+ @Override
+ public void writeRecords(@Nonnull final List records, @Nonnull final WriteParameters parameters) {
+ writeData(records, parameters);
+ }
+
+ @Override
+ public void writePoint(@Nullable final Point point) {
+ writePoint(point, WriteParameters.DEFAULTS);
+ }
+
+ @Override
+ public void writePoint(@Nullable final Point point, @Nonnull final WriteParameters parameters) {
+ if (point == null) {
+ return;
+ }
+
+ writePoints(Collections.singletonList(point), parameters);
+ }
+
+ @Override
+ public void writePoints(@Nonnull final List points) {
+ writePoints(points, WriteParameters.DEFAULTS);
+ }
+
+ @Override
+ public void writePoints(@Nonnull final List points, @Nonnull final WriteParameters parameters) {
+ writeData(points, parameters);
+ }
+
+ @Nonnull
+ @Override
+ public Stream