From 26f9290ef19de88409a3b48f78e638bc9f4b7319 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 23 Sep 2024 17:10:05 +0200 Subject: [PATCH 1/3] feat: support newline chars in tag values. --- influxdb3/client_e2e_test.go | 53 ++++++++++++++++++++++++++++++++++++ influxdb3/point.go | 8 ++++-- influxdb3/point_test.go | 43 +++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 2 deletions(-) diff --git a/influxdb3/client_e2e_test.go b/influxdb3/client_e2e_test.go index af793da..4b71724 100644 --- a/influxdb3/client_e2e_test.go +++ b/influxdb3/client_e2e_test.go @@ -39,7 +39,21 @@ import ( "github.com/stretchr/testify/require" ) +func SkipCheck(t *testing.T) { + + if _, present := os.LookupEnv("TESTING_INFLUXDB_URL"); !present { + t.Skip("TESTING_INFLUXDB_URL not set") + } + if _, present := os.LookupEnv("TESTING_INFLUXDB_TOKEN"); !present { + t.Skip("TESTING_INFLUXDB_TOKEN not set") + } + if _, present := os.LookupEnv("TESTING_INFLUXDB_DATABASE"); !present { + t.Skip("TESTING_INFLUXDB_DATABASE not set") + } +} + func TestWriteAndQueryExample(t *testing.T) { + SkipCheck(t) now := time.Now().UTC() testId := now.UnixNano() @@ -153,6 +167,7 @@ func TestWriteAndQueryExample(t *testing.T) { } func TestQueryWithParameters(t *testing.T) { + SkipCheck(t) now := time.Now().UTC() testId := now.UnixNano() @@ -221,6 +236,7 @@ func TestQueryWithParameters(t *testing.T) { } func TestQueryDatabaseDoesNotExist(t *testing.T) { + SkipCheck(t) url := os.Getenv("TESTING_INFLUXDB_URL") token := os.Getenv("TESTING_INFLUXDB_TOKEN") @@ -237,6 +253,7 @@ func TestQueryDatabaseDoesNotExist(t *testing.T) { } func TestQuerySchema(t *testing.T) { + SkipCheck(t) url := os.Getenv("TESTING_INFLUXDB_URL") token := os.Getenv("TESTING_INFLUXDB_TOKEN") database := os.Getenv("TESTING_INFLUXDB_DATABASE") @@ -253,6 +270,7 @@ func TestQuerySchema(t *testing.T) { } func TestQuerySchemaWithOptions(t *testing.T) { + SkipCheck(t) url := os.Getenv("TESTING_INFLUXDB_URL") token := os.Getenv("TESTING_INFLUXDB_TOKEN") database := os.Getenv("TESTING_INFLUXDB_DATABASE") @@ -269,6 +287,7 @@ func TestQuerySchemaWithOptions(t *testing.T) { } func TestQuerySchemaInfluxQL(t *testing.T) { + SkipCheck(t) url := os.Getenv("TESTING_INFLUXDB_URL") token := os.Getenv("TESTING_INFLUXDB_TOKEN") database := os.Getenv("TESTING_INFLUXDB_DATABASE") @@ -285,6 +304,7 @@ func TestQuerySchemaInfluxQL(t *testing.T) { } func TestWriteError(t *testing.T) { + SkipCheck(t) url := os.Getenv("TESTING_INFLUXDB_URL") token := os.Getenv("TESTING_INFLUXDB_TOKEN") database := os.Getenv("TESTING_INFLUXDB_DATABASE") @@ -308,3 +328,36 @@ func TestWriteError(t *testing.T) { assert.NotNil(t, err.(*influxdb3.ServerError).Headers["X-Influxdb-Build"][0]) } + +func TestEscapedStringValues(t *testing.T) { + SkipCheck(t) + url := os.Getenv("TESTING_INFLUXDB_URL") + token := os.Getenv("TESTING_INFLUXDB_TOKEN") + database := os.Getenv("TESTING_INFLUXDB_DATABASE") + + client, err := influxdb3.New(influxdb3.ClientConfig{ + Host: url, + Token: token, + Database: database, + }) + require.NoError(t, err) + p := influxdb3.NewPoint("escapee", + map[string]string{ + "tag1": "new\nline and space", + "tag2": "escaped\\nline and space", + }, + map[string]interface{}{ + "fVal": 41.3, + "sVal": "greetings\nearthlings", + }, time.Now()) + + err = client.WritePoints(context.Background(), []*influxdb3.Point{p}) + require.NoError(t, err) + qit, err := client.Query(context.Background(), "SELECT * FROM \"escapee\" WHERE time >= now() - interval '1 minute'") + require.NoError(t, err) + for qit.Next() { + assert.EqualValues(t, "greetings\\nearthlings", qit.Value()["sVal"]) + assert.EqualValues(t, "new\\nline and space", qit.Value()["tag1"]) + assert.EqualValues(t, "escaped\\nline and space", qit.Value()["tag2"]) + } +} diff --git a/influxdb3/point.go b/influxdb3/point.go index 4b2d492..4b663d4 100644 --- a/influxdb3/point.go +++ b/influxdb3/point.go @@ -26,6 +26,7 @@ import ( "errors" "fmt" "sort" + "strings" "time" "github.com/influxdata/line-protocol/v2/lineprotocol" @@ -268,10 +269,13 @@ func (p *Point) MarshalBinaryWithDefaultTags(precision lineprotocol.Precision, d } lastKey = tagKey + // N.B. Some customers have requested support for newline chars in tag values (EAR 5476) + // Though this is outside the lineprotocol specification, it was supported in + // previous GO client versions. if value, ok := p.Values.Tags[tagKey]; ok { - enc.AddTag(tagKey, value) + enc.AddTag(tagKey, strings.Replace(value, "\n", "\\n", -1)) } else { - enc.AddTag(tagKey, defaultTags[tagKey]) + enc.AddTag(tagKey, strings.Replace(defaultTags[tagKey], "\n", "\\n", -1)) } } diff --git a/influxdb3/point_test.go b/influxdb3/point_test.go index a3afc13..1e055ab 100644 --- a/influxdb3/point_test.go +++ b/influxdb3/point_test.go @@ -178,6 +178,49 @@ func TestPointDefaultTags(t *testing.T) { assert.EqualValues(t, `test,tag1=a,tag2=b,tag3=f float64=80.1234567 60000000070`+"\n", string(line)) } +func TestPointWithNewlineTags(t *testing.T) { + p := NewPoint("test", + map[string]string{ + "tag1": "new\nline and space", + "tag2": "escaped\\nline and space", + "ambiTag": "ambiguous\ntag", + }, + map[string]interface{}{ + "fVal": 41.3, + }, time.Unix(60, 70)) + + defaultTags := map[string]string{ + "defTag1": "default\nline and space", + "defTag2": "escaped\\ndefault line and space", + "ambiTag": "default\nambiguous\ntag", + } + + line, err := p.MarshalBinary(lineprotocol.Nanosecond) + require.NoError(t, err) + assert.EqualValues(t, + "test,ambiTag=ambiguous\\ntag,tag1=new\\nline\\ and\\ space,tag2=escaped\\nline\\ and\\ space "+ + "fVal=41.3 60000000070\n", + string(line)) + + line, err = p.MarshalBinaryWithDefaultTags(lineprotocol.Nanosecond, defaultTags) + require.NoError(t, err) + assert.EqualValues(t, + "test,ambiTag=ambiguous\\ntag,defTag1=default\\nline\\ and\\ space,defTag2=escaped"+ + "\\ndefault\\ line\\ and\\ space,tag1=new\\nline\\ and\\ space,tag2=escaped\\nline\\ and\\ space "+ + "fVal=41.3 60000000070\n", + string(line)) + + pInvalid := NewPoint("test", map[string]string{ + "tag\nbroken": "tag\nvalue with space", + }, map[string]interface{}{ + "fVal": 17.2, + }, time.Unix(60, 70)) + + _, err = pInvalid.MarshalBinary(lineprotocol.Nanosecond) + require.Error(t, err) + assert.EqualValues(t, "encoding error: invalid tag key \"tag\\nbroken\"", err.Error()) +} + func TestPointFields(t *testing.T) { p := NewPoint("test", nil, map[string]interface{}{ "field1": 10, From a60a6b0d4a4dbbab3e4fe34e5210a5dbc7fd7e76 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 23 Sep 2024 17:16:39 +0200 Subject: [PATCH 2/3] chore: fix linter issue --- influxdb3/point.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/influxdb3/point.go b/influxdb3/point.go index 4b663d4..9dbae70 100644 --- a/influxdb3/point.go +++ b/influxdb3/point.go @@ -273,9 +273,9 @@ func (p *Point) MarshalBinaryWithDefaultTags(precision lineprotocol.Precision, d // Though this is outside the lineprotocol specification, it was supported in // previous GO client versions. if value, ok := p.Values.Tags[tagKey]; ok { - enc.AddTag(tagKey, strings.Replace(value, "\n", "\\n", -1)) + enc.AddTag(tagKey, strings.ReplaceAll(value, "\n", "\\n")) } else { - enc.AddTag(tagKey, strings.Replace(defaultTags[tagKey], "\n", "\\n", -1)) + enc.AddTag(tagKey, strings.ReplaceAll(defaultTags[tagKey], "\n", "\\n")) } } From e6dc12ca3edf764195869ffc8a9f3587cd20b26d Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 23 Sep 2024 17:34:57 +0200 Subject: [PATCH 3/3] docs: update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d43b74e..0f955c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.11.0 [unreleased] +1. [#105](https://github.com/InfluxCommunity/influxdb3-go/pull/105): Support newlines in tag values. + ## 0.10.0 [2024-09-13] ### Features