Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

all: accept MetricsExporters by implementation #294

Merged
merged 1 commit into from
Jan 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cmd/ocagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,13 @@ func runOCAgent() {
log.Fatalf("Configuration logical error: %v", err)
}

traceExporters, closeFns, err := config.ExportersFromYAMLConfig(yamlBlob)
traceExporters, metricsExporters, closeFns, err := config.ExportersFromYAMLConfig(yamlBlob)
if err != nil {
log.Fatalf("Config: failed to create exporters from YAML: %v", err)
}

commonSpanSink := exporter.MultiTraceExporters(traceExporters...)
// TODO: (@odeke-em, @songy23) remove this noopMetricsSink once we have metrics exporters.
commonMetricsSink := new(noopMetricsSink)
commonMetricsSink := exporter.MultiMetricsExporters(metricsExporters...)

// Add other receivers here as they are implemented
ocReceiverDoneFn, err := runOCReceiver(agentConfig, commonSpanSink, commonMetricsSink)
Expand Down
37 changes: 25 additions & 12 deletions cmd/occollector/app/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,20 @@ func execute() {

var closeFns []func()
var spanProcessors []processor.SpanProcessor
exportersCloseFns, exporters := createExporters()
exportersCloseFns, traceExporters, metricsExporters := createExporters()
closeFns = append(closeFns, exportersCloseFns...)
if len(exporters) > 0 {
if len(traceExporters) > 0 {
// Exporters need an extra hop from OC-proto to span data: to workaround that for now
// we will use a special processor that transforms the data to a format that they can consume.
// TODO: (@pjanotti) we should avoid this step in the long run, its an extra hop just to re-use
// the exporters: this can lose node information and it is not ideal for performance and delegates
// the retry/buffering to the exporters (that are designed to run within the tracing process).
spanProcessors = append(spanProcessors, processor.NewTraceExporterProcessor(exporters...))
spanProcessors = append(spanProcessors, processor.NewTraceExporterProcessor(traceExporters...))
}

// TODO: (@pjanotti) make use of metrics exporters
_ = metricsExporters

if v.GetBool(debugProcessorFlg) {
spanProcessors = append(spanProcessors, processor.NewNoopSpanProcessor(logger))
}
Expand Down Expand Up @@ -256,11 +259,11 @@ func initTelemetry(level telemetry.Level, port int, asyncErrorChannel chan<- err
return nil
}

func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporter) {
func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporter, metricsExporters []exporter.MetricsExporter) {
// TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility.
parseFns := []struct {
name string
fn func([]byte) ([]exporter.TraceExporter, []func() error, error)
fn func([]byte) ([]exporter.TraceExporter, []exporter.MetricsExporter, []func() error, error)
}{
{name: "datadog", fn: exporterparser.DatadogTraceExportersFromYAML},
{name: "stackdriver", fn: exporterparser.StackdriverTraceExportersFromYAML},
Expand All @@ -280,19 +283,25 @@ func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporte
}

for _, cfg := range parseFns {
tes, tesDoneFns, err := cfg.fn(cfgBlob)
tes, mes, tesDoneFns, err := cfg.fn(cfgBlob)
if err != nil {
logger.Fatal("Failed to create config for exporter", zap.String("exporter", cfg.name), zap.Error(err))
}

wasEnabled := false
var anyTraceExporterEnabled, anyMetricsExporterEnabled bool

for _, te := range tes {
if te != nil {
wasEnabled = true
traceExporters = append(traceExporters, te)
anyTraceExporterEnabled = true
}
}
for _, me := range mes {
if me != nil {
metricsExporters = append(metricsExporters, me)
anyMetricsExporterEnabled = true
}
}

for _, tesDoneFn := range tesDoneFns {
if tesDoneFn != nil {
wrapperFn := func() {
Expand All @@ -304,12 +313,16 @@ func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporte
}
}

if wasEnabled {
logger.Info("Exporter enabled", zap.String("exporter", cfg.name))
if anyTraceExporterEnabled {
logger.Info("Trace Exporter enabled", zap.String("exporter", cfg.name))
}
if anyMetricsExporterEnabled {
logger.Info("Metrices Exporter enabled", zap.String("exporter", cfg.name))
}

}

return doneFns, traceExporters
return doneFns, traceExporters, metricsExporters
}

func buildQueuedSpanProcessor(logger *zap.Logger, opts *builder.QueuedSpanProcessorCfg) (processor.SpanProcessor, error) {
Expand Down
21 changes: 14 additions & 7 deletions exporter/exporterparser/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type datadogConfig struct {
Tags []string `yaml:"tags,omitempty"`

EnableTracing bool `yaml:"enable_tracing,omitempty"`
EnableMetrics bool `yaml:"enable_metrics,omitempty"`
}

type datadogExporter struct {
Expand All @@ -47,25 +48,25 @@ type datadogExporter struct {

// DatadogTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// Datadog according to the configuration settings.
func DatadogTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func DatadogTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Datadog *datadogConfig `yaml:"datadog"`
} `yaml:"exporters"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}

dc := cfg.Exporters.Datadog
if dc == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
if !dc.EnableTracing {
return nil, nil, nil
if !dc.EnableTracing && !dc.EnableMetrics {
return nil, nil, nil, nil
}

// TODO(jbd): Create new exporter for each service name.
Expand All @@ -79,7 +80,13 @@ func DatadogTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter,
de.Stop()
return nil
})
tes = append(tes, &datadogExporter{exporter: de})

dexp := &datadogExporter{exporter: de}
tes = append(tes, dexp)

// TODO: (@odeke-em, @songya23) implement ExportMetrics for Datadog.
// mes = append(mes, oexp)

return
}

Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterparser/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ type jaegerExporter struct {
exporter *jaeger.Exporter
}

// JaegerExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// JaegerExportersFromYAML parses the yaml bytes and returns exporter.TraceExporters targeting
// Jaeger according to the configuration settings.
func JaegerExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func JaegerExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Jaeger *jaegerConfig `yaml:"jaeger"`
} `yaml:"exporters"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
jc := cfg.Exporters.Jaeger
if jc == nil {
return nil, nil, nil
return nil, nil, nil, nil
}

// jaeger.NewExporter performs configurqtion validation
Expand All @@ -63,7 +63,7 @@ func JaegerExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneF
},
})
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

doneFns = append(doneFns, func() error {
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterparser/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ var _ exporter.TraceExporter = (*kafkaExporter)(nil)

// KafkaExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// Kafka according to the configuration settings.
func KafkaExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func KafkaExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Kafka *kafkaConfig `yaml:"kafka"`
} `yaml:"exporters"`
}

if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
kc := cfg.Exporters.Kafka
if kc == nil {
return nil, nil, nil
return nil, nil, nil, nil
}

kde, kerr := kafka.NewExporter(kafka.Options{
Expand All @@ -61,15 +61,15 @@ func KafkaExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFn
})

if kerr != nil {
return nil, nil, fmt.Errorf("Cannot configure Kafka Trace exporter: %v", kerr)
return nil, nil, nil, fmt.Errorf("Cannot configure Kafka Trace exporter: %v", kerr)
}

tes = append(tes, &kafkaExporter{exporter: kde})
doneFns = append(doneFns, func() error {
kde.Flush()
return nil
})
return tes, doneFns, nil
return tes, nil, doneFns, nil
}

func (kde *kafkaExporter) ExportSpans(ctx context.Context, td data.TraceData) error {
Expand Down
20 changes: 12 additions & 8 deletions exporter/exporterparser/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,42 @@ var _ exporter.TraceExporter = (*ocagentExporter)(nil)

// OpenCensusTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// OpenCensus Agent/Collector according to the configuration settings.
func OpenCensusTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func OpenCensusTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
OpenCensus *opencensusConfig `yaml:"opencensus"`
} `yaml:"exporters"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
ocac := cfg.Exporters.OpenCensus
if ocac == nil {
return nil, nil, nil
return nil, nil, nil, nil
}

if ocac.Endpoint == "" {
return nil, nil, fmt.Errorf("OpenCensus config requires an Endpoint")
return nil, nil, nil, fmt.Errorf("OpenCensus config requires an Endpoint")
}

sde, serr := ocagent.NewExporter(ocagent.WithAddress(ocac.Endpoint), ocagent.WithInsecure())
if serr != nil {
return nil, nil, fmt.Errorf("Cannot configure OpenCensus Trace exporter: %v", serr)
return nil, nil, nil, fmt.Errorf("Cannot configure OpenCensus Trace exporter: %v", serr)
}

tes = append(tes, &ocagentExporter{exporter: sde})
oexp := &ocagentExporter{exporter: sde}
tes = append(tes, oexp)

// TODO: (@odeke-em, @songya23) implement ExportMetrics for OpenCensus.
// mes = append(mes, oexp)
doneFns = append(doneFns, func() error {
sde.Flush()
return nil
})
return tes, doneFns, nil
return tes, mes, doneFns, nil
}

func (sde *ocagentExporter) ExportSpans(ctx context.Context, td data.TraceData) error {
Expand Down
28 changes: 18 additions & 10 deletions exporter/exporterparser/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type stackdriverConfig struct {
ProjectID string `yaml:"project,omitempty"`
EnableTracing bool `yaml:"enable_tracing,omitempty"`
EnableMetrics bool `yaml:"enable_metrics,omitempty"`
}

type stackdriverExporter struct {
Expand All @@ -37,45 +38,52 @@ var _ exporter.TraceExporter = (*stackdriverExporter)(nil)

// StackdriverTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// Stackdriver according to the configuration settings.
func StackdriverTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func StackdriverTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Stackdriver *stackdriverConfig `yaml:"stackdriver"`
} `yaml:"exporters"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
sc := cfg.Exporters.Stackdriver
if sc == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
if !sc.EnableTracing {
return nil, nil, nil
if !sc.EnableTracing && !sc.EnableMetrics {
return nil, nil, nil, nil
}

// TODO: For each ProjectID, create a different exporter
// or at least a unique Stackdriver client per ProjectID.
if sc.ProjectID == "" {
return nil, nil, fmt.Errorf("Stackdriver config requires a project ID")
return nil, nil, nil, fmt.Errorf("Stackdriver config requires a project ID")
}

sde, serr := stackdriver.NewExporter(stackdriver.Options{
ProjectID: sc.ProjectID,
})
if serr != nil {
return nil, nil, fmt.Errorf("Cannot configure Stackdriver Trace exporter: %v", serr)
return nil, nil, nil, fmt.Errorf("Cannot configure Stackdriver Trace exporter: %v", serr)
}

tes = append(tes, &stackdriverExporter{exporter: sde})
stexp := &stackdriverExporter{exporter: sde}
if sc.EnableTracing {
tes = append(tes, stexp)
}
if sc.EnableMetrics {
// TODO: (@odeke-em, @songya23) implement ExportMetrics for Stackdriver.
// mes = append(mes, oexp)
}
doneFns = append(doneFns, func() error {
sde.Flush()
return nil
})
return tes, doneFns, nil
return tes, mes, doneFns, nil
}

func (sde *stackdriverExporter) ExportSpans(ctx context.Context, td data.TraceData) error {
Expand Down
Loading