Skip to content

Commit

Permalink
apacheGH-38096: [Java] FlightStream with metadata can cause error whe…
Browse files Browse the repository at this point in the history
…n closing (apache#38110)

### Rationale for this change

The Java FlightStream can raise an error if metadata is transferred and ends up being closed twice.

```
java.lang.IllegalStateException: RefCnt has gone negative
	at org.apache.arrow.util.Preconditions.checkState(Preconditions.java:458)
	at org.apache.arrow.memory.BufferLedger.release(BufferLedger.java:130)
	at org.apache.arrow.memory.BufferLedger.release(BufferLedger.java:104)
	at org.apache.arrow.memory.ArrowBuf.close(ArrowBuf.java:1044)
	at org.apache.arrow.util.AutoCloseables.close(AutoCloseables.java:97)
	at org.apache.arrow.flight.FlightStream.close(FlightStream.java:208)
```

### What changes are included in this PR?

When FlightStream is closed, remove any reference of previous metadata to prevent reference count going negative if closed again. Also added `ExchangeReaderWriter.getResult()` for convenience and clear up ambiguity on error handling.

### Are these changes tested?

Unit tests added for closing with metadata and 

### Are there any user-facing changes?

Added `ExchangeReaderWriter.getResult()`
* Closes: apache#38096

Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
BryanCutler authored and dgreiss committed Feb 17, 2024
1 parent a45dd10 commit 9afb870
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,16 @@ public ClientStreamListener getWriter() {
return writer;
}

/**
* Make sure stream is drained. You must call this to be notified of any errors that may have
* happened after the exchange is complete. This should be called after `getWriter().completed()`
* and instead of `getWriter().getResult()`.
*/
public void getResult() {
// After exchange is complete, make sure stream is drained to propagate errors through reader
while (reader.next()) { };
}

/** Shut down the streams in this call. */
@Override
public void close() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public void close() throws Exception {
} else {
AutoCloseables.close(closeables);
}
// Remove any metadata after closing to prevent negative refcnt
applicationMetadata = null;
} finally {
// The value of this CompletableFuture is meaningless, only whether it's completed (or has an exception)
// No-op if already complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class TestDoExchange {
static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8);
static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8);
static byte[] EXCHANGE_CANCEL = "cancel".getBytes(StandardCharsets.UTF_8);
static byte[] EXCHANGE_ERROR = "error".getBytes(StandardCharsets.UTF_8);

private BufferAllocator allocator;
private FlightServer server;
Expand Down Expand Up @@ -365,6 +366,37 @@ public void testClientCancel() throws Exception {
}
}

/** Test a DoExchange error handling. */
@Test
public void testDoExchangeError() throws Exception {
final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
try (final FlightClient.ExchangeReaderWriter stream = client.doExchange(FlightDescriptor.command(EXCHANGE_ERROR));
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
final FlightStream reader = stream.getReader();

// Write data and check that it gets echoed back.
IntVector iv = (IntVector) root.getVector("a");
iv.allocateNew();
stream.getWriter().start(root);
for (int i = 0; i < 10; i++) {
iv.setSafe(0, i);
root.setRowCount(1);
stream.getWriter().putNext();

assertTrue(reader.next());
assertEquals(root.getSchema(), reader.getSchema());
assertEquals(i, ((IntVector) reader.getRoot().getVector("a")).get(0));
}

// Complete the stream so that the server knows not to expect any more messages from us.
stream.getWriter().completed();

// Must call reader.next() to get any errors after exchange, will return false if no error
final FlightRuntimeException fre = assertThrows(FlightRuntimeException.class, stream::getResult);
assertEquals("error completing exchange", fre.status().description());
}
}

/** Have the client close the stream without reading; ensure memory is not leaked. */
@Test
public void testClientClose() throws Exception {
Expand All @@ -381,6 +413,38 @@ public void testClientClose() throws Exception {
client = null;
}

/** Test closing with Metadata can't lead to error. */
@Test
public void testCloseWithMetadata() throws Exception {
// Send a particular descriptor to the server and check for a particular response pattern.
try (final FlightClient.ExchangeReaderWriter stream =
client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
final FlightStream reader = stream.getReader();

// Server starts by sending a message without data (hence no VectorSchemaRoot should be present)
assertTrue(reader.next());
assertFalse(reader.hasRoot());
assertEquals(42, reader.getLatestMetadata().getInt(0));

// Write a metadata message to the server (without sending any data)
ArrowBuf buf = allocator.buffer(4);
buf.writeInt(84);
stream.getWriter().putMetadata(buf);

// Check that the server echoed the metadata back to us
assertTrue(reader.next());
assertFalse(reader.hasRoot());
assertEquals(84, reader.getLatestMetadata().getInt(0));

// Close our write channel and ensure the server also closes theirs
stream.getWriter().completed();
stream.getResult();

// Not necessary to close reader here, but check closing twice doesn't lead to negative refcnt from metadata
stream.getReader().close();
}
}

static class Producer extends NoOpFlightProducer {
static final Schema SCHEMA = new Schema(
Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
Expand All @@ -404,6 +468,8 @@ public void doExchange(CallContext context, FlightStream reader, ServerStreamLis
transform(context, reader, writer);
} else if (Arrays.equals(reader.getDescriptor().getCommand(), EXCHANGE_CANCEL)) {
cancel(context, reader, writer);
} else if (Arrays.equals(reader.getDescriptor().getCommand(), EXCHANGE_ERROR)) {
error(context, reader, writer);
} else {
writer.error(CallStatus.UNIMPLEMENTED.withDescription("Command not implemented").toRuntimeException());
}
Expand Down Expand Up @@ -534,5 +600,30 @@ private void transform(CallContext context, FlightStream reader, ServerStreamLis
private void cancel(CallContext context, FlightStream reader, ServerStreamListener writer) {
writer.error(CallStatus.CANCELLED.withDescription("expected").toRuntimeException());
}

private void error(CallContext context, FlightStream reader, ServerStreamListener writer) {
VectorSchemaRoot root = null;
VectorLoader loader = null;
while (reader.next()) {

if (root == null) {
root = VectorSchemaRoot.create(reader.getSchema(), allocator);
loader = new VectorLoader(root);
writer.start(root);
}
VectorUnloader unloader = new VectorUnloader(reader.getRoot());
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
loader.load(arb);
}

writer.putNext();
}
if (root != null) {
root.close();
}

// An error occurs before completing the writer
writer.error(CallStatus.INTERNAL.withDescription("error completing exchange").toRuntimeException());
}
}
}

0 comments on commit 9afb870

Please sign in to comment.