diff --git a/CHANGELOG.md b/CHANGELOG.md index fc5f0ffde44..9c54ed50b1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## main / unreleased +* [FEATURE] Distributor: Add experimental Influx handler. #10153 + ### Grafana Mimir * [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236 diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 3eb5a0de095..89f1cf0b17d 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -74,6 +74,10 @@ The following features are currently experimental: - Cache rule group contents. - `-ruler-storage.cache.rule-group-enabled` - Distributor + - Influx ingestion + - `/api/v1/influx/push` endpoint + - `-distributor.influx-endpoint-enabled` + - `-distributor.max-influx-request-size` - Metrics relabeling - `-distributor.metric-relabeling-enabled` - Using status code 529 instead of 429 upon rate limit exhaustion. diff --git a/go.mod b/go.mod index f2bd093067b..a1107bf14cb 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/grafana/dskit v0.0.0-20250106205746-3702098cbd0c github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect + github.com/influxdata/influxdb/v2 v2.7.11 github.com/json-iterator/go v1.1.12 github.com/minio/minio-go/v7 v7.0.82 github.com/mitchellh/go-wordwrap v1.0.1 diff --git a/go.sum b/go.sum index 918a1ec5fb7..cc3d6079f68 100644 --- a/go.sum +++ b/go.sum @@ -1394,6 +1394,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/influxdata/influxdb/v2 v2.7.11 h1:qs9qr5hsuFrlTiBtr5lBrALbQ2dHAanf21fBLlLpKww= +github.com/influxdata/influxdb/v2 v2.7.11/go.mod h1:zNOyzQy6WbfvGi1CK1aJ2W8khOq9+Gdsj8yLj8bHHqg= github.com/ionos-cloud/sdk-go/v6 v6.3.0 h1:/lTieTH9Mo/CWm3cTlFLnK10jgxjUGkAqRffGqvPteY= github.com/ionos-cloud/sdk-go/v6 v6.3.0/go.mod h1:SXrO9OGyWjd2rZhAhEpdYN6VUAODzzqRdqA9BCviQtI= github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= @@ -1553,8 +1555,8 @@ github.com/onsi/gomega v1.24.0 h1:+0glovB9Jd6z3VR+ScSwQqXVTIfJcGA9UBM8yzQxhqg= github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= -github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= -github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= +github.com/opencontainers/image-spec v1.1.0-rc2 h1:2zx/Stx4Wc5pIPDvIxHXvXtQFW/7XWJGmnM7r3wg034= +github.com/opencontainers/image-spec v1.1.0-rc2/go.mod h1:3OVijpioIKYWTqjiG0zfF6wvoJ4fAXGbjdZuI2NgsRQ= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b h1:FfH+VrHHk6Lxt9HdVS0PXzSXFyS2NbZKXv33FYPol0A= github.com/opentracing/opentracing-go 
v1.2.1-0.20220228012449-10b1cf09e00b/go.mod h1:AC62GU6hc0BrNm+9RK9VSiwa/EUe1bkIeFORAMcHvJU= diff --git a/integration/e2emimir/client.go b/integration/e2emimir/client.go index 6055696967e..32f634de088 100644 --- a/integration/e2emimir/client.go +++ b/integration/e2emimir/client.go @@ -7,6 +7,7 @@ package e2emimir import ( "bytes" + "compress/gzip" "context" "encoding/json" "errors" @@ -35,6 +36,7 @@ import ( yaml "gopkg.in/yaml.v3" "github.com/grafana/mimir/pkg/alertmanager" + mimirapi "github.com/grafana/mimir/pkg/api" "github.com/grafana/mimir/pkg/cardinality" "github.com/grafana/mimir/pkg/distributor" "github.com/grafana/mimir/pkg/frontend/querymiddleware" @@ -186,6 +188,43 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) { return res, nil } +// PushInflux the input timeseries to the remote endpoint in Influx format. +func (c *Client) PushInflux(timeseries []prompb.TimeSeries) (*http.Response, error) { + // Create write request. + data := distributor.TimeseriesToInfluxRequest(timeseries) + + // Compress it. + var buf bytes.Buffer + + gzipData := gzip.NewWriter(&buf) + if _, err := gzipData.Write([]byte(data)); err != nil { + return nil, err + } + if err := gzipData.Close(); err != nil { + return nil, err + } + + // Create HTTP request + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s%s", c.distributorAddress, mimirapi.InfluxPushEndpoint), &buf) + if err != nil { + return nil, err + } + req.Header.Set("Content-Encoding", "gzip") + req.Header.Set("X-Scope-OrgID", c.orgID) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + defer res.Body.Close() + return res, nil +} + // PushOTLP the input timeseries to the remote endpoint in OTLP format func (c *Client) PushOTLP(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) (*http.Response, error) { // Create write request diff --git a/integration/influx_ingestion_test.go b/integration/influx_ingestion_test.go new file mode 100644 index 00000000000..bb9d929d63d --- /dev/null +++ b/integration/influx_ingestion_test.go @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: AGPL-3.0-only +//go:build requires_docker + +package integration + +import ( + "math" + "testing" + "time" + + "github.com/grafana/e2e" + e2edb "github.com/grafana/e2e/db" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/integration/e2emimir" +) + +func TestInfluxIngestion(t *testing.T) { + t.Helper() + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + minio := e2edb.NewMinio(9000, blocksBucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start Mimir components. + require.NoError(t, copyFileToSharedDir(s, "docs/configurations/single-process-config-blocks.yaml", mimirConfigFile)) + + // Start Mimir in single binary mode, reading the config from file and overwriting + // the backend config to make it work with Minio. 
+ flags := mergeFlags( + DefaultSingleBinaryFlags(), + BlocksStorageFlags(), + BlocksStorageS3Flags(), + map[string]string{ + "-distributor.influx-endpoint-enabled": "true", + }, + ) + + mimir := e2emimir.NewSingleBinary("mimir-1", flags, e2emimir.WithConfigFile(mimirConfigFile), e2emimir.WithPorts(9009, 9095)) + require.NoError(t, s.StartAndWaitReady(mimir)) + + c, err := e2emimir.NewClient(mimir.HTTPEndpoint(), mimir.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push some series to Mimir. + now := time.Now() + + series, expectedVector, expectedMatrix := generateFloatSeries("series_f1", now, prompb.Label{Name: "foo", Value: "bar"}) + // Fix up the expectation as Influx values seem to be rounded to millionths + for _, s := range expectedVector { + s.Metric[model.LabelName("__mimir_source__")] = model.LabelValue("influx") + s.Value = model.SampleValue(math.Round(float64(s.Value)*1000000) / 1000000.0) + } + // Fix up the expectation as Influx values seem to be rounded to millionths + for im, s := range expectedMatrix { + for iv, v := range s.Values { + expectedMatrix[im].Values[iv].Value = model.SampleValue(math.Round(float64(v.Value)*1000000) / 1000000.0) + } + } + + res, err := c.PushInflux(series) + require.NoError(t, err) + require.Equal(t, 204, res.StatusCode) + + // Check metric to track Influx requests + require.NoError(t, mimir.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_distributor_influx_requests_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")))) + + // Query the series. + result, err := c.Query("series_f1", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector, result.(model.Vector)) + + labelValues, err := c.LabelValues("foo", v1.MinTime, v1.MaxTime, nil) + require.NoError(t, err) + require.Equal(t, model.LabelValues{"bar"}, labelValues) + + labelNames, err := c.LabelNames(v1.MinTime, v1.MaxTime, nil) + require.NoError(t, err) + require.Equal(t, []string{"__mimir_source__", "__name__", "foo"}, labelNames) + + rangeResult, err := c.QueryRange("series_f1", now.Add(-15*time.Minute), now, 15*time.Second) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, rangeResult.Type()) + require.Equal(t, expectedMatrix, rangeResult.(model.Matrix)) + + // No metadata to query, but we do the query anyway. + _, err = c.GetPrometheusMetadata() + require.NoError(t, err) +} diff --git a/pkg/api/api.go b/pkg/api/api.go index bd8da8e6e80..979627b7e1d 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -256,6 +256,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL const PrometheusPushEndpoint = "/api/v1/push" const OTLPPushEndpoint = "/otlp/v1/metrics" +const InfluxPushEndpoint = "/api/v1/influx/push" // RegisterDistributor registers the endpoints associated with the distributor. func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) { @@ -265,6 +266,14 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.SkipLabelCountValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger, ), true, false, "POST") + + if pushConfig.EnableInfluxEndpoint { + // The Influx Push endpoint is experimental. 
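
As the comment above notes, the Influx push endpoint is experimental and disabled by default; it has to be switched on with -distributor.influx-endpoint-enabled=true. For context, a minimal client-side sketch of a push against it, mirroring the integration test above (host, port, and tenant ID are illustrative, not part of the change):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"net/http"
)

func main() {
	// One Influx line-protocol sample: measurement "cpu", tag host=web01, field f1=2.
	line := "cpu,host=web01 f1=2 1465839830100400200\n"

	// Gzip the body; uncompressed bodies are accepted as well.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(line)); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	req, err := http.NewRequest(http.MethodPost, "http://localhost:9009/api/v1/influx/push", &buf)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Encoding", "gzip")
	req.Header.Set("X-Scope-OrgID", "user-1") // tenant ID, as in the integration test

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	fmt.Println(res.Status) // 204 No Content on success
}
```
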
+ a.RegisterRoute(InfluxPushEndpoint, distributor.InfluxHandler( + pushConfig.MaxInfluxRequestSize, d.RequestBufferPool, a.sourceIPs, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger, + ), true, false, "POST") + } + a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler( pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.OTelResourceAttributePromotionConfig, pushConfig.RetryConfig, pushConfig.EnableStartTimeQuietZero, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 4e55b298ef7..1fbf7c90cd4 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -87,7 +87,8 @@ const ( // metaLabelTenantID is the name of the metric_relabel_configs label with tenant ID. metaLabelTenantID = model.MetaLabelPrefix + "tenant_id" - maxOTLPRequestSizeFlag = "distributor.max-otlp-request-size" + maxOTLPRequestSizeFlag = "distributor.max-otlp-request-size" + maxInfluxRequestSizeFlag = "distributor.max-influx-request-size" instanceIngestionRateTickInterval = time.Second @@ -208,6 +209,7 @@ type Config struct { MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"` MaxOTLPRequestSize int `yaml:"max_otlp_request_size" category:"experimental"` + MaxInfluxRequestSize int `yaml:"max_influx_request_size" category:"experimental" doc:"hidden"` MaxRequestPoolBufferSize int `yaml:"max_request_pool_buffer_size" category:"experimental"` RemoteTimeout time.Duration `yaml:"remote_timeout" category:"advanced"` @@ -250,6 +252,9 @@ type Config struct { // OTelResourceAttributePromotionConfig allows for specializing OTel resource attribute promotion. OTelResourceAttributePromotionConfig OTelResourceAttributePromotionConfig `yaml:"-"` + // Influx endpoint disabled by default + EnableInfluxEndpoint bool `yaml:"influx_endpoint_enabled" category:"experimental" doc:"hidden"` + // Change the implementation of OTel startTime from a real zero to a special NaN value. EnableStartTimeQuietZero bool `yaml:"start_time_quiet_zero" category:"advanced" doc:"hidden"` } @@ -266,9 +271,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.") f.IntVar(&cfg.MaxOTLPRequestSize, maxOTLPRequestSizeFlag, 100<<20, "Maximum OTLP request size in bytes that the distributors accept. Requests exceeding this limit are rejected.") + f.IntVar(&cfg.MaxInfluxRequestSize, maxInfluxRequestSizeFlag, 100<<20, "Maximum Influx request size in bytes that the distributors accept. Requests exceeding this limit are rejected.") f.IntVar(&cfg.MaxRequestPoolBufferSize, "distributor.max-request-pool-buffer-size", 0, "Max size of the pooled buffers used for marshaling write requests. 
If 0, no max size is enforced.") f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.") + f.BoolVar(&cfg.EnableInfluxEndpoint, "distributor.influx-endpoint-enabled", false, "Enable Influx endpoint.") f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)") f.BoolVar(&cfg.EnableStartTimeQuietZero, "distributor.otel-start-time-quiet-zero", false, "Change the implementation of OTel startTime from a real zero to a special NaN value.") @@ -294,12 +301,27 @@ const ( ) type PushMetrics struct { + // Influx metrics. + influxRequestCounter *prometheus.CounterVec + influxUncompressedBodySize *prometheus.HistogramVec + // OTLP metrics. otlpRequestCounter *prometheus.CounterVec uncompressedBodySize *prometheus.HistogramVec } func newPushMetrics(reg prometheus.Registerer) *PushMetrics { return &PushMetrics{ + influxRequestCounter: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_distributor_influx_requests_total", + Help: "The total number of Influx requests that have come in to the distributor.", + }, []string{"user"}), + influxUncompressedBodySize: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_distributor_influx_uncompressed_request_body_size_bytes", + Help: "Size of uncompressed request body in bytes.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMinResetDuration: 1 * time.Hour, + NativeHistogramMaxBucketNumber: 100, + }, []string{"user"}), otlpRequestCounter: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_distributor_otlp_requests_total", Help: "The total number of OTLP requests that have come in to the distributor.", @@ -314,6 +336,18 @@ func newPushMetrics(reg prometheus.Registerer) *PushMetrics { } } +func (m *PushMetrics) IncInfluxRequest(user string) { + if m != nil { + m.influxRequestCounter.WithLabelValues(user).Inc() + } +} + +func (m *PushMetrics) ObserveInfluxUncompressedBodySize(user string, size float64) { + if m != nil { + m.influxUncompressedBodySize.WithLabelValues(user).Observe(size) + } +} + func (m *PushMetrics) IncOTLPRequest(user string) { if m != nil { m.otlpRequestCounter.WithLabelValues(user).Inc() @@ -327,6 +361,8 @@ func (m *PushMetrics) ObserveUncompressedBodySize(user string, size float64) { } func (m *PushMetrics) deleteUserMetrics(user string) { + m.influxRequestCounter.DeleteLabelValues(user) + m.influxUncompressedBodySize.DeleteLabelValues(user) m.otlpRequestCounter.DeleteLabelValues(user) m.uncompressedBodySize.DeleteLabelValues(user) } diff --git a/pkg/distributor/influx.go b/pkg/distributor/influx.go new file mode 100644 index 00000000000..0d3ba995b5c --- /dev/null +++ b/pkg/distributor/influx.go @@ -0,0 +1,194 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package distributor + +import ( + "bytes" + "context" + "errors" + "fmt" + "net/http" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/grpcutil" + "github.com/grafana/dskit/httpgrpc" + 
"github.com/grafana/dskit/middleware" + "github.com/grafana/dskit/tenant" + influxio "github.com/influxdata/influxdb/v2/kit/io" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" + + "github.com/grafana/mimir/pkg/distributor/influxpush" + "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/util" + utillog "github.com/grafana/mimir/pkg/util/log" + "github.com/grafana/mimir/pkg/util/spanlogger" +) + +func influxRequestParser(ctx context.Context, r *http.Request, maxSize int, _ *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) (int, error) { + spanLogger, ctx := spanlogger.NewWithLogger(ctx, logger, "Distributor.InfluxHandler.decodeAndConvert") + defer spanLogger.Span.Finish() + + spanLogger.SetTag("content_type", r.Header.Get("Content-Type")) + spanLogger.SetTag("content_encoding", r.Header.Get("Content-Encoding")) + spanLogger.SetTag("content_length", r.ContentLength) + + pts, bytesRead, err := influxpush.ParseInfluxLineReader(ctx, r, maxSize) + level.Debug(spanLogger).Log("msg", "decodeAndConvert complete", "bytesRead", bytesRead, "metric_count", len(pts), "err", err) + if err != nil { + level.Error(logger).Log("msg", "failed to parse Influx push request", "err", err) + return bytesRead, err + } + + req.Timeseries = pts + return bytesRead, nil +} + +// InfluxHandler is a http.Handler which accepts Influx Line protocol and converts it to WriteRequests. +func InfluxHandler( + maxRecvMsgSize int, + requestBufferPool util.Pool, + sourceIPs *middleware.SourceIPExtractor, + retryCfg RetryConfig, + push PushFunc, + pushMetrics *PushMetrics, + logger log.Logger, +) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + logger := utillog.WithContext(ctx, logger) + if sourceIPs != nil { + source := sourceIPs.Get(r) + if source != "" { + logger = utillog.WithSourceIPs(source, logger) + } + } + + tenantID, err := tenant.TenantID(ctx) + if err != nil { + level.Warn(logger).Log("msg", "unable to obtain tenantID", "err", err) + return + } + + pushMetrics.IncInfluxRequest(tenantID) + + var bytesRead int + + supplier := func() (*mimirpb.WriteRequest, func(), error) { + rb := util.NewRequestBuffers(requestBufferPool) + var req mimirpb.PreallocWriteRequest + + if bytesRead, err = influxRequestParser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil { + err = httpgrpc.Error(http.StatusBadRequest, err.Error()) + rb.CleanUp() + return nil, nil, err + } + + cleanup := func() { + mimirpb.ReuseSlice(req.Timeseries) + rb.CleanUp() + } + return &req.WriteRequest, cleanup, nil + } + + pushMetrics.ObserveInfluxUncompressedBodySize(tenantID, float64(bytesRead)) + + req := newRequest(supplier) + // https://docs.influxdata.com/influxdb/cloud/api/v2/#tag/Response-codes + if err := push(ctx, req); err != nil { + if errors.Is(err, context.Canceled) { + level.Warn(logger).Log("msg", "push request canceled", "err", err) + w.WriteHeader(statusClientClosedRequest) + return + } + if errors.Is(err, influxio.ErrReadLimitExceeded) { + // TODO(alexg): One thing we have seen in the past is that telegraf clients send a batch of data + // if it is too big they should respond to the 413 below, but if a client doesn't understand this + // it just sends the next batch that is even bigger. In the past this has had to be dealt with by + // adding rate limits to drop the payloads. 
+ level.Warn(logger).Log("msg", "request too large", "err", err, "bytesRead", bytesRead, "maxMsgSize", maxRecvMsgSize) + w.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + // From: https://github.com/grafana/influx2cortex/blob/main/pkg/influx/errors.go + + var httpCode int + var errorMsg string + + if st, ok := grpcutil.ErrorToStatus(err); ok { + // This code is needed for a correct handling of errors returned by the supplier function. + // These errors are created by using the httpgrpc package. + httpCode = int(st.Code()) + errorMsg = st.Message() + } else { + var distributorErr Error + errorMsg = err.Error() + if errors.Is(err, context.DeadlineExceeded) || !errors.As(err, &distributorErr) { + httpCode = http.StatusServiceUnavailable + } else { + httpCode = errorCauseToHTTPStatusCode(distributorErr.Cause(), false) + } + } + if httpCode != 202 { + // This error message is consistent with error message in Prometheus remote-write handler, and ingester's ingest-storage pushToStorage method. + msgs := []interface{}{"msg", "detected an error while ingesting Influx metrics request (the request may have been partially ingested)", "httpCode", httpCode, "err", err} + if httpCode/100 == 4 { + // This tag makes the error message visible for our Grafana Cloud customers. + msgs = append(msgs, "insight", true) + } + level.Error(logger).Log(msgs...) + } + if httpCode < 500 { + level.Info(logger).Log("msg", errorMsg, "response_code", httpCode, "err", err) + } else { + level.Warn(logger).Log("msg", errorMsg, "response_code", httpCode, "err", err) + } + addHeaders(w, err, r, httpCode, retryCfg) + w.WriteHeader(httpCode) + } else { + w.WriteHeader(http.StatusNoContent) // Needed for Telegraf, otherwise it tries to marshal JSON and considers the write a failure. + } + }) +} + +// TimeseriesToInfluxRequest is used in tests. +func TimeseriesToInfluxRequest(timeseries []prompb.TimeSeries) string { + var retBuffer bytes.Buffer + + for _, ts := range timeseries { + name := "" + others := make([]string, 0, 10) + + for _, l := range ts.Labels { + if l.Name == model.MetricNameLabel { + name = l.Value + continue + } + if l.Name != "__mimir_source__" { + others = append(others, l.Name+"="+l.Value) + } + } + + // We are going to assume that the __name__ value is the measurement name + // and does not have a field name suffix (e.g. measurement,t1=v1 f1=1.5" -> "measurement_f1") + // as we can't work out whether it was "measurement_f1 value=3" or "measurement f1=3" from the + // created series. 
+ line := name + if len(others) > 0 { + line += "," + strings.Join(others, ",") + } + + if len(ts.Samples) > 0 { + // We only take the first sample + // data: "measurement,t1=v1 value=1.5 1465839830100400200", + line += fmt.Sprintf(" value=%f %d", ts.Samples[0].Value, ts.Samples[0].Timestamp*time.Millisecond.Nanoseconds()) + } + retBuffer.WriteString(line) + retBuffer.WriteString("\n") + } + + return retBuffer.String() +} diff --git a/pkg/distributor/influx_test.go b/pkg/distributor/influx_test.go new file mode 100644 index 00000000000..cbe18530d3e --- /dev/null +++ b/pkg/distributor/influx_test.go @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package distributor + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/go-kit/log" + "github.com/grafana/dskit/user" + influxio "github.com/influxdata/influxdb/v2/kit/io" + "github.com/stretchr/testify/assert" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +func TestInfluxHandleSeriesPush(t *testing.T) { + defaultExpectedWriteRequest := &mimirpb.WriteRequest{ + Timeseries: []mimirpb.PreallocTimeseries{ + { + TimeSeries: &mimirpb.TimeSeries{ + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f1"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{ + {Value: 2, TimestampMs: 1465839830100}, + }, + }, + }, + }, + } + + tests := []struct { + name string + url string + data string + expectedCode int + push func(t *testing.T) PushFunc + maxRequestSizeBytes int + }{ + { + name: "POST", + url: "/write", + data: "measurement,t1=v1 f1=2 1465839830100400200", + expectedCode: http.StatusNoContent, + push: func(t *testing.T) PushFunc { + return func(_ context.Context, pushReq *Request) error { + req, err := pushReq.WriteRequest() + assert.Equal(t, defaultExpectedWriteRequest, req) + assert.Nil(t, err) + return err + } + }, + maxRequestSizeBytes: 1 << 20, + }, + { + name: "POST with precision", + url: "/write?precision=ns", + data: "measurement,t1=v1 f1=2 1465839830100400200", + expectedCode: http.StatusNoContent, + push: func(t *testing.T) PushFunc { + return func(_ context.Context, pushReq *Request) error { + req, err := pushReq.WriteRequest() + assert.Equal(t, defaultExpectedWriteRequest, req) + assert.Nil(t, err) + return err + } + }, + maxRequestSizeBytes: 1 << 20, + }, + { + name: "invalid parsing error handling", + url: "/write", + data: "measurement,t1=v1 f1= 1465839830100400200", + expectedCode: http.StatusBadRequest, + push: func(t *testing.T) PushFunc { + return func(_ context.Context, pushReq *Request) error { + req, err := pushReq.WriteRequest() + assert.Nil(t, req) + assert.ErrorContains(t, err, "unable to parse") + assert.ErrorContains(t, err, "missing field value") + return err + } + }, + maxRequestSizeBytes: 1 << 20, + }, + { + name: "invalid query params", + url: "/write?precision=?", + data: "measurement,t1=v1 f1=2 1465839830100400200", + expectedCode: http.StatusBadRequest, + push: func(t *testing.T) PushFunc { + // return func(ctx context.Context, req *mimirpb.WriteRequest) error { + return func(_ context.Context, pushReq *Request) error { + req, err := pushReq.WriteRequest() + assert.Nil(t, req) + assert.ErrorContains(t, err, "precision supplied is not valid") + return err + } + }, + maxRequestSizeBytes: 1 << 20, + }, + { + name: "internal server error", + url: "/write", + data: "measurement,t1=v1 f1=2 1465839830100400200", + expectedCode: http.StatusServiceUnavailable, + push: func(t *testing.T) 
PushFunc { + return func(_ context.Context, _ *Request) error { + assert.Error(t, context.DeadlineExceeded) + return context.DeadlineExceeded + } + }, + maxRequestSizeBytes: 1 << 20, + }, + { + name: "max batch size violated", + url: "/write", + data: "measurement,t1=v1 f1=2 0123456789", + expectedCode: http.StatusBadRequest, + push: func(t *testing.T) PushFunc { + return func(_ context.Context, pushReq *Request) error { + req, err := pushReq.WriteRequest() + assert.Nil(t, req) + assert.Error(t, influxio.ErrReadLimitExceeded) + return err + } + }, + maxRequestSizeBytes: 10, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := InfluxHandler(tt.maxRequestSizeBytes, nil, nil, RetryConfig{}, tt.push(t), nil, log.NewNopLogger()) + req := httptest.NewRequest("POST", tt.url, bytes.NewReader([]byte(tt.data))) + const tenantID = "test" + req.Header.Set("X-Scope-OrgID", tenantID) + ctx := user.InjectOrgID(context.Background(), tenantID) + req = req.WithContext(ctx) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + assert.Equal(t, tt.expectedCode, rec.Code) + }) + } +} diff --git a/pkg/distributor/influxpush/parser.go b/pkg/distributor/influxpush/parser.go new file mode 100644 index 00000000000..96734086ee8 --- /dev/null +++ b/pkg/distributor/influxpush/parser.go @@ -0,0 +1,188 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package influxpush + +import ( + "compress/gzip" + "context" + "fmt" + "io" + "net/http" + "sort" + "time" + "unsafe" + + io2 "github.com/influxdata/influxdb/v2/kit/io" + "github.com/influxdata/influxdb/v2/models" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/util" +) + +const internalLabel = "__mimir_source__" + +// ParseInfluxLineReader parses a Influx Line Protocol request from an io.Reader. +func ParseInfluxLineReader(_ context.Context, r *http.Request, maxSize int) ([]mimirpb.PreallocTimeseries, int, error) { + qp := r.URL.Query() + precision := qp.Get("precision") + if precision == "" { + precision = "ns" + } + + if !models.ValidPrecision(precision) { + return nil, 0, fmt.Errorf("precision supplied is not valid: %s", precision) + } + + encoding := r.Header.Get("Content-Encoding") + reader, err := batchReadCloser(r.Body, encoding, int64(maxSize)) + if err != nil { + return nil, 0, fmt.Errorf("gzip compression error: %w", err) + } + data, err := io.ReadAll(reader) + dataLen := len(data) // In case if something is read despite an error. + if err != nil { + return nil, dataLen, fmt.Errorf("can't read body: %s", err) + } + + err = reader.Close() + if err != nil { + return nil, dataLen, fmt.Errorf("problem reading body: %s", err) + } + + points, err := models.ParsePointsWithPrecision(data, time.Now().UTC(), precision) + if err != nil { + return nil, dataLen, fmt.Errorf("can't parse points: %s", err) + } + ts, err := writeRequestFromInfluxPoints(points) + return ts, dataLen, err +} + +func writeRequestFromInfluxPoints(points []models.Point) ([]mimirpb.PreallocTimeseries, error) { + // Technically the same series should not be repeated. We should put all the samples for + // a series in single client.Timeseries. Having said that doing it is not very optimal and the + // occurrence of multiple timestamps for the same series is rare. Only reason I see it happening is + // for backfilling and this is not the API for that. Keeping that in mind, we are going to create a new + // client.Timeseries for each sample. 
+ + returnTs := mimirpb.PreallocTimeseriesSliceFromPool()[:0] + if cap(returnTs) < len(points) { + returnTs = make([]mimirpb.PreallocTimeseries, 0, len(points)) + } + for _, pt := range points { + var err error + returnTs, err = influxPointToTimeseries(pt, returnTs) + if err != nil { + return nil, err + } + } + + return returnTs, nil +} + +// Points to Prometheus is heavily inspired from https://github.com/prometheus/influxdb_exporter/blob/a1dc16ad596a990d8854545ea39a57a99a3c7c43/main.go#L148-L211 +func influxPointToTimeseries(pt models.Point, returnTs []mimirpb.PreallocTimeseries) ([]mimirpb.PreallocTimeseries, error) { + + fields, err := pt.Fields() + if err != nil { + return returnTs, fmt.Errorf("can't get fields from point: %w", err) + } + for field, v := range fields { + var value float64 + switch v := v.(type) { + case float64: + value = v + case int64: + value = float64(v) + case bool: + if v { + value = 1 + } else { + value = 0 + } + default: + continue + } + + name := string(replaceInvalidChars(pt.Name())) + if field != "value" { + // If the field name is not "value" then we append it to the name, fixing chars as we go + name += "_" + string(replaceInvalidChars([]byte(field))) + } + + tags := pt.Tags() + lbls := make([]mimirpb.LabelAdapter, 0, len(tags)+2) // An additional one for __name__, and one for internal label + lbls = append(lbls, mimirpb.LabelAdapter{ + Name: labels.MetricName, + Value: name, + }) + lbls = append(lbls, mimirpb.LabelAdapter{ + Name: internalLabel, // An internal label for tracking active series + Value: "influx", + }) + for _, tag := range tags { + if string(tag.Key) == "__name__" || string(tag.Key) == internalLabel { + continue + } + lbls = append(lbls, mimirpb.LabelAdapter{ + Name: string(replaceInvalidChars(tag.Key)), + Value: yoloString(tag.Value), + }) + } + sort.Slice(lbls, func(i, j int) bool { + return lbls[i].Name < lbls[j].Name + }) + + ts := mimirpb.TimeSeries{ + Labels: lbls, + Samples: []mimirpb.Sample{{ + TimestampMs: util.TimeToMillis(pt.Time()), + Value: value, + }}, + } + + returnTs = append(returnTs, mimirpb.PreallocTimeseries{TimeSeries: &ts}) + } + + return returnTs, nil +} + +// analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]") +func replaceInvalidChars(bSlice []byte) []byte { + for bIndex, b := range bSlice { + if !((b >= 'a' && b <= 'z') || // a-z + (b >= 'A' && b <= 'Z') || // A-Z + (b >= '0' && b <= '9') || // 0-9 + b == '_') { // _ + bSlice[bIndex] = '_' + } + } + + // prefix with _ if first char is 0-9 + if len(bSlice) > 0 && bSlice[0] >= '0' && bSlice[0] <= '9' { + bSlice = append([]byte{'_'}, bSlice...) + } + return bSlice +} + +// batchReadCloser (potentially) wraps an io.ReadCloser in Gzip +// decompression and limits the reading to a specific number of bytes. +func batchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (io.ReadCloser, error) { + switch encoding { + case "gzip", "x-gzip": + var err error + rc, err = gzip.NewReader(rc) + if err != nil { + return nil, err + } + } + if maxBatchSizeBytes > 0 { + rc = io2.NewLimitedReadCloser(rc, maxBatchSizeBytes) + } + return rc, nil +} + +// yoloString to create strings that are guaranteed not to be referenced again. 
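
Taken together, the conversion rules above mean a single line-protocol line fans out into one series per field. A hedged, test-style sketch of that fan-out (hypothetical test name and assertions; the real table-driven parser tests follow below):

```go
package influxpush

import (
	"context"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestFieldFanOutSketch(t *testing.T) {
	body := "web_requests,host=web01 value=3,latency_ms=12.5 1465839830100400200"
	req := httptest.NewRequest(http.MethodPost, "/write", strings.NewReader(body))

	series, bytesRead, err := ParseInfluxLineReader(context.Background(), req, 1<<20)
	require.NoError(t, err)
	require.Equal(t, len(body), bytesRead)

	// Two series come back: the field literally named "value" keeps the bare
	// measurement name (__name__="web_requests"), every other field is appended
	// as a suffix (__name__="web_requests_latency_ms"). Both carry host="web01"
	// plus the internal __mimir_source__="influx" label, and the nanosecond
	// timestamp is truncated to milliseconds (1465839830100).
	require.Len(t, series, 2)
}
```
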
+func yoloString(b []byte) string { + return *((*string)(unsafe.Pointer(&b))) +} diff --git a/pkg/distributor/influxpush/parser_test.go b/pkg/distributor/influxpush/parser_test.go new file mode 100644 index 00000000000..729819d0f55 --- /dev/null +++ b/pkg/distributor/influxpush/parser_test.go @@ -0,0 +1,283 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package influxpush + +import ( + "bytes" + "context" + "net/http/httptest" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +const maxSize = 100 << 10 + +func TestParseInfluxLineReader(t *testing.T) { + tests := []struct { + name string + url string + data string + expectedResult []mimirpb.TimeSeries + }{ + { + name: "parse simple line single value called value", + url: "/", + data: "measurement,t1=v1 value=1.5 1465839830100400200", + expectedResult: []mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 1.5, TimestampMs: 1465839830100}}, + }, + }, + }, + { + name: "parse simple line single value", + url: "/", + data: "measurement,t1=v1 f1=2 1465839830100400200", + expectedResult: []mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f1"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 2, TimestampMs: 1465839830100}}, + }, + }, + }, + { + name: "parse simple line single float value", + url: "/", + data: "measurement,t1=v1 f1=3.14159 1465839830100400200", + expectedResult: []mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f1"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 3.14159, TimestampMs: 1465839830100}}, + }, + }, + }, + { + name: "parse simple line multiple int/float values", + url: "/", + data: "measurement,t1=v1 f1=2,f2=3i,f3=3.14159 1465839830100400200", + expectedResult: []mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f1"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 2, TimestampMs: 1465839830100}}, + }, + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f2"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 3, TimestampMs: 1465839830100}}, + }, + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f3"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 3.14159, TimestampMs: 1465839830100}}, + }, + }, + }, + { + name: "parse simple line ignoring string value", + url: "/", + data: "measurement,t1=v1 f1=2,f2=\"val2\" 1465839830100400200", + expectedResult: []mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f1"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 2, TimestampMs: 1465839830100}}, + }, + // We don't produce a result for the f2="val2" field set + }, + }, + { + name: "parse multiple tags", + url: "/", + data: "measurement,t1=v1,t2=v2,t3=v3 f1=36 1465839830100400200", + expectedResult: 
[]mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f1"}, + {Name: "t1", Value: "v1"}, + {Name: "t2", Value: "v2"}, + {Name: "t3", Value: "v3"}, + }, + Samples: []mimirpb.Sample{{Value: 36, TimestampMs: 1465839830100}}, + }, + }, + }, + { + name: "parse multiple fields", + url: "/", + data: "measurement,t1=v1 f1=3.0,f2=365,f3=0 1465839830100400200", + expectedResult: []mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f1"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 3, TimestampMs: 1465839830100}}, + }, + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f2"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 365, TimestampMs: 1465839830100}}, + }, + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "measurement_f3"}, + {Name: "t1", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 0, TimestampMs: 1465839830100}}, + }, + }, + }, + { + name: "parse invalid char conversion", + url: "/", + data: "*measurement,#t1?=v1 f#1=0 1465839830100400200", + expectedResult: []mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "_measurement_f_1"}, + {Name: "_t1_", Value: "v1"}, + }, + Samples: []mimirpb.Sample{{Value: 0, TimestampMs: 1465839830100}}, + }, + }, + }, + { + name: "parse invalid char conversion number prefix", + url: "/", + data: "0measurement,1t1=v1 f1=0 1465839830100400200", + expectedResult: []mimirpb.TimeSeries{ + { + Labels: []mimirpb.LabelAdapter{ + {Name: "_1t1", Value: "v1"}, + {Name: "__mimir_source__", Value: "influx"}, + {Name: "__name__", Value: "_0measurement_f1"}, + }, + Samples: []mimirpb.Sample{{Value: 0, TimestampMs: 1465839830100}}, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest("POST", tt.url, bytes.NewReader([]byte(tt.data))) + + timeSeries, _, err := ParseInfluxLineReader(context.Background(), req, maxSize) + require.NoError(t, err) + + if len(timeSeries) > 1 { + // sort the returned timeSeries results in guarantee expected order for comparison + sort.Slice(timeSeries, func(i, j int) bool { + return timeSeries[i].String() < timeSeries[j].String() + }) + } + + // Ensure we are getting the expected number of results + assert.Equal(t, len(timeSeries), len(tt.expectedResult)) + + // Compare them one by one + for i := 0; i < len(timeSeries); i++ { + assert.Equal(t, timeSeries[i].String(), tt.expectedResult[i].String()) + } + }) + } +} + +func TestParseInfluxInvalidInput(t *testing.T) { + tests := []struct { + name string + url string + data string + }{ + { + name: "parse invalid precision", + url: "/write?precision=ss", // precision must be of type "ns", "us", "ms", "s" + data: "measurement,t1=v1 f1=2 1465839830100400200", + }, + { + name: "parse invalid field input", + url: "/write", + data: "measurement,t1=v1 f1= 1465839830100400200", // field value is missing + }, + { + name: "parse invalid tags", + url: "/write", + data: "measurement,t1=v1,t2 f1=2 1465839830100400200", // field value is missing + }, + { + name: "parse field value invalid quotes", + url: "/write", + data: "measurement,t1=v1 f1=v1 1465839830100400200", // string type field values require 
double quotes + }, + { + name: "parse missing field", + url: "/write", + data: "measurement,t1=v1 1465839830100400200", // missing field + }, + { + name: "parse missing tag name", + url: "/write", + data: "measurement,=v1 1465839830100400200", // missing tag name + }, + { + name: "parse missing tag value", + url: "/write", + data: "measurement,t1= 1465839830100400200", // missing tag value + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest("POST", tt.url, bytes.NewReader([]byte(tt.data))) + + _, _, err := ParseInfluxLineReader(context.Background(), req, maxSize) + require.Error(t, err) + }) + } +} + +func TestParseInfluxBatchReadCloser(t *testing.T) { + req := httptest.NewRequest("POST", "/write", bytes.NewReader([]byte("m,t1=v1 f1=2 1465839830100400200"))) + req.Header.Add("Content-Encoding", "gzip") + + _, err := batchReadCloser(req.Body, "gzip", int64(maxSize)) + require.Error(t, err) +} diff --git a/vendor/github.com/influxdata/influxdb/v2/LICENSE b/vendor/github.com/influxdata/influxdb/v2/LICENSE new file mode 100644 index 00000000000..2517ee1da6e --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 InfluxData + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/influxdata/influxdb/v2/kit/io/limited_read_closer.go b/vendor/github.com/influxdata/influxdb/v2/kit/io/limited_read_closer.go new file mode 100644 index 00000000000..71f1ff14f7f --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/kit/io/limited_read_closer.go @@ -0,0 +1,59 @@ +package io + +import ( + "errors" + "io" +) + +var ErrReadLimitExceeded = errors.New("read limit exceeded") + +// LimitedReadCloser wraps an io.ReadCloser in limiting behavior using +// io.LimitedReader. It allows us to obtain the limit error at the time of close +// instead of just when writing. +type LimitedReadCloser struct { + R io.ReadCloser // underlying reader + N int64 // max bytes remaining + err error + closed bool + limitExceeded bool +} + +// NewLimitedReadCloser returns a new LimitedReadCloser. 
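
The read limit in this vendored type is enforced lazily: Read simply stops at the limit, and the violation only surfaces as ErrReadLimitExceeded when Close is called, which is why ParseInfluxLineReader above checks the error returned by reader.Close() rather than the reads themselves. A small illustrative sketch (standalone main, not part of the change):

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"

	influxio "github.com/influxdata/influxdb/v2/kit/io"
)

func main() {
	// 16-byte body, 10-byte limit: reads succeed up to the limit, and the
	// violation is only reported by Close.
	body := io.NopCloser(strings.NewReader("0123456789abcdef"))
	rc := influxio.NewLimitedReadCloser(body, 10)

	data, _ := io.ReadAll(rc) // returns the first 10 bytes, then io.EOF
	err := rc.Close()

	fmt.Println(string(data))                                  // "0123456789"
	fmt.Println(errors.Is(err, influxio.ErrReadLimitExceeded)) // true
}
```
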
+func NewLimitedReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { + return &LimitedReadCloser{ + R: r, + N: n, + } +} + +func (l *LimitedReadCloser) Read(p []byte) (n int, err error) { + if l.N <= 0 { + l.limitExceeded = true + return 0, io.EOF + } + if int64(len(p)) > l.N { + p = p[0:l.N] + } + n, err = l.R.Read(p) + l.N -= int64(n) + return +} + +// Close returns an ErrReadLimitExceeded when the wrapped reader exceeds the set +// limit for number of bytes. This is safe to call more than once but not +// concurrently. +func (l *LimitedReadCloser) Close() (err error) { + if l.limitExceeded { + l.err = ErrReadLimitExceeded + } + if l.closed { + // Close has already been called. + return l.err + } + if err := l.R.Close(); err != nil && l.err == nil { + l.err = err + } + // Prevent l.closer.Close from being called again. + l.closed = true + return l.err +} diff --git a/vendor/github.com/influxdata/influxdb/v2/models/consistency.go b/vendor/github.com/influxdata/influxdb/v2/models/consistency.go new file mode 100644 index 00000000000..2a3269bca11 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/consistency.go @@ -0,0 +1,48 @@ +package models + +import ( + "errors" + "strings" +) + +// ConsistencyLevel represent a required replication criteria before a write can +// be returned as successful. +// +// The consistency level is handled in open-source InfluxDB but only applicable to clusters. +type ConsistencyLevel int + +const ( + // ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet. + ConsistencyLevelAny ConsistencyLevel = iota + + // ConsistencyLevelOne requires at least one data node acknowledged a write. + ConsistencyLevelOne + + // ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write. + ConsistencyLevelQuorum + + // ConsistencyLevelAll requires all data nodes to acknowledge a write. + ConsistencyLevelAll +) + +var ( + // ErrInvalidConsistencyLevel is returned when parsing the string version + // of a consistency level. + ErrInvalidConsistencyLevel = errors.New("invalid consistency level") +) + +// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const. +func ParseConsistencyLevel(level string) (ConsistencyLevel, error) { + switch strings.ToLower(level) { + case "any": + return ConsistencyLevelAny, nil + case "one": + return ConsistencyLevelOne, nil + case "quorum": + return ConsistencyLevelQuorum, nil + case "all": + return ConsistencyLevelAll, nil + default: + return 0, ErrInvalidConsistencyLevel + } +} diff --git a/vendor/github.com/influxdata/influxdb/v2/models/fieldtype_string.go b/vendor/github.com/influxdata/influxdb/v2/models/fieldtype_string.go new file mode 100644 index 00000000000..d8016e8bf3e --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/fieldtype_string.go @@ -0,0 +1,28 @@ +// Code generated by "stringer -type=FieldType"; DO NOT EDIT. + +package models + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. 
+ var x [1]struct{} + _ = x[Integer-0] + _ = x[Float-1] + _ = x[Boolean-2] + _ = x[String-3] + _ = x[Empty-4] + _ = x[Unsigned-5] +} + +const _FieldType_name = "IntegerFloatBooleanStringEmptyUnsigned" + +var _FieldType_index = [...]uint8{0, 7, 12, 19, 25, 30, 38} + +func (i FieldType) String() string { + if i < 0 || i >= FieldType(len(_FieldType_index)-1) { + return "FieldType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _FieldType_name[_FieldType_index[i]:_FieldType_index[i+1]] +} diff --git a/vendor/github.com/influxdata/influxdb/v2/models/gen.go b/vendor/github.com/influxdata/influxdb/v2/models/gen.go new file mode 100644 index 00000000000..0aaa43f2037 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/gen.go @@ -0,0 +1,3 @@ +package models + +//go:generate stringer -type=FieldType diff --git a/vendor/github.com/influxdata/influxdb/v2/models/inline_fnv.go b/vendor/github.com/influxdata/influxdb/v2/models/inline_fnv.go new file mode 100644 index 00000000000..eec1ae8b013 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/inline_fnv.go @@ -0,0 +1,32 @@ +package models // import "github.com/influxdata/influxdb/models" + +// from stdlib hash/fnv/fnv.go +const ( + prime64 = 1099511628211 + offset64 = 14695981039346656037 +) + +// InlineFNV64a is an alloc-free port of the standard library's fnv64a. +// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function. +type InlineFNV64a uint64 + +// NewInlineFNV64a returns a new instance of InlineFNV64a. +func NewInlineFNV64a() InlineFNV64a { + return offset64 +} + +// Write adds data to the running hash. +func (s *InlineFNV64a) Write(data []byte) (int, error) { + hash := uint64(*s) + for _, c := range data { + hash ^= uint64(c) + hash *= prime64 + } + *s = InlineFNV64a(hash) + return len(data), nil +} + +// Sum64 returns the uint64 of the current resulting hash. +func (s *InlineFNV64a) Sum64() uint64 { + return uint64(*s) +} diff --git a/vendor/github.com/influxdata/influxdb/v2/models/inline_strconv_parse.go b/vendor/github.com/influxdata/influxdb/v2/models/inline_strconv_parse.go new file mode 100644 index 00000000000..0ad5468994e --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/inline_strconv_parse.go @@ -0,0 +1,34 @@ +package models // import "github.com/influxdata/influxdb/models" + +import ( + "strconv" + "unsafe" +) + +// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt. +func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) { + s := unsafeBytesToString(b) + return strconv.ParseInt(s, base, bitSize) +} + +// parseUintBytes is a zero-alloc wrapper around strconv.ParseUint. +func parseUintBytes(b []byte, base int, bitSize int) (i uint64, err error) { + s := unsafeBytesToString(b) + return strconv.ParseUint(s, base, bitSize) +} + +// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat. +func parseFloatBytes(b []byte, bitSize int) (float64, error) { + s := unsafeBytesToString(b) + return strconv.ParseFloat(s, bitSize) +} + +// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool. +func parseBoolBytes(b []byte) (bool, error) { + return strconv.ParseBool(unsafeBytesToString(b)) +} + +// unsafeBytesToString converts a []byte to a string without a heap allocation. 
+func unsafeBytesToString(in []byte) string { + return *(*string)(unsafe.Pointer(&in)) +} diff --git a/vendor/github.com/influxdata/influxdb/v2/models/points.go b/vendor/github.com/influxdata/influxdb/v2/models/points.go new file mode 100644 index 00000000000..bad28d53a7b --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/points.go @@ -0,0 +1,2640 @@ +// Package models implements basic objects used throughout the TICK stack. +package models // import "github.com/influxdata/influxdb/models" + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "math" + "sort" + "strconv" + "strings" + "time" + "unicode" + "unicode/utf8" + + "github.com/influxdata/influxdb/v2/pkg/escape" +) + +const ( + // Values used to store the field key and measurement name as special internal tags. + FieldKeyTagKey = "\xff" + MeasurementTagKey = "\x00" + + // reserved tag keys which when present cause the point to be discarded + // and an error returned + reservedFieldTagKey = "_field" + reservedMeasurementTagKey = "_measurement" + reservedTimeTagKey = "time" +) + +var ( + // Predefined byte representations of special tag keys. + FieldKeyTagKeyBytes = []byte(FieldKeyTagKey) + MeasurementTagKeyBytes = []byte(MeasurementTagKey) + + // set of reserved tag keys which cannot be present when a point is being parsed. + reservedTagKeys = [][]byte{ + FieldKeyTagKeyBytes, + MeasurementTagKeyBytes, + []byte(reservedFieldTagKey), + []byte(reservedMeasurementTagKey), + []byte(reservedTimeTagKey), + } +) + +type escapeSet struct { + k [1]byte + esc [2]byte +} + +var ( + measurementEscapeCodes = [...]escapeSet{ + {k: [1]byte{','}, esc: [2]byte{'\\', ','}}, + {k: [1]byte{' '}, esc: [2]byte{'\\', ' '}}, + } + + tagEscapeCodes = [...]escapeSet{ + {k: [1]byte{','}, esc: [2]byte{'\\', ','}}, + {k: [1]byte{' '}, esc: [2]byte{'\\', ' '}}, + {k: [1]byte{'='}, esc: [2]byte{'\\', '='}}, + } + + // ErrPointMustHaveAField is returned when operating on a point that does not have any fields. + ErrPointMustHaveAField = errors.New("point without fields is unsupported") + + // ErrInvalidNumber is returned when a number is expected but not provided. + ErrInvalidNumber = errors.New("invalid number") + + // ErrInvalidPoint is returned when a point cannot be parsed correctly. + ErrInvalidPoint = errors.New("point is invalid") + + // ErrInvalidKevValuePairs is returned when the number of key, value pairs + // is odd, indicating a missing value. + ErrInvalidKevValuePairs = errors.New("key/value pairs is an odd length") +) + +const ( + // MaxKeyLength is the largest allowed size of the combined measurement and tag keys. + MaxKeyLength = 65535 +) + +// Point defines the values that will be written to the database. +type Point interface { + // Name return the measurement name for the point. + Name() []byte + + // SetName updates the measurement name for the point. + SetName(string) + + // Tags returns the tag set for the point. + Tags() Tags + + // ForEachTag iterates over each tag invoking fn. If fn return false, iteration stops. + ForEachTag(fn func(k, v []byte) bool) + + // AddTag adds or replaces a tag value for a point. + AddTag(key, value string) + + // SetTags replaces the tags for the point. + SetTags(tags Tags) + + // HasTag returns true if the tag exists for the point. + HasTag(tag []byte) bool + + // Fields returns the fields for the point. + Fields() (Fields, error) + + // Time return the timestamp for the point. + Time() time.Time + + // SetTime updates the timestamp for the point. 
+ SetTime(t time.Time) + + // UnixNano returns the timestamp of the point as nanoseconds since Unix epoch. + UnixNano() int64 + + // HashID returns a non-cryptographic checksum of the point's key. + HashID() uint64 + + // Key returns the key (measurement joined with tags) of the point. + Key() []byte + + // String returns a string representation of the point. If there is a + // timestamp associated with the point then it will be specified with the default + // precision of nanoseconds. + String() string + + // MarshalBinary returns a binary representation of the point. + MarshalBinary() ([]byte, error) + + // PrecisionString returns a string representation of the point. If there + // is a timestamp associated with the point then it will be specified in the + // given unit. + PrecisionString(precision string) string + + // RoundedString returns a string representation of the point. If there + // is a timestamp associated with the point, then it will be rounded to the + // given duration. + RoundedString(d time.Duration) string + + // Split will attempt to return multiple points with the same timestamp whose + // string representations are no longer than size. Points with a single field or + // a point without a timestamp may exceed the requested size. + Split(size int) []Point + + // Round will round the timestamp of the point to the given duration. + Round(d time.Duration) + + // StringSize returns the length of the string that would be returned by String(). + StringSize() int + + // AppendString appends the result of String() to the provided buffer and returns + // the result, potentially reducing string allocations. + AppendString(buf []byte) []byte + + // FieldIterator returns a FieldIterator that can be used to traverse the + // fields of a point without constructing the in-memory map. + FieldIterator() FieldIterator +} + +// FieldType represents the type of a field. +type FieldType int + +const ( + // Integer indicates the field's type is integer. + Integer FieldType = iota + + // Float indicates the field's type is float. + Float + + // Boolean indicates the field's type is boolean. + Boolean + + // String indicates the field's type is string. + String + + // Empty is used to indicate that there is no field. + Empty + + // Unsigned indicates the field's type is an unsigned integer. + Unsigned +) + +// FieldIterator provides a low-allocation interface to iterate through a point's fields. +type FieldIterator interface { + // Next indicates whether there any fields remaining. + Next() bool + + // FieldKey returns the key of the current field. + FieldKey() []byte + + // Type returns the FieldType of the current field. + Type() FieldType + + // StringValue returns the string value of the current field. + StringValue() string + + // IntegerValue returns the integer value of the current field. + IntegerValue() (int64, error) + + // UnsignedValue returns the unsigned value of the current field. + UnsignedValue() (uint64, error) + + // BooleanValue returns the boolean value of the current field. + BooleanValue() (bool, error) + + // FloatValue returns the float value of the current field. + FloatValue() (float64, error) + + // Reset resets the iterator to its initial state. + Reset() +} + +// Points represents a sortable list of points by timestamp. +type Points []Point + +// Len implements sort.Interface. +func (a Points) Len() int { return len(a) } + +// Less implements sort.Interface. 
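
The vendored Point interface above exposes both the Fields() map used by influxPointToTimeseries and a lower-allocation FieldIterator. A small illustrative sketch of the iterator (standalone main; the Mimir handler itself uses Fields() instead):

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/models"
)

func main() {
	pts, err := models.ParsePointsString("cpu,host=web01 usage=0.64,cores=8i 1465839830100400200")
	if err != nil {
		panic(err)
	}

	// Walk the fields without materialising the Fields map.
	it := pts[0].FieldIterator()
	for it.Next() {
		switch it.Type() {
		case models.Float:
			v, _ := it.FloatValue()
			fmt.Println(string(it.FieldKey()), v)
		case models.Integer:
			v, _ := it.IntegerValue()
			fmt.Println(string(it.FieldKey()), v)
		}
	}
}
```
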
+func (a Points) Less(i, j int) bool { return a[i].Time().Before(a[j].Time()) } + +// Swap implements sort.Interface. +func (a Points) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// point is the default implementation of Point. +type point struct { + time time.Time + + // text encoding of measurement and tags + // key must always be stored sorted by tags, if the original line was not sorted, + // we need to resort it + key []byte + + // text encoding of field data + fields []byte + + // text encoding of timestamp + ts []byte + + // cached version of parsed fields from data + cachedFields map[string]interface{} + + // cached version of parsed name from key + cachedName string + + // cached version of parsed tags + cachedTags Tags + + it fieldIterator +} + +// type assertions +var ( + _ Point = (*point)(nil) + _ FieldIterator = (*point)(nil) +) + +const ( + // the number of characters for the largest possible int64 (9223372036854775807) + maxInt64Digits = 19 + + // the number of characters for the smallest possible int64 (-9223372036854775808) + minInt64Digits = 20 + + // the number of characters for the largest possible uint64 (18446744073709551615) + maxUint64Digits = 20 + + // the number of characters required for the largest float64 before a range check + // would occur during parsing + maxFloat64Digits = 25 + + // the number of characters required for smallest float64 before a range check occur + // would occur during parsing + minFloat64Digits = 27 +) + +// ParsePoints returns a slice of Points from a text representation of a point +// with each point separated by newlines. If any points fail to parse, a non-nil error +// will be returned in addition to the points that parsed successfully. +func ParsePoints(buf []byte) ([]Point, error) { + return ParsePointsWithPrecision(buf, time.Now().UTC(), "n") +} + +// ParsePointsString is identical to ParsePoints but accepts a string. +func ParsePointsString(buf string) ([]Point, error) { + return ParsePoints([]byte(buf)) +} + +// ParseKey returns the measurement name and tags from a point. +// +// NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf. +// This can have the unintended effect preventing buf from being garbage collected. +func ParseKey(buf []byte) (string, Tags) { + name, tags := ParseKeyBytes(buf) + return string(name), tags +} + +func ParseKeyBytes(buf []byte) ([]byte, Tags) { + return ParseKeyBytesWithTags(buf, nil) +} + +func ParseKeyBytesWithTags(buf []byte, tags Tags) ([]byte, Tags) { + // Ignore the error because scanMeasurement returns "missing fields" which we ignore + // when just parsing a key + state, i, _ := scanMeasurement(buf, 0) + + var name []byte + if state == tagKeyState { + tags = parseTags(buf, tags) + // scanMeasurement returns the location of the comma if there are tags, strip that off + name = buf[:i-1] + } else { + name = buf[:i] + } + return unescapeMeasurement(name), tags +} + +func ParseTags(buf []byte) Tags { + return parseTags(buf, nil) +} + +func ParseTagsWithTags(buf []byte, tags Tags) Tags { + return parseTags(buf, tags) +} + +func ParseName(buf []byte) []byte { + // Ignore the error because scanMeasurement returns "missing fields" which we ignore + // when just parsing a key + state, i, _ := scanMeasurement(buf, 0) + var name []byte + if state == tagKeyState { + name = buf[:i-1] + } else { + name = buf[:i] + } + + return unescapeMeasurement(name) +} + +// ValidPrecision checks if the precision is known. 
+func ValidPrecision(precision string) bool { + switch precision { + case "ns", "us", "ms", "s": + return true + default: + return false + } +} + +// ParsePointsWithPrecision is similar to ParsePoints, but allows the +// caller to provide a precision for time. +// +// NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf. +// This can have the unintended effect preventing buf from being garbage collected. +func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { + points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1) + var ( + pos int + block []byte + failed []string + ) + for pos < len(buf) { + pos, block = scanLine(buf, pos) + pos++ + + if len(block) == 0 { + continue + } + + start := skipWhitespace(block, 0) + + // If line is all whitespace, just skip it + if start >= len(block) { + continue + } + + // lines which start with '#' are comments + if block[start] == '#' { + continue + } + + // strip the newline if one is present + if block[len(block)-1] == '\n' { + block = block[:len(block)-1] + } + + pt, err := parsePoint(block[start:], defaultTime, precision) + if err != nil { + failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:]), err)) + } else { + points = append(points, pt) + } + + } + if len(failed) > 0 { + return points, fmt.Errorf("%s", strings.Join(failed, "\n")) + } + return points, nil + +} + +func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { + // scan the first block which is measurement[,tag1=value1,tag2=value2...] + pos, key, err := scanKey(buf, 0) + if err != nil { + return nil, err + } + + // measurement name is required + if len(key) == 0 { + return nil, fmt.Errorf("missing measurement") + } + + if len(key) > MaxKeyLength { + return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength) + } + + // scan the second block is which is field1=value1[,field2=value2,...] + pos, fields, err := scanFields(buf, pos) + if err != nil { + return nil, err + } + + // at least one field is required + if len(fields) == 0 { + return nil, fmt.Errorf("missing fields") + } + + var maxKeyErr error + err = walkFields(fields, func(k, v []byte) bool { + if sz := seriesKeySize(key, k); sz > MaxKeyLength { + maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength) + return false + } + return true + }) + + if err != nil { + return nil, err + } + + if maxKeyErr != nil { + return nil, maxKeyErr + } + + // scan the last block which is an optional integer timestamp + pos, ts, err := scanTime(buf, pos) + if err != nil { + return nil, err + } + + pt := &point{ + key: key, + fields: fields, + ts: ts, + } + + if len(ts) == 0 { + pt.time = defaultTime + pt.SetPrecision(precision) + } else { + ts, err := parseIntBytes(ts, 10, 64) + if err != nil { + return nil, err + } + pt.time, err = SafeCalcTime(ts, precision) + if err != nil { + return nil, err + } + + // Determine if there are illegal non-whitespace characters after the + // timestamp block. + for pos < len(buf) { + if buf[pos] != ' ' { + return nil, ErrInvalidPoint + } + pos++ + } + } + return pt, nil +} + +// GetPrecisionMultiplier will return a multiplier for the precision specified. 
+func GetPrecisionMultiplier(precision string) int64 {
+	d := time.Nanosecond
+	switch precision {
+	case "us":
+		d = time.Microsecond
+	case "ms":
+		d = time.Millisecond
+	case "s":
+		d = time.Second
+	}
+	return int64(d)
+}
+
+// scanKey scans buf starting at i for the measurement and tag portion of the point.
+// It returns the ending position and the byte slice of the key within buf. If there
+// are tags, they will be sorted if they are not already.
+func scanKey(buf []byte, i int) (int, []byte, error) {
+	start := skipWhitespace(buf, i)
+
+	i = start
+
+	// Determines whether the tags are sorted; assume they are.
+	sorted := true
+
+	// indices holds the indexes within buf of the start of each tag. For example,
+	// a buf of 'cpu,host=a,region=b,zone=c' would have an indices slice of [4,11,20],
+	// which indicates that the first tag starts at buf[4], the second at buf[11], and
+	// the last at buf[20].
+	indices := make([]int, 100)
+
+	// tracks how many commas we've seen so we know how many values are in indices.
+	// Since indices is an arbitrarily large slice,
+	// we need to know how many values in the buffer are in use.
+	commas := 0
+
+	// First scan the Point's measurement.
+	state, i, err := scanMeasurement(buf, i)
+	if err != nil {
+		return i, buf[start:i], err
+	}
+
+	// Optionally scan tags if needed.
+	if state == tagKeyState {
+		i, commas, indices, err = scanTags(buf, i, indices)
+		if err != nil {
+			return i, buf[start:i], err
+		}
+	}
+
+	// Iterate over the tag keys to ensure that we do not encounter any
+	// of the reserved tag keys, such as _measurement or _field.
+	for j := 0; j < commas; j++ {
+		_, key := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=')
+
+		for _, reserved := range reservedTagKeys {
+			if bytes.Equal(key, reserved) {
+				return i, buf[start:i], fmt.Errorf("cannot use reserved tag key %q", key)
+			}
+		}
+	}
+
+	// Now that we know where the key region is within buf and the location of the tags, we
+	// need to determine if duplicate tags exist and if the tags are sorted. This iterates
+	// over the list, comparing each tag in the sequence with the next.
+	for j := 0; j < commas-1; j++ {
+		// get the left and right tags
+		_, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=')
+		_, right := scanTo(buf[indices[j+1]:indices[j+2]-1], 0, '=')
+
+		// If left is greater than right, the tags are not sorted. We do not have to
+		// continue because the short path no longer works.
+		// If the tags are equal, then there are duplicate tags, and we should abort.
+		// If the tags are not sorted, this pass may not find duplicate tags and we
+		// need to do a more exhaustive search later.
+		if cmp := bytes.Compare(left, right); cmp > 0 {
+			sorted = false
+			break
+		} else if cmp == 0 {
+			return i, buf[start:i], fmt.Errorf("duplicate tags")
+		}
+	}
+
+	// If the tags are not sorted, then sort them. This sort is inline and
+	// uses the tag indices we created earlier. The actual buffer is not sorted; the
+	// indices use the buffer for value comparison. After the indices are sorted,
+	// the buffer is reconstructed from the sorted indices.
+ if !sorted && commas > 0 { + // Get the measurement name for later + measurement := buf[start : indices[0]-1] + + // Sort the indices + indices := indices[:commas] + insertionSort(0, commas, buf, indices) + + // Create a new key using the measurement and sorted indices + b := make([]byte, len(buf[start:i])) + pos := copy(b, measurement) + for _, i := range indices { + b[pos] = ',' + pos++ + _, v := scanToSpaceOr(buf, i, ',') + pos += copy(b[pos:], v) + } + + // Check again for duplicate tags now that the tags are sorted. + for j := 0; j < commas-1; j++ { + // get the left and right tags + _, left := scanTo(buf[indices[j]:], 0, '=') + _, right := scanTo(buf[indices[j+1]:], 0, '=') + + // If the tags are equal, then there are duplicate tags, and we should abort. + // If the tags are not sorted, this pass may not find duplicate tags and we + // need to do a more exhaustive search later. + if bytes.Equal(left, right) { + return i, b, fmt.Errorf("duplicate tags") + } + } + + return i, b, nil + } + + return i, buf[start:i], nil +} + +// The following constants allow us to specify which state to move to +// next, when scanning sections of a Point. +const ( + tagKeyState = iota + tagValueState + fieldsState +) + +// scanMeasurement examines the measurement part of a Point, returning +// the next state to move to, and the current location in the buffer. +func scanMeasurement(buf []byte, i int) (int, int, error) { + // Check first byte of measurement, anything except a comma is fine. + // It can't be a space, since whitespace is stripped prior to this + // function call. + if i >= len(buf) || buf[i] == ',' { + return -1, i, fmt.Errorf("missing measurement") + } + + for { + i++ + if i >= len(buf) { + // cpu + return -1, i, fmt.Errorf("missing fields") + } + + if buf[i-1] == '\\' { + // Skip character (it's escaped). + continue + } + + // Unescaped comma; move onto scanning the tags. + if buf[i] == ',' { + return tagKeyState, i + 1, nil + } + + // Unescaped space; move onto scanning the fields. + if buf[i] == ' ' { + // cpu value=1.0 + return fieldsState, i, nil + } + } +} + +// scanTags examines all the tags in a Point, keeping track of and +// returning the updated indices slice, number of commas and location +// in buf where to start examining the Point fields. +func scanTags(buf []byte, i int, indices []int) (int, int, []int, error) { + var ( + err error + commas int + state = tagKeyState + ) + + for { + switch state { + case tagKeyState: + // Grow our indices slice if we have too many tags. + if commas >= len(indices) { + newIndics := make([]int, cap(indices)*2) + copy(newIndics, indices) + indices = newIndics + } + indices[commas] = i + commas++ + + i, err = scanTagsKey(buf, i) + state = tagValueState // tag value always follows a tag key + case tagValueState: + state, i, err = scanTagsValue(buf, i) + case fieldsState: + // Grow our indices slice if we had exactly enough tags to fill it + if commas >= len(indices) { + // The parser is in `fieldsState`, so there are no more + // tags. We only need 1 more entry in the slice to store + // the final entry. + newIndics := make([]int, cap(indices)+1) + copy(newIndics, indices) + indices = newIndics + } + indices[commas] = i + 1 + return i, commas, indices, nil + } + + if err != nil { + return i, commas, indices, err + } + } +} + +// scanTagsKey scans each character in a tag key. +func scanTagsKey(buf []byte, i int) (int, error) { + // First character of the key. 
+ if i >= len(buf) || buf[i] == ' ' || buf[i] == ',' || buf[i] == '=' { + // cpu,{'', ' ', ',', '='} + return i, fmt.Errorf("missing tag key") + } + + // Examine each character in the tag key until we hit an unescaped + // equals (the tag value), or we hit an error (i.e., unescaped + // space or comma). + for { + i++ + + // Either we reached the end of the buffer or we hit an + // unescaped comma or space. + if i >= len(buf) || + ((buf[i] == ' ' || buf[i] == ',') && buf[i-1] != '\\') { + // cpu,tag{'', ' ', ','} + return i, fmt.Errorf("missing tag value") + } + + if buf[i] == '=' && buf[i-1] != '\\' { + // cpu,tag= + return i + 1, nil + } + } +} + +// scanTagsValue scans each character in a tag value. +func scanTagsValue(buf []byte, i int) (int, int, error) { + // Tag value cannot be empty. + if i >= len(buf) || buf[i] == ',' || buf[i] == ' ' { + // cpu,tag={',', ' '} + return -1, i, fmt.Errorf("missing tag value") + } + + // Examine each character in the tag value until we hit an unescaped + // comma (move onto next tag key), an unescaped space (move onto + // fields), or we error out. + for { + i++ + if i >= len(buf) { + // cpu,tag=value + return -1, i, fmt.Errorf("missing fields") + } + + // An unescaped equals sign is an invalid tag value. + if buf[i] == '=' && buf[i-1] != '\\' { + // cpu,tag={'=', 'fo=o'} + return -1, i, fmt.Errorf("invalid tag format") + } + + if buf[i] == ',' && buf[i-1] != '\\' { + // cpu,tag=foo, + return tagKeyState, i + 1, nil + } + + // cpu,tag=foo value=1.0 + // cpu, tag=foo\= value=1.0 + if buf[i] == ' ' && buf[i-1] != '\\' { + return fieldsState, i, nil + } + } +} + +func insertionSort(l, r int, buf []byte, indices []int) { + for i := l + 1; i < r; i++ { + for j := i; j > l && less(buf, indices, j, j-1); j-- { + indices[j], indices[j-1] = indices[j-1], indices[j] + } + } +} + +func less(buf []byte, indices []int, i, j int) bool { + // This grabs the tag names for i & j, it ignores the values + _, a := scanTo(buf, indices[i], '=') + _, b := scanTo(buf, indices[j], '=') + return bytes.Compare(a, b) < 0 +} + +// scanFields scans buf, starting at i for the fields section of a point. It returns +// the ending position and the byte slice of the fields within buf. +func scanFields(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + quoted := false + + // tracks how many '=' we've seen + equals := 0 + + // tracks how many commas we've seen + commas := 0 + + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // escaped characters? + if buf[i] == '\\' && i+1 < len(buf) { + i += 2 + continue + } + + // If the value is quoted, scan until we get to the end quote + // Only quote values in the field value since quotes are not significant + // in the field key + if buf[i] == '"' && equals > commas { + quoted = !quoted + i++ + continue + } + + // If we see an =, ensure that there is at least on char before and after it + if buf[i] == '=' && !quoted { + equals++ + + // check for "... =123" but allow "a\ =123" + if buf[i-1] == ' ' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field key") + } + + // check for "...a=123,=456" but allow "a=123,a\,=456" + if buf[i-1] == ',' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field key") + } + + // check for "... value=" + if i+1 >= len(buf) { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + // check for "... value=,value2=..." 
+ if buf[i+1] == ',' || buf[i+1] == ' ' { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + if isNumeric(buf[i+1]) || buf[i+1] == '-' || buf[i+1] == 'N' || buf[i+1] == 'n' { + var err error + i, err = scanNumber(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + // If next byte is not a double-quote, the value must be a boolean + if buf[i+1] != '"' { + var err error + i, _, err = scanBoolean(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + } + + if buf[i] == ',' && !quoted { + commas++ + } + + // reached end of block? + if buf[i] == ' ' && !quoted { + break + } + i++ + } + + if quoted { + return i, buf[start:i], fmt.Errorf("unbalanced quotes") + } + + // check that all field sections had key and values (e.g. prevent "a=1,b" + if equals == 0 || commas != equals-1 { + return i, buf[start:i], fmt.Errorf("invalid field format") + } + + return i, buf[start:i], nil +} + +// scanTime scans buf, starting at i for the time section of a point. It +// returns the ending position and the byte slice of the timestamp within buf +// and error if the timestamp is not in the correct numeric format. +func scanTime(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // Reached end of block or trailing whitespace? + if buf[i] == '\n' || buf[i] == ' ' { + break + } + + // Handle negative timestamps + if i == start && buf[i] == '-' { + i++ + continue + } + + // Timestamps should be integers, make sure they are so we don't need + // to actually parse the timestamp until needed. + if buf[i] < '0' || buf[i] > '9' { + return i, buf[start:i], fmt.Errorf("bad timestamp") + } + i++ + } + return i, buf[start:i], nil +} + +func isNumeric(b byte) bool { + return (b >= '0' && b <= '9') || b == '.' +} + +// scanNumber returns the end position within buf, start at i after +// scanning over buf for an integer, or float. It returns an +// error if a invalid number is scanned. +func scanNumber(buf []byte, i int) (int, error) { + start := i + var isInt, isUnsigned bool + + // Is negative number? + if i < len(buf) && buf[i] == '-' { + i++ + // There must be more characters now, as just '-' is illegal. + if i == len(buf) { + return i, ErrInvalidNumber + } + } + + // how many decimal points we've see + decimal := false + + // indicates the number is float in scientific notation + scientific := false + + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + + if buf[i] == 'i' && i > start && !(isInt || isUnsigned) { + isInt = true + i++ + continue + } else if buf[i] == 'u' && i > start && !(isInt || isUnsigned) { + isUnsigned = true + i++ + continue + } + + if buf[i] == '.' { + // Can't have more than 1 decimal (e.g. 
1.1.1 should fail)
+			if decimal {
+				return i, ErrInvalidNumber
+			}
+			decimal = true
+		}
+
+		// `e` is valid for floats but not as the first char
+		if i > start && (buf[i] == 'e' || buf[i] == 'E') {
+			scientific = true
+			i++
+			continue
+		}
+
+		// + and - are only valid at this point if they follow an e (scientific notation)
+		if (buf[i] == '+' || buf[i] == '-') && (buf[i-1] == 'e' || buf[i-1] == 'E') {
+			i++
+			continue
+		}
+
+		// NaN is an unsupported value
+		if i+2 < len(buf) && (buf[i] == 'N' || buf[i] == 'n') {
+			return i, ErrInvalidNumber
+		}
+
+		if !isNumeric(buf[i]) {
+			return i, ErrInvalidNumber
+		}
+		i++
+	}
+
+	if (isInt || isUnsigned) && (decimal || scientific) {
+		return i, ErrInvalidNumber
+	}
+
+	numericDigits := i - start
+	if isInt {
+		numericDigits--
+	}
+	if decimal {
+		numericDigits--
+	}
+	if buf[start] == '-' {
+		numericDigits--
+	}
+
+	if numericDigits == 0 {
+		return i, ErrInvalidNumber
+	}
+
+	// It's more common that numbers will be within the min/max range for their type, but we need to prevent
+	// out-of-range numbers from being parsed successfully. This uses some simple heuristics to decide
+	// if we should parse the number to the actual type. It does not do it all the time because it incurs
+	// extra allocations and we end up converting the type again when writing points to disk.
+	if isInt {
+		// Make sure the last char is an 'i' for integers (e.g. 9i10 is not valid)
+		if buf[i-1] != 'i' {
+			return i, ErrInvalidNumber
+		}
+		// Parse the int to check bounds; the number of digits could be larger than the max range.
+		// We subtract 1 from the index to remove the `i` from our tests.
+		if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits {
+			if _, err := parseIntBytes(buf[start:i-1], 10, 64); err != nil {
+				return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err)
+			}
+		}
+	} else if isUnsigned {
+		// Make sure the last char is a 'u' for unsigned
+		if buf[i-1] != 'u' {
+			return i, ErrInvalidNumber
+		}
+		// Make sure the first char is not a '-' for unsigned
+		if buf[start] == '-' {
+			return i, ErrInvalidNumber
+		}
+		// Parse the uint to check bounds; the number of digits could be larger than the max range.
+		// We subtract 1 from the index to remove the `u` from our tests.
+		if len(buf[start:i-1]) >= maxUint64Digits {
+			if _, err := parseUintBytes(buf[start:i-1], 10, 64); err != nil {
+				return i, fmt.Errorf("unable to parse unsigned %s: %s", buf[start:i-1], err)
+			}
+		}
+	} else {
+		// Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range.
+		if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits {
+			if _, err := parseFloatBytes(buf[start:i], 64); err != nil {
+				return i, fmt.Errorf("invalid float")
+			}
+		}
+	}
+
+	return i, nil
+}
+
+// scanBoolean returns the end position within buf, starting at i after
+// scanning over buf for a boolean. Valid values for a boolean are
+// t, T, true, TRUE, f, F, false, FALSE. It returns an error if an invalid boolean
+// is scanned.
+func scanBoolean(buf []byte, i int) (int, []byte, error) { + start := i + + if i < len(buf) && (buf[i] != 't' && buf[i] != 'f' && buf[i] != 'T' && buf[i] != 'F') { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + i++ + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + i++ + } + + // Single char bool (t, T, f, F) is ok + if i-start == 1 { + return i, buf[start:i], nil + } + + // length must be 4 for true or TRUE + if (buf[start] == 't' || buf[start] == 'T') && i-start != 4 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // length must be 5 for false or FALSE + if (buf[start] == 'f' || buf[start] == 'F') && i-start != 5 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // Otherwise + valid := false + switch buf[start] { + case 't': + valid = bytes.Equal(buf[start:i], []byte("true")) + case 'f': + valid = bytes.Equal(buf[start:i], []byte("false")) + case 'T': + valid = bytes.Equal(buf[start:i], []byte("TRUE")) || bytes.Equal(buf[start:i], []byte("True")) + case 'F': + valid = bytes.Equal(buf[start:i], []byte("FALSE")) || bytes.Equal(buf[start:i], []byte("False")) + } + + if !valid { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + return i, buf[start:i], nil + +} + +// skipWhitespace returns the end position within buf, starting at i after +// scanning over spaces in tags. +func skipWhitespace(buf []byte, i int) int { + for i < len(buf) { + if buf[i] != ' ' && buf[i] != '\t' && buf[i] != 0 { + break + } + i++ + } + return i +} + +// scanLine returns the end position in buf and the next line found within +// buf. +func scanLine(buf []byte, i int) (int, []byte) { + start := i + quoted := false + fields := false + + // tracks how many '=' and commas we've seen + // this duplicates some of the functionality in scanFields + equals := 0 + commas := 0 + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // skip past escaped characters + if buf[i] == '\\' && i+2 < len(buf) { + i += 2 + continue + } + + if buf[i] == ' ' { + fields = true + } + + // If we see a double quote, makes sure it is not escaped + if fields { + if !quoted && buf[i] == '=' { + i++ + equals++ + continue + } else if !quoted && buf[i] == ',' { + i++ + commas++ + continue + } else if buf[i] == '"' && equals > commas { + i++ + quoted = !quoted + continue + } + } + + if buf[i] == '\n' && !quoted { + break + } + + i++ + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte, where stop byte +// has not been escaped. +// +// If there are leading spaces, they are skipped. +func scanTo(buf []byte, i int, stop byte) (int, []byte) { + start := i + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // Reached unescaped stop value? + if buf[i] == stop && (i == 0 || buf[i-1] != '\\') { + break + } + i++ + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte. If there are leading +// spaces, they are skipped. +func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) { + start := i + if buf[i] == stop || buf[i] == ' ' { + return i, buf[start:i] + } + + for { + i++ + if buf[i-1] == '\\' { + continue + } + + // reached the end of buf? + if i >= len(buf) { + return i, buf[start:i] + } + + // reached end of block? 
+ if buf[i] == stop || buf[i] == ' ' { + return i, buf[start:i] + } + } +} + +func scanTagValue(buf []byte, i int) (int, []byte) { + start := i + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' && buf[i-1] != '\\' { + break + } + i++ + } + if i > len(buf) { + return i, nil + } + return i, buf[start:i] +} + +func scanFieldValue(buf []byte, i int) (int, []byte) { + start := i + quoted := false + for i < len(buf) { + // Only escape char for a field value is a double-quote and backslash + if buf[i] == '\\' && i+1 < len(buf) && (buf[i+1] == '"' || buf[i+1] == '\\') { + i += 2 + continue + } + + // Quoted value? (e.g. string) + if buf[i] == '"' { + i++ + quoted = !quoted + continue + } + + if buf[i] == ',' && !quoted { + break + } + i++ + } + return i, buf[start:i] +} + +func EscapeMeasurement(in []byte) []byte { + for _, c := range measurementEscapeCodes { + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.k[:], c.esc[:], -1) + } + } + return in +} + +func unescapeMeasurement(in []byte) []byte { + if bytes.IndexByte(in, '\\') == -1 { + return in + } + + for i := range measurementEscapeCodes { + c := &measurementEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.esc[:], c.k[:], -1) + } + } + return in +} + +func escapeTag(in []byte) []byte { + for i := range tagEscapeCodes { + c := &tagEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.k[:], c.esc[:], -1) + } + } + return in +} + +func unescapeTag(in []byte) []byte { + if bytes.IndexByte(in, '\\') == -1 { + return in + } + + for i := range tagEscapeCodes { + c := &tagEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.esc[:], c.k[:], -1) + } + } + return in +} + +// escapeStringFieldReplacer replaces double quotes and backslashes +// with the same character preceded by a backslash. +// As of Go 1.7 this benchmarked better in allocations and CPU time +// compared to iterating through a string byte-by-byte and appending to a new byte slice, +// calling strings.Replace twice, and better than (*Regex).ReplaceAllString. +var escapeStringFieldReplacer = strings.NewReplacer(`"`, `\"`, `\`, `\\`) + +// EscapeStringField returns a copy of in with any double quotes or +// backslashes with escaped values. +func EscapeStringField(in string) string { + return escapeStringFieldReplacer.Replace(in) +} + +// unescapeStringField returns a copy of in with any escaped double-quotes +// or backslashes unescaped. +func unescapeStringField(in string) string { + if strings.IndexByte(in, '\\') == -1 { + return in + } + + var out []byte + i := 0 + for { + if i >= len(in) { + break + } + // unescape backslashes + if in[i] == '\\' && i+1 < len(in) && in[i+1] == '\\' { + out = append(out, '\\') + i += 2 + continue + } + // unescape double-quotes + if in[i] == '\\' && i+1 < len(in) && in[i+1] == '"' { + out = append(out, '"') + i += 2 + continue + } + out = append(out, in[i]) + i++ + + } + return string(out) +} + +// NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If +// an unsupported field value (NaN, or +/-Inf) or out of range time is passed, this function +// returns an error. 
+func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) { + key, err := pointKey(name, tags, fields, t) + if err != nil { + return nil, err + } + + return &point{ + key: key, + time: t, + fields: fields.MarshalBinary(), + }, nil +} + +// pointKey checks some basic requirements for valid points, and returns the +// key, along with an possible error. +func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte, error) { + if len(fields) == 0 { + return nil, ErrPointMustHaveAField + } + + if !t.IsZero() { + if err := CheckTime(t); err != nil { + return nil, err + } + } + + for key, value := range fields { + switch value := value.(type) { + case float64: + // Ensure the caller validates and handles invalid field values + if math.IsInf(value, 0) { + return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key) + } + if math.IsNaN(value) { + return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) + } + case float32: + // Ensure the caller validates and handles invalid field values + if math.IsInf(float64(value), 0) { + return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key) + } + if math.IsNaN(float64(value)) { + return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) + } + } + if len(key) == 0 { + return nil, fmt.Errorf("all fields must have non-empty names") + } + } + + key := MakeKey([]byte(measurement), tags) + for field := range fields { + sz := seriesKeySize(key, []byte(field)) + if sz > MaxKeyLength { + return nil, fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength) + } + } + + return key, nil +} + +func seriesKeySize(key, field []byte) int { + // 4 is the length of the tsm1.fieldKeySeparator constant. It's inlined here to avoid a circular + // dependency. + return len(key) + 4 + len(field) +} + +// NewPointFromBytes returns a new Point from a marshalled Point. +func NewPointFromBytes(b []byte) (Point, error) { + p := &point{} + if err := p.UnmarshalBinary(b); err != nil { + return nil, err + } + + // This does some basic validation to ensure there are fields and they + // can be unmarshalled as well. + iter := p.FieldIterator() + var hasField bool + for iter.Next() { + if len(iter.FieldKey()) == 0 { + continue + } + hasField = true + switch iter.Type() { + case Float: + _, err := iter.FloatValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + case Integer: + _, err := iter.IntegerValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + case Unsigned: + _, err := iter.UnsignedValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + case String: + // Skip since this won't return an error + case Boolean: + _, err := iter.BooleanValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + } + } + + if !hasField { + return nil, ErrPointMustHaveAField + } + + return p, nil +} + +// MustNewPoint returns a new point with the given measurement name, tags, fields and timestamp. If +// an unsupported field value (NaN) is passed, this function panics. +func MustNewPoint(name string, tags Tags, fields Fields, time time.Time) Point { + pt, err := NewPoint(name, tags, fields, time) + if err != nil { + panic(err.Error()) + } + return pt +} + +// Key returns the key (measurement joined with tags) of the point. 
+func (p *point) Key() []byte { + return p.key +} + +func (p *point) name() []byte { + _, name := scanTo(p.key, 0, ',') + return name +} + +func (p *point) Name() []byte { + return escape.Unescape(p.name()) +} + +// SetName updates the measurement name for the point. +func (p *point) SetName(name string) { + p.cachedName = "" + p.key = MakeKey([]byte(name), p.Tags()) +} + +// Time return the timestamp for the point. +func (p *point) Time() time.Time { + return p.time +} + +// SetTime updates the timestamp for the point. +func (p *point) SetTime(t time.Time) { + p.time = t +} + +// Round will round the timestamp of the point to the given duration. +func (p *point) Round(d time.Duration) { + p.time = p.time.Round(d) +} + +// Tags returns the tag set for the point. +func (p *point) Tags() Tags { + if p.cachedTags != nil { + return p.cachedTags + } + p.cachedTags = parseTags(p.key, nil) + return p.cachedTags +} + +func (p *point) ForEachTag(fn func(k, v []byte) bool) { + walkTags(p.key, fn) +} + +func (p *point) HasTag(tag []byte) bool { + if len(p.key) == 0 { + return false + } + + var exists bool + walkTags(p.key, func(key, value []byte) bool { + if bytes.Equal(tag, key) { + exists = true + return false + } + return true + }) + + return exists +} + +func walkTags(buf []byte, fn func(key, value []byte) bool) { + if len(buf) == 0 { + return + } + + pos, name := scanTo(buf, 0, ',') + + // it's an empty key, so there are no tags + if len(name) == 0 { + return + } + + hasEscape := bytes.IndexByte(buf, '\\') != -1 + i := pos + 1 + var key, value []byte + for { + if i >= len(buf) { + break + } + i, key = scanTo(buf, i, '=') + i, value = scanTagValue(buf, i+1) + + if len(value) == 0 { + continue + } + + if hasEscape { + if !fn(unescapeTag(key), unescapeTag(value)) { + return + } + } else { + if !fn(key, value) { + return + } + } + + i++ + } +} + +// walkFields walks each field key and value via fn. If fn returns false, the iteration +// is stopped. The values are the raw byte slices and not the converted types. +func walkFields(buf []byte, fn func(key, value []byte) bool) error { + var i int + var key, val []byte + for len(buf) > 0 { + i, key = scanTo(buf, 0, '=') + if i > len(buf)-2 { + return fmt.Errorf("invalid value: field-key=%s", key) + } + buf = buf[i+1:] + i, val = scanFieldValue(buf, 0) + buf = buf[i:] + if !fn(key, val) { + break + } + + // slice off comma + if len(buf) > 0 { + buf = buf[1:] + } + } + return nil +} + +// parseTags parses buf into the provided destination tags, returning destination +// Tags, which may have a different length and capacity. +func parseTags(buf []byte, dst Tags) Tags { + if len(buf) == 0 { + return nil + } + + n := bytes.Count(buf, []byte(",")) + if cap(dst) < n { + dst = make(Tags, n) + } else { + dst = dst[:n] + } + + // Ensure existing behaviour when point has no tags and nil slice passed in. + if dst == nil { + dst = Tags{} + } + + // Series keys can contain escaped commas, therefore the number of commas + // in a series key only gives an estimation of the upper bound on the number + // of tags. + var i int + walkTags(buf, func(key, value []byte) bool { + dst[i].Key, dst[i].Value = key, value + i++ + return true + }) + return dst[:i] +} + +// MakeKey creates a key for a set of tags. +func MakeKey(name []byte, tags Tags) []byte { + return AppendMakeKey(nil, name, tags) +} + +// AppendMakeKey appends the key derived from name and tags to dst and returns the extended buffer. 
+func AppendMakeKey(dst []byte, name []byte, tags Tags) []byte { + // unescape the name and then re-escape it to avoid double escaping. + // The key should always be stored in escaped form. + dst = append(dst, EscapeMeasurement(unescapeMeasurement(name))...) + dst = tags.AppendHashKey(dst) + return dst +} + +// SetTags replaces the tags for the point. +func (p *point) SetTags(tags Tags) { + p.key = MakeKey(p.Name(), tags) + p.cachedTags = tags +} + +// AddTag adds or replaces a tag value for a point. +func (p *point) AddTag(key, value string) { + tags := p.Tags() + tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)}) + sort.Sort(tags) + p.cachedTags = tags + p.key = MakeKey(p.Name(), tags) +} + +// Fields returns the fields for the point. +func (p *point) Fields() (Fields, error) { + if p.cachedFields != nil { + return p.cachedFields, nil + } + cf, err := p.unmarshalBinary() + if err != nil { + return nil, err + } + p.cachedFields = cf + return p.cachedFields, nil +} + +// SetPrecision will round a time to the specified precision. +func (p *point) SetPrecision(precision string) { + switch precision { + case "n", "ns": + case "u", "us": + p.SetTime(p.Time().Truncate(time.Microsecond)) + case "ms": + p.SetTime(p.Time().Truncate(time.Millisecond)) + case "s": + p.SetTime(p.Time().Truncate(time.Second)) + case "m": + p.SetTime(p.Time().Truncate(time.Minute)) + case "h": + p.SetTime(p.Time().Truncate(time.Hour)) + } +} + +// String returns the string representation of the point. +func (p *point) String() string { + if p.Time().IsZero() { + return string(p.Key()) + " " + string(p.fields) + } + return string(p.Key()) + " " + string(p.fields) + " " + strconv.FormatInt(p.UnixNano(), 10) +} + +// AppendString appends the string representation of the point to buf. +func (p *point) AppendString(buf []byte) []byte { + buf = append(buf, p.key...) + buf = append(buf, ' ') + buf = append(buf, p.fields...) + + if !p.time.IsZero() { + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, p.UnixNano(), 10) + } + + return buf +} + +// StringSize returns the length of the string that would be returned by String(). +func (p *point) StringSize() int { + size := len(p.key) + len(p.fields) + 1 + + if !p.time.IsZero() { + digits := 1 // even "0" has one digit + t := p.UnixNano() + if t < 0 { + // account for negative sign, then negate + digits++ + t = -t + } + for t > 9 { // already accounted for one digit + digits++ + t /= 10 + } + size += digits + 1 // digits and a space + } + + return size +} + +// MarshalBinary returns a binary representation of the point. +func (p *point) MarshalBinary() ([]byte, error) { + if len(p.fields) == 0 { + return nil, ErrPointMustHaveAField + } + + tb, err := p.time.MarshalBinary() + if err != nil { + return nil, err + } + + b := make([]byte, 8+len(p.key)+len(p.fields)+len(tb)) + i := 0 + + binary.BigEndian.PutUint32(b[i:], uint32(len(p.key))) + i += 4 + + i += copy(b[i:], p.key) + + binary.BigEndian.PutUint32(b[i:i+4], uint32(len(p.fields))) + i += 4 + + i += copy(b[i:], p.fields) + + copy(b[i:], tb) + return b, nil +} + +// UnmarshalBinary decodes a binary representation of the point into a point struct. +func (p *point) UnmarshalBinary(b []byte) error { + var n int + + // Read key length. + if len(b) < 4 { + return io.ErrShortBuffer + } + n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:] + + // Read key. + if len(b) < n { + return io.ErrShortBuffer + } + p.key, b = b[:n], b[n:] + + // Read fields length. 
+ if len(b) < 4 { + return io.ErrShortBuffer + } + n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:] + + // Read fields. + if len(b) < n { + return io.ErrShortBuffer + } + p.fields, b = b[:n], b[n:] + + // Read timestamp. + return p.time.UnmarshalBinary(b) +} + +// PrecisionString returns a string representation of the point. If there +// is a timestamp associated with the point then it will be specified in the +// given unit. +func (p *point) PrecisionString(precision string) string { + if p.Time().IsZero() { + return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) + } + return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), + p.UnixNano()/GetPrecisionMultiplier(precision)) +} + +// RoundedString returns a string representation of the point. If there +// is a timestamp associated with the point, then it will be rounded to the +// given duration. +func (p *point) RoundedString(d time.Duration) string { + if p.Time().IsZero() { + return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) + } + return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), + p.time.Round(d).UnixNano()) +} + +func (p *point) unmarshalBinary() (Fields, error) { + iter := p.FieldIterator() + fields := make(Fields, 8) + for iter.Next() { + if len(iter.FieldKey()) == 0 { + continue + } + switch iter.Type() { + case Float: + v, err := iter.FloatValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + fields[string(iter.FieldKey())] = v + case Integer: + v, err := iter.IntegerValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + fields[string(iter.FieldKey())] = v + case Unsigned: + v, err := iter.UnsignedValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + fields[string(iter.FieldKey())] = v + case String: + fields[string(iter.FieldKey())] = iter.StringValue() + case Boolean: + v, err := iter.BooleanValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + fields[string(iter.FieldKey())] = v + } + } + return fields, nil +} + +// HashID returns a non-cryptographic checksum of the point's key. +func (p *point) HashID() uint64 { + h := NewInlineFNV64a() + h.Write(p.key) + sum := h.Sum64() + return sum +} + +// UnixNano returns the timestamp of the point as nanoseconds since Unix epoch. +func (p *point) UnixNano() int64 { + return p.Time().UnixNano() +} + +// Split will attempt to return multiple points with the same timestamp whose +// string representations are no longer than size. Points with a single field or +// a point without a timestamp may exceed the requested size. +func (p *point) Split(size int) []Point { + if p.time.IsZero() || p.StringSize() <= size { + return []Point{p} + } + + // key string, timestamp string, spaces + size -= len(p.key) + len(strconv.FormatInt(p.time.UnixNano(), 10)) + 2 + + var points []Point + var start, cur int + + for cur < len(p.fields) { + end, _ := scanTo(p.fields, cur, '=') + end, _ = scanFieldValue(p.fields, end+1) + + if cur > start && end-start > size { + points = append(points, &point{ + key: p.key, + time: p.time, + fields: p.fields[start : cur-1], + }) + start = cur + } + + cur = end + 1 + } + + points = append(points, &point{ + key: p.key, + time: p.time, + fields: p.fields[start:], + }) + + return points +} + +// Tag represents a single key/value tag pair. 
+type Tag struct {
+	Key   []byte
+	Value []byte
+}
+
+// NewTag returns a new Tag.
+func NewTag(key, value []byte) Tag {
+	return Tag{
+		Key:   key,
+		Value: value,
+	}
+}
+
+// Size returns the size of the key and value.
+func (t Tag) Size() int { return len(t.Key) + len(t.Value) }
+
+// Clone returns a shallow copy of Tag.
+//
+// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
+// Use Clone to create a Tag with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
+func (t Tag) Clone() Tag {
+	other := Tag{
+		Key:   make([]byte, len(t.Key)),
+		Value: make([]byte, len(t.Value)),
+	}
+
+	copy(other.Key, t.Key)
+	copy(other.Value, t.Value)
+
+	return other
+}
+
+// String returns the string representation of the tag.
+func (t *Tag) String() string {
+	var buf bytes.Buffer
+	buf.WriteByte('{')
+	buf.WriteString(string(t.Key))
+	buf.WriteByte(' ')
+	buf.WriteString(string(t.Value))
+	buf.WriteByte('}')
+	return buf.String()
+}
+
+// Tags represents a sorted list of tags.
+type Tags []Tag
+
+// NewTags returns a new Tags from a map.
+func NewTags(m map[string]string) Tags {
+	if len(m) == 0 {
+		return nil
+	}
+	a := make(Tags, 0, len(m))
+	for k, v := range m {
+		a = append(a, NewTag([]byte(k), []byte(v)))
+	}
+	sort.Sort(a)
+	return a
+}
+
+// NewTagsKeyValues returns a new Tags from a list of key, value pairs,
+// ensuring the returned result is correctly sorted. Duplicate keys are removed;
+// however, which duplicate remains is undefined.
+// NewTagsKeyValues will return ErrInvalidKevValuePairs if len(kv) is not even.
+// If the input is guaranteed to be even, the error can be safely ignored.
+// If a has enough capacity, it will be reused.
+func NewTagsKeyValues(a Tags, kv ...[]byte) (Tags, error) {
+	if len(kv)%2 == 1 {
+		return nil, ErrInvalidKevValuePairs
+	}
+	if len(kv) == 0 {
+		return nil, nil
+	}
+
+	l := len(kv) / 2
+	if cap(a) < l {
+		a = make(Tags, 0, l)
+	} else {
+		a = a[:0]
+	}
+
+	for i := 0; i < len(kv)-1; i += 2 {
+		a = append(a, NewTag(kv[i], kv[i+1]))
+	}
+
+	if !a.sorted() {
+		sort.Sort(a)
+	}
+
+	// remove duplicates
+	j := 0
+	for i := 0; i < len(a)-1; i++ {
+		if !bytes.Equal(a[i].Key, a[i+1].Key) {
+			if j != i {
+				// only copy if j has deviated from i, indicating duplicates
+				a[j] = a[i]
+			}
+			j++
+		}
+	}
+
+	a[j] = a[len(a)-1]
+	j++
+
+	return a[:j], nil
+}
+
+// NewTagsKeyValuesStrings is equivalent to NewTagsKeyValues, except that
+// it will allocate new byte slices for each key, value pair.
+func NewTagsKeyValuesStrings(a Tags, kvs ...string) (Tags, error) {
+	kv := make([][]byte, len(kvs))
+	for i := range kvs {
+		kv[i] = []byte(kvs[i])
+	}
+	return NewTagsKeyValues(a, kv...)
+}
+
+// Keys returns the list of keys for a tag set.
+func (a Tags) Keys() []string {
+	if len(a) == 0 {
+		return nil
+	}
+	keys := make([]string, len(a))
+	for i, tag := range a {
+		keys[i] = string(tag.Key)
+	}
+	return keys
+}
+
+// Values returns the list of values for a tag set.
+func (a Tags) Values() []string {
+	if len(a) == 0 {
+		return nil
+	}
+	values := make([]string, len(a))
+	for i, tag := range a {
+		values[i] = string(tag.Value)
+	}
+	return values
+}
+
+// String returns the string representation of the tags.
+func (a Tags) String() string { + var buf bytes.Buffer + buf.WriteByte('[') + for i := range a { + buf.WriteString(a[i].String()) + if i < len(a)-1 { + buf.WriteByte(' ') + } + } + buf.WriteByte(']') + return buf.String() +} + +// Size returns the number of bytes needed to store all tags. Note, this is +// the number of bytes needed to store all keys and values and does not account +// for data structures or delimiters for example. +func (a Tags) Size() int { + var total int + for i := range a { + total += a[i].Size() + } + return total +} + +// Clone returns a copy of the slice where the elements are a result of calling `Clone` on the original elements +// +// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed. +// Use Clone to create Tags with new byte slices that do not refer to the argument to ParsePointsWithPrecision. +func (a Tags) Clone() Tags { + if len(a) == 0 { + return nil + } + + others := make(Tags, len(a)) + for i := range a { + others[i] = a[i].Clone() + } + + return others +} + +// KeyValues returns the Tags as a list of key, value pairs, +// maintaining the original order of a. v will be used if it has +// capacity. +func (a Tags) KeyValues(v [][]byte) [][]byte { + l := a.Len() * 2 + if cap(v) < l { + v = make([][]byte, 0, l) + } else { + v = v[:0] + } + for i := range a { + v = append(v, a[i].Key, a[i].Value) + } + return v +} + +// sorted returns true if a is sorted and is an optimization +// to avoid an allocation when calling sort.IsSorted, improving +// performance as much as 50%. +func (a Tags) sorted() bool { + for i := len(a) - 1; i > 0; i-- { + if bytes.Compare(a[i].Key, a[i-1].Key) == -1 { + return false + } + } + return true +} + +func (a Tags) Len() int { return len(a) } +func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 } +func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// Equal returns true if a equals other. +func (a Tags) Equal(other Tags) bool { + if len(a) != len(other) { + return false + } + for i := range a { + if !bytes.Equal(a[i].Key, other[i].Key) || !bytes.Equal(a[i].Value, other[i].Value) { + return false + } + } + return true +} + +// CompareTags returns -1 if a < b, 1 if a > b, and 0 if a == b. +func CompareTags(a, b Tags) int { + // Compare each key & value until a mismatch. + for i := 0; i < len(a) && i < len(b); i++ { + if cmp := bytes.Compare(a[i].Key, b[i].Key); cmp != 0 { + return cmp + } + if cmp := bytes.Compare(a[i].Value, b[i].Value); cmp != 0 { + return cmp + } + } + + // If all tags are equal up to this point then return shorter tagset. + if len(a) < len(b) { + return -1 + } else if len(a) > len(b) { + return 1 + } + + // All tags are equal. + return 0 +} + +// Get returns the value for a key. +func (a Tags) Get(key []byte) []byte { + // OPTIMIZE: Use sort.Search if tagset is large. + + for _, t := range a { + if bytes.Equal(t.Key, key) { + return t.Value + } + } + return nil +} + +// GetString returns the string value for a string key. +func (a Tags) GetString(key string) string { + return string(a.Get([]byte(key))) +} + +// Set sets the value for a key. +func (a *Tags) Set(key, value []byte) { + for i, t := range *a { + if bytes.Equal(t.Key, key) { + (*a)[i].Value = value + return + } + } + *a = append(*a, Tag{Key: key, Value: value}) + sort.Sort(*a) +} + +// SetString sets the string value for a string key. 
+func (a *Tags) SetString(key, value string) { + a.Set([]byte(key), []byte(value)) +} + +// Delete removes a tag by key. +func (a *Tags) Delete(key []byte) { + for i, t := range *a { + if bytes.Equal(t.Key, key) { + copy((*a)[i:], (*a)[i+1:]) + (*a)[len(*a)-1] = Tag{} + *a = (*a)[:len(*a)-1] + return + } + } +} + +// Map returns a map representation of the tags. +func (a Tags) Map() map[string]string { + m := make(map[string]string, len(a)) + for _, t := range a { + m[string(t.Key)] = string(t.Value) + } + return m +} + +// Merge merges the tags combining the two. If both define a tag with the +// same key, the merged value overwrites the old value. +// A new map is returned. +func (a Tags) Merge(other map[string]string) Tags { + merged := make(map[string]string, len(a)+len(other)) + for _, t := range a { + merged[string(t.Key)] = string(t.Value) + } + for k, v := range other { + merged[k] = v + } + return NewTags(merged) +} + +// HashKey hashes all of a tag's keys. +func (a Tags) HashKey() []byte { + return a.AppendHashKey(nil) +} + +func (a Tags) needsEscape() bool { + for i := range a { + t := &a[i] + for j := range tagEscapeCodes { + c := &tagEscapeCodes[j] + if bytes.IndexByte(t.Key, c.k[0]) != -1 || bytes.IndexByte(t.Value, c.k[0]) != -1 { + return true + } + } + } + return false +} + +// AppendHashKey appends the result of hashing all of a tag's keys and values to dst and returns the extended buffer. +func (a Tags) AppendHashKey(dst []byte) []byte { + // Empty maps marshal to empty bytes. + if len(a) == 0 { + return dst + } + + // Type invariant: Tags are sorted + + sz := 0 + var escaped Tags + if a.needsEscape() { + var tmp [20]Tag + if len(a) < len(tmp) { + escaped = tmp[:len(a)] + } else { + escaped = make(Tags, len(a)) + } + + for i := range a { + t := &a[i] + nt := &escaped[i] + nt.Key = escapeTag(t.Key) + nt.Value = escapeTag(t.Value) + sz += len(nt.Key) + len(nt.Value) + } + } else { + sz = a.Size() + escaped = a + } + + sz += len(escaped) + (len(escaped) * 2) // separators + + // Generate marshaled bytes. + if cap(dst)-len(dst) < sz { + nd := make([]byte, len(dst), len(dst)+sz) + copy(nd, dst) + dst = nd + } + buf := dst[len(dst) : len(dst)+sz] + idx := 0 + for i := range escaped { + k := &escaped[i] + if len(k.Value) == 0 { + continue + } + buf[idx] = ',' + idx++ + copy(buf[idx:], k.Key) + idx += len(k.Key) + buf[idx] = '=' + idx++ + copy(buf[idx:], k.Value) + idx += len(k.Value) + } + return dst[:len(dst)+idx] +} + +// CopyTags returns a shallow copy of tags. +func CopyTags(a Tags) Tags { + other := make(Tags, len(a)) + copy(other, a) + return other +} + +// DeepCopyTags returns a deep copy of tags. +func DeepCopyTags(a Tags) Tags { + // Calculate size of keys/values in bytes. + var n int + for _, t := range a { + n += len(t.Key) + len(t.Value) + } + + // Build single allocation for all key/values. + buf := make([]byte, n) + + // Copy tags to new set. + other := make(Tags, len(a)) + for i, t := range a { + copy(buf, t.Key) + other[i].Key, buf = buf[:len(t.Key)], buf[len(t.Key):] + + copy(buf, t.Value) + other[i].Value, buf = buf[:len(t.Value)], buf[len(t.Value):] + } + + return other +} + +// Fields represents a mapping between a Point's field names and their +// values. +type Fields map[string]interface{} + +// FieldIterator returns a FieldIterator that can be used to traverse the +// fields of a point without constructing the in-memory map. 
+func (p *point) FieldIterator() FieldIterator { + p.Reset() + return p +} + +type fieldIterator struct { + start, end int + key, keybuf []byte + valueBuf []byte + fieldType FieldType +} + +// Next indicates whether there any fields remaining. +func (p *point) Next() bool { + p.it.start = p.it.end + if p.it.start >= len(p.fields) { + return false + } + + p.it.end, p.it.key = scanTo(p.fields, p.it.start, '=') + if escape.IsEscaped(p.it.key) { + p.it.keybuf = escape.AppendUnescaped(p.it.keybuf[:0], p.it.key) + p.it.key = p.it.keybuf + } + + p.it.end, p.it.valueBuf = scanFieldValue(p.fields, p.it.end+1) + p.it.end++ + + if len(p.it.valueBuf) == 0 { + p.it.fieldType = Empty + return true + } + + c := p.it.valueBuf[0] + + if c == '"' { + p.it.fieldType = String + return true + } + + if strings.IndexByte(`0123456789-.nNiIu`, c) >= 0 { + if p.it.valueBuf[len(p.it.valueBuf)-1] == 'i' { + p.it.fieldType = Integer + p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1] + } else if p.it.valueBuf[len(p.it.valueBuf)-1] == 'u' { + p.it.fieldType = Unsigned + p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1] + } else { + p.it.fieldType = Float + } + return true + } + + // to keep the same behavior that currently exists, default to boolean + p.it.fieldType = Boolean + return true +} + +// FieldKey returns the key of the current field. +func (p *point) FieldKey() []byte { + return p.it.key +} + +// Type returns the FieldType of the current field. +func (p *point) Type() FieldType { + return p.it.fieldType +} + +// StringValue returns the string value of the current field. +func (p *point) StringValue() string { + return unescapeStringField(string(p.it.valueBuf[1 : len(p.it.valueBuf)-1])) +} + +// IntegerValue returns the integer value of the current field. +func (p *point) IntegerValue() (int64, error) { + n, err := parseIntBytes(p.it.valueBuf, 10, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse integer value %q: %v", p.it.valueBuf, err) + } + return n, nil +} + +// UnsignedValue returns the unsigned value of the current field. +func (p *point) UnsignedValue() (uint64, error) { + n, err := parseUintBytes(p.it.valueBuf, 10, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse unsigned value %q: %v", p.it.valueBuf, err) + } + return n, nil +} + +// BooleanValue returns the boolean value of the current field. +func (p *point) BooleanValue() (bool, error) { + b, err := parseBoolBytes(p.it.valueBuf) + if err != nil { + return false, fmt.Errorf("unable to parse bool value %q: %v", p.it.valueBuf, err) + } + return b, nil +} + +// FloatValue returns the float value of the current field. +func (p *point) FloatValue() (float64, error) { + f, err := parseFloatBytes(p.it.valueBuf, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse floating point value %q: %v", p.it.valueBuf, err) + } + return f, nil +} + +// Reset resets the iterator to its initial state. +func (p *point) Reset() { + p.it.fieldType = Empty + p.it.key = nil + p.it.valueBuf = nil + p.it.start = 0 + p.it.end = 0 +} + +// MarshalBinary encodes all the fields to their proper type and returns the binary +// representation +// NOTE: uint64 is specifically not supported due to potential overflow when we decode +// again later to an int64 +// NOTE2: uint is accepted, and may be 64 bits, and is for some reason accepted... 
+func (p Fields) MarshalBinary() []byte { + sz := len(p) - 1 // separators + keys := make([]string, 0, len(p)) + for k := range p { + keys = append(keys, k) + sz += len(k) + } + + // Only sort if we have multiple fields to sort. + // This length check removes an allocation incurred by the sort. + if len(keys) > 1 { + sort.Strings(keys) + } + + b := make([]byte, 0, sz) + for i, k := range keys { + if i > 0 { + b = append(b, ',') + } + b = appendField(b, k, p[k]) + } + return b +} + +func appendField(b []byte, k string, v interface{}) []byte { + b = append(b, []byte(escape.String(k))...) + b = append(b, '=') + + // check popular types first + switch v := v.(type) { + case float64: + b = strconv.AppendFloat(b, v, 'f', -1, 64) + case int64: + b = strconv.AppendInt(b, v, 10) + b = append(b, 'i') + case string: + b = append(b, '"') + b = append(b, []byte(EscapeStringField(v))...) + b = append(b, '"') + case bool: + b = strconv.AppendBool(b, v) + case int32: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int16: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int8: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint64: + b = strconv.AppendUint(b, v, 10) + b = append(b, 'u') + case uint32: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint16: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint8: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint: + // TODO: 'uint' should be converted to writing as an unsigned integer, + // but we cannot since that would break backwards compatibility. + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case float32: + b = strconv.AppendFloat(b, float64(v), 'f', -1, 32) + case []byte: + b = append(b, v...) + case nil: + // skip + default: + // Can't determine the type, so convert to string + b = append(b, '"') + b = append(b, []byte(EscapeStringField(fmt.Sprintf("%v", v)))...) + b = append(b, '"') + + } + + return b +} + +// ValidToken returns true if the provided token is a valid unicode string, and +// only contains printable, non-replacement characters. +func ValidToken(a []byte) bool { + if !utf8.Valid(a) { + return false + } + + for _, r := range string(a) { + if !unicode.IsPrint(r) || r == unicode.ReplacementChar { + return false + } + } + return true +} + +// ValidTagTokens returns true if all the provided tag key and values are +// valid. +// +// ValidTagTokens does not validate the special tag keys used to represent the +// measurement name and field key, but it does validate the associated values. +func ValidTagTokens(tags Tags) bool { + for _, tag := range tags { + // Validate all external tag keys. + if !bytes.Equal(tag.Key, MeasurementTagKeyBytes) && !bytes.Equal(tag.Key, FieldKeyTagKeyBytes) && !ValidToken(tag.Key) { + return false + } + + // Validate all tag values (this will also validate the field key, which is a tag value for the special field key tag key). + if !ValidToken(tag.Value) { + return false + } + } + return true +} + +// ValidKeyTokens returns true if the measurement name and all tags are valid. 
+func ValidKeyTokens(name string, tags Tags) bool { + if !ValidToken([]byte(name)) { + return false + } + + return ValidTagTokens(tags) +} + +var ( + errInvalidUTF8 = errors.New("invalid UTF-8 sequence") + errNonPrintable = errors.New("non-printable character") + errReplacementChar = fmt.Errorf("unicode replacement char %q cannot be used", unicode.ReplacementChar) +) + +// CheckToken returns an error when the given token is invalid +// for use as a tag or value key or measurement name. +func CheckToken(a []byte) error { + if !utf8.Valid(a) { + return errInvalidUTF8 + } + + for _, r := range string(a) { + if !unicode.IsPrint(r) { + return errNonPrintable + } + if r == unicode.ReplacementChar { + return errReplacementChar + } + } + return nil +} diff --git a/vendor/github.com/influxdata/influxdb/v2/models/rows.go b/vendor/github.com/influxdata/influxdb/v2/models/rows.go new file mode 100644 index 00000000000..c087a4882d0 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/rows.go @@ -0,0 +1,62 @@ +package models + +import ( + "sort" +) + +// Row represents a single row returned from the execution of a statement. +type Row struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Columns []string `json:"columns,omitempty"` + Values [][]interface{} `json:"values,omitempty"` + Partial bool `json:"partial,omitempty"` +} + +// SameSeries returns true if r contains values for the same series as o. +func (r *Row) SameSeries(o *Row) bool { + return r.tagsHash() == o.tagsHash() && r.Name == o.Name +} + +// tagsHash returns a hash of tag key/value pairs. +func (r *Row) tagsHash() uint64 { + h := NewInlineFNV64a() + keys := r.tagsKeys() + for _, k := range keys { + h.Write([]byte(k)) + h.Write([]byte(r.Tags[k])) + } + return h.Sum64() +} + +// tagKeys returns a sorted list of tag keys. +func (r *Row) tagsKeys() []string { + a := make([]string, 0, len(r.Tags)) + for k := range r.Tags { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// Rows represents a collection of rows. Rows implements sort.Interface. +type Rows []*Row + +// Len implements sort.Interface. +func (p Rows) Len() int { return len(p) } + +// Less implements sort.Interface. +func (p Rows) Less(i, j int) bool { + // Sort by name first. + if p[i].Name != p[j].Name { + return p[i].Name < p[j].Name + } + + // Sort by tag set hash. Tags don't have a meaningful sort order so we + // just compute a hash and sort by that instead. This allows the tests + // to receive rows in a predictable order every time. + return p[i].tagsHash() < p[j].tagsHash() +} + +// Swap implements sort.Interface. +func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/vendor/github.com/influxdata/influxdb/v2/models/statistic.go b/vendor/github.com/influxdata/influxdb/v2/models/statistic.go new file mode 100644 index 00000000000..9107d9025ab --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/statistic.go @@ -0,0 +1,33 @@ +package models + +// Statistic is the representation of a statistic used by the monitoring service. +type Statistic struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Values map[string]interface{} `json:"values"` +} + +// StatisticTags is a map that can be merged with others without causing +// mutations to either map. +type StatisticTags map[string]string + +// Merge creates a new map containing the merged contents of tags and t. 
+// If both tags and the receiver map contain the same key, the value in tags +// is used in the resulting map. +// +// Merge always returns a usable map. +func (t StatisticTags) Merge(tags map[string]string) map[string]string { + // Add everything in tags to the result. + out := make(map[string]string, len(tags)) + for k, v := range tags { + out[k] = v + } + + // Only add values from t that don't appear in tags. + for k, v := range t { + if _, ok := tags[k]; !ok { + out[k] = v + } + } + return out +} diff --git a/vendor/github.com/influxdata/influxdb/v2/models/tagkeysset.go b/vendor/github.com/influxdata/influxdb/v2/models/tagkeysset.go new file mode 100644 index 00000000000..d165bdce337 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/tagkeysset.go @@ -0,0 +1,156 @@ +package models + +import ( + "bytes" + "strings" +) + +// TagKeysSet provides set operations for combining Tags. +type TagKeysSet struct { + i int + keys [2][][]byte + tmp [][]byte +} + +// Clear removes all the elements of TagKeysSet and ensures all internal +// buffers are reset. +func (set *TagKeysSet) Clear() { + set.clear(set.keys[0]) + set.clear(set.keys[1]) + set.clear(set.tmp) + set.i = 0 + set.keys[0] = set.keys[0][:0] +} + +func (set *TagKeysSet) clear(b [][]byte) { + b = b[:cap(b)] + for i := range b { + b[i] = nil + } +} + +// KeysBytes returns the merged keys in lexicographical order. +// The slice is valid until the next call to UnionKeys, UnionBytes or Reset. +func (set *TagKeysSet) KeysBytes() [][]byte { + return set.keys[set.i&1] +} + +// Keys returns a copy of the merged keys in lexicographical order. +func (set *TagKeysSet) Keys() []string { + keys := set.KeysBytes() + s := make([]string, 0, len(keys)) + for i := range keys { + s = append(s, string(keys[i])) + } + return s +} + +func (set *TagKeysSet) String() string { + var s []string + for _, k := range set.KeysBytes() { + s = append(s, string(k)) + } + return strings.Join(s, ",") +} + +// IsSupersetKeys returns true if the TagKeysSet is a superset of all the keys +// contained in other. +func (set *TagKeysSet) IsSupersetKeys(other Tags) bool { + keys := set.keys[set.i&1] + i, j := 0, 0 + for i < len(keys) && j < len(other) { + if cmp := bytes.Compare(keys[i], other[j].Key); cmp > 0 { + return false + } else if cmp == 0 { + j++ + } + i++ + } + + return j == len(other) +} + +// IsSupersetBytes returns true if the TagKeysSet is a superset of all the keys +// in other. +// Other must be lexicographically sorted or the results are undefined. +func (set *TagKeysSet) IsSupersetBytes(other [][]byte) bool { + keys := set.keys[set.i&1] + i, j := 0, 0 + for i < len(keys) && j < len(other) { + if cmp := bytes.Compare(keys[i], other[j]); cmp > 0 { + return false + } else if cmp == 0 { + j++ + } + i++ + } + + return j == len(other) +} + +// UnionKeys updates the set so that it is the union of itself and all the +// keys contained in other. +func (set *TagKeysSet) UnionKeys(other Tags) { + if set.IsSupersetKeys(other) { + return + } + + if l := len(other); cap(set.tmp) < l { + set.tmp = make([][]byte, l) + } else { + set.tmp = set.tmp[:l] + } + + for i := range other { + set.tmp[i] = other[i].Key + } + + set.merge(set.tmp) +} + +// UnionBytes updates the set so that it is the union of itself and all the +// keys contained in other. +// Other must be lexicographically sorted or the results are undefined. 
+func (set *TagKeysSet) UnionBytes(other [][]byte) { + if set.IsSupersetBytes(other) { + return + } + + set.merge(other) +} + +func (set *TagKeysSet) merge(in [][]byte) { + keys := set.keys[set.i&1] + l := len(keys) + len(in) + set.i = (set.i + 1) & 1 + keya := set.keys[set.i&1] + if cap(keya) < l { + keya = make([][]byte, 0, l) + } else { + keya = keya[:0] + } + + i, j := 0, 0 + for i < len(keys) && j < len(in) { + ki, kj := keys[i], in[j] + if cmp := bytes.Compare(ki, kj); cmp < 0 { + i++ + } else if cmp > 0 { + ki = kj + j++ + } else { + i++ + j++ + } + + keya = append(keya, ki) + } + + if i < len(keys) { + keya = append(keya, keys[i:]...) + } else if j < len(in) { + keya = append(keya, in[j:]...) + } + + set.keys[set.i&1] = keya +} diff --git a/vendor/github.com/influxdata/influxdb/v2/models/time.go b/vendor/github.com/influxdata/influxdb/v2/models/time.go new file mode 100644 index 00000000000..297892c6da3 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/models/time.go @@ -0,0 +1,74 @@ +package models + +// Helper time methods since parsing time can easily overflow and we only support a +// specific time range. + +import ( + "fmt" + "math" + "time" +) + +const ( + // MinNanoTime is the minimum time that can be represented. + // + // 1677-09-21 00:12:43.145224194 +0000 UTC + // + // The two lowest minimum integers are used as sentinel values. The + // minimum value needs to be used as a value lower than any other value for + // comparisons and another separate value is needed to act as a sentinel + // default value that is unusable by the user, but usable internally. + // Because these two values need to be used for a special purpose, we do + // not allow users to write points at these two times. + MinNanoTime = int64(math.MinInt64) + 2 + + // MaxNanoTime is the maximum time that can be represented. + // + // 2262-04-11 23:47:16.854775806 +0000 UTC + // + // The highest time represented by a nanosecond needs to be used for an + // exclusive range in the shard group, so the maximum time needs to be one + // less than the possible maximum number of nanoseconds representable by an + // int64 so that we don't lose a point at that one time. + MaxNanoTime = int64(math.MaxInt64) - 1 +) + +var ( + minNanoTime = time.Unix(0, MinNanoTime).UTC() + maxNanoTime = time.Unix(0, MaxNanoTime).UTC() + + // ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch. + ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime) +) + +// SafeCalcTime safely calculates the time given. Will return error if the time is outside the +// supported range. +func SafeCalcTime(timestamp int64, precision string) (time.Time, error) { + mult := GetPrecisionMultiplier(precision) + if t, ok := safeSignedMult(timestamp, mult); ok { + tme := time.Unix(0, t).UTC() + return tme, CheckTime(tme) + } + + return time.Time{}, ErrTimeOutOfRange +} + +// CheckTime checks that a time is within the safe range. +func CheckTime(t time.Time) error { + if t.Before(minNanoTime) || t.After(maxNanoTime) { + return ErrTimeOutOfRange + } + return nil +} + +// Perform the multiplication and check to make sure it didn't overflow. 
+func safeSignedMult(a, b int64) (int64, bool) { + if a == 0 || b == 0 || a == 1 || b == 1 { + return a * b, true + } + if a == MinNanoTime || b == MaxNanoTime { + return 0, false + } + c := a * b + return c, c/b == a +} diff --git a/vendor/github.com/influxdata/influxdb/v2/pkg/escape/bytes.go b/vendor/github.com/influxdata/influxdb/v2/pkg/escape/bytes.go new file mode 100644 index 00000000000..dd6b2eb9baa --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/pkg/escape/bytes.go @@ -0,0 +1,115 @@ +// Package escape contains utilities for escaping parts of InfluxQL +// and InfluxDB line protocol. +package escape // import "github.com/influxdata/influxdb/v2/pkg/escape" + +import ( + "bytes" + "strings" +) + +// Codes is a map of bytes to be escaped. +var Codes = map[byte][]byte{ + ',': []byte(`\,`), + '"': []byte(`\"`), + ' ': []byte(`\ `), + '=': []byte(`\=`), +} + +// Bytes escapes characters on the input slice, as defined by Codes. +func Bytes(in []byte) []byte { + for b, esc := range Codes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +const escapeChars = `," =` + +// IsEscaped returns whether b has any escaped characters, +// i.e. whether b seems to have been processed by Bytes. +func IsEscaped(b []byte) bool { + for len(b) > 0 { + i := bytes.IndexByte(b, '\\') + if i < 0 { + return false + } + + if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 { + return true + } + b = b[i+1:] + } + return false +} + +// AppendUnescaped appends the unescaped version of src to dst +// and returns the resulting slice. +func AppendUnescaped(dst, src []byte) []byte { + var pos int + for len(src) > 0 { + next := bytes.IndexByte(src[pos:], '\\') + if next < 0 || pos+next+1 >= len(src) { + return append(dst, src...) + } + + if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 { + if pos+next > 0 { + dst = append(dst, src[:pos+next]...) + } + src = src[pos+next+1:] + pos = 0 + } else { + pos += next + 1 + } + } + + return dst +} + +// Unescape returns a new slice containing the unescaped version of in. +func Unescape(in []byte) []byte { + if len(in) == 0 { + return nil + } + + if bytes.IndexByte(in, '\\') == -1 { + return in + } + + i := 0 + inLen := len(in) + + // The output size will be no more than inLen. Preallocating the + // capacity of the output is faster and uses less memory than + // letting append() do its own (over)allocation. + out := make([]byte, 0, inLen) + + for { + if i >= inLen { + break + } + if in[i] == '\\' && i+1 < inLen { + switch in[i+1] { + case ',': + out = append(out, ',') + i += 2 + continue + case '"': + out = append(out, '"') + i += 2 + continue + case ' ': + out = append(out, ' ') + i += 2 + continue + case '=': + out = append(out, '=') + i += 2 + continue + } + } + out = append(out, in[i]) + i += 1 + } + return out +} diff --git a/vendor/github.com/influxdata/influxdb/v2/pkg/escape/strings.go b/vendor/github.com/influxdata/influxdb/v2/pkg/escape/strings.go new file mode 100644 index 00000000000..db98033b0d7 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/v2/pkg/escape/strings.go @@ -0,0 +1,21 @@ +package escape + +import "strings" + +var ( + escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`) + unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`) +) + +// UnescapeString returns unescaped version of in. 
+func UnescapeString(in string) string { + if strings.IndexByte(in, '\\') == -1 { + return in + } + return unescaper.Replace(in) +} + +// String returns the escaped version of in. +func String(in string) string { + return escaper.Replace(in) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 809e3eacc42..59f085fb882 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -792,6 +792,11 @@ github.com/imdario/mergo # github.com/inconshreveable/mousetrap v1.1.0 ## explicit; go 1.18 github.com/inconshreveable/mousetrap +# github.com/influxdata/influxdb/v2 v2.7.11 +## explicit; go 1.22.0 +github.com/influxdata/influxdb/v2/kit/io +github.com/influxdata/influxdb/v2/models +github.com/influxdata/influxdb/v2/pkg/escape # github.com/jessevdk/go-flags v1.5.0 ## explicit; go 1.15 github.com/jessevdk/go-flags
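For reviewers unfamiliar with the vendored `models` and `pkg/escape` packages, the short Go sketch below (illustrative only, not part of the vendor sync) exercises the behaviour visible in the hunks above: `Fields.MarshalBinary` renders sorted `key=value` pairs with line-protocol type suffixes, `escape.String`/`UnescapeString` round-trip the `,`, `"`, space and `=` characters, and `CheckTime` rejects timestamps outside the representable nanosecond range. It assumes `models.Fields` is the `map[string]interface{}` type implied by the `MarshalBinary` receiver; the exact escaped output of string field values depends on `EscapeStringField`, which is defined elsewhere in the vendored package.

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/pkg/escape"
)

func main() {
	// Field values carry line-protocol type suffixes: float64 stays plain,
	// int64 gains "i", uint64 gains "u", strings are quoted. Keys are
	// emitted in sorted order because MarshalBinary sorts them.
	fields := models.Fields{
		"usage": 12.5,
		"count": int64(3),
		"host":  "node-a",
	}
	fmt.Println(string(fields.MarshalBinary()))
	// expected, given sorted keys: count=3i,host="node-a",usage=12.5

	// Measurement names, tag keys and tag values escape ',', '"', ' ' and '='.
	escaped := escape.String(`cpu load,host=eu west`)
	fmt.Println(escaped)                        // cpu\ load\,host\=eu\ west
	fmt.Println(escape.UnescapeString(escaped)) // round-trips to the original

	// Timestamps are limited to [MinNanoTime, MaxNanoTime]; one nanosecond
	// past the maximum is rejected with ErrTimeOutOfRange.
	if err := models.CheckTime(time.Unix(0, models.MaxNanoTime+1).UTC()); err != nil {
		fmt.Println(err)
	}
}

One behavioural caveat worth noting for review: per the TODO in `appendField`, plain `uint` values are still encoded with the signed `i` suffix rather than `u`, to preserve backwards compatibility with existing line-protocol consumers.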