diff --git a/CHANGELOG.md b/CHANGELOG.md index 584ea9e99..0cb5fdbbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Fixed sometime panic on topic writer closing * Added experimental query parameters builder `ydb.ParamsBuilder()` * Changed types of `table/table.{QueryParameters,ParameterOption}` to aliases on `internal/params.{Parameters,NamedValue}` * Fixed bug with optional decimal serialization diff --git a/internal/topic/topicwriterinternal/queue.go b/internal/topic/topicwriterinternal/queue.go index 8e15833fc..25ff072d3 100644 --- a/internal/topic/topicwriterinternal/queue.go +++ b/internal/topic/topicwriterinternal/queue.go @@ -14,6 +14,7 @@ import ( var ( errCloseClosedMessageQueue = xerrors.Wrap(errors.New("ydb: close closed message queue")) + errAckOnClosedMessageQueue = xerrors.Wrap(errors.New("ydb: ack on closed message queue")) errGetMessageFromClosedQueue = xerrors.Wrap(errors.New("ydb: get message from closed message queue")) errAddUnorderedMessages = xerrors.Wrap(errors.New("ydb: add unordered messages")) errAckUnexpectedMessage = xerrors.Wrap(errors.New("ydb: ack unexpected message")) @@ -151,6 +152,9 @@ func (q *messageQueue) AcksReceived(acks []rawtopicwriter.WriteAck) error { q.OnAckReceived(ackReceivedCounter) } }() + if q.closed { + return xerrors.WithStackTrace(errAckOnClosedMessageQueue) + } for i := range acks { if err := q.ackReceivedNeedLock(acks[i].SeqNo); err != nil { diff --git a/internal/topic/topicwriterinternal/queue_test.go b/internal/topic/topicwriterinternal/queue_test.go index 8cc6ade34..d62e6b16e 100644 --- a/internal/topic/topicwriterinternal/queue_test.go +++ b/internal/topic/topicwriterinternal/queue_test.go @@ -407,6 +407,26 @@ func TestQueuePanicOnOverflow(t *testing.T) { }) } +func TestRegressionIssue1038_ReceiveAckAfterCloseQueue(t *testing.T) { + counter := 0 + + q := newMessageQueue() + q.OnAckReceived = func(count int) { + counter -= count + } + require.NoError(t, q.AddMessages(newTestMessagesWithContent(1))) + counter++ + + require.NoError(t, q.Close(errors.New("test err"))) + require.ErrorIs(t, q.AcksReceived([]rawtopicwriter.WriteAck{ + { + SeqNo: 1, + MessageWriteStatus: rawtopicwriter.MessageWriteStatus{}, + }, + }), errAckOnClosedMessageQueue) + require.Zero(t, counter) +} + func TestQueue_Ack(t *testing.T) { t.Run("First", func(t *testing.T) { q := newMessageQueue() diff --git a/internal/topic/topicwriterinternal/writer_single_stream.go b/internal/topic/topicwriterinternal/writer_single_stream.go index 444ae3d3d..a3f4cbcf3 100644 --- a/internal/topic/topicwriterinternal/writer_single_stream.go +++ b/internal/topic/topicwriterinternal/writer_single_stream.go @@ -189,7 +189,7 @@ func (w *SingleStreamWriter) receiveMessagesLoop(ctx context.Context) { switch m := mess.(type) { case *rawtopicwriter.WriteResult: - if err = w.cfg.queue.AcksReceived(m.Acks); err != nil { + if err = w.cfg.queue.AcksReceived(m.Acks); err != nil && !errors.Is(err, errCloseClosedMessageQueue) { reason := xerrors.WithStackTrace(err) closeCtx, closeCtxCancel := xcontext.WithCancel(ctx) closeCtxCancel()