Skip to content

Commit

Permalink
Merge pull request #214 from ydb-platform/handle_reader_shutdown
Browse files Browse the repository at this point in the history
Call onReaderClosed when reader is shut down
  • Loading branch information
pnv1 authored Jan 11, 2024
2 parents 45472d5 + fdaf9a6 commit 07ddc30
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,20 @@ protected void handleClosePartitionSession(tech.ydb.topic.read.PartitionSession
});
}

@Override
protected void handleCloseReader() {
protected void handleReaderClosed() {
handlerExecutor.execute(() -> {
eventHandler.onReaderClosed(new ReaderClosedEvent());
});
}

@Override
protected CompletableFuture<Void> shutdownImpl() {
return super.shutdownImpl().whenComplete((res, th) -> {
if (defaultHandlerExecutorService != null) {
logger.debug("Shutting down default handler executor");
defaultHandlerExecutorService.shutdown();
}
});
protected void onShutdown(String reason) {
super.onShutdown(reason);
handleReaderClosed();
if (defaultHandlerExecutorService != null) {
logger.debug("Shutting down default handler executor");
defaultHandlerExecutorService.shutdown();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ protected abstract void handleStopPartitionSession(
YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, @Nullable Long partitionId,
Runnable confirmCallback);
protected abstract void handleClosePartitionSession(PartitionSession partitionSession);
protected abstract void handleCloseReader();

@Override
protected void onStreamReconnect() {
Expand Down
10 changes: 0 additions & 10 deletions topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,6 @@ protected void handleClosePartitionSession(PartitionSession partitionSession) {
logger.debug("ClosePartitionSession event received. Ignoring.");
}

@Override
protected void handleCloseReader() {
logger.debug("CloseReader event received. Ignoring.");
}

@Override
protected CompletableFuture<Void> shutdownImpl() {
return super.shutdownImpl();
}

@Override
public void shutdown() {
shutdownImpl().join();
Expand Down

0 comments on commit 07ddc30

Please sign in to comment.