Skip to content

Commit

Permalink
Return main upstream response when using multiple endpoints
Browse files Browse the repository at this point in the history
This changes refactors the Proxy remote wrtie handler to do
as much work outside of the handler as possible.

There is a major change in behaviour.
The extra endpoints are now written to async in the background of
the main request. Extra endpoints will be written to with best
effort and logged.

The response code from the thanos backend will be returned to the
client.

This will cause the client to retry on specific errors and will
have a knock on effect of a successfully written request prior
being attempted once again to the extra remote write endpoints.
  • Loading branch information
philipgough committed May 20, 2024
1 parent dd9217e commit d131672
Showing 1 changed file with 149 additions and 98 deletions.
247 changes: 149 additions & 98 deletions internal/remotewrite/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package remotewrite

import (
"bytes"
"io/ioutil"
"context"
"io"
"net"
"net/http"
"net/url"
Expand All @@ -18,13 +19,12 @@ import (
)

const (
THANOS_ENDPOINT_NAME = "thanos-receiver"
thanosEndpointName = "thanos-receiver"
)

type Endpoint struct {
Name string `yaml:"name"`
URL string `yaml:"url"`
// +optional
Name string `yaml:"name"`
URL string `yaml:"url"`
ClientConfig *promconfig.HTTPClientConfig `yaml:"http_client_config,omitempty"`
}

Expand All @@ -42,107 +42,79 @@ var (
}, []string{"code", "name"})
)

