Skip to content

Commit

Permalink
chore: refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Nov 1, 2024
1 parent d36d020 commit 2097a0a
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 102 deletions.
53 changes: 49 additions & 4 deletions influxdb3/batching/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,55 @@ const DefaultBatchSize = 1000
// DefaultCapacity is the default initial capacity of the point buffer
const DefaultCapacity = 2 * DefaultBatchSize

type Emittable interface {
Size(int) // setsize
Capacity(int) // set capacity
ReadyCallback(func()) // ready Callback
}

type PointEmittable interface {
Emittable
EmitCallback(func([]*influxdb3.Point)) // callback for emitting points
}

type Option func(PointEmittable)

// WithSize changes the batch-size emitted by the batcher
// With the standard Batcher the implied unit is a Point
// With the LPBatcher the implied unit is a byte
func WithSize(size int) Option {
return func(b PointEmittable) {
b.Size(size)
}
}

// WithCapacity changes the initial capacity of the internal buffer
// With the standard Batcher implied unit is a Point
// With the LPBatcher the implied unit is a byte
func WithCapacity(capacity int) Option {
return func(b PointEmittable) {
b.Capacity(capacity)
}
}

// WithReadyCallback sets the function called when a new batch is ready. The
// batcher will wait for the callback to finish, so please return as fast as
// possible and move long-running processing to a go-routine.
func WithReadyCallback(f func()) Option {
return func(b PointEmittable) {
b.ReadyCallback(f)
}
}

// WithEmitCallback sets the function called when a new batch is ready with the
// batch of points. The batcher will wait for the callback to finish, so please
// return as fast as possible and move long-running processing to a go-routine.
func WithEmitCallback(f func([]*influxdb3.Point)) Option {
return func(b PointEmittable) {
b.EmitCallback(f)
}
}

// Batcher collects points and emits them as batches
type Batcher struct {
size int
Expand Down Expand Up @@ -64,10 +113,6 @@ func (b *Batcher) EmitCallback(f func([]*influxdb3.Point)) {
b.callbackEmit = f
}

func (b *Batcher) EmitBytesCallback(f func([]byte)) {
slog.Warn("Basic Batcher does not support bytes emitting functionality")
}

// NewBatcher creates and initializes a new Batcher instance applying the
// specified options. By default, a batch-size is DefaultBatchSize and the
// initial capacity is DefaultCapacity.
Expand Down
59 changes: 0 additions & 59 deletions influxdb3/batching/batcher_iface.go

This file was deleted.

48 changes: 41 additions & 7 deletions influxdb3/batching/lp_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,51 @@ import (
"fmt"
"log/slog"
"sync"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)

const DefaultBufferSize = 100000
const DefaultBufferCapacity = DefaultBufferSize * 2

type ByteEmittable interface {
Emittable
EmitBytesCallback(func([]byte)) // callback for emitting bytes
}

type LPOption func(ByteEmittable)

// WithSize changes the batch-size emitted by the batcher
// With the standard Batcher the implied unit is a Point
// With the LPBatcher the implied unit is a byte
func WithBufferSize(size int) LPOption {
return func(b ByteEmittable) {
b.Size(size)
}
}

// WithCapacity changes the initial capacity of the internal buffer
// With the standard Batcher implied unit is a Point
// With the LPBatcher the implied unit is a byte
func WithBufferCapacity(capacity int) LPOption {
return func(b ByteEmittable) {
b.Capacity(capacity)
}
}

// WithReadyCallback sets the function called when a new batch is ready. The
// batcher will wait for the callback to finish, so please return as fast as
// possible and move long-running processing to a go-routine.
func WithByteEmitReadyCallback(f func()) LPOption {
return func(b ByteEmittable) {
b.ReadyCallback(f)
}
}

func WithEmitBytesCallback(f func([]byte)) LPOption {
return func(b ByteEmittable) {
b.EmitBytesCallback(f)
}
}

type LPBatcher struct {
size int
capacity int
Expand All @@ -35,15 +73,11 @@ func (lpb *LPBatcher) ReadyCallback(f func()) {
lpb.callbackReady = f
}

func (lpb *LPBatcher) EmitCallback(f func([]*influxdb3.Point)) {
slog.Warn("EmitCallback([]*influxbb3.Point) not supported in LPBatcher")
}

func (lpb *LPBatcher) EmitBytesCallback(f func([]byte)) {
lpb.callbackByteEmit = f
}

func NewLPBatcher(options ...Option) *LPBatcher {
func NewLPBatcher(options ...LPOption) *LPBatcher {
lpb := &LPBatcher{
size: DefaultBufferSize,
capacity: DefaultBufferCapacity,
Expand Down
42 changes: 21 additions & 21 deletions influxdb3/batching/lp_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func TestLPCustomValues(t *testing.T) {
capacity := size * 2

lpb := NewLPBatcher(
WithSize(size),
WithCapacity(capacity),
WithBufferSize(size),
WithBufferCapacity(capacity),
)

assert.Equal(t, size, lpb.size)
Expand All @@ -41,8 +41,8 @@ func TestLPBatcherCreate(t *testing.T) {
var emittedBytes []byte

l := NewLPBatcher(
WithSize(size),
WithCapacity(capacity),
WithBufferSize(size),
WithBufferCapacity(capacity),
WithEmitBytesCallback(func(ba []byte) {
emitted = true
emittedBytes = ba
Expand All @@ -60,7 +60,7 @@ func TestLPBatcherCreate(t *testing.T) {
func TestLPReady(t *testing.T) {
size := 10
capacity := size * 2
lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity))
lpb.Add("0123456789ABCDEF")

assert.True(t, lpb.Ready(), "LPBatcher should be ready when the batch size is reached")
Expand All @@ -71,9 +71,9 @@ func TestLPReadyCallback(t *testing.T) {
capacity := size * 2
readyCalled := false

lpb := NewLPBatcher(WithSize(size),
WithCapacity(capacity),
WithReadyCallback(func() {
lpb := NewLPBatcher(WithBufferSize(size),
WithBufferCapacity(capacity),
WithByteEmitReadyCallback(func() {
readyCalled = true
}))

Expand All @@ -86,7 +86,7 @@ func TestEmitEmptyBatcher(t *testing.T) {
size := 256
capacity := size * 2

lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity))

results := lpb.Emit()

Expand All @@ -97,7 +97,7 @@ func TestAddLineAppendsLF(t *testing.T) {
size := 256
capacity := size * 2

lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity))
lines := []string{
"cpu,location=roswell,id=R2D2 fVal=3.14,iVal=42i",
"cpu,location=dyatlov,id=C3PO fVal=2.71,iVal=21i",
Expand All @@ -111,7 +111,7 @@ func TestAddLineAppendsLF(t *testing.T) {
func TestAddLineAppendsNoLFWhenPresent(t *testing.T) {
size := 256
capacity := size * 2
lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity))
lines := []string{
"cpu,location=roswell,id=R2D2 fVal=3.14,iVal=42i\n",
"cpu,location=dyatlov,id=C3PO fVal=2.71,iVal=21i\n",
Expand Down Expand Up @@ -140,8 +140,8 @@ func TestLPAddAndPartialEmit(t *testing.T) {
verify += "\n"

lpb := NewLPBatcher(
WithSize(size),
WithCapacity(capacity),
WithBufferSize(size),
WithBufferCapacity(capacity),
WithEmitBytesCallback(func(ba []byte) {
emitCount++
emittedBytes = append(emittedBytes, ba...)
Expand All @@ -168,9 +168,9 @@ func TestLPAddAndEmitCallBack(t *testing.T) {
lps2emit := make([]string, 100)

lpb := NewLPBatcher(
WithSize(batchSize),
WithCapacity(capacity),
WithReadyCallback(func() {
WithBufferSize(batchSize),
WithBufferCapacity(capacity),
WithByteEmitReadyCallback(func() {
readyCalled++
}),
WithEmitBytesCallback(func(b []byte) {
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestLPBufferFlush(t *testing.T) {
size := 10
capacity := size * 2

lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity))
testString := "0123456789ABCDEF\n"

assert.Equal(t, 0, lpb.CurrentLoadSize())
Expand All @@ -226,8 +226,8 @@ func TestLPThreadSafety(t *testing.T) {
emitCt := 0
testString := "123456789ABCDEF\n"

lpb := NewLPBatcher(WithSize(size),
WithCapacity(capacity),
lpb := NewLPBatcher(WithBufferSize(size),
WithBufferCapacity(capacity),
WithEmitBytesCallback(func(ba []byte) {
emitCt++
}))
Expand Down Expand Up @@ -264,8 +264,8 @@ func TestLPAddLargerThanSize(t *testing.T) {
emitCt := 0
resultBuffer := make([]byte, 0)
lpb := NewLPBatcher(
WithSize(batchSize),
WithCapacity(capacity),
WithBufferSize(batchSize),
WithBufferCapacity(capacity),
WithEmitBytesCallback(func(ba []byte) {
emitCt++
resultBuffer = append(resultBuffer, ba...)
Expand Down
18 changes: 7 additions & 11 deletions influxdb3/client_e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build e2e
// +build e2e

/*
The MIT License
Expand Down Expand Up @@ -369,10 +366,10 @@ func TestEscapedStringValues(t *testing.T) {
}
}

func TestBatchLP(t *testing.T) {
func TestLPBatcher(t *testing.T) {
SkipCheck(t)
// TODO add asserts

rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
url := os.Getenv("TESTING_INFLUXDB_URL")
token := os.Getenv("TESTING_INFLUXDB_TOKEN")
database := os.Getenv("TESTING_INFLUXDB_DATABASE")
Expand All @@ -390,11 +387,10 @@ func TestBatchLP(t *testing.T) {
ids := []string{"R2D2", "C3PO", "ROBBIE"}
lines := make([]string, 0)
now := time.Now().UnixMilli()
//rand.Seed(now.UnixNano())
for n := range 2000 {
lines = append(lines, fmt.Sprintf(dataTemplate, locations[n%len(locations)],
ids[n%len(ids)],
(rand.Float64()*100)-50.0, n+1, now-int64(n*1000)))
(rnd.Float64()*100)-50.0, n+1, now-int64(n*1000)))
if n%2 == 0 {
lines[n] = lines[n] + "\n" // verify appending LF
}
Expand All @@ -408,9 +404,9 @@ func TestBatchLP(t *testing.T) {
emitCt := 0
results := make([]byte, 0)
lpb := batching.NewLPBatcher(
batching.WithSize(size),
batching.WithCapacity(capacity),
batching.WithReadyCallback(func() {
batching.WithBufferSize(size),
batching.WithBufferCapacity(capacity),
batching.WithByteEmitReadyCallback(func() {
readyCt++
}),
batching.WithEmitBytesCallback(func(ba []byte) {
Expand All @@ -437,7 +433,6 @@ func TestBatchLP(t *testing.T) {

fmt.Printf("DEBUG readyCt: %d\n", readyCt)
fmt.Printf("DEBUG emitCt: %d\n", emitCt)
fmt.Printf("DEBUG results: %+v\n", string(results))
fmt.Printf("DEBUG lpb.buffer: %+v\n", lpb.CurrentLoadSize())
fmt.Printf("DEBUG getting rest\n")

Expand All @@ -447,6 +442,7 @@ func TestBatchLP(t *testing.T) {
fmt.Printf("ERROR %v\n", err)
}
results = append(results, leftover...)
fmt.Printf("DEBUG results: %+v\n", string(results))

fmt.Printf("DEBUG last emit to string: %s\n", string(lpb.Emit()))
fmt.Printf("DEBUG loadsize %d\n", lpb.CurrentLoadSize())
Expand Down

0 comments on commit 2097a0a

Please sign in to comment.