From 54611731a2867ce982b6e1ce95d0ad0ce481e534 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Mon, 16 Oct 2023 14:59:49 +0200 Subject: [PATCH] fix: iteration over more Arrow streams (#55) * chore: use try-resource pattern * chore: use try-resource pattern --- CHANGELOG.md | 4 ++++ .../v3/client/internal/FlightSqlClient.java | 22 ++++--------------- .../com/influxdb/v3/client/ITQueryWrite.java | 21 ++++++++++++++++++ 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b70f2d..313cdbb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 0.4.0 [unreleased] +### Bug Fixes + +1. [#55](https://github.com/InfluxCommunity/influxdb3-java/pull/55): Iteration over more Arrow streams + ## 0.3.0 [2023-10-02] ### Features diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index a93b39c..8ff8b8c 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -124,28 +124,25 @@ private static final class FlightSqlIterator implements Iterator autoCloseable = new ArrayList<>(); private final FlightStream flightStream; - private VectorSchemaRoot currentVectorSchemaRoot = null; private FlightSqlIterator(@Nonnull final FlightStream flightStream) { this.flightStream = flightStream; - loadNext(); } @Override public boolean hasNext() { - return currentVectorSchemaRoot != null; + return flightStream.next(); } @Override public VectorSchemaRoot next() { - if (currentVectorSchemaRoot == null) { + if (flightStream.getRoot() == null) { throw new NoSuchElementException(); } - VectorSchemaRoot oldVectorSchemaRoot = currentVectorSchemaRoot; - loadNext(); + autoCloseable.add(flightStream.getRoot()); - return oldVectorSchemaRoot; + return flightStream.getRoot(); } @Override @@ -156,16 +153,5 @@ public void close() { throw new RuntimeException(e); } } - - private void loadNext() { - - if (flightStream != null && flightStream.next()) { - currentVectorSchemaRoot = flightStream.getRoot(); - autoCloseable.add(currentVectorSchemaRoot); - - } else { - currentVectorSchemaRoot = null; - } - } } } diff --git a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java index 7907cea..a7098df 100644 --- a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java +++ b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java @@ -124,6 +124,27 @@ void queryWriteGzip() { } } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + void iteratingMoreVectorSchemaRoots() { + client = InfluxDBClient.getInstance(new ClientConfig.Builder() + .host(System.getenv("TESTING_INFLUXDB_URL")) + .token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray()) + .database(System.getenv("TESTING_INFLUXDB_DATABASE")) + .gzipThreshold(1) + .build()); + + String query = "SELECT name FROM (VALUES ('Alice', 4.56), ('Bob', 8.1)) AS data(name, value) group by name"; + try (Stream stream = client.query(query)) { + Object[] names = stream.map(row -> row[0].toString()).toArray(); + + Assertions.assertThat(names).contains("Alice"); + Assertions.assertThat(names).contains("Bob"); + } + } + @NotNull private static InfluxDBClient getInstance() { return InfluxDBClient.getInstance(