Skip to content

Commit

Permalink
API + SSE subscription
Browse files Browse the repository at this point in the history
start api + sse

send txs

move TxIn to common, use API as receiver

cleanup
  • Loading branch information
metachris committed Jun 24, 2024
1 parent 6167658 commit 5567b87
Show file tree
Hide file tree
Showing 19 changed files with 448 additions and 72 deletions.
52 changes: 52 additions & 0 deletions api/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package api

import (
"fmt"
"net/http"

"github.com/google/uuid"
)

type SSESubscription struct {
uid string
txC chan string
}

func (s *Server) handleTxSSE(w http.ResponseWriter, r *http.Request) {
// SSE server for transactions
s.log.Info("SSE connection opened for transactions")

// Set CORS headers to allow all origins. You may want to restrict this to specific origins in a production environment.
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Expose-Headers", "Content-Type")

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

subscriber := SSESubscription{
uid: uuid.New().String(),
txC: make(chan string, 100),
}
s.addSubscriber(&subscriber)

// pingTicker := time.NewTicker(5 * time.Second)

// Wait for txs or end of request...
for {
select {
case <-r.Context().Done():
s.log.Info("SSE closed, removing subscriber")
s.removeSubscriber(&subscriber)
return

case tx := <-subscriber.txC:
fmt.Fprintf(w, "data: %s\n\n", tx)
w.(http.Flusher).Flush() //nolint:forcetypeassert

// case <-pingTicker.C:
// fmt.Fprintf(w, ": ping\n\n")
// w.(http.Flusher).Flush() //nolint:forcetypeassert
}
}
}
30 changes: 30 additions & 0 deletions api/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package api

// func getTestLogger() *zap.SugaredLogger {
// return common.GetLogger(true, false)
// }

// func Test_Handlers_Healthcheck_Drain_Undrain(t *testing.T) {
// const (
// latency = 200 * time.Millisecond
// listenAddr = ":8080"
// )

// //nolint: exhaustruct
// // s := New(&HTTPServerConfig{
// // DrainDuration: latency,
// // ListenAddr: listenAddr,
// // Log: getTestLogger(),
// // })

// // { // Check health
// // req := httptest.NewRequest(http.MethodGet, "http://localhost"+listenAddr+"/readyz", nil) //nolint:goconst,nolintlint
// // w := httptest.NewRecorder()
// // s.handleReadinessCheck(w, req)
// // resp := w.Result()
// // defer resp.Body.Close()
// // _, err := io.ReadAll(resp.Body)
// // require.NoError(t, err)
// // require.Equal(t, http.StatusOK, resp.StatusCode, "Healthcheck must return `Ok` before draining")
// // }
// }
125 changes: 125 additions & 0 deletions api/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Package api contains the webserver for API and SSE subscription
package api

import (
"context"
"errors"
"net/http"
"sync"
"time"

"github.com/flashbots/go-utils/httplogger"
"github.com/flashbots/mempool-dumpster/common"
"github.com/go-chi/chi/middleware"
"github.com/go-chi/chi/v5"
"go.uber.org/atomic"
"go.uber.org/zap"
)

type HTTPServerConfig struct {
ListenAddr string
Log *zap.SugaredLogger

EnablePprof bool // https://go-chi.io/#/pages/middleware?id=profiler

DrainDuration time.Duration
GracefulShutdownDuration time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}

type Server struct {
cfg *HTTPServerConfig
isReady atomic.Bool
log *zap.SugaredLogger

srv *http.Server
sseConnectionMap map[string]*SSESubscription
sseConnectionLock sync.RWMutex
}

func New(cfg *HTTPServerConfig) (srv *Server) {
srv = &Server{
cfg: cfg,
log: cfg.Log,
srv: nil,
sseConnectionMap: make(map[string]*SSESubscription),
}
srv.isReady.Swap(true)

mux := chi.NewRouter()

mux.With(srv.httpLogger).Get("/sse/transactions", srv.handleTxSSE)
// mux.With(srv.httpLogger).Get("/sse/dummyTx", srv.handleDummyTx)

if cfg.EnablePprof {
srv.log.Info("pprof API enabled")
mux.Mount("/debug", middleware.Profiler())
}

srv.srv = &http.Server{
Addr: cfg.ListenAddr,
Handler: mux,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
}

return srv
}

func (s *Server) httpLogger(next http.Handler) http.Handler {
return httplogger.LoggingMiddlewareZap(s.log.Desugar(), next)
}

func (s *Server) RunInBackground() {
// api
go func() {
s.log.With("listenAddress", s.cfg.ListenAddr).Info("Starting HTTP server")
if err := s.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.log.With("err", err).Error("HTTP server failed")
}
}()
}

func (s *Server) Shutdown() {
// api
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.GracefulShutdownDuration)
defer cancel()
if err := s.srv.Shutdown(ctx); err != nil {
s.log.With("err", err).Error("Graceful HTTP server shutdown failed")
} else {
s.log.Info("HTTP server gracefully stopped")
}
}

func (s *Server) addSubscriber(sub *SSESubscription) {
s.sseConnectionLock.Lock()
defer s.sseConnectionLock.Unlock()
s.sseConnectionMap[sub.uid] = sub
}

