Skip to content

Commit

Permalink
fix: serialize InfluxDB's response to PointValues
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Dec 8, 2023
1 parent c9ac879 commit 6ca572b
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -163,7 +165,7 @@ public Stream<Object[]> query(@Nonnull final String query, @Nonnull final QueryO

@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query) {
public Stream<PointValues> queryPoints(@Nonnull final String query) {
return queryPoints(query, QueryOptions.DEFAULTS);
}

Expand All @@ -177,9 +179,6 @@ public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull fin
.range(0, vector.getRowCount())
.mapToObj(rowNumber -> {
PointValues p = new PointValues();


ArrayList<Object> row = new ArrayList<>();
for (int i = 0; i < fieldVectors.size(); i++) {
var schema = vector.getSchema().getFields().get(i);
var value = fieldVectors.get(i).getObject(rowNumber);
Expand All @@ -192,7 +191,7 @@ public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull fin

if ((Objects.equals(name, "measurement")
|| Objects.equals(name, "iox::measurement"))
&& value instanceof String) {
&& value instanceof String) {
p.setMeasurement((String) value);
continue;
}
Expand All @@ -208,15 +207,21 @@ public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull fin
continue;
}

String[] parts = metaType.split(":");
String[] parts = metaType.split("::");
String valueType = parts[2];

if ("field".equals(valueType)) {
p.setField(name, value);
} else if ("tag".equals(valueType) && value instanceof String) {
p.setTag(name, (String) value);
} else if ("timestamp".equals(valueType) && value instanceof Instant) {
p.setTimestamp((Instant) value);
} else if ("timestamp".equals(valueType)) {
if (value instanceof Long) {
p.setTimestamp(Instant.ofEpochMilli((Long) value));

Check warning on line 219 in src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java#L219

Added line #L219 was not covered by tests
} else if (value instanceof Instant) {
p.setTimestamp((Instant) value);

Check warning on line 221 in src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java#L221

Added line #L221 was not covered by tests
} else if (value instanceof LocalDateTime) {
p.setTimestamp(((LocalDateTime) value).toInstant(ZoneOffset.UTC));
}
}
}

Expand Down

0 comments on commit 6ca572b

Please sign in to comment.