Add Influx push endpoint to Mimir #10153
base: main
@@ -0,0 +1,154 @@
// SPDX-License-Identifier: AGPL-3.0-only

package distributor

import (
	"context"
	"errors"
	"net/http"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/grpcutil"
	"github.com/grafana/dskit/httpgrpc"
	"github.com/grafana/dskit/middleware"
	"github.com/grafana/dskit/tenant"
	influxio "github.com/influxdata/influxdb/v2/kit/io"

	"github.com/grafana/mimir/pkg/distributor/influxpush"
	"github.com/grafana/mimir/pkg/mimirpb"
	"github.com/grafana/mimir/pkg/util"
	utillog "github.com/grafana/mimir/pkg/util/log"
	"github.com/grafana/mimir/pkg/util/spanlogger"
)

func influxRequestParser(ctx context.Context, r *http.Request, maxSize int, _ *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) (int, error) {
	spanLogger, ctx := spanlogger.NewWithLogger(ctx, logger, "Distributor.InfluxHandler.decodeAndConvert")
	defer spanLogger.Span.Finish()

	spanLogger.SetTag("content_type", r.Header.Get("Content-Type"))
	spanLogger.SetTag("content_encoding", r.Header.Get("Content-Encoding"))
	spanLogger.SetTag("content_length", r.ContentLength)

	pts, bytesRead, err := influxpush.ParseInfluxLineReader(ctx, r, maxSize)
	level.Debug(spanLogger).Log("msg", "decodeAndConvert complete", "bytesRead", bytesRead)
Review comment: I'd say we should also log …
Reply: OK, done in next commit.
	if err != nil {
		level.Error(logger).Log("msg", "failed to parse Influx push request", "err", err)
		return bytesRead, err
	}

	level.Debug(spanLogger).Log(
		"msg", "Influx to Prometheus conversion complete",
		"metric_count", len(pts),
	)
Review comment: I think there's no need for a separate log here, just log the …
Reply: OK, done in next commit.
	req.Timeseries = pts
	return bytesRead, nil
}

// InfluxHandler is a http.Handler which accepts Influx Line protocol and converts it to WriteRequests.
func InfluxHandler(
	maxRecvMsgSize int,
	requestBufferPool util.Pool,
	sourceIPs *middleware.SourceIPExtractor,
	retryCfg RetryConfig,
	push PushFunc,
	pushMetrics *PushMetrics,
	logger log.Logger,
) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		logger := utillog.WithContext(ctx, logger)
		if sourceIPs != nil {
			source := sourceIPs.Get(r)
			if source != "" {
				logger = utillog.WithSourceIPs(source, logger)
			}
		}

		tenantID, err := tenant.TenantID(ctx)
		if err != nil {
			level.Warn(logger).Log("msg", "unable to obtain tenantID", "err", err)
			return
		}

		pushMetrics.IncInfluxRequest(tenantID)

		var bytesRead int

		supplier := func() (*mimirpb.WriteRequest, func(), error) {
			rb := util.NewRequestBuffers(requestBufferPool)
			var req mimirpb.PreallocWriteRequest

			if bytesRead, err = influxRequestParser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
				err = httpgrpc.Error(http.StatusBadRequest, err.Error())
				rb.CleanUp()
				return nil, nil, err
			}

			cleanup := func() {
				mimirpb.ReuseSlice(req.Timeseries)
				rb.CleanUp()
			}
			return &req.WriteRequest, cleanup, nil
		}

		pushMetrics.ObserveInfluxUncompressedBodySize(tenantID, float64(bytesRead))

		req := newRequest(supplier)
		// https://docs.influxdata.com/influxdb/cloud/api/v2/#tag/Response-codes
		if err := push(ctx, req); err != nil {
			if errors.Is(err, context.Canceled) {
				level.Warn(logger).Log("msg", "push request canceled", "err", err)
				w.WriteHeader(statusClientClosedRequest)
				return
			}
			if errors.Is(err, influxio.ErrReadLimitExceeded) {
				// TODO(alexg): One thing we have seen in the past is that telegraf clients send a batch of data;
				// if it is too big they should respond to the 413 below, but if a client doesn't understand this
				// it just sends the next batch that is even bigger. In the past this has had to be dealt with by
				// adding rate limits to drop the payloads.
				level.Warn(logger).Log("msg", "request too large", "err", err, "bytesRead", bytesRead, "maxMsgSize", maxRecvMsgSize)
				w.WriteHeader(http.StatusRequestEntityTooLarge)
				return
			}
			// From: https://github.com/grafana/influx2cortex/blob/main/pkg/influx/errors.go

			var httpCode int
			var errorMsg string

			if st, ok := grpcutil.ErrorToStatus(err); ok {
				// This code is needed for a correct handling of errors returned by the supplier function.
				// These errors are created by using the httpgrpc package.
				httpCode = int(st.Code())
				errorMsg = st.Message()
			} else {
				var distributorErr Error
				errorMsg = err.Error()
				if errors.Is(err, context.DeadlineExceeded) || !errors.As(err, &distributorErr) {
					httpCode = http.StatusServiceUnavailable
				} else {
					httpCode = errorCauseToHTTPStatusCode(distributorErr.Cause(), false)
				}
			}
			if httpCode != 202 {
				// This error message is consistent with the error message in the Prometheus remote-write handler, and the ingester's ingest-storage pushToStorage method.
				msgs := []interface{}{"msg", "detected an error while ingesting Influx metrics request (the request may have been partially ingested)", "httpCode", httpCode, "err", err}
				if httpCode/100 == 4 {
					// This tag makes the error message visible for our Grafana Cloud customers.
					msgs = append(msgs, "insight", true)
				}
				level.Error(logger).Log(msgs...)
			}
			if httpCode < 500 {
				level.Info(logger).Log("msg", errorMsg, "response_code", httpCode, "err", err)
			} else {
				level.Warn(logger).Log("msg", errorMsg, "response_code", httpCode, "err", err)
			}
			addHeaders(w, err, r, httpCode, retryCfg)
			w.WriteHeader(httpCode)
		} else {
			w.WriteHeader(http.StatusNoContent) // Needed for Telegraf, otherwise it tries to marshal JSON and considers the write a failure.
		}
	})
}
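For readers skimming the diff, here is a toy sketch of the kind of line-protocol-to-series mapping the handler relies on. It is illustrative only: the real conversion is done by influxpush.ParseInfluxLineReader, which also handles escaping, multiple fields per line, and the protobuf types; the measurement_field naming and millisecond truncation shown here are assumptions based on the influx2cortex lineage referenced in the code, not something this file defines.

```go
package main

import "fmt"

func main() {
	// One Influx line protocol point:
	//   weather,location=us-midwest temperature=82 1465839830100400200
	measurement := "weather"
	location := "us-midwest"
	field, value := "temperature", 82.0
	tsNanos := int64(1465839830100400200)

	// Conventionally, each field becomes its own series named measurement_field,
	// tags become labels, and the nanosecond timestamp is truncated to milliseconds.
	metricName := fmt.Sprintf("%s_%s", measurement, field)
	tsMillis := tsNanos / 1_000_000
	fmt.Printf("%s{location=%q} %v @%d\n", metricName, location, value, tsMillis)
}
```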
Review comment: Is this path already an established thing? I feel like it should be /influx/v1/push instead (/api comes from the times when we only had Prometheus, and OTLP is under /otlp).
Reply: Yes, sadly it's burned into many of the agents. I'd love the prefix to be /influx, but I don't think it is possible. I will make one final check though.
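Whichever path ends up being registered, the handler only expects a line protocol body and a tenant, so it can be exercised with a plain HTTP client. A minimal sketch follows; the URL, port, and tenant ID are placeholders rather than values confirmed by this PR, and X-Scope-OrgID is Mimir's usual multi-tenancy header.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// One point in Influx line protocol: measurement,tags fields timestamp(ns).
	body := bytes.NewBufferString("weather,location=us-midwest temperature=82 1465839830100400200\n")

	// Placeholder URL: substitute whichever path the PR finally registers.
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/api/v1/push/influx", body)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "text/plain; charset=utf-8")
	req.Header.Set("X-Scope-OrgID", "tenant-1") // assumed tenant ID

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// On success the handler replies 204 No Content (kept Telegraf-compatible).
	fmt.Println("status:", resp.StatusCode)
}
```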