diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index 76543d90..7e72ed45 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -17,7 +17,20 @@ public WriterException(string message, Exception inner) : base(message, inner) public class ReaderException : Exception { - protected ReaderException(string message) : base(message) + public ReaderException(string message) : base(message) { + Status = new Status(StatusCode.Unspecified); } + + public ReaderException(string message, Status status) : base(message + ": " + status) + { + Status = status; + } + + public ReaderException(string message, Driver.TransportException e) : base(message, e) + { + Status = e.Status; + } + + public Status Status { get; } } diff --git a/src/Ydb.Sdk/src/Services/Topic/IReader.cs b/src/Ydb.Sdk/src/Services/Topic/IReader.cs index fc021168..9a09cd4a 100644 --- a/src/Ydb.Sdk/src/Services/Topic/IReader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/IReader.cs @@ -2,9 +2,9 @@ namespace Ydb.Sdk.Services.Topic; -public interface IReader +public interface IReader : IDisposable { - public Task ReadAsync(); + public ValueTask> ReadAsync(CancellationToken cancellationToken = default); - public Task> ReadMessageAsync(); + public ValueTask> ReadBatchAsync(CancellationToken cancellationToken = default); } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs new file mode 100644 index 00000000..9187b5d4 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs @@ -0,0 +1,89 @@ +using System.Collections.Immutable; +using Google.Protobuf; +using Google.Protobuf.Collections; +using Google.Protobuf.WellKnownTypes; +using Ydb.Topic; + +namespace Ydb.Sdk.Services.Topic.Reader; + +internal class InternalMessage +{ + public InternalMessage( + ByteString data, + string topic, + long partitionId, + string producerId, + OffsetsRange offsetsRange, + Timestamp createdAt, + RepeatedField metadataItems, + long dataSize) + { + Data = data; + Topic = topic; + PartitionId = partitionId; + ProducerId = producerId; + OffsetsRange = offsetsRange; + CreatedAt = createdAt; + MetadataItems = metadataItems; + DataSize = dataSize; + } + + private ByteString Data { get; } + + private string Topic { get; } + + private long PartitionId { get; } + + private string ProducerId { get; } + + private OffsetsRange OffsetsRange { get; } + + private Timestamp CreatedAt { get; } + + private RepeatedField MetadataItems { get; } + + private long DataSize { get; } + + internal Message ToPublicMessage(IDeserializer deserializer, ReaderSession readerSession) + { + return new Message( + data: deserializer.Deserialize(Data.ToByteArray()), + topic: Topic, + partitionId: PartitionId, + producerId: ProducerId, + createdAt: CreatedAt.ToDateTime(), + metadata: MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())).ToImmutableArray(), + offsetsRange: OffsetsRange, + readerSession: readerSession + ); + } +} + +internal class InternalBatchMessage +{ + public InternalBatchMessage( + OffsetsRange batchOffsetsRange, + Queue internalMessages, + ReaderSession readerSession, + long approximatelyBatchSize) + { + BatchOffsetsRange = batchOffsetsRange; + InternalMessages = internalMessages; + ReaderSession = readerSession; + ApproximatelyBatchSize = approximatelyBatchSize; + } + + internal OffsetsRange BatchOffsetsRange { get; } + + internal Queue InternalMessages { get; } + + internal ReaderSession ReaderSession { get; } + + internal long ApproximatelyBatchSize { get; } +} + +internal record CommitSending( + OffsetsRange OffsetsRange, + long PartitionSessionId, + TaskCompletionSource TcsCommit +); diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs index 1feb5c7d..394eef74 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs @@ -1,5 +1,92 @@ +using System.Collections.Immutable; +using Ydb.Topic; + namespace Ydb.Sdk.Services.Topic.Reader; public class Message { + private readonly OffsetsRange _offsetsRange; + private readonly ReaderSession _readerSession; + + internal Message( + TValue data, + string topic, + long partitionId, + string producerId, + DateTime createdAt, + ImmutableArray metadata, + OffsetsRange offsetsRange, + ReaderSession readerSession) + { + Data = data; + Topic = topic; + PartitionId = partitionId; + ProducerId = producerId; + CreatedAt = createdAt; + Metadata = metadata; + + _offsetsRange = offsetsRange; + _readerSession = readerSession; + } + + public TValue Data { get; } + + /// + /// The topic associated with the message. + /// + public string Topic { get; } + + public long PartitionId { get; } + + public string ProducerId { get; } + + public DateTime CreatedAt { get; } + + public ImmutableArray Metadata { get; } + + internal long Start => _offsetsRange.Start; + internal long End => _offsetsRange.End; + + public Task CommitAsync() + { + return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId); + } +} + +public class BatchMessage +{ + private readonly ReaderSession _readerSession; + + public ImmutableArray> Batch { get; } + + internal BatchMessage(ImmutableArray> batch, ReaderSession readerSession) + { + Batch = batch; + _readerSession = readerSession; + } + + public Task CommitBatchAsync() + { + if (Batch.Length == 0) + { + return Task.CompletedTask; + } + + var offsetsRange = new OffsetsRange { Start = Batch.First().Start, End = Batch.Last().End }; + + return _readerSession.CommitOffsetRange(offsetsRange, Batch.First().PartitionId); + } +} + +public class TopicPartitionOffset +{ + public TopicPartitionOffset(long offset, long partitionId) + { + Offset = offset; + PartitionId = partitionId; + } + + public long Offset { get; } + + public long PartitionId { get; } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs index caf87943..f3d59540 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs @@ -1,23 +1,587 @@ +using System.Collections.Concurrent; +using System.Collections.Immutable; +using System.Threading.Channels; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Ado; +using Ydb.Topic; +using Ydb.Topic.V1; +using static Ydb.Topic.StreamReadMessage.Types.FromServer; + namespace Ydb.Sdk.Services.Topic.Reader; +using MessageFromClient = StreamReadMessage.Types.FromClient; +using MessageFromServer = StreamReadMessage.Types.FromServer; +using ReaderStream = IBidirectionalStream< + StreamReadMessage.Types.FromClient, + StreamReadMessage.Types.FromServer +>; + internal class Reader : IReader { - // internal Reader(Driver driver, ReaderConfig config, IDeserializer deserializer) - // { - // } + private const double FreeBufferCoefficient = 0.2; + + private readonly IDriver _driver; + private readonly ReaderConfig _config; + private readonly IDeserializer _deserializer; + private readonly ILogger _logger; + private readonly GrpcRequestSettings _readerGrpcRequestSettings; + + private readonly Channel _receivedMessagesChannel = + Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = true, + AllowSynchronousContinuations = false + } + ); - internal Task Initialize() + private readonly CancellationTokenSource _disposeCts = new(); + + internal Reader(IDriver driver, ReaderConfig config, IDeserializer deserializer) { - throw new NotImplementedException(); + _driver = driver; + _config = config; + _deserializer = deserializer; + _logger = driver.LoggerFactory.CreateLogger>(); + _readerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token }; + + _ = Initialize(); } - public Task ReadAsync() + public async ValueTask> ReadAsync(CancellationToken cancellationToken = default) { - throw new NotImplementedException(); + while (await _receivedMessagesChannel.Reader.WaitToReadAsync(cancellationToken)) + { + if (_receivedMessagesChannel.Reader.TryPeek(out var batchInternalMessage)) + { + if (!batchInternalMessage.ReaderSession.IsActive) + { + continue; + } + + if (batchInternalMessage.InternalMessages.TryDequeue(out var message)) + { + return message.ToPublicMessage(_deserializer, batchInternalMessage.ReaderSession); + } + + if (!_receivedMessagesChannel.Reader.TryRead(out _)) + { + throw new ReaderException("Detect race condition on ReadAsync operation"); + } + } + else + { + throw new ReaderException("Detect race condition on ReadAsync operation"); + } + } + + throw new ReaderException("Reader is disposed"); } - public Task> ReadMessageAsync() + public async ValueTask> ReadBatchAsync(CancellationToken cancellationToken = default) { - throw new NotImplementedException(); + while (await _receivedMessagesChannel.Reader.WaitToReadAsync(cancellationToken)) + { + if (!_receivedMessagesChannel.Reader.TryRead(out var batchInternalMessage)) + { + throw new ReaderException("Detect race condition on ReadBatchAsync operation"); + } + + if (batchInternalMessage.InternalMessages.Count == 0 || !batchInternalMessage.ReaderSession.IsActive) + { + continue; + } + + return new BatchMessage( + batchInternalMessage.InternalMessages + .Select(message => message.ToPublicMessage(_deserializer, batchInternalMessage.ReaderSession)) + .ToImmutableArray(), + batchInternalMessage.ReaderSession + ); + } + + throw new ReaderException("Reader is disposed"); + } + + private async Task Initialize() + { + try + { + if (_disposeCts.IsCancellationRequested) + { + _logger.LogWarning("Reader writer is canceled because it has been disposed"); + + return; + } + + _logger.LogInformation("Reader session initialization started. ReaderConfig: {ReaderConfig}", _config); + + var stream = _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, _readerGrpcRequestSettings); + + var initRequest = new StreamReadMessage.Types.InitRequest(); + if (_config.ConsumerName != null) + { + initRequest.Consumer = _config.ConsumerName; + } + + if (_config.ReaderName != null) + { + initRequest.ReaderName = _config.ReaderName; + } + + foreach (var subscribe in _config.SubscribeSettings) + { + var topicReadSettings = new StreamReadMessage.Types.InitRequest.Types.TopicReadSettings + { + Path = subscribe.TopicPath + }; + + if (subscribe.MaxLag != null) + { + topicReadSettings.MaxLag = Duration.FromTimeSpan(subscribe.MaxLag.Value); + } + + if (subscribe.ReadFrom != null) + { + topicReadSettings.ReadFrom = Timestamp.FromDateTime(subscribe.ReadFrom.Value); + } + + initRequest.TopicsReadSettings.Add(topicReadSettings); + } + + _logger.LogDebug("Sending initialization request for the read stream: {InitRequest}", initRequest); + + await stream.Write(new MessageFromClient { InitRequest = initRequest }); + if (!await stream.MoveNextAsync()) + { + _logger.LogError("Stream unexpectedly closed by YDB server. Current InitRequest: {InitRequest}", + initRequest); + + _ = Task.Run(Initialize, _disposeCts.Token); + + return; + } + + var receivedInitMessage = stream.Current; + + var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues); + + if (status.IsNotSuccess) + { + if (RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy != RetryPolicy.None) + { + _logger.LogError("Reader initialization failed to start. Reason: {Status}", status); + + _ = Task.Run(Initialize, _disposeCts.Token); + } + else + { + _logger.LogCritical("Reader initialization failed to start. Reason: {Status}", status); + + _receivedMessagesChannel.Writer.Complete(new ReaderException("Initialization failed", status)); + } + + return; + } + + var initResponse = receivedInitMessage.InitResponse; + + _logger.LogDebug("Received a response for the initialization request on the read stream: {InitResponse}", + initResponse); + + await stream.Write(new MessageFromClient + { + ReadRequest = new StreamReadMessage.Types.ReadRequest { BytesSize = _config.MemoryUsageMaxBytes } + }); + + new ReaderSession( + _config, + stream, + initResponse.SessionId, + Initialize, + _logger, + _receivedMessagesChannel.Writer + ).RunProcessingTopic(); + } + catch (Driver.TransportException e) + { + _logger.LogError(e, "Transport error on executing ReaderSession"); + + _ = Task.Run(Initialize, _disposeCts.Token); + } + } + + public void Dispose() + { + try + { + _disposeCts.Cancel(); + } + finally + { + _disposeCts.Dispose(); + } + } +} + +/// +/// Server and client each keep track of total bytes size of all ReadResponses. +/// When client is ready to receive N more bytes in responses (to increment possible total by N), +/// it sends a ReadRequest with bytes_size = N. +/// bytes_size value must be positive. +/// So in expression 'A = (sum of bytes_size in all ReadRequests) - (sum of bytes_size in all ReadResponses)' +/// server will keep A (available size for responses) non-negative. +/// But there is an exception. If server receives ReadRequest, and the first message in response exceeds A - +/// then it will still be delivered, and A will become negative until enough additional ReadRequests. +/// +/// Example: +/// 1) Let client have 200 bytes buffer. It sends ReadRequest with bytes_size = 200; +/// 2) Server may return one ReadResponse with bytes_size = 70 and then another 80 bytes response; +/// now client buffer has 50 free bytes, server is free to send up to 50 bytes in responses. +/// 3) Client processes 100 bytes from buffer, now buffer free space is 150 bytes, +/// so client sends ReadRequest with bytes_size = 100; +/// 4) Server is free to send up to 50 + 100 = 150 bytes. But the next read message is too big, +/// and it sends 160 bytes ReadResponse. +/// 5) Let's assume client somehow processes it, and its 200 bytes buffer is free again. +/// It should account for excess 10 bytes and send ReadRequest with bytes_size = 210. +/// +internal class ReaderSession : TopicSession +{ + private readonly ChannelWriter _channelWriter; + private readonly CancellationTokenSource _lifecycleReaderSessionCts = new(); + + private readonly Channel _channelCommitSending = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = true, + AllowSynchronousContinuations = false + } + ); + + private readonly ConcurrentDictionary _partitionSessions = new(); + + private long _memoryUsageMaxBytes; + + public ReaderSession( + ReaderConfig config, + ReaderStream stream, + string sessionId, + Func initialize, + ILogger logger, + ChannelWriter channelWriter + ) : base( + stream, + logger, + sessionId, + initialize + ) + { + _channelWriter = channelWriter; + _memoryUsageMaxBytes = config.MemoryUsageMaxBytes; + } + + public async void RunProcessingTopic() + { + _ = Task.Run(async () => + { + try + { + await foreach (var commitSending in _channelCommitSending.Reader.ReadAllAsync()) + { + if (_partitionSessions.TryGetValue(commitSending.PartitionSessionId, out var partitionSession)) + { + partitionSession.RegisterCommitRequest(commitSending); + } + else + { + Logger.LogWarning( + "Offset range [{OffsetRange}] is requested to be committed, " + + "but PartitionSession[PartitionSessionId={PartitionSessionId}] is already closed", + commitSending.OffsetsRange, commitSending.PartitionSessionId); + } + + await Stream.Write(new MessageFromClient + { + CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest + { + CommitOffsets = + { + new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset + { + Offsets = { commitSending.OffsetsRange }, + PartitionSessionId = commitSending.PartitionSessionId + } + } + } + }); + } + } + catch (Driver.TransportException e) + { + Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Commit", SessionId); + + _lifecycleReaderSessionCts.Cancel(); + + ReconnectSession(); + } + }); + + try + { + while (await Stream.MoveNextAsync()) + { + switch (Stream.Current.ServerMessageCase) + { + case ServerMessageOneofCase.ReadResponse: + await HandleReadResponse(); + break; + case ServerMessageOneofCase.StartPartitionSessionRequest: + await HandleStartPartitionSessionRequest(); + break; + case ServerMessageOneofCase.CommitOffsetResponse: + HandleCommitOffsetResponse(); + break; + case ServerMessageOneofCase.PartitionSessionStatusResponse: + case ServerMessageOneofCase.UpdateTokenResponse: + case ServerMessageOneofCase.StopPartitionSessionRequest: + case ServerMessageOneofCase.InitResponse: + case ServerMessageOneofCase.None: + break; + default: + throw new ArgumentOutOfRangeException(); + } + } + } + catch (Driver.TransportException e) + { + Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on processing server messages", + SessionId); + } + finally + { + _lifecycleReaderSessionCts.Cancel(); + + ReconnectSession(); + } + } + + private async Task HandleStartPartitionSessionRequest() + { + var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest; + var partitionSession = startPartitionSessionRequest.PartitionSession; + _partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession( + Logger, + partitionSession.PartitionSessionId, + partitionSession.Path, + partitionSession.PartitionId, + startPartitionSessionRequest.CommittedOffset + ); + + Logger.LogInformation( + "ReaderSession[{SessionId}] started PartitionSession[PartitionSessionId={PartitionSessionId}, " + + "Path={Path}, PartitionId={PartitionId}, CommittedOffset={CommittedOffset}]", + SessionId, partitionSession.PartitionSessionId, partitionSession.Path, + partitionSession.PartitionId, startPartitionSessionRequest.CommittedOffset); + + await Stream.Write(new MessageFromClient + { + StartPartitionSessionResponse = new StreamReadMessage.Types.StartPartitionSessionResponse + { + PartitionSessionId = partitionSession.PartitionSessionId + /* Simple client doesn't have read_offset or commit_offset settings */ + } + }); + } + + private void HandleCommitOffsetResponse() + { + foreach (var partitionsCommittedOffset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets) + { + if (_partitionSessions.TryGetValue(partitionsCommittedOffset.PartitionSessionId, + out var partitionSession)) + { + partitionSession.HandleCommitedOffset(partitionSession.CommitedOffset); + } + else + { + Logger.LogError( + "Received CommitOffsetResponse[CommittedOffset={CommittedOffset}] " + + "for unknown PartitionSession[PartitionSessionId={PartitionSessionId}]", + partitionsCommittedOffset.CommittedOffset, partitionsCommittedOffset.PartitionSessionId); + } + } + } + + public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionId) + { + var tcsCommit = new TaskCompletionSource(); + + await using var register = _lifecycleReaderSessionCts.Token.Register(() => tcsCommit + .TrySetException(new YdbException($"ReaderSession[{SessionId}] was deactivated"))); + + await _channelCommitSending.Writer.WriteAsync(new CommitSending(offsetsRange, partitionId, tcsCommit)); + + await tcsCommit.Task; + } + + private async Task HandleReadResponse() + { + var readResponse = Stream.Current.ReadResponse; + + Interlocked.Add(ref _memoryUsageMaxBytes, -readResponse.BytesSize); + + var bytesSize = readResponse.BytesSize; + var partitionCount = readResponse.PartitionData.Count; + + for (var partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + { + var partition = readResponse.PartitionData[partitionIndex]; + var partitionSessionId = partition.PartitionSessionId; + var approximatelyPartitionBytesSize = CalculateApproximatelyBytesSize( + bytesSize: bytesSize, + countParts: partitionCount, + currentIndex: partitionIndex + ); + + if (_partitionSessions.TryGetValue(partitionSessionId, out var partitionSession)) + { + var startOffsetBatch = partitionSession.CommitedOffset; + var endOffsetBatch = partitionSession.CommitedOffset; + + var batchCount = partition.Batches.Count; + var batch = partition.Batches; + + for (var batchIndex = 0; batchIndex < batchCount; batchIndex++) + { + var approximatelyBatchBytesSize = CalculateApproximatelyBytesSize( + bytesSize: approximatelyPartitionBytesSize, + countParts: batchCount, + currentIndex: batchIndex + ); + + var internalBatchMessages = new Queue(); + var messagesCount = batch[batchIndex].MessageData.Count; + + for (var messageIndex = 0; messageIndex < messagesCount; messageIndex++) + { + var messageData = batch[batchIndex].MessageData[messageIndex]; + + internalBatchMessages.Enqueue( + new InternalMessage( + data: messageData.Data, + topic: partitionSession.TopicPath, + partitionId: partitionSession.PartitionId, + producerId: batch[batchIndex].ProducerId, + offsetsRange: new OffsetsRange + { Start = partitionSession.PrevEndOffsetMessage, End = messageData.Offset }, + createdAt: messageData.CreatedAt, + metadataItems: messageData.MetadataItems, + CalculateApproximatelyBytesSize( + bytesSize: approximatelyBatchBytesSize, + countParts: messagesCount, + currentIndex: messageIndex + ) + ) + ); + + partitionSession.PrevEndOffsetMessage = endOffsetBatch = messageData.Offset + 1; + } + + await _channelWriter.WriteAsync( + new InternalBatchMessage( + new OffsetsRange { Start = startOffsetBatch, End = endOffsetBatch }, + internalBatchMessages, + this, + approximatelyBatchBytesSize + ) + ); + } + } + else + { + Logger.LogError( + "ReaderSession[{SessionId}]: received PartitionData for unknown(closed?) " + + "PartitionSession[{PartitionSessionId}], all messages were skipped!", + SessionId, partitionSessionId); + + Interlocked.Add(ref _memoryUsageMaxBytes, approximatelyPartitionBytesSize); + } + } + } + + private static long CalculateApproximatelyBytesSize(long bytesSize, int countParts, int currentIndex) + { + return bytesSize / countParts + currentIndex == countParts - 1 ? bytesSize % countParts : 0; + } + + private class PartitionSession + { + private readonly ILogger _logger; + private readonly ConcurrentQueue<(long EndOffset, TaskCompletionSource TcsCommit)> _waitCommitMessages = new(); + + public PartitionSession( + ILogger logger, + long partitionSessionId, + string topicPath, + long partitionId, + long commitedOffset) + { + _logger = logger; + PartitionSessionId = partitionSessionId; + TopicPath = topicPath; + PartitionId = partitionId; + CommitedOffset = commitedOffset; + PrevEndOffsetMessage = commitedOffset; + } + + // Identifier of partition session. Unique inside one RPC call. + internal long PartitionSessionId { get; } + + // Topic path of partition + internal string TopicPath { get; } + + // Partition identifier + internal long PartitionId { get; } + + // Each offset up to and including (committed_offset - 1) was fully processed. + internal long CommitedOffset { get; set; } + + internal long PrevEndOffsetMessage { get; set; } + + internal void RegisterCommitRequest(CommitSending commitSending) + { + var endOffset = commitSending.OffsetsRange.End; + + if (endOffset <= CommitedOffset) + { + commitSending.TcsCommit.SetResult(); + } + else + { + _waitCommitMessages.Enqueue((endOffset, commitSending.TcsCommit)); + } + } + + internal void HandleCommitedOffset(long commitedOffset) + { + if (CommitedOffset >= commitedOffset) + { + _logger.LogError( + "Received CommitOffsetResponse[CommitedOffset={CommitedOffset}] " + + "which is not greater than previous committed offset: {PrevCommitedOffset}", + commitedOffset, CommitedOffset); + } + + CommitedOffset = commitedOffset; + + while (_waitCommitMessages.TryPeek(out var waitCommitTcs) && waitCommitTcs.EndOffset <= commitedOffset) + { + _waitCommitMessages.TryDequeue(out _); + waitCommitTcs.TcsCommit.SetResult(); + } + } } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs index 97306ed6..2e3cf096 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs @@ -1,33 +1,109 @@ +using System.Text; + namespace Ydb.Sdk.Services.Topic.Reader; public class ReaderBuilder { - // private readonly ReaderConfig _config; - // private readonly Driver _driver; - // - // public ReaderBuilder(Driver driver, ReaderConfig config) - // { - // _driver = driver; - // _config = config; - // } + private readonly Driver _driver; + + public ReaderBuilder(Driver driver) + { + _driver = driver; + } public IDeserializer? Deserializer { get; set; } - public Task> Build() + /// + /// Message that describes topic to read. + /// Topics that will be read by this reader. + /// + public List SubscribeSettings { get; } = new(); + + /// + /// Path of consumer that is used for reading by this session. + /// + public string? ConsumerName { get; set; } + + /// + /// Optional name. Will be shown in debug stat. + /// + public string? ReaderName { get; set; } + + /// + /// Maximum amount of data the broker shall return for a Fetch request. + /// Messages are fetched in batches by the consumer and if the first message batch + /// in the first non-empty partition of the Fetch request is larger than this value, + /// then the message batch will still be returned to ensure the consumer can make progress. + /// + public long MemoryUsageMaxBytes { get; set; } = 50 * 1024 * 1024; // 50 Mb + + public IReader Build() + { + var config = new ReaderConfig( + subscribeSettings: SubscribeSettings, + consumerName: ConsumerName, + readerName: ReaderName, + memoryUsageMaxBytes: MemoryUsageMaxBytes + ); + + var reader = new Reader( + _driver, + config, + Deserializer ?? (IDeserializer)( + Deserializers.DefaultDeserializers.TryGetValue(typeof(TValue), out var deserializer) + ? deserializer + : throw new ReaderException("The deserializer is not set") + ) + ); + + return reader; + } +} + +public class SubscribeSettings +{ + public string TopicPath { get; } + + /// Topic path + public SubscribeSettings(string topicPath) { - throw new NotImplementedException(); - // var reader = new Reader( - // _driver, - // _config, - // Deserializer ?? (IDeserializer)( - // Deserializers.DefaultDeserializers.TryGetValue(typeof(TValue), out var deserializer) - // ? deserializer - // : throw new WriterException("The serializer is not set") - // ) - // ); - // - // await reader.Initialize(); - // - // return reader; + TopicPath = topicPath; + } + + /// + /// Partitions that will be read by this session. + /// If list is empty - then session will read all partitions. + /// + public List PartitionIds { get; } = new(); + + /// + /// Skip all messages that has write timestamp smaller than now - max_lag. + /// Zero means infinite lag. + /// + public TimeSpan? MaxLag { get; set; } + + /// + /// Read data only after this timestamp from this topic. + /// Read only messages with 'written_at' value greater or equal than this timestamp. + /// + public DateTime? ReadFrom { get; set; } + + public override string ToString() + { + var toString = new StringBuilder().Append("{TopicPath: ").Append(TopicPath); + + if (MaxLag != null) + { + toString.Append(", MaxLog: ").Append(MaxLag); + } + + if (ReadFrom != null) + { + toString.Append(", ReadFrom: ").Append(ReadFrom); + } + + toString.Append("PartitionIds: [").Append(string.Join(", ", PartitionIds)).Append("]}"); + + return toString.ToString(); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderConfig.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderConfig.cs index b9bb8851..de6287e3 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderConfig.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderConfig.cs @@ -1,54 +1,47 @@ +using System.Text; + namespace Ydb.Sdk.Services.Topic.Reader; -public class ReaderConfig +internal class ReaderConfig { - /// - /// Message that describes topic to read. - /// Topics that will be read by this reader. - /// - public List SubscribeSettings { get; } = new(); - - /// - /// Path of consumer that is used for reading by this session. - /// - public string? ConsumerName { get; set; } - - /// - /// Optional name. Will be shown in debug stat. - /// - public string? ReaderName { get; set; } - - /// - /// Direct reading from a partition node. - /// - public bool DirectRead { get; set; } -} + public ReaderConfig( + List subscribeSettings, + string? consumerName, + string? readerName, + long memoryUsageMaxBytes) + { + SubscribeSettings = subscribeSettings; + ConsumerName = consumerName; + ReaderName = readerName; + MemoryUsageMaxBytes = memoryUsageMaxBytes; + } -public class SubscribeSettings -{ - public string TopicPath { get; } + public List SubscribeSettings { get; } - /// Topic path - public SubscribeSettings(string topicPath) + public string? ConsumerName { get; } + + public string? ReaderName { get; } + + public long MemoryUsageMaxBytes { get; } + + public override string ToString() { - TopicPath = topicPath; + var toString = new StringBuilder().Append("SubscribeSettings: [") + .Append(string.Join(", ", SubscribeSettings)) + .Append(']') + .Append(", MemoryUsageMaxBytes: ") + .Append(MemoryUsageMaxBytes); + + if (ConsumerName != null) + { + toString.Append(", ConsumerName: ").Append(ConsumerName); + } + + if (ReaderName != null) + { + toString.Append(", ReaderName: ").Append(ReaderName); + } + + return toString.ToString(); } - - /// - /// Partitions that will be read by this session. - /// If list is empty - then session will read all partitions. - /// - public List PartitionIds { get; } = new(); - - /// - /// Skip all messages that has write timestamp smaller than now - max_lag. - /// Zero means infinite lag. - /// - public TimeSpan? MaxLag { get; set; } - - /// - /// Read data only after this timestamp from this topic. - /// Read only messages with 'written_at' value greater or equal than this timestamp. - /// - public DateTime? ReadFrom { get; set; } } diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 59428c2d..d1ce4448 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -12,8 +12,6 @@ internal abstract class TopicSession : IDisposable private int _isActive = 1; - public bool IsActive => Volatile.Read(ref _isActive) == 1; - protected TopicSession( IBidirectionalStream stream, ILogger logger, @@ -26,6 +24,8 @@ protected TopicSession( _initialize = initialize; } + public bool IsActive => Volatile.Read(ref _isActive) == 1; + protected async void ReconnectSession() { if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)