Skip to content

Commit

Permalink
Add message metadata support
Browse files Browse the repository at this point in the history
  • Loading branch information
pnv1 committed Dec 22, 2023
1 parent e6e0c34 commit 6632969
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 3 deletions.
24 changes: 24 additions & 0 deletions topic/src/main/java/tech/ydb/topic/description/MetadataItem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tech.ydb.topic.description;

import javax.annotation.Nonnull;

/**
* @author Nikolay Perfilov
*/
public class MetadataItem {
private final String key;
private final byte[] value;

public MetadataItem(@Nonnull String key, byte[] value) {
this.key = key;
this.value = value;
}

public String getKey() {
return key;
}

public byte[] getValue() {
return value;
}
}
8 changes: 8 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/Message.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package tech.ydb.topic.read;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import io.grpc.ExperimentalApi;

import tech.ydb.topic.description.MetadataItem;

/**
* @author Nikolay Perfilov
*/
Expand Down Expand Up @@ -52,6 +55,11 @@ public interface Message {
*/
Instant getWrittenAt();

/**
* @return message metadata items
*/
List<MetadataItem> getMetadataItems();

/**
* @return Partition session of this message
*/
Expand Down
15 changes: 15 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.OffsetsRange;
Expand All @@ -22,6 +24,7 @@ public class MessageImpl implements Message {
private final String messageGroupId;
private final BatchMeta batchMeta;
private final PartitionSessionImpl partitionSession;
private List<MetadataItem> metadataItems;
private final OffsetsRange offsetsToCommit;
private final CommitterImpl committer;
private boolean isDecompressed = false;
Expand All @@ -36,6 +39,7 @@ private MessageImpl(Builder builder) {
this.messageGroupId = builder.messageGroupId;
this.batchMeta = builder.batchMeta;
this.partitionSession = builder.partitionSession;
this.metadataItems = builder.metadataItems;
this.offsetsToCommit = new OffsetsRangeImpl(commitOffsetFrom, offset + 1);
this.committer = new CommitterImpl(partitionSession, 1, offsetsToCommit);
}
Expand Down Expand Up @@ -105,6 +109,11 @@ public PartitionSessionImpl getPartitionSessionImpl() {
return partitionSession;
}

@Override
public List<MetadataItem> getMetadataItems() {
return metadataItems;
}

public void setDecompressed(boolean decompressed) {
isDecompressed = decompressed;
}
Expand All @@ -130,6 +139,7 @@ public static class Builder {
private String messageGroupId;
private BatchMeta batchMeta;
private PartitionSessionImpl partitionSession;
private List<MetadataItem> metadataItems;

public Builder setData(byte[] data) {
this.data = data;
Expand Down Expand Up @@ -171,6 +181,11 @@ public Builder setPartitionSession(PartitionSessionImpl partitionSession) {
return this;
}

public Builder setMetadataItems(List<MetadataItem> metadataItems) {
this.metadataItems = metadataItems;
return this;
}

public MessageImpl build() {
return new MessageImpl(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
Expand Down Expand Up @@ -140,6 +142,11 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
.setCommitOffsetFrom(commitOffsetFrom)
.setCreatedAt(ProtobufUtils.protoToInstant(messageData.getCreatedAt()))
.setMessageGroupId(messageData.getMessageGroupId())
.setMetadataItems(messageData.getMetadataItemsList()
.stream()
.map(metadataItem -> new MetadataItem(metadataItem.getKey(),
metadataItem.toByteArray()))
.collect(Collectors.toList()))
.build()
);
});
Expand Down
26 changes: 26 additions & 0 deletions topic/src/main/java/tech/ydb/topic/write/Message.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package tech.ydb.topic.write;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nonnull;

import tech.ydb.topic.description.MetadataItem;

/**
* @author Nikolay Perfilov
Expand All @@ -9,11 +15,13 @@ public class Message {
private byte[] data;
private final Long seqNo;
private final Instant createTimestamp;
private List<MetadataItem> metadataItems;

private Message(Builder builder) {
this.data = builder.data;
this.seqNo = builder.seqNo;
this.createTimestamp = builder.createTimestamp != null ? builder.createTimestamp : Instant.now();
this.metadataItems = builder.metadataItems;
}

private Message(byte[] data) {
Expand Down Expand Up @@ -46,13 +54,18 @@ public Instant getCreateTimestamp() {
return createTimestamp;
}

public List<MetadataItem> getMetadataItems() {
return metadataItems;
}

/**
* BUILDER
*/
public static class Builder {
private byte[] data;
private Long seqNo = null;
private Instant createTimestamp = null;
private List<MetadataItem> metadataItems = null;

public Builder setData(byte[] data) {
this.data = data;
Expand All @@ -69,6 +82,19 @@ public Builder setCreateTimestamp(Instant createTimestamp) {
return this;
}

public Builder addMetadataItem(@Nonnull MetadataItem metadataItem) {
if (metadataItems == null) {
metadataItems = new ArrayList<>();
}
metadataItems.add(metadataItem);
return this;
}

public Builder setMetadataItems(List<MetadataItem> metadataItems) {
this.metadataItems = metadataItems;
return this;
}

public Message build() {
return new Message(this);
}
Expand Down
23 changes: 20 additions & 3 deletions topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;

import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.ProtoUtils;

Expand Down Expand Up @@ -138,13 +140,28 @@ public void tryAddMessageToRequest(EnqueuedMessage message) {
message.setSeqNo(messageSeqNo);
}

YdbTopic.StreamWriteMessage.WriteRequest.MessageData messageData =


YdbTopic.StreamWriteMessage.WriteRequest.MessageData.Builder messageDataBuilder =
YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder()
.setSeqNo(messageSeqNo)
.setData(ByteString.copyFrom(message.getMessage().getData()))
.setCreatedAt(ProtobufUtils.instantToProto(message.getMessage().getCreateTimestamp()))
.setUncompressedSize(message.getUncompressedSizeBytes())
.build();
.setUncompressedSize(message.getUncompressedSizeBytes());

List<MetadataItem> metadataItems = message.getMessage().getMetadataItems();
if (metadataItems != null && !metadataItems.isEmpty()) {
messageDataBuilder.addAllMetadataItems(metadataItems
.stream()
.map(metadataItem -> YdbTopic.MetadataItem.newBuilder()
.setKey(metadataItem.getKey())
.setValue(ByteString.copyFrom(metadataItem.getValue()))
.build())
.collect(Collectors.toList()));
}

YdbTopic.StreamWriteMessage.WriteRequest.MessageData messageData = messageDataBuilder.build();

long sizeWithCurrentMessage = getCurrentRequestSize() + messageData.getSerializedSize() + messageOverheadBytes;
if (sizeWithCurrentMessage <= MAX_GRPC_MESSAGE_SIZE) {
addMessage(messageData);
Expand Down

0 comments on commit 6632969

Please sign in to comment.