Skip to content

Commit

Permalink
Add option to send underlying metric timestamps on prometheus output (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonk000 authored Oct 2, 2020
1 parent 7197eac commit eb6b09e
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 13 deletions.
38 changes: 29 additions & 9 deletions prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"net/http"
"sort"
"sync"
"time"

"go.opencensus.io/trace"

"github.com/golang/protobuf/ptypes"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
Expand All @@ -44,10 +46,11 @@ type Exporter struct {

// Options customizes a created Prometheus Exporter.
type Options struct {
Namespace string
OnError func(err error)
ConstLabels prometheus.Labels // ConstLabels will be set as labels on all views.
Registry *prometheus.Registry
Namespace string
OnError func(err error)
ConstLabels prometheus.Labels // ConstLabels will be set as labels on all views.
Registry *prometheus.Registry
SendTimestamps bool
}

// New is the constructor to make an Exporter with the defined Options.
Expand Down Expand Up @@ -237,7 +240,7 @@ func (c *collector) protoTimeSeriesToPrometheusMetrics(ctx context.Context, metr

pmetrics := make([]prometheus.Metric, 0, len(ts.Points))
for _, point := range ts.Points {
pmet, err := protoMetricToPrometheusMetric(ctx, point, desc, derivedPrometheusValueType, labelValues)
pmet, err := protoMetricToPrometheusMetric(ctx, point, desc, derivedPrometheusValueType, labelValues, c.opts.SendTimestamps)
if err == nil {
pmetrics = append(pmetrics, pmet)
} else {
Expand Down Expand Up @@ -277,7 +280,12 @@ func protoLabelKeysToLabels(protoLabelKeys []*metricspb.LabelKey) []string {
return labelKeys
}

func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point, desc *prometheus.Desc, derivedPrometheusType prometheus.ValueType, labelValues []string) (prometheus.Metric, error) {
func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point, desc *prometheus.Desc, derivedPrometheusType prometheus.ValueType, labelValues []string, sendTimestamps bool) (prometheus.Metric, error) {
timestamp, err := ptypes.Timestamp(point.Timestamp)
if err != nil {
timestamp = time.Now()
}

switch value := point.Value.(type) {
case *metricspb.Point_DistributionValue:
dValue := value.DistributionValue
Expand Down Expand Up @@ -308,14 +316,26 @@ func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point,
cumCount += countPerBucket
points[bucket] = cumCount
}
return prometheus.NewConstHistogram(desc, uint64(dValue.Count), dValue.Sum, points, labelValues...)
metric, err := prometheus.NewConstHistogram(desc, uint64(dValue.Count), dValue.Sum, points, labelValues...)
if err != nil || !sendTimestamps {
return metric, err
}
return prometheus.NewMetricWithTimestamp(timestamp, metric), nil

case *metricspb.Point_Int64Value:
// Derive the Prometheus
return prometheus.NewConstMetric(desc, derivedPrometheusType, float64(value.Int64Value), labelValues...)
metric, err := prometheus.NewConstMetric(desc, derivedPrometheusType, float64(value.Int64Value), labelValues...)
if err != nil || !sendTimestamps {
return metric, err
}
return prometheus.NewMetricWithTimestamp(timestamp, metric), nil

case *metricspb.Point_DoubleValue:
return prometheus.NewConstMetric(desc, derivedPrometheusType, value.DoubleValue, labelValues...)
metric, err := prometheus.NewConstMetric(desc, derivedPrometheusType, value.DoubleValue, labelValues...)
if err != nil || !sendTimestamps {
return metric, err
}
return prometheus.NewMetricWithTimestamp(timestamp, metric), nil

default:
return nil, fmt.Errorf("Unhandled type: %T", point.Value)
Expand Down
92 changes: 88 additions & 4 deletions prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ var (
Seconds: 1543160298,
Nanos: 100000997,
}
// before is a scrape that happened 5s earlier
startTimestampBefore = &timestamp.Timestamp{
Seconds: 1543160293,
Nanos: 100000090,
}
endTimestampBefore = &timestamp.Timestamp{
Seconds: 1543160293,
Nanos: 100000997,
}
)

func TestOnlyCumulativeWindowSupported(t *testing.T) {
Expand Down Expand Up @@ -303,31 +312,31 @@ func makeMetrics() []*metricspb.Metric {
},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: startTimestamp,
StartTimestamp: startTimestampBefore,
LabelValues: []*metricspb.LabelValue{
{Value: "windows"},
{Value: "x86"},
{Value: "Storage"},
},
Points: []*metricspb.Point{
{
Timestamp: endTimestamp,
Timestamp: endTimestampBefore,
Value: &metricspb.Point_Int64Value{
Int64Value: 99,
},
},
},
},
{
StartTimestamp: startTimestamp,
StartTimestamp: startTimestampBefore,
LabelValues: []*metricspb.LabelValue{
{Value: "darwin"},
{Value: "386"},
{Value: "Ops"},
},
Points: []*metricspb.Point{
{
Timestamp: endTimestamp,
Timestamp: endTimestampBefore,
Value: &metricspb.Point_DoubleValue{
DoubleValue: 49.5,
},
Expand Down Expand Up @@ -443,3 +452,78 @@ with_metric_descriptor_count 2
t.Errorf("Mismatched output\nGot:\n%s\nWant:\n%s", g, w)
}
}

func TestMetricsEndpointWithTimestampOutput(t *testing.T) {
exp, err := New(Options{
SendTimestamps: true,
})
if err != nil {
t.Fatalf("Failed to create Prometheus exporter: %v", err)
}

srv := httptest.NewServer(exp)
defer srv.Close()

// Now record some metrics.
metrics := makeMetrics()
for _, metric := range metrics {
exp.ExportMetric(context.Background(), nil, nil, metric)
}

var i int
var output string
for {
time.Sleep(10 * time.Millisecond)
if i == 1000 {
t.Fatal("no output at / (10s wait)")
}
i++

resp, err := http.Get(srv.URL)
if err != nil {
t.Fatalf("Failed to get metrics on / error: %v", err)
}

slurp, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
t.Fatalf("Failed to read body: %v", err)
}

output = string(slurp)
if output != "" {
break
}
}

if strings.Contains(output, "collected before with the same name and label values") {
t.Fatalf("metric name and labels were duplicated but must be unique. Got\n\t%q", output)
}

if strings.Contains(output, "error(s) occurred") {
t.Fatalf("error reported by Prometheus registry:\n\t%s", output)
}

want := `# HELP a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_ Unlimited metric key lengths
# TYPE a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_ counter
a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_{arch="x86",keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey="",my_org_department="Storage",os="windows"} 99 1543160298100
# HELP this_one_there_where_ Extra ones
# TYPE this_one_there_where_ gauge
this_one_there_where_{arch="386",my_org_department="Ops",os="darwin"} 49.5 1543160293100
this_one_there_where_{arch="x86",my_org_department="Storage",os="windows"} 99 1543160293100
# HELP with_metric_descriptor This is a test
# TYPE with_metric_descriptor histogram
with_metric_descriptor_bucket{le="0"} 0 1543160298100
with_metric_descriptor_bucket{le="10"} 1 1543160298100
with_metric_descriptor_bucket{le="20"} 1 1543160298100
with_metric_descriptor_bucket{le="30"} 1 1543160298100
with_metric_descriptor_bucket{le="40"} 6 1543160298100
with_metric_descriptor_bucket{le="+Inf"} 2 1543160298100
with_metric_descriptor_sum 61.9 1543160298100
with_metric_descriptor_count 2 1543160298100
`

if g, w := output, want; g != w {
t.Errorf("Mismatched output\nGot:\n%s\nWant:\n%s", g, w)
}
}

0 comments on commit eb6b09e

Please sign in to comment.