Skip to content

Commit

Permalink
Merge pull request #211 from ydb-platform/read_without_consumer
Browse files Browse the repository at this point in the history
Add support for reading without a consumer
  • Loading branch information
pnv1 authored Jan 10, 2024
2 parents 9d6758a + ff0a42e commit 45472d5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
12 changes: 9 additions & 3 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
}
message.append("\"").append(topic.getPath()).append("\"");
}
message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\"");
if (settings.getConsumerName() != null) {
message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\"");
} else {
message.append(" without a Consumer");
}
logger.info(message.toString());
}

Expand Down Expand Up @@ -144,8 +148,10 @@ public void startAndInitialize() {
start(this::processMessage).whenComplete(this::onSessionClosing);

YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest
.newBuilder()
.setConsumer(settings.getConsumerName());
.newBuilder();
if (settings.getConsumerName() != null) {
initRequestBuilder.setConsumer(settings.getConsumerName());
}
settings.getTopics().forEach(topicReadSettings -> {
YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.Builder settingsBuilder =
YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.newBuilder()
Expand Down
21 changes: 20 additions & 1 deletion topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static Builder newBuilder() {
*/
public static class Builder {
private String consumerName = null;
private boolean readWithoutConsumer = false;
private String readerName = null;
private List<TopicReadSettings> topics = new ArrayList<>();
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
Expand All @@ -91,6 +92,16 @@ public Builder setConsumerName(String consumerName) {
return this;
}

/**
* Experimental feature. Interface may change in future
* Explicitly require reading without a consumer. Reading progress will not be saved on server this way.
* @return settings builder
*/
public Builder withoutConsumer() {
this.readWithoutConsumer = true;
return this;
}

// Not supported in API yet
public Builder setReaderName(String readerName) {
this.readerName = readerName;
Expand Down Expand Up @@ -145,7 +156,15 @@ public Builder setDecompressionExecutor(Executor decompressionExecutor) {

public ReaderSettings build() {
if (consumerName == null) {
throw new IllegalArgumentException("Missing consumer name for read settings");
if (!readWithoutConsumer) {
throw new IllegalArgumentException("Missing consumer name for read settings. " +
"Use withoutConsumer option explicitly if you want to read without a consumer");
}
} else {
if (readWithoutConsumer) {
throw new IllegalArgumentException("Both mutually exclusive options consumerName and " +
"withoutConsumer are set for read settings");
}
}
if (topics.isEmpty()) {
throw new IllegalArgumentException("Missing topics for read settings. At least one should be set");
Expand Down

0 comments on commit 45472d5

Please sign in to comment.