Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
all: accept MetricsExporters by implementation
Browse files Browse the repository at this point in the history
ExportersFromYAMLConfig now returns metricsExporters too.
  • Loading branch information
odeke-em committed Jan 10, 2019
1 parent 8fde60e commit d2f0713
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 63 deletions.
5 changes: 2 additions & 3 deletions cmd/ocagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,13 @@ func runOCAgent() {
log.Fatalf("Configuration logical error: %v", err)
}

traceExporters, closeFns, err := config.ExportersFromYAMLConfig(yamlBlob)
traceExporters, metricsExporters, closeFns, err := config.ExportersFromYAMLConfig(yamlBlob)
if err != nil {
log.Fatalf("Config: failed to create exporters from YAML: %v", err)
}

commonSpanSink := exporter.MultiTraceExporters(traceExporters...)
// TODO: (@odeke-em, @songy23) remove this noopMetricsSink once we have metrics exporters.
commonMetricsSink := new(noopMetricsSink)
commonMetricsSink := exporter.MultiMetricsExporters(metricsExporters...)

// Add other receivers here as they are implemented
ocReceiverDoneFn, err := runOCReceiver(agentConfig, commonSpanSink, commonMetricsSink)
Expand Down
37 changes: 25 additions & 12 deletions cmd/occollector/app/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,20 @@ func execute() {

var closeFns []func()
var spanProcessors []processor.SpanProcessor
exportersCloseFns, exporters := createExporters()
exportersCloseFns, traceExporters, metricsExporters := createExporters()
closeFns = append(closeFns, exportersCloseFns...)
if len(exporters) > 0 {
if len(traceExporters) > 0 {
// Exporters need an extra hop from OC-proto to span data: to workaround that for now
// we will use a special processor that transforms the data to a format that they can consume.
// TODO: (@pjanotti) we should avoid this step in the long run, its an extra hop just to re-use
// the exporters: this can lose node information and it is not ideal for performance and delegates
// the retry/buffering to the exporters (that are designed to run within the tracing process).
spanProcessors = append(spanProcessors, processor.NewTraceExporterProcessor(exporters...))
spanProcessors = append(spanProcessors, processor.NewTraceExporterProcessor(traceExporters...))
}

// TODO: (@pjanotti) make use of metrics exporters
_ = metricsExporters

if v.GetBool(debugProcessorFlg) {
spanProcessors = append(spanProcessors, processor.NewNoopSpanProcessor(logger))
}
Expand Down Expand Up @@ -256,11 +259,11 @@ func initTelemetry(level telemetry.Level, port int, asyncErrorChannel chan<- err
return nil
}

func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporter) {
func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporter, metricsExporters []exporter.MetricsExporter) {
// TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility.
parseFns := []struct {
name string
fn func([]byte) ([]exporter.TraceExporter, []func() error, error)
fn func([]byte) ([]exporter.TraceExporter, []exporter.MetricsExporter, []func() error, error)
}{
{name: "datadog", fn: exporterparser.DatadogTraceExportersFromYAML},
{name: "stackdriver", fn: exporterparser.StackdriverTraceExportersFromYAML},
Expand All @@ -280,19 +283,25 @@ func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporte
}

for _, cfg := range parseFns {
tes, tesDoneFns, err := cfg.fn(cfgBlob)
tes, mes, tesDoneFns, err := cfg.fn(cfgBlob)
if err != nil {
logger.Fatal("Failed to create config for exporter", zap.String("exporter", cfg.name), zap.Error(err))
}

wasEnabled := false
var anyTraceExporterEnabled, anyMetricsExporterEnabled bool

for _, te := range tes {
if te != nil {
wasEnabled = true
traceExporters = append(traceExporters, te)
anyTraceExporterEnabled = true
}
}
for _, me := range mes {
if me != nil {
metricsExporters = append(metricsExporters, me)
anyMetricsExporterEnabled = true
}
}

for _, tesDoneFn := range tesDoneFns {
if tesDoneFn != nil {
wrapperFn := func() {
Expand All @@ -304,12 +313,16 @@ func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporte
}
}

if wasEnabled {
logger.Info("Exporter enabled", zap.String("exporter", cfg.name))
if anyTraceExporterEnabled {
logger.Info("Trace Exporter enabled", zap.String("exporter", cfg.name))
}
if anyMetricsExporterEnabled {
logger.Info("Metrices Exporter enabled", zap.String("exporter", cfg.name))
}

}

return doneFns, traceExporters
return doneFns, traceExporters, metricsExporters
}

func buildQueuedSpanProcessor(logger *zap.Logger, opts *builder.QueuedSpanProcessorCfg) (processor.SpanProcessor, error) {
Expand Down
21 changes: 14 additions & 7 deletions exporter/exporterparser/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type datadogConfig struct {
Tags []string `yaml:"tags,omitempty"`

EnableTracing bool `yaml:"enable_tracing,omitempty"`
EnableMetrics bool `yaml:"enable_metrics,omitempty"`
}

type datadogExporter struct {
Expand All @@ -47,25 +48,25 @@ type datadogExporter struct {

// DatadogTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// Datadog according to the configuration settings.
func DatadogTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func DatadogTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Datadog *datadogConfig `yaml:"datadog"`
} `yaml:"exporters"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}

dc := cfg.Exporters.Datadog
if dc == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
if !dc.EnableTracing {
return nil, nil, nil
if !dc.EnableTracing && !dc.EnableMetrics {
return nil, nil, nil, nil
}

// TODO(jbd): Create new exporter for each service name.
Expand All @@ -79,7 +80,13 @@ func DatadogTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter,
de.Stop()
return nil
})
tes = append(tes, &datadogExporter{exporter: de})

dexp := &datadogExporter{exporter: de}
tes = append(tes, dexp)

// TODO: (@odeke-em, @songya23) implement ExportMetrics for Datadog.
// mes = append(mes, oexp)

return
}

Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterparser/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ type jaegerExporter struct {
exporter *jaeger.Exporter
}

// JaegerExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// JaegerExportersFromYAML parses the yaml bytes and returns exporter.TraceExporters targeting
// Jaeger according to the configuration settings.
func JaegerExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func JaegerExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Jaeger *jaegerConfig `yaml:"jaeger"`
} `yaml:"exporters"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
jc := cfg.Exporters.Jaeger
if jc == nil {
return nil, nil, nil
return nil, nil, nil, nil
}

// jaeger.NewExporter performs configurqtion validation
Expand All @@ -63,7 +63,7 @@ func JaegerExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneF
},
})
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

doneFns = append(doneFns, func() error {
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterparser/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ var _ exporter.TraceExporter = (*kafkaExporter)(nil)

// KafkaExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// Kafka according to the configuration settings.
func KafkaExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func KafkaExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Kafka *kafkaConfig `yaml:"kafka"`
} `yaml:"exporters"`
}

if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
kc := cfg.Exporters.Kafka
if kc == nil {
return nil, nil, nil
return nil, nil, nil, nil
}

kde, kerr := kafka.NewExporter(kafka.Options{
Expand All @@ -61,15 +61,15 @@ func KafkaExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFn
})

if kerr != nil {
return nil, nil, fmt.Errorf("Cannot configure Kafka Trace exporter: %v", kerr)
return nil, nil, nil, fmt.Errorf("Cannot configure Kafka Trace exporter: %v", kerr)
}

tes = append(tes, &kafkaExporter{exporter: kde})
doneFns = append(doneFns, func() error {
kde.Flush()
return nil
})
return tes, doneFns, nil
return tes, nil, doneFns, nil
}

func (kde *kafkaExporter) ExportSpans(ctx context.Context, td data.TraceData) error {
Expand Down
20 changes: 12 additions & 8 deletions exporter/exporterparser/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,42 @@ var _ exporter.TraceExporter = (*ocagentExporter)(nil)

// OpenCensusTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// OpenCensus Agent/Collector according to the configuration settings.
func OpenCensusTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func OpenCensusTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
OpenCensus *opencensusConfig `yaml:"opencensus"`
} `yaml:"exporters"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
ocac := cfg.Exporters.OpenCensus
if ocac == nil {
return nil, nil, nil
return nil, nil, nil, nil
}

if ocac.Endpoint == "" {
return nil, nil, fmt.Errorf("OpenCensus config requires an Endpoint")
return nil, nil, nil, fmt.Errorf("OpenCensus config requires an Endpoint")
}

sde, serr := ocagent.NewExporter(ocagent.WithAddress(ocac.Endpoint), ocagent.WithInsecure())
if serr != nil {
return nil, nil, fmt.Errorf("Cannot configure OpenCensus Trace exporter: %v", serr)
return nil, nil, nil, fmt.Errorf("Cannot configure OpenCensus Trace exporter: %v", serr)
}

tes = append(tes, &ocagentExporter{exporter: sde})
oexp := &ocagentExporter{exporter: sde}
tes = append(tes, oexp)

// TODO: (@odeke-em, @songya23) implement ExportMetrics for OpenCensus.
// mes = append(mes, oexp)
doneFns = append(doneFns, func() error {
sde.Flush()
return nil
})
return tes, doneFns, nil
return tes, mes, doneFns, nil
}

func (sde *ocagentExporter) ExportSpans(ctx context.Context, td data.TraceData) error {
Expand Down
28 changes: 18 additions & 10 deletions exporter/exporterparser/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type stackdriverConfig struct {
ProjectID string `yaml:"project,omitempty"`
EnableTracing bool `yaml:"enable_tracing,omitempty"`
EnableMetrics bool `yaml:"enable_metrics,omitempty"`
}

type stackdriverExporter struct {
Expand All @@ -37,45 +38,52 @@ var _ exporter.TraceExporter = (*stackdriverExporter)(nil)

// StackdriverTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
// Stackdriver according to the configuration settings.
func StackdriverTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) {
func StackdriverTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
var cfg struct {
Exporters *struct {
Stackdriver *stackdriverConfig `yaml:"stackdriver"`
} `yaml:"exporters"`
}
if err := yamlUnmarshal(config, &cfg); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if cfg.Exporters == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
sc := cfg.Exporters.Stackdriver
if sc == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
if !sc.EnableTracing {
return nil, nil, nil
if !sc.EnableTracing && !sc.EnableMetrics {
return nil, nil, nil, nil
}

// TODO: For each ProjectID, create a different exporter
// or at least a unique Stackdriver client per ProjectID.
if sc.ProjectID == "" {
return nil, nil, fmt.Errorf("Stackdriver config requires a project ID")
return nil, nil, nil, fmt.Errorf("Stackdriver config requires a project ID")
}

sde, serr := stackdriver.NewExporter(stackdriver.Options{
ProjectID: sc.ProjectID,
})
if serr != nil {
return nil, nil, fmt.Errorf("Cannot configure Stackdriver Trace exporter: %v", serr)
return nil, nil, nil, fmt.Errorf("Cannot configure Stackdriver Trace exporter: %v", serr)
}

tes = append(tes, &stackdriverExporter{exporter: sde})
stexp := &stackdriverExporter{exporter: sde}
if sc.EnableTracing {
tes = append(tes, stexp)
}
if sc.EnableMetrics {
// TODO: (@odeke-em, @songya23) implement ExportMetrics for Stackdriver.
// mes = append(mes, oexp)
}
doneFns = append(doneFns, func() error {
sde.Flush()
return nil
})
return tes, doneFns, nil
return tes, mes, doneFns, nil
}

func (sde *stackdriverExporter) ExportSpans(ctx context.Context, td data.TraceData) error {
Expand Down
Loading

0 comments on commit d2f0713

Please sign in to comment.