From eb6b09e74813e5987551aa279e7804ddb13fa756 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Thu, 1 Oct 2020 21:45:54 -0700 Subject: [PATCH] Add option to send underlying metric timestamps on prometheus output (#11) --- prometheus.go | 38 ++++++++++++++----- prometheus_test.go | 92 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 117 insertions(+), 13 deletions(-) diff --git a/prometheus.go b/prometheus.go index 263271d..c33cdef 100644 --- a/prometheus.go +++ b/prometheus.go @@ -23,9 +23,11 @@ import ( "net/http" "sort" "sync" + "time" "go.opencensus.io/trace" + "github.com/golang/protobuf/ptypes" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" @@ -44,10 +46,11 @@ type Exporter struct { // Options customizes a created Prometheus Exporter. type Options struct { - Namespace string - OnError func(err error) - ConstLabels prometheus.Labels // ConstLabels will be set as labels on all views. - Registry *prometheus.Registry + Namespace string + OnError func(err error) + ConstLabels prometheus.Labels // ConstLabels will be set as labels on all views. + Registry *prometheus.Registry + SendTimestamps bool } // New is the constructor to make an Exporter with the defined Options. @@ -237,7 +240,7 @@ func (c *collector) protoTimeSeriesToPrometheusMetrics(ctx context.Context, metr pmetrics := make([]prometheus.Metric, 0, len(ts.Points)) for _, point := range ts.Points { - pmet, err := protoMetricToPrometheusMetric(ctx, point, desc, derivedPrometheusValueType, labelValues) + pmet, err := protoMetricToPrometheusMetric(ctx, point, desc, derivedPrometheusValueType, labelValues, c.opts.SendTimestamps) if err == nil { pmetrics = append(pmetrics, pmet) } else { @@ -277,7 +280,12 @@ func protoLabelKeysToLabels(protoLabelKeys []*metricspb.LabelKey) []string { return labelKeys } -func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point, desc *prometheus.Desc, derivedPrometheusType prometheus.ValueType, labelValues []string) (prometheus.Metric, error) { +func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point, desc *prometheus.Desc, derivedPrometheusType prometheus.ValueType, labelValues []string, sendTimestamps bool) (prometheus.Metric, error) { + timestamp, err := ptypes.Timestamp(point.Timestamp) + if err != nil { + timestamp = time.Now() + } + switch value := point.Value.(type) { case *metricspb.Point_DistributionValue: dValue := value.DistributionValue @@ -308,14 +316,26 @@ func protoMetricToPrometheusMetric(ctx context.Context, point *metricspb.Point, cumCount += countPerBucket points[bucket] = cumCount } - return prometheus.NewConstHistogram(desc, uint64(dValue.Count), dValue.Sum, points, labelValues...) + metric, err := prometheus.NewConstHistogram(desc, uint64(dValue.Count), dValue.Sum, points, labelValues...) + if err != nil || !sendTimestamps { + return metric, err + } + return prometheus.NewMetricWithTimestamp(timestamp, metric), nil case *metricspb.Point_Int64Value: // Derive the Prometheus - return prometheus.NewConstMetric(desc, derivedPrometheusType, float64(value.Int64Value), labelValues...) + metric, err := prometheus.NewConstMetric(desc, derivedPrometheusType, float64(value.Int64Value), labelValues...) + if err != nil || !sendTimestamps { + return metric, err + } + return prometheus.NewMetricWithTimestamp(timestamp, metric), nil case *metricspb.Point_DoubleValue: - return prometheus.NewConstMetric(desc, derivedPrometheusType, value.DoubleValue, labelValues...) + metric, err := prometheus.NewConstMetric(desc, derivedPrometheusType, value.DoubleValue, labelValues...) + if err != nil || !sendTimestamps { + return metric, err + } + return prometheus.NewMetricWithTimestamp(timestamp, metric), nil default: return nil, fmt.Errorf("Unhandled type: %T", point.Value) diff --git a/prometheus_test.go b/prometheus_test.go index 6c2d252..47eda01 100644 --- a/prometheus_test.go +++ b/prometheus_test.go @@ -39,6 +39,15 @@ var ( Seconds: 1543160298, Nanos: 100000997, } + // before is a scrape that happened 5s earlier + startTimestampBefore = ×tamp.Timestamp{ + Seconds: 1543160293, + Nanos: 100000090, + } + endTimestampBefore = ×tamp.Timestamp{ + Seconds: 1543160293, + Nanos: 100000997, + } ) func TestOnlyCumulativeWindowSupported(t *testing.T) { @@ -303,7 +312,7 @@ func makeMetrics() []*metricspb.Metric { }, Timeseries: []*metricspb.TimeSeries{ { - StartTimestamp: startTimestamp, + StartTimestamp: startTimestampBefore, LabelValues: []*metricspb.LabelValue{ {Value: "windows"}, {Value: "x86"}, @@ -311,7 +320,7 @@ func makeMetrics() []*metricspb.Metric { }, Points: []*metricspb.Point{ { - Timestamp: endTimestamp, + Timestamp: endTimestampBefore, Value: &metricspb.Point_Int64Value{ Int64Value: 99, }, @@ -319,7 +328,7 @@ func makeMetrics() []*metricspb.Metric { }, }, { - StartTimestamp: startTimestamp, + StartTimestamp: startTimestampBefore, LabelValues: []*metricspb.LabelValue{ {Value: "darwin"}, {Value: "386"}, @@ -327,7 +336,7 @@ func makeMetrics() []*metricspb.Metric { }, Points: []*metricspb.Point{ { - Timestamp: endTimestamp, + Timestamp: endTimestampBefore, Value: &metricspb.Point_DoubleValue{ DoubleValue: 49.5, }, @@ -443,3 +452,78 @@ with_metric_descriptor_count 2 t.Errorf("Mismatched output\nGot:\n%s\nWant:\n%s", g, w) } } + +func TestMetricsEndpointWithTimestampOutput(t *testing.T) { + exp, err := New(Options{ + SendTimestamps: true, + }) + if err != nil { + t.Fatalf("Failed to create Prometheus exporter: %v", err) + } + + srv := httptest.NewServer(exp) + defer srv.Close() + + // Now record some metrics. + metrics := makeMetrics() + for _, metric := range metrics { + exp.ExportMetric(context.Background(), nil, nil, metric) + } + + var i int + var output string + for { + time.Sleep(10 * time.Millisecond) + if i == 1000 { + t.Fatal("no output at / (10s wait)") + } + i++ + + resp, err := http.Get(srv.URL) + if err != nil { + t.Fatalf("Failed to get metrics on / error: %v", err) + } + + slurp, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + t.Fatalf("Failed to read body: %v", err) + } + + output = string(slurp) + if output != "" { + break + } + } + + if strings.Contains(output, "collected before with the same name and label values") { + t.Fatalf("metric name and labels were duplicated but must be unique. Got\n\t%q", output) + } + + if strings.Contains(output, "error(s) occurred") { + t.Fatalf("error reported by Prometheus registry:\n\t%s", output) + } + + want := `# HELP a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_ Unlimited metric key lengths +# TYPE a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_ counter +a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_{arch="x86",keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey="",my_org_department="Storage",os="windows"} 99 1543160298100 +# HELP this_one_there_where_ Extra ones +# TYPE this_one_there_where_ gauge +this_one_there_where_{arch="386",my_org_department="Ops",os="darwin"} 49.5 1543160293100 +this_one_there_where_{arch="x86",my_org_department="Storage",os="windows"} 99 1543160293100 +# HELP with_metric_descriptor This is a test +# TYPE with_metric_descriptor histogram +with_metric_descriptor_bucket{le="0"} 0 1543160298100 +with_metric_descriptor_bucket{le="10"} 1 1543160298100 +with_metric_descriptor_bucket{le="20"} 1 1543160298100 +with_metric_descriptor_bucket{le="30"} 1 1543160298100 +with_metric_descriptor_bucket{le="40"} 6 1543160298100 +with_metric_descriptor_bucket{le="+Inf"} 2 1543160298100 +with_metric_descriptor_sum 61.9 1543160298100 +with_metric_descriptor_count 2 1543160298100 +` + + if g, w := output, want; g != w { + t.Errorf("Mismatched output\nGot:\n%s\nWant:\n%s", g, w) + } +}