diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs new file mode 100644 index 00000000..4655e22a --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs @@ -0,0 +1,85 @@ +using System.Collections.Immutable; +using Google.Protobuf; +using Google.Protobuf.Collections; +using Google.Protobuf.WellKnownTypes; +using Ydb.Topic; + +namespace Ydb.Sdk.Services.Topic.Reader; + +internal class InternalMessage +{ + public InternalMessage( + ByteString data, + string topic, + long partitionId, + string producerId, + OffsetsRange offsetsRange, + Timestamp createdAt, + RepeatedField metadataItems, + int dataSize) + { + Data = data; + Topic = topic; + PartitionId = partitionId; + ProducerId = producerId; + OffsetsRange = offsetsRange; + CreatedAt = createdAt; + MetadataItems = metadataItems; + DataSize = dataSize; + } + + private ByteString Data { get; } + + private string Topic { get; } + + private long PartitionId { get; } + + private string ProducerId { get; } + + private OffsetsRange OffsetsRange { get; } + + private Timestamp CreatedAt { get; } + + private RepeatedField MetadataItems { get; } + + private int DataSize { get; } + + internal Message ToPublicMessage(IDeserializer deserializer, ReaderSession readerSession) + { + return new Message( + data: deserializer.Deserialize(Data.ToByteArray()), + topic: Topic, + partitionId: PartitionId, + producerId: ProducerId, + createdAt: CreatedAt.ToDateTime(), + metadata: MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())).ToImmutableArray(), + offsetsRange: OffsetsRange, + readerSession: readerSession + ); + } +} + +internal class InternalBatchMessage +{ + public InternalBatchMessage( + OffsetsRange batchOffsetsRange, + Queue internalMessages, + ReaderSession readerSession) + { + BatchOffsetsRange = batchOffsetsRange; + InternalMessages = internalMessages; + ReaderSession = readerSession; + } + + internal OffsetsRange BatchOffsetsRange { get; } + + internal Queue InternalMessages { get; } + + internal ReaderSession ReaderSession { get; } +} + +internal record CommitSending( + OffsetsRange OffsetsRange, + long PartitionSessionId, + TaskCompletionSource TcsTopicPartitionOffset +); diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs index 1eea95d6..047691e1 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs @@ -49,7 +49,7 @@ internal Message( public Task CommitAsync() { - return _readerSession.CommitOffsetRange(_offsetsRange); + return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId); } } @@ -76,6 +76,19 @@ public Task CommitBatchAsync() var offsetsRange = new OffsetsRange { Start = Batch.First().Start, End = Batch.Last().End }; - return _readerSession.CommitOffsetRange(offsetsRange); + return _readerSession.CommitOffsetRange(offsetsRange, Batch.First().PartitionId); } } + +public class TopicPartitionOffset +{ + public TopicPartitionOffset(long offset, long partitionId) + { + Offset = offset; + PartitionId = partitionId; + } + + public long Offset { get; } + + public long PartitionId { get; } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs index 65197f6d..cec32584 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs @@ -21,24 +21,33 @@ namespace Ydb.Sdk.Services.Topic.Reader; internal class Reader : IReader { + private const double FreeBufferCoefficient = 0.2; + private readonly IDriver _driver; private readonly ReaderConfig _config; private readonly IDeserializer _deserializer; private readonly ILogger _logger; + private readonly GrpcRequestSettings _readerGrpcRequestSettings; private readonly Channel _receivedMessagesChannel = - Channel.CreateUnbounded(); + Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = true, + AllowSynchronousContinuations = false + } + ); private readonly CancellationTokenSource _disposeCts = new(); - private volatile ReaderSession? _readerSession; - internal Reader(IDriver driver, ReaderConfig config, IDeserializer deserializer) { _driver = driver; _config = config; _deserializer = deserializer; _logger = driver.LoggerFactory.CreateLogger>(); + _readerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token }; _ = Initialize(); } @@ -49,6 +58,11 @@ public async ValueTask> ReadAsync(CancellationToken cancellation { if (_receivedMessagesChannel.Reader.TryPeek(out var batchInternalMessage)) { + if (!batchInternalMessage.ReaderSession.IsActive) + { + continue; + } + if (batchInternalMessage.InternalMessages.TryDequeue(out var message)) { return message.ToPublicMessage(_deserializer, batchInternalMessage.ReaderSession); @@ -77,7 +91,7 @@ public async ValueTask> ReadBatchAsync(CancellationToken ca throw new ReaderException("Detect race condition on ReadBatchAsync operation"); } - if (batchInternalMessage.InternalMessages.Count == 0) + if (batchInternalMessage.InternalMessages.Count == 0 || !batchInternalMessage.ReaderSession.IsActive) { continue; } @@ -106,10 +120,7 @@ private async Task Initialize() _logger.LogInformation("Reader session initialization started. ReaderConfig: {ReaderConfig}", _config); - var stream = _driver.BidirectionalStreamCall( - TopicService.StreamReadMethod, - GrpcRequestSettings.DefaultInstance - ); + var stream = _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, _readerGrpcRequestSettings); var initRequest = new StreamReadMessage.Types.InitRequest(); if (_config.ConsumerName != null) @@ -187,15 +198,14 @@ await stream.Write(new MessageFromClient ReadRequest = new StreamReadMessage.Types.ReadRequest { BytesSize = _config.MemoryUsageMaxBytes } }); - _readerSession = new ReaderSession( + new ReaderSession( _config, stream, initResponse.SessionId, Initialize, _logger, _receivedMessagesChannel.Writer - ); - _readerSession.RunProcessingTopic(); + ).RunProcessingTopic(); } catch (Driver.TransportException e) { @@ -205,21 +215,11 @@ await stream.Write(new MessageFromClient } } - private void ClearChannelAsync() - { - while (_receivedMessagesChannel.Reader.TryRead(out _)) - { - /* Do nothing, simple read */ - } - } - public void Dispose() { try { _disposeCts.Cancel(); - - _readerSession?.Dispose(); } finally { @@ -252,8 +252,17 @@ public void Dispose() internal class ReaderSession : TopicSession { private readonly ChannelWriter _channelWriter; + + private readonly Channel _channelCommitSending = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = true, + AllowSynchronousContinuations = false + } + ); + private readonly ConcurrentDictionary _partitionSessions = new(); - private readonly ConcurrentQueue _tcsOnCommitedMessages = new(); private long _memoryUsageMaxBytes; @@ -277,6 +286,27 @@ ChannelWriter channelWriter public async void RunProcessingTopic() { + _ = Task.Run(async () => + { + await foreach (var commitSending in _channelCommitSending.Reader.ReadAllAsync()) + { + await Stream.Write(new MessageFromClient + { + CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest + { + CommitOffsets = + { + new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset + { + Offsets = { commitSending.OffsetsRange }, + PartitionSessionId = commitSending.PartitionSessionId + } + } + } + }); + } + }); + while (await Stream.MoveNextAsync()) { switch (Stream.Current.ServerMessageCase) @@ -285,9 +315,24 @@ public async void RunProcessingTopic() await HandleReadResponse(); break; case ServerMessageOneofCase.StartPartitionSessionRequest: - HandleStartPartitionSessionRequest(); + var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest; + var partitionSession = startPartitionSessionRequest.PartitionSession; + + _partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession( + partitionSession.PartitionSessionId, + partitionSession.Path, + partitionSession.PartitionId, + startPartitionSessionRequest.CommittedOffset + ); break; case ServerMessageOneofCase.CommitOffsetResponse: + // foreach (var offset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets) + // { + // offset.CommittedOffset; + // offset.PartitionSessionId; + // } + + break; case ServerMessageOneofCase.PartitionSessionStatusResponse: case ServerMessageOneofCase.UpdateTokenResponse: case ServerMessageOneofCase.StopPartitionSessionRequest: @@ -300,9 +345,13 @@ public async void RunProcessingTopic() } } - public async Task CommitOffsetRange(OffsetsRange offsetsRange) + public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionId) { - throw new NotImplementedException(); + var tcsCommit = new TaskCompletionSource(); + + await _channelCommitSending.Writer.WriteAsync(new CommitSending(offsetsRange, partitionId, tcsCommit)); + + return await tcsCommit.Task; } private async Task HandleReadResponse() @@ -310,40 +359,50 @@ private async Task HandleReadResponse() var readResponse = Stream.Current.ReadResponse; Interlocked.Add(ref _memoryUsageMaxBytes, -readResponse.BytesSize); - + var readResponsesInBatch = 0; + foreach (var partition in readResponse.PartitionData) { var partitionSessionId = partition.PartitionSessionId; if (_partitionSessions.TryGetValue(partitionSessionId, out var partitionSession)) { - var internalBatchMessages = new Queue(); var startOffsetBatch = partitionSession.CommitedOffset; var endOffsetBatch = partitionSession.CommitedOffset; - foreach (var batch in partition.Batches) + + var batch = partition.Batches; + for (var i = 0; i < partition.Batches.Count; i++) { - foreach (var messageData in batch.MessageData) + var internalBatchMessages = new Queue(); + var actuallySummaryBatchPayload = 0; + + foreach (var messageData in batch[i].MessageData) { + actuallySummaryBatchPayload += messageData.Data.Length; + internalBatchMessages.Enqueue(new InternalMessage( data: messageData.Data, topic: partitionSession.TopicPath, partitionId: partitionSession.PartitionId, - producerId: batch.ProducerId, + producerId: batch[i].ProducerId, offsetsRange: new OffsetsRange { Start = partitionSession.CommitedOffset, End = messageData.Offset }, createdAt: messageData.CreatedAt, - metadataItems: messageData.MetadataItems + metadataItems: messageData.MetadataItems, + 0 )); partitionSession.CommitedOffset = endOffsetBatch = messageData.Offset + 1; } - } - await _channelWriter.WriteAsync(new InternalBatchMessage( - new OffsetsRange { Start = startOffsetBatch, End = endOffsetBatch }, - internalBatchMessages, - this) - ); + readResponsesInBatch -= actuallySummaryBatchPayload; + + await _channelWriter.WriteAsync(new InternalBatchMessage( + new OffsetsRange { Start = startOffsetBatch, End = endOffsetBatch }, + internalBatchMessages, + this) + ); + } } else { @@ -355,19 +414,6 @@ await _channelWriter.WriteAsync(new InternalBatchMessage( } } - private void HandleStartPartitionSessionRequest() - { - var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest; - var partitionSession = startPartitionSessionRequest.PartitionSession; - - _partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession( - partitionSession.PartitionSessionId, - partitionSession.Path, - partitionSession.PartitionId, - startPartitionSessionRequest.CommittedOffset - ); - } - private class PartitionSession { public PartitionSession( @@ -395,71 +441,3 @@ public PartitionSession( internal long CommitedOffset { get; set; } } } - -internal class InternalMessage -{ - public InternalMessage( - ByteString data, - string topic, - long partitionId, - string producerId, - OffsetsRange offsetsRange, - Timestamp createdAt, - RepeatedField metadataItems) - { - Data = data; - Topic = topic; - PartitionId = partitionId; - ProducerId = producerId; - OffsetsRange = offsetsRange; - CreatedAt = createdAt; - MetadataItems = metadataItems; - } - - private ByteString Data { get; } - - private string Topic { get; } - - private long PartitionId { get; } - - private string ProducerId { get; } - - private OffsetsRange OffsetsRange { get; } - - private Timestamp CreatedAt { get; } - - private RepeatedField MetadataItems { get; } - - internal Message ToPublicMessage(IDeserializer deserializer, ReaderSession readerSession) - { - return new Message( - data: deserializer.Deserialize(Data.ToByteArray()), - topic: Topic, - partitionId: PartitionId, - producerId: ProducerId, - createdAt: CreatedAt.ToDateTime(), - metadata: MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())).ToImmutableArray(), - offsetsRange: OffsetsRange, - readerSession: readerSession - ); - } -} - -internal class InternalBatchMessage -{ - public InternalBatchMessage( - OffsetsRange batchOffsetsRange, - Queue internalMessages, - ReaderSession readerSession) - { - BatchOffsetsRange = batchOffsetsRange; - InternalMessages = internalMessages; - ReaderSession = readerSession; - } - - internal OffsetsRange BatchOffsetsRange { get; } - - internal Queue InternalMessages { get; } - - internal ReaderSession ReaderSession { get; } -} diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 5b6ff824..2d6c8e65 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -24,6 +24,8 @@ protected TopicSession( _initialize = initialize; } + internal bool IsActive => Volatile.Read(ref _isActive) == 1; + protected async void ReconnectSession() { if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)