From b34c200afd9ad570775994504240c7465bd6c326 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 14 Jan 2025 17:15:39 +0100 Subject: [PATCH 1/2] fix: ear 5762, LPBatcher line longer than size. --- influxdb3/batching/lp_batcher.go | 24 +++++++++++++++++++++++- influxdb3/batching/lp_batcher_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/influxdb3/batching/lp_batcher.go b/influxdb3/batching/lp_batcher.go index 9170e1e..912c649 100644 --- a/influxdb3/batching/lp_batcher.go +++ b/influxdb3/batching/lp_batcher.go @@ -162,16 +162,38 @@ func (lpb *LPBatcher) Emit() []byte { } func (lpb *LPBatcher) emitBytes() []byte { + firstLF := bytes.IndexByte(lpb.buffer, '\n') + + var packet []byte + c := min(lpb.size, len(lpb.buffer)) if c == 0 { // i.e. buffer is empty return lpb.buffer } + // with no '\n' record separator + // just emit whole buffer + if firstLF == -1 { + packet = lpb.buffer + lpb.buffer = lpb.buffer[:0] + return packet + } + + // With first line larger than defined size + // just emit first line + if firstLF > lpb.size { + packet = lpb.buffer[:firstLF] + lpb.buffer = lpb.buffer[len(packet)+1:] // remove trailing '\n' + return packet + } + + // otherwise: process buffer where len(buffer) > size with multiple lines + prepacket := lpb.buffer[:c] lastLF := bytes.LastIndexByte(prepacket, '\n') + 1 - packet := lpb.buffer[:lastLF] + packet = lpb.buffer[:lastLF] lpb.buffer = lpb.buffer[len(packet):] return packet diff --git a/influxdb3/batching/lp_batcher_test.go b/influxdb3/batching/lp_batcher_test.go index 03dddc7..49ac739 100644 --- a/influxdb3/batching/lp_batcher_test.go +++ b/influxdb3/batching/lp_batcher_test.go @@ -283,3 +283,28 @@ func TestLPAddLargerThanSize(t *testing.T) { assert.Equal(t, len(remainBuffer), lpb.CurrentLoadSize()) assert.Equal(t, remainBuffer, lpb.buffer) } + +// see EAR 5762 +func TestLPAddLinesLargerThanSize(t *testing.T) { + batchSize := 16 + loadFactor := 10 + capacity := batchSize * loadFactor + + linesWithCRLF := []string{ + "0123456789ABCDEFZZ", // len 18 + "ZZFEDCBA9876543210", // len 18 + } + + emitCt := 0 + resultBuffer := make([]byte, 0) + lpb := NewLPBatcher( + WithBufferSize(batchSize), + WithBufferCapacity(capacity), + WithEmitBytesCallback(func(ba []byte) { + emitCt++ + resultBuffer = append(resultBuffer, ba...) + })) + lpb.Add(linesWithCRLF...) + assert.Equal(t, 2, emitCt, "Emit should be called correct number of times") + assert.Equal(t, strings.Join(linesWithCRLF, ""), string(resultBuffer)) +} From b826b60ebaac77a9a27a08c41dc201ca5850cd62 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 14 Jan 2025 17:29:51 +0100 Subject: [PATCH 2/2] chore: LPBatcher - remove unused paranoid block --- influxdb3/batching/lp_batcher.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/influxdb3/batching/lp_batcher.go b/influxdb3/batching/lp_batcher.go index 912c649..ad9b8a3 100644 --- a/influxdb3/batching/lp_batcher.go +++ b/influxdb3/batching/lp_batcher.go @@ -172,14 +172,6 @@ func (lpb *LPBatcher) emitBytes() []byte { return lpb.buffer } - // with no '\n' record separator - // just emit whole buffer - if firstLF == -1 { - packet = lpb.buffer - lpb.buffer = lpb.buffer[:0] - return packet - } - // With first line larger than defined size // just emit first line if firstLF > lpb.size {