diff --git a/disperser/apiserver/metrics_v2.go b/disperser/apiserver/metrics_v2.go index 9273aa9d52..c3d2b7ce87 100644 --- a/disperser/apiserver/metrics_v2.go +++ b/disperser/apiserver/metrics_v2.go @@ -1,12 +1,20 @@ package apiserver import ( + "context" + "fmt" + "net/http" + "time" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigensdk-go/logging" grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" - "time" ) const namespace = "eigenda_disperser_api" @@ -22,12 +30,18 @@ type metricsV2 struct { validateDispersalRequestLatency *prometheus.SummaryVec storeBlobLatency *prometheus.SummaryVec getBlobStatusLatency *prometheus.SummaryVec + + registry *prometheus.Registry + httpPort string + logger logging.Logger } // newAPIServerV2Metrics creates a new metricsV2 instance. -func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 { +func newAPIServerV2Metrics(registry *prometheus.Registry, metricsConfig disperser.MetricsConfig, logger logging.Logger) *metricsV2 { grpcMetrics := grpcprom.NewServerMetrics() registry.MustRegister(grpcMetrics) + registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + registry.MustRegister(collectors.NewGoCollector()) grpcServerOption := grpc.UnaryInterceptor( grpcMetrics.UnaryServerInterceptor(), @@ -113,9 +127,29 @@ func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 { validateDispersalRequestLatency: validateDispersalRequestLatency, storeBlobLatency: storeBlobLatency, getBlobStatusLatency: getBlobStatusLatency, + registry: registry, + httpPort: metricsConfig.HTTPPort, + logger: logger.With("component", "DisperserV2Metrics"), } } +// Start starts the metrics server +func (m *metricsV2) Start(ctx context.Context) { + m.logger.Info("Starting metrics server at ", "port", m.httpPort) + addr := fmt.Sprintf(":%s", m.httpPort) + go func() { + log := m.logger + mux := http.NewServeMux() + m.logger.Info("metrics registry", "registry", m.registry) + mux.Handle("/metrics", promhttp.HandlerFor( + m.registry, + promhttp.HandlerOpts{}, + )) + err := http.ListenAndServe(addr, mux) + log.Error("Prometheus server failed", "err", err) + }() +} + func (m *metricsV2) reportGetBlobCommitmentLatency(duration time.Duration) { m.getBlobCommitmentLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go index efe5566500..e308ebf0ce 100644 --- a/disperser/apiserver/server_v2.go +++ b/disperser/apiserver/server_v2.go @@ -58,7 +58,8 @@ type DispersalServerV2 struct { maxNumSymbolsPerBlob uint64 onchainStateRefreshInterval time.Duration - metrics *metricsV2 + metricsConfig disperser.MetricsConfig + metrics *metricsV2 } // NewDispersalServerV2 creates a new Server struct with the provided parameters. @@ -74,6 +75,7 @@ func NewDispersalServerV2( onchainStateRefreshInterval time.Duration, _logger logging.Logger, registry *prometheus.Registry, + metricsConfig disperser.MetricsConfig, ) (*DispersalServerV2, error) { if serverConfig.GrpcPort == "" { return nil, errors.New("grpc port is required") @@ -116,7 +118,8 @@ func NewDispersalServerV2( maxNumSymbolsPerBlob: maxNumSymbolsPerBlob, onchainStateRefreshInterval: onchainStateRefreshInterval, - metrics: newAPIServerV2Metrics(registry), + metricsConfig: metricsConfig, + metrics: newAPIServerV2Metrics(registry, metricsConfig, logger), }, nil } @@ -167,6 +170,11 @@ func (s *DispersalServerV2) Start(ctx context.Context) error { return errors.New("could not start GRPC server") } + // Start the metrics server + if s.metricsConfig.EnableMetrics { + s.metrics.Start(context.Background()) + } + return nil } diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index 305b6bd437..44a6924545 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -517,7 +517,12 @@ func newTestServerV2(t *testing.T) *testComponents { 10, time.Hour, logger, - prometheus.NewRegistry()) + prometheus.NewRegistry(), + disperser.MetricsConfig{ + HTTPPort: "9094", + EnableMetrics: false, + }, + ) assert.NoError(t, err) err = s.RefreshOnchainState(context.Background()) diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 2659f0c87b..c6f03871a3 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -182,6 +182,7 @@ func RunDisperserServer(ctx *cli.Context) error { config.OnchainStateRefreshInterval, logger, reg, + config.MetricsConfig, ) if err != nil { return err @@ -212,7 +213,6 @@ func RunDisperserServer(ctx *cli.Context) error { // Enable Metrics Block if config.MetricsConfig.EnableMetrics { httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort) - // TODO(cody-littley): once we deprecate v1, move all remaining metrics functionality to metrics_v2.go metrics.Start(context.Background()) logger.Info("Enabled metrics for Disperser", "socket", httpSocket) }