Skip to content

Commit

Permalink
dev: handle rpc exceptions in reader
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov committed Jan 16, 2025
1 parent 20d3c38 commit 458edff
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 44 deletions.
4 changes: 1 addition & 3 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ public class BatchMessage<TValue>

public ImmutableArray<Message<TValue>> Batch { get; }

internal BatchMessage(
ImmutableArray<Message<TValue>> batch,
ReaderSession readerSession)
internal BatchMessage(ImmutableArray<Message<TValue>> batch, ReaderSession readerSession)
{
Batch = batch;
_readerSession = readerSession;
Expand Down
89 changes: 51 additions & 38 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Net.Sockets;
using System.Threading.Channels;
using Google.Protobuf;
using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Ydb.Topic;
Expand Down Expand Up @@ -310,46 +307,62 @@ await Stream.Write(new MessageFromClient
}
catch (Driver.TransportException e)
{
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Commit", SessionId);

ReconnectSession();
}
});

while (await Stream.MoveNextAsync())
try
{
switch (Stream.Current.ServerMessageCase)
while (await Stream.MoveNextAsync())
{
case ServerMessageOneofCase.ReadResponse:
await HandleReadResponse();
break;
case ServerMessageOneofCase.StartPartitionSessionRequest:
var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest;
var partitionSession = startPartitionSessionRequest.PartitionSession;

_partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession(
partitionSession.PartitionSessionId,
partitionSession.Path,
partitionSession.PartitionId,
startPartitionSessionRequest.CommittedOffset
);
break;
case ServerMessageOneofCase.CommitOffsetResponse:
// foreach (var offset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets)
// {
// offset.CommittedOffset;
// offset.PartitionSessionId;
// }

break;
case ServerMessageOneofCase.PartitionSessionStatusResponse:
case ServerMessageOneofCase.UpdateTokenResponse:
case ServerMessageOneofCase.StopPartitionSessionRequest:
case ServerMessageOneofCase.InitResponse:
case ServerMessageOneofCase.None:
break;
default:
throw new ArgumentOutOfRangeException();
switch (Stream.Current.ServerMessageCase)
{
case ServerMessageOneofCase.ReadResponse:
await HandleReadResponse();
break;
case ServerMessageOneofCase.StartPartitionSessionRequest:
var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest;
var partitionSession = startPartitionSessionRequest.PartitionSession;

_partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession(
partitionSession.PartitionSessionId,
partitionSession.Path,
partitionSession.PartitionId,
startPartitionSessionRequest.CommittedOffset
);

Logger.LogInformation("ReaderSession[{SessionId}] started PartitionSession[]", SessionId);
break;
case ServerMessageOneofCase.CommitOffsetResponse:
// foreach (var offset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets)
// {
// offset.CommittedOffset;
// offset.PartitionSessionId;
// }

break;
case ServerMessageOneofCase.PartitionSessionStatusResponse:
case ServerMessageOneofCase.UpdateTokenResponse:
case ServerMessageOneofCase.StopPartitionSessionRequest:
case ServerMessageOneofCase.InitResponse:
case ServerMessageOneofCase.None:
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}
catch (Driver.TransportException e)
{
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on processing server messages",
SessionId);
}
finally
{
ReconnectSession();
}
}

public async Task<TopicPartitionOffset> CommitOffsetRange(OffsetsRange offsetsRange, long partitionId)
Expand All @@ -367,7 +380,7 @@ private async Task HandleReadResponse()

Interlocked.Add(ref _memoryUsageMaxBytes, -readResponse.BytesSize);
var readResponsesInBatch = 0;

foreach (var partition in readResponse.PartitionData)
{
var partitionSessionId = partition.PartitionSessionId;
Expand All @@ -376,7 +389,7 @@ private async Task HandleReadResponse()
{
var startOffsetBatch = partitionSession.CommitedOffset;
var endOffsetBatch = partitionSession.CommitedOffset;

var batch = partition.Batches;
for (var i = 0; i < partition.Batches.Count; i++)
{
Expand All @@ -386,7 +399,7 @@ private async Task HandleReadResponse()
foreach (var messageData in batch[i].MessageData)
{
actuallySummaryBatchPayload += messageData.Data.Length;

internalBatchMessages.Enqueue(new InternalMessage(
data: messageData.Data,
topic: partitionSession.TopicPath,
Expand Down
4 changes: 1 addition & 3 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable

private int _isActive = 1;

public bool IsActive => Volatile.Read(ref _isActive) == 1;

protected TopicSession(
IBidirectionalStream<TFromClient, TFromServer> stream,
ILogger logger,
Expand All @@ -26,7 +24,7 @@ protected TopicSession(
_initialize = initialize;
}

internal bool IsActive => Volatile.Read(ref _isActive) == 1;
public bool IsActive => Volatile.Read(ref _isActive) == 1;

protected async void ReconnectSession()
{
Expand Down

0 comments on commit 458edff

Please sign in to comment.