-
Notifications
You must be signed in to change notification settings - Fork 544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add influx push endpoint to mimir #10153
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
pkg/api/api.go
Outdated
// TODO(alexg): hidden behind a featureflag or experimental config option? | ||
a.RegisterRoute(InfluxPushEndpoint, distributor.InfluxHandler( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think a feature flag for this is needed. We can just state in the docs (about-versioning.md) that the endpoint is experimental.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, done.
pkg/distributor/influx.go
Outdated
level.Debug(spanLogger).Log( | ||
"msg", "decodeAndConvert complete", | ||
"bytesRead", bytesRead, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opinionated style nit: I don't think we need 4 lines for this debug log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in next push.
pkg/distributor/influx.go
Outdated
"bytesRead", bytesRead, | ||
) | ||
if err != nil { | ||
level.Error(logger).Log("err", err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this logger have all the required context? Can you also add some context here about what was going on when this happened? I'm scared of finding this log:
ts=2024-12-17 err="unexpected EOF"
Also, nit, .Error()
call is not needed, just pass err
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, fixed in next push.
pkg/distributor/influx.go
Outdated
"github.com/grafana/dskit/grpcutil" | ||
"github.com/grafana/dskit/httpgrpc" | ||
"github.com/grafana/dskit/middleware" | ||
io2 "github.com/influxdata/influxdb/v2/kit/io" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
io2 "github.com/influxdata/influxdb/v2/kit/io" | |
influxio "github.com/influxdata/influxdb/v2/kit/io" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, fixed in next push.
pkg/distributor/influx.go
Outdated
"github.com/grafana/mimir/pkg/util/spanlogger" | ||
) | ||
|
||
func parser(ctx context.Context, r *http.Request, maxSize int, _ *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function lives in pkg/distributor
, I would say it's a little bit pretentious to take the name parser
for this :D
How about influxRequestParser
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, fixed in next push.
pkg/distributor/influx.go
Outdated
// TODO(alexg): Do we even need httpgrpc here? | ||
// Check for httpgrpc error, default to client error if parsing failed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any httpgrpc errors being returned by parser
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, should have got rid of that one. Will resolve once I work out best way to wrap existing error in the StatusBadRequest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, httpgprc
is used to smuggle both the http status code and the error message out of the supplier()
function. I've removed the misleading comment.
pkg/distributor/influxpush/parser.go
Outdated
(charInt >= 48 && charInt <= 57) || // 0-9 | ||
charInt == 95) { // _ | ||
|
||
*in = (*in)[:charIndex] + "_" + (*in)[charIndex+1:] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This allocates a new string (hopefully just one, compiler should be clever) for each one of the invalid characters. How about you modify the bytes slice before transforming it to a string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Done in latest commit.
pkg/distributor/influxpush/parser.go
Outdated
// analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]") | ||
func replaceInvalidChars(in *string) { | ||
for charIndex, char := range *in { | ||
charInt := int(char) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need it to be an int?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't, removed unnecessary cast.
pkg/distributor/influxpush/parser.go
Outdated
func replaceInvalidChars(in *string) { | ||
for charIndex, char := range *in { | ||
charInt := int(char) | ||
if !((charInt >= 97 && charInt <= 122) || // a-z |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you use 'a' instead of 97 (is it correct?) you won't need the comment and the code will be less prone to have bugs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to literal chars instead of ASCII codes.
pkg/distributor/influxpush/parser.go
Outdated
} | ||
} | ||
// prefix with _ if first char is 0-9 | ||
if int((*in)[0]) >= 48 && int((*in)[0]) <= 57 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should check first that *in
isn't empty, otherwise it would panic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The values come from the influx library which won't allow an empty string to be created, but I've added an extra check just in case. Also added some extra tests to ensure the assumption about the influx library holds.
pkg/distributor/influxpush/parser.go
Outdated
key := string(tag.Key) | ||
if key == "__name__" || key == internalLabel { | ||
continue | ||
} | ||
replaceInvalidChars(&key) | ||
lbls = append(lbls, mimirpb.LabelAdapter{ | ||
Name: key, | ||
Value: string(tag.Value), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do yoloString
here instead of string()
, as the push code is crafted to avoid keeping references to the strings from LabelAdapter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting! Fixed as per suggestion. I take it no other strings are guaranteed not to be referenced and so it can't be used anywhere else?
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
OK, comments left to deal with
|
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
💻 Deploy preview available: https://deploy-preview-mimir-10153-zb444pucvq-vp.a.run.app/docs/mimir/latest/ |
Signed-off-by: alexgreenbank <[email protected]>
@@ -256,6 +256,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL | |||
|
|||
const PrometheusPushEndpoint = "/api/v1/push" | |||
const OTLPPushEndpoint = "/otlp/v1/metrics" | |||
const InfluxPushEndpoint = "/api/v1/influx/push" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this path already an established thing? I feel like it should be /influx/v1/push
instead (/api
comes from the times when we only had Prometheus, and OTLP is under /otlp
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, sadly it's burned into many of the agents. I'd love the prefix to be /influx
but I don't think it is possible.
I will make one final check though.
pkg/distributor/influx.go
Outdated
level.Debug(spanLogger).Log( | ||
"msg", "Influx to Prometheus conversion complete", | ||
"metric_count", len(pts), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's no need for a separate log here, just log the len(pts)
in the call above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, done in next commit.
pkg/distributor/influx.go
Outdated
spanLogger.SetTag("content_length", r.ContentLength) | ||
|
||
pts, bytesRead, err := influxpush.ParseInfluxLineReader(ctx, r, maxSize) | ||
level.Debug(spanLogger).Log("msg", "decodeAndConvert complete", "bytesRead", bytesRead) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say we should also log err
here (just in case it wasn't nil)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, done in next commit.
pkg/distributor/influxpush/parser.go
Outdated
key := string(tag.Key) | ||
if key == "__name__" || key == internalLabel { | ||
continue | ||
} | ||
replaceInvalidChars(&key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We create a string from the tag.Key
slice (first allocation), then we pass it as a pointer to replaceInvalidChars
which allocates a slice from that string (another allocation), and then we write into that pointer the string we allocate (third allocation) from that bytes slice.
Why don't we just do key := replaceInvalidChars(tag.Key)
, and make replaceInvalidChars
just modify the incoming bytes slice and then return it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done as agreed...
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
All checks passed. Blimey. |
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
Signed-off-by: alexgreenbank <[email protected]>
…into alexg/influx-push-handler
What this PR does
Add influx endpoint to Mimir.
Based on original code from #1971 but brought up to date for modern Distributor framework.
MVP. Experimantal/hidden feature so no documentation yet.
Checklist
CHANGELOG.md
updated - the order of entries should be[CHANGE]
,[FEATURE]
,[ENHANCEMENT]
,[BUGFIX]
.about-versioning.md
updated with experimental features.