Skip to content

Commit

Permalink
feat: (WIP) LPBatching - refactor, tidy, and additional tests
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Oct 25, 2024
1 parent 6ca9e69 commit 5824190
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 192 deletions.
127 changes: 30 additions & 97 deletions influxdb3/batching/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,98 +24,71 @@ THE SOFTWARE.
package batching

import (
"fmt"
"log/slog"
"sync"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)

// Batch kind identifiers, numbered 0, 1 and 2 in declaration order.
// NOTE(review): none of these are referenced in the visible code; the names
// suggest Point-based versus line-protocol (LP) batching — confirm before use.
const (
	BatchUnknown = iota
	BatchPoints
	BatchLP
)

type Option func(*interface{})

// Option to adapt properties of a batcher
type PBOption func(*Batcher)
type Option func(*interface{})

// WithSize changes the batch-size emitted by the batcher
/*func WithSize(size int) PBOption {
return func(b *Batcher) {
b.size = size
}
}*/

// With the standard Batcher the implied unit is a Point
// With the LPBatcher the implied unit is a byte
func WithSize(size int) Option {
return func(b *interface{}) {
if bb, bok := (*b).(*Batcher); bok {
bb.size = size
} else if lb, lok := (*b).(*LPBatcher); lok {
lb.size = size
} else {
slog.Warn("Failed to match Batcher in WithCapacityOpt. Value not set.")
slog.Warn("Failed to match Batcher type in WithSize. Value not set.")
}

Check warning on line 48 in influxdb3/batching/batcher.go

View check run for this annotation

Codecov / codecov/patch

influxdb3/batching/batcher.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}
}

// WithCapacity changes the initial capacity of the points buffer
/*func WithCapacity(capacity int) PBOption {
return func(b *Batcher) {
b.capacity = capacity
}
}*/

// WithCapacity changes the initial capacity of the internal buffer
// With the standard Batcher implied unit is a Point
// With the LPBatcher the implied unit is a byte
func WithCapacity(capacity int) Option {
return func(b *interface{}) {
if bb, bok := (*b).(*Batcher); bok {
bb.capacity = capacity
} else if lb, lok := (*b).(*LPBatcher); lok {
lb.capacity = capacity
} else {
slog.Warn("Failed to match Batcher in WithCapacityOpt. Value not set.")
slog.Warn("Failed to match Batcher type in WithCapacity. Value not set.")
}

Check warning on line 63 in influxdb3/batching/batcher.go

View check run for this annotation

Codecov / codecov/patch

influxdb3/batching/batcher.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}
}

// WithReadyCallback sets the function called when a new batch is ready. The
// batcher will wait for the callback to finish, so please return as fast as
// possible and move long-running processing to a go-routine.
/*func WithReadyCallback(f func()) PBOption {
return func(b *Batcher) {
b.callbackReady = f
}
}*/

func WithReadyCallback(f func()) Option {
return func(b *interface{}) {
if bb, bok := (*b).(*Batcher); bok {
bb.callbackReady = f
} else if lb, lok := (*b).(*LPBatcher); lok {
lb.callbackReady = f
} else {
slog.Warn("Failed to match Batcher in WithReadyCallbackOpt. Callback not set.")
slog.Warn("Failed to match Batcher type in WithReadyCallback. Callback not set.")
}

Check warning on line 78 in influxdb3/batching/batcher.go

View check run for this annotation

Codecov / codecov/patch

influxdb3/batching/batcher.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}

}

// WithEmitPointsCallback sets the function called when a new batch is ready with the
// WithEmitCallback sets the function called when a new batch is ready with the
// batch of points. The batcher will wait for the callback to finish, so please
// return as fast as possible and move long-running processing to a go-routine.
/*func WithEmitPointsCallback(f func([]*influxdb3.Point)) PBOption {
return func(b *Batcher) {
b.callbackEmit = f
}
}*/

