From 5567b87bbdb5ef5c650da2e4714b91b9a6444192 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Wed, 12 Jun 2024 13:02:52 +0200 Subject: [PATCH] API + SSE subscription start api + sse send txs move TxIn to common, use API as receiver cleanup --- api/handler.go | 52 +++++++++++++ api/handler_test.go | 30 +++++++ api/server.go | 125 ++++++++++++++++++++++++++++++ cmd/collect/main.go | 10 +++ cmd/dev/main.go | 101 ++++++++++++++++++++++++ collector/collector.go | 19 +++++ collector/node_conn.go | 10 ++- collector/node_conn_bloxroute.go | 18 +++-- collector/node_conn_chainbound.go | 10 ++- collector/node_conn_eden.go | 18 +++-- collector/receiver.go | 13 +++- collector/tx_processor.go | 12 +-- collector/tx_processor_test.go | 6 +- collector/types.go | 12 --- common/analyzer.go | 2 +- common/types.go | 9 +++ go.mod | 23 +++--- go.sum | 42 +++++----- scripts/subscibe-test/main.go | 8 +- 19 files changed, 448 insertions(+), 72 deletions(-) create mode 100644 api/handler.go create mode 100644 api/handler_test.go create mode 100644 api/server.go create mode 100644 cmd/dev/main.go diff --git a/api/handler.go b/api/handler.go new file mode 100644 index 0000000..827e3c0 --- /dev/null +++ b/api/handler.go @@ -0,0 +1,52 @@ +package api + +import ( + "fmt" + "net/http" + + "github.com/google/uuid" +) + +type SSESubscription struct { + uid string + txC chan string +} + +func (s *Server) handleTxSSE(w http.ResponseWriter, r *http.Request) { + // SSE server for transactions + s.log.Info("SSE connection opened for transactions") + + // Set CORS headers to allow all origins. You may want to restrict this to specific origins in a production environment. + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Expose-Headers", "Content-Type") + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + subscriber := SSESubscription{ + uid: uuid.New().String(), + txC: make(chan string, 100), + } + s.addSubscriber(&subscriber) + + // pingTicker := time.NewTicker(5 * time.Second) + + // Wait for txs or end of request... + for { + select { + case <-r.Context().Done(): + s.log.Info("SSE closed, removing subscriber") + s.removeSubscriber(&subscriber) + return + + case tx := <-subscriber.txC: + fmt.Fprintf(w, "data: %s\n\n", tx) + w.(http.Flusher).Flush() //nolint:forcetypeassert + + // case <-pingTicker.C: + // fmt.Fprintf(w, ": ping\n\n") + // w.(http.Flusher).Flush() //nolint:forcetypeassert + } + } +} diff --git a/api/handler_test.go b/api/handler_test.go new file mode 100644 index 0000000..657fd4a --- /dev/null +++ b/api/handler_test.go @@ -0,0 +1,30 @@ +package api + +// func getTestLogger() *zap.SugaredLogger { +// return common.GetLogger(true, false) +// } + +// func Test_Handlers_Healthcheck_Drain_Undrain(t *testing.T) { +// const ( +// latency = 200 * time.Millisecond +// listenAddr = ":8080" +// ) + +// //nolint: exhaustruct +// // s := New(&HTTPServerConfig{ +// // DrainDuration: latency, +// // ListenAddr: listenAddr, +// // Log: getTestLogger(), +// // }) + +// // { // Check health +// // req := httptest.NewRequest(http.MethodGet, "http://localhost"+listenAddr+"/readyz", nil) //nolint:goconst,nolintlint +// // w := httptest.NewRecorder() +// // s.handleReadinessCheck(w, req) +// // resp := w.Result() +// // defer resp.Body.Close() +// // _, err := io.ReadAll(resp.Body) +// // require.NoError(t, err) +// // require.Equal(t, http.StatusOK, resp.StatusCode, "Healthcheck must return `Ok` before draining") +// // } +// } diff --git a/api/server.go b/api/server.go new file mode 100644 index 0000000..17cdd36 --- /dev/null +++ b/api/server.go @@ -0,0 +1,125 @@ +// Package api contains the webserver for API and SSE subscription +package api + +import ( + "context" + "errors" + "net/http" + "sync" + "time" + + "github.com/flashbots/go-utils/httplogger" + "github.com/flashbots/mempool-dumpster/common" + "github.com/go-chi/chi/middleware" + "github.com/go-chi/chi/v5" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +type HTTPServerConfig struct { + ListenAddr string + Log *zap.SugaredLogger + + EnablePprof bool // https://go-chi.io/#/pages/middleware?id=profiler + + DrainDuration time.Duration + GracefulShutdownDuration time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +type Server struct { + cfg *HTTPServerConfig + isReady atomic.Bool + log *zap.SugaredLogger + + srv *http.Server + sseConnectionMap map[string]*SSESubscription + sseConnectionLock sync.RWMutex +} + +func New(cfg *HTTPServerConfig) (srv *Server) { + srv = &Server{ + cfg: cfg, + log: cfg.Log, + srv: nil, + sseConnectionMap: make(map[string]*SSESubscription), + } + srv.isReady.Swap(true) + + mux := chi.NewRouter() + + mux.With(srv.httpLogger).Get("/sse/transactions", srv.handleTxSSE) + // mux.With(srv.httpLogger).Get("/sse/dummyTx", srv.handleDummyTx) + + if cfg.EnablePprof { + srv.log.Info("pprof API enabled") + mux.Mount("/debug", middleware.Profiler()) + } + + srv.srv = &http.Server{ + Addr: cfg.ListenAddr, + Handler: mux, + ReadTimeout: cfg.ReadTimeout, + WriteTimeout: cfg.WriteTimeout, + } + + return srv +} + +func (s *Server) httpLogger(next http.Handler) http.Handler { + return httplogger.LoggingMiddlewareZap(s.log.Desugar(), next) +} + +func (s *Server) RunInBackground() { + // api + go func() { + s.log.With("listenAddress", s.cfg.ListenAddr).Info("Starting HTTP server") + if err := s.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + s.log.With("err", err).Error("HTTP server failed") + } + }() +} + +func (s *Server) Shutdown() { + // api + ctx, cancel := context.WithTimeout(context.Background(), s.cfg.GracefulShutdownDuration) + defer cancel() + if err := s.srv.Shutdown(ctx); err != nil { + s.log.With("err", err).Error("Graceful HTTP server shutdown failed") + } else { + s.log.Info("HTTP server gracefully stopped") + } +} + +func (s *Server) addSubscriber(sub *SSESubscription) { + s.sseConnectionLock.Lock() + defer s.sseConnectionLock.Unlock() + s.sseConnectionMap[sub.uid] = sub +} + +func (s *Server) removeSubscriber(sub *SSESubscription) { + s.sseConnectionLock.Lock() + defer s.sseConnectionLock.Unlock() + delete(s.sseConnectionMap, sub.uid) + s.log.With("subscribers", len(s.sseConnectionMap)).Info("removed subscriber") +} + +func (s *Server) SendTx(ctx context.Context, tx *common.TxIn) error { + s.sseConnectionLock.RLock() + defer s.sseConnectionLock.RUnlock() + if len(s.sseConnectionMap) == 0 { + return nil + } + + txRLP, err := common.TxToRLPString(tx.Tx) + if err != nil { + return err + } + + for _, sub := range s.sseConnectionMap { + sub.txC <- txRLP + } + + return nil +} diff --git a/cmd/collect/main.go b/cmd/collect/main.go index 82295e1..b38d2f8 100644 --- a/cmd/collect/main.go +++ b/cmd/collect/main.go @@ -83,6 +83,14 @@ var ( Usage: "sources of txs to send to receivers", Category: "Tx Receivers Configuration", }, + + // SSE tx subscription + &cli.StringFlag{ + Name: "api-listen-addr", + EnvVars: []string{"API_ADDR"}, + Usage: "API listen address (host:port)", + Category: "Tx Receivers Configuration", + }, } ) @@ -112,6 +120,7 @@ func runCollector(cCtx *cli.Context) error { chainboundAuth = cCtx.StringSlice("chainbound") receivers = cCtx.StringSlice("tx-receivers") receiversAllowedSources = cCtx.StringSlice("tx-receivers-allowed-sources") + apiListenAddr = cCtx.String("api-listen-addr") ) // Logger setup @@ -145,6 +154,7 @@ func runCollector(cCtx *cli.Context) error { ChainboundAuth: chainboundAuth, Receivers: receivers, ReceiversAllowedSources: receiversAllowedSources, + APIListenAddr: apiListenAddr, } collector.Start(&opts) diff --git a/cmd/dev/main.go b/cmd/dev/main.go new file mode 100644 index 0000000..8005f82 --- /dev/null +++ b/cmd/dev/main.go @@ -0,0 +1,101 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/flashbots/mempool-dumpster/api" + "github.com/flashbots/mempool-dumpster/common" + "github.com/urfave/cli/v2" +) + +var ( + version = "dev" // is set during build process + + cliFlags = []cli.Flag{ + &cli.BoolFlag{ + Name: "debug", + EnvVars: []string{"DEBUG"}, + Usage: "enable debug logging", + Category: "Collector Configuration", + }, + + &cli.StringFlag{ + Name: "api-listen-addr", + EnvVars: []string{"API_ADDR"}, + Value: "localhost:8060", + Usage: "API listen address (host:port)", + Category: "Tx Receivers Configuration", + }, + } +) + +func main() { + app := &cli.App{ + Name: "dev", + Usage: "dev stuff", + Version: version, + Flags: cliFlags, + Action: runDev, + } + + if err := app.Run(os.Args); err != nil { + log.Fatal(err) + } +} + +func runDev(cCtx *cli.Context) error { + var ( + debug = cCtx.Bool("debug") + apiListenAddr = cCtx.String("api-listen-addr") + ) + + // Logger setup + log := common.GetLogger(debug, false) + defer func() { _ = log.Sync() }() + + if apiListenAddr == "" { + log.Fatal("api-listen-addr is required") + } + + server := api.New(&api.HTTPServerConfig{ + Log: log, + ListenAddr: apiListenAddr, + }) + server.RunInBackground() + + // Send dummy txs all X seconds + txRLP := "0x02f873018305643b840f2c19f08503f8bfbbb2832ab980940ed1bcc400acd34593451e76f854992198995f52808498e5b12ac080a051eb99ae13fd1ace55dd93a4b36eefa5d34e115cd7b9fd5d0ffac07300cbaeb2a0782d9ad12490b45af932d8c98cb3c2fd8c02cdd6317edb36bde2df7556fa9132" + _, tx, err := common.ParseTx(int64(1693785600337), txRLP) + if err != nil { + return err + } + ctx := context.TODO() + + go func() { + for { + time.Sleep(2 * time.Second) + tx := common.TxIn{ + T: time.Now().UTC(), + Tx: tx, + Source: "dummy", + } + + err = server.SendTx(ctx, &tx) + if err != nil { + log.Errorw("failed to send tx", "err", err) + } + } + }() + + // Wwait for termination signal + exit := make(chan os.Signal, 1) + signal.Notify(exit, os.Interrupt, syscall.SIGTERM) + <-exit + log.Info("bye") + return nil +} diff --git a/collector/collector.go b/collector/collector.go index 62a3f65..227d2c1 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -2,6 +2,7 @@ package collector import ( + "github.com/flashbots/mempool-dumpster/api" "github.com/flashbots/mempool-dumpster/common" "go.uber.org/zap" ) @@ -19,10 +20,22 @@ type CollectorOpts struct { Receivers []string ReceiversAllowedSources []string + + APIListenAddr string } // Start kicks off all the service components in the background func Start(opts *CollectorOpts) { + // Start API first + var apiServer *api.Server + if opts.APIListenAddr != "" { + apiServer = api.New(&api.HTTPServerConfig{ + Log: opts.Log, + ListenAddr: opts.APIListenAddr, + }) + go apiServer.RunInBackground() + } + processor := NewTxProcessor(TxProcessorOpts{ Log: opts.Log, UID: opts.UID, @@ -31,6 +44,12 @@ func Start(opts *CollectorOpts) { HTTPReceivers: opts.Receivers, ReceiversAllowedSources: opts.ReceiversAllowedSources, }) + + // If API server is running, add it as a TX receiver + if apiServer != nil { + processor.receivers = append(processor.receivers, apiServer) + } + go processor.Start() // Regular nodes diff --git a/collector/node_conn.go b/collector/node_conn.go index f12cbf9..69612ef 100644 --- a/collector/node_conn.go +++ b/collector/node_conn.go @@ -17,12 +17,12 @@ type NodeConnection struct { log *zap.SugaredLogger uri string uriTag string // identifier of tx source (i.e. "infura", "alchemy", "ws://localhost:8546") - txC chan TxIn + txC chan common.TxIn isAlchemy bool backoffSec int } -func NewNodeConnection(log *zap.SugaredLogger, nodeURI string, txC chan TxIn) *NodeConnection { +func NewNodeConnection(log *zap.SugaredLogger, nodeURI string, txC chan common.TxIn) *NodeConnection { srcAlias := common.TxSourcName(nodeURI) return &NodeConnection{ log: log.With("src", srcAlias), @@ -75,7 +75,11 @@ func (nc *NodeConnection) connect() { nc.log.Errorw("subscription error, reconnecting...", "error", err) go nc.reconnect() case tx := <-localC: - nc.txC <- TxIn{time.Now().UTC(), tx, nc.uriTag} + nc.txC <- common.TxIn{ + T: time.Now().UTC(), + Tx: tx, + Source: nc.uriTag, + } } } } diff --git a/collector/node_conn_bloxroute.go b/collector/node_conn_bloxroute.go index 65f8d5b..5e69312 100644 --- a/collector/node_conn_bloxroute.go +++ b/collector/node_conn_bloxroute.go @@ -24,7 +24,7 @@ import ( ) type BlxNodeOpts struct { - TxC chan TxIn + TxC chan common.TxIn Log *zap.SugaredLogger AuthHeader string URL string // optional override, default: blxDefaultURL @@ -47,7 +47,7 @@ type BlxNodeConnection struct { authHeader string url string srcTag string - txC chan TxIn + txC chan common.TxIn backoffSec int } @@ -156,7 +156,11 @@ func (nc *BlxNodeConnection) connect() { continue } - nc.txC <- TxIn{time.Now().UTC(), &tx, nc.srcTag} + nc.txC <- common.TxIn{ + T: time.Now().UTC(), + Tx: &tx, + Source: nc.srcTag, + } } } @@ -165,7 +169,7 @@ type BlxNodeConnectionGRPC struct { authHeader string url string srcTag string - txC chan TxIn + txC chan common.TxIn backoffSec int } @@ -253,7 +257,11 @@ func (nc *BlxNodeConnectionGRPC) connect() { continue } - nc.txC <- TxIn{time.Now().UTC(), &tx, nc.srcTag} + nc.txC <- common.TxIn{ + T: time.Now().UTC(), + Tx: &tx, + Source: nc.srcTag, + } } } } diff --git a/collector/node_conn_chainbound.go b/collector/node_conn_chainbound.go index 3dac29d..332af31 100644 --- a/collector/node_conn_chainbound.go +++ b/collector/node_conn_chainbound.go @@ -13,7 +13,7 @@ import ( ) type ChainboundNodeOpts struct { - TxC chan TxIn + TxC chan common.TxIn Log *zap.SugaredLogger APIKey string URL string // optional override, default: ChainboundDefaultURL @@ -26,7 +26,7 @@ type ChainboundNodeConnection struct { url string srcTag string fiberC chan *fiber.TransactionWithSender - txC chan TxIn + txC chan common.TxIn backoffSec int } @@ -58,7 +58,11 @@ func (cbc *ChainboundNodeConnection) Start() { go cbc.connect() for fiberTx := range cbc.fiberC { - cbc.txC <- TxIn{time.Now().UTC(), fiberTx.Transaction, cbc.srcTag} + cbc.txC <- common.TxIn{ + T: time.Now().UTC(), + Tx: fiberTx.Transaction, + Source: cbc.srcTag, + } } cbc.log.Error("chainbound stream closed") diff --git a/collector/node_conn_eden.go b/collector/node_conn_eden.go index 63388fe..a7ebb53 100644 --- a/collector/node_conn_eden.go +++ b/collector/node_conn_eden.go @@ -22,7 +22,7 @@ import ( ) type EdenNodeOpts struct { - TxC chan TxIn + TxC chan common.TxIn Log *zap.SugaredLogger AuthHeader string URL string // optional override, default: edenDefaultURL @@ -45,7 +45,7 @@ type EdenNodeConnection struct { authHeader string url string srcTag string - txC chan TxIn + txC chan common.TxIn backoffSec int } @@ -154,7 +154,11 @@ func (nc *EdenNodeConnection) connect() { continue } - nc.txC <- TxIn{time.Now().UTC(), &tx, nc.srcTag} + nc.txC <- common.TxIn{ + T: time.Now().UTC(), + Tx: &tx, + Source: nc.srcTag, + } } } @@ -163,7 +167,7 @@ type EdenNodeConnectionGRPC struct { authHeader string url string srcTag string - txC chan TxIn + txC chan common.TxIn backoffSec int } @@ -250,6 +254,10 @@ func (nc *EdenNodeConnectionGRPC) connect() { continue } - nc.txC <- TxIn{time.Now().UTC(), &tx, nc.srcTag} + nc.txC <- common.TxIn{ + T: time.Now().UTC(), + Tx: &tx, + Source: nc.srcTag, + } } } diff --git a/collector/receiver.go b/collector/receiver.go index 69f402e..8e7b07f 100644 --- a/collector/receiver.go +++ b/collector/receiver.go @@ -1,14 +1,23 @@ package collector +// +// Forwarding select transactions to various kinds of receivers. +// +// One type of receiver is HTTPReceiver here. +// Another type is the API server, to stream out transactions as SSE stream. +// + import ( "bytes" "context" "io" "net/http" + + "github.com/flashbots/mempool-dumpster/common" ) type TxReceiver interface { - SendTx(ctx context.Context, tx *TxIn) error + SendTx(ctx context.Context, tx *common.TxIn) error } type HTTPReceiver struct { @@ -21,7 +30,7 @@ func NewHTTPReceiver(url string) *HTTPReceiver { } } -func (r *HTTPReceiver) SendTx(ctx context.Context, tx *TxIn) error { +func (r *HTTPReceiver) SendTx(ctx context.Context, tx *common.TxIn) error { rawTx, err := tx.Tx.MarshalBinary() if err != nil { return err diff --git a/collector/tx_processor.go b/collector/tx_processor.go index 558b849..f4de1ea 100644 --- a/collector/tx_processor.go +++ b/collector/tx_processor.go @@ -40,7 +40,7 @@ type TxProcessor struct { log *zap.SugaredLogger uid string outDir string - txC chan TxIn // note: it's important that the value is sent in here instead of a pointer, otherwise there are memory race conditions + txC chan common.TxIn // note: it's important that the value is sent in here instead of a pointer, otherwise there are memory race conditions outFilesLock sync.RWMutex outFiles map[int64]*OutFiles @@ -74,7 +74,7 @@ func NewTxProcessor(opts TxProcessorOpts) *TxProcessor { return &TxProcessor{ //nolint:exhaustruct log: opts.Log, // .With("uid", uid), - txC: make(chan TxIn, 100), + txC: make(chan common.TxIn, 100), uid: opts.UID, outDir: opts.OutDir, @@ -126,7 +126,7 @@ func (p *TxProcessor) Start() { } } -func (p *TxProcessor) sendTxToReceivers(txIn TxIn) { +func (p *TxProcessor) sendTxToReceivers(txIn common.TxIn) { sourceOk := false for _, allowedSource := range p.receiversAllowedSources { if txIn.Source == allowedSource { @@ -155,7 +155,7 @@ func (p *TxProcessor) sendTxToReceivers(txIn TxIn) { wg.Wait() } -func (p *TxProcessor) processTx(txIn TxIn) { +func (p *TxProcessor) processTx(txIn common.TxIn) { tx := txIn.Tx txHashLower := strings.ToLower(tx.Hash().Hex()) log := p.log.With("tx_hash", txHashLower).With("source", txIn.Source) @@ -242,7 +242,7 @@ func (p *TxProcessor) processTx(txIn TxIn) { p.knownTxsLock.Unlock() } -func (p *TxProcessor) writeTrash(fTrash *os.File, txIn TxIn, message, notes string) { +func (p *TxProcessor) writeTrash(fTrash *os.File, txIn common.TxIn, message, notes string) { txHashLower := strings.ToLower(txIn.Tx.Hash().Hex()) _, err := fmt.Fprintf(fTrash, "%d,%s,%s,%s,%s\n", txIn.T.UnixMilli(), txHashLower, txIn.Source, message, notes) if err != nil { @@ -250,7 +250,7 @@ func (p *TxProcessor) writeTrash(fTrash *os.File, txIn TxIn, message, notes stri } } -func (p *TxProcessor) validateTx(fTrash *os.File, txIn TxIn) error { // inspired by https://github.com/flashbots/suave-geth/blob/dd3875eccde5b11feb621f10d9aae6417c98bdb0/core/txpool/txpool.go#L600 +func (p *TxProcessor) validateTx(fTrash *os.File, txIn common.TxIn) error { // inspired by https://github.com/flashbots/suave-geth/blob/dd3875eccde5b11feb621f10d9aae6417c98bdb0/core/txpool/txpool.go#L600 tx := txIn.Tx txHashLower := strings.ToLower(tx.Hash().Hex()) log := p.log.With("tx_hash", txHashLower).With("source", txIn.Source) diff --git a/collector/tx_processor_test.go b/collector/tx_processor_test.go index a660015..cadab3d 100644 --- a/collector/tx_processor_test.go +++ b/collector/tx_processor_test.go @@ -17,10 +17,10 @@ import ( // } type MockTxReceiver struct { - ReceivedTx *TxIn + ReceivedTx *common.TxIn } -func (r *MockTxReceiver) SendTx(ctx context.Context, tx *TxIn) error { +func (r *MockTxReceiver) SendTx(ctx context.Context, tx *common.TxIn) error { r.ReceivedTx = tx return nil } @@ -38,7 +38,7 @@ func TestTxProcessor_sendTxToReceivers(t *testing.T) { }) processor.receivers = append(processor.receivers, &receiver) - tx := TxIn{ + tx := common.TxIn{ T: time.Now(), Tx: nil, Source: "not-allowed", diff --git a/collector/types.go b/collector/types.go index b80e6d7..d9dd910 100644 --- a/collector/types.go +++ b/collector/types.go @@ -1,17 +1,5 @@ package collector -import ( - "time" - - "github.com/ethereum/go-ethereum/core/types" -) - -type TxIn struct { - T time.Time - Tx *types.Transaction - Source string -} - type TxDetail struct { Timestamp int64 `json:"timestamp"` Hash string `json:"hash"` diff --git a/common/analyzer.go b/common/analyzer.go index f150670..adadc8f 100644 --- a/common/analyzer.go +++ b/common/analyzer.go @@ -93,7 +93,7 @@ func (a *Analyzer2) init() { // Count transactions per type a.nTransactionsPerType[tx.TxType] += 1 - a.txBytesPerType[tx.TxType] += int64(len(tx.RawTx)) / 2 // not sure this is correct, the end result seems low for blob transactions... + a.txBytesPerType[tx.TxType] += int64(len(tx.RawTx)) / 2 // Go over sources for _, src := range tx.Sources { diff --git a/common/types.go b/common/types.go index 198cad7..2370354 100644 --- a/common/types.go +++ b/common/types.go @@ -2,8 +2,17 @@ package common import ( "strings" + "time" + + "github.com/ethereum/go-ethereum/core/types" ) +type TxIn struct { + T time.Time + Tx *types.Transaction + Source string +} + type BlxRawTxMsg struct { Params struct { Result struct { diff --git a/go.mod b/go.mod index 9826007..3f79fe3 100644 --- a/go.mod +++ b/go.mod @@ -10,13 +10,17 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/eden-network/mempool-service v1.0.1 github.com/ethereum/go-ethereum v1.13.14 + github.com/flashbots/go-utils v0.6.1-0.20240610084140-4461ab748667 + github.com/go-chi/chi v1.5.5 + github.com/go-chi/chi/v5 v5.0.12 + github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.5.0 github.com/lithammer/shortuuid v3.0.0+incompatible github.com/olekukonko/tablewriter v0.0.5 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 github.com/tdewolff/minify v2.3.6+incompatible - github.com/urfave/cli/v2 v2.25.7 + github.com/urfave/cli/v2 v2.27.2 github.com/xitongsys/parquet-go v1.6.2 github.com/xitongsys/parquet-go-source v0.0.0-20230830030807-0dd610dbff1d go.uber.org/atomic v1.11.0 @@ -43,7 +47,7 @@ require ( github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/consensys/bavard v0.1.13 // indirect github.com/consensys/gnark-crypto v0.12.1 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233 // indirect github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -61,7 +65,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect - github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.4 // indirect @@ -74,7 +77,6 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect @@ -82,22 +84,23 @@ require ( github.com/pierrec/lz4/v4 v4.1.8 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect - github.com/prometheus/client_model v0.4.0 // indirect - github.com/prometheus/common v0.44.0 // indirect - github.com/prometheus/procfs v0.11.1 // indirect + github.com/prometheus/client_golang v1.19.1 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.48.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 // indirect github.com/rivo/uniseg v0.4.3 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/supranational/blst v0.3.11 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tdewolff/parse v2.3.4+incompatible // indirect github.com/tdewolff/test v1.0.9 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect - github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.21.0 // indirect diff --git a/go.sum b/go.sum index 2991423..c8578b9 100644 --- a/go.sum +++ b/go.sum @@ -229,8 +229,8 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= -github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= -github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233 h1:d28BXYi+wUpz1KBmiF9bWrjEMacUEREV6MBi2ODnrfQ= github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233/go.mod h1:geZJZH3SzKCqnz5VT0q/DyIG/tvu/dZk+VIfXicupJs= github.com/crate-crypto/go-kzg-4844 v0.7.0 h1:C0vgZRk4q4EZ/JgPfzuSoxdCq3C3mOZMBShovmncxvA= @@ -282,6 +282,8 @@ github.com/ferranbt/fastssz v0.1.3 h1:ZI+z3JH05h4kgmFXdHuR1aWYsgrg7o+Fw7/NCzM16M github.com/ferranbt/fastssz v0.1.3/go.mod h1:0Y9TEd/9XuFlh7mskMPfXiI2Dkw4Ddg9EyXt1W7MRvE= github.com/fjl/memsize v0.0.2 h1:27txuSD9or+NZlnOWdKUxeBzTAUkWCVh+4Gf2dWFOzA= github.com/fjl/memsize v0.0.2/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= +github.com/flashbots/go-utils v0.6.1-0.20240610084140-4461ab748667 h1:Zpdah3TPNH96wp4IZG8eH81WU0ISS39+b1EEuVrwGBA= +github.com/flashbots/go-utils v0.6.1-0.20240610084140-4461ab748667/go.mod h1:6ZfgrAI+ApKSBF4QghFO06VfRJGGAOOyG4DO0siN2ow= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= @@ -306,6 +308,10 @@ github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/ github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/gin-gonic/gin v1.7.3/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= +github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE= +github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw= +github.com/go-chi/chi/v5 v5.0.12 h1:9euLV5sTrTNTRUU9POmDUvfxyj6LAABLUcEWO+JJb4s= +github.com/go-chi/chi/v5 v5.0.12/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= @@ -628,8 +634,6 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= -github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= @@ -696,15 +700,15 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= -github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= -github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= -github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= -github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= -github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= -github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 h1:0tVE4tdWQK9ZpYygoV7+vS6QkDvQVySboMVEIxBJmXw= github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -738,6 +742,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -762,8 +768,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= github.com/supranational/blst v0.3.11/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDdvS342BElfbETmL1Aiz3i2t0zfRj16Hs= @@ -786,8 +792,8 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/umbracle/gohashtree v0.0.2-alpha.0.20230207094856-5b775a815c10 h1:CQh33pStIp/E30b7TxDlXfM0145bn2e8boI30IxAhTg= github.com/umbracle/gohashtree v0.0.2-alpha.0.20230207094856-5b775a815c10/go.mod h1:x/Pa0FF5Te9kdrlZKJK82YmAkvL8+f989USgz6Jiw7M= -github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= -github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= +github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI= +github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w= @@ -805,8 +811,8 @@ github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod github.com/xitongsys/parquet-go-source v0.0.0-20230830030807-0dd610dbff1d h1:VVWj8KWdzpebBaXpTVpOaQW32y2UCWy3JXJ5lVDa/e8= github.com/xitongsys/parquet-go-source v0.0.0-20230830030807-0dd610dbff1d/go.mod h1:HaLl1OAA7RAuQURU3Enxn7aRAI9yezsPPaxiGrbzxW4= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= -github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= -github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw= +github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI= github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= diff --git a/scripts/subscibe-test/main.go b/scripts/subscibe-test/main.go index f88ad47..693c752 100644 --- a/scripts/subscibe-test/main.go +++ b/scripts/subscibe-test/main.go @@ -29,7 +29,7 @@ func main() { } func MainGeneric() { - txC := make(chan collector.TxIn) + txC := make(chan common.TxIn) log := common.GetLogger(true, false) nc := collector.NewNodeConnection(log, url, txC) nc.StartInBackground() @@ -39,7 +39,7 @@ func MainGeneric() { } func MainBlx() { - txC := make(chan collector.TxIn) + txC := make(chan common.TxIn) log := common.GetLogger(true, false) token, url := common.GetAuthTokenAndURL(os.Getenv("BLX_AUTH")) blxOpts := collector.BlxNodeOpts{ @@ -56,7 +56,7 @@ func MainBlx() { } func MainEden() { - txC := make(chan collector.TxIn) + txC := make(chan common.TxIn) log := common.GetLogger(true, false) token, url := common.GetAuthTokenAndURL(os.Getenv("EDEN_AUTH")) blxOpts := collector.EdenNodeOpts{ @@ -73,7 +73,7 @@ func MainEden() { } func MainChainbound() { - txC := make(chan collector.TxIn) + txC := make(chan common.TxIn) log := common.GetLogger(true, false) opts := collector.ChainboundNodeOpts{ //nolint:exhaustruct TxC: txC,