diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 808d0d83b..c8169a541 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -74,7 +74,11 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { } message.append("\"").append(topic.getPath()).append("\""); } - message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\""); + if (settings.getConsumerName() != null) { + message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\""); + } else { + message.append(" without a Consumer"); + } logger.info(message.toString()); } @@ -144,8 +148,10 @@ public void startAndInitialize() { start(this::processMessage).whenComplete(this::onSessionClosing); YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest - .newBuilder() - .setConsumer(settings.getConsumerName()); + .newBuilder(); + if (settings.getConsumerName() != null) { + initRequestBuilder.setConsumer(settings.getConsumerName()); + } settings.getTopics().forEach(topicReadSettings -> { YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.Builder settingsBuilder = YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.newBuilder() diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index c272948f9..49120b656 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -78,6 +78,7 @@ public static Builder newBuilder() { */ public static class Builder { private String consumerName = null; + private boolean readWithoutConsumer = false; private String readerName = null; private List topics = new ArrayList<>(); private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT; @@ -91,6 +92,11 @@ public Builder setConsumerName(String consumerName) { return this; } + public Builder withoutConsumer() { + this.readWithoutConsumer = true; + return this; + } + // Not supported in API yet public Builder setReaderName(String readerName) { this.readerName = readerName; @@ -145,7 +151,14 @@ public Builder setDecompressionExecutor(Executor decompressionExecutor) { public ReaderSettings build() { if (consumerName == null) { - throw new IllegalArgumentException("Missing consumer name for read settings"); + if (!readWithoutConsumer) { + throw new IllegalArgumentException("Missing consumer name for read settings"); + } + } else { + if (readWithoutConsumer) { + throw new IllegalArgumentException("Both mutually exclusive options consumerName and " + + "withoutConsumer are set for read settings"); + } } if (topics.isEmpty()) { throw new IllegalArgumentException("Missing topics for read settings. At least one should be set");