Skip to content

Commit

Permalink
test: (WIP) starting integration test for LPBatcher.
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Oct 30, 2024
1 parent 455d2a1 commit d36d020
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
2 changes: 0 additions & 2 deletions influxdb3/batching/lp_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ func (l *LPBatcher) Add(lines ...string) {
l.callbackReady()
}
if l.callbackByteEmit != nil {
// fmt.Printf("DEBUG calling emit function\n")
l.callbackByteEmit(l.emitBytes())
//fmt.Printf("DEBUG l.buffer #%s#\n", string(l.buffer))
} else {
// no emitter callback
if l.CurrentLoadSize() > (l.capacity - l.size) {
Expand Down
25 changes: 20 additions & 5 deletions influxdb3/client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func TestEscapedStringValues(t *testing.T) {

func TestBatchLP(t *testing.T) {
SkipCheck(t)
// TODO add asserts

url := os.Getenv("TESTING_INFLUXDB_URL")
token := os.Getenv("TESTING_INFLUXDB_TOKEN")
Expand All @@ -390,7 +391,7 @@ func TestBatchLP(t *testing.T) {
lines := make([]string, 0)
now := time.Now().UnixMilli()
//rand.Seed(now.UnixNano())
for n := range 20 {
for n := range 2000 {
lines = append(lines, fmt.Sprintf(dataTemplate, locations[n%len(locations)],
ids[n%len(ids)],
(rand.Float64()*100)-50.0, n+1, now-int64(n*1000)))
Expand All @@ -401,7 +402,7 @@ func TestBatchLP(t *testing.T) {

fmt.Printf("DEBUG lines %+v\n", lines)

size := 256
size := 4096
capacity := size * 2
readyCt := 0
emitCt := 0
Expand All @@ -424,19 +425,33 @@ func TestBatchLP(t *testing.T) {
//fmt.Printf("DEBUG type(lpb) %s\n", reflect.Type(lpb))
fmt.Printf("DEBUG lpb: %+v\n", lpb)

for _, line := range lines {
lpb.Add(line)
sent := 0
for n, _ := range lines {
if n%100 == 0 {
fmt.Printf("DEBUG sent %d\n", sent)
lpb.Add(lines[sent : sent+100]...)
sent += 100
}
}
lpb.Add(lines[sent:len(lines)]...) // add remainder

fmt.Printf("DEBUG readyCt: %d\n", readyCt)
fmt.Printf("DEBUG emitCt: %d\n", emitCt)
fmt.Printf("DEBUG results: %+v\n", string(results))
fmt.Printf("DEBUG lpb.buffer: %+v\n", lpb.CurrentLoadSize())
fmt.Printf("DEBUG getting rest\n")

leftover := lpb.Emit() // emit anything left over
err = client.Write(context.Background(), leftover, influxdb3.WithPrecision(lineprotocol.Millisecond))
if err != nil {
fmt.Printf("ERROR %v\n", err)
}
results = append(results, leftover...)

fmt.Printf("DEBUG last emit to string: %s\n", string(lpb.Emit()))
fmt.Printf("DEBUG loadsize %d\n", lpb.CurrentLoadSize())

query := "SELECT * FROM \"ibot\" WHERE time >= now() - interval '1 minutes' Order by count"
query := "SELECT * FROM \"ibot\" WHERE time >= now() - interval '90 minutes' Order by count"

qResults, qerr := client.Query(context.Background(), query)

Expand Down

0 comments on commit d36d020

Please sign in to comment.