diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 5de376986..92aa68ac7 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -119,21 +119,20 @@ protected void handleClosePartitionSession(tech.ydb.topic.read.PartitionSession }); } - @Override - protected void handleCloseReader() { + protected void handleReaderClosed() { handlerExecutor.execute(() -> { eventHandler.onReaderClosed(new ReaderClosedEvent()); }); } @Override - protected CompletableFuture shutdownImpl() { - return super.shutdownImpl().whenComplete((res, th) -> { - if (defaultHandlerExecutorService != null) { - logger.debug("Shutting down default handler executor"); - defaultHandlerExecutorService.shutdown(); - } - }); + protected void onShutdown(String reason) { + super.onShutdown(reason); + handleReaderClosed(); + if (defaultHandlerExecutorService != null) { + logger.debug("Shutting down default handler executor"); + defaultHandlerExecutorService.shutdown(); + } } @Override diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index c8169a541..2fd65cf47 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -112,7 +112,6 @@ protected abstract void handleStopPartitionSession( YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, @Nullable Long partitionId, Runnable confirmCallback); protected abstract void handleClosePartitionSession(PartitionSession partitionSession); - protected abstract void handleCloseReader(); @Override protected void onStreamReconnect() { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java index ed9492190..200190716 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java @@ -158,16 +158,6 @@ protected void handleClosePartitionSession(PartitionSession partitionSession) { logger.debug("ClosePartitionSession event received. Ignoring."); } - @Override - protected void handleCloseReader() { - logger.debug("CloseReader event received. Ignoring."); - } - - @Override - protected CompletableFuture shutdownImpl() { - return super.shutdownImpl(); - } - @Override public void shutdown() { shutdownImpl().join();