-
Notifications
You must be signed in to change notification settings - Fork 486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
draft up a way to translate otel loki receiver to flow #6782
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package otelcolconvert | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/grafana/agent/internal/component/common/loki" | ||
"github.com/grafana/agent/internal/component/loki/source/api" | ||
"github.com/grafana/agent/internal/component/otelcol" | ||
otel_loki "github.com/grafana/agent/internal/component/otelcol/receiver/loki" | ||
"github.com/grafana/agent/internal/converter/diag" | ||
"github.com/grafana/agent/internal/converter/internal/common" | ||
"github.com/grafana/agent/internal/converter/internal/promtailconvert/build" | ||
"github.com/grafana/dskit/server" | ||
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver" | ||
"go.opentelemetry.io/collector/component" | ||
) | ||
|
||
func init() { | ||
converters = append(converters, lokiReceiverConverter{}) | ||
} | ||
|
||
type lokiReceiverConverter struct{} | ||
|
||
func (lokiReceiverConverter) Factory() component.Factory { return lokireceiver.NewFactory() } | ||
|
||
func (lokiReceiverConverter) InputComponentName() string { return "" } | ||
|
||
func (lokiReceiverConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics { | ||
var diags diag.Diagnostics | ||
|
||
label := state.FlowComponentLabel() | ||
|
||
otelArgs := toOtelcolReceiverLoki(state, id) | ||
otelBlock := common.NewBlockWithOverride([]string{"otelcol", "receiver", "loki"}, label, otelArgs) | ||
|
||
logsReceivers := []loki.LogsReceiver{common.ConvertLogsReceiver{ | ||
Expr: StringifyBlock(otelBlock) + ".receiver", | ||
}} | ||
apiArgs := toLokiSourceApi(logsReceivers, cfg.(*lokireceiver.Config)) | ||
apiBlock := common.NewBlockWithOverride([]string{"loki", "source", "api"}, label, apiArgs) | ||
|
||
diags.Add( | ||
diag.SeverityLevelInfo, | ||
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(otelBlock)), | ||
) | ||
|
||
diags.Add( | ||
diag.SeverityLevelInfo, | ||
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(apiBlock)), | ||
) | ||
|
||
// Do this at the end in reverse order so the pipeline looks a little better | ||
state.Body().AppendBlock(apiBlock) | ||
state.Body().AppendBlock(otelBlock) | ||
return diags | ||
} | ||
|
||
func toOtelcolReceiverLoki(state *State, id component.InstanceID) *otel_loki.Arguments { | ||
var ( | ||
nextLogs = state.Next(id, component.DataTypeLogs) | ||
) | ||
|
||
return &otel_loki.Arguments{ | ||
Output: &otelcol.ConsumerArguments{ | ||
Logs: ToTokenizedConsumers(nextLogs), | ||
}, | ||
} | ||
} | ||
|
||
func toLokiSourceApi(logsReceivers []loki.LogsReceiver, cfg *lokireceiver.Config) *api.Arguments { | ||
ptc := &scrapeconfig.PushTargetConfig{ | ||
Server: toServer(&cfg.Protocols), | ||
Labels: nil, | ||
KeepTimestamp: cfg.KeepTimestamp, | ||
} | ||
|
||
args := build.ToLokiApiArguments(ptc, logsReceivers) | ||
return &args | ||
} | ||
|
||
func toServer(cfg *lokireceiver.Protocols) server.Config { | ||
return server.Config{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lots to figure out here with how to convert the server |
||
HTTPListenAddress: "", | ||
HTTPListenPort: 0, | ||
HTTPConnLimit: 0, | ||
HTTPServerReadTimeout: 0, | ||
HTTPServerWriteTimeout: 0, | ||
HTTPServerIdleTimeout: 0, | ||
GRPCListenAddress: "", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. otel supports GRPC for their receiver and while ours has them wired up as flow arguments it is not supported. I created a new issue to add this support here: https://github.com/grafana/agent/issues/6783 |
||
GRPCListenPort: 0, | ||
GRPCConnLimit: 0, | ||
GRPCServerMaxConnectionAge: 0, | ||
GRPCServerMaxConnectionAgeGrace: 0, | ||
GRPCServerMaxRecvMsgSize: 0, | ||
GRPCServerMaxSendMsgSize: 0, | ||
GRPCServerMaxConcurrentStreams: uint(cfg.GRPC.MaxConcurrentStreams), | ||
GRPCServerMaxConnectionIdle: 0, | ||
ServerGracefulShutdownTimeout: cfg.GRPC.NetAddr.DialerConfig.Timeout, | ||
|
||
// PROMTAIL CONVERTER IGNORES ALL OF THESE | ||
// | ||
// HTTPListenNetwork: "", | ||
// HTTPServerReadHeaderTimeout: 0, | ||
// GRPCListenNetwork: "", | ||
// CipherSuites: "", | ||
// MinVersion: "", | ||
// HTTPTLSConfig: server.TLSConfig{}, | ||
// GRPCTLSConfig: server.TLSConfig{}, | ||
// RegisterInstrumentation: false, | ||
// ReportGRPCCodesInInstrumentationLabel: false, | ||
// ReportHTTP4XXCodesInInstrumentationLabel: false, | ||
// ExcludeRequestInLog: false, | ||
// DisableRequestSuccessLog: false, | ||
// HTTPLogClosedConnectionsWithoutResponse: false, | ||
// GRPCOptions: []grpc.ServerOption{}, | ||
// GRPCMiddleware: []grpc.UnaryServerInterceptor{}, | ||
// GRPCStreamMiddleware: []grpc.StreamServerInterceptor{}, | ||
// HTTPMiddleware: []middleware.Interface{}, | ||
// Router: &mux.Router{}, | ||
// DoNotAddDefaultHTTPMiddleware: false, | ||
// RouteHTTPToGRPC: false, | ||
// GRPCServerTime: 0, | ||
// GRPCServerTimeout: cfg.GRPC.NetAddr.DialerConfig.Timeout, | ||
// GRPCServerMinTimeBetweenPings: 0, | ||
// GRPCServerPingWithoutStreamAllowed: false, | ||
// GRPCServerNumWorkers: 0, | ||
// LogFormat: "", | ||
// LogLevel: log.Level{}, | ||
// Log: nil, | ||
// LogSourceIPs: false, | ||
// LogSourceIPsHeader: "", | ||
// LogSourceIPsRegex: "", | ||
// LogRequestHeaders: false, | ||
// LogRequestAtInfoLevel: false, | ||
// LogRequestExcludeHeadersList: "", | ||
// SignalHandler: nil, | ||
// Registerer: nil, | ||
// Gatherer: nil, | ||
// PathPrefix: "", | ||
// GrpcMethodLimiter: nil, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
loki.source.api "default" { | ||
http { } | ||
|
||
grpc { } | ||
graceful_shutdown_timeout = "10s" | ||
forward_to = [otelcol.receiver.loki.default.receiver] | ||
labels = {} | ||
relabel_rules = null | ||
use_incoming_timestamp = true | ||
} | ||
|
||
otelcol.receiver.loki "default" { | ||
output { | ||
logs = [otelcol.exporter.otlp.default.input] | ||
} | ||
} | ||
|
||
otelcol.exporter.otlp "default" { | ||
client { | ||
endpoint = "database:4317" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
receivers: | ||
loki: | ||
protocols: | ||
http: | ||
endpoint: 0.0.0.0:3500 | ||
grpc: | ||
endpoint: 0.0.0.0:3600 | ||
dialer: | ||
timeout: 10s | ||
use_incoming_timestamp: true | ||
|
||
exporters: | ||
otlp: | ||
endpoint: database:4317 | ||
|
||
service: | ||
pipelines: | ||
logs: | ||
receivers: [loki] | ||
processors: [] | ||
exporters: [otlp] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ import ( | |
"github.com/grafana/agent/internal/component/common/loki" | ||
"github.com/grafana/agent/internal/converter/diag" | ||
"github.com/grafana/agent/internal/converter/internal/common" | ||
"github.com/grafana/agent/internal/converter/internal/promtailconvert/internal/build" | ||
"github.com/grafana/agent/internal/converter/internal/promtailconvert/build" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These files all had to move in order to let the otelcol converter make use of them |
||
"github.com/grafana/dskit/flagext" | ||
promtailcfg "github.com/grafana/loki/clients/pkg/promtail/config" | ||
"github.com/grafana/loki/clients/pkg/promtail/limit" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Promtail converter code does not use the common utils for this so we need to add code here