diff --git a/influxdb3/batching/lp_batcher.go b/influxdb3/batching/lp_batcher.go index c60f694..44116a6 100644 --- a/influxdb3/batching/lp_batcher.go +++ b/influxdb3/batching/lp_batcher.go @@ -77,9 +77,7 @@ func (l *LPBatcher) Add(lines ...string) { l.callbackReady() } if l.callbackByteEmit != nil { - // fmt.Printf("DEBUG calling emit function\n") l.callbackByteEmit(l.emitBytes()) - //fmt.Printf("DEBUG l.buffer #%s#\n", string(l.buffer)) } else { // no emitter callback if l.CurrentLoadSize() > (l.capacity - l.size) { diff --git a/influxdb3/client_e2e_test.go b/influxdb3/client_e2e_test.go index 848705f..d21e032 100644 --- a/influxdb3/client_e2e_test.go +++ b/influxdb3/client_e2e_test.go @@ -371,6 +371,7 @@ func TestEscapedStringValues(t *testing.T) { func TestBatchLP(t *testing.T) { SkipCheck(t) + // TODO add asserts url := os.Getenv("TESTING_INFLUXDB_URL") token := os.Getenv("TESTING_INFLUXDB_TOKEN") @@ -390,7 +391,7 @@ func TestBatchLP(t *testing.T) { lines := make([]string, 0) now := time.Now().UnixMilli() //rand.Seed(now.UnixNano()) - for n := range 20 { + for n := range 2000 { lines = append(lines, fmt.Sprintf(dataTemplate, locations[n%len(locations)], ids[n%len(ids)], (rand.Float64()*100)-50.0, n+1, now-int64(n*1000))) @@ -401,7 +402,7 @@ func TestBatchLP(t *testing.T) { fmt.Printf("DEBUG lines %+v\n", lines) - size := 256 + size := 4096 capacity := size * 2 readyCt := 0 emitCt := 0 @@ -424,19 +425,33 @@ func TestBatchLP(t *testing.T) { //fmt.Printf("DEBUG type(lpb) %s\n", reflect.Type(lpb)) fmt.Printf("DEBUG lpb: %+v\n", lpb) - for _, line := range lines { - lpb.Add(line) + sent := 0 + for n, _ := range lines { + if n%100 == 0 { + fmt.Printf("DEBUG sent %d\n", sent) + lpb.Add(lines[sent : sent+100]...) + sent += 100 + } } + lpb.Add(lines[sent:len(lines)]...) // add remainder fmt.Printf("DEBUG readyCt: %d\n", readyCt) fmt.Printf("DEBUG emitCt: %d\n", emitCt) fmt.Printf("DEBUG results: %+v\n", string(results)) fmt.Printf("DEBUG lpb.buffer: %+v\n", lpb.CurrentLoadSize()) fmt.Printf("DEBUG getting rest\n") + + leftover := lpb.Emit() // emit anything left over + err = client.Write(context.Background(), leftover, influxdb3.WithPrecision(lineprotocol.Millisecond)) + if err != nil { + fmt.Printf("ERROR %v\n", err) + } + results = append(results, leftover...) + fmt.Printf("DEBUG last emit to string: %s\n", string(lpb.Emit())) fmt.Printf("DEBUG loadsize %d\n", lpb.CurrentLoadSize()) - query := "SELECT * FROM \"ibot\" WHERE time >= now() - interval '1 minutes' Order by count" + query := "SELECT * FROM \"ibot\" WHERE time >= now() - interval '90 minutes' Order by count" qResults, qerr := client.Query(context.Background(), query)