func WithEmitPointsCallback(f func([]*influxdb3.Point)) Option {
func WithEmitCallback(f func([]*influxdb3.Point)) Option {
return func(b *interface{}) {
if bb, bok := (*b).(*Batcher); bok {
bb.callbackEmit = f
} else {
slog.Warn("Failed to match Batcher in WithEmitPointsCallbackOpt. Callback not set.")
slog.Warn("Failed to match type Batcher in WithEmitPointsCallback. Callback not set.")
}

Check warning on line 92 in influxdb3/batching/batcher.go

View check run for this annotation

Codecov / codecov/patch

influxdb3/batching/batcher.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}
}
Expand Down Expand Up @@ -175,7 +148,6 @@ func (b *Batcher) Add(p ...*influxdb3.Point) {

// Add the point
b.points = append(b.points, p...)
//b.addToBuffer(interfaces...)

// Call callbacks if a new batch is ready
for b.isReady() {
Expand All @@ -186,6 +158,13 @@ func (b *Batcher) Add(p ...*influxdb3.Point) {
b.callbackEmit(b.emitPoints())
} else {
// no emitter callback
if b.CurrentLoadSize() >= (b.capacity - b.size) {
slog.Warn(
fmt.Sprintf("Batcher is ready, but no callbackEmit is available. "+
"Batcher load is %d points waiting to be emitted.",
b.CurrentLoadSize()),
)
}
break
}
}
Expand Down Expand Up @@ -221,61 +200,15 @@ func (b *Batcher) emitPoints() []*influxdb3.Point {
return points
}

/*
func (b *Batcher) addToBuffer(items ...*interface{}) {
b.Lock()
defer b.Unlock()
b.buffer = append(b.buffer, items...)
//Call callbacks if a new batch is ready
if b.isReady() {
if b.callbackReady != nil {
b.callbackReady()
}
if b.callbackEmit != nil {
// ??? and if its line protocol?
b.callbackEmit(b.emitPoints())
}
}
// Flush drains all points even if buffer currently larger than size.
// It does not call the callbackEmit method.
//
// The returned slice has its capacity clipped to its length, so appending to
// it cannot overwrite points the batcher later stores in the same backing
// array.
func (b *Batcher) Flush() []*influxdb3.Point {
	slog.Info(fmt.Sprintf("Flushing all points (%d) from buffer.", b.CurrentLoadSize()))
	// Full slice expression: len == cap, which detaches the caller's view from
	// the spare capacity that b.points keeps using.
	points := b.points[:len(b.points):len(b.points)]
	b.points = b.points[len(points):]
	return points
}
*/
/*
func (b *Batcher) AddPoints(p ...*influxdb3.Point) error {
//b.Lock()
//defer b.Unlock()
if b.idiom != BatchIdiomPoints {
if len(b.buffer) == 0 {
b.idiom = BatchIdiomPoints
} else {
return errors.New("this batcher does not support the Point idiom")
}
}
interfaces := make([]*interface{}, len(p))
// Add the point
for i, point := range p {
var iface interface{} = point
interfaces[i] = &iface
}
//b.points = append(b.points, p...)
b.addToBuffer(interfaces...)
return nil
} */

/*
func (b *Batcher) AddLP(lines ...string) error {
if b.idiom != BatchIdiomLP {
if len(b.buffer) == 0 {
b.idiom = BatchIdiomLP
} else {
return errors.New("this batcher does not support the Line Protocol (LP) idiom")
}
}
interfaces := make([]*interface{}, len(lines))
for n, line := range lines {
var iface interface{} = line
interfaces[n] = &iface
}
b.addToBuffer(interfaces...)
return nil
} */
// CurrentLoadSize returns the number of points currently held in the
// batcher's points buffer.
func (b *Batcher) CurrentLoadSize() int {
return len(b.points)
}
81 changes: 32 additions & 49 deletions influxdb3/batching/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ package batching

import (
"fmt"
"reflect"
"sync"
"testing"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/stretchr/testify/assert"
)

Expand All @@ -55,46 +53,20 @@ func TestCustomValues(t *testing.T) {
assert.Equal(t, capacity, cap(b.points))
}

// TestAddAndDefaultEmitPointDefault adds points to a Batcher created with only
// a size option, forces an Emit, and prints the first returned point as line
// protocol. It makes no assertions; the output is for manual inspection only.
func TestAddAndDefaultEmitPointDefault(t *testing.T) {
	batchSize := 10
	batcher := NewBatcher(WithSize(batchSize))

	// Only the first half of the slice is populated; the remaining entries
	// stay nil.
	points2emit := make([]*influxdb3.Point, batchSize)
	for i := 0; i < batchSize/2; i++ {
		points2emit[i] = influxdb3.NewPointWithMeasurement("pointtest").
			SetTag("foo", "bar").
			SetIntegerField("count", int64(i+1))
	}
	fmt.Printf("\nDEBUG points2emit[0]: %v\n", reflect.TypeOf(points2emit[0]))

	batcher.Add(points2emit...)
	// Force an emit regardless of whether the batcher reports ready.
	emitted := batcher.Emit()
	fmt.Printf("\nDEBUG Inspect result %+v", emitted)
	fmt.Printf("\nDEBUG Inspect result %+v", reflect.TypeOf(emitted[0]))

	encoded, err := emitted[0].MarshalBinary(lineprotocol.Millisecond)
	if err != nil {
		fmt.Printf("err: %s\n", err)
	}
	fmt.Printf("\nDEBUG Inspect lp %s", string(encoded))
}

func TestAddAndCallBackEmitPoint(t *testing.T) {
func TestAddAndCallBackEmit(t *testing.T) {
batchSize := 5
emitted := false
var emittedPoints []*influxdb3.Point

b := NewBatcher(
WithSize(batchSize),
WithEmitPointsCallback(func(points []*influxdb3.Point) {
fmt.Printf("callback called with %v\n", points)
WithEmitCallback(func(points []*influxdb3.Point) {
emitted = true
emittedPoints = points
}),
)

for range batchSize {
fmt.Printf("Adding point")
b.Add(&influxdb3.Point{})
}

Expand Down Expand Up @@ -139,7 +111,7 @@ func TestPartialEmit(t *testing.T) {

b := NewBatcher(
WithSize(batchSize),
WithEmitPointsCallback(func(points []*influxdb3.Point) {
WithEmitCallback(func(points []*influxdb3.Point) {
emitted = true
}),
)
Expand All @@ -159,7 +131,7 @@ func TestThreadSafety(t *testing.T) {
emits := 0
b := NewBatcher(
WithSize(batchSize),
WithEmitPointsCallback(func(points []*influxdb3.Point) {
WithEmitCallback(func(points []*influxdb3.Point) {
emits++
}),
)
Expand All @@ -184,37 +156,48 @@ func TestThreadSafety(t *testing.T) {
func TestAddLargerThanSize(t *testing.T) {
batchSize := 5
emitCt := 0
pointSet := make([]*influxdb3.Point, (batchSize*10)+3)
loadFactor := 10
remainder := 3
pointSet := make([]*influxdb3.Point, (batchSize*loadFactor)+remainder)
for ct := range pointSet {
pointSet[ct] = influxdb3.NewPoint("test",
map[string]string{"foo": "bar"},
map[string]interface{}{"count": ct + 1},
time.Now())
}

for _, v := range pointSet {
lp, _ := v.MarshalBinary(lineprotocol.Millisecond)
fmt.Printf("%s", string(lp))
}
subSet := pointSet[:5]
fmt.Printf("DEBUG subset: %v\n", subSet)
resultSet := make([]*influxdb3.Point, 0)
b := NewBatcher(WithSize(batchSize),
WithEmitPointsCallback(func(points []*influxdb3.Point) {
WithCapacity(batchSize*3),
WithEmitCallback(func(points []*influxdb3.Point) {
resultSet = append(resultSet, points...)
emitCt++
}))

b.Add(pointSet...)
fmt.Printf("\nDEBUG emitCt: %d\n", emitCt)
fmt.Printf("\nDEBUG len(resultSet): %d\n", len(resultSet))
fmt.Printf("\nDEBUG len(b.points): %d\n", len(b.points))
fmt.Printf("\nDEBUG dump resultSet:\n")
for _, rp := range resultSet {
lp, _ := rp.MarshalBinary(lineprotocol.Millisecond)
fmt.Printf("%s", string(lp))
expectedCt := len(pointSet) / batchSize
assert.Equal(t, expectedCt, emitCt)
assert.Equal(t, loadFactor*batchSize, len(resultSet))
fmt.Printf("DEBUG resultSet %d\n", len(resultSet))
assert.Equal(t, remainder, len(b.points))
assert.Equal(t, pointSet[:len(pointSet)-remainder], resultSet)
}

// TestFlush loads a batcher that has no emit callback with more points than
// its configured size and capacity, then checks that Flush returns every
// buffered point and leaves the batcher empty.
func TestFlush(t *testing.T) {
	batchSize := 5
	loadFactor := 3
	pointSet := make([]*influxdb3.Point, batchSize*loadFactor)
	for ct := range pointSet {
		pointSet[ct] = influxdb3.NewPoint("test",
			map[string]string{"foo": "bar"},
			map[string]interface{}{"count": ct + 1},
			time.Now())
	}

	b := NewBatcher(WithSize(batchSize), WithCapacity(batchSize*2))
	b.Add(pointSet...)
	assert.Equal(t, batchSize*loadFactor, b.CurrentLoadSize())
	flushed := b.Flush()
	assert.Equal(t, batchSize*loadFactor, len(flushed))
	assert.Equal(t, 0, b.CurrentLoadSize())
}
2 changes: 1 addition & 1 deletion influxdb3/batching/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Example_batcher() {
b = batching.NewBatcher(
batching.WithSize(5),
batching.WithReadyCallback(func() { fmt.Println("ready") }),
batching.WithEmitPointsCallback(func(points []*influxdb3.Point) {
batching.WithEmitCallback(func(points []*influxdb3.Point) {
err = client.WritePoints(context.Background(), points)
if err != nil {
log.Fatal(err)
Expand Down
Loading

0 comments on commit 5824190

Please sign in to comment.