Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic message DeferredCommitter, remove redundant data references #189

Merged
merged 7 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tech.ydb.topic.read;

import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.DeferredCommitterImpl;

/**
* A helper class that is used to call deferred commits.
* Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to commit later
* all at once.
* Contains no data references and therefore may also be useful in cases where commit() is called after processing data
* in an external system.
*
* @author Nikolay Perfilov
*/
public interface DeferredCommitter {
/**
* Creates a new instance of {@link DeferredCommitter}
*
* @return a new instance of {@link DeferredCommitter}
*/
static DeferredCommitter newInstance() {
return new DeferredCommitterImpl();
}

/**
* Adds a {@link Message} to commit it later with a commit method
*
* @param message a {@link Message} to commit later
*/
void add(Message message);

/**
* Adds a {@link DataReceivedEvent} to commit all its messages later with a commit method
*
* @param event a {@link DataReceivedEvent} to commit later
*/
void add(DataReceivedEvent event);

/**
* Commits offset ranges from all {@link Message}s and {@link DataReceivedEvent}s
* that were added to this DeferredCommitter since last commit
*/
void commit();
}
6 changes: 5 additions & 1 deletion topic/src/main/java/tech/ydb/topic/read/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public interface Message {
PartitionSession getPartitionSession();

/**
* Commit this message
* Commits this message
* If there was an error while committing, there is no point of retrying committing the same message:
* the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server,
* it will resend all these messages in next PartitionSession.
*
* @return CompletableFuture that will be completed when commit confirmation from server will be received
*/
CompletableFuture<Void> commit();
Expand Down
10 changes: 10 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package tech.ydb.topic.read;

/**
* @author Nikolay Perfilov
*/
public interface OffsetsRange {
long getStart();

long getEnd();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,29 @@
*/
public interface DataReceivedEvent {

/**
* Returns a list of messages grouped in one batch.
* Each message can be committed individually or all messages can be committed at once with commit() method
*
* @return a list of messages
*/
List<Message> getMessages();

/**
* Returns a partition session this data was received on
*
* @return a partition session this data was received on
*/
PartitionSession getPartitionSession();

/**
* Commits all messages in this event at once.
* If there was an error while committing, there is no point of retrying committing the same messages:
* the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server,
* it will resend all these messages in next PartitionSession.
*
* @return a CompletableFuture that will be completed when commit confirmation from server will be received
*/
CompletableFuture<Void> commit();

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package tech.ydb.topic.read.events;

import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.impl.OffsetsRange;
import tech.ydb.topic.settings.StartPartitionSessionSettings;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta
StartPartitionSessionEvent event = new StartPartitionSessionEventImpl(
partitionSession,
request.getCommittedOffset(),
new OffsetsRange(offsetsRange.getStart(), offsetsRange.getEnd()),
new OffsetsRangeImpl(offsetsRange.getStart(), offsetsRange.getEnd()),
confirmCallback
);
eventHandler.onStartPartitionSession(event);
Expand Down
39 changes: 39 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package tech.ydb.topic.read.impl;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.topic.read.OffsetsRange;

/**
* @author Nikolay Perfilov
*/
public class CommitterImpl {
private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class);
private final PartitionSessionImpl partitionSession;
private final int messageCount;
private final OffsetsRange offsetsToCommit;

public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, OffsetsRange offsetsToCommit) {
this.partitionSession = partitionSession;
this.messageCount = messageCount;
this.offsetsToCommit = offsetsToCommit;
}


public CompletableFuture<Void> commit() {
return commitImpl(true);
}

public CompletableFuture<Void> commitImpl(boolean fromCommitter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" +
" [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(),
partitionSession.getId(), partitionSession.getPartitionId(), messageCount,
offsetsToCommit.getStart(), offsetsToCommit.getEnd());
}
return partitionSession.commitOffsetRange(offsetsToCommit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package tech.ydb.topic.read.impl;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.topic.read.DeferredCommitter;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;

/**
* @author Nikolay Perfilov
*/
public class DeferredCommitterImpl implements DeferredCommitter {
private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class);

private final Map<PartitionSessionImpl, PartitionRanges> rangesByPartition = new ConcurrentHashMap<>();

private static class PartitionRanges {
private final PartitionSessionImpl partitionSession;
private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet();

private PartitionRanges(PartitionSessionImpl partitionSession) {
this.partitionSession = partitionSession;
}

private void add(OffsetsRange offsetRange) {
try {
synchronized (ranges) {
ranges.add(offsetRange);
}
} catch (RuntimeException exception) {
String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " +
exception.getMessage();
logger.error(errorMessage);
throw new RuntimeException(errorMessage, exception);
}
}

private void commit() {
List<OffsetsRange> rangesToCommit;
synchronized (ranges) {
rangesToCommit = ranges.getRangesAndClear();
}
partitionSession.commitOffsetRanges(rangesToCommit);
}
}

@Override
public void add(Message message) {
MessageImpl messageImpl = (MessageImpl) message;
PartitionRanges partitionRanges = rangesByPartition
.computeIfAbsent(messageImpl.getPartitionSessionImpl(), PartitionRanges::new);
partitionRanges.add(messageImpl.getOffsetsToCommit());
}

@Override
public void add(DataReceivedEvent event) {
DataReceivedEventImpl eventImpl = (DataReceivedEventImpl) event;
PartitionRanges partitionRanges = rangesByPartition
.computeIfAbsent(eventImpl.getPartitionSessionImpl(), PartitionRanges::new);
partitionRanges.add(eventImpl.getOffsetsToCommit());
}

@Override
public void commit() {
rangesByPartition.forEach((session, partitionRanges) -> {
partitionRanges.commit();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package tech.ydb.topic.read.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import tech.ydb.topic.read.OffsetsRange;

/**
* @author Nikolay Perfilov
*/
public class DisjointOffsetRangeSet {
private final NavigableMap<Long, OffsetsRangeImpl> ranges = new ConcurrentSkipListMap<>();

public void add(OffsetsRange rangeToCommit) {
Map.Entry<Long, OffsetsRangeImpl> floorEntry = ranges.floorEntry(rangeToCommit.getStart());
if (floorEntry != null && floorEntry.getValue().getEnd() > rangeToCommit.getStart()) {
throwClashesException(floorEntry.getValue(), rangeToCommit);
}
Map.Entry<Long, OffsetsRangeImpl> ceilingEntry = ranges.ceilingEntry(rangeToCommit.getStart());
if (ceilingEntry != null && rangeToCommit.getEnd() > ceilingEntry.getValue().getStart()) {
throwClashesException(ceilingEntry.getValue(), rangeToCommit);
}
boolean mergedFloor = false;
if (floorEntry != null && floorEntry.getValue().getEnd() == rangeToCommit.getStart()) {
floorEntry.getValue().setEnd(rangeToCommit.getEnd());
mergedFloor = true;
}
if (ceilingEntry != null) {
OffsetsRangeImpl ceilingValue = ceilingEntry.getValue();
if (rangeToCommit.getEnd() == ceilingValue.getStart()) {
ranges.remove(ceilingEntry.getKey());
if (mergedFloor) {
floorEntry.getValue().setEnd(ceilingValue.getEnd());
} else {
ceilingValue.setStart(rangeToCommit.getStart());
ranges.put(rangeToCommit.getStart(), ceilingValue);
}
return;
}
}
if (!mergedFloor) {
ranges.put(rangeToCommit.getStart(), new OffsetsRangeImpl(rangeToCommit));
}
}

public List<OffsetsRange> getRangesAndClear() {
Collection<OffsetsRangeImpl> values = ranges.values();
List<OffsetsRange> result = new ArrayList<>(values);
values.clear();
return result;
}

private void throwClashesException(OffsetsRangeImpl existingRange, OffsetsRange newRange) {
String errMessage = "Error adding new offset range. Added range [" +
newRange.getStart() + "," + newRange.getEnd() + ") clashes with existing range [" +
existingRange.getStart() + "," + existingRange.getEnd() + ")";
throw new RuntimeException(errMessage);
}
}
Loading