func (s *Server) removeSubscriber(sub *SSESubscription) {
s.sseConnectionLock.Lock()
defer s.sseConnectionLock.Unlock()
delete(s.sseConnectionMap, sub.uid)
s.log.With("subscribers", len(s.sseConnectionMap)).Info("removed subscriber")
}

func (s *Server) SendTx(ctx context.Context, tx *common.TxIn) error {
s.sseConnectionLock.RLock()
defer s.sseConnectionLock.RUnlock()
if len(s.sseConnectionMap) == 0 {
return nil
}

txRLP, err := common.TxToRLPString(tx.Tx)
if err != nil {
return err
}

for _, sub := range s.sseConnectionMap {
sub.txC <- txRLP
}

return nil
}
10 changes: 10 additions & 0 deletions cmd/collect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ var (
Usage: "sources of txs to send to receivers",
Category: "Tx Receivers Configuration",
},

// SSE tx subscription
&cli.StringFlag{
Name: "api-listen-addr",
EnvVars: []string{"API_ADDR"},
Usage: "API listen address (host:port)",
Category: "Tx Receivers Configuration",
},
}
)

Expand Down Expand Up @@ -112,6 +120,7 @@ func runCollector(cCtx *cli.Context) error {
chainboundAuth = cCtx.StringSlice("chainbound")
receivers = cCtx.StringSlice("tx-receivers")
receiversAllowedSources = cCtx.StringSlice("tx-receivers-allowed-sources")
apiListenAddr = cCtx.String("api-listen-addr")
)

// Logger setup
Expand Down Expand Up @@ -145,6 +154,7 @@ func runCollector(cCtx *cli.Context) error {
ChainboundAuth: chainboundAuth,
Receivers: receivers,
ReceiversAllowedSources: receiversAllowedSources,
APIListenAddr: apiListenAddr,
}

collector.Start(&opts)
Expand Down
101 changes: 101 additions & 0 deletions cmd/dev/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/flashbots/mempool-dumpster/api"
"github.com/flashbots/mempool-dumpster/common"
"github.com/urfave/cli/v2"
)

var (
version = "dev" // is set during build process

cliFlags = []cli.Flag{
&cli.BoolFlag{
Name: "debug",
EnvVars: []string{"DEBUG"},
Usage: "enable debug logging",
Category: "Collector Configuration",
},

&cli.StringFlag{
Name: "api-listen-addr",
EnvVars: []string{"API_ADDR"},
Value: "localhost:8060",
Usage: "API listen address (host:port)",
Category: "Tx Receivers Configuration",
},
}
)

func main() {
app := &cli.App{
Name: "dev",
Usage: "dev stuff",
Version: version,
Flags: cliFlags,
Action: runDev,
}

if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}

func runDev(cCtx *cli.Context) error {
var (
debug = cCtx.Bool("debug")
apiListenAddr = cCtx.String("api-listen-addr")
)

// Logger setup
log := common.GetLogger(debug, false)
defer func() { _ = log.Sync() }()

if apiListenAddr == "" {
log.Fatal("api-listen-addr is required")
}

server := api.New(&api.HTTPServerConfig{
Log: log,
ListenAddr: apiListenAddr,
})
server.RunInBackground()

// Send dummy txs all X seconds
txRLP := "0x02f873018305643b840f2c19f08503f8bfbbb2832ab980940ed1bcc400acd34593451e76f854992198995f52808498e5b12ac080a051eb99ae13fd1ace55dd93a4b36eefa5d34e115cd7b9fd5d0ffac07300cbaeb2a0782d9ad12490b45af932d8c98cb3c2fd8c02cdd6317edb36bde2df7556fa9132"
_, tx, err := common.ParseTx(int64(1693785600337), txRLP)
if err != nil {
return err
}
ctx := context.TODO()

go func() {
for {
time.Sleep(2 * time.Second)
tx := common.TxIn{
T: time.Now().UTC(),
Tx: tx,
Source: "dummy",
}

err = server.SendTx(ctx, &tx)
if err != nil {
log.Errorw("failed to send tx", "err", err)
}
}
}()

// Wwait for termination signal
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
<-exit
log.Info("bye")
return nil
}
19 changes: 19 additions & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package collector

import (
"github.com/flashbots/mempool-dumpster/api"
"github.com/flashbots/mempool-dumpster/common"
"go.uber.org/zap"
)
Expand All @@ -19,10 +20,22 @@ type CollectorOpts struct {

Receivers []string
ReceiversAllowedSources []string

APIListenAddr string
}

// Start kicks off all the service components in the background
func Start(opts *CollectorOpts) {
// Start API first
var apiServer *api.Server
if opts.APIListenAddr != "" {
apiServer = api.New(&api.HTTPServerConfig{
Log: opts.Log,
ListenAddr: opts.APIListenAddr,
})
go apiServer.RunInBackground()
}

processor := NewTxProcessor(TxProcessorOpts{
Log: opts.Log,
UID: opts.UID,
Expand All @@ -31,6 +44,12 @@ func Start(opts *CollectorOpts) {
HTTPReceivers: opts.Receivers,
ReceiversAllowedSources: opts.ReceiversAllowedSources,
})

// If API server is running, add it as a TX receiver
if apiServer != nil {
processor.receivers = append(processor.receivers, apiServer)
}

go processor.Start()

// Regular nodes
Expand Down
Loading

0 comments on commit 5567b87

Please sign in to comment.