Skip to content

Commit

Permalink
Enable disperser-v2 metrics registry
Browse files Browse the repository at this point in the history
pschork committed Jan 8, 2025

Verified

This commit was signed with the committer’s verified signature.
1 parent 5e23e1a commit 86248fa
Showing 4 changed files with 53 additions and 6 deletions.
38 changes: 36 additions & 2 deletions disperser/apiserver/metrics_v2.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package apiserver

import (
"context"
"fmt"
"net/http"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigensdk-go/logging"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"time"
)

const namespace = "eigenda_disperser_api"
@@ -22,12 +30,18 @@ type metricsV2 struct {
validateDispersalRequestLatency *prometheus.SummaryVec
storeBlobLatency *prometheus.SummaryVec
getBlobStatusLatency *prometheus.SummaryVec

registry *prometheus.Registry
httpPort string
logger logging.Logger
}

// newAPIServerV2Metrics creates a new metricsV2 instance.
func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 {
func newAPIServerV2Metrics(registry *prometheus.Registry, metricsConfig disperser.MetricsConfig, logger logging.Logger) *metricsV2 {
grpcMetrics := grpcprom.NewServerMetrics()
registry.MustRegister(grpcMetrics)
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(collectors.NewGoCollector())

grpcServerOption := grpc.UnaryInterceptor(
grpcMetrics.UnaryServerInterceptor(),
@@ -113,9 +127,29 @@ func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 {
validateDispersalRequestLatency: validateDispersalRequestLatency,
storeBlobLatency: storeBlobLatency,
getBlobStatusLatency: getBlobStatusLatency,
registry: registry,
httpPort: metricsConfig.HTTPPort,
logger: logger.With("component", "DisperserV2Metrics"),
}
}

// Start starts the metrics server
func (m *metricsV2) Start(ctx context.Context) {
m.logger.Info("Starting metrics server at ", "port", m.httpPort)
addr := fmt.Sprintf(":%s", m.httpPort)
go func() {
log := m.logger
mux := http.NewServeMux()
m.logger.Info("metrics registry", "registry", m.registry)
mux.Handle("/metrics", promhttp.HandlerFor(
m.registry,
promhttp.HandlerOpts{},
))
err := http.ListenAndServe(addr, mux)
log.Error("Prometheus server failed", "err", err)
}()
}

func (m *metricsV2) reportGetBlobCommitmentLatency(duration time.Duration) {
m.getBlobCommitmentLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}
12 changes: 10 additions & 2 deletions disperser/apiserver/server_v2.go
Original file line number Diff line number Diff line change
@@ -58,7 +58,8 @@ type DispersalServerV2 struct {
maxNumSymbolsPerBlob uint64
onchainStateRefreshInterval time.Duration

metrics *metricsV2
metricsConfig disperser.MetricsConfig
metrics *metricsV2
}

// NewDispersalServerV2 creates a new Server struct with the provided parameters.
@@ -74,6 +75,7 @@ func NewDispersalServerV2(
onchainStateRefreshInterval time.Duration,
_logger logging.Logger,
registry *prometheus.Registry,
metricsConfig disperser.MetricsConfig,
) (*DispersalServerV2, error) {
if serverConfig.GrpcPort == "" {
return nil, errors.New("grpc port is required")
@@ -116,7 +118,8 @@ func NewDispersalServerV2(
maxNumSymbolsPerBlob: maxNumSymbolsPerBlob,
onchainStateRefreshInterval: onchainStateRefreshInterval,

metrics: newAPIServerV2Metrics(registry),
metricsConfig: metricsConfig,
metrics: newAPIServerV2Metrics(registry, metricsConfig, logger),
}, nil
}

@@ -167,6 +170,11 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
return errors.New("could not start GRPC server")
}

// Start the metrics server
if s.metricsConfig.EnableMetrics {
s.metrics.Start(context.Background())
}

return nil
}

7 changes: 6 additions & 1 deletion disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
@@ -517,7 +517,12 @@ func newTestServerV2(t *testing.T) *testComponents {
10,
time.Hour,
logger,
prometheus.NewRegistry())
prometheus.NewRegistry(),
disperser.MetricsConfig{
HTTPPort: "9094",
EnableMetrics: false,
},
)
assert.NoError(t, err)

err = s.RefreshOnchainState(context.Background())
2 changes: 1 addition & 1 deletion disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
@@ -182,6 +182,7 @@ func RunDisperserServer(ctx *cli.Context) error {
config.OnchainStateRefreshInterval,
logger,
reg,
config.MetricsConfig,
)
if err != nil {
return err
@@ -212,7 +213,6 @@ func RunDisperserServer(ctx *cli.Context) error {
// Enable Metrics Block
if config.MetricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort)
// TODO(cody-littley): once we deprecate v1, move all remaining metrics functionality to metrics_v2.go
metrics.Start(context.Background())
logger.Info("Enabled metrics for Disperser", "socket", httpSocket)
}

0 comments on commit 86248fa

Please sign in to comment.