From 58241902edc0c9c4ff3701a7b0b0e2b30c36299f Mon Sep 17 00:00:00 2001
From: karel rehor <karl.koerner@bonitoo.io>
Date: Fri, 25 Oct 2024 17:27:20 +0200
Subject: [PATCH] feat: (WIP) LPBatching - refactor, tidy, and additional tests

---
 influxdb3/batching/batcher.go         | 127 ++++-------------
 influxdb3/batching/batcher_test.go    |  81 +++++------
 influxdb3/batching/example_test.go    |   2 +-
 influxdb3/batching/lp_batcher.go      |  75 +++++-----
 influxdb3/batching/lp_batcher_test.go | 193 ++++++++++++++++++++++++--
 5 files changed, 286 insertions(+), 192 deletions(-)

diff --git a/influxdb3/batching/batcher.go b/influxdb3/batching/batcher.go
index aef4157..22ed7ed 100644
--- a/influxdb3/batching/batcher.go
+++ b/influxdb3/batching/batcher.go
@@ -24,30 +24,19 @@ THE SOFTWARE.
 package batching
 
 import (
+	"fmt"
 	"log/slog"
 	"sync"
 
 	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
 )
 
-const (
-	BatchUnknown = iota
-	BatchPoints  = iota
-	BatchLP      = iota
-)
-
-type Option func(*interface{})
-
 // Option to adapt properties of a batcher
-type PBOption func(*Batcher)
+type Option func(*interface{})
 
 // WithSize changes the batch-size emitted by the batcher
-/*func WithSize(size int) PBOption {
-	return func(b *Batcher) {
-		b.size = size
-	}
-}*/
-
+// With the standard Batcher the implied unit is a Point
+// With the LPBatcher the implied unit is a byte
 func WithSize(size int) Option {
 	return func(b *interface{}) {
 		if bb, bok := (*b).(*Batcher); bok {
@@ -55,18 +44,14 @@ func WithSize(size int) Option {
 		} else if lb, lok := (*b).(*LPBatcher); lok {
 			lb.size = size
 		} else {
-			slog.Warn("Failed to match Batcher in WithCapacityOpt. Value not set.")
+			slog.Warn("Failed to match Batcher type in WithSize. Value not set.")
 		}
 	}
 }
 
-// WithCapacity changes the initial capacity of the points buffer
-/*func WithCapacity(capacity int) PBOption {
-	return func(b *Batcher) {
-		b.capacity = capacity
-	}
-}*/
-
+// WithCapacity changes the initial capacity of the internal buffer
+// With the standard Batcher implied unit is a Point
+// With the LPBatcher the implied unit is a byte
 func WithCapacity(capacity int) Option {
 	return func(b *interface{}) {
 		if bb, bok := (*b).(*Batcher); bok {
@@ -74,7 +59,7 @@ func WithCapacity(capacity int) Option {
 		} else if lb, lok := (*b).(*LPBatcher); lok {
 			lb.capacity = capacity
 		} else {
-			slog.Warn("Failed to match Batcher in WithCapacityOpt. Value not set.")
+			slog.Warn("Failed to match Batcher type in WithCapacity. Value not set.")
 		}
 	}
 }
@@ -82,12 +67,6 @@ func WithCapacity(capacity int) Option {
 // WithReadyCallback sets the function called when a new batch is ready. The
 // batcher will wait for the callback to finish, so please return as fast as
 // possible and move long-running processing to a  go-routine.
-/*func WithReadyCallback(f func()) PBOption {
-	return func(b *Batcher) {
-		b.callbackReady = f
-	}
-}*/
-
 func WithReadyCallback(f func()) Option {
 	return func(b *interface{}) {
 		if bb, bok := (*b).(*Batcher); bok {
@@ -95,27 +74,21 @@ func WithReadyCallback(f func()) Option {
 		} else if lb, lok := (*b).(*LPBatcher); lok {
 			lb.callbackReady = f
 		} else {
-			slog.Warn("Failed to match Batcher in WithReadyCallbackOpt. Callback not set.")
+			slog.Warn("Failed to match Batcher type in WithReadyCallback. Callback not set.")
 		}
 	}
 
 }
 
-// WithEmitPointsCallback sets the function called when a new batch is ready with the
+// WithEmitCallback sets the function called when a new batch is ready with the
 // batch of points. The batcher will wait for the callback to finish, so please
 // return as fast as possible and move long-running processing to a go-routine.
-/*func WithEmitPointsCallback(f func([]*influxdb3.Point)) PBOption {
-	return func(b *Batcher) {
-		b.callbackEmit = f
-	}
-}*/
-
-func WithEmitPointsCallback(f func([]*influxdb3.Point)) Option {
+func WithEmitCallback(f func([]*influxdb3.Point)) Option {
 	return func(b *interface{}) {
 		if bb, bok := (*b).(*Batcher); bok {
 			bb.callbackEmit = f
 		} else {
-			slog.Warn("Failed to match Batcher in WithEmitPointsCallbackOpt. Callback not set.")
+			slog.Warn("Failed to match type Batcher in WithEmitPointsCallback. Callback not set.")
 		}
 	}
 }
@@ -175,7 +148,6 @@ func (b *Batcher) Add(p ...*influxdb3.Point) {
 
 	// Add the point
 	b.points = append(b.points, p...)
-	//b.addToBuffer(interfaces...)
 
 	// Call callbacks if a new batch is ready
 	for b.isReady() {
@@ -186,6 +158,13 @@ func (b *Batcher) Add(p ...*influxdb3.Point) {
 			b.callbackEmit(b.emitPoints())
 		} else {
 			// no emitter callback
+			if b.CurrentLoadSize() >= (b.capacity - b.size) {
+				slog.Warn(
+					fmt.Sprintf("Batcher is ready, but no callbackEmit is available.  "+
+						"Batcher load is %d points waiting to be emitted.",
+						b.CurrentLoadSize()),
+				)
+			}
 			break
 		}
 	}
@@ -221,61 +200,15 @@ func (b *Batcher) emitPoints() []*influxdb3.Point {
 	return points
 }
 
-/*
-func (b *Batcher) addToBuffer(items ...*interface{}) {
-	b.Lock()
-	defer b.Unlock()
-
-	b.buffer = append(b.buffer, items...)
-
-	//Call callbacks if a new batch is ready
-	if b.isReady() {
-		if b.callbackReady != nil {
-			b.callbackReady()
-		}
-		if b.callbackEmit != nil {
-			// ??? and if its line protocol?
-			b.callbackEmit(b.emitPoints())
-		}
-	}
+// Flush drains all points even if buffer currently larger than size.
+// It does not call the callbackEmit method
+func (b *Batcher) Flush() []*influxdb3.Point {
+	slog.Info(fmt.Sprintf("Flushing all points (%d) from buffer.", b.CurrentLoadSize()))
+	points := b.points
+	b.points = b.points[len(points):]
+	return points
 }
-*/
-/*
-func (b *Batcher) AddPoints(p ...*influxdb3.Point) error {
-	//b.Lock()
-	//defer b.Unlock()
-	if b.idiom != BatchIdiomPoints {
-		if len(b.buffer) == 0 {
-			b.idiom = BatchIdiomPoints
-		} else {
-			return errors.New("this batcher does not support the Point idiom")
-		}
-	}
-	interfaces := make([]*interface{}, len(p))
-	// Add the point
-	for i, point := range p {
-		var iface interface{} = point
-		interfaces[i] = &iface
-	}
-	//b.points = append(b.points, p...)
-	b.addToBuffer(interfaces...)
-	return nil
-} */
 
-/*
-func (b *Batcher) AddLP(lines ...string) error {
-	if b.idiom != BatchIdiomLP {
-		if len(b.buffer) == 0 {
-			b.idiom = BatchIdiomLP
-		} else {
-			return errors.New("this batcher does not support the Line Protocol (LP) idiom")
-		}
-	}
-	interfaces := make([]*interface{}, len(lines))
-	for n, line := range lines {
-		var iface interface{} = line
-		interfaces[n] = &iface
-	}
-	b.addToBuffer(interfaces...)
-	return nil
-} */
+func (b *Batcher) CurrentLoadSize() int {
+	return len(b.points)
+}
diff --git a/influxdb3/batching/batcher_test.go b/influxdb3/batching/batcher_test.go
index 2c6d1d9..459c0b5 100644
--- a/influxdb3/batching/batcher_test.go
+++ b/influxdb3/batching/batcher_test.go
@@ -24,13 +24,11 @@ package batching
 
 import (
 	"fmt"
-	"reflect"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
-	"github.com/influxdata/line-protocol/v2/lineprotocol"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -55,46 +53,20 @@ func TestCustomValues(t *testing.T) {
 	assert.Equal(t, capacity, cap(b.points))
 }
 
-func TestAddAndDefaultEmitPointDefault(t *testing.T) {
-	batchSize := 10
-	//emitted := false
-	points2emit := make([]*influxdb3.Point, batchSize)
-
-	b := NewBatcher(WithSize(batchSize))
-	for n := range batchSize / 2 {
-		points2emit[n] = influxdb3.NewPointWithMeasurement("pointtest").
-			SetTag("foo", "bar").
-			SetIntegerField("count", int64(n+1))
-	}
-	fmt.Printf("\nDEBUG points2emit[0]: %v\n", reflect.TypeOf(points2emit[0]))
-	b.Add(points2emit...)
-	// force Emit
-	result := b.Emit()
-	fmt.Printf("\nDEBUG Inspect result %+v", result)
-	fmt.Printf("\nDEBUG Inspect result %+v", reflect.TypeOf(result[0]))
-	lp, err := result[0].MarshalBinary(lineprotocol.Millisecond)
-	if err != nil {
-		fmt.Printf("err: %s\n", err)
-	}
-	fmt.Printf("\nDEBUG Inspect lp %s", string(lp))
-}
-
-func TestAddAndCallBackEmitPoint(t *testing.T) {
+func TestAddAndCallBackEmit(t *testing.T) {
 	batchSize := 5
 	emitted := false
 	var emittedPoints []*influxdb3.Point
 
 	b := NewBatcher(
 		WithSize(batchSize),
-		WithEmitPointsCallback(func(points []*influxdb3.Point) {
-			fmt.Printf("callback called with %v\n", points)
+		WithEmitCallback(func(points []*influxdb3.Point) {
 			emitted = true
 			emittedPoints = points
 		}),
 	)
 
 	for range batchSize {
-		fmt.Printf("Adding point")
 		b.Add(&influxdb3.Point{})
 	}
 
@@ -139,7 +111,7 @@ func TestPartialEmit(t *testing.T) {
 
 	b := NewBatcher(
 		WithSize(batchSize),
-		WithEmitPointsCallback(func(points []*influxdb3.Point) {
+		WithEmitCallback(func(points []*influxdb3.Point) {
 			emitted = true
 		}),
 	)
@@ -159,7 +131,7 @@ func TestThreadSafety(t *testing.T) {
 	emits := 0
 	b := NewBatcher(
 		WithSize(batchSize),
-		WithEmitPointsCallback(func(points []*influxdb3.Point) {
+		WithEmitCallback(func(points []*influxdb3.Point) {
 			emits++
 		}),
 	)
@@ -184,7 +156,9 @@ func TestThreadSafety(t *testing.T) {
 func TestAddLargerThanSize(t *testing.T) {
 	batchSize := 5
 	emitCt := 0
-	pointSet := make([]*influxdb3.Point, (batchSize*10)+3)
+	loadFactor := 10
+	remainder := 3
+	pointSet := make([]*influxdb3.Point, (batchSize*loadFactor)+remainder)
 	for ct := range pointSet {
 		pointSet[ct] = influxdb3.NewPoint("test",
 			map[string]string{"foo": "bar"},
@@ -192,29 +166,38 @@ func TestAddLargerThanSize(t *testing.T) {
 			time.Now())
 	}
 
-	for _, v := range pointSet {
-		lp, _ := v.MarshalBinary(lineprotocol.Millisecond)
-		fmt.Printf("%s", string(lp))
-	}
-	subSet := pointSet[:5]
-	fmt.Printf("DEBUG subset: %v\n", subSet)
 	resultSet := make([]*influxdb3.Point, 0)
 	b := NewBatcher(WithSize(batchSize),
-		WithEmitPointsCallback(func(points []*influxdb3.Point) {
+		WithCapacity(batchSize*3),
+		WithEmitCallback(func(points []*influxdb3.Point) {
 			resultSet = append(resultSet, points...)
 			emitCt++
 		}))
 
 	b.Add(pointSet...)
-	fmt.Printf("\nDEBUG emitCt: %d\n", emitCt)
-	fmt.Printf("\nDEBUG len(resultSet): %d\n", len(resultSet))
-	fmt.Printf("\nDEBUG len(b.points): %d\n", len(b.points))
-	fmt.Printf("\nDEBUG dump resultSet:\n")
-	for _, rp := range resultSet {
-		lp, _ := rp.MarshalBinary(lineprotocol.Millisecond)
-		fmt.Printf("%s", string(lp))
+	expectedCt := len(pointSet) / batchSize
+	assert.Equal(t, expectedCt, emitCt)
+	assert.Equal(t, loadFactor*batchSize, len(resultSet))
+	fmt.Printf("DEBUG resultSet %d\n", len(resultSet))
+	assert.Equal(t, remainder, len(b.points))
+	assert.Equal(t, pointSet[:len(pointSet)-remainder], resultSet)
+}
+
+func TestFlush(t *testing.T) {
+	batchSize := 5
+	loadFactor := 3
+	pointSet := make([]*influxdb3.Point, batchSize*loadFactor)
+	for ct := range pointSet {
+		pointSet[ct] = influxdb3.NewPoint("test",
+			map[string]string{"foo": "bar"},
+			map[string]interface{}{"count": ct + 1},
+			time.Now())
 	}
-	fmt.Println()
-	//assert.Equal(t, pointSet, resultSet)
 
+	b := NewBatcher(WithSize(batchSize), WithCapacity(batchSize*2))
+	b.Add(pointSet...)
+	assert.Equal(t, batchSize*loadFactor, b.CurrentLoadSize())
+	flushed := b.Flush()
+	assert.Equal(t, batchSize*loadFactor, len(flushed))
+	assert.Equal(t, 0, b.CurrentLoadSize())
 }
diff --git a/influxdb3/batching/example_test.go b/influxdb3/batching/example_test.go
index 7b0e668..bed2883 100644
--- a/influxdb3/batching/example_test.go
+++ b/influxdb3/batching/example_test.go
@@ -90,7 +90,7 @@ func Example_batcher() {
 	b = batching.NewBatcher(
 		batching.WithSize(5),
 		batching.WithReadyCallback(func() { fmt.Println("ready") }),
-		batching.WithEmitPointsCallback(func(points []*influxdb3.Point) {
+		batching.WithEmitCallback(func(points []*influxdb3.Point) {
 			err = client.WritePoints(context.Background(), points)
 			if err != nil {
 				log.Fatal(err)
diff --git a/influxdb3/batching/lp_batcher.go b/influxdb3/batching/lp_batcher.go
index 162deac..4a69280 100644
--- a/influxdb3/batching/lp_batcher.go
+++ b/influxdb3/batching/lp_batcher.go
@@ -2,36 +2,20 @@ package batching
 
 import (
 	"bytes"
+	"fmt"
+	"log/slog"
 	"sync"
 )
 
 const DefaultBufferSize = 100000
 const DefaultBufferCapacity = DefaultBufferSize * 2
 
-//type LPOption func(*LPBatcher)
-
-/*
-func WithBufferSize(size int) func(*interface{}) {
-	return func(b *interface{}) {
-		if lpb, ok := (*b).(*LPBatcher); ok {
-			lpb.size = size
-		}
-	}
-}
-
-func WithBufferCapacity(capacity int) Option {
-	return func(b *interface{}) {
-		if lpb, ok := (*b).(*LPBatcher); ok {
-			lpb.capacity = capacity
-		}
-	}
-}
-*/
-
 func WithEmitBytesCallback(f func([]byte)) Option {
 	return func(b *interface{}) {
 		if lpb, ok := (*b).(*LPBatcher); ok {
 			lpb.callbackEmit = f
+		} else {
+			slog.Warn("Failed to match type LPBatcher in WithEmitBytesCallback. Callback not set.")
 		}
 	}
 }
@@ -72,18 +56,29 @@ func (l *LPBatcher) Add(lines ...string) {
 	for _, line := range lines {
 		if len(line) != 0 { // ignore empty lines
 			l.buffer = append(l.buffer, line...)
-			if line[len(line)-1] != '\n' {
+			if line[len(line)-1] != '\n' { //ensure newline demarcation
 				l.buffer = append(l.buffer, '\n')
 			}
 		}
 	}
 
-	if l.isReady() {
+	for l.isReady() {
 		if l.callbackReady != nil {
 			l.callbackReady()
 		}
 		if l.callbackEmit != nil {
 			l.callbackEmit(l.emitBytes())
+		} else {
+			// no emitter callback
+			if l.CurrentLoadSize() > (l.capacity - l.size) {
+				slog.Warn(
+					fmt.Sprintf("Batcher is ready, but no callbackEmit is available.  "+
+						"Batcher load is %d bytes waiting to be emitted.",
+						l.CurrentLoadSize()),
+				)
+			}
+			break
+
 		}
 	}
 }
@@ -98,17 +93,14 @@ func (l *LPBatcher) isReady() bool {
 	return len(l.buffer) >= l.size
 }
 
-func (b *LPBatcher) Emit() []byte {
-	b.Lock()
-	defer b.Unlock()
-
-	packet := b.emitBytes()
-
-	if b.callbackEmit != nil {
-		b.callbackEmit(packet)
-	}
+// Emit returns a new batch of bytes with the provided batch size or with the
+// remaining bytes. Please drain the bytes at the end of your processing to
+// get the remaining bytes not filling up a batch.
+func (l *LPBatcher) Emit() []byte {
+	l.Lock()
+	defer l.Unlock()
 
-	return packet
+	return l.emitBytes()
 }
 
 func (l *LPBatcher) emitBytes() []byte {
@@ -116,8 +108,27 @@ func (l *LPBatcher) emitBytes() []byte {
 
 	prepacket := l.buffer[:c]
 	lastLF := bytes.LastIndexByte(prepacket, '\n')
+
+	if len(prepacket) < 1 || lastLF < 0 {
+		return prepacket
+	}
 	packet := l.buffer[:lastLF]
 	l.buffer = l.buffer[len(packet):]
+	if len(l.buffer) == 1 && l.buffer[0] == '\n' { // removing lingering delimiter
+		l.buffer = l.buffer[1:]
+	}
 
 	return packet
 }
+
+// Flush drains all bytes even if buffer currently larger than size
+func (l *LPBatcher) Flush() []byte {
+	slog.Info(fmt.Sprintf("Flushing all bytes (%d) from buffer.", l.CurrentLoadSize()))
+	packet := l.buffer
+	l.buffer = l.buffer[len(packet):]
+	return packet
+}
+
+func (l *LPBatcher) CurrentLoadSize() int {
+	return len(l.buffer)
+}
diff --git a/influxdb3/batching/lp_batcher_test.go b/influxdb3/batching/lp_batcher_test.go
index b7725c4..a60740d 100644
--- a/influxdb3/batching/lp_batcher_test.go
+++ b/influxdb3/batching/lp_batcher_test.go
@@ -2,14 +2,38 @@ package batching
 
 import (
 	"fmt"
-	"math"
 	"strings"
+	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 )
 
-func TestLineProtocolBatcherCreate(t *testing.T) {
+func TestLPDefaultValues(t *testing.T) {
+	lpb := NewLPBatcher()
+
+	assert.Equal(t, DefaultBufferSize, lpb.size)
+	assert.Equal(t, DefaultBufferCapacity, lpb.capacity)
+	assert.Nil(t, lpb.callbackReady)
+	assert.Nil(t, lpb.callbackEmit)
+}
+
+func TestLPCustomValues(t *testing.T) {
+	size := 2048
+	capacity := size * 2
+
+	lpb := NewLPBatcher(
+		WithSize(size),
+		WithCapacity(capacity),
+	)
+
+	assert.Equal(t, size, lpb.size)
+	assert.Equal(t, capacity, lpb.capacity)
+	assert.Nil(t, lpb.callbackReady)
+	assert.Nil(t, lpb.callbackEmit)
+}
+
+func TestLPBatcherCreate(t *testing.T) {
 	size := 1000
 	capacity := size * 2
 
@@ -35,7 +59,67 @@ func TestLineProtocolBatcherCreate(t *testing.T) {
 	assert.Nil(t, l.callbackReady)
 }
 
-func TestAddAndEmitLineProtocolDefault(t *testing.T) {
+func TestLPReady(t *testing.T) {
+	size := 10
+	capacity := size * 2
+	lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
+	lpb.Add("0123456789ABCDEF")
+
+	assert.True(t, lpb.Ready(), "LPBatcher should be ready when the batch size is reached")
+}
+
+func TestLPReadyCallback(t *testing.T) {
+	size := 10
+	capacity := size * 2
+	readyCalled := false
+
+	lpb := NewLPBatcher(WithSize(size),
+		WithCapacity(capacity),
+		WithReadyCallback(func() {
+			readyCalled = true
+		}))
+
+	lpb.Add("0123456789ABCDEF")
+
+	assert.True(t, readyCalled)
+}
+
+func TestLPAddAndPartialEmit(t *testing.T) {
+	size := 500
+	capacity := size * 2
+	emitCount := 0
+	emittedBytes := make([]byte, 0)
+
+	lineTemplate := "cpu,location=tabor fVal=2.71,count=%di"
+	lines := make([]string, 5)
+	lineByteCt := 0
+	for n := range 5 {
+		lines[n] = fmt.Sprintf(lineTemplate, n+1)
+		lineByteCt += len([]byte(lines[n])) + 1
+	}
+
+	verif := strings.Join(lines, "\n")
+
+	lpb := NewLPBatcher(
+		WithSize(size),
+		WithCapacity(capacity),
+		WithEmitBytesCallback(func(ba []byte) {
+			emitCount++
+			emittedBytes = append(emittedBytes, ba...)
+		}))
+	lpb.Add(lines...)
+
+	assert.Equal(t, lineByteCt, lpb.CurrentLoadSize())
+
+	packet := lpb.Emit()
+
+	assert.Equal(t, verif, string(packet))
+	assert.Equal(t, 0, lpb.CurrentLoadSize())
+	assert.Equal(t, 0, emitCount)         // callback should not have been called
+	assert.Equal(t, 0, len(emittedBytes)) // callback should not have been called
+}
+
+func TestLPAddAndEmitCallBack(t *testing.T) {
 	batchSize := 1000 // Bytes
 	capacity := 10000 // Bytes
 	emitCount := 0
@@ -44,7 +128,7 @@ func TestAddAndEmitLineProtocolDefault(t *testing.T) {
 
 	lps2emit := make([]string, 100)
 
-	b := NewLPBatcher(
+	lpb := NewLPBatcher(
 		WithSize(batchSize),
 		WithCapacity(capacity),
 		WithReadyCallback(func() {
@@ -54,7 +138,6 @@ func TestAddAndEmitLineProtocolDefault(t *testing.T) {
 			emitCount++
 			emittedBytes = append(emittedBytes, b...)
 		}))
-	fmt.Printf("\nDEBUG b: %+v\n", b)
 
 	for n := range lps2emit {
 		lps2emit[n] = fmt.Sprintf("lptest,foo=bar count=%di", n+1)
@@ -63,22 +146,106 @@ func TestAddAndEmitLineProtocolDefault(t *testing.T) {
 	for i, _ := range lps2emit {
 		if i > 0 && i%10 == 0 {
 			set := lps2emit[i-10 : i]
-			b.Add(set...)
+			lpb.Add(set...)
 		}
 	}
 	// add lingering set
-	b.Add(lps2emit[len(lps2emit)-10:]...)
+	lpb.Add(lps2emit[len(lps2emit)-10:]...)
 
 	verify := strings.Join(lps2emit, "\n")
 
-	assert.False(t, b.Ready())
+	assert.False(t, lpb.Ready())
 
-	_ = b.Emit() // flush any leftovers - to be collected in callback above
+	emittedBytes = append(emittedBytes, lpb.Emit()...) // drain any leftovers
 
-	expectCall := math.Ceil(float64(len(emittedBytes)) / float64(batchSize))
-	expectReady := len(emittedBytes) / batchSize
+	expectCall := len(emittedBytes) / batchSize
 	assert.Equal(t, int(expectCall), emitCount)
 	assert.Equal(t, verify, string(emittedBytes))
-	fmt.Printf("DEBUG expectedReady: %d, readyCalled: %d\n", expectReady, readyCalled)
-	assert.Equal(t, expectReady, readyCalled)
+	assert.Equal(t, expectCall, readyCalled)
+}
+
+func TestLPBufferFlush(t *testing.T) {
+	size := 10
+	capacity := size * 2
+
+	lpb := NewLPBatcher(WithSize(size), WithCapacity(capacity))
+	testString := "0123456789ABCDEF\n"
+
+	assert.Equal(t, 0, lpb.CurrentLoadSize())
+	lpb.Add(testString)
+	assert.Equal(t, len(testString), lpb.CurrentLoadSize())
+	packet := lpb.Flush()
+	assert.Equal(t, 0, lpb.CurrentLoadSize())
+	assert.Equal(t, testString, string(packet))
+}
+
+func TestLPThreadSafety(t *testing.T) {
+	size := 80
+	capacity := size * 2
+	var wg sync.WaitGroup
+	emitCt := 0
+	testString := "123456789ABCDEF\n"
+
+	lpb := NewLPBatcher(WithSize(size),
+		WithCapacity(capacity),
+		WithEmitBytesCallback(func(ba []byte) {
+			emitCt++
+		}))
+
+	for range 25 {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for range 4 {
+				lpb.Add(testString)
+			}
+		}()
+	}
+
+	wg.Wait()
+	packet := lpb.Emit()
+	assert.Equal(t, 20, emitCt, "All bytes should have been emitted")
+	assert.Empty(t, packet, "Remaining bytes should be emitted correctly")
+}
+
+func TestLPAddLargerThanSize(t *testing.T) {
+	// TODO review test -- appears Emit called too frequently
+	// Look for leading '\n' in lp.buffer
+	size := 64
+	loadFactor := 10
+	capacity := size * loadFactor
+	remainder := 3
+	testString := "123456789ABCDEF\n"
+	stringSet := make([]string, ((size/len(testString))*loadFactor)+remainder)
+	stringSetByteCt := 0
+	for ct := range stringSet {
+		stringSet[ct] = testString
+		stringSetByteCt += len([]byte(testString))
+	}
+
+	fmt.Printf("DEBUG len(stringSet)=%d\n", len(stringSet))
+	fmt.Printf("DEBUG stringSetByteCount %d\n", stringSetByteCt)
+	fmt.Printf("DEBUG stringSet: %v\n", stringSet)
+
+	emitCt := 0
+	resultBuffer := make([]byte, 0)
+	lpb := NewLPBatcher(
+		WithSize(size),
+		WithCapacity(capacity),
+		WithEmitBytesCallback(func(ba []byte) {
+			emitCt++
+			resultBuffer = append(resultBuffer, ba...)
+		}))
+
+	lpb.Add(stringSet...)
+
+	results := strings.Split(string(resultBuffer), "\n")
+	resultsBytes := len(resultBuffer)
+	fmt.Printf("DEBUG emitCt: %d\n", emitCt)
+	fmt.Printf("DEBUG resultsBytes: %d\n", resultsBytes)
+	fmt.Printf("DEBUG len(results): %d\n", len(results))
+	fmt.Printf("DEBUG results: %s\n", results)
+	fmt.Printf("DEBUG lpb.CurrentLoadSize: %d\n", lpb.CurrentLoadSize())
+	fmt.Printf("DEBUG lpb.buffer #%s#\n", string(lpb.buffer))
+
 }