Skip to content

Commit

Permalink
feat: adding LP batching (#112)
Browse files Browse the repository at this point in the history
* feat: (WIP) adding LP batching

* chore: (WIP) tidying and adding methods for LPBatcher

* feat: (WIP) better handling of []byte in LPBatcher.Emit()

* feat: (WIP) LPBatching 1) testof Add([]*point) > size;\n 2) refactoring

* feat: (WIP) LPBatching - refactor, tidy, and additional tests

* feat: (WIP) lp batching - additional tests

* feat: (WIP) adding LP batching

* chore: (WIP) tidying and adding methods for LPBatcher

* feat: (WIP) better handling of []byte in LPBatcher.Emit()

* feat: (WIP) LPBatching 1) testof Add([]*point) > size;\n 2) refactoring

* feat: (WIP) LPBatching - refactor, tidy, and additional tests

* feat: (WIP) lp batching - additional tests

* tests: (WIP) starting integration test of LPBatcher.

* chore: restore custom flag for client_e2e_test

* chore: refactor options handling and extensibility

* test: (WIP) starting integration test for LPBatcher.

* chore: refactoring

* chore: restore build flags to client_e2e_test

* chore: cleanup lint issues

* chore: cleanup comments

* chore: rename interface methods using Set<T>() convention.

* test: updates integration test for LPBatcher

* test: cleanup code in new integration test

* docs: adds examples of LPBatcher usage

* docs: update examples/README.md

* chore: fixing basic issues

* chore: minor restructure lpBatching example

* docs: update CHANGELOG.md

* chore: remove unnecessary log messages, improve examples/README.md
  • Loading branch information
karel-rehor authored Nov 11, 2024
1 parent 349a67f commit 5b63709
Show file tree
Hide file tree
Showing 9 changed files with 1,083 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.14.0 [unreleased]

### Features

1. [#112](https://github.com/InfluxCommunity/influxdb3-go/pull/112): Adds `LPBatcher` for lineprotocol batching following the model of the Point `Batcher`.

### Bug Fixes

1. [#113](https://github.com/InfluxCommunity/influxdb3-go/pull/113): Honor struct tags on WriteData, avoid panic for unexported fields
Expand Down
143 changes: 143 additions & 0 deletions examples/LPBatching/lpBatching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package main

import (
"context"
"fmt"
"log/slog"
"math/rand"
"os"
"text/tabwriter"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
"github.com/apache/arrow/go/v15/arrow"
)

const LineCount = 100

func main() {
// PREPARE DATA
// Create a random number generator
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

// Retrieve credentials from environment variables.
url := os.Getenv("INFLUX_URL")
token := os.Getenv("INFLUX_TOKEN")
database := os.Getenv("INFLUX_DATABASE")

// unmanned aquatic surface vehicle as source
dataTemplate := "uasv,id=%s,location=%s speed=%f,bearing=%f,ticks=%di %d"
ids := []string{"orca", "flipper", "gaston"}
syncLocations := []string{"nice", "split", "goa", "cancun"}

// Instantiate a client using your credentials.
client, err := influxdb3.New(influxdb3.ClientConfig{
Host: url,
Token: token,
Database: database,
})

if err != nil {
panic(err)
}

defer func(client *influxdb3.Client) {
err := client.Close()
if err != nil {
slog.Error("Error closing client", err)
}
}(client)

// SYNC WRITE BATCHES
// create a new Line Protocol Batcher
syncLpb := batching.NewLPBatcher(batching.WithBufferSize(4096)) // Set buffer size
t := time.Now().Add(-LineCount * time.Second)

// add new data into the batcher
for range LineCount {
syncLpb.Add(fmt.Sprintf(dataTemplate,
ids[rnd.Intn(len(ids))],
syncLocations[rnd.Intn(len(syncLocations))],
rnd.Float64()*100,
rnd.Float64()*360,
rnd.Intn(100),
t.UnixNano(),
))
t = t.Add(time.Second)

// if ready state reached, emit a batch
if syncLpb.Ready() {
err = client.Write(context.Background(), syncLpb.Emit())
if err != nil {
slog.Error(err.Error())
}
}
}

// Write final batch to client
err = client.Write(context.Background(), syncLpb.Emit())
if err != nil {
slog.Error(err.Error())
}
fmt.Printf("Sync Writes Done. %d Bytes remaining in batcher buffer\n",
syncLpb.CurrentLoadSize())

// ASYNC WRITE BATCHES
asyncLpb := batching.NewLPBatcher(batching.WithBufferSize(4096), // Set buffer size
batching.WithByteEmitReadyCallback(func() { fmt.Println("|-- ready to emit -->") }), // Set ready callback
batching.WithEmitBytesCallback(func(bytes []byte) { // Set Callback to handle emitted bytes
err = client.Write(context.Background(), bytes)
if err != nil {
slog.Error(err.Error())
}
}))

asyncLocations := []string{"ibiza", "dubai", "phuket", "maui"}
t = time.Now().Add(-LineCount * time.Second)

// Add new data to Batcher
for range LineCount {
asyncLpb.Add(fmt.Sprintf(dataTemplate, ids[rnd.Intn(len(ids))],
asyncLocations[rnd.Intn(len(asyncLocations))],
rnd.Float64()*100,
rnd.Float64()*360,
rnd.Intn(100),
t.UnixNano()))
t = t.Add(time.Second)
}

// Write the remaining batch records to the client
err = client.Write(context.Background(), asyncLpb.Emit())
if err != nil {
slog.Error(err.Error())
}
fmt.Printf("Async Writes Done. %d Bytes remaining in batcher buffer\n",
asyncLpb.CurrentLoadSize())

// Prepare an SQL query
query := `
SELECT *
FROM uasv
WHERE time >= now() - interval '5 minutes'
AND location IN ('cancun', 'dubai', 'ibiza')
ORDER BY time DESC
`
iterator, err := client.Query(context.Background(), query)
if err != nil {
slog.Error(err.Error())
}
tw := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
defer tw.Flush()

fmt.Fprintln(tw, "\nTime\tid\tlocation\tspeed\tbearing\tticks")
for iterator.Next() {
value := iterator.Value()
t := (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC3339)
_, err := fmt.Fprintf(tw, "%v\t%s\t%s\t%.1f\t%.2f\t%d\n", t,
value["id"], value["location"], value["speed"], value["bearing"], value["ticks"])
if err != nil {
slog.Error(err.Error())
}
}
}
3 changes: 2 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
- [Write and query data](Basic/basic.go) - A complete Go example that demonstrates the different ways of writing data, and then queries your data stored in InfluxDB v3 (formerly InfluxDB IOx).
- [Downsampling](Downsampling/downsampling.go) - A complete Go example that uses a downsampling query and then writes downsampled data back to a different table.
- [HTTP Error Handling](HTTPErrorHandled/httpErrorHandled.go) - A complete Go example for reading HTTP headers in case of an server error occurs.
- [Batching write](Batching/batching.go) - A complete Go example that demonstrates how to write data in batches.
- [Batching write](Batching/batching.go) - A complete Go example that demonstrates how to write Point data in batches.
- [Line protocol batching write](LPBatchin/lpBatching.go) - A complete GO example demonstrating how to write line protocol data in batches.
105 changes: 81 additions & 24 deletions influxdb3/batching/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,64 +24,100 @@ THE SOFTWARE.
package batching

import (
"fmt"
"log/slog"
"sync"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)

// Option to adapt properties of a batcher
type Option func(*Batcher)
// DefaultBatchSize is the default number of points emitted
const DefaultBatchSize = 1000

// DefaultCapacity is the default initial capacity of the point buffer
const DefaultCapacity = 2 * DefaultBatchSize

// Emittable provides the base for any type
// that will collect and then emit data upon
// reaching a ready state.
type Emittable interface {
SetSize(s int) // setsize
SetCapacity(c int) // set capacity
SetReadyCallback(rcb func()) // ready Callback
}

// PointEmittable provides the basis for any type emitting
// Point arrays as []*influxdb3.Point
type PointEmittable interface {
Emittable
SetEmitCallback(epcb func([]*influxdb3.Point)) // callback for emitting points
}

type Option func(PointEmittable)

// WithSize changes the batch-size emitted by the batcher
func WithSize(size int) Option {
return func(b *Batcher) {
b.size = size
return func(b PointEmittable) {
b.SetSize(size)
}
}

// WithCapacity changes the initial capacity of the points buffer
// WithCapacity changes the initial capacity of the internal buffer
func WithCapacity(capacity int) Option {
return func(b *Batcher) {
b.capacity = capacity
return func(b PointEmittable) {
b.SetCapacity(capacity)
}
}

// WithReadyCallback sets the function called when a new batch is ready. The
// batcher will wait for the callback to finish, so please return as fast as
// possible and move long-running processing to a go-routine.
func WithReadyCallback(f func()) Option {
return func(b *Batcher) {
b.callbackReady = f
return func(b PointEmittable) {
b.SetReadyCallback(f)
}
}

// WithEmitCallback sets the function called when a new batch is ready with the
// batch of points. The batcher will wait for the callback to finish, so please
// return as fast as possible and move long-running processing to a go-routine.
func WithEmitCallback(f func([]*influxdb3.Point)) Option {
return func(b *Batcher) {
b.callbackEmit = f
return func(b PointEmittable) {
b.SetEmitCallback(f)
}
}

// DefaultBatchSize is the default number of points emitted
const DefaultBatchSize = 1000

// DefaultCapacity is the default initial capacity of the point buffer
const DefaultCapacity = 2 * DefaultBatchSize

// Batcher collects points and emits them as batches
type Batcher struct {
size int
capacity int

size int
capacity int
callbackReady func()
callbackEmit func([]*influxdb3.Point)

points []*influxdb3.Point
sync.Mutex
}

// SetSize sets the batch size. Units are Points.
func (b *Batcher) SetSize(s int) {
b.size = s
}

// SetCapacity sets the initial Capacity of the internal []*influxdb3.Point buffer.
func (b *Batcher) SetCapacity(c int) {
b.capacity = c
}

// SetReadyCallback sets the callbackReady function.
func (b *Batcher) SetReadyCallback(f func()) {
b.callbackReady = f
}

// SetEmitCallback sets the callbackEmit function.
func (b *Batcher) SetEmitCallback(f func([]*influxdb3.Point)) {
b.callbackEmit = f
}

// NewBatcher creates and initializes a new Batcher instance applying the
// specified options. By default, a batch-size is DefaultBatchSize and the
// initial capacity is DefaultCapacity.
Expand All @@ -97,7 +133,7 @@ func NewBatcher(options ...Option) *Batcher {
o(b)
}

// Setup the internal data
// setup internal data
b.points = make([]*influxdb3.Point, 0, b.capacity)

return b
Expand All @@ -112,13 +148,22 @@ func (b *Batcher) Add(p ...*influxdb3.Point) {
b.points = append(b.points, p...)

// Call callbacks if a new batch is ready
if b.isReady() {
for b.isReady() {
if b.callbackReady != nil {
b.callbackReady()
}
if b.callbackEmit != nil {
b.callbackEmit(b.emitPoints())
if b.callbackEmit == nil {
// no emitter callback
if b.CurrentLoadSize() >= (b.capacity - b.size) {
slog.Warn(
fmt.Sprintf("Batcher is ready, but no callbackEmit is available. "+
"Batcher load is %d points waiting to be emitted.",
b.CurrentLoadSize()),
)
}
break
}
b.callbackEmit(b.emitPoints())
}
}

Expand Down Expand Up @@ -151,3 +196,15 @@ func (b *Batcher) emitPoints() []*influxdb3.Point {

return points
}

// Flush drains all points even if the internal buffer is currently larger than size.
// It does not call the callbackEmit method
func (b *Batcher) Flush() []*influxdb3.Point {
points := b.points
b.points = b.points[:0]
return points
}

func (b *Batcher) CurrentLoadSize() int {
return len(b.points)
}
Loading

0 comments on commit 5b63709

Please sign in to comment.