diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index feecc6d74..02609f600 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -51,13 +51,13 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { var messages []messageWithDataContent for i := 0; i < smallCount; i++ { data := make([]byte, smallSize) - message := newMessageDataWithContent(Message{Data: bytes.NewReader(data)}, testCommonEncoders) + message := newMessageDataWithContent(PublicMessage{Data: bytes.NewReader(data)}, testCommonEncoders) messages = append(messages, message) } for i := 0; i < largeCount; i++ { data := make([]byte, largeSize) - message := newMessageDataWithContent(Message{Data: bytes.NewReader(data)}, testCommonEncoders) + message := newMessageDataWithContent(PublicMessage{Data: bytes.NewReader(data)}, testCommonEncoders) messages = append(messages, message) } @@ -160,7 +160,7 @@ func TestCompressMessages(t *testing.T) { require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecRaw, 1)) }) t.Run("RawError", func(t *testing.T) { - mess := newMessageDataWithContent(Message{}, testCommonEncoders) + mess := newMessageDataWithContent(PublicMessage{}, testCommonEncoders) _, err := mess.GetEncodedBytes(rawtopiccommon.CodecGzip) require.NoError(t, err) messages := []messageWithDataContent{mess} @@ -171,7 +171,7 @@ func TestCompressMessages(t *testing.T) { t.Run("GzipOneThread", func(t *testing.T) { var messages []messageWithDataContent for i := 0; i < messageCount; i++ { - mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders) + mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders) messages = append(messages, mess) } @@ -185,7 +185,7 @@ func TestCompressMessages(t *testing.T) { t.Run("GzipOk", func(t *testing.T) { var messages []messageWithDataContent for i := 0; i < messageCount; i++ { - mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders) + mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders) messages = append(messages, mess) } @@ -198,7 +198,7 @@ func TestCompressMessages(t *testing.T) { t.Run("GzipErr", func(t *testing.T) { var messages []messageWithDataContent for i := 0; i < messageCount; i++ { - mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders) + mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders) messages = append(messages, mess) } messages[0].dataWasRead = true diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index 2ef3e569e..1749e461c 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -14,7 +14,7 @@ import ( var errNoRawContent = xerrors.Wrap(errors.New("ydb: internal state error - no raw message content")) -type Message struct { +type PublicMessage struct { SeqNo int64 CreatedAt time.Time Data io.Reader @@ -53,7 +53,7 @@ func NewPartitioningWithPartitionID(id int64) PublicFuturePartitioning { } type messageWithDataContent struct { - Message + PublicMessage dataWasRead bool encoders *EncoderMap @@ -182,11 +182,11 @@ func (m *messageWithDataContent) getEncodedBytes(codec rawtopiccommon.Codec) ([] } func newMessageDataWithContent( - message Message, //nolint:gocritic + message PublicMessage, //nolint:gocritic encoders *EncoderMap, ) messageWithDataContent { return messageWithDataContent{ - Message: message, - encoders: encoders, + PublicMessage: message, + encoders: encoders, } } diff --git a/internal/topic/topicwriterinternal/writer.go b/internal/topic/topicwriterinternal/writer.go index 9b3758666..d3a74ffd0 100644 --- a/internal/topic/topicwriterinternal/writer.go +++ b/internal/topic/topicwriterinternal/writer.go @@ -40,7 +40,7 @@ func NewWriter(cred credentials.Credentials, options []PublicWriterOption) (*Wri }, nil } -func (w *Writer) Write(ctx context.Context, messages ...Message) error { +func (w *Writer) Write(ctx context.Context, messages ...PublicMessage) error { if err := ctx.Err(); err != nil { return err } diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 4e34eab96..a0518df11 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -207,7 +207,7 @@ func (w *WriterReconnector) start() { w.background.Start(name+", sendloop", w.connectionLoop) } -func (w *WriterReconnector) Write(ctx context.Context, messages []Message) error { +func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) error { if err := w.background.CloseReason(); err != nil { return xerrors.WithStackTrace(fmt.Errorf("ydb: writer is closed: %w", err)) } @@ -291,7 +291,7 @@ func (w *WriterReconnector) checkMessages(messages []messageWithDataContent) err return nil } -func (w *WriterReconnector) createMessagesWithContent(messages []Message) ([]messageWithDataContent, error) { +func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) ([]messageWithDataContent, error) { res := make([]messageWithDataContent, 0, len(messages)) for i := range messages { mess := newMessageDataWithContent(messages[i], w.encodersMap) diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 91d266136..0141cc67a 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -77,10 +77,10 @@ func TestWriterImpl_CheckMessages(t *testing.T) { maxSize := 5 w.cfg.MaxMessageSize = maxSize - err := w.Write(ctx, []Message{{Data: bytes.NewReader(make([]byte, maxSize))}}) + err := w.Write(ctx, []PublicMessage{{Data: bytes.NewReader(make([]byte, maxSize))}}) require.NoError(t, err) - err = w.Write(ctx, []Message{{Data: bytes.NewReader(make([]byte, maxSize+1))}}) + err = w.Write(ctx, []PublicMessage{{Data: bytes.NewReader(make([]byte, maxSize+1))}}) require.Error(t, err) }) } @@ -141,7 +141,11 @@ func TestWriterImpl_Write(t *testing.T) { writeCompleted := make(empty.Chan) go func() { - err := e.writer.Write(e.ctx, []Message{{SeqNo: seqNo, CreatedAt: messageTime, Data: bytes.NewReader(messageData)}}) + err := e.writer.Write(e.ctx, []PublicMessage{{ + SeqNo: seqNo, + CreatedAt: messageTime, + Data: bytes.NewReader(messageData), + }}) require.NoError(t, err) close(writeCompleted) }() @@ -187,7 +191,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { }) require.NoError(t, err) - require.NoError(t, e.writer.Write(e.ctx, []Message{{ + require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{ Data: bytes.NewReader(messContent), }})) @@ -215,7 +219,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { }) require.NoError(t, err) - require.NoError(t, e.writer.Write(e.ctx, []Message{{ + require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{ Data: bytes.NewReader(messContent), }})) @@ -242,7 +246,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { codecs := make(map[rawtopiccommon.Codec]empty.Struct) for i := 0; i < codecMeasureIntervalBatches; i++ { - require.NoError(t, e.writer.Write(e.ctx, []Message{{ + require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{ Data: bytes.NewReader(messContentShort), }})) // wait send @@ -251,7 +255,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { } for i := 0; i < codecMeasureIntervalBatches; i++ { - require.NoError(t, e.writer.Write(e.ctx, []Message{{ + require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{ Data: bytes.NewReader(messContentLong), }})) // wait send @@ -711,12 +715,12 @@ func TestCalculateAllowedCodecs(t *testing.T) { } func newTestMessageWithDataContent(num int) messageWithDataContent { - res := newMessageDataWithContent(Message{SeqNo: int64(num)}, testCommonEncoders) + res := newMessageDataWithContent(PublicMessage{SeqNo: int64(num)}, testCommonEncoders) return res } -func newTestMessages(numbers ...int) []Message { - messages := make([]Message, len(numbers)) +func newTestMessages(numbers ...int) []PublicMessage { + messages := make([]PublicMessage, len(numbers)) for i, num := range numbers { messages[i].SeqNo = int64(num) } diff --git a/internal/topic/topicwriterinternal/writer_stream_interface.go b/internal/topic/topicwriterinternal/writer_stream_interface.go index 5455641b4..7973f227f 100644 --- a/internal/topic/topicwriterinternal/writer_stream_interface.go +++ b/internal/topic/topicwriterinternal/writer_stream_interface.go @@ -6,7 +6,7 @@ import ( //go:generate mockgen -source writer_stream_interface.go -destination writer_stream_interface_mock_test.go -package topicwriterinternal -write_package_comment=false type StreamWriter interface { - Write(ctx context.Context, messages []Message) error + Write(ctx context.Context, messages []PublicMessage) error WaitInit(ctx context.Context) (info InitialInfo, err error) Close(ctx context.Context) error } diff --git a/internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go b/internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go index fb53d60d6..a663d3409 100644 --- a/internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go +++ b/internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go @@ -63,7 +63,7 @@ func (mr *MockStreamWriterMockRecorder) WaitInit(ctx interface{}) *gomock.Call { } // Write mocks base method. -func (m *MockStreamWriter) Write(ctx context.Context, messages []Message) error { +func (m *MockStreamWriter) Write(ctx context.Context, messages []PublicMessage) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Write", ctx, messages) ret0, _ := ret[0].(error) diff --git a/internal/topic/topicwriterinternal/writer_test.go b/internal/topic/topicwriterinternal/writer_test.go index 96d91cd46..2be44ed68 100644 --- a/internal/topic/topicwriterinternal/writer_test.go +++ b/internal/topic/topicwriterinternal/writer_test.go @@ -41,7 +41,7 @@ func TestWriterWrite(t *testing.T) { w := Writer{ streamWriter: strm, } - require.NoError(t, w.Write(ctx, Message{SeqNo: 1})) + require.NoError(t, w.Write(ctx, PublicMessage{SeqNo: 1})) }) }) } @@ -56,6 +56,6 @@ func TestWriterWriteMessage(t *testing.T) { w := Writer{ streamWriter: strm, } - require.NoError(t, w.Write(ctx, Message{SeqNo: 1}, Message{SeqNo: 3})) + require.NoError(t, w.Write(ctx, PublicMessage{SeqNo: 1}, PublicMessage{SeqNo: 3})) }) } diff --git a/topic/topicwriter/topicwriter.go b/topic/topicwriter/topicwriter.go index d06efe588..341e16da4 100644 --- a/topic/topicwriter/topicwriter.go +++ b/topic/topicwriter/topicwriter.go @@ -7,7 +7,7 @@ import ( ) type ( - Message = topicwriterinternal.Message + Message = topicwriterinternal.PublicMessage ) var ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull