Skip to content

Commit

Permalink
Add support for reading without a consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
pnv1 committed Jan 9, 2024
1 parent 0e20b2a commit ace33b8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
12 changes: 9 additions & 3 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
}
message.append("\"").append(topic.getPath()).append("\"");
}
message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\"");
if (settings.getConsumerName() != null) {
message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\"");
} else {
message.append(" without a Consumer");
}
logger.info(message.toString());
}

Expand Down Expand Up @@ -144,8 +148,10 @@ public void startAndInitialize() {
start(this::processMessage).whenComplete(this::onSessionClosing);

YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest
.newBuilder()
.setConsumer(settings.getConsumerName());
.newBuilder();
if (settings.getConsumerName() != null) {
initRequestBuilder.setConsumer(settings.getConsumerName());
}
settings.getTopics().forEach(topicReadSettings -> {
YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.Builder settingsBuilder =
YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.newBuilder()
Expand Down
15 changes: 14 additions & 1 deletion topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static Builder newBuilder() {
*/
public static class Builder {
private String consumerName = null;
private boolean readWithoutConsumer = false;
private String readerName = null;
private List<TopicReadSettings> topics = new ArrayList<>();
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
Expand All @@ -91,6 +92,11 @@ public Builder setConsumerName(String consumerName) {
return this;
}

public Builder withoutConsumer() {
this.readWithoutConsumer = true;
return this;
}

// Not supported in API yet
public Builder setReaderName(String readerName) {
this.readerName = readerName;
Expand Down Expand Up @@ -145,7 +151,14 @@ public Builder setDecompressionExecutor(Executor decompressionExecutor) {

public ReaderSettings build() {
if (consumerName == null) {
throw new IllegalArgumentException("Missing consumer name for read settings");
if (!readWithoutConsumer) {
throw new IllegalArgumentException("Missing consumer name for read settings");
}
} else {
if (readWithoutConsumer) {
throw new IllegalArgumentException("Both mutually exclusive options consumerName and " +
"withoutConsumer are set for read settings");
}
}
if (topics.isEmpty()) {
throw new IllegalArgumentException("Missing topics for read settings. At least one should be set");
Expand Down

0 comments on commit ace33b8

Please sign in to comment.