Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic message DeferredCommitter, remove redundant data references #189

Merged
merged 7 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/Committer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package tech.ydb.topic.read;

import java.util.concurrent.CompletableFuture;

/**
* A helper class that is used to call deferred commits
* Contains no data references and therefore may be useful in cases where commit() is called after processing data in
* an external system
*
* @author Nikolay Perfilov
*/
public interface Committer {
/**
* Commits offsets associated with this committer
* If there was an error while committing, there is no point of retrying committing the same message(s):
* the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server,
* it will resend all these messages in next PartitionSession.
* @return CompletableFuture that will be completed when commit confirmation from server will be received
*/
CompletableFuture<Void> commit();
}
25 changes: 25 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package tech.ydb.topic.read;

/**
* A helper class that is used to call deferred commits.
* Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to commit later
* all at once.
* Contains no data references and therefore may also be useful in cases where commit() is called after processing data
* in an external system.
*
* @author Nikolay Perfilov
*/
public interface DeferredCommitter {
/**
* Adds a {@link Message} to commit it later with a commit method
*
* @param message a {@link Message} to commit later
*/
void add(Message message);

/**
* Commits offset ranges from all {@link Message}s and {@link tech.ydb.topic.read.events.DataReceivedEvent}s
* that were added to this DeferredCommitter since last commit
*/
void commit();
}
15 changes: 14 additions & 1 deletion topic/src/main/java/tech/ydb/topic/read/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,22 @@ public interface Message {
PartitionSession getPartitionSession();

/**
* Commit this message
* Commits this message
* If there was an error while committing, there is no point of retrying committing the same message:
* the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server,
* it will resend all these messages in next PartitionSession.
*
* @return CompletableFuture that will be completed when commit confirmation from server will be received
*/
CompletableFuture<Void> commit();

/**
* Returns a Committer object to call commit() on later.
* This object has no data references and therefore may be useful in cases where commit() is called after
* processing data in an external system
*
* @return a Committer object
*/
Committer getCommitter();

}
10 changes: 10 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package tech.ydb.topic.read;

/**
* @author Nikolay Perfilov
*/
public interface OffsetsRange {
long getStart();

long getEnd();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import tech.ydb.topic.read.Committer;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.PartitionSession;

Expand All @@ -11,10 +12,38 @@
*/
public interface DataReceivedEvent {

/**
* Returns a list of messages grouped in one batch.
* Each message can be committed individually or all messages can be committed at once with commit() method
*
* @return a list of messages
*/
List<Message> getMessages();

/**
* Returns a partition session this data was received on
*
* @return a partition session this data was received on
*/
PartitionSession getPartitionSession();

/**
* Commits all messages in this event at once.
* If there was an error while committing, there is no point of retrying committing the same messages:
* the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server,
* it will resend all these messages in next PartitionSession.
*
* @return a CompletableFuture that will be completed when commit confirmation from server will be received
*/
CompletableFuture<Void> commit();

/**
* Returns a Committer object to call commit() on later.
* This object has no data references and therefore may be useful in cases where commit() is called after
* processing data in an external system
*
* @return a Committer object
*/
Committer getCommitter();

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package tech.ydb.topic.read.events;

import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.impl.OffsetsRange;
import tech.ydb.topic.settings.StartPartitionSessionSettings;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta
StartPartitionSessionEvent event = new StartPartitionSessionEventImpl(
partitionSession,
request.getCommittedOffset(),
new OffsetsRange(offsetsRange.getStart(), offsetsRange.getEnd()),
new OffsetsRangeImpl(offsetsRange.getStart(), offsetsRange.getEnd()),
confirmCallback
);
eventHandler.onStartPartitionSession(event);
Expand Down
40 changes: 40 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package tech.ydb.topic.read.impl;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.topic.read.Committer;
import tech.ydb.topic.read.OffsetsRange;

/**
* @author Nikolay Perfilov
*/
public class CommitterImpl implements Committer {
private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class);
private final PartitionSessionImpl partitionSession;
private final int messageCount;
private final OffsetsRange offsetsToCommit;

public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, OffsetsRange offsetsToCommit) {
this.partitionSession = partitionSession;
this.messageCount = messageCount;
this.offsetsToCommit = offsetsToCommit;
}

@Override
public CompletableFuture<Void> commit() {
return commitImpl(true);
}

public CompletableFuture<Void> commitImpl(boolean fromCommitter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" +
" [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(),
partitionSession.getId(), partitionSession.getPartitionId(), messageCount,
offsetsToCommit.getStart(), offsetsToCommit.getEnd());
}
return partitionSession.commitOffsetRange(offsetsToCommit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package tech.ydb.topic.read.impl;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.topic.read.DeferredCommitter;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.OffsetsRange;

/**
* @author Nikolay Perfilov
*/
public class DeferredCommitterImpl implements DeferredCommitter {
private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class);

private final Map<PartitionSessionImpl, PartitionRanges> rangesByPartition = new ConcurrentHashMap<>();

private static class PartitionRanges {
private final PartitionSessionImpl partitionSession;
private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet();

private PartitionRanges(PartitionSessionImpl partitionSession) {
this.partitionSession = partitionSession;
}

private void add(MessageImpl message) {
try {
synchronized (ranges) {
ranges.add(message.getOffsetsToCommit());
}
} catch (RuntimeException exception) {
String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " +
exception.getMessage();
logger.error(errorMessage);
throw new RuntimeException(errorMessage, exception);
}
}

private void commit() {
List<OffsetsRange> rangesToCommit;
synchronized (ranges) {
rangesToCommit = ranges.getRangesAndClear();
}
partitionSession.commitOffsetRanges(rangesToCommit);
}
}

@Override
public void add(Message message) {
MessageImpl messageImpl = (MessageImpl) message;
PartitionRanges partitionRanges = rangesByPartition
.computeIfAbsent(messageImpl.getPartitionSessionImpl(), PartitionRanges::new);
partitionRanges.add(messageImpl);
}

@Override
public void commit() {
rangesByPartition.forEach((session, partitionRanges) -> {
partitionRanges.commit();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package tech.ydb.topic.read.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import tech.ydb.topic.read.OffsetsRange;

/**
* @author Nikolay Perfilov
*/
public class DisjointOffsetRangeSet {
private final NavigableMap<Long, OffsetsRangeImpl> ranges = new ConcurrentSkipListMap<>();

public void add(OffsetsRange rangeToCommit) {
Map.Entry<Long, OffsetsRangeImpl> floorEntry = ranges.floorEntry(rangeToCommit.getStart());
if (floorEntry != null && floorEntry.getValue().getEnd() > rangeToCommit.getStart()) {
throwClashesException(floorEntry.getValue(), rangeToCommit);
}
Map.Entry<Long, OffsetsRangeImpl> ceilingEntry = ranges.ceilingEntry(rangeToCommit.getStart());
if (ceilingEntry != null && rangeToCommit.getEnd() > ceilingEntry.getValue().getStart()) {
throwClashesException(ceilingEntry.getValue(), rangeToCommit);
}
boolean mergedFloor = false;
if (floorEntry != null && floorEntry.getValue().getEnd() == rangeToCommit.getStart()) {
floorEntry.getValue().setEnd(rangeToCommit.getEnd());
mergedFloor = true;
}
if (ceilingEntry != null) {
OffsetsRangeImpl ceilingValue = ceilingEntry.getValue();
if (rangeToCommit.getEnd() == ceilingValue.getStart()) {
ranges.remove(ceilingEntry.getKey());
if (mergedFloor) {
floorEntry.getValue().setEnd(ceilingValue.getEnd());
} else {
ceilingValue.setStart(rangeToCommit.getStart());
ranges.put(rangeToCommit.getStart(), ceilingValue);
}
return;
}
}
if (!mergedFloor) {
ranges.put(rangeToCommit.getStart(), new OffsetsRangeImpl(rangeToCommit));
}
}

public List<OffsetsRange> getRangesAndClear() {
Collection<OffsetsRangeImpl> values = ranges.values();
List<OffsetsRange> result = new ArrayList<>(values);
values.clear();
return result;
}

private void throwClashesException(OffsetsRangeImpl existingRange, OffsetsRange newRange) {
String errMessage = "Error adding new offset range. Added range [" +
newRange.getStart() + "," + newRange.getEnd() + ") clashes with existing range [" +
existingRange.getStart() + "," + existingRange.getEnd() + ")";
throw new RuntimeException(errMessage);
}
}
Loading