Skip to content

Commit

Permalink
Merge pull request #199 from ydb-platform/issue_198
Browse files Browse the repository at this point in the history
Use offsets from StartPartitionSessionResponse in first message commit offsets
  • Loading branch information
pnv1 authored Nov 20, 2023
2 parents 7e091ee + 6bbd3ae commit 850a5ec
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ public PartitionSession getSessionInfo() {
return sessionInfo;
}

public void setLastReadOffset(long lastReadOffset) {
this.lastReadOffset = lastReadOffset;
}

public void setLastCommittedOffset(long lastCommittedOffset) {
this.lastCommittedOffset = lastCommittedOffset;
}

public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadResponse.Batch> batches) {
if (!isWorking.get()) {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -202,7 +210,7 @@ public CompletableFuture<Void> commitOffsetRange(OffsetsRange rangeToCommit) {
// Bulk commit without result future
public void commitOffsetRanges(List<OffsetsRange> rangesToCommit) {
if (isWorking.get()) {
if (logger.isDebugEnabled()) {
if (logger.isInfoEnabled()) {
StringBuilder message = new StringBuilder("[").append(path)
.append("] Sending CommitRequest for partition session ").append(id)
.append(" (partition ").append(partitionId).append(") with offset ranges ");
Expand Down
29 changes: 19 additions & 10 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void sendReadRequest() {
.build());
}

private void sendStartPartitionSessionResponse(PartitionSession partitionSession,
private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSession,
StartPartitionSessionSettings startSettings) {
if (!isWorking.get()) {
logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {})," +
Expand All @@ -204,16 +204,23 @@ private void sendStartPartitionSessionResponse(PartitionSession partitionSession
YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder responseBuilder =
YdbTopic.StreamReadMessage.StartPartitionSessionResponse.newBuilder()
.setPartitionSessionId(partitionSession.getId());
Long userDefinedReadOffset = null;
Long userDefinedCommitOffset = null;
if (startSettings != null) {
if (startSettings.getReadOffset() != null) {
responseBuilder.setReadOffset(startSettings.getReadOffset());
userDefinedReadOffset = startSettings.getReadOffset();
if (userDefinedReadOffset != null) {
responseBuilder.setReadOffset(userDefinedReadOffset);
partitionSession.setLastReadOffset(userDefinedReadOffset);
}
if (startSettings.getCommitOffset() != null) {
responseBuilder.setCommitOffset(startSettings.getCommitOffset());
userDefinedCommitOffset = startSettings.getCommitOffset();
if (userDefinedCommitOffset != null) {
responseBuilder.setCommitOffset(userDefinedCommitOffset);
partitionSession.setLastCommittedOffset(userDefinedCommitOffset);
}
}
logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})", fullId,
partitionSession.getId(), partitionSession.getPartitionId());
logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})" +
" with readOffset {} and commitOffset {}", fullId, partitionSession.getId(),
partitionSession.getPartitionId(), userDefinedReadOffset, userDefinedCommitOffset);
send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
.setStartPartitionSessionResponse(responseBuilder.build())
.build());
Expand Down Expand Up @@ -302,8 +309,10 @@ private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) {
private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request) {
long partitionSessionId = request.getPartitionSession().getPartitionSessionId();
long partitionId = request.getPartitionSession().getPartitionId();
logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {})", fullId,
partitionSessionId, partitionId);
logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {}) " +
"with committedOffset {} and partitionOffsets [{}-{})", fullId,
partitionSessionId, partitionId, request.getCommittedOffset(),
request.getPartitionOffsets().getStart(), request.getPartitionOffsets().getEnd());

PartitionSessionImpl partitionSession = PartitionSessionImpl.newBuilder()
.setId(partitionSessionId)
Expand All @@ -319,7 +328,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart
partitionSessions.put(partitionSession.getId(), partitionSession);

handleStartPartitionSessionRequest(request, partitionSession.getSessionInfo(),
(settings) -> sendStartPartitionSessionResponse(partitionSession.getSessionInfo(), settings));
(settings) -> sendStartPartitionSessionResponse(partitionSession, settings));
}

protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request) {
Expand Down

0 comments on commit 850a5ec

Please sign in to comment.