From 7bfc71cf0cb27ebcb64d71e7bbc9f772b93a427b Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 14 Dec 2023 12:54:58 +0300 Subject: [PATCH 1/2] Change log level for commit response from info to debug --- .../tech/ydb/topic/read/impl/PartitionSessionImpl.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 7199ecdd7..3d124c434 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -245,10 +245,12 @@ public void handleCommitResponse(long committedOffset) { return; } Map> futuresToComplete = commitFutures.headMap(committedOffset, true); - logger.info("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}" + - ". Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures", - path, id, partitionId, committedOffset, lastCommittedOffset, committedOffset - lastCommittedOffset, - futuresToComplete.size()); + if (logger.isDebugEnabled()) { + logger.debug("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}" + + ". Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures", + path, id, partitionId, committedOffset, lastCommittedOffset, committedOffset - lastCommittedOffset, + futuresToComplete.size()); + } lastCommittedOffset = committedOffset; futuresToComplete.values().forEach(future -> future.complete(null)); futuresToComplete.clear(); From 0d70af87ad923b86154cc56b3e33a315b4f60164 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 14 Dec 2023 15:15:03 +0300 Subject: [PATCH 2/2] Improve writer buffer overflow messaging and handling --- .../tech/ydb/topic/write/impl/WriterImpl.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index a0651235d..6438f3e76 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -110,21 +110,22 @@ public CompletableFuture tryToEnqueue(EnqueuedMessage message, boolean ins logger.info("[{}] Message queue in-flight limit of {} reached. Putting the message into incoming " + "waiting queue", id, settings.getMaxSendBufferMessagesCount()); } - } else if (availableSizeBytes <= message.getMessage().getData().length) { + } else if (availableSizeBytes < message.getMessage().getData().length) { if (instant) { - String errorMessage = "[" + id + - "] Rejecting a message due to reaching message queue size limit of " + - settings.getMaxSendBufferMemorySize() + " bytes. Buffer currently has " + - currentInFlightCount + " messages with " + availableSizeBytes + + String errorMessage = "[" + id + "] Rejecting a message of " + + message.getMessage().getData().length + + " bytes: not enough space in message queue. Buffer currently has " + currentInFlightCount + + " messages with " + availableSizeBytes + " / " + settings.getMaxSendBufferMemorySize() + " bytes available"; logger.info(errorMessage); CompletableFuture result = new CompletableFuture<>(); result.completeExceptionally(new QueueOverflowException(errorMessage)); return result; } else { - logger.info("[{}] Message queue size limit of {} bytes reached. Putting the message into incoming" + - " waiting queue. Buffer currently has {} messages with {} bytes available", id, - settings.getMaxSendBufferMemorySize(), currentInFlightCount, availableSizeBytes); + logger.info("[{}] Can't accept a message of {} bytes into message queue. Buffer currently has " + + "{} messages with {} / {} bytes available. Putting the message into incoming " + + "waiting queue.", id, message.getMessage().getData().length, currentInFlightCount, + availableSizeBytes, settings.getMaxSendBufferMemorySize()); } } else if (incomingQueue.isEmpty()) { acceptMessageIntoSendingQueue(message);