Skip to content

Commit

Permalink
Merge branch 'master' into query-client-interface
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Feb 28, 2024
2 parents 37e9d8b + 028cc9f commit e01e6ba
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fixed sometime panic on topic writer closing
* Added experimental query parameters builder `ydb.ParamsBuilder()`
* Changed types of `table/table.{QueryParameters,ParameterOption}` to aliases on `internal/params.{Parameters,NamedValue}`
* Fixed bug with optional decimal serialization
Expand Down
4 changes: 4 additions & 0 deletions internal/topic/topicwriterinternal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

var (
errCloseClosedMessageQueue = xerrors.Wrap(errors.New("ydb: close closed message queue"))
errAckOnClosedMessageQueue = xerrors.Wrap(errors.New("ydb: ack on closed message queue"))
errGetMessageFromClosedQueue = xerrors.Wrap(errors.New("ydb: get message from closed message queue"))
errAddUnorderedMessages = xerrors.Wrap(errors.New("ydb: add unordered messages"))
errAckUnexpectedMessage = xerrors.Wrap(errors.New("ydb: ack unexpected message"))
Expand Down Expand Up @@ -151,6 +152,9 @@ func (q *messageQueue) AcksReceived(acks []rawtopicwriter.WriteAck) error {
q.OnAckReceived(ackReceivedCounter)
}
}()
if q.closed {
return xerrors.WithStackTrace(errAckOnClosedMessageQueue)
}

for i := range acks {
if err := q.ackReceivedNeedLock(acks[i].SeqNo); err != nil {
Expand Down
20 changes: 20 additions & 0 deletions internal/topic/topicwriterinternal/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,26 @@ func TestQueuePanicOnOverflow(t *testing.T) {
})
}

func TestRegressionIssue1038_ReceiveAckAfterCloseQueue(t *testing.T) {
counter := 0

q := newMessageQueue()
q.OnAckReceived = func(count int) {
counter -= count
}
require.NoError(t, q.AddMessages(newTestMessagesWithContent(1)))
counter++

require.NoError(t, q.Close(errors.New("test err")))
require.ErrorIs(t, q.AcksReceived([]rawtopicwriter.WriteAck{
{
SeqNo: 1,
MessageWriteStatus: rawtopicwriter.MessageWriteStatus{},
},
}), errAckOnClosedMessageQueue)
require.Zero(t, counter)
}

func TestQueue_Ack(t *testing.T) {
t.Run("First", func(t *testing.T) {
q := newMessageQueue()
Expand Down
2 changes: 1 addition & 1 deletion internal/topic/topicwriterinternal/writer_single_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (w *SingleStreamWriter) receiveMessagesLoop(ctx context.Context) {

switch m := mess.(type) {
case *rawtopicwriter.WriteResult:
if err = w.cfg.queue.AcksReceived(m.Acks); err != nil {
if err = w.cfg.queue.AcksReceived(m.Acks); err != nil && !errors.Is(err, errCloseClosedMessageQueue) {
reason := xerrors.WithStackTrace(err)
closeCtx, closeCtxCancel := xcontext.WithCancel(ctx)
closeCtxCancel()
Expand Down

0 comments on commit e01e6ba

Please sign in to comment.