diff --git a/influxdb3/batching/batcher.go b/influxdb3/batching/batcher.go index aef4157..22ed7ed 100644 --- a/influxdb3/batching/batcher.go +++ b/influxdb3/batching/batcher.go @@ -24,30 +24,19 @@ THE SOFTWARE. package batching import ( + "fmt" "log/slog" "sync" "github.com/InfluxCommunity/influxdb3-go/influxdb3" ) -const ( - BatchUnknown = iota - BatchPoints = iota - BatchLP = iota -) - -type Option func(*interface{}) - // Option to adapt properties of a batcher -type PBOption func(*Batcher) +type Option func(*interface{}) // WithSize changes the batch-size emitted by the batcher -/*func WithSize(size int) PBOption { - return func(b *Batcher) { - b.size = size - } -}*/ - +// With the standard Batcher the implied unit is a Point +// With the LPBatcher the implied unit is a byte func WithSize(size int) Option { return func(b *interface{}) { if bb, bok := (*b).(*Batcher); bok { @@ -55,18 +44,14 @@ func WithSize(size int) Option { } else if lb, lok := (*b).(*LPBatcher); lok { lb.size = size } else { - slog.Warn("Failed to match Batcher in WithCapacityOpt. Value not set.") + slog.Warn("Failed to match Batcher type in WithSize. Value not set.") } } } -// WithCapacity changes the initial capacity of the points buffer -/*func WithCapacity(capacity int) PBOption { - return func(b *Batcher) { - b.capacity = capacity - } -}*/ - +// WithCapacity changes the initial capacity of the internal buffer +// With the standard Batcher implied unit is a Point +// With the LPBatcher the implied unit is a byte func WithCapacity(capacity int) Option { return func(b *interface{}) { if bb, bok := (*b).(*Batcher); bok { @@ -74,7 +59,7 @@ func WithCapacity(capacity int) Option { } else if lb, lok := (*b).(*LPBatcher); lok { lb.capacity = capacity } else { - slog.Warn("Failed to match Batcher in WithCapacityOpt. Value not set.") + slog.Warn("Failed to match Batcher type in WithCapacity. Value not set.") } } } @@ -82,12 +67,6 @@ func WithCapacity(capacity int) Option { // WithReadyCallback sets the function called when a new batch is ready. The // batcher will wait for the callback to finish, so please return as fast as // possible and move long-running processing to a go-routine. -/*func WithReadyCallback(f func()) PBOption { - return func(b *Batcher) { - b.callbackReady = f - } -}*/ - func WithReadyCallback(f func()) Option { return func(b *interface{}) { if bb, bok := (*b).(*Batcher); bok { @@ -95,27 +74,21 @@ func WithReadyCallback(f func()) Option { } else if lb, lok := (*b).(*LPBatcher); lok { lb.callbackReady = f } else { - slog.Warn("Failed to match Batcher in WithReadyCallbackOpt. Callback not set.") + slog.Warn("Failed to match Batcher type in WithReadyCallback. Callback not set.") } } } -// WithEmitPointsCallback sets the function called when a new batch is ready with the +// WithEmitCallback sets the function called when a new batch is ready with the // batch of points. The batcher will wait for the callback to finish, so please // return as fast as possible and move long-running processing to a go-routine. -/*func WithEmitPointsCallback(f func([]*influxdb3.Point)) PBOption { - return func(b *Batcher) { - b.callbackEmit = f - } -}*/ - -func WithEmitPointsCallback(f func([]*influxdb3.Point)) Option { +func WithEmitCallback(f func([]*influxdb3.Point)) Option { return func(b *interface{}) { if bb, bok := (*b).(*Batcher); bok { bb.callbackEmit = f } else { - slog.Warn("Failed to match Batcher in WithEmitPointsCallbackOpt. Callback not set.") + slog.Warn("Failed to match type Batcher in WithEmitPointsCallback. Callback not set.") } } } @@ -175,7 +148,6 @@ func (b *Batcher) Add(p ...*influxdb3.Point) { // Add the point b.points = append(b.points, p...) - //b.addToBuffer(interfaces...) // Call callbacks if a new batch is ready for b.isReady() { @@ -186,6 +158,13 @@ func (b *Batcher) Add(p ...*influxdb3.Point) { b.callbackEmit(b.emitPoints()) } else { // no emitter callback + if b.CurrentLoadSize() >= (b.capacity - b.size) { + slog.Warn( + fmt.Sprintf("Batcher is ready, but no callbackEmit is available. "+ + "Batcher load is %d points waiting to be emitted.", + b.CurrentLoadSize()), + ) + } break } } @@ -221,61 +200,15 @@ func (b *Batcher) emitPoints() []*influxdb3.Point { return points } -/* -func (b *Batcher) addToBuffer(items ...*interface{}) { - b.Lock() - defer b.Unlock() - - b.buffer = append(b.buffer, items...) - - //Call callbacks if a new batch is ready - if b.isReady() { - if b.callbackReady != nil { - b.callbackReady() - } - if b.callbackEmit != nil { - // ??? and if its line protocol? - b.callbackEmit(b.emitPoints()) - } - } +// Flush drains all points even if buffer currently larger than size. +// It does not call the callbackEmit method +func (b *Batcher) Flush() []*influxdb3.Point { + slog.Info(fmt.Sprintf("Flushing all points (%d) from buffer.", b.CurrentLoadSize())) + points := b.points + b.points = b.points[len(points):] + return points } -*/ -/* -func (b *Batcher) AddPoints(p ...*influxdb3.Point) error { - //b.Lock() - //defer b.Unlock() - if b.idiom != BatchIdiomPoints { - if len(b.buffer) == 0 { - b.idiom = BatchIdiomPoints - } else { - return errors.New("this batcher does not support the Point idiom") - } - } - interfaces := make([]*interface{}, len(p)) - // Add the point - for i, point := range p { - var iface interface{} = point - interfaces[i] = &iface - } - //b.points = append(b.points, p...) - b.addToBuffer(interfaces...) - return nil -} */ -/* -func (b *Batcher) AddLP(lines ...string) error { - if b.idiom != BatchIdiomLP { - if len(b.buffer) == 0 { - b.idiom = BatchIdiomLP - } else { - return errors.New("this batcher does not support the Line Protocol (LP) idiom") - } - } - interfaces := make([]*interface{}, len(lines)) - for n, line := range lines { - var iface interface{} = line - interfaces[n] = &iface - } - b.addToBuffer(interfaces...) - return nil -} */ +func (b *Batcher) CurrentLoadSize() int { + return len(b.points) +} diff --git a/influxdb3/batching/batcher_test.go b/influxdb3/batching/batcher_test.go index 2c6d1d9..459c0b5 100644 --- a/influxdb3/batching/batcher_test.go +++ b/influxdb3/batching/batcher_test.go @@ -24,13 +24,11 @@ package batching import ( "fmt" - "reflect" "sync" "testing" "time" "github.com/InfluxCommunity/influxdb3-go/influxdb3" - "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/stretchr/testify/assert" ) @@ -55,46 +53,20 @@ func TestCustomValues(t *testing.T) { assert.Equal(t, capacity, cap(b.points)) } -func TestAddAndDefaultEmitPointDefault(t *testing.T) { - batchSize := 10 - //emitted := false - points2emit := make([]*influxdb3.Point, batchSize) - - b := NewBatcher(WithSize(batchSize)) - for n := range batchSize / 2 { - points2emit[n] = influxdb3.NewPointWithMeasurement("pointtest"). - SetTag("foo", "bar"). - SetIntegerField("count", int64(n+1)) - } - fmt.Printf("\nDEBUG points2emit[0]: %v\n", reflect.TypeOf(points2emit[0])) - b.Add(points2emit...) - // force Emit - result := b.Emit() - fmt.Printf("\nDEBUG Inspect result %+v", result) - fmt.Printf("\nDEBUG Inspect result %+v", reflect.TypeOf(result[0])) - lp, err := result[0].MarshalBinary(lineprotocol.Millisecond) - if err != nil { - fmt.Printf("err: %s\n", err) - } - fmt.Printf("\nDEBUG Inspect lp %s", string(lp)) -} - -func TestAddAndCallBackEmitPoint(t *testing.T) { +func TestAddAndCallBackEmit(t *testing.T) { batchSize := 5 emitted := false var emittedPoints []*influxdb3.Point b := NewBatcher( WithSize(batchSize), - WithEmitPointsCallback(func(points []*influxdb3.Point) { - fmt.Printf("callback called with %v\n", points) + WithEmitCallback(func(points []*influxdb3.Point) { emitted = true emittedPoints = points }), ) for range batchSize { - fmt.Printf("Adding point") b.Add(&influxdb3.Point{}) } @@ -139,7 +111,7 @@ func TestPartialEmit(t *testing.T) { b := NewBatcher( WithSize(batchSize), - WithEmitPointsCallback(func(points []*influxdb3.Point) { + WithEmitCallback(func(points []*influxdb3.Point) { emitted = true }), ) @@ -159,7 +131,7 @@ func TestThreadSafety(t *testing.T) { emits := 0 b := NewBatcher( WithSize(batchSize), - WithEmitPointsCallback(func(points []*influxdb3.Point) { + WithEmitCallback(func(points []*influxdb3.Point) { emits++ }), ) @@ -184,7 +156,9 @@ func TestThreadSafety(t *testing.T) { func TestAddLargerThanSize(t *testing.T) { batchSize := 5 emitCt := 0 - pointSet := make([]*influxdb3.Point, (batchSize*10)+3) + loadFactor := 10 + remainder := 3 + pointSet := make([]*influxdb3.Point, (batchSize*loadFactor)+remainder) for ct := range pointSet { pointSet[ct] = influxdb3.NewPoint("test", map[string]string{"foo": "bar"}, @@ -192,29 +166,38 @@ func TestAddLargerThanSize(t *testing.T) { time.Now()) } - for _, v := range pointSet { - lp, _ := v.MarshalBinary(lineprotocol.Millisecond) - fmt.Printf("%s", string(lp)) - } - subSet := pointSet[:5] - fmt.Printf("DEBUG subset: %v\n", subSet) resultSet := make([]*influxdb3.Point, 0) b := NewBatcher(WithSize(batchSize), - WithEmitPointsCallback(func(points []*influxdb3.Point) { + WithCapacity(batchSize*3), + WithEmitCallback(func(points []*influxdb3.Point) { resultSet = append(resultSet, points...) emitCt++ })) b.Add(pointSet...) - fmt.Printf("\nDEBUG emitCt: %d\n", emitCt) - fmt.Printf("\nDEBUG len(resultSet): %d\n", len(resultSet)) - fmt.Printf("\nDEBUG len(b.points): %d\n", len(b.points)) - fmt.Printf("\nDEBUG dump resultSet:\n") - for _, rp := range resultSet { - lp, _ := rp.MarshalBinary(lineprotocol.Millisecond) - fmt.Printf("%s", string(lp)) + expectedCt := len(pointSet) / batchSize + assert.Equal(t, expectedCt, emitCt) + assert.Equal(t, loadFactor*batchSize, len(resultSet)) + fmt.Printf("DEBUG resultSet %d\n", len(resultSet)) + assert.Equal(t, remainder, len(b.points)) + assert.Equal(t, pointSet[:len(pointSet)-remainder], resultSet) +} + +func TestFlush(t *testing.T) { + batchSize := 5 + loadFactor := 3 + pointSet := make([]*influxdb3.Point, batchSize*loadFactor) + for ct := range pointSet { + pointSet[ct] = influxdb3.NewPoint("test", + map[string]string{"foo": "bar"}, + map[string]interface{}{"count": ct + 1}, + time.Now()) } - fmt.Println() - //assert.Equal(t, pointSet, resultSet) + b := NewBatcher(WithSize(batchSize), WithCapacity(batchSize*2)) + b.Add(pointSet...) + assert.Equal(t, batchSize*loadFactor, b.CurrentLoadSize()) + flushed := b.Flush() + assert.Equal(t, batchSize*loadFactor, len(flushed)) + assert.Equal(t, 0, b.CurrentLoadSize()) } diff --git a/influxdb3/batching/example_test.go b/influxdb3/batching/example_test.go index 7b0e668..bed2883 100644 --- a/influxdb3/batching/example_test.go +++ b/influxdb3/batching/example_test.go @@ -90,7 +90,7 @@ func Example_batcher() { b = batching.NewBatcher( batching.WithSize(5), batching.WithReadyCallback(func() { fmt.Println("ready") }), - batching.WithEmitPointsCallback(func(points []*influxdb3.Point) { + batching.WithEmitCallback(func(points []*influxdb3.Point) { err = client.WritePoints(context.Background(), points) if err != nil { log.Fatal(err) diff --git a/influxdb3/batching/lp_batcher.go b/influxdb3/batching/lp_batcher.go index 162deac..4a69280 100644 --- a/influxdb3/batching/lp_batcher.go +++ b/influxdb3/batching/lp_batcher.go @@ -2,36 +2,20 @@ package batching import ( "bytes" + "fmt" + "log/slog" "sync" ) const DefaultBufferSize = 100000 const DefaultBufferCapacity = DefaultBufferSize * 2 -//type LPOption func(*LPBatcher) - -/* -func WithBufferSize(size int) func(*interface{}) { - return func(b *interface{}) { - if lpb, ok := (*b).(*LPBatcher); ok { - lpb.size = size - } - } -} - -func WithBufferCapacity(capacity int) Option { - return func(b *interface{}) { - if lpb, ok := (*b).(*LPBatcher); ok { - lpb.capacity = capacity - } - } -} -*/ - func WithEmitBytesCallback(f func([]byte)) Option { return func(b *interface{}) { if lpb, ok := (*b).(*LPBatcher); ok { lpb.callbackEmit = f + } else { + slog.Warn("Failed to match type LPBatcher in WithEmitBytesCallback. Callback not set.") } } } @@ -72,18 +56,29 @@ func (l *LPBatcher) Add(lines ...string) { for _, line := range lines { if len(line) != 0 { // ignore empty lines l.buffer = append(l.buffer, line...) - if line[len(line)-1] != '\n' { + if line[len(line)-1] != '\n' { //ensure newline demarcation l.buffer = append(l.buffer, '\n') } } } - if l.isReady() { + for l.isReady() { if l.callbackReady != nil { l.callbackReady() } if l.callbackEmit != nil { l.callbackEmit(l.emitBytes()) + } else { + // no emitter callback + if l.CurrentLoadSize() > (l.capacity - l.size) { + slog.Warn( + fmt.Sprintf("Batcher is ready, but no callbackEmit is available. "+ + "Batcher load is %d bytes waiting to be emitted.", + l.CurrentLoadSize()), + ) + } + break + } } } @@ -98,17 +93,14 @@ func (l *LPBatcher) isReady() bool { return len(l.buffer) >= l.size } -func (b *LPBatcher) Emit() []byte { - b.Lock() - defer b.Unlock() - - packet := b.emitBytes() - - if b.callbackEmit != nil { - b.callbackEmit(packet) - } +// Emit returns a new batch of bytes with the provided batch size or with the +// remaining bytes. Please drain the bytes at the end of your processing to +// get the remaining bytes not filling up a batch. +func (l *LPBatcher) Emit() []byte { + l.Lock() + defer l.Unlock() - return packet + return l.emitBytes() } func (l *LPBatcher) emitBytes() []byte { @@ -116,8 +108,27 @@ func (l *LPBatcher) emitBytes() []byte { prepacket := l.buffer[:c] lastLF := bytes.LastIndexByte(prepacket, '\n') + + if len(prepacket) < 1 || lastLF < 0 { + return prepacket + } packet := l.buffer[:lastLF] l.buffer = l.buffer[len(packet):] + if len(l.buffer) == 1 && l.buffer[0] == '\n' { // removing lingering delimiter + l.buffer = l.buffer[1:] + } return packet } + +// Flush drains all bytes even if buffer currently larger than size +func (l *LPBatcher) Flush() []byte { + slog.Info(fmt.Sprintf("Flushing all bytes (%d) from buffer.", l.CurrentLoadSize())) + packet := l.buffer + l.buffer = l.buffer[len(packet):] + return packet +} + +func (l *LPBatcher) CurrentLoadSize() int { + return len(l.buffer) +} diff --git a/influxdb3/batching/lp_batcher_test.go b/influxdb3/batching/lp_batcher_test.go index b7725c4..a60740d 100644 --- a/influxdb3/batching/lp_batcher_test.go +++ b/influxdb3/batching/lp_batcher_test.go @@ -2,14 +2,38 @@ package batching import ( "fmt" - "math" "strings" + "sync" "testing" "github.com/stretchr/testify/assert" ) -func TestLineProtocolBatcherCreate(t *testing.T) { +func TestLPDefaultValues(t *testing.T) { + lpb := NewLPBatcher() + + assert.Equal(t, DefaultBufferSize, lpb.size) + assert.Equal(t, DefaultBufferCapacity, lpb.capacity) + assert.Nil(t, lpb.callbackReady) + assert.Nil(t, lpb.callbackEmit) +} + +func TestLPCustomValues(t *testing.T) { + size := 2048 + capacity := size * 2 + + lpb := NewLPBatcher( + WithSize(size), + WithCapacity(capacity), + ) + + assert.Equal(t, size, lpb.size) + assert.Equal(t, capacity, lpb.capacity) + assert.Nil(t, lpb.callbackReady) + assert.Nil(t, lpb.callbackEmit) +} + +func TestLPBatcherCreate(t *testing.T) { size := 1000 capacity := size * 2 @@ -35,7 +59,67 @@ func TestLineProtocolBatcherCreate(t *testing.T) { assert.Nil(t, l.callbackReady) } -func TestAddAndEmitLineProtocolDefault(t *testing.T) { +func TestLPReady(t *testing.T) { + size := 10 + capacity := size * 2 + lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity)) + lpb.Add("0123456789ABCDEF") + + assert.True(t, lpb.Ready(), "LPBatcher should be ready when the batch size is reached") +} + +func TestLPReadyCallback(t *testing.T) { + size := 10 + capacity := size * 2 + readyCalled := false + + lpb := NewLPBatcher(WithSize(size), + WithCapacity(capacity), + WithReadyCallback(func() { + readyCalled = true + })) + + lpb.Add("0123456789ABCDEF") + + assert.True(t, readyCalled) +} + +func TestLPAddAndPartialEmit(t *testing.T) { + size := 500 + capacity := size * 2 + emitCount := 0 + emittedBytes := make([]byte, 0) + + lineTemplate := "cpu,location=tabor fVal=2.71,count=%di" + lines := make([]string, 5) + lineByteCt := 0 + for n := range 5 { + lines[n] = fmt.Sprintf(lineTemplate, n+1) + lineByteCt += len([]byte(lines[n])) + 1 + } + + verif := strings.Join(lines, "\n") + + lpb := NewLPBatcher( + WithSize(size), + WithCapacity(capacity), + WithEmitBytesCallback(func(ba []byte) { + emitCount++ + emittedBytes = append(emittedBytes, ba...) + })) + lpb.Add(lines...) + + assert.Equal(t, lineByteCt, lpb.CurrentLoadSize()) + + packet := lpb.Emit() + + assert.Equal(t, verif, string(packet)) + assert.Equal(t, 0, lpb.CurrentLoadSize()) + assert.Equal(t, 0, emitCount) // callback should not have been called + assert.Equal(t, 0, len(emittedBytes)) // callback should not have been called +} + +func TestLPAddAndEmitCallBack(t *testing.T) { batchSize := 1000 // Bytes capacity := 10000 // Bytes emitCount := 0 @@ -44,7 +128,7 @@ func TestAddAndEmitLineProtocolDefault(t *testing.T) { lps2emit := make([]string, 100) - b := NewLPBatcher( + lpb := NewLPBatcher( WithSize(batchSize), WithCapacity(capacity), WithReadyCallback(func() { @@ -54,7 +138,6 @@ func TestAddAndEmitLineProtocolDefault(t *testing.T) { emitCount++ emittedBytes = append(emittedBytes, b...) })) - fmt.Printf("\nDEBUG b: %+v\n", b) for n := range lps2emit { lps2emit[n] = fmt.Sprintf("lptest,foo=bar count=%di", n+1) @@ -63,22 +146,106 @@ func TestAddAndEmitLineProtocolDefault(t *testing.T) { for i, _ := range lps2emit { if i > 0 && i%10 == 0 { set := lps2emit[i-10 : i] - b.Add(set...) + lpb.Add(set...) } } // add lingering set - b.Add(lps2emit[len(lps2emit)-10:]...) + lpb.Add(lps2emit[len(lps2emit)-10:]...) verify := strings.Join(lps2emit, "\n") - assert.False(t, b.Ready()) + assert.False(t, lpb.Ready()) - _ = b.Emit() // flush any leftovers - to be collected in callback above + emittedBytes = append(emittedBytes, lpb.Emit()...) // drain any leftovers - expectCall := math.Ceil(float64(len(emittedBytes)) / float64(batchSize)) - expectReady := len(emittedBytes) / batchSize + expectCall := len(emittedBytes) / batchSize assert.Equal(t, int(expectCall), emitCount) assert.Equal(t, verify, string(emittedBytes)) - fmt.Printf("DEBUG expectedReady: %d, readyCalled: %d\n", expectReady, readyCalled) - assert.Equal(t, expectReady, readyCalled) + assert.Equal(t, expectCall, readyCalled) +} + +func TestLPBufferFlush(t *testing.T) { + size := 10 + capacity := size * 2 + + lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity)) + testString := "0123456789ABCDEF\n" + + assert.Equal(t, 0, lpb.CurrentLoadSize()) + lpb.Add(testString) + assert.Equal(t, len(testString), lpb.CurrentLoadSize()) + packet := lpb.Flush() + assert.Equal(t, 0, lpb.CurrentLoadSize()) + assert.Equal(t, testString, string(packet)) +} + +func TestLPThreadSafety(t *testing.T) { + size := 80 + capacity := size * 2 + var wg sync.WaitGroup + emitCt := 0 + testString := "123456789ABCDEF\n" + + lpb := NewLPBatcher(WithSize(size), + WithCapacity(capacity), + WithEmitBytesCallback(func(ba []byte) { + emitCt++ + })) + + for range 25 { + wg.Add(1) + go func() { + defer wg.Done() + for range 4 { + lpb.Add(testString) + } + }() + } + + wg.Wait() + packet := lpb.Emit() + assert.Equal(t, 20, emitCt, "All bytes should have been emitted") + assert.Empty(t, packet, "Remaining bytes should be emitted correctly") +} + +func TestLPAddLargerThanSize(t *testing.T) { + // TODO review test -- appears Emit called too frequently + // Look for leading '\n' in lp.buffer + size := 64 + loadFactor := 10 + capacity := size * loadFactor + remainder := 3 + testString := "123456789ABCDEF\n" + stringSet := make([]string, ((size/len(testString))*loadFactor)+remainder) + stringSetByteCt := 0 + for ct := range stringSet { + stringSet[ct] = testString + stringSetByteCt += len([]byte(testString)) + } + + fmt.Printf("DEBUG len(stringSet)=%d\n", len(stringSet)) + fmt.Printf("DEBUG stringSetByteCount %d\n", stringSetByteCt) + fmt.Printf("DEBUG stringSet: %v\n", stringSet) + + emitCt := 0 + resultBuffer := make([]byte, 0) + lpb := NewLPBatcher( + WithSize(size), + WithCapacity(capacity), + WithEmitBytesCallback(func(ba []byte) { + emitCt++ + resultBuffer = append(resultBuffer, ba...) + })) + + lpb.Add(stringSet...) + + results := strings.Split(string(resultBuffer), "\n") + resultsBytes := len(resultBuffer) + fmt.Printf("DEBUG emitCt: %d\n", emitCt) + fmt.Printf("DEBUG resultsBytes: %d\n", resultsBytes) + fmt.Printf("DEBUG len(results): %d\n", len(results)) + fmt.Printf("DEBUG results: %s\n", results) + fmt.Printf("DEBUG lpb.CurrentLoadSize: %d\n", lpb.CurrentLoadSize()) + fmt.Printf("DEBUG lpb.buffer #%s#\n", string(lpb.buffer)) + }