From 6f214d9be38a7fd64462fa88406c1b8463a501e4 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 17:43:55 +0700 Subject: [PATCH 01/23] feat: respect iox::column_type::field metadata when mapping query --- CHANGELOG.md | 3 + .../client/internal/InfluxDBClientImpl.java | 58 ++++++-- .../client/internal/NanosecondConverter.java | 52 +++++++ .../internal/VectorSchemaRootConverter.java | 28 +++- src/main/java/main.java | 52 +++++++ .../v3/client/InfluxDBClientTest.java | 57 +++++++ .../internal/NanosecondConverterTest.java | 91 ++++++++++++ .../VectorSchemaRootConverterTest.java | 139 ++++++++++++++++++ 8 files changed, 469 insertions(+), 11 deletions(-) create mode 100644 src/main/java/main.java create mode 100644 src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d1fbdd3..f683b249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.10.0 [unreleased] +### Features +1. [#197](https://github.com/InfluxCommunity/influxdb3-java/pull/197): Respect iox::column_type::field metadata when mapping query results into values + ## 0.9.0 [2024-08-12] ### Features diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index f43178b8..c11948de 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.math.BigInteger; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -41,6 +42,7 @@ import io.netty.handler.codec.http.HttpMethod; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.util.Text; import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBClient; @@ -183,16 +185,52 @@ public Stream query(@Nonnull final String query, return queryData(query, parameters, options) .flatMap(vector -> { List fieldVectors = vector.getFieldVectors(); - return IntStream - .range(0, vector.getRowCount()) - .mapToObj(rowNumber -> { - - ArrayList row = new ArrayList<>(); - for (FieldVector fieldVector : fieldVectors) { - row.add(fieldVector.getObject(rowNumber)); - } - return row.toArray(); - }); + return IntStream.range(0, vector.getRowCount()) + .mapToObj(rowNumber -> { + ArrayList row = new ArrayList<>(); + for (int i = 0; i < fieldVectors.size(); i++) { + var schema = vector.getSchema().getFields().get(i); + var metaType = schema.getMetadata().get("iox::column::type"); + String valueType = metaType != null ? metaType.split("::")[2] : null; + + if ("field".equals(valueType)) { + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + var intValue = (Long) fieldVectors.get(i) + .getObject(rowNumber); + row.add(intValue); + break; + case "iox::column_type::field::float": + var doubleValue = (Double) fieldVectors.get(i) + .getObject(rowNumber); + row.add(doubleValue); + break; + case "iox::column_type::field::string": + var textValue = (Text) fieldVectors.get(i) + .getObject(rowNumber); + row.add(textValue.toString()); + break; + case "iox::column_type::field::boolean": + var boolValue = (Boolean) fieldVectors.get(i) + .getObject(rowNumber); + row.add(boolValue); + break; + default: + } + } else if ("timestamp".equals(valueType) + || Objects.equals(schema.getName(), "time")) { + var timestamp = fieldVectors.get(i).getObject(rowNumber); + BigInteger time = NanosecondConverter.getTimestampNano(timestamp, schema); + row.add(time); + } else { + Object value = fieldVectors.get(i).getObject(rowNumber); + row.add(value); + } + } + + return row.toArray(); + }); }); } diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index f47c9227..dd1ee196 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -24,11 +24,18 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; + import com.influxdb.v3.client.write.WritePrecision; import static java.util.function.Function.identity; @@ -111,4 +118,49 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre return FROM_NANOS.get(precision).apply(nanos); } + + public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull final Field schema) { + BigInteger result = null; + + if (value instanceof Long) { + if (schema.getFieldType().getType() instanceof ArrowType.Timestamp) { + ArrowType.Timestamp type = (ArrowType.Timestamp) schema.getFieldType().getType(); + TimeUnit timeUnit; + switch (type.getUnit()) { + case SECOND: + timeUnit = TimeUnit.SECONDS; + break; + case MILLISECOND: + timeUnit = TimeUnit.MILLISECONDS; + break; + case MICROSECOND: + timeUnit = TimeUnit.MICROSECONDS; + break; + default: + case NANOSECOND: + timeUnit = TimeUnit.NANOSECONDS; + break; + } + long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit); + Instant instant = Instant.ofEpochSecond(0, nanoseconds); + result = convertInstantToNano(instant, WritePrecision.NS); + } else { + Instant instant = Instant.ofEpochMilli((Long) value); + result = convertInstantToNano(instant, WritePrecision.NS); + } + } else if (value instanceof LocalDateTime) { + Instant instant = ((LocalDateTime) value).toInstant(ZoneOffset.UTC); + result = convertInstantToNano(instant, WritePrecision.NS); + } + return result; + } + + private static BigInteger convertInstantToNano(final Instant instant, final WritePrecision precision) { + var writePrecision = WritePrecision.NS; + if (precision != null) { + writePrecision = precision; + } + BigInteger convertedTime = NanosecondConverter.convert(instant, writePrecision); + return NanosecondConverter.convertToNanos(convertedTime, writePrecision); + } } diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index d9587052..42e3e259 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -95,7 +95,7 @@ PointValues toPointValues(final int rowNumber, String valueType = parts[2]; if ("field".equals(valueType)) { - p.setField(name, value); + setFieldWithMetaType(p, name, value, metaType); } else if ("tag".equals(valueType) && value instanceof String) { p.setTag(name, (String) value); } else if ("timestamp".equals(valueType)) { @@ -105,6 +105,32 @@ PointValues toPointValues(final int rowNumber, return p; } + private void setFieldWithMetaType(final PointValues p, + final String name, + final Object value, + final String metaType) { + if (value == null) { + return; + } + + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + p.setIntegerField(name, (Long) value); + break; + case "iox::column_type::field::float": + p.setFloatField(name, (Double) value); + break; + case "iox::column_type::field::string": + p.setStringField(name, (String) value); + break; + case "iox::column_type::field::boolean": + p.setBooleanField(name, (Boolean) value); + break; + default: + } + } + private void setTimestamp(@Nonnull final Object value, @Nonnull final Field schema, @Nonnull final PointValues pointValues) { diff --git a/src/main/java/main.java b/src/main/java/main.java new file mode 100644 index 00000000..153efb52 --- /dev/null +++ b/src/main/java/main.java @@ -0,0 +1,52 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.PointValues; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class main { + public static void main(String[] args) { + + String host = "https://us-east-1-1.aws.cloud2.influxdata.com"; + char[] token = "xWh3VQCb3pMJPw7T2lnEwFLXO-pb4OWzfNN76UTpmRKtlg83yJlz6maLC3AL0B6M6gMWWZY2QApzSdEeEopWlQ==".toCharArray(); + String database = "admin"; + +// List arrayList = new ArrayList<>(); + try (InfluxDBClient client = InfluxDBClient.getInstance(host, token, database)) { +// arrayList = client.queryPoints("SELECT * FROM host2") +// .collect(Collectors.toList()); +// System.out.println("arrayList = " + arrayList); + Stream query = client.query("SELECT * FROM host2"); + query.forEach(row -> { + System.out.println("row = " + Arrays.toString(row)); + }); + client.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index 2c9fdde0..d88758ec 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -21,11 +21,19 @@ */ package com.influxdb.v3.client; +import java.math.BigInteger; +import java.time.Instant; import java.util.Map; import java.util.Properties; +import java.util.UUID; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; + +import com.influxdb.v3.client.write.WriteOptions; +import com.influxdb.v3.client.write.WritePrecision; class InfluxDBClientTest { @@ -116,4 +124,53 @@ public void unsupportedQueryParams() throws Exception { + "class com.influxdb.v3.client.internal.InfluxDBClientImpl"); } } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testQuery() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + String uuid = UUID.randomUUID().toString(); + long timestamp = Instant.now().getEpochSecond(); + String record = String.format( + "host10,tag=empty " + + "name=\"intel\"," + + "mem_total=2048," + + "disk_free=100i," + + "temperature=100.86," + + "isActive=true," + + "testId=\"%s\" %d", + uuid, + timestamp + ); + client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null)); + + Map parameters = Map.of("testId", uuid); + String sql = "Select * from host10 where \"testId\"=$testId"; + try (Stream stream = client.query(sql, parameters)) { + stream.findFirst() + .ifPresent(objects -> { + Assertions.assertThat(objects[0].getClass()).isEqualTo(Long.class); + Assertions.assertThat(objects[0]).isEqualTo(100L); + + Assertions.assertThat(objects[1].getClass()).isEqualTo(Boolean.class); + Assertions.assertThat(objects[1]).isEqualTo(true); + + Assertions.assertThat(objects[2].getClass()).isEqualTo(Double.class); + Assertions.assertThat(objects[2]).isEqualTo(2048.0); + + Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class); + Assertions.assertThat(objects[3]).isEqualTo("intel"); + + Assertions.assertThat(objects[7].getClass()).isEqualTo(BigInteger.class); + Assertions.assertThat(objects[7]).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000)); + }); + } + } + } } diff --git a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java new file mode 100644 index 00000000..31ea7f1f --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java @@ -0,0 +1,91 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.math.BigInteger; + +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class NanosecondConverterTest { + + @Test + void testGetTimestampNanosecond() { + BigInteger timestampNanoSecond = null; + + // Second + FieldType timeTypeSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"), + null); + Field timeFieldSecond = new Field("time", timeTypeSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000_000)), timestampNanoSecond + ); + + // MilliSecond + FieldType timeTypeMilliSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), + null); + Field timeFieldMilliSecond = new Field("time", timeTypeMilliSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldMilliSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000)), timestampNanoSecond + ); + + // MicroSecond + FieldType timeTypeMicroSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), + null); + Field timeFieldMicroSecond = new Field("time", timeTypeMicroSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldMicroSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000)), timestampNanoSecond + ); + + // Nano Second + FieldType timeTypeNanoSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), + null); + Field timeFieldNanoSecond = new Field("time", timeTypeNanoSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldNanoSecond); + Assertions.assertEquals(BigInteger.valueOf(123_456L), timestampNanoSecond); + + // For ArrowType.Time type + FieldType timeMilliSecond = new FieldType(true, + new ArrowType.Time(TimeUnit.MILLISECOND, 32), + null); + Field fieldMilliSecond = new Field("time", timeMilliSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, fieldMilliSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000)), timestampNanoSecond + ); + } +} diff --git a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java index 7915b2bc..f0b77c9b 100644 --- a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java @@ -25,16 +25,20 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.annotation.Nonnull; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BaseFixedWidthVector; import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.TimeMilliVector; import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; @@ -125,6 +129,41 @@ void timestampAsArrowInt() { } } + @Test + public void testConverterWithMetaType() { + try (VectorSchemaRoot root = generateVectorSchemaRoot()) { + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + + String measurement = pointValues.getMeasurement(); + Assertions.assertThat(measurement).isEqualTo("host"); + Assertions.assertThat(measurement.getClass()).isEqualTo(String.class); + + BigInteger expected = BigInteger.valueOf(123_456L * 1_000_000); + Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); + + Long memTotal = (Long) pointValues.getField("mem_total"); + Assertions.assertThat(memTotal).isEqualTo(2048); + Assertions.assertThat(memTotal.getClass()).isEqualTo(Long.class); + + Long diskFree = (Long) pointValues.getField("disk_free"); + Assertions.assertThat(diskFree).isEqualTo(1_000_000); + Assertions.assertThat(diskFree.getClass()).isEqualTo(Long.class); + + Double temperature = (Double) pointValues.getField("temperature"); + Assertions.assertThat(temperature).isEqualTo(100.8766f); + Assertions.assertThat(temperature.getClass()).isEqualTo(Double.class); + + String name = (String) pointValues.getField("name"); + Assertions.assertThat(name).isEqualTo("intel"); + Assertions.assertThat(name.getClass()).isEqualTo(String.class); + + Boolean isActive = (Boolean) pointValues.getField("isActive"); + Assertions.assertThat(isActive).isEqualTo(true); + Assertions.assertThat(isActive.getClass()).isEqualTo(Boolean.class); + } + } + + @Test void timestampWithoutMetadataAndFieldWithoutMetadata() { FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); @@ -237,4 +276,104 @@ private VectorSchemaRoot initializeVectorSchemaRoot(@Nonnull final Field... fiel return root; } + + public VectorSchemaRoot generateVectorSchemaRoot() { + Field measurementField = generateStringField("measurement"); + Field timeField = generateTimeField(); + Field memTotalField = generateIntField("mem_total"); + Field diskFreeField = generateUnsignedIntField("disk_free"); + Field temperatureField = generateFloatField("temperature"); + Field nameField = generateStringField("name"); + Field isActiveField = generateBoolField("isActive"); + List fields = List.of(measurementField, + timeField, + memTotalField, + diskFreeField, + temperatureField, + nameField, + isActiveField); + + VectorSchemaRoot root = initializeVectorSchemaRoot(fields.toArray(new Field[0])); + VarCharVector measurement = (VarCharVector) root.getVector("measurement"); + measurement.allocateNew(); + measurement.set(0, "host".getBytes()); + + TimeMilliVector timeVector = (TimeMilliVector) root.getVector("time"); + timeVector.allocateNew(); + timeVector.setSafe(0, 123_456); + + BigIntVector intVector = (BigIntVector) root.getVector("mem_total"); + intVector.allocateNew(); + intVector.set(0, 2048); + + BigIntVector unsignedIntVector = (BigIntVector) root.getVector("disk_free"); + unsignedIntVector.allocateNew(); + unsignedIntVector.set(0, 1_000_000); + + Float8Vector floatVector = (Float8Vector) root.getVector("temperature"); + floatVector.allocateNew(); + floatVector.set(0, 100.8766f); + + VarCharVector stringVector = (VarCharVector) root.getVector("name"); + stringVector.allocateNew(); + stringVector.setSafe(0, "intel".getBytes()); + + BitVector boolVector = (BitVector) root.getVector("isActive"); + boolVector.allocateNew(); + boolVector.setSafe(0, 1); + + return root; + } + + private Field generateIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::integer"); + FieldType intType = new FieldType(true, + new ArrowType.Int(64, true), + null, + metadata); + return new Field(fieldName, intType, null); + } + + private Field generateUnsignedIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::uinteger"); + FieldType intType = new FieldType(true, + new ArrowType.Int(64, true), + null, + metadata); + return new Field(fieldName, intType, null); + } + + private Field generateFloatField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::float"); + FieldType floatType = new FieldType(true, + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), + null, + metadata); + return new Field(fieldName, floatType, null); + } + + private Field generateStringField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::string"); + FieldType stringType = new FieldType(true, new ArrowType.Utf8(), null, metadata); + return new Field(fieldName, stringType, null); + } + + private Field generateBoolField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::boolean"); + FieldType boolType = new FieldType(true, new ArrowType.Bool(), null, metadata); + return new Field(fieldName, boolType, null); + } + + private Field generateTimeField() { + FieldType timeType = new FieldType(true, + new ArrowType.Time(TimeUnit.MILLISECOND, 32), + null); + return new Field("time", timeType, null); + } + } From 0c91353ade1faf4fc9fa68cbae4640ecd8d912bf Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 17:49:21 +0700 Subject: [PATCH 02/23] chore: update CHANGELOG.mdlog.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f683b249..f0983259 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## 0.10.0 [unreleased] ### Features -1. [#197](https://github.com/InfluxCommunity/influxdb3-java/pull/197): Respect iox::column_type::field metadata when mapping query results into values +1. [#200](https://github.com/InfluxCommunity/influxdb3-java/pull/200): Respect iox::column_type::field metadata when mapping query results into values ## 0.9.0 [2024-08-12] From def624cdf8cf8c68fdc107b8d07838254640c231 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 17:52:12 +0700 Subject: [PATCH 03/23] chore: remove unused file --- src/main/java/main.java | 52 ----------------------------------------- 1 file changed, 52 deletions(-) delete mode 100644 src/main/java/main.java diff --git a/src/main/java/main.java b/src/main/java/main.java deleted file mode 100644 index 153efb52..00000000 --- a/src/main/java/main.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * The MIT License - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -import com.influxdb.v3.client.InfluxDBClient; -import com.influxdb.v3.client.PointValues; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -public class main { - public static void main(String[] args) { - - String host = "https://us-east-1-1.aws.cloud2.influxdata.com"; - char[] token = "xWh3VQCb3pMJPw7T2lnEwFLXO-pb4OWzfNN76UTpmRKtlg83yJlz6maLC3AL0B6M6gMWWZY2QApzSdEeEopWlQ==".toCharArray(); - String database = "admin"; - -// List arrayList = new ArrayList<>(); - try (InfluxDBClient client = InfluxDBClient.getInstance(host, token, database)) { -// arrayList = client.queryPoints("SELECT * FROM host2") -// .collect(Collectors.toList()); -// System.out.println("arrayList = " + arrayList); - Stream query = client.query("SELECT * FROM host2"); - query.forEach(row -> { - System.out.println("row = " + Arrays.toString(row)); - }); - client.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} From a3408c0c890b76b785824b1dfb0ce32f3e68035c Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 17:57:11 +0700 Subject: [PATCH 04/23] chore: lint CHANGELOG.md --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0983259..59f170a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,9 @@ ## 0.10.0 [unreleased] ### Features -1. [#200](https://github.com/InfluxCommunity/influxdb3-java/pull/200): Respect iox::column_type::field metadata when mapping query results into values + +1. [#200](https://github.com/InfluxCommunity/influxdb3-java/pull/200): Respect iox::column_type::field metadata when + mapping query results into values ## 0.9.0 [2024-08-12] From cf375aa468185cb30df1ce27252cc1ce8ca36128 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 22 Nov 2024 15:00:59 +0700 Subject: [PATCH 05/23] refactor: remove setTimestamp function --- .../client/internal/InfluxDBClientImpl.java | 22 ++++----- .../internal/VectorSchemaRootConverter.java | 46 +++---------------- 2 files changed, 16 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index c11948de..d3d1e989 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -185,52 +185,48 @@ public Stream query(@Nonnull final String query, return queryData(query, parameters, options) .flatMap(vector -> { List fieldVectors = vector.getFieldVectors(); + var fields = vector.getSchema().getFields(); return IntStream.range(0, vector.getRowCount()) .mapToObj(rowNumber -> { ArrayList row = new ArrayList<>(); for (int i = 0; i < fieldVectors.size(); i++) { - var schema = vector.getSchema().getFields().get(i); + var schema = fields.get(i); var metaType = schema.getMetadata().get("iox::column::type"); String valueType = metaType != null ? metaType.split("::")[2] : null; + Object value = fieldVectors.get(i).getObject(rowNumber); if ("field".equals(valueType)) { switch (metaType) { case "iox::column_type::field::integer": case "iox::column_type::field::uinteger": - var intValue = (Long) fieldVectors.get(i) - .getObject(rowNumber); + var intValue = (Long) value; row.add(intValue); break; case "iox::column_type::field::float": - var doubleValue = (Double) fieldVectors.get(i) - .getObject(rowNumber); + var doubleValue = (Double) value; row.add(doubleValue); break; case "iox::column_type::field::string": - var textValue = (Text) fieldVectors.get(i) - .getObject(rowNumber); + var textValue = (Text) value; row.add(textValue.toString()); break; case "iox::column_type::field::boolean": - var boolValue = (Boolean) fieldVectors.get(i) - .getObject(rowNumber); + var boolValue = (Boolean) value; row.add(boolValue); break; default: } } else if ("timestamp".equals(valueType) || Objects.equals(schema.getName(), "time")) { - var timestamp = fieldVectors.get(i).getObject(rowNumber); - BigInteger time = NanosecondConverter.getTimestampNano(timestamp, schema); + BigInteger time = NanosecondConverter.getTimestampNano(value, schema); row.add(time); } else { - Object value = fieldVectors.get(i).getObject(rowNumber); row.add(value); } } return row.toArray(); - }); + }); }); } diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 42e3e259..30c805d9 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -21,22 +21,18 @@ */ package com.influxdb.v3.client.internal; -import java.time.Instant; import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.List; import java.util.Objects; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.Text; import com.influxdb.v3.client.PointValues; +import com.influxdb.v3.client.write.WritePrecision; /** @@ -82,7 +78,8 @@ PointValues toPointValues(final int rowNumber, if (metaType == null) { if (Objects.equals(name, "time") && (value instanceof Long || value instanceof LocalDateTime)) { - setTimestamp(value, schema, p); + var timeNano = NanosecondConverter.getTimestampNano(value, schema); + p.setTimestamp(timeNano, WritePrecision.NS); } else { // just push as field If you don't know what type is it p.setField(name, value); @@ -99,7 +96,8 @@ PointValues toPointValues(final int rowNumber, } else if ("tag".equals(valueType) && value instanceof String) { p.setTag(name, (String) value); } else if ("timestamp".equals(valueType)) { - setTimestamp(value, schema, p); + var timeNano = NanosecondConverter.getTimestampNano(value, schema); + p.setTimestamp(timeNano, WritePrecision.NS); } } return p; @@ -128,38 +126,8 @@ private void setFieldWithMetaType(final PointValues p, p.setBooleanField(name, (Boolean) value); break; default: - } - } - - private void setTimestamp(@Nonnull final Object value, - @Nonnull final Field schema, - @Nonnull final PointValues pointValues) { - if (value instanceof Long) { - if (schema.getFieldType().getType() instanceof ArrowType.Timestamp) { - ArrowType.Timestamp type = (ArrowType.Timestamp) schema.getFieldType().getType(); - TimeUnit timeUnit; - switch (type.getUnit()) { - case SECOND: - timeUnit = TimeUnit.SECONDS; - break; - case MILLISECOND: - timeUnit = TimeUnit.MILLISECONDS; - break; - case MICROSECOND: - timeUnit = TimeUnit.MICROSECONDS; - break; - default: - case NANOSECOND: - timeUnit = TimeUnit.NANOSECONDS; - break; - } - long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit); - pointValues.setTimestamp(Instant.ofEpochSecond(0, nanoseconds)); - } else { - pointValues.setTimestamp(Instant.ofEpochMilli((Long) value)); - } - } else if (value instanceof LocalDateTime) { - pointValues.setTimestamp(((LocalDateTime) value).toInstant(ZoneOffset.UTC)); + p.setField(name, value); + break; } } } From 541e23460354c2eda2571edf265bb382106c2bb0 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 22 Nov 2024 15:11:55 +0700 Subject: [PATCH 06/23] chore: update CHANGELOG.md --- CHANGELOG.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 59f170a1..ec31a27e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,12 @@ ### Features 1. [#200](https://github.com/InfluxCommunity/influxdb3-java/pull/200): Respect iox::column_type::field metadata when - mapping query results into values + mapping query results into values. + - iox::column_type::field::integer: => Long + - iox::column_type::field::uinteger: => Long + - iox::column_type::field::float: => Double + - iox::column_type::field::string: => String + - iox::column_type::field::boolean: => Boolean ## 0.9.0 [2024-08-12] From 7d35ff0789f632eb971114d3ec9bbbf0ee3beda8 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 25 Nov 2024 14:49:09 +0700 Subject: [PATCH 07/23] chore: update CHANGELOG.md and pom.xml release version --- CHANGELOG.md | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec31a27e..0eee2d9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.10.0 [unreleased] +## 1.0.0 [unreleased] ### Features diff --git a/pom.xml b/pom.xml index 7ce47d1b..201e1082 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ com.influxdb influxdb3-java jar - 0.10.0-SNAPSHOT + 1.0.0-SNAPSHOT InfluxDB 3 Java Client From 95a29cb91f101f89e939603a4985b629823e0285 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 25 Nov 2024 14:49:59 +0700 Subject: [PATCH 08/23] chore: add javadoc for getTimestampNano function --- .../v3/client/internal/NanosecondConverter.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index dd1ee196..6d380a8c 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -119,12 +119,19 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre return FROM_NANOS.get(precision).apply(nanos); } - public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull final Field schema) { + /** + * Convert Long or LocalDateTime to timestamp nanosecond + * + * @param value the time in Long or LocalDateTime + * @param field the arrow field metadata + * @return the time in nanosecond + */ + public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull final Field field) { BigInteger result = null; if (value instanceof Long) { - if (schema.getFieldType().getType() instanceof ArrowType.Timestamp) { - ArrowType.Timestamp type = (ArrowType.Timestamp) schema.getFieldType().getType(); + if (field.getFieldType().getType() instanceof ArrowType.Timestamp) { + ArrowType.Timestamp type = (ArrowType.Timestamp) field.getFieldType().getType(); TimeUnit timeUnit; switch (type.getUnit()) { case SECOND: @@ -136,8 +143,8 @@ public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull case MICROSECOND: timeUnit = TimeUnit.MICROSECONDS; break; - default: case NANOSECOND: + default: timeUnit = TimeUnit.NANOSECONDS; break; } From 088b36dd03ee3cf71409fcfed09cb84f00539fb9 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 25 Nov 2024 14:52:25 +0700 Subject: [PATCH 09/23] refactor: change to enhance for loop --- .../client/internal/InfluxDBClientImpl.java | 80 +++++++++---------- .../internal/VectorSchemaRootConverter.java | 14 ++-- 2 files changed, 45 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index d3d1e989..d845e102 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -183,51 +183,47 @@ public Stream query(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { return queryData(query, parameters, options) - .flatMap(vector -> { - List fieldVectors = vector.getFieldVectors(); - var fields = vector.getSchema().getFields(); - return IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> { - ArrayList row = new ArrayList<>(); - for (int i = 0; i < fieldVectors.size(); i++) { - var schema = fields.get(i); - var metaType = schema.getMetadata().get("iox::column::type"); - String valueType = metaType != null ? metaType.split("::")[2] : null; - - Object value = fieldVectors.get(i).getObject(rowNumber); - if ("field".equals(valueType)) { - switch (metaType) { - case "iox::column_type::field::integer": - case "iox::column_type::field::uinteger": - var intValue = (Long) value; - row.add(intValue); - break; - case "iox::column_type::field::float": - var doubleValue = (Double) value; - row.add(doubleValue); - break; - case "iox::column_type::field::string": - var textValue = (Text) value; - row.add(textValue.toString()); - break; - case "iox::column_type::field::boolean": - var boolValue = (Boolean) value; - row.add(boolValue); - break; - default: - } - } else if ("timestamp".equals(valueType) - || Objects.equals(schema.getName(), "time")) { - BigInteger time = NanosecondConverter.getTimestampNano(value, schema); - row.add(time); - } else { - row.add(value); + .flatMap(vector -> IntStream.range(0, vector.getRowCount()) + .mapToObj(rowNumber -> { + ArrayList row = new ArrayList<>(); + for (FieldVector fieldVector : vector.getFieldVectors()) { + var field = fieldVector.getField(); + var metaType = field.getMetadata().get("iox::column::type"); + String valueType = metaType != null ? metaType.split("::")[2] : null; + + Object value = fieldVector.getObject(rowNumber); + if ("field".equals(valueType)) { + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + var intValue = (Long) value; + row.add(intValue); + break; + case "iox::column_type::field::float": + var doubleValue = (Double) value; + row.add(doubleValue); + break; + case "iox::column_type::field::string": + var textValue = (Text) value; + row.add(textValue.toString()); + break; + case "iox::column_type::field::boolean": + var boolValue = (Boolean) value; + row.add(boolValue); + break; + default: } + } else if ("timestamp".equals(valueType) + || Objects.equals(field.getName(), "time")) { + BigInteger time = NanosecondConverter.getTimestampNano(value, field); + row.add(time); + } else { + row.add(value); } + } - return row.toArray(); - }); - }); + return row.toArray(); + })); } @Nonnull diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 30c805d9..fd4ea683 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -59,11 +59,11 @@ PointValues toPointValues(final int rowNumber, @Nonnull final VectorSchemaRoot vector, @Nonnull final List fieldVectors) { PointValues p = new PointValues(); - for (int i = 0; i < fieldVectors.size(); i++) { - var schema = vector.getSchema().getFields().get(i); - var value = fieldVectors.get(i).getObject(rowNumber); - var name = schema.getName(); - var metaType = schema.getMetadata().get("iox::column::type"); + for (FieldVector fieldVector : fieldVectors) { + var field = fieldVector.getField(); + var value = fieldVector.getObject(rowNumber); + var name = field.getName(); + var metaType = field.getMetadata().get("iox::column::type"); if (value instanceof Text) { value = value.toString(); @@ -78,7 +78,7 @@ PointValues toPointValues(final int rowNumber, if (metaType == null) { if (Objects.equals(name, "time") && (value instanceof Long || value instanceof LocalDateTime)) { - var timeNano = NanosecondConverter.getTimestampNano(value, schema); + var timeNano = NanosecondConverter.getTimestampNano(value, field); p.setTimestamp(timeNano, WritePrecision.NS); } else { // just push as field If you don't know what type is it @@ -96,7 +96,7 @@ PointValues toPointValues(final int rowNumber, } else if ("tag".equals(valueType) && value instanceof String) { p.setTag(name, (String) value); } else if ("timestamp".equals(valueType)) { - var timeNano = NanosecondConverter.getTimestampNano(value, schema); + var timeNano = NanosecondConverter.getTimestampNano(value, field); p.setTimestamp(timeNano, WritePrecision.NS); } } From 2696bc7c1db58ff12d949af5e224401b44c8fb98 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 25 Nov 2024 14:54:44 +0700 Subject: [PATCH 10/23] fix: build --- .../com/influxdb/v3/client/internal/NanosecondConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index 6d380a8c..770cec4c 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -120,7 +120,7 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre } /** - * Convert Long or LocalDateTime to timestamp nanosecond + * Convert Long or LocalDateTime to timestamp nanosecond. * * @param value the time in Long or LocalDateTime * @param field the arrow field metadata From 6c3d2fba97a35f2669d84f03ff859501dcfd7ab2 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 26 Nov 2024 16:28:58 +0700 Subject: [PATCH 11/23] feat: move on if type cast is fail --- .../com/influxdb/v3/client/PointValues.java | 11 + .../client/internal/InfluxDBClientImpl.java | 44 +--- .../v3/client/internal/TypeCasting.java | 58 +++++ .../internal/VectorSchemaRootConverter.java | 121 +++++++++- .../internal/VectorSchemaRootUtils.java | 216 ++++++++++++++++++ .../v3/client/InfluxDBClientTest.java | 19 +- .../VectorSchemaRootConverterTest.java | 178 +++++---------- 7 files changed, 471 insertions(+), 176 deletions(-) create mode 100644 src/main/java/com/influxdb/v3/client/internal/TypeCasting.java create mode 100644 src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java diff --git a/src/main/java/com/influxdb/v3/client/PointValues.java b/src/main/java/com/influxdb/v3/client/PointValues.java index 70c11afb..a4e598f5 100644 --- a/src/main/java/com/influxdb/v3/client/PointValues.java +++ b/src/main/java/com/influxdb/v3/client/PointValues.java @@ -435,6 +435,17 @@ public PointValues setField(@Nonnull final String field, @Nullable final Object return putField(field, value); } + /** + * Add a null field. + * + * @param field the field name + * @return this + */ + @Nonnull + public PointValues setNullField(@Nonnull final String field) { + return putField(field, null); + } + /** * Adds or replaces fields for this point. * diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index d845e102..2d4711b1 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -23,9 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.math.BigInteger; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -42,7 +40,6 @@ import io.netty.handler.codec.http.HttpMethod; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.util.Text; import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBClient; @@ -184,46 +181,7 @@ public Stream query(@Nonnull final String query, @Nonnull final QueryOptions options) { return queryData(query, parameters, options) .flatMap(vector -> IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> { - ArrayList row = new ArrayList<>(); - for (FieldVector fieldVector : vector.getFieldVectors()) { - var field = fieldVector.getField(); - var metaType = field.getMetadata().get("iox::column::type"); - String valueType = metaType != null ? metaType.split("::")[2] : null; - - Object value = fieldVector.getObject(rowNumber); - if ("field".equals(valueType)) { - switch (metaType) { - case "iox::column_type::field::integer": - case "iox::column_type::field::uinteger": - var intValue = (Long) value; - row.add(intValue); - break; - case "iox::column_type::field::float": - var doubleValue = (Double) value; - row.add(doubleValue); - break; - case "iox::column_type::field::string": - var textValue = (Text) value; - row.add(textValue.toString()); - break; - case "iox::column_type::field::boolean": - var boolValue = (Boolean) value; - row.add(boolValue); - break; - default: - } - } else if ("timestamp".equals(valueType) - || Objects.equals(field.getName(), "time")) { - BigInteger time = NanosecondConverter.getTimestampNano(value, field); - row.add(time); - } else { - row.add(value); - } - } - - return row.toArray(); - })); + .mapToObj(rowNumber -> VectorSchemaRootConverter.INSTANCE.getArrayObjectFromVectorSchemaRoot(vector, rowNumber))); } @Nonnull diff --git a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java new file mode 100644 index 00000000..cae28016 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java @@ -0,0 +1,58 @@ +package com.influxdb.v3.client.internal; + +import javax.annotation.Nonnull; + +import org.apache.arrow.vector.util.Text; + +/** + * Functions for safe type casting + */ +public final class TypeCasting { + + /** + * Safe casting to long value + * + * @param value object to cast + * @return long value + */ + public static long toLongValue(@Nonnull final Object value) { + + if (long.class.isAssignableFrom(value.getClass()) + || Long.class.isAssignableFrom(value.getClass())) { + return (long) value; + } + + return ((Number) value).longValue(); + } + + /** + * Safe casting to double value + * + * @param value object to cast + * @return double value + */ + public static double toDoubleValue(@Nonnull final Object value) { + + if (double.class.isAssignableFrom(value.getClass()) + || Double.class.isAssignableFrom(value.getClass())) { + return (double) value; + } + + return ((Number) value).doubleValue(); + } + + /** + * Safe casting to string value + * + * @param value object to cast + * @return string value + */ + public static String toStringValue(@Nonnull final Object value) { + + if (Text.class.isAssignableFrom(value.getClass())) { + return ((Text) value).toString(); + } + + return (String) value; + } +} diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index fd4ea683..1a64deda 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -21,9 +21,12 @@ */ package com.influxdb.v3.client.internal; +import java.math.BigInteger; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.logging.Logger; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; @@ -42,9 +45,11 @@ * This class is thread-safe. */ @ThreadSafe -final class VectorSchemaRootConverter { +public final class VectorSchemaRootConverter { - static final VectorSchemaRootConverter INSTANCE = new VectorSchemaRootConverter(); + private static final Logger LOG = Logger.getLogger(VectorSchemaRootConverter.class.getName()); + + public static final VectorSchemaRootConverter INSTANCE = new VectorSchemaRootConverter(); /** * Converts a given row of data from a VectorSchemaRoot object to PointValues. @@ -103,8 +108,16 @@ PointValues toPointValues(final int rowNumber, return p; } - private void setFieldWithMetaType(final PointValues p, - final String name, + /** + * Set field value for PointValues base on iox::column::type + * + * @param p The target PointValues + * @param fieldName Field name in PointValues + * @param value The value to be set + * @param metaType The iox::column::type column meta type, eg: iox::column_type::field::integer, iox::column_type::field::float + */ + public void setFieldWithMetaType(final PointValues p, + final String fieldName, final Object value, final String metaType) { if (value == null) { @@ -114,20 +127,110 @@ private void setFieldWithMetaType(final PointValues p, switch (metaType) { case "iox::column_type::field::integer": case "iox::column_type::field::uinteger": - p.setIntegerField(name, (Long) value); + if (value instanceof Long) { + p.setIntegerField(fieldName, TypeCasting.toLongValue(value)); + } else { + p.setNullField(fieldName); + LOG.warning(String.format("Value of %s is not an Integer", fieldName)); + } break; case "iox::column_type::field::float": - p.setFloatField(name, (Double) value); + if (value instanceof Double) { + p.setFloatField(fieldName, TypeCasting.toDoubleValue(value)); + } else { + p.setNullField(fieldName); + LOG.warning(String.format("Value of %s is not a Double", fieldName)); + } break; case "iox::column_type::field::string": - p.setStringField(name, (String) value); + if (value instanceof String || value instanceof Text) { + p.setStringField(fieldName, TypeCasting.toStringValue(value)); + } else { + p.setNullField(fieldName); + LOG.warning(String.format("Value of %s is not a String", fieldName)); + } break; case "iox::column_type::field::boolean": - p.setBooleanField(name, (Boolean) value); + if (value instanceof Boolean) { + p.setBooleanField(fieldName, (Boolean) value); + } else { + p.setNullField(fieldName); + LOG.warning(String.format("Value of %s is not a Boolean", fieldName)); + } break; default: - p.setField(name, value); + p.setField(fieldName, value); break; } } + + /** + * Get array of values from VectorSchemaRoot + * + * @param vector The data return from InfluxDB + * @param rowNumber The row number of data + * @return An array of Objects represent for a row of data + */ + public Object[] getArrayObjectFromVectorSchemaRoot(VectorSchemaRoot vector, int rowNumber) { + List row = new ArrayList<>(); + for (FieldVector fieldVector : vector.getFieldVectors()) { + var field = fieldVector.getField(); + var metaType = field.getMetadata().get("iox::column::type"); + String valueType = metaType != null ? metaType.split("::")[2] : null; + String fieldName = field.getName(); + + Object value = fieldVector.getObject(rowNumber); + if (value == null) { + row.add(null); + continue; + } + + if ("field".equals(valueType)) { + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + if (value instanceof Long) { + row.add(TypeCasting.toLongValue(value)); + } else { + row.add(null); + LOG.warning(String.format("Value of %s is not an Integer", fieldName)); + } + break; + case "iox::column_type::field::float": + if (value instanceof Double) { + row.add(TypeCasting.toDoubleValue(value)); + } else { + row.add(null); + LOG.warning(String.format("Value of %s is not a Double", fieldName)); + } + break; + case "iox::column_type::field::string": + if (value instanceof Text || value instanceof String) { + row.add(TypeCasting.toStringValue(value)); + } else { + row.add(null); + LOG.warning(String.format("Value of %s is not a String", fieldName)); + } + break; + case "iox::column_type::field::boolean": + if (value instanceof Boolean) { + row.add((Boolean) value); + } else { + row.add(null); + LOG.warning(String.format("Value of %s is not a Boolean", fieldName)); + } + break; + default: + } + } else if ("timestamp".equals(valueType) + || Objects.equals(fieldName, "time")) { + BigInteger time = NanosecondConverter.getTimestampNano(value, field); + row.add(time); + } else { + row.add(value); + } + } + + return row.toArray(); + } } diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java new file mode 100644 index 00000000..0cd4b3e3 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java @@ -0,0 +1,216 @@ +package com.influxdb.v3.client.internal; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; + +public final class VectorSchemaRootUtils { + + public static VectorSchemaRoot generateInvalidVectorSchemaRoot() { + Field testField = generateInvalidIntField("test_field"); + Field testField1 = generateInvalidUnsignedIntField("test_field1"); + Field testField2 = generateInvalidFloatField("test_field2"); + Field testField3 = generateInvalidStringField("test_field3"); + Field testField4 = generateInvalidBoolField("test_field4"); + + List fields = List.of(testField, + testField1, + testField2, + testField3, + testField4); + + VectorSchemaRoot root = initializeVectorSchemaRoot(fields.toArray(new Field[0])); + + VarCharVector intVector = (VarCharVector) root.getVector("test_field"); + intVector.allocateNew(); + intVector.set(0, "aaaa".getBytes()); + + VarCharVector uIntVector = (VarCharVector) root.getVector("test_field1"); + uIntVector.allocateNew(); + uIntVector.set(0, "aaaa".getBytes()); + + VarCharVector floatVector = (VarCharVector) root.getVector("test_field2"); + floatVector.allocateNew(); + floatVector.set(0, "aaaa".getBytes()); + + Float8Vector stringVector = (Float8Vector) root.getVector("test_field3"); + stringVector.allocateNew(); + stringVector.set(0, 100.2); + + VarCharVector booleanVector = (VarCharVector) root.getVector("test_field4"); + booleanVector.allocateNew(); + booleanVector.set(0, "aaa".getBytes()); + + return root; + } + + public static VectorSchemaRoot generateVectorSchemaRoot() { + Field measurementField = generateStringField("measurement"); + Field timeField = generateTimeField(); + Field memTotalField = generateIntField("mem_total"); + Field diskFreeField = generateUnsignedIntField("disk_free"); + Field temperatureField = generateFloatField("temperature"); + Field nameField = generateStringField("name"); + Field isActiveField = generateBoolField("isActive"); + List fields = List.of(measurementField, + timeField, + memTotalField, + diskFreeField, + temperatureField, + nameField, + isActiveField); + + VectorSchemaRoot root = initializeVectorSchemaRoot(fields.toArray(new Field[0])); + VarCharVector measurement = (VarCharVector) root.getVector("measurement"); + measurement.allocateNew(); + measurement.set(0, "host".getBytes()); + + TimeMilliVector timeVector = (TimeMilliVector) root.getVector("time"); + timeVector.allocateNew(); + timeVector.setSafe(0, 123_456); + + BigIntVector intVector = (BigIntVector) root.getVector("mem_total"); + intVector.allocateNew(); + intVector.set(0, 2048); + + BigIntVector unsignedIntVector = (BigIntVector) root.getVector("disk_free"); + unsignedIntVector.allocateNew(); + unsignedIntVector.set(0, 1_000_000); + + Float8Vector floatVector = (Float8Vector) root.getVector("temperature"); + floatVector.allocateNew(); + floatVector.set(0, 100.8766f); + + VarCharVector stringVector = (VarCharVector) root.getVector("name"); + stringVector.allocateNew(); + stringVector.setSafe(0, "intel".getBytes()); + + BitVector boolVector = (BitVector) root.getVector("isActive"); + boolVector.allocateNew(); + boolVector.setSafe(0, 1); + + return root; + } + + @Nonnull + public static VectorSchemaRoot initializeVectorSchemaRoot(@Nonnull final Field... fields) { + + Schema schema = new Schema(Arrays.asList(fields)); + + VectorSchemaRoot root = VectorSchemaRoot.create(schema, new RootAllocator(Long.MAX_VALUE)); + root.allocateNew(); + + return root; + } + + private static Field generateIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::integer"); + FieldType intType = new FieldType(true, + new ArrowType.Int(64, true), + null, + metadata); + return new Field(fieldName, intType, null); + } + + private static Field generateInvalidIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::integer"); + FieldType intType = new FieldType(true, + new ArrowType.Utf8(), + null, + metadata); + return new Field(fieldName, intType, null); + } + + private static Field generateUnsignedIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::uinteger"); + FieldType intType = new FieldType(true, + new ArrowType.Int(64, true), + null, + metadata); + return new Field(fieldName, intType, null); + } + + private static Field generateInvalidUnsignedIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::uinteger"); + FieldType intType = new FieldType(true, + new ArrowType.Utf8(), + null, + metadata); + return new Field(fieldName, intType, null); + } + + private static Field generateFloatField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::float"); + FieldType floatType = new FieldType(true, + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), + null, + metadata); + return new Field(fieldName, floatType, null); + } + + private static Field generateInvalidFloatField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::float"); + FieldType floatType = new FieldType(true, + new ArrowType.Utf8(), + null, + metadata); + return new Field(fieldName, floatType, null); + } + + private static Field generateStringField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::string"); + FieldType stringType = new FieldType(true, new ArrowType.Utf8(), null, metadata); + return new Field(fieldName, stringType, null); + } + + private static Field generateInvalidStringField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::string"); + FieldType stringType = new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null, metadata); + return new Field(fieldName, stringType, null); + } + + private static Field generateBoolField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::boolean"); + FieldType boolType = new FieldType(true, new ArrowType.Bool(), null, metadata); + return new Field(fieldName, boolType, null); + } + + private static Field generateInvalidBoolField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::boolean"); + FieldType boolType = new FieldType(true, new ArrowType.Utf8(), null, metadata); + return new Field(fieldName, boolType, null); + } + + private static Field generateTimeField() { + FieldType timeType = new FieldType(true, + new ArrowType.Time(TimeUnit.MILLISECOND, 32), + null); + return new Field("time", timeType, null); + } +} diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index d88758ec..027c73b0 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -28,14 +28,17 @@ import java.util.UUID; import java.util.stream.Stream; +import org.apache.arrow.vector.VectorSchemaRoot; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import com.influxdb.v3.client.internal.VectorSchemaRootConverter; +import com.influxdb.v3.client.internal.VectorSchemaRootUtils; import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; -class InfluxDBClientTest { +public class InfluxDBClientTest { @Test void requiredHost() { @@ -173,4 +176,18 @@ public void testQuery() throws Exception { } } } + + @Test + public void testParseQueryWithInvalidMetaData(){ + try(VectorSchemaRoot vector = VectorSchemaRootUtils.generateInvalidVectorSchemaRoot()) { + Object[] objects = VectorSchemaRootConverter.INSTANCE.getArrayObjectFromVectorSchemaRoot(vector, 0); + + Assertions.assertThat(objects[0]).isNull(); + Assertions.assertThat(objects[1]).isNull(); + Assertions.assertThat(objects[2]).isNull(); + Assertions.assertThat(objects[3]).isNull(); + Assertions.assertThat(objects[4]).isNull(); + } + } + } diff --git a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java index f0b77c9b..bfec410c 100644 --- a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java @@ -23,29 +23,21 @@ import java.math.BigInteger; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import javax.annotation.Nonnull; -import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BaseFixedWidthVector; import org.apache.arrow.vector.BigIntVector; -import org.apache.arrow.vector.BitVector; -import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.TimeMilliVector; import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; -import org.apache.arrow.vector.types.pojo.Schema; import org.assertj.core.api.Assertions; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import com.influxdb.v3.client.PointValues; @@ -131,7 +123,7 @@ void timestampAsArrowInt() { @Test public void testConverterWithMetaType() { - try (VectorSchemaRoot root = generateVectorSchemaRoot()) { + try (VectorSchemaRoot root = VectorSchemaRootUtils.generateVectorSchemaRoot()) { PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); String measurement = pointValues.getMeasurement(); @@ -163,6 +155,57 @@ public void testConverterWithMetaType() { } } + @Test + void setFieldWithMetaTypeInvalidCases() { + PointValues pointValues = PointValues.measurement("host"); + + VectorSchemaRootConverter vectorSchemaRootConverter = VectorSchemaRootConverter.INSTANCE; + + // Real value is not an Integer case + vectorSchemaRootConverter.setFieldWithMetaType( + pointValues, + "test_field", + "string", + "iox::column_type::field::integer" + ); + Assertions.assertThat(pointValues.getField("test_field")).isNull(); + + // Real value is not an unsigned Integer case + vectorSchemaRootConverter.setFieldWithMetaType( + pointValues, + "test_field", + "string", + "iox::column_type::field::uinteger" + ); + Assertions.assertThat(pointValues.getField("test_field")).isNull(); + + // Real value is not a Float case + vectorSchemaRootConverter.setFieldWithMetaType( + pointValues, + "test_field", + "string", + "iox::column_type::field::float" + ); + Assertions.assertThat(pointValues.getField("test_field")).isNull(); + + // Real value is not a String case + vectorSchemaRootConverter.setFieldWithMetaType( + pointValues, + "test_field", + 10.5, + "iox::column_type::field::string" + ); + Assertions.assertThat(pointValues.getField("test_field")).isNull(); + + // Real value is not a Boolean case + vectorSchemaRootConverter.setFieldWithMetaType( + pointValues, + "test_field", + 10.5, + "iox::column_type::field::boolean" + ); + Assertions.assertThat(pointValues.getField("test_field")).isNull(); + } @Test void timestampWithoutMetadataAndFieldWithoutMetadata() { @@ -172,7 +215,7 @@ void timestampWithoutMetadataAndFieldWithoutMetadata() { FieldType stringType = new FieldType(true, new ArrowType.Utf8(), null); Field field1 = new Field("field1", stringType, null); - try (VectorSchemaRoot root = initializeVectorSchemaRoot(timeField, field1)) { + try (VectorSchemaRoot root = VectorSchemaRootUtils.initializeVectorSchemaRoot(timeField, field1)) { // // set data @@ -204,7 +247,7 @@ public void measurementValue() { FieldType stringType = new FieldType(true, new ArrowType.Utf8(), null); Field measurementField = new Field("measurement", stringType, null); - try (VectorSchemaRoot root = initializeVectorSchemaRoot(measurementField)) { + try (VectorSchemaRoot root = VectorSchemaRootUtils.initializeVectorSchemaRoot(measurementField)) { // // set data @@ -263,117 +306,6 @@ private VectorSchemaRoot createVectorSchemaRoot(@Nonnull final ArrowType arrowTy FieldType timeType = new FieldType(true, arrowType, null, metadata); Field timeField = new Field("timestamp", timeType, null); - return initializeVectorSchemaRoot(timeField); - } - - @NotNull - private VectorSchemaRoot initializeVectorSchemaRoot(@Nonnull final Field... fields) { - - Schema schema = new Schema(Arrays.asList(fields)); - - VectorSchemaRoot root = VectorSchemaRoot.create(schema, new RootAllocator(Long.MAX_VALUE)); - root.allocateNew(); - - return root; - } - - public VectorSchemaRoot generateVectorSchemaRoot() { - Field measurementField = generateStringField("measurement"); - Field timeField = generateTimeField(); - Field memTotalField = generateIntField("mem_total"); - Field diskFreeField = generateUnsignedIntField("disk_free"); - Field temperatureField = generateFloatField("temperature"); - Field nameField = generateStringField("name"); - Field isActiveField = generateBoolField("isActive"); - List fields = List.of(measurementField, - timeField, - memTotalField, - diskFreeField, - temperatureField, - nameField, - isActiveField); - - VectorSchemaRoot root = initializeVectorSchemaRoot(fields.toArray(new Field[0])); - VarCharVector measurement = (VarCharVector) root.getVector("measurement"); - measurement.allocateNew(); - measurement.set(0, "host".getBytes()); - - TimeMilliVector timeVector = (TimeMilliVector) root.getVector("time"); - timeVector.allocateNew(); - timeVector.setSafe(0, 123_456); - - BigIntVector intVector = (BigIntVector) root.getVector("mem_total"); - intVector.allocateNew(); - intVector.set(0, 2048); - - BigIntVector unsignedIntVector = (BigIntVector) root.getVector("disk_free"); - unsignedIntVector.allocateNew(); - unsignedIntVector.set(0, 1_000_000); - - Float8Vector floatVector = (Float8Vector) root.getVector("temperature"); - floatVector.allocateNew(); - floatVector.set(0, 100.8766f); - - VarCharVector stringVector = (VarCharVector) root.getVector("name"); - stringVector.allocateNew(); - stringVector.setSafe(0, "intel".getBytes()); - - BitVector boolVector = (BitVector) root.getVector("isActive"); - boolVector.allocateNew(); - boolVector.setSafe(0, 1); - - return root; - } - - private Field generateIntField(final String fieldName) { - Map metadata = new HashMap<>(); - metadata.put("iox::column::type", "iox::column_type::field::integer"); - FieldType intType = new FieldType(true, - new ArrowType.Int(64, true), - null, - metadata); - return new Field(fieldName, intType, null); - } - - private Field generateUnsignedIntField(final String fieldName) { - Map metadata = new HashMap<>(); - metadata.put("iox::column::type", "iox::column_type::field::uinteger"); - FieldType intType = new FieldType(true, - new ArrowType.Int(64, true), - null, - metadata); - return new Field(fieldName, intType, null); - } - - private Field generateFloatField(final String fieldName) { - Map metadata = new HashMap<>(); - metadata.put("iox::column::type", "iox::column_type::field::float"); - FieldType floatType = new FieldType(true, - new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), - null, - metadata); - return new Field(fieldName, floatType, null); - } - - private Field generateStringField(final String fieldName) { - Map metadata = new HashMap<>(); - metadata.put("iox::column::type", "iox::column_type::field::string"); - FieldType stringType = new FieldType(true, new ArrowType.Utf8(), null, metadata); - return new Field(fieldName, stringType, null); + return VectorSchemaRootUtils.initializeVectorSchemaRoot(timeField); } - - private Field generateBoolField(final String fieldName) { - Map metadata = new HashMap<>(); - metadata.put("iox::column::type", "iox::column_type::field::boolean"); - FieldType boolType = new FieldType(true, new ArrowType.Bool(), null, metadata); - return new Field(fieldName, boolType, null); - } - - private Field generateTimeField() { - FieldType timeType = new FieldType(true, - new ArrowType.Time(TimeUnit.MILLISECOND, 32), - null); - return new Field("time", timeType, null); - } - } From 24bfd5e813095f5db7d64307c08a867fa640eff5 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 26 Nov 2024 16:52:18 +0700 Subject: [PATCH 12/23] fix: linter --- .../influxdb/v3/client/internal/TypeCasting.java | 6 +++--- .../client/internal/VectorSchemaRootConverter.java | 13 +++++++------ .../v3/client/internal/VectorSchemaRootUtils.java | 8 +++++++- .../com/influxdb/v3/client/InfluxDBClientTest.java | 4 ++-- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java index cae28016..5463a2ca 100644 --- a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java +++ b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java @@ -10,7 +10,7 @@ public final class TypeCasting { /** - * Safe casting to long value + * Safe casting to long value. * * @param value object to cast * @return long value @@ -26,7 +26,7 @@ public static long toLongValue(@Nonnull final Object value) { } /** - * Safe casting to double value + * Safe casting to double value. * * @param value object to cast * @return double value @@ -42,7 +42,7 @@ public static double toDoubleValue(@Nonnull final Object value) { } /** - * Safe casting to string value + * Safe casting to string value. * * @param value object to cast * @return string value diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 1a64deda..b6915570 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -109,12 +109,13 @@ PointValues toPointValues(final int rowNumber, } /** - * Set field value for PointValues base on iox::column::type + * Set field value for PointValues base on iox::column::type. * - * @param p The target PointValues + * @param p The target PointValues. * @param fieldName Field name in PointValues * @param value The value to be set - * @param metaType The iox::column::type column meta type, eg: iox::column_type::field::integer, iox::column_type::field::float + * @param metaType The iox::column::type column meta type, + * eg: iox::column_type::field::integer, iox::column_type::field::float */ public void setFieldWithMetaType(final PointValues p, final String fieldName, @@ -165,13 +166,13 @@ public void setFieldWithMetaType(final PointValues p, } /** - * Get array of values from VectorSchemaRoot + * Get array of values from VectorSchemaRoot. * - * @param vector The data return from InfluxDB + * @param vector The data return from InfluxDB. * @param rowNumber The row number of data * @return An array of Objects represent for a row of data */ - public Object[] getArrayObjectFromVectorSchemaRoot(VectorSchemaRoot vector, int rowNumber) { + public Object[] getArrayObjectFromVectorSchemaRoot(final VectorSchemaRoot vector, final int rowNumber) { List row = new ArrayList<>(); for (FieldVector fieldVector : vector.getFieldVectors()) { var field = fieldVector.getField(); diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java index 0cd4b3e3..2d5b0e0d 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java @@ -22,6 +22,8 @@ public final class VectorSchemaRootUtils { + private VectorSchemaRootUtils() {} + public static VectorSchemaRoot generateInvalidVectorSchemaRoot() { Field testField = generateInvalidIntField("test_field"); Field testField1 = generateInvalidUnsignedIntField("test_field1"); @@ -189,7 +191,11 @@ private static Field generateStringField(final String fieldName) { private static Field generateInvalidStringField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::string"); - FieldType stringType = new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null, metadata); + FieldType stringType = new FieldType(true, + new ArrowType.FloatingPoint( + FloatingPointPrecision.DOUBLE), + null, + metadata); return new Field(fieldName, stringType, null); } diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index 027c73b0..ef873540 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -178,8 +178,8 @@ public void testQuery() throws Exception { } @Test - public void testParseQueryWithInvalidMetaData(){ - try(VectorSchemaRoot vector = VectorSchemaRootUtils.generateInvalidVectorSchemaRoot()) { + public void testParseQueryWithInvalidMetaData() { + try (VectorSchemaRoot vector = VectorSchemaRootUtils.generateInvalidVectorSchemaRoot()) { Object[] objects = VectorSchemaRootConverter.INSTANCE.getArrayObjectFromVectorSchemaRoot(vector, 0); Assertions.assertThat(objects[0]).isNull(); From 169944371ce4528765f86690008e69a3e2104ac1 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 26 Nov 2024 16:58:48 +0700 Subject: [PATCH 13/23] fix: linter --- .../com/influxdb/v3/client/internal/InfluxDBClientImpl.java | 4 +++- .../java/com/influxdb/v3/client/internal/TypeCasting.java | 4 +++- .../influxdb/v3/client/internal/VectorSchemaRootUtils.java | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 2d4711b1..81c51f61 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -181,7 +181,9 @@ public Stream query(@Nonnull final String query, @Nonnull final QueryOptions options) { return queryData(query, parameters, options) .flatMap(vector -> IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> VectorSchemaRootConverter.INSTANCE.getArrayObjectFromVectorSchemaRoot(vector, rowNumber))); + .mapToObj(rowNumber -> + VectorSchemaRootConverter.INSTANCE + .getArrayObjectFromVectorSchemaRoot(vector, rowNumber))); } @Nonnull diff --git a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java index 5463a2ca..d7cbb43f 100644 --- a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java +++ b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java @@ -5,10 +5,12 @@ import org.apache.arrow.vector.util.Text; /** - * Functions for safe type casting + * Functions for safe type casting. */ public final class TypeCasting { + private TypeCasting() { } + /** * Safe casting to long value. * diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java index 2d5b0e0d..bbd8d027 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java @@ -22,7 +22,7 @@ public final class VectorSchemaRootUtils { - private VectorSchemaRootUtils() {} + private VectorSchemaRootUtils() { } public static VectorSchemaRoot generateInvalidVectorSchemaRoot() { Field testField = generateInvalidIntField("test_field"); From 0fdf8e06b980a85abf218b23a433f03399cbb3ab Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 26 Nov 2024 17:00:54 +0700 Subject: [PATCH 14/23] fix: linter --- .../client/internal/InfluxDBClientImpl.java | 5 ++++- .../v3/client/internal/TypeCasting.java | 21 +++++++++++++++++++ .../internal/VectorSchemaRootUtils.java | 21 +++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 81c51f61..c8155276 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -183,7 +183,10 @@ public Stream query(@Nonnull final String query, .flatMap(vector -> IntStream.range(0, vector.getRowCount()) .mapToObj(rowNumber -> VectorSchemaRootConverter.INSTANCE - .getArrayObjectFromVectorSchemaRoot(vector, rowNumber))); + .getArrayObjectFromVectorSchemaRoot( + vector, + rowNumber + ))); } @Nonnull diff --git a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java index d7cbb43f..8f52f5e7 100644 --- a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java +++ b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java @@ -1,3 +1,24 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.influxdb.v3.client.internal; import javax.annotation.Nonnull; diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java index bbd8d027..fbd1a1c5 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java @@ -1,3 +1,24 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.influxdb.v3.client.internal; import java.util.Arrays; From c4bb8ab6fa03773900b522647d8349aa83d4e4fe Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 28 Nov 2024 15:38:59 +0700 Subject: [PATCH 15/23] feat: add getMappedValue function --- .../com/influxdb/v3/client/PointValues.java | 11 -- .../client/internal/InfluxDBClientImpl.java | 2 +- .../client/internal/NanosecondConverter.java | 11 +- .../v3/client/internal/TypeCasting.java | 2 +- .../internal/VectorSchemaRootConverter.java | 175 +++++++----------- .../v3/client/InfluxDBClientTest.java | 17 -- .../v3/client/internal/TypeCastingTest.java | 57 ++++++ .../VectorSchemaRootConverterTest.java | 168 ++++++++++------- .../internal/VectorSchemaRootUtils.java | 22 +-- 9 files changed, 243 insertions(+), 222 deletions(-) create mode 100644 src/test/java/com/influxdb/v3/client/internal/TypeCastingTest.java rename src/{main => test}/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java (93%) diff --git a/src/main/java/com/influxdb/v3/client/PointValues.java b/src/main/java/com/influxdb/v3/client/PointValues.java index a4e598f5..70c11afb 100644 --- a/src/main/java/com/influxdb/v3/client/PointValues.java +++ b/src/main/java/com/influxdb/v3/client/PointValues.java @@ -435,17 +435,6 @@ public PointValues setField(@Nonnull final String field, @Nullable final Object return putField(field, value); } - /** - * Add a null field. - * - * @param field the field name - * @return this - */ - @Nonnull - public PointValues setNullField(@Nonnull final String field) { - return putField(field, null); - } - /** * Adds or replaces fields for this point. * diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index c8155276..6d67b7d0 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -218,7 +218,7 @@ public Stream queryPoints(@Nonnull final String query, return IntStream .range(0, vector.getRowCount()) .mapToObj(row -> - VectorSchemaRootConverter.INSTANCE.toPointValues(row, vector, fieldVectors)); + VectorSchemaRootConverter.INSTANCE.toPointValues(row, fieldVectors)); }); } diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index 770cec4c..56d9a210 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -150,23 +150,20 @@ public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull } long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit); Instant instant = Instant.ofEpochSecond(0, nanoseconds); - result = convertInstantToNano(instant, WritePrecision.NS); + result = convertInstantToNano(instant); } else { Instant instant = Instant.ofEpochMilli((Long) value); - result = convertInstantToNano(instant, WritePrecision.NS); + result = convertInstantToNano(instant); } } else if (value instanceof LocalDateTime) { Instant instant = ((LocalDateTime) value).toInstant(ZoneOffset.UTC); - result = convertInstantToNano(instant, WritePrecision.NS); + result = convertInstantToNano(instant); } return result; } - private static BigInteger convertInstantToNano(final Instant instant, final WritePrecision precision) { + private static BigInteger convertInstantToNano(final Instant instant) { var writePrecision = WritePrecision.NS; - if (precision != null) { - writePrecision = precision; - } BigInteger convertedTime = NanosecondConverter.convert(instant, writePrecision); return NanosecondConverter.convertToNanos(convertedTime, writePrecision); } diff --git a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java index 8f52f5e7..981a3811 100644 --- a/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java +++ b/src/main/java/com/influxdb/v3/client/internal/TypeCasting.java @@ -73,7 +73,7 @@ public static double toDoubleValue(@Nonnull final Object value) { public static String toStringValue(@Nonnull final Object value) { if (Text.class.isAssignableFrom(value.getClass())) { - return ((Text) value).toString(); + return value.toString(); } return (String) value; diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index b6915570..3b6a13d6 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.Text; import com.influxdb.v3.client.PointValues; @@ -55,39 +56,37 @@ public final class VectorSchemaRootConverter { * Converts a given row of data from a VectorSchemaRoot object to PointValues. * * @param rowNumber the index of the row to be converted - * @param vector the VectorSchemaRoot object containing the data * @param fieldVectors the list of FieldVector objects representing the data columns * @return the converted PointValues object */ @Nonnull PointValues toPointValues(final int rowNumber, - @Nonnull final VectorSchemaRoot vector, @Nonnull final List fieldVectors) { PointValues p = new PointValues(); for (FieldVector fieldVector : fieldVectors) { var field = fieldVector.getField(); var value = fieldVector.getObject(rowNumber); - var name = field.getName(); + var fieldName = field.getName(); var metaType = field.getMetadata().get("iox::column::type"); if (value instanceof Text) { value = value.toString(); } - if ((Objects.equals(name, "measurement") - || Objects.equals(name, "iox::measurement")) + if ((Objects.equals(fieldName, "measurement") + || Objects.equals(fieldName, "iox::measurement")) && value instanceof String) { p.setMeasurement((String) value); continue; } if (metaType == null) { - if (Objects.equals(name, "time") && (value instanceof Long || value instanceof LocalDateTime)) { + if (Objects.equals(fieldName, "time") && (value instanceof Long || value instanceof LocalDateTime)) { var timeNano = NanosecondConverter.getTimestampNano(value, field); p.setTimestamp(timeNano, WritePrecision.NS); } else { // just push as field If you don't know what type is it - p.setField(name, value); + p.setField(fieldName, value); } continue; @@ -97,71 +96,68 @@ PointValues toPointValues(final int rowNumber, String valueType = parts[2]; if ("field".equals(valueType)) { - setFieldWithMetaType(p, name, value, metaType); + var fieldValue = getMappedValue(valueType, metaType, value, fieldName, field); + p.setField(fieldName, fieldValue); } else if ("tag".equals(valueType) && value instanceof String) { - p.setTag(name, (String) value); + var tag = (String) getMappedValue(valueType, metaType, value, fieldName, field); + p.setTag(fieldName, tag); } else if ("timestamp".equals(valueType)) { - var timeNano = NanosecondConverter.getTimestampNano(value, field); + var timeNano = (BigInteger) getMappedValue(valueType, metaType, value, fieldName, field); p.setTimestamp(timeNano, WritePrecision.NS); } } return p; } - /** - * Set field value for PointValues base on iox::column::type. - * - * @param p The target PointValues. - * @param fieldName Field name in PointValues - * @param value The value to be set - * @param metaType The iox::column::type column meta type, - * eg: iox::column_type::field::integer, iox::column_type::field::float - */ - public void setFieldWithMetaType(final PointValues p, - final String fieldName, - final Object value, - final String metaType) { + public Object getMappedValue(final String valueType, + final String metaType, + final Object value, + final String fieldName, + final Field field) { if (value == null) { - return; + return null; } - switch (metaType) { - case "iox::column_type::field::integer": - case "iox::column_type::field::uinteger": - if (value instanceof Long) { - p.setIntegerField(fieldName, TypeCasting.toLongValue(value)); - } else { - p.setNullField(fieldName); - LOG.warning(String.format("Value of %s is not an Integer", fieldName)); - } - break; - case "iox::column_type::field::float": - if (value instanceof Double) { - p.setFloatField(fieldName, TypeCasting.toDoubleValue(value)); - } else { - p.setNullField(fieldName); - LOG.warning(String.format("Value of %s is not a Double", fieldName)); - } - break; - case "iox::column_type::field::string": - if (value instanceof String || value instanceof Text) { - p.setStringField(fieldName, TypeCasting.toStringValue(value)); - } else { - p.setNullField(fieldName); - LOG.warning(String.format("Value of %s is not a String", fieldName)); - } - break; - case "iox::column_type::field::boolean": - if (value instanceof Boolean) { - p.setBooleanField(fieldName, (Boolean) value); - } else { - p.setNullField(fieldName); - LOG.warning(String.format("Value of %s is not a Boolean", fieldName)); - } - break; - default: - p.setField(fieldName, value); - break; + if ("field".equals(valueType)) { + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + if (value instanceof Number) { + return TypeCasting.toLongValue(value); + } else { + LOG.warning(String.format("Value of %s is not an Integer", fieldName)); + return value; + } + case "iox::column_type::field::float": + if (value instanceof Number) { + return TypeCasting.toDoubleValue(value); + } else { + LOG.warning(String.format("Value of %s is not a Double", fieldName)); + return value; + } + case "iox::column_type::field::string": + if (value instanceof Text || value instanceof String) { + return TypeCasting.toStringValue(value); + } else { + LOG.warning(String.format("Value of %s is not a String", fieldName)); + return value; + } + case "iox::column_type::field::boolean": + if (value instanceof Boolean) { + return (Boolean) value; + } else { + LOG.warning(String.format("Value of %s is not a Boolean", fieldName)); + return value; + } + default: + return value; + } + } else if ("timestamp".equals(valueType) || Objects.equals(fieldName, "time")) { + return NanosecondConverter.getTimestampNano(value, field); + } else if ("tag".equals(valueType)) { + return TypeCasting.toStringValue(value); + } else { + return value; } } @@ -178,58 +174,13 @@ public Object[] getArrayObjectFromVectorSchemaRoot(final VectorSchemaRoot vector var field = fieldVector.getField(); var metaType = field.getMetadata().get("iox::column::type"); String valueType = metaType != null ? metaType.split("::")[2] : null; - String fieldName = field.getName(); - Object value = fieldVector.getObject(rowNumber); - if (value == null) { - row.add(null); - continue; - } - - if ("field".equals(valueType)) { - switch (metaType) { - case "iox::column_type::field::integer": - case "iox::column_type::field::uinteger": - if (value instanceof Long) { - row.add(TypeCasting.toLongValue(value)); - } else { - row.add(null); - LOG.warning(String.format("Value of %s is not an Integer", fieldName)); - } - break; - case "iox::column_type::field::float": - if (value instanceof Double) { - row.add(TypeCasting.toDoubleValue(value)); - } else { - row.add(null); - LOG.warning(String.format("Value of %s is not a Double", fieldName)); - } - break; - case "iox::column_type::field::string": - if (value instanceof Text || value instanceof String) { - row.add(TypeCasting.toStringValue(value)); - } else { - row.add(null); - LOG.warning(String.format("Value of %s is not a String", fieldName)); - } - break; - case "iox::column_type::field::boolean": - if (value instanceof Boolean) { - row.add((Boolean) value); - } else { - row.add(null); - LOG.warning(String.format("Value of %s is not a Boolean", fieldName)); - } - break; - default: - } - } else if ("timestamp".equals(valueType) - || Objects.equals(fieldName, "time")) { - BigInteger time = NanosecondConverter.getTimestampNano(value, field); - row.add(time); - } else { - row.add(value); - } + var value = getMappedValue(valueType, + metaType, + fieldVector.getObject(rowNumber), + field.getName(), + field); + row.add(value); } return row.toArray(); diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index ef873540..f1e868a2 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -28,13 +28,10 @@ import java.util.UUID; import java.util.stream.Stream; -import org.apache.arrow.vector.VectorSchemaRoot; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; -import com.influxdb.v3.client.internal.VectorSchemaRootConverter; -import com.influxdb.v3.client.internal.VectorSchemaRootUtils; import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; @@ -176,18 +173,4 @@ public void testQuery() throws Exception { } } } - - @Test - public void testParseQueryWithInvalidMetaData() { - try (VectorSchemaRoot vector = VectorSchemaRootUtils.generateInvalidVectorSchemaRoot()) { - Object[] objects = VectorSchemaRootConverter.INSTANCE.getArrayObjectFromVectorSchemaRoot(vector, 0); - - Assertions.assertThat(objects[0]).isNull(); - Assertions.assertThat(objects[1]).isNull(); - Assertions.assertThat(objects[2]).isNull(); - Assertions.assertThat(objects[3]).isNull(); - Assertions.assertThat(objects[4]).isNull(); - } - } - } diff --git a/src/test/java/com/influxdb/v3/client/internal/TypeCastingTest.java b/src/test/java/com/influxdb/v3/client/internal/TypeCastingTest.java new file mode 100644 index 00000000..5b77fe6d --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/TypeCastingTest.java @@ -0,0 +1,57 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import org.apache.arrow.vector.util.Text; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TypeCastingTest { + + @Test + void testToLongValue() { + Assertions.assertEquals(1, TypeCasting.toLongValue(1)); + Assertions.assertEquals(1.0, TypeCasting.toLongValue(1.23)); + + Assertions.assertThrows(ClassCastException.class, + () -> TypeCasting.toLongValue("1")); + } + + @Test + void testToDoubleValue() { + Assertions.assertEquals(1.23, TypeCasting.toDoubleValue(1.23)); + Assertions.assertEquals(1.0, TypeCasting.toDoubleValue(1)); + + Assertions.assertThrows(ClassCastException.class, + () -> TypeCasting.toDoubleValue("1.2")); + } + + @Test + void testToStringValue() { + Assertions.assertEquals("test", TypeCasting.toStringValue("test")); + Assertions.assertEquals("test", + TypeCasting.toStringValue(new Text("test"))); + + Assertions.assertThrows(ClassCastException.class, + () -> TypeCasting.toStringValue(1)); + } +} diff --git a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java index bfec410c..0aaad4f9 100644 --- a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java @@ -48,7 +48,7 @@ class VectorSchemaRootConverterTest { void timestampAsArrowTime() { try (VectorSchemaRoot root = createTimeVector(1234, new ArrowType.Time(TimeUnit.MILLISECOND, 32))) { - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); BigInteger expected = BigInteger.valueOf(1_234 * 1_000_000); Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); @@ -59,7 +59,7 @@ void timestampAsArrowTime() { void timestampAsArrowTimestampSecond() { try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"))) { - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); BigInteger expected = BigInteger.valueOf(45_678L * 1_000_000_000); Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); @@ -70,7 +70,7 @@ void timestampAsArrowTimestampSecond() { void timestampAsArrowTimestampMillisecond() { try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"))) { - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); BigInteger expected = BigInteger.valueOf(45_678L * 1_000_000); Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); @@ -81,7 +81,7 @@ void timestampAsArrowTimestampMillisecond() { void timestampAsArrowTimestampMicrosecond() { try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"))) { - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); BigInteger expected = BigInteger.valueOf(45_678L * 1_000); Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); @@ -92,7 +92,7 @@ void timestampAsArrowTimestampMicrosecond() { void timestampAsArrowTimestampNanosecond() { try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))) { - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); BigInteger expected = BigInteger.valueOf(45_678L); Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); @@ -103,7 +103,7 @@ void timestampAsArrowTimestampNanosecond() { void timestampAsArrowTimestampNanosecondWithoutTimezone() { try (VectorSchemaRoot root = createTimeVector(45_978, new ArrowType.Timestamp(TimeUnit.NANOSECOND, null))) { - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); BigInteger expected = BigInteger.valueOf(45_978L); Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); @@ -114,17 +114,113 @@ void timestampAsArrowTimestampNanosecondWithoutTimezone() { void timestampAsArrowInt() { try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Int(64, true))) { - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); BigInteger expected = BigInteger.valueOf(45_678L * 1_000_000); Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); } } + @Test + void getMappedValueValidMetaDataInteger() { + Field field = VectorSchemaRootUtils.generateIntField("test"); + String metaType = field.getMetadata().get("iox::column::type"); + Long value = 1L; + String fieldName = field.getName(); + Object mappedValue = VectorSchemaRootConverter.INSTANCE + .getMappedValue("field", metaType, value, fieldName, field); + Assertions.assertThat(mappedValue).isEqualTo(value); + Assertions.assertThat(mappedValue.getClass()).isEqualTo(Long.class); + } + + @Test + void getMappedValueInvalidMetaDataInteger() { + Field field = VectorSchemaRootUtils.generateInvalidIntField("test"); + String metaType = field.getMetadata().get("iox::column::type"); + String value = "1"; + String fieldName = field.getName(); + Object mappedValue = VectorSchemaRootConverter.INSTANCE + .getMappedValue("field", metaType, value, fieldName, field); + Assertions.assertThat(mappedValue).isEqualTo(value); + Assertions.assertThat(mappedValue.getClass()).isEqualTo(String.class); + } + + @Test + void getMappedValueValidMetaDataFloat() { + Field field = VectorSchemaRootUtils.generateFloatField("test"); + String metaType = field.getMetadata().get("iox::column::type"); + Double value = 1.2; + String fieldName = field.getName(); + Object mappedValue = VectorSchemaRootConverter.INSTANCE + .getMappedValue("field", metaType, value, fieldName, field); + Assertions.assertThat(mappedValue).isEqualTo(value); + Assertions.assertThat(mappedValue.getClass()).isEqualTo(Double.class); + } + + @Test + void getMappedValueInvalidMetaDataFloat() { + Field field = VectorSchemaRootUtils.generateInvalidFloatField("test"); + String metaType = field.getMetadata().get("iox::column::type"); + String value = "1.2"; + String fieldName = field.getName(); + Object mappedValue = VectorSchemaRootConverter.INSTANCE + .getMappedValue("field", metaType, value, fieldName, field); + Assertions.assertThat(mappedValue).isEqualTo(value); + Assertions.assertThat(mappedValue.getClass()).isEqualTo(String.class); + } + + @Test + void getMappedValueValidMetaDataString() { + Field field = VectorSchemaRootUtils.generateStringField("test"); + String metaType = field.getMetadata().get("iox::column::type"); + String value = "string"; + String fieldName = field.getName(); + Object mappedValue = VectorSchemaRootConverter.INSTANCE + .getMappedValue("field", metaType, value, fieldName, field); + Assertions.assertThat(mappedValue).isEqualTo(value); + Assertions.assertThat(mappedValue.getClass()).isEqualTo(String.class); + } + + @Test + void getMappedValueInvalidMetaDataString() { + Field field = VectorSchemaRootUtils.generateInvalidStringField("test"); + String metaType = field.getMetadata().get("iox::column::type"); + Double value = 1.1; + String fieldName = field.getName(); + Object mappedValue = VectorSchemaRootConverter.INSTANCE + .getMappedValue("field", metaType, value, fieldName, field); + Assertions.assertThat(mappedValue).isEqualTo(value); + Assertions.assertThat(mappedValue.getClass()).isEqualTo(Double.class); + } + + @Test + void getMappedValueValidMetaDataBoolean() { + Field field = VectorSchemaRootUtils.generateBoolField("test"); + String metaType = field.getMetadata().get("iox::column::type"); + Boolean value = true; + String fieldName = field.getName(); + Object mappedValue = VectorSchemaRootConverter.INSTANCE + .getMappedValue("field", metaType, value, fieldName, field); + Assertions.assertThat(mappedValue).isEqualTo(value); + Assertions.assertThat(mappedValue.getClass()).isEqualTo(Boolean.class); + } + + @Test + void getMappedValueInvalidMetaDataBoolean() { + Field field = VectorSchemaRootUtils.generateInvalidBoolField("test"); + String metaType = field.getMetadata().get("iox::column::type"); + String value = "true"; + String fieldName = field.getName(); + Object mappedValue = VectorSchemaRootConverter.INSTANCE + .getMappedValue("field", metaType, value, fieldName, field); + Assertions.assertThat(mappedValue).isEqualTo(value); + Assertions.assertThat(mappedValue.getClass()).isEqualTo(String.class); + } + @Test public void testConverterWithMetaType() { try (VectorSchemaRoot root = VectorSchemaRootUtils.generateVectorSchemaRoot()) { - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); String measurement = pointValues.getMeasurement(); Assertions.assertThat(measurement).isEqualTo("host"); @@ -155,58 +251,6 @@ public void testConverterWithMetaType() { } } - @Test - void setFieldWithMetaTypeInvalidCases() { - PointValues pointValues = PointValues.measurement("host"); - - VectorSchemaRootConverter vectorSchemaRootConverter = VectorSchemaRootConverter.INSTANCE; - - // Real value is not an Integer case - vectorSchemaRootConverter.setFieldWithMetaType( - pointValues, - "test_field", - "string", - "iox::column_type::field::integer" - ); - Assertions.assertThat(pointValues.getField("test_field")).isNull(); - - // Real value is not an unsigned Integer case - vectorSchemaRootConverter.setFieldWithMetaType( - pointValues, - "test_field", - "string", - "iox::column_type::field::uinteger" - ); - Assertions.assertThat(pointValues.getField("test_field")).isNull(); - - // Real value is not a Float case - vectorSchemaRootConverter.setFieldWithMetaType( - pointValues, - "test_field", - "string", - "iox::column_type::field::float" - ); - Assertions.assertThat(pointValues.getField("test_field")).isNull(); - - // Real value is not a String case - vectorSchemaRootConverter.setFieldWithMetaType( - pointValues, - "test_field", - 10.5, - "iox::column_type::field::string" - ); - Assertions.assertThat(pointValues.getField("test_field")).isNull(); - - // Real value is not a Boolean case - vectorSchemaRootConverter.setFieldWithMetaType( - pointValues, - "test_field", - 10.5, - "iox::column_type::field::boolean" - ); - Assertions.assertThat(pointValues.getField("test_field")).isNull(); - } - @Test void timestampWithoutMetadataAndFieldWithoutMetadata() { FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); @@ -234,7 +278,7 @@ void timestampWithoutMetadataAndFieldWithoutMetadata() { timeVector.setValueCount(1); root.setRowCount(1); - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); BigInteger expected = BigInteger.valueOf(123_456L * 1_000_000); Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); @@ -262,7 +306,7 @@ public void measurementValue() { measurementVector.setValueCount(1); root.setRowCount(1); - PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root.getFieldVectors()); Assertions.assertThat(pointValues.getMeasurement()).isEqualTo("measurementValue"); } diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java similarity index 93% rename from src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java rename to src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java index fbd1a1c5..3557e037 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java +++ b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootUtils.java @@ -142,7 +142,7 @@ public static VectorSchemaRoot initializeVectorSchemaRoot(@Nonnull final Field.. return root; } - private static Field generateIntField(final String fieldName) { + public static Field generateIntField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::integer"); FieldType intType = new FieldType(true, @@ -152,7 +152,7 @@ private static Field generateIntField(final String fieldName) { return new Field(fieldName, intType, null); } - private static Field generateInvalidIntField(final String fieldName) { + public static Field generateInvalidIntField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::integer"); FieldType intType = new FieldType(true, @@ -162,7 +162,7 @@ private static Field generateInvalidIntField(final String fieldName) { return new Field(fieldName, intType, null); } - private static Field generateUnsignedIntField(final String fieldName) { + public static Field generateUnsignedIntField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::uinteger"); FieldType intType = new FieldType(true, @@ -172,7 +172,7 @@ private static Field generateUnsignedIntField(final String fieldName) { return new Field(fieldName, intType, null); } - private static Field generateInvalidUnsignedIntField(final String fieldName) { + public static Field generateInvalidUnsignedIntField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::uinteger"); FieldType intType = new FieldType(true, @@ -182,7 +182,7 @@ private static Field generateInvalidUnsignedIntField(final String fieldName) { return new Field(fieldName, intType, null); } - private static Field generateFloatField(final String fieldName) { + public static Field generateFloatField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::float"); FieldType floatType = new FieldType(true, @@ -192,7 +192,7 @@ private static Field generateFloatField(final String fieldName) { return new Field(fieldName, floatType, null); } - private static Field generateInvalidFloatField(final String fieldName) { + public static Field generateInvalidFloatField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::float"); FieldType floatType = new FieldType(true, @@ -202,14 +202,14 @@ private static Field generateInvalidFloatField(final String fieldName) { return new Field(fieldName, floatType, null); } - private static Field generateStringField(final String fieldName) { + public static Field generateStringField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::string"); FieldType stringType = new FieldType(true, new ArrowType.Utf8(), null, metadata); return new Field(fieldName, stringType, null); } - private static Field generateInvalidStringField(final String fieldName) { + public static Field generateInvalidStringField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::string"); FieldType stringType = new FieldType(true, @@ -220,21 +220,21 @@ private static Field generateInvalidStringField(final String fieldName) { return new Field(fieldName, stringType, null); } - private static Field generateBoolField(final String fieldName) { + public static Field generateBoolField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::boolean"); FieldType boolType = new FieldType(true, new ArrowType.Bool(), null, metadata); return new Field(fieldName, boolType, null); } - private static Field generateInvalidBoolField(final String fieldName) { + public static Field generateInvalidBoolField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::boolean"); FieldType boolType = new FieldType(true, new ArrowType.Utf8(), null, metadata); return new Field(fieldName, boolType, null); } - private static Field generateTimeField() { + public static Field generateTimeField() { FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); From a796a131ea2e2cbadfc98ca5d083336662d5e28b Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 29 Nov 2024 10:15:09 +0700 Subject: [PATCH 16/23] refactor: getMappedValue function --- .../internal/VectorSchemaRootConverter.java | 46 +++++++++---------- .../VectorSchemaRootConverterTest.java | 40 ++++------------ 2 files changed, 31 insertions(+), 55 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 3b6a13d6..7e89e9bd 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -28,6 +28,7 @@ import java.util.Objects; import java.util.logging.Logger; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.arrow.vector.FieldVector; @@ -92,32 +93,36 @@ PointValues toPointValues(final int rowNumber, continue; } - String[] parts = metaType.split("::"); - String valueType = parts[2]; - + String valueType = metaType.split("::")[2]; + Object mappedValue = getMappedValue(field, value); if ("field".equals(valueType)) { - var fieldValue = getMappedValue(valueType, metaType, value, fieldName, field); - p.setField(fieldName, fieldValue); + p.setField(fieldName, mappedValue); } else if ("tag".equals(valueType) && value instanceof String) { - var tag = (String) getMappedValue(valueType, metaType, value, fieldName, field); - p.setTag(fieldName, tag); + p.setTag(fieldName, (String) mappedValue); } else if ("timestamp".equals(valueType)) { - var timeNano = (BigInteger) getMappedValue(valueType, metaType, value, fieldName, field); - p.setTimestamp(timeNano, WritePrecision.NS); + p.setTimestamp((BigInteger) mappedValue, WritePrecision.NS); } } return p; } - public Object getMappedValue(final String valueType, - final String metaType, - final Object value, - final String fieldName, - final Field field) { + /** + * Function to cast value return base on metadata from InfluxDB. + * + * @param field the Field object from Arrow + * @param value the value to cast + * @return the casted value + */ + public Object getMappedValue(@Nonnull final Field field, @Nullable final Object value) { if (value == null) { return null; } + var metaType = field.getMetadata().get("iox::column::type"); + var fieldName = field.getName(); + String[] parts = metaType.split("::"); + String valueType = parts[2]; + if ("field".equals(valueType)) { switch (metaType) { case "iox::column_type::field::integer": @@ -171,15 +176,10 @@ public Object getMappedValue(final String valueType, public Object[] getArrayObjectFromVectorSchemaRoot(final VectorSchemaRoot vector, final int rowNumber) { List row = new ArrayList<>(); for (FieldVector fieldVector : vector.getFieldVectors()) { - var field = fieldVector.getField(); - var metaType = field.getMetadata().get("iox::column::type"); - String valueType = metaType != null ? metaType.split("::")[2] : null; - - var value = getMappedValue(valueType, - metaType, - fieldVector.getObject(rowNumber), - field.getName(), - field); + var value = getMappedValue( + fieldVector.getField(), + fieldVector.getObject(rowNumber) + ); row.add(value); } diff --git a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java index 0aaad4f9..dbbd69aa 100644 --- a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java @@ -124,11 +124,8 @@ void timestampAsArrowInt() { @Test void getMappedValueValidMetaDataInteger() { Field field = VectorSchemaRootUtils.generateIntField("test"); - String metaType = field.getMetadata().get("iox::column::type"); Long value = 1L; - String fieldName = field.getName(); - Object mappedValue = VectorSchemaRootConverter.INSTANCE - .getMappedValue("field", metaType, value, fieldName, field); + Object mappedValue = VectorSchemaRootConverter.INSTANCE.getMappedValue(field, value); Assertions.assertThat(mappedValue).isEqualTo(value); Assertions.assertThat(mappedValue.getClass()).isEqualTo(Long.class); } @@ -136,11 +133,8 @@ void getMappedValueValidMetaDataInteger() { @Test void getMappedValueInvalidMetaDataInteger() { Field field = VectorSchemaRootUtils.generateInvalidIntField("test"); - String metaType = field.getMetadata().get("iox::column::type"); String value = "1"; - String fieldName = field.getName(); - Object mappedValue = VectorSchemaRootConverter.INSTANCE - .getMappedValue("field", metaType, value, fieldName, field); + Object mappedValue = VectorSchemaRootConverter.INSTANCE.getMappedValue(field, value); Assertions.assertThat(mappedValue).isEqualTo(value); Assertions.assertThat(mappedValue.getClass()).isEqualTo(String.class); } @@ -148,11 +142,8 @@ void getMappedValueInvalidMetaDataInteger() { @Test void getMappedValueValidMetaDataFloat() { Field field = VectorSchemaRootUtils.generateFloatField("test"); - String metaType = field.getMetadata().get("iox::column::type"); Double value = 1.2; - String fieldName = field.getName(); - Object mappedValue = VectorSchemaRootConverter.INSTANCE - .getMappedValue("field", metaType, value, fieldName, field); + Object mappedValue = VectorSchemaRootConverter.INSTANCE.getMappedValue(field, value); Assertions.assertThat(mappedValue).isEqualTo(value); Assertions.assertThat(mappedValue.getClass()).isEqualTo(Double.class); } @@ -160,11 +151,8 @@ void getMappedValueValidMetaDataFloat() { @Test void getMappedValueInvalidMetaDataFloat() { Field field = VectorSchemaRootUtils.generateInvalidFloatField("test"); - String metaType = field.getMetadata().get("iox::column::type"); String value = "1.2"; - String fieldName = field.getName(); - Object mappedValue = VectorSchemaRootConverter.INSTANCE - .getMappedValue("field", metaType, value, fieldName, field); + Object mappedValue = VectorSchemaRootConverter.INSTANCE.getMappedValue(field, value); Assertions.assertThat(mappedValue).isEqualTo(value); Assertions.assertThat(mappedValue.getClass()).isEqualTo(String.class); } @@ -172,11 +160,8 @@ void getMappedValueInvalidMetaDataFloat() { @Test void getMappedValueValidMetaDataString() { Field field = VectorSchemaRootUtils.generateStringField("test"); - String metaType = field.getMetadata().get("iox::column::type"); String value = "string"; - String fieldName = field.getName(); - Object mappedValue = VectorSchemaRootConverter.INSTANCE - .getMappedValue("field", metaType, value, fieldName, field); + Object mappedValue = VectorSchemaRootConverter.INSTANCE.getMappedValue(field, value); Assertions.assertThat(mappedValue).isEqualTo(value); Assertions.assertThat(mappedValue.getClass()).isEqualTo(String.class); } @@ -184,11 +169,8 @@ void getMappedValueValidMetaDataString() { @Test void getMappedValueInvalidMetaDataString() { Field field = VectorSchemaRootUtils.generateInvalidStringField("test"); - String metaType = field.getMetadata().get("iox::column::type"); Double value = 1.1; - String fieldName = field.getName(); - Object mappedValue = VectorSchemaRootConverter.INSTANCE - .getMappedValue("field", metaType, value, fieldName, field); + Object mappedValue = VectorSchemaRootConverter.INSTANCE.getMappedValue(field, value); Assertions.assertThat(mappedValue).isEqualTo(value); Assertions.assertThat(mappedValue.getClass()).isEqualTo(Double.class); } @@ -196,11 +178,8 @@ void getMappedValueInvalidMetaDataString() { @Test void getMappedValueValidMetaDataBoolean() { Field field = VectorSchemaRootUtils.generateBoolField("test"); - String metaType = field.getMetadata().get("iox::column::type"); Boolean value = true; - String fieldName = field.getName(); - Object mappedValue = VectorSchemaRootConverter.INSTANCE - .getMappedValue("field", metaType, value, fieldName, field); + Object mappedValue = VectorSchemaRootConverter.INSTANCE.getMappedValue(field, value); Assertions.assertThat(mappedValue).isEqualTo(value); Assertions.assertThat(mappedValue.getClass()).isEqualTo(Boolean.class); } @@ -208,11 +187,8 @@ void getMappedValueValidMetaDataBoolean() { @Test void getMappedValueInvalidMetaDataBoolean() { Field field = VectorSchemaRootUtils.generateInvalidBoolField("test"); - String metaType = field.getMetadata().get("iox::column::type"); String value = "true"; - String fieldName = field.getName(); - Object mappedValue = VectorSchemaRootConverter.INSTANCE - .getMappedValue("field", metaType, value, fieldName, field); + Object mappedValue = VectorSchemaRootConverter.INSTANCE.getMappedValue(field, value); Assertions.assertThat(mappedValue).isEqualTo(value); Assertions.assertThat(mappedValue.getClass()).isEqualTo(String.class); } From daab5454532337c33fdbff7c9fe5f59d2b21ca2e Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 29 Nov 2024 10:54:19 +0700 Subject: [PATCH 17/23] refactor: getMappedValue function --- .../internal/VectorSchemaRootConverter.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 7e89e9bd..11bf92ad 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -118,11 +118,22 @@ public Object getMappedValue(@Nonnull final Field field, @Nullable final Object return null; } - var metaType = field.getMetadata().get("iox::column::type"); var fieldName = field.getName(); + if ("measurement".equals(fieldName) || "iox::measurement".equals(fieldName)) { + return value.toString(); + } + + var metaType = field.getMetadata().get("iox::column::type"); + if (metaType == null) { + if ("time".equals(fieldName) && (value instanceof Long || value instanceof LocalDateTime)) { + return NanosecondConverter.getTimestampNano(value, field); + } else { + return value; + } + } + String[] parts = metaType.split("::"); String valueType = parts[2]; - if ("field".equals(valueType)) { switch (metaType) { case "iox::column_type::field::integer": From 06e29ed79236ae6fb6b300473a440989f0130f1e Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 29 Nov 2024 11:01:47 +0700 Subject: [PATCH 18/23] refactor: getMappedValue function --- .../v3/client/internal/VectorSchemaRootConverter.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 11bf92ad..7a8ff847 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -166,15 +166,13 @@ public Object getMappedValue(@Nonnull final Field field, @Nullable final Object return value; } default: - return value; } } else if ("timestamp".equals(valueType) || Objects.equals(fieldName, "time")) { return NanosecondConverter.getTimestampNano(value, field); } else if ("tag".equals(valueType)) { return TypeCasting.toStringValue(value); - } else { - return value; } + return value; } /** From 16ea54f4cd3b7a979d7861567f7e6d60f12a0e3f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 29 Nov 2024 14:06:19 +0700 Subject: [PATCH 19/23] fix: linter --- .../influxdb/v3/client/internal/VectorSchemaRootConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 7a8ff847..0fdc1345 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -169,7 +169,7 @@ public Object getMappedValue(@Nonnull final Field field, @Nullable final Object } } else if ("timestamp".equals(valueType) || Objects.equals(fieldName, "time")) { return NanosecondConverter.getTimestampNano(value, field); - } else if ("tag".equals(valueType)) { + } else { return TypeCasting.toStringValue(value); } return value; From 547c87e433aace452710bbdeb32f258c4cff2e2c Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 29 Nov 2024 14:40:26 +0700 Subject: [PATCH 20/23] fix: linter --- .../influxdb/v3/client/internal/VectorSchemaRootConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 0fdc1345..d316b54a 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -166,13 +166,13 @@ public Object getMappedValue(@Nonnull final Field field, @Nullable final Object return value; } default: + return value; } } else if ("timestamp".equals(valueType) || Objects.equals(fieldName, "time")) { return NanosecondConverter.getTimestampNano(value, field); } else { return TypeCasting.toStringValue(value); } - return value; } /** From dbaaf99e67229c6336099a37d6a819aded506c70 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 6 Dec 2024 08:59:38 +0700 Subject: [PATCH 21/23] refactor: change warning message --- .../client/internal/NanosecondConverter.java | 4 ++- .../internal/VectorSchemaRootConverter.java | 27 ++++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index 56d9a210..102f481a 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -126,6 +126,7 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre * @param field the arrow field metadata * @return the time in nanosecond */ + @Nullable public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull final Field field) { BigInteger result = null; @@ -162,7 +163,8 @@ public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull return result; } - private static BigInteger convertInstantToNano(final Instant instant) { + @Nullable + private static BigInteger convertInstantToNano(@Nonnull final Instant instant) { var writePrecision = WritePrecision.NS; BigInteger convertedTime = NanosecondConverter.convert(instant, writePrecision); return NanosecondConverter.convertToNanos(convertedTime, writePrecision); diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index d316b54a..624ca8eb 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -111,7 +111,7 @@ PointValues toPointValues(final int rowNumber, * * @param field the Field object from Arrow * @param value the value to cast - * @return the casted value + * @return the value with the correct type */ public Object getMappedValue(@Nonnull final Field field, @Nullable final Object value) { if (value == null) { @@ -141,28 +141,28 @@ public Object getMappedValue(@Nonnull final Field field, @Nullable final Object if (value instanceof Number) { return TypeCasting.toLongValue(value); } else { - LOG.warning(String.format("Value of %s is not an Integer", fieldName)); + LOG.warning(String.format("Value %s is not an Long", value)); return value; } case "iox::column_type::field::float": if (value instanceof Number) { return TypeCasting.toDoubleValue(value); } else { - LOG.warning(String.format("Value of %s is not a Double", fieldName)); + LOG.warning(String.format("Value %s is not a Double", value)); return value; } case "iox::column_type::field::string": if (value instanceof Text || value instanceof String) { return TypeCasting.toStringValue(value); } else { - LOG.warning(String.format("Value of %s is not a String", fieldName)); + LOG.warning(String.format("Value %s is not a String", value)); return value; } case "iox::column_type::field::boolean": if (value instanceof Boolean) { - return (Boolean) value; + return value; } else { - LOG.warning(String.format("Value of %s is not a Boolean", fieldName)); + LOG.warning(String.format("Value %s is not a Boolean", value)); return value; } default: @@ -180,18 +180,19 @@ public Object getMappedValue(@Nonnull final Field field, @Nullable final Object * * @param vector The data return from InfluxDB. * @param rowNumber The row number of data - * @return An array of Objects represent for a row of data + * @return An array of Objects represents a row of data */ - public Object[] getArrayObjectFromVectorSchemaRoot(final VectorSchemaRoot vector, final int rowNumber) { - List row = new ArrayList<>(); - for (FieldVector fieldVector : vector.getFieldVectors()) { - var value = getMappedValue( + public Object[] getArrayObjectFromVectorSchemaRoot(@Nonnull final VectorSchemaRoot vector, final int rowNumber) { + int columnSize = vector.getFieldVectors().size(); + var row = new Object[columnSize]; + for (int i = 0; i < columnSize; i++) { + FieldVector fieldVector = vector.getVector(i); + row[i] = getMappedValue( fieldVector.getField(), fieldVector.getObject(rowNumber) ); - row.add(value); } - return row.toArray(); + return row; } } From 79159a966ed28754a62886c4a30999f7166fdc20 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 6 Dec 2024 09:03:48 +0700 Subject: [PATCH 22/23] fix: build --- .../influxdb/v3/client/internal/VectorSchemaRootConverter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 624ca8eb..d9127bfe 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -23,7 +23,6 @@ import java.math.BigInteger; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.logging.Logger; From fcca2dfaf58a680184267a84b55d65d120104a60 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 6 Dec 2024 11:53:42 +0700 Subject: [PATCH 23/23] refactor: getArrayObjectFromVectorSchemaRoot function --- .../v3/client/internal/VectorSchemaRootConverter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index d9127bfe..b2ce9587 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -182,10 +182,11 @@ public Object getMappedValue(@Nonnull final Field field, @Nullable final Object * @return An array of Objects represents a row of data */ public Object[] getArrayObjectFromVectorSchemaRoot(@Nonnull final VectorSchemaRoot vector, final int rowNumber) { - int columnSize = vector.getFieldVectors().size(); + List fieldVectors = vector.getFieldVectors(); + int columnSize = fieldVectors.size(); var row = new Object[columnSize]; for (int i = 0; i < columnSize; i++) { - FieldVector fieldVector = vector.getVector(i); + FieldVector fieldVector = fieldVectors.get(i); row[i] = getMappedValue( fieldVector.getField(), fieldVector.getObject(rowNumber)