func remoteWrite(write *url.URL, endpoints []Endpoint, logger log.Logger) http.Handler {
func (rd *RequestDuplicator) remoteWrite(write *url.URL, endpoints []Endpoint, logger log.Logger, logManager *logManager) http.Handler {
var clientMap = map[string]*http.Client{}
clientMap = make(map[string]*http.Client)
defaultHTTPClient := defaultClient()
writePath := write.Path
writeHost := write.Host
if write.Scheme == "" {
write.Scheme = "http"
}
writeScheme := write.Scheme

for _, ep := range endpoints {
var client = defaultHTTPClient
if ep.ClientConfig != nil {
epClient, err := promconfig.NewClientFromConfig(*ep.ClientConfig, ep.Name,
promconfig.WithDialContextFunc((&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext))
if err == nil {
client = epClient
}
}
clientMap[ep.Name] = client
}

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requests.With(prometheus.Labels{"method": r.Method}).Inc()
rlogger := log.With(logger, "request", middleware.GetReqID(r.Context()))

body, _ := ioutil.ReadAll(r.Body)
_ = r.Body.Close()
r.Body = ioutil.NopCloser(bytes.NewBuffer(body))

if write != nil {
remotewriteUrl := url.URL{}
remotewriteUrl.Path = path.Join(write.Path, r.URL.Path)
remotewriteUrl.Host = write.Host
remotewriteUrl.Scheme = write.Scheme
endpoints[len(endpoints)-1].URL = remotewriteUrl.String()
body, err := io.ReadAll(r.Body)
if err != nil {
level.Error(rlogger).Log("msg", "failed to read request body", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
headers := r.Header.Clone()

rlogger := log.With(logger, "request", middleware.GetReqID(r.Context()))
for i, endpoint := range endpoints {
var client *http.Client
var err error
if endpoint.ClientConfig == nil {
client = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
IdleConnTimeout: 30 * time.Second,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
},
}
} else {
client, err = promconfig.NewClientFromConfig(*endpoint.ClientConfig, endpoint.Name,
promconfig.WithDialContextFunc((&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext))
if err != nil {
//level.Error(rlogger).Log("msg", "failed to create a new HTTP client", "err", err)
LogChannels[i] <- logMessage{
messageKey: "failed to create a new HTTP client",
keyvals: []interface{}{
"msg", "failed to create a new HTTP client", "err", err,
}}
}
}
rwReq, err := rebuildProxyRequest(r, body, writePath, writeHost, writeScheme)
if err != nil {
level.Error(rlogger).Log("msg", "failed to rebuild the request", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

req, err := http.NewRequest(http.MethodPost, endpoint.URL, bytes.NewReader(body))
req.Header = r.Header
if err != nil {
//level.Error(rlogger).Log("msg", "Failed to create the forward request", "err", err, "url", endpoint.URL)
LogChannels[i] <- logMessage{
messageKey: "failed to create the forward request",
keyvals: []interface{}{
"msg", "failed to create the forward request", "err", err,
}}
} else {
ep := endpoint
j := i
go func() {
for _, endpoint := range endpoints {
go func() {
resp, err := client.Do(req)
req, err := mirrorRequestFromBody(body, headers, endpoint.URL)
if err != nil {
remotewriteRequests.With(prometheus.Labels{"code": "<error>", "name": ep.Name}).Inc()
//level.Error(rlogger).Log("msg", "Failed to send request to the server", "err", err)
LogChannels[j] <- logMessage{
messageKey: "failed to send request to the server",
keyvals: []interface{}{
"msg", "failed to send request to the server", "err", err,
}}
} else {
defer resp.Body.Close()
remotewriteRequests.With(prometheus.Labels{"code": strconv.Itoa(resp.StatusCode), "name": ep.Name}).Inc()
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
responseBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
//level.Error(rlogger).Log("msg", "Failed to read response of the forward request", "err", err, "return code", resp.Status, "url", ep.URL)
LogChannels[j] <- logMessage{
messageKey: "failed to forward metrics" + resp.Status,
keyvals: []interface{}{
"msg", "failed to forward metrics", "return code", resp.Status, "url", ep.URL,
}}
} else {
LogChannels[j] <- logMessage{
messageKey: "Failed to forward metrics" + resp.Status,
keyvals: []interface{}{
"msg", "failed to forward metrics", "return code", resp.Status, "response", string(responseBody), "url", ep.URL}}
}
} else {
level.Debug(rlogger).Log("msg", successWrite, "url", ep.URL)
LogChannels[j] <- logMessage{
messageKey: successWrite,
}
}
level.Error(rlogger).Log("msg", "failed to build the remote write request", "url", endpoint.URL, "err", err)
return
}
client := getClientForEndpoint(endpoint.Name, clientMap)
_ = rd.doRemoteWriteRequest(client, req, endpoint.Name, logger)
}()

}
}()

// handle the main remote write endpoint request synchronously
if write != nil {
statusCode := rd.doRemoteWriteRequest(defaultHTTPClient, rwReq, thanosEndpointName, logger)
w.WriteHeader(statusCode)
}
})
}

func Proxy(write *url.URL, endpoints []Endpoint, logger log.Logger, r *prometheus.Registry) http.Handler {
type RequestDuplicator struct {
logManager *logManager
}

func (rd *RequestDuplicator) Proxy(write *url.URL, endpoints []Endpoint, logger log.Logger, r *prometheus.Registry) http.Handler {

r.MustRegister(requests)
r.MustRegister(remotewriteRequests)
Expand All @@ -151,14 +123,93 @@ func Proxy(write *url.URL, endpoints []Endpoint, logger log.Logger, r *prometheu
endpoints = []Endpoint{}
}

if write != nil {
endpoints = append(endpoints, Endpoint{
URL: write.String(),
Name: THANOS_ENDPOINT_NAME,
})
if rd.logManager == nil {
rd.logManager = newLogManager(logger, endpoints, nil)
}

InitChannels(logger, len(endpoints))
return rd.remoteWrite(write, endpoints, logger, rd.logManager)
}

func rebuildProxyRequest(r *http.Request, body []byte, reqPath, host, scheme string) (*http.Request, error) {
remotewriteUrl := url.URL{}
remotewriteUrl.Path = path.Join(reqPath, r.URL.Path)
remotewriteUrl.Host = host
remotewriteUrl.Scheme = scheme

req, err := http.NewRequest(r.Method, remotewriteUrl.String(), bytes.NewReader(body))
if err != nil {
return nil, err

return remoteWrite(write, endpoints, logger)
}
req.Header = r.Header.Clone()
req.WithContext(r.Context())
return req, nil
}

// mirrorRequestFromBody build a remote write request for the upstream remote write endpoint
// we enforce a 5s timeout here to avoid having unbounded goroutines due to slow backends
func mirrorRequestFromBody(body []byte, headers http.Header, endpoint string) (*http.Request, error) {
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header = headers
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
req = req.WithContext(ctx)
return req, nil
}

func defaultClient() *http.Client {
return &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
IdleConnTimeout: 30 * time.Second,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
},
}
}

func getClientForEndpoint(name string, fromPool map[string]*http.Client) *http.Client {
c, ok := fromPool[name]
if !ok {
return defaultClient()
}
return c
}

func (rd *RequestDuplicator) doRemoteWriteRequest(
client *http.Client,
req *http.Request,
epName string,
logger log.Logger,
) int {
resp, err := client.Do(req)
if err != nil {
remotewriteRequests.With(prometheus.Labels{"code": "<error>", "name": epName}).Inc()
rd.logManager.log(epName, "failed to send request to the server", "msg", "failed to send request to the server", "err", err)
return http.StatusInternalServerError
}

remotewriteRequests.With(prometheus.Labels{"code": strconv.Itoa(resp.StatusCode), "name": epName}).Inc()
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
responseBody, err := io.ReadAll(resp.Body)
keyVals := []interface{}{
"msg", "failed to forward metrics",
"endpoint", epName,
"response code", resp.Status,
"response", string(responseBody),
"url", req.URL.String(),
}

if err != nil {
keyVals = append(keyVals, "err", err)
}
rd.logManager.log(epName, "failed to forward metrics "+resp.Status, keyVals...)
return resp.StatusCode
}
level.Debug(logger).Log("msg", "Successfully forwarded metrics", "url", req.URL.String())
return resp.StatusCode
}

0 comments on commit d131672

Please sign in to comment.