Skip to content

Commit

Permalink
Merge pull request #203 from ydb-platform/release_v2.1.9
Browse files Browse the repository at this point in the history
Release v2.1.9
  • Loading branch information
pnv1 authored Dec 8, 2023
2 parents 26402f2 + 1a07245 commit b58f7ee
Show file tree
Hide file tree
Showing 21 changed files with 67 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.1.9 ##

* Topics: Fixed a bug where first commit was not getting commitResponse if a user had sent a custom StartPartitionSessionResponse
* Topics: Enhanced writer logging

## 2.1.8 ##

* Topics: Added DeferredCommitter class to group several read commits into one or just defer each commit without holding data reference
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Firstly you can import YDB Java BOM to specify correct versions of SDK modules.
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>tech.ydb</groupId>
<version>2.1.8</version>
<version>2.1.9</version>
<artifactId>ydb-sdk-bom</artifactId>
<name>Java SDK Bill of Materials</name>
<description>Java SDK Bill of Materials (BOM)</description>
Expand Down
2 changes: 1 addition & 1 deletion coordination/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
</parent>

<artifactId>ydb-sdk-coordination</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
</parent>

<artifactId>ydb-sdk-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class BaseGrpcTransport implements GrpcTransport {
protected volatile boolean shutdown = false;

abstract AuthCallOptions getAuthCallOptions();
abstract GrpcChannel getChannel(GrpcRequestSettings settings);
protected abstract GrpcChannel getChannel(GrpcRequestSettings settings);
abstract void updateChannelStatus(GrpcChannel channel, io.grpc.Status status);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public AuthCallOptions getAuthCallOptions() {
}

@Override
GrpcChannel getChannel(GrpcRequestSettings settings) {
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public AuthCallOptions getAuthCallOptions() {
}

@Override
GrpcChannel getChannel(GrpcRequestSettings settings) {
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
EndpointRecord endpoint = endpointPool.getEndpoint(null);
return channelPool.getChannel(endpoint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public AuthCallOptions getAuthCallOptions() {
}

@Override
GrpcChannel getChannel(GrpcRequestSettings settings) {
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public AuthCallOptions getAuthCallOptions() {
}

@Override
GrpcChannel getChannel(GrpcRequestSettings settings) {
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
EndpointRecord endpoint = endpointPool.getEndpoint(settings.getPreferredNodeID());
return channelPool.getChannel(endpoint);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.1.8
version=2.1.9
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>

<name>Java SDK for YDB</name>
<description>Java SDK for YDB</description>
Expand Down
2 changes: 1 addition & 1 deletion scheme/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
</parent>

<artifactId>ydb-sdk-scheme</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion table/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
</parent>

<artifactId>ydb-sdk-table</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion tests/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tests/junit4-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tests/junit5-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion topic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.8</version>
<version>2.1.9</version>
</parent>

<artifactId>ydb-sdk-topic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ public PartitionSession getSessionInfo() {
return sessionInfo;
}

public void setLastReadOffset(long lastReadOffset) {
this.lastReadOffset = lastReadOffset;
}

public void setLastCommittedOffset(long lastCommittedOffset) {
this.lastCommittedOffset = lastCommittedOffset;
}

public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadResponse.Batch> batches) {
if (!isWorking.get()) {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -202,7 +210,7 @@ public CompletableFuture<Void> commitOffsetRange(OffsetsRange rangeToCommit) {
// Bulk commit without result future
public void commitOffsetRanges(List<OffsetsRange> rangesToCommit) {
if (isWorking.get()) {
if (logger.isDebugEnabled()) {
if (logger.isInfoEnabled()) {
StringBuilder message = new StringBuilder("[").append(path)
.append("] Sending CommitRequest for partition session ").append(id)
.append(" (partition ").append(partitionId).append(") with offset ranges ");
Expand Down
29 changes: 19 additions & 10 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void sendReadRequest() {
.build());
}

private void sendStartPartitionSessionResponse(PartitionSession partitionSession,
private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSession,
StartPartitionSessionSettings startSettings) {
if (!isWorking.get()) {
logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {})," +
Expand All @@ -204,16 +204,23 @@ private void sendStartPartitionSessionResponse(PartitionSession partitionSession
YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder responseBuilder =
YdbTopic.StreamReadMessage.StartPartitionSessionResponse.newBuilder()
.setPartitionSessionId(partitionSession.getId());
Long userDefinedReadOffset = null;
Long userDefinedCommitOffset = null;
if (startSettings != null) {
if (startSettings.getReadOffset() != null) {
responseBuilder.setReadOffset(startSettings.getReadOffset());
userDefinedReadOffset = startSettings.getReadOffset();
if (userDefinedReadOffset != null) {
responseBuilder.setReadOffset(userDefinedReadOffset);
partitionSession.setLastReadOffset(userDefinedReadOffset);
}
if (startSettings.getCommitOffset() != null) {
responseBuilder.setCommitOffset(startSettings.getCommitOffset());
userDefinedCommitOffset = startSettings.getCommitOffset();
if (userDefinedCommitOffset != null) {
responseBuilder.setCommitOffset(userDefinedCommitOffset);
partitionSession.setLastCommittedOffset(userDefinedCommitOffset);
}
}
logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})", fullId,
partitionSession.getId(), partitionSession.getPartitionId());
logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})" +
" with readOffset {} and commitOffset {}", fullId, partitionSession.getId(),
partitionSession.getPartitionId(), userDefinedReadOffset, userDefinedCommitOffset);
send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
.setStartPartitionSessionResponse(responseBuilder.build())
.build());
Expand Down Expand Up @@ -302,8 +309,10 @@ private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) {
private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request) {
long partitionSessionId = request.getPartitionSession().getPartitionSessionId();
long partitionId = request.getPartitionSession().getPartitionId();
logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {})", fullId,
partitionSessionId, partitionId);
logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {}) " +
"with committedOffset {} and partitionOffsets [{}-{})", fullId,
partitionSessionId, partitionId, request.getCommittedOffset(),
request.getPartitionOffsets().getStart(), request.getPartitionOffsets().getEnd());

PartitionSessionImpl partitionSession = PartitionSessionImpl.newBuilder()
.setId(partitionSessionId)
Expand All @@ -319,7 +328,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart
partitionSessions.put(partitionSession.getId(), partitionSession);

handleStartPartitionSessionRequest(request, partitionSession.getSessionInfo(),
(settings) -> sendStartPartitionSessionResponse(partitionSession.getSessionInfo(), settings));
(settings) -> sendStartPartitionSessionResponse(partitionSession, settings));
}

protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request) {
Expand Down
26 changes: 17 additions & 9 deletions topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,19 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
}
} else if (availableSizeBytes <= message.getMessage().getData().length) {
if (instant) {
logger.info("[{}] Rejecting a message due to reaching message queue size limit of {} bytes", id,
settings.getMaxSendBufferMemorySize());
String errorMessage = "[" + id +
"] Rejecting a message due to reaching message queue size limit of " +
settings.getMaxSendBufferMemorySize() + " bytes. Buffer currently has " +
currentInFlightCount + " messages with " + availableSizeBytes +
" bytes available";
logger.info(errorMessage);
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(new QueueOverflowException("Message queue size limit of "
+ settings.getMaxSendBufferMemorySize() + " bytes reached"));
result.completeExceptionally(new QueueOverflowException(errorMessage));
return result;
} else {
logger.info("[{}] Message queue size limit of {} bytes reached. Putting the message into incoming" +
" waiting queue", id, settings.getMaxSendBufferMemorySize());
" waiting queue. Buffer currently has {} messages with {} bytes available", id,
settings.getMaxSendBufferMemorySize(), currentInFlightCount, availableSizeBytes);
}
} else if (incomingQueue.isEmpty()) {
acceptMessageIntoSendingQueue(message);
Expand All @@ -138,8 +142,8 @@ private void acceptMessageIntoSendingQueue(EnqueuedMessage message) {
this.lastAcceptedMessageFuture = message.getFuture();
this.currentInFlightCount++;
this.availableSizeBytes -= message.getUncompressedSizeBytes();
if (logger.isTraceEnabled()) {
logger.trace("[{}] Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, " +
if (logger.isDebugEnabled()) {
logger.debug("[{}] Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, " +
"AvailableSizeBytes: {} ({} / {} acquired)", id, message.getUncompressedSizeBytes(),
currentInFlightCount, availableSizeBytes, maxSendBufferMemorySize - availableSizeBytes,
maxSendBufferMemorySize);
Expand Down Expand Up @@ -408,8 +412,10 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) {
sendDataRequestIfNeeded();
}

// Shouldn't be called more than once at a time due to grpc guarantees
private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) {
List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acks = response.getAcksList();
logger.debug("[{}] Received WriteResponse with {} WriteAcks", fullId, acks.size());
int inFlightFreed = 0;
long bytesFreed = 0;
for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) {
Expand All @@ -419,10 +425,10 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
break;
}
if (sentMessage.getSeqNo() == ack.getSeqNo()) {
processWriteAck(sentMessage, ack);
inFlightFreed++;
bytesFreed += sentMessage.getSizeBytes();
sentMessages.remove();
processWriteAck(sentMessage, ack);
break;
}
if (sentMessage.getSeqNo() < ack.getSeqNo()) {
Expand All @@ -436,7 +442,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
sentMessages.remove();
// Checking next message waiting for ack
} else {
logger.info("[{}] Received an ack with seqNo {} which is older than the oldest message with " +
logger.warn("[{}] Received an ack with seqNo {} which is older than the oldest message with " +
"seqNo {} waiting for ack", fullId, ack.getSeqNo(), sentMessage.getSeqNo());
break;
}
Expand Down Expand Up @@ -465,6 +471,8 @@ private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {

private void processWriteAck(EnqueuedMessage message,
YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) {
logger.debug("[{}] Received WriteAck with seqNo {} and status {}", fullId, ack.getSeqNo(),
ack.getMessageWriteStatusCase());
WriteAck resultAck;
switch (ack.getMessageWriteStatusCase()) {
case WRITTEN:
Expand Down

0 comments on commit b58f7ee

Please sign in to comment.