diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs index 047691e..394eef7 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs @@ -59,9 +59,7 @@ public class BatchMessage public ImmutableArray> Batch { get; } - internal BatchMessage( - ImmutableArray> batch, - ReaderSession readerSession) + internal BatchMessage(ImmutableArray> batch, ReaderSession readerSession) { Batch = batch; _readerSession = readerSession; diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs index db1b36a..0980c97 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs @@ -1,9 +1,6 @@ using System.Collections.Concurrent; using System.Collections.Immutable; -using System.Net.Sockets; using System.Threading.Channels; -using Google.Protobuf; -using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Logging; using Ydb.Topic; @@ -310,46 +307,62 @@ await Stream.Write(new MessageFromClient } catch (Driver.TransportException e) { + Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Commit", SessionId); + ReconnectSession(); } }); - while (await Stream.MoveNextAsync()) + try { - switch (Stream.Current.ServerMessageCase) + while (await Stream.MoveNextAsync()) { - case ServerMessageOneofCase.ReadResponse: - await HandleReadResponse(); - break; - case ServerMessageOneofCase.StartPartitionSessionRequest: - var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest; - var partitionSession = startPartitionSessionRequest.PartitionSession; - - _partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession( - partitionSession.PartitionSessionId, - partitionSession.Path, - partitionSession.PartitionId, - startPartitionSessionRequest.CommittedOffset - ); - break; - case ServerMessageOneofCase.CommitOffsetResponse: - // foreach (var offset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets) - // { - // offset.CommittedOffset; - // offset.PartitionSessionId; - // } - - break; - case ServerMessageOneofCase.PartitionSessionStatusResponse: - case ServerMessageOneofCase.UpdateTokenResponse: - case ServerMessageOneofCase.StopPartitionSessionRequest: - case ServerMessageOneofCase.InitResponse: - case ServerMessageOneofCase.None: - break; - default: - throw new ArgumentOutOfRangeException(); + switch (Stream.Current.ServerMessageCase) + { + case ServerMessageOneofCase.ReadResponse: + await HandleReadResponse(); + break; + case ServerMessageOneofCase.StartPartitionSessionRequest: + var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest; + var partitionSession = startPartitionSessionRequest.PartitionSession; + + _partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession( + partitionSession.PartitionSessionId, + partitionSession.Path, + partitionSession.PartitionId, + startPartitionSessionRequest.CommittedOffset + ); + + Logger.LogInformation("ReaderSession[{SessionId}] started PartitionSession[]", SessionId); + break; + case ServerMessageOneofCase.CommitOffsetResponse: + // foreach (var offset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets) + // { + // offset.CommittedOffset; + // offset.PartitionSessionId; + // } + + break; + case ServerMessageOneofCase.PartitionSessionStatusResponse: + case ServerMessageOneofCase.UpdateTokenResponse: + case ServerMessageOneofCase.StopPartitionSessionRequest: + case ServerMessageOneofCase.InitResponse: + case ServerMessageOneofCase.None: + break; + default: + throw new ArgumentOutOfRangeException(); + } } } + catch (Driver.TransportException e) + { + Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on processing server messages", + SessionId); + } + finally + { + ReconnectSession(); + } } public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionId) @@ -367,7 +380,7 @@ private async Task HandleReadResponse() Interlocked.Add(ref _memoryUsageMaxBytes, -readResponse.BytesSize); var readResponsesInBatch = 0; - + foreach (var partition in readResponse.PartitionData) { var partitionSessionId = partition.PartitionSessionId; @@ -376,7 +389,7 @@ private async Task HandleReadResponse() { var startOffsetBatch = partitionSession.CommitedOffset; var endOffsetBatch = partitionSession.CommitedOffset; - + var batch = partition.Batches; for (var i = 0; i < partition.Batches.Count; i++) { @@ -386,7 +399,7 @@ private async Task HandleReadResponse() foreach (var messageData in batch[i].MessageData) { actuallySummaryBatchPayload += messageData.Data.Length; - + internalBatchMessages.Enqueue(new InternalMessage( data: messageData.Data, topic: partitionSession.TopicPath, diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index b91be0d..d1ce444 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -12,8 +12,6 @@ internal abstract class TopicSession : IDisposable private int _isActive = 1; - public bool IsActive => Volatile.Read(ref _isActive) == 1; - protected TopicSession( IBidirectionalStream stream, ILogger logger, @@ -26,7 +24,7 @@ protected TopicSession( _initialize = initialize; } - internal bool IsActive => Volatile.Read(ref _isActive) == 1; + public bool IsActive => Volatile.Read(ref _isActive) == 1; protected async void ReconnectSession() {