diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f42da4..561d923 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 0.14.0 [unreleased] +### Features + +1. [#112](https://github.com/InfluxCommunity/influxdb3-go/pull/112): Adds `LPBatcher` for lineprotocol batching following the model of the Point `Batcher`. + ### Bug Fixes 1. [#113](https://github.com/InfluxCommunity/influxdb3-go/pull/113): Honor struct tags on WriteData, avoid panic for unexported fields diff --git a/examples/LPBatching/lpBatching.go b/examples/LPBatching/lpBatching.go new file mode 100644 index 0000000..cd4a44d --- /dev/null +++ b/examples/LPBatching/lpBatching.go @@ -0,0 +1,143 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "math/rand" + "os" + "text/tabwriter" + "time" + + "github.com/InfluxCommunity/influxdb3-go/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/influxdb3/batching" + "github.com/apache/arrow/go/v15/arrow" +) + +const LineCount = 100 + +func main() { + // PREPARE DATA + // Create a random number generator + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + // Retrieve credentials from environment variables. + url := os.Getenv("INFLUX_URL") + token := os.Getenv("INFLUX_TOKEN") + database := os.Getenv("INFLUX_DATABASE") + + // unmanned aquatic surface vehicle as source + dataTemplate := "uasv,id=%s,location=%s speed=%f,bearing=%f,ticks=%di %d" + ids := []string{"orca", "flipper", "gaston"} + syncLocations := []string{"nice", "split", "goa", "cancun"} + + // Instantiate a client using your credentials. 
+	client, err := influxdb3.New(influxdb3.ClientConfig{
+		Host:     url,
+		Token:    token,
+		Database: database,
+	})
+
+	if err != nil {
+		panic(err)
+	}
+
+	defer func(client *influxdb3.Client) {
+		err := client.Close()
+		if err != nil {
+			slog.Error("Error closing client", "error", err)
+		}
+	}(client)
+
+	// SYNC WRITE BATCHES
+	// create a new Line Protocol Batcher
+	syncLpb := batching.NewLPBatcher(batching.WithBufferSize(4096)) // Set buffer size
+	t := time.Now().Add(-LineCount * time.Second)
+
+	// add new data into the batcher
+	for range LineCount {
+		syncLpb.Add(fmt.Sprintf(dataTemplate,
+			ids[rnd.Intn(len(ids))],
+			syncLocations[rnd.Intn(len(syncLocations))],
+			rnd.Float64()*100,
+			rnd.Float64()*360,
+			rnd.Intn(100),
+			t.UnixNano(),
+		))
+		t = t.Add(time.Second)
+
+		// if ready state reached, emit a batch
+		if syncLpb.Ready() {
+			err = client.Write(context.Background(), syncLpb.Emit())
+			if err != nil {
+				slog.Error(err.Error())
+			}
+		}
+	}
+
+	// Write final batch to client
+	err = client.Write(context.Background(), syncLpb.Emit())
+	if err != nil {
+		slog.Error(err.Error())
+	}
+	fmt.Printf("Sync Writes Done. 
%d Bytes remaining in batcher buffer\n", + syncLpb.CurrentLoadSize()) + + // ASYNC WRITE BATCHES + asyncLpb := batching.NewLPBatcher(batching.WithBufferSize(4096), // Set buffer size + batching.WithByteEmitReadyCallback(func() { fmt.Println("|-- ready to emit -->") }), // Set ready callback + batching.WithEmitBytesCallback(func(bytes []byte) { // Set Callback to handle emitted bytes + err = client.Write(context.Background(), bytes) + if err != nil { + slog.Error(err.Error()) + } + })) + + asyncLocations := []string{"ibiza", "dubai", "phuket", "maui"} + t = time.Now().Add(-LineCount * time.Second) + + // Add new data to Batcher + for range LineCount { + asyncLpb.Add(fmt.Sprintf(dataTemplate, ids[rnd.Intn(len(ids))], + asyncLocations[rnd.Intn(len(asyncLocations))], + rnd.Float64()*100, + rnd.Float64()*360, + rnd.Intn(100), + t.UnixNano())) + t = t.Add(time.Second) + } + + // Write the remaining batch records to the client + err = client.Write(context.Background(), asyncLpb.Emit()) + if err != nil { + slog.Error(err.Error()) + } + fmt.Printf("Async Writes Done. 
%d Bytes remaining in batcher buffer\n",
+		asyncLpb.CurrentLoadSize())
+
+	// Prepare an SQL query
+	query := `
+	SELECT *
+	FROM uasv
+	WHERE time >= now() - interval '5 minutes'
+	AND location IN ('cancun', 'dubai', 'ibiza')
+	ORDER BY time DESC
+	`
+	iterator, err := client.Query(context.Background(), query)
+	if err != nil {
+		slog.Error(err.Error())
+	}
+	tw := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
+	defer tw.Flush()
+
+	fmt.Fprintln(tw, "\nTime\tid\tlocation\tspeed\tbearing\tticks")
+	for iterator.Next() {
+		value := iterator.Value()
+		t := (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC3339)
+		_, err := fmt.Fprintf(tw, "%v\t%s\t%s\t%.1f\t%.2f\t%d\n", t,
+			value["id"], value["location"], value["speed"], value["bearing"], value["ticks"])
+		if err != nil {
+			slog.Error(err.Error())
+		}
+	}
+}
diff --git a/examples/README.md b/examples/README.md
index 8010c0f..4f1815f 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -3,4 +3,5 @@
 - [Write and query data](Basic/basic.go) - A complete Go example that demonstrates the different ways of writing data, and then queries your data stored in InfluxDB v3 (formerly InfluxDB IOx).
 - [Downsampling](Downsampling/downsampling.go) - A complete Go example that uses a downsampling query and then writes downsampled data back to a different table.
 - [HTTP Error Handling](HTTPErrorHandled/httpErrorHandled.go) - A complete Go example for reading HTTP headers in case of an server error occurs.
-- [Batching write](Batching/batching.go) - A complete Go example that demonstrates how to write data in batches.
+- [Batching write](Batching/batching.go) - A complete Go example that demonstrates how to write Point data in batches.
+- [Line protocol batching write](LPBatching/lpBatching.go) - A complete Go example demonstrating how to write line protocol data in batches.
diff --git a/influxdb3/batching/batcher.go b/influxdb3/batching/batcher.go index 8e01f2a..c9ff2ce 100644 --- a/influxdb3/batching/batcher.go +++ b/influxdb3/batching/batcher.go @@ -24,25 +24,48 @@ THE SOFTWARE. package batching import ( + "fmt" + "log/slog" "sync" "github.com/InfluxCommunity/influxdb3-go/influxdb3" ) -// Option to adapt properties of a batcher -type Option func(*Batcher) +// DefaultBatchSize is the default number of points emitted +const DefaultBatchSize = 1000 + +// DefaultCapacity is the default initial capacity of the point buffer +const DefaultCapacity = 2 * DefaultBatchSize + +// Emittable provides the base for any type +// that will collect and then emit data upon +// reaching a ready state. +type Emittable interface { + SetSize(s int) // setsize + SetCapacity(c int) // set capacity + SetReadyCallback(rcb func()) // ready Callback +} + +// PointEmittable provides the basis for any type emitting +// Point arrays as []*influxdb3.Point +type PointEmittable interface { + Emittable + SetEmitCallback(epcb func([]*influxdb3.Point)) // callback for emitting points +} + +type Option func(PointEmittable) // WithSize changes the batch-size emitted by the batcher func WithSize(size int) Option { - return func(b *Batcher) { - b.size = size + return func(b PointEmittable) { + b.SetSize(size) } } -// WithCapacity changes the initial capacity of the points buffer +// WithCapacity changes the initial capacity of the internal buffer func WithCapacity(capacity int) Option { - return func(b *Batcher) { - b.capacity = capacity + return func(b PointEmittable) { + b.SetCapacity(capacity) } } @@ -50,8 +73,8 @@ func WithCapacity(capacity int) Option { // batcher will wait for the callback to finish, so please return as fast as // possible and move long-running processing to a go-routine. 
func WithReadyCallback(f func()) Option { - return func(b *Batcher) { - b.callbackReady = f + return func(b PointEmittable) { + b.SetReadyCallback(f) } } @@ -59,22 +82,15 @@ func WithReadyCallback(f func()) Option { // batch of points. The batcher will wait for the callback to finish, so please // return as fast as possible and move long-running processing to a go-routine. func WithEmitCallback(f func([]*influxdb3.Point)) Option { - return func(b *Batcher) { - b.callbackEmit = f + return func(b PointEmittable) { + b.SetEmitCallback(f) } } -// DefaultBatchSize is the default number of points emitted -const DefaultBatchSize = 1000 - -// DefaultCapacity is the default initial capacity of the point buffer -const DefaultCapacity = 2 * DefaultBatchSize - // Batcher collects points and emits them as batches type Batcher struct { - size int - capacity int - + size int + capacity int callbackReady func() callbackEmit func([]*influxdb3.Point) @@ -82,6 +98,26 @@ type Batcher struct { sync.Mutex } +// SetSize sets the batch size. Units are Points. +func (b *Batcher) SetSize(s int) { + b.size = s +} + +// SetCapacity sets the initial Capacity of the internal []*influxdb3.Point buffer. +func (b *Batcher) SetCapacity(c int) { + b.capacity = c +} + +// SetReadyCallback sets the callbackReady function. +func (b *Batcher) SetReadyCallback(f func()) { + b.callbackReady = f +} + +// SetEmitCallback sets the callbackEmit function. +func (b *Batcher) SetEmitCallback(f func([]*influxdb3.Point)) { + b.callbackEmit = f +} + // NewBatcher creates and initializes a new Batcher instance applying the // specified options. By default, a batch-size is DefaultBatchSize and the // initial capacity is DefaultCapacity. 
@@ -97,7 +133,7 @@ func NewBatcher(options ...Option) *Batcher { o(b) } - // Setup the internal data + // setup internal data b.points = make([]*influxdb3.Point, 0, b.capacity) return b @@ -112,13 +148,22 @@ func (b *Batcher) Add(p ...*influxdb3.Point) { b.points = append(b.points, p...) // Call callbacks if a new batch is ready - if b.isReady() { + for b.isReady() { if b.callbackReady != nil { b.callbackReady() } - if b.callbackEmit != nil { - b.callbackEmit(b.emitPoints()) + if b.callbackEmit == nil { + // no emitter callback + if b.CurrentLoadSize() >= (b.capacity - b.size) { + slog.Warn( + fmt.Sprintf("Batcher is ready, but no callbackEmit is available. "+ + "Batcher load is %d points waiting to be emitted.", + b.CurrentLoadSize()), + ) + } + break } + b.callbackEmit(b.emitPoints()) } } @@ -151,3 +196,15 @@ func (b *Batcher) emitPoints() []*influxdb3.Point { return points } + +// Flush drains all points even if the internal buffer is currently larger than size. +// It does not call the callbackEmit method +func (b *Batcher) Flush() []*influxdb3.Point { + points := b.points + b.points = b.points[:0] + return points +} + +func (b *Batcher) CurrentLoadSize() int { + return len(b.points) +} diff --git a/influxdb3/batching/batcher_test.go b/influxdb3/batching/batcher_test.go index 5a0c523..c8c403c 100644 --- a/influxdb3/batching/batcher_test.go +++ b/influxdb3/batching/batcher_test.go @@ -25,6 +25,7 @@ package batching import ( "sync" "testing" + "time" "github.com/InfluxCommunity/influxdb3-go/influxdb3" "github.com/stretchr/testify/assert" @@ -51,7 +52,7 @@ func TestCustomValues(t *testing.T) { assert.Equal(t, capacity, cap(b.points)) } -func TestAddAndEmit(t *testing.T) { +func TestAddAndCallBackEmit(t *testing.T) { batchSize := 5 emitted := false var emittedPoints []*influxdb3.Point @@ -150,3 +151,51 @@ func TestThreadSafety(t *testing.T) { assert.Equal(t, 20, emits, "All points should have been emitted") assert.Empty(t, points, "Remaining points should be 
emitted correctly") } + +func TestAddLargerThanSize(t *testing.T) { + batchSize := 5 + emitCt := 0 + loadFactor := 10 + remainder := 3 + pointSet := make([]*influxdb3.Point, (batchSize*loadFactor)+remainder) + for ct := range pointSet { + pointSet[ct] = influxdb3.NewPoint("test", + map[string]string{"foo": "bar"}, + map[string]interface{}{"count": ct + 1}, + time.Now()) + } + + resultSet := make([]*influxdb3.Point, 0) + b := NewBatcher(WithSize(batchSize), + WithCapacity(batchSize*3), + WithEmitCallback(func(points []*influxdb3.Point) { + resultSet = append(resultSet, points...) + emitCt++ + })) + + b.Add(pointSet...) + expectedCt := len(pointSet) / batchSize + assert.Equal(t, expectedCt, emitCt) + assert.Len(t, resultSet, loadFactor*batchSize) + assert.Len(t, b.points, remainder) + assert.Equal(t, pointSet[:len(pointSet)-remainder], resultSet) +} + +func TestFlush(t *testing.T) { + batchSize := 5 + loadFactor := 3 + pointSet := make([]*influxdb3.Point, batchSize*loadFactor) + for ct := range pointSet { + pointSet[ct] = influxdb3.NewPoint("test", + map[string]string{"foo": "bar"}, + map[string]interface{}{"count": ct + 1}, + time.Now()) + } + + b := NewBatcher(WithSize(batchSize), WithCapacity(batchSize*2)) + b.Add(pointSet...) 
+	assert.Equal(t, batchSize*loadFactor, b.CurrentLoadSize())
+	flushed := b.Flush()
+	assert.Len(t, flushed, batchSize*loadFactor)
+	assert.Equal(t, 0, b.CurrentLoadSize())
+}
diff --git a/influxdb3/batching/example_test.go b/influxdb3/batching/example_test.go
index bed2883..421923c 100644
--- a/influxdb3/batching/example_test.go
+++ b/influxdb3/batching/example_test.go
@@ -123,3 +123,93 @@ func Example_batcher() {
 		log.Fatal(err)
 	}
 }
+
+func Example_lineProtocol_batcher() {
+	// Create a random number generator
+	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+	// initialize data
+	dataTemplate := "cpu,host=%s load=%.3f,reg=%d %d"
+	syncHosts := []string{"r2d2", "c3po", "robbie"}
+	const recordCount = 200
+
+	var wErr error
+
+	// Instantiate a client using your credentials.
+	client, err := influxdb3.NewFromEnv()
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer func(client *influxdb3.Client) {
+		err = client.Close()
+		if err != nil {
+			log.Fatal(err)
+		}
+	}(client)
+
+	// SYNCHRONOUS USAGE
+	// create a new Line Protocol Batcher with a batch size of 4096 bytes
+	slpb := batching.NewLPBatcher(batching.WithBufferSize(4096)) // Set buffer size
+
+	// Simulate delay of a second
+	t := time.Now().Add(-recordCount * time.Second)
+
+	// create and emit records
+	for range recordCount {
+		slpb.Add(fmt.Sprintf(dataTemplate,
+			syncHosts[rnd.Intn(len(syncHosts))],
+			rnd.Float64()*150,
+			rnd.Intn(32),
+			t.UnixNano()))
+
+		t = t.Add(time.Second)
+
+		if slpb.Ready() {
+			wErr = client.Write(context.Background(), slpb.Emit())
+			if wErr != nil {
+				log.Fatal(wErr)
+			}
+		}
+	}
+
+	// write any remaining records in batcher to client
+	wErr = client.Write(context.Background(), slpb.Emit())
+	if wErr != nil {
+		log.Fatal(wErr)
+	}
+
+	// ASYNCHRONOUS USAGE
+	asyncHosts := []string{"Z80", "C64", "i8088"}
+	// create a new Line Protocol Batcher with a batch size of 4096 bytes
+	// ... a callback to handle when ready state reached and
+	// ...
 a callback to handle emits of bytes
+	alpb := batching.NewLPBatcher(batching.WithBufferSize(4096),
+		batching.WithByteEmitReadyCallback(func() { fmt.Println("ready") }),
+		batching.WithEmitBytesCallback(func(bytes []byte) {
+			wErr := client.Write(context.Background(), bytes)
+			if wErr != nil {
+				log.Fatal(wErr)
+			}
+		}))
+
+	// Simulate delay of a second
+	t = time.Now().Add(-recordCount * time.Second)
+
+	// create and add data to the batcher
+	for range recordCount {
+		alpb.Add(fmt.Sprintf(dataTemplate,
+			asyncHosts[rnd.Intn(len(asyncHosts))],
+			rnd.Float64()*150,
+			rnd.Intn(32),
+			t.UnixNano()))
+
+		// update time
+		t = t.Add(time.Second)
+	}
+
+	// write any remaining records in batcher to client
+	wErr = client.Write(context.Background(), alpb.Emit())
+	if wErr != nil {
+		log.Fatal(wErr)
+	}
+}
diff --git a/influxdb3/batching/lp_batcher.go b/influxdb3/batching/lp_batcher.go
new file mode 100644
index 0000000..9170e1e
--- /dev/null
+++ b/influxdb3/batching/lp_batcher.go
@@ -0,0 +1,190 @@
+package batching
+
+import (
+	"bytes"
+	"fmt"
+	"log/slog"
+	"sync"
+)
+
+const DefaultByteBatchSize = 100000
+const DefaultBufferCapacity = DefaultByteBatchSize * 2
+
+// ByteEmittable provides the basis for a type Emitting line protocol data
+// as a byte array (i.e. []byte).
+type ByteEmittable interface {
+	Emittable
+	SetEmitBytesCallback(ebcb func([]byte)) // callback for emitting bytes
+}
+
+type LPOption func(ByteEmittable)
+
+// WithBufferSize changes the batch-size emitted by the LPbatcher
+// The unit is byte
+func WithBufferSize(size int) LPOption {
+	return func(b ByteEmittable) {
+		b.SetSize(size)
+	}
+}
+
+// WithBufferCapacity changes the initial capacity of the internal buffer
+// The unit is byte
+func WithBufferCapacity(capacity int) LPOption {
+	return func(b ByteEmittable) {
+		b.SetCapacity(capacity)
+	}
+}
+
+// WithByteEmitReadyCallback sets the function called when a new batch is ready.
The +// batcher will wait for the callback to finish, so please return as fast as +// possible and move long-running processing to a go-routine. +func WithByteEmitReadyCallback(f func()) LPOption { + return func(b ByteEmittable) { + b.SetReadyCallback(f) + } +} + +// WithEmitBytesCallback sets the function called when a new batch is ready +// with the batch bytes. The batcher will wait for the callback to finish, so please +// return as quickly as possible and move any long-running processing to a go routine. +func WithEmitBytesCallback(f func([]byte)) LPOption { + return func(b ByteEmittable) { + b.SetEmitBytesCallback(f) + } +} + +// LPBatcher collects line protocol strings storing them +// to a byte buffer and then emitting them as []byte. +type LPBatcher struct { + size int + capacity int + + callbackReady func() + callbackByteEmit func([]byte) + + buffer []byte + sync.Mutex +} + +// SetSize sets the batch size of the batcher +func (lpb *LPBatcher) SetSize(s int) { + lpb.size = s +} + +// SetCapacity sets the initial capacity of the internal buffer +func (lpb *LPBatcher) SetCapacity(c int) { + lpb.capacity = c +} + +// SetReadyCallback sets the ReadyCallback function +func (lpb *LPBatcher) SetReadyCallback(f func()) { + lpb.callbackReady = f +} + +// SetEmitBytesCallback sets the callbackByteEmit function +func (lpb *LPBatcher) SetEmitBytesCallback(f func([]byte)) { + lpb.callbackByteEmit = f +} + +// NewLPBatcher creates and initializes a new LPBatcher instance +// applying the supplied options. By default a batch size is DefaultByteBatchSize +// and the initial capacity is the DefaultBufferCapacity. 
+func NewLPBatcher(options ...LPOption) *LPBatcher {
+	lpb := &LPBatcher{
+		size:     DefaultByteBatchSize,
+		capacity: DefaultBufferCapacity,
+	}
+
+	// Apply the options
+	for _, o := range options {
+		o(lpb)
+	}
+
+	// setup internal data
+	lpb.buffer = make([]byte, 0, lpb.capacity)
+	return lpb
+}
+
+// Add lines to the buffer and call appropriate callbacks when
+// the ready state is reached.
+func (lpb *LPBatcher) Add(lines ...string) {
+	lpb.Lock()
+	defer lpb.Unlock()
+
+	for _, line := range lines {
+		if len(line) != 0 { // ignore empty lines
+			lpb.buffer = append(lpb.buffer, line...)
+			if line[len(line)-1] != '\n' { // ensure newline demarcation
+				lpb.buffer = append(lpb.buffer, '\n')
+			}
+		}
+	}
+
+	for lpb.isReady() {
+		if lpb.callbackReady != nil {
+			lpb.callbackReady()
+		}
+		if lpb.callbackByteEmit == nil {
+			// no emitter callback
+			if lpb.CurrentLoadSize() > (lpb.capacity - lpb.size) {
+				slog.Warn(
+					fmt.Sprintf("Batcher is ready, but no callbackByteEmit is available. "+
+						"Batcher load is %d bytes waiting to be emitted.",
+						lpb.CurrentLoadSize()),
+				)
+			}
+			break
+		}
+		lpb.callbackByteEmit(lpb.emitBytes())
+	}
+}
+
+// Ready reports when the ready state is reached.
+func (lpb *LPBatcher) Ready() bool {
+	lpb.Lock()
+	defer lpb.Unlock()
+	return lpb.isReady()
+}
+
+func (lpb *LPBatcher) isReady() bool {
+	return len(lpb.buffer) >= lpb.size
+}
+
+// Emit returns a new batch of bytes with up to the provided batch size
+// depending on when the last newline character in the potential batch is met, or
+// with all the remaining bytes. Please drain the bytes at the end of your
+// processing to get the remaining bytes not filling up a batch.
+func (lpb *LPBatcher) Emit() []byte {
+	lpb.Lock()
+	defer lpb.Unlock()
+
+	return lpb.emitBytes()
+}
+
+func (lpb *LPBatcher) emitBytes() []byte {
+	c := min(lpb.size, len(lpb.buffer))
+
+	if c == 0 { // i.e.
buffer is empty + return lpb.buffer + } + + prepacket := lpb.buffer[:c] + lastLF := bytes.LastIndexByte(prepacket, '\n') + 1 + + packet := lpb.buffer[:lastLF] + lpb.buffer = lpb.buffer[len(packet):] + + return packet +} + +// Flush drains all bytes even if buffer currently larger than size +func (lpb *LPBatcher) Flush() []byte { + packet := lpb.buffer + lpb.buffer = lpb.buffer[:0] + return packet +} + +// CurrentLoadSize returns the current size of the internal buffer +func (lpb *LPBatcher) CurrentLoadSize() int { + return len(lpb.buffer) +} diff --git a/influxdb3/batching/lp_batcher_test.go b/influxdb3/batching/lp_batcher_test.go new file mode 100644 index 0000000..03dddc7 --- /dev/null +++ b/influxdb3/batching/lp_batcher_test.go @@ -0,0 +1,285 @@ +package batching + +import ( + "fmt" + "strings" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLPDefaultValues(t *testing.T) { + lpb := NewLPBatcher() + + assert.Equal(t, DefaultByteBatchSize, lpb.size) + assert.Equal(t, DefaultBufferCapacity, lpb.capacity) + assert.Nil(t, lpb.callbackReady) + assert.Nil(t, lpb.callbackByteEmit) +} + +func TestLPCustomValues(t *testing.T) { + size := 2048 + capacity := size * 2 + + lpb := NewLPBatcher( + WithBufferSize(size), + WithBufferCapacity(capacity), + ) + + assert.Equal(t, size, lpb.size) + assert.Equal(t, capacity, lpb.capacity) + assert.Nil(t, lpb.callbackReady) + assert.Nil(t, lpb.callbackByteEmit) +} + +func TestLPBatcherCreate(t *testing.T) { + size := 1000 + capacity := size * 2 + + var emitted bool + var emittedBytes []byte + + l := NewLPBatcher( + WithBufferSize(size), + WithBufferCapacity(capacity), + WithEmitBytesCallback(func(ba []byte) { + emitted = true + emittedBytes = ba + }), + ) + + assert.Equal(t, size, l.size) + assert.Equal(t, capacity, l.capacity) + assert.False(t, emitted) + assert.Nil(t, emittedBytes) + assert.NotNil(t, l.callbackByteEmit) + assert.Nil(t, l.callbackReady) +} + +func TestLPReady(t *testing.T) { + size := 10 + 
capacity := size * 2 + lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity)) + lpb.Add("0123456789ABCDEF") + + assert.True(t, lpb.Ready(), "LPBatcher should be ready when the batch size is reached") +} + +func TestLPReadyCallback(t *testing.T) { + size := 10 + capacity := size * 2 + readyCalled := false + + lpb := NewLPBatcher(WithBufferSize(size), + WithBufferCapacity(capacity), + WithByteEmitReadyCallback(func() { + readyCalled = true + })) + + lpb.Add("0123456789ABCDEF") + + assert.True(t, readyCalled) +} + +func TestEmitEmptyBatcher(t *testing.T) { + size := 256 + capacity := size * 2 + + lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity)) + + results := lpb.Emit() + + assert.Empty(t, results) +} + +func TestAddLineAppendsLF(t *testing.T) { + size := 256 + capacity := size * 2 + + lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity)) + lines := []string{ + "cpu,location=roswell,id=R2D2 fVal=3.14,iVal=42i", + "cpu,location=dyatlov,id=C3PO fVal=2.71,iVal=21i", + "cpu,location=titan,id=HAL69 fVal=1.41,iVal=7i", + } + lpb.Add(lines...) + results := lpb.Emit() + assert.Equal(t, []byte(strings.Join(lines, "\n")+"\n"), results) +} + +func TestAddLineAppendsNoLFWhenPresent(t *testing.T) { + size := 256 + capacity := size * 2 + lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity)) + lines := []string{ + "cpu,location=roswell,id=R2D2 fVal=3.14,iVal=42i\n", + "cpu,location=dyatlov,id=C3PO fVal=2.71,iVal=21i\n", + "cpu,location=titan,id=HAL69 fVal=1.41,iVal=7i\n", + } + lpb.Add(lines...) 
+ results := lpb.Emit() + assert.Equal(t, []byte(strings.Join(lines, "")), results) +} + +func TestLPAddAndPartialEmit(t *testing.T) { + size := 500 + capacity := size * 2 + emitCount := 0 + emittedBytes := make([]byte, 0) + + lineTemplate := "cpu,location=tabor fVal=2.71,count=%di" + lines := make([]string, 5) + lineByteCt := 0 + for n := range 5 { + lines[n] = fmt.Sprintf(lineTemplate, n+1) + lineByteCt += len([]byte(lines[n])) + 1 + } + + verify := strings.Join(lines, "\n") + verify += "\n" + + lpb := NewLPBatcher( + WithBufferSize(size), + WithBufferCapacity(capacity), + WithEmitBytesCallback(func(ba []byte) { + emitCount++ + emittedBytes = append(emittedBytes, ba...) + })) + lpb.Add(lines...) + + assert.Equal(t, lineByteCt, lpb.CurrentLoadSize()) + + packet := lpb.Emit() + + assert.Equal(t, verify, string(packet)) + assert.Equal(t, 0, lpb.CurrentLoadSize()) + assert.Equal(t, 0, emitCount) // callback should not have been called + assert.Empty(t, emittedBytes) // callback should not have been called +} + +func TestLPAddAndEmitCallBack(t *testing.T) { + batchSize := 1000 // Bytes + capacity := 10000 // Bytes + emitCount := 0 + emittedBytes := make([]byte, 0) + readyCalled := 0 + + lps2emit := make([]string, 100) + + lpb := NewLPBatcher( + WithBufferSize(batchSize), + WithBufferCapacity(capacity), + WithByteEmitReadyCallback(func() { + readyCalled++ + }), + WithEmitBytesCallback(func(b []byte) { + emitCount++ + emittedBytes = append(emittedBytes, b...) + })) + + for n := range lps2emit { + lps2emit[n] = fmt.Sprintf("lptest,foo=bar count=%di", n+1) + } + + for i := range lps2emit { + if i > 0 && i%10 == 0 { + set := lps2emit[i-10 : i] + lpb.Add(set...) + } + } + // add lingering set + lpb.Add(lps2emit[len(lps2emit)-10:]...) + + verify := strings.Join(lps2emit, "\n") + verify += "\n" + + assert.False(t, lpb.Ready()) + + emittedBytes = append(emittedBytes, lpb.Emit()...) 
// drain any leftovers + + expectCall := len(emittedBytes) / batchSize + assert.Equal(t, expectCall, emitCount) + assert.Equal(t, verify, string(emittedBytes)) + assert.Equal(t, expectCall, readyCalled) +} + +func TestLPBufferFlush(t *testing.T) { + size := 10 + capacity := size * 2 + + lpb := NewLPBatcher(WithBufferSize(size), WithBufferCapacity(capacity)) + testString := "0123456789ABCDEF\n" + + assert.Equal(t, 0, lpb.CurrentLoadSize()) + lpb.Add(testString) + assert.Equal(t, len(testString), lpb.CurrentLoadSize()) + packet := lpb.Flush() + assert.Equal(t, 0, lpb.CurrentLoadSize()) + assert.Equal(t, testString, string(packet)) +} + +func TestLPThreadSafety(t *testing.T) { + size := 80 + capacity := size * 2 + var wg sync.WaitGroup + emitCt := 0 + testString := "123456789ABCDEF\n" + + lpb := NewLPBatcher(WithBufferSize(size), + WithBufferCapacity(capacity), + WithEmitBytesCallback(func(ba []byte) { + emitCt++ + })) + + for range 25 { + wg.Add(1) + go func() { + defer wg.Done() + for range 4 { + lpb.Add(testString) + } + }() + } + + wg.Wait() + packet := lpb.Emit() + assert.Equal(t, 20, emitCt, "All bytes should have been emitted") + assert.Empty(t, packet, "Remaining bytes should be emitted correctly") +} + +func TestLPAddLargerThanSize(t *testing.T) { + batchSize := 64 + loadFactor := 10 + capacity := batchSize * loadFactor + remainder := 3 + testString := "123456789ABCDEF\n" + stringSet := make([]string, ((batchSize/len(testString))*loadFactor)+remainder) + verify := make([]byte, 0) + for ct := range stringSet { + stringSet[ct] = testString + verify = append(verify, []byte(stringSet[ct])...) + } + + emitCt := 0 + resultBuffer := make([]byte, 0) + lpb := NewLPBatcher( + WithBufferSize(batchSize), + WithBufferCapacity(capacity), + WithEmitBytesCallback(func(ba []byte) { + emitCt++ + resultBuffer = append(resultBuffer, ba...) + })) + + lpb.Add(stringSet...) 
+ + resultBytes := len(resultBuffer) + assert.Equal(t, len(verify)/batchSize, emitCt, "Emit should be called correct number of times") + assert.Equal(t, batchSize*emitCt, resultBytes, + "ResultBuffer should have size of batchSize * number of emit calls ") + checkBuffer := verify[:batchSize*emitCt] + remainBuffer := verify[batchSize*emitCt:] + assert.Equal(t, checkBuffer, resultBuffer) + assert.Equal(t, len(remainBuffer), lpb.CurrentLoadSize()) + assert.Equal(t, remainBuffer, lpb.buffer) +} diff --git a/influxdb3/client_e2e_test.go b/influxdb3/client_e2e_test.go index d581e10..7e16582 100644 --- a/influxdb3/client_e2e_test.go +++ b/influxdb3/client_e2e_test.go @@ -28,19 +28,24 @@ package influxdb3_test import ( "context" "fmt" + "log/slog" + "math" + "math/rand" "os" "strconv" + "strings" "testing" "time" "github.com/InfluxCommunity/influxdb3-go/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/influxdb3/batching" "github.com/apache/arrow/go/v15/arrow" + "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func SkipCheck(t *testing.T) { - if _, present := os.LookupEnv("TESTING_INFLUXDB_URL"); !present { t.Skip("TESTING_INFLUXDB_URL not set") } @@ -365,3 +370,235 @@ func TestEscapedStringValues(t *testing.T) { assert.EqualValues(t, "preescaped\\nline and\\ttab", qit.Value()["tag4"]) } } + +func PointFromLineProtocol(lp string) (*influxdb3.Point, error) { + groups := strings.Split(strings.TrimSpace(lp), " ") + head := strings.Split(groups[0], ",") + fieldLines := strings.Split(groups[1], ",") + + if len(head) < 1 { + return nil, fmt.Errorf("invalid line format: %s", lp) + } + + result := influxdb3.NewPointWithMeasurement(head[0]) + + if len(fieldLines) < 1 { + return nil, fmt.Errorf("LineProtocol has no fields: %s", lp) + } + + if len(head) > 1 { + for i := 1; i < len(head); i++ { + tkv := strings.Split(head[i], "=") + result = result.SetTag(tkv[0], tkv[1]) + } + } + + for _, fl := range 
fieldLines { + fkv := strings.Split(fl, "=") + switch { + case strings.Contains(fkv[1], "\""): + result.SetStringField(fkv[0], fkv[1]) + case strings.Contains(fkv[1], "i"): + fkv[1] = strings.ReplaceAll(fkv[1], "i", "") + ival, err := strconv.ParseInt(fkv[1], 10, 64) + if err != nil { + return nil, err + } + result = result.SetField(fkv[0], ival) + default: + fval, err := strconv.ParseFloat(fkv[1], 64) + if err != nil { + return nil, err + } + result = result.SetField(fkv[0], fval) + } + } + + if len(groups[2]) > 0 { + timestamp, err := strconv.ParseInt(groups[2], 10, 64) + nanoFactor := int64(19 - len(groups[2])) + timestamp *= int64(math.Pow(10.0, float64(nanoFactor))) + if err != nil { + return nil, fmt.Errorf("invalid time format: %s -> %w", lp, err) + } + result = result.SetTimestampWithEpoch(timestamp) + result = result.SetTimestamp(result.Values.Timestamp.UTC()) + } + + return result, nil +} + +// LooseComparePointValues attempts to compare values only but not exact types +// Some value types get coerced in client-server transactions +func LooseEqualPointValues(pvA *influxdb3.PointValues, pvB *influxdb3.PointValues) bool { + if pvA.MeasurementName != pvB.MeasurementName { + return false + } + if pvA.Timestamp != pvB.Timestamp { + return false + } + for tagName := range pvA.Tags { + if pvA.Tags[tagName] != pvB.Tags[tagName] { + return false + } + } + for fieldName := range pvA.Fields { + switch pvA.Fields[fieldName].(type) { + case int, int16, int32, int64: + ai, aiok := pvA.Fields[fieldName].(int64) + bi, biok := pvB.Fields[fieldName].(int64) + if !aiok || !biok { + return false + } + if ai != bi { + return false + } + case float32, float64: + af, afok := pvA.Fields[fieldName].(float64) + bf, bfok := pvB.Fields[fieldName].(float64) + if !afok || !bfok { + return false + } + if af != bf { + return false + } + default: // compare as strings + as, saok := pvA.Fields[fieldName].(string) + bs, sbok := pvB.Fields[fieldName].(string) + if !saok || !sbok { + return 
false + } + if as != bs { + return false + } + } + } + return true +} + +func TestLPBatcher(t *testing.T) { + SkipCheck(t) + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + url := os.Getenv("TESTING_INFLUXDB_URL") + token := os.Getenv("TESTING_INFLUXDB_TOKEN") + database := os.Getenv("TESTING_INFLUXDB_DATABASE") + + client, err := influxdb3.New(influxdb3.ClientConfig{ + Host: url, + Token: token, + Database: database, + }) + require.NoError(t, err) + defer func(client *influxdb3.Client) { + err := client.Close() + if err != nil { + slog.Warn("Failed to close client correctly.") + } + }(client) + + measurement := fmt.Sprintf("ibot%d", rnd.Int63n(99000)+1000) + dataTemplate := "%s,location=%s,id=%s fVal=%f,count=%di %d" + locations := []string{"akron", "dakar", "kyoto", "perth"} + ids := []string{"R2D2", "C3PO", "ROBBIE"} + lines := make([]string, 0) + now := time.Now().UnixMilli() + estBytesCt := 0 + lineCount := 2000 + for n := range lineCount { + lines = append(lines, fmt.Sprintf(dataTemplate, + measurement, + locations[n%len(locations)], + ids[n%len(ids)], + (rnd.Float64()*100)-50.0, n+1, now-int64(n*1000))) + if n%2 == 0 { + lines[n] += "\n" // verify appending LF with every second rec + } else { + estBytesCt++ // LPBatcher appends missing "\n" on odd cases so increase estimate + } + estBytesCt += len([]byte(lines[n])) + } + + size := 4096 + capacity := size * 2 + readyCt := 0 + emitCt := 0 + results := make([]byte, 0) + lag := 0 + lpb := batching.NewLPBatcher( + batching.WithBufferSize(size), + batching.WithBufferCapacity(capacity), + batching.WithByteEmitReadyCallback(func() { + readyCt++ + }), + batching.WithEmitBytesCallback(func(ba []byte) { + emitCt++ + // N.B. LPBatcher emits up to last '\n' in packet so will usually be less than `size` + // lag collects the difference for asserts below + lag += size - len(ba) + results = append(results, ba...) 
+ err := client.Write(context.Background(), ba, influxdb3.WithPrecision(lineprotocol.Millisecond)) + if err != nil { + assert.Fail(t, "Failed to write ba") + } + })) + + sent := 0 + for n := range lines { + if n%100 == 0 { + lpb.Add(lines[sent : sent+100]...) + sent += 100 + } + } + lpb.Add(lines[sent:]...) // add remainder + + // Check that collected emits make sense + assert.Equal(t, readyCt, emitCt) + assert.Equal(t, estBytesCt+lag, size*emitCt+lpb.CurrentLoadSize()) + + // emit anything left over + leftover := lpb.Emit() + assert.Zero(t, lpb.CurrentLoadSize()) + err = client.Write(context.Background(), leftover, influxdb3.WithPrecision(lineprotocol.Millisecond)) + if err != nil { + assert.Fail(t, "Failed to write leftover bytes from lpb - LPBatcher") + } + results = append(results, leftover...) + + // Retrieve and check results + query := fmt.Sprintf("SELECT * FROM \"%s\" WHERE time >= now() - interval '90 minutes' Order by count", + measurement) + + qiterator, qerr := client.Query(context.Background(), query) + + if qerr != nil { + assert.Failf(t, "Failed to query.", "query: %s", query) + } + + var pvResults []*influxdb3.PointValues + for qiterator.Next() { + pvResults = append(pvResults, qiterator.AsPoints()) + } + + // Check random retrieved samples match source LineProtocol + samples := 10 + for n := range samples { + index := 0 + if n > 0 { // always test first value + index = rnd.Intn(len(lines)) + } + if n == (samples - 1) { + index = len(lines) - 1 // always test last value + } + point, cnvErr := PointFromLineProtocol(lines[index]) + if cnvErr != nil { + assert.Failf(t, "Failed to deserialize point", "index: %d, line: %d", index, lines[index]) + } + if point != nil { + point.Values.MeasurementName = "" + assert.True(t, LooseEqualPointValues(point.Values, pvResults[index])) + } else { + assert.Fail(t, "Nil returned on deserialize point", "index: %d, line: %d", index, lines[index]) + } + } +}