Skip to content

Commit

Permalink
internal message to public message - because the message is part of p…
Browse files Browse the repository at this point in the history
…ublic interface
  • Loading branch information
rekby committed Dec 14, 2023
1 parent f1d4f44 commit bee38c7
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 29 deletions.
12 changes: 6 additions & 6 deletions internal/topic/topicwriterinternal/encoders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) {
var messages []messageWithDataContent
for i := 0; i < smallCount; i++ {
data := make([]byte, smallSize)
message := newMessageDataWithContent(Message{Data: bytes.NewReader(data)}, testCommonEncoders)
message := newMessageDataWithContent(PublicMessage{Data: bytes.NewReader(data)}, testCommonEncoders)
messages = append(messages, message)
}

for i := 0; i < largeCount; i++ {
data := make([]byte, largeSize)
message := newMessageDataWithContent(Message{Data: bytes.NewReader(data)}, testCommonEncoders)
message := newMessageDataWithContent(PublicMessage{Data: bytes.NewReader(data)}, testCommonEncoders)
messages = append(messages, message)
}

Expand Down Expand Up @@ -160,7 +160,7 @@ func TestCompressMessages(t *testing.T) {
require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecRaw, 1))
})
t.Run("RawError", func(t *testing.T) {
mess := newMessageDataWithContent(Message{}, testCommonEncoders)
mess := newMessageDataWithContent(PublicMessage{}, testCommonEncoders)
_, err := mess.GetEncodedBytes(rawtopiccommon.CodecGzip)
require.NoError(t, err)
messages := []messageWithDataContent{mess}
Expand All @@ -171,7 +171,7 @@ func TestCompressMessages(t *testing.T) {
t.Run("GzipOneThread", func(t *testing.T) {
var messages []messageWithDataContent
for i := 0; i < messageCount; i++ {
mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders)
mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders)
messages = append(messages, mess)
}

Expand All @@ -185,7 +185,7 @@ func TestCompressMessages(t *testing.T) {
t.Run("GzipOk", func(t *testing.T) {
var messages []messageWithDataContent
for i := 0; i < messageCount; i++ {
mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders)
mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders)
messages = append(messages, mess)
}

Expand All @@ -198,7 +198,7 @@ func TestCompressMessages(t *testing.T) {
t.Run("GzipErr", func(t *testing.T) {
var messages []messageWithDataContent
for i := 0; i < messageCount; i++ {
mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders)
mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders)
messages = append(messages, mess)
}
messages[0].dataWasRead = true
Expand Down
10 changes: 5 additions & 5 deletions internal/topic/topicwriterinternal/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var errNoRawContent = xerrors.Wrap(errors.New("ydb: internal state error - no raw message content"))

type Message struct {
type PublicMessage struct {
SeqNo int64
CreatedAt time.Time
Data io.Reader
Expand Down Expand Up @@ -53,7 +53,7 @@ func NewPartitioningWithPartitionID(id int64) PublicFuturePartitioning {
}

type messageWithDataContent struct {
Message
PublicMessage

dataWasRead bool
encoders *EncoderMap
Expand Down Expand Up @@ -182,11 +182,11 @@ func (m *messageWithDataContent) getEncodedBytes(codec rawtopiccommon.Codec) ([]
}

func newMessageDataWithContent(
message Message, //nolint:gocritic
message PublicMessage, //nolint:gocritic
encoders *EncoderMap,
) messageWithDataContent {
return messageWithDataContent{
Message: message,
encoders: encoders,
PublicMessage: message,
encoders: encoders,
}
}
2 changes: 1 addition & 1 deletion internal/topic/topicwriterinternal/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewWriter(cred credentials.Credentials, options []PublicWriterOption) (*Wri
}, nil
}

func (w *Writer) Write(ctx context.Context, messages ...Message) error {
func (w *Writer) Write(ctx context.Context, messages ...PublicMessage) error {
if err := ctx.Err(); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (w *WriterReconnector) start() {
w.background.Start(name+", sendloop", w.connectionLoop)
}

func (w *WriterReconnector) Write(ctx context.Context, messages []Message) error {
func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) error {
if err := w.background.CloseReason(); err != nil {
return xerrors.WithStackTrace(fmt.Errorf("ydb: writer is closed: %w", err))
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func (w *WriterReconnector) checkMessages(messages []messageWithDataContent) err
return nil
}

func (w *WriterReconnector) createMessagesWithContent(messages []Message) ([]messageWithDataContent, error) {
func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) ([]messageWithDataContent, error) {
res := make([]messageWithDataContent, 0, len(messages))
for i := range messages {
mess := newMessageDataWithContent(messages[i], w.encodersMap)
Expand Down
24 changes: 14 additions & 10 deletions internal/topic/topicwriterinternal/writer_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ func TestWriterImpl_CheckMessages(t *testing.T) {
maxSize := 5
w.cfg.MaxMessageSize = maxSize

err := w.Write(ctx, []Message{{Data: bytes.NewReader(make([]byte, maxSize))}})
err := w.Write(ctx, []PublicMessage{{Data: bytes.NewReader(make([]byte, maxSize))}})
require.NoError(t, err)

err = w.Write(ctx, []Message{{Data: bytes.NewReader(make([]byte, maxSize+1))}})
err = w.Write(ctx, []PublicMessage{{Data: bytes.NewReader(make([]byte, maxSize+1))}})
require.Error(t, err)
})
}
Expand Down Expand Up @@ -141,7 +141,11 @@ func TestWriterImpl_Write(t *testing.T) {

writeCompleted := make(empty.Chan)
go func() {
err := e.writer.Write(e.ctx, []Message{{SeqNo: seqNo, CreatedAt: messageTime, Data: bytes.NewReader(messageData)}})
err := e.writer.Write(e.ctx, []PublicMessage{{
SeqNo: seqNo,
CreatedAt: messageTime,
Data: bytes.NewReader(messageData),
}})
require.NoError(t, err)
close(writeCompleted)
}()
Expand Down Expand Up @@ -187,7 +191,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
})

require.NoError(t, err)
require.NoError(t, e.writer.Write(e.ctx, []Message{{
require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{
Data: bytes.NewReader(messContent),
}}))

Expand Down Expand Up @@ -215,7 +219,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
})

require.NoError(t, err)
require.NoError(t, e.writer.Write(e.ctx, []Message{{
require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{
Data: bytes.NewReader(messContent),
}}))

Expand All @@ -242,7 +246,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
codecs := make(map[rawtopiccommon.Codec]empty.Struct)

for i := 0; i < codecMeasureIntervalBatches; i++ {
require.NoError(t, e.writer.Write(e.ctx, []Message{{
require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{
Data: bytes.NewReader(messContentShort),
}}))
// wait send
Expand All @@ -251,7 +255,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
}

for i := 0; i < codecMeasureIntervalBatches; i++ {
require.NoError(t, e.writer.Write(e.ctx, []Message{{
require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{
Data: bytes.NewReader(messContentLong),
}}))
// wait send
Expand Down Expand Up @@ -711,12 +715,12 @@ func TestCalculateAllowedCodecs(t *testing.T) {
}

func newTestMessageWithDataContent(num int) messageWithDataContent {
res := newMessageDataWithContent(Message{SeqNo: int64(num)}, testCommonEncoders)
res := newMessageDataWithContent(PublicMessage{SeqNo: int64(num)}, testCommonEncoders)
return res
}

func newTestMessages(numbers ...int) []Message {
messages := make([]Message, len(numbers))
func newTestMessages(numbers ...int) []PublicMessage {
messages := make([]PublicMessage, len(numbers))
for i, num := range numbers {
messages[i].SeqNo = int64(num)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

//go:generate mockgen -source writer_stream_interface.go -destination writer_stream_interface_mock_test.go -package topicwriterinternal -write_package_comment=false
type StreamWriter interface {
Write(ctx context.Context, messages []Message) error
Write(ctx context.Context, messages []PublicMessage) error
WaitInit(ctx context.Context) (info InitialInfo, err error)
Close(ctx context.Context) error
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/topic/topicwriterinternal/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestWriterWrite(t *testing.T) {
w := Writer{
streamWriter: strm,
}
require.NoError(t, w.Write(ctx, Message{SeqNo: 1}))
require.NoError(t, w.Write(ctx, PublicMessage{SeqNo: 1}))
})
})
}
Expand All @@ -56,6 +56,6 @@ func TestWriterWriteMessage(t *testing.T) {
w := Writer{
streamWriter: strm,
}
require.NoError(t, w.Write(ctx, Message{SeqNo: 1}, Message{SeqNo: 3}))
require.NoError(t, w.Write(ctx, PublicMessage{SeqNo: 1}, PublicMessage{SeqNo: 3}))
})
}
2 changes: 1 addition & 1 deletion topic/topicwriter/topicwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type (
Message = topicwriterinternal.Message
Message = topicwriterinternal.PublicMessage
)

var ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull
Expand Down

0 comments on commit bee38c7

Please sign in to comment.