diff --git a/.env b/.env new file mode 100644 index 00000000..bd9c31dc --- /dev/null +++ b/.env @@ -0,0 +1,10 @@ +STATE_ANALYZER_CMD="rewards" +STATE_ANALYZER_LOG_LEVEL="debug" +STATE_ANALYZER_BN_ENDPOINT="http://beacon_node_ip:api-port" +STATE_ANALYZER_OUTFOLDER="results" +STATE_ANALYZER_INIT_SLOT="10000" +STATE_ANALYZER_FINAL_SLOT="10050" +STATE_ANALYZER_VALIDATOR_INDEXES="test_validators.json" +STATE_ANALYZER_DB_URL="postgresql://db_user:db_password@db_ip:db_port/db_name" +STATE_ANALYZER_WORKERS_NUM="50" +STATE_ANALYZER_DB_WORKERS_NUM="4" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 418d82da..36680fb6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ eth2-state-analyzer .vscode/** test*.json .ipynb_checkpoints/** -metrika/** \ No newline at end of file +metrika/** +.config \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..206038a4 --- /dev/null +++ b/Makefile @@ -0,0 +1,33 @@ +#!make + +GOCC=go +MKDIR_P=mkdir -p + +BIN_PATH=./build +BIN="./build/eth2-state-analyzer" + +include .env + +.PHONY: check build install run clean + +build: + $(GOCC) build -o $(BIN) + +install: + $(GOCC) install + +run: + $(BIN) $(STATE_ANALYZER_CMD) \ + --log-level=${STATE_ANALYZER_LOG_LEVEL} \ + --bn-endpoint=${STATE_ANALYZER_BN_ENDPOINT} \ + --outfolder=${STATE_ANALYZER_OUTFOLDER} \ + --init-slot=${STATE_ANALYZER_INIT_SLOT} \ + --final-slot=${STATE_ANALYZER_FINAL_SLOT} \ + --validator-indexes=${STATE_ANALYZER_VALIDATOR_INDEXES} \ + --db-url=${STATE_ANALYZER_DB_URL} \ + --workers-num=${STATE_ANALYZER_WORKERS_NUM} \ + --db-workers-num=${STATE_ANALYZER_DB_WORKERS_NUM} + +clean: + rm -r $(BIN_PATH) + diff --git a/cmd/reward_cmd.go b/cmd/reward_cmd.go index fc12c27c..c25f48cb 100644 --- a/cmd/reward_cmd.go +++ b/cmd/reward_cmd.go @@ -1,6 +1,9 @@ package cmd import ( + "os" + "os/signal" + "syscall" "time" "github.com/pkg/errors" @@ -113,12 +116,33 @@ func LaunchRewardsCalculator(c *cli.Context) error { if err != nil { return err } + // generate the state analyzer stateAnalyzer, err := analyzer.NewStateAnalyzer(c.Context, cli, initSlot, finalSlot, validatorIndexes, dbUrl, coworkers, dbWorkers) if err != nil { return err } - stateAnalyzer.Run() + procDoneC := make(chan struct{}) + sigtermC := make(chan os.Signal) + + signal.Notify(sigtermC, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, syscall.SIGTERM) + + go func() { + stateAnalyzer.Run() + procDoneC <- struct{}{} + }() + + select { + case <-sigtermC: + logRewardsRewards.Info("Sudden shutdown detected, controlled shutdown of the cli triggered") + stateAnalyzer.Close() + + case <-procDoneC: + logRewardsRewards.Info("Process successfully finish!") + } + close(sigtermC) + close(procDoneC) + return nil } diff --git a/go.mod b/go.mod index 4f350496..5fe9cf31 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/jackc/pgproto3/v2 v2.3.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/pgtype v1.11.0 // indirect - github.com/jackc/pgx/v4 v4.16.1 // indirect github.com/jackc/puddle v1.2.1 // indirect github.com/klauspost/cpuid/v2 v2.0.11 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect diff --git a/pkg/analyzer/analyzer.go b/pkg/analyzer/analyzer.go index fb4571ef..1be7a324 100644 --- a/pkg/analyzer/analyzer.go +++ b/pkg/analyzer/analyzer.go @@ -30,6 +30,7 @@ var ( type StateAnalyzer struct { ctx context.Context + cancel context.CancelFunc InitSlot uint64 FinalSlot uint64 ValidatorIndexes []uint64 @@ -42,11 +43,15 @@ type StateAnalyzer struct { cli *clientapi.APIClient dbClient *postgresql.PostgresDBService + // Control Variables + finishDownload bool + routineClosed chan struct{} + initTime time.Time } func NewStateAnalyzer( - ctx context.Context, + pCtx context.Context, httpCli *clientapi.APIClient, initSlot uint64, finalSlot uint64, @@ -55,6 +60,9 @@ func NewStateAnalyzer( workerNum int, dbWorkerNum int) (*StateAnalyzer, error) { log.Infof("generating new State Analzyer from slots %d:%d, for validators %v", initSlot, finalSlot, valIdxs) + // gen new ctx from parent + ctx, cancel := context.WithCancel(pCtx) + // Check if the range of slots is valid if !utils.IsValidRangeuint64(initSlot, finalSlot) { return nil, errors.New("provided slot range isn't valid") @@ -93,6 +101,7 @@ func NewStateAnalyzer( return &StateAnalyzer{ ctx: ctx, + cancel: cancel, InitSlot: initSlot, FinalSlot: finalSlot, ValidatorIndexes: valIdxs, @@ -103,11 +112,12 @@ func NewStateAnalyzer( cli: httpCli, dbClient: i_dbClient, validatorWorkerNum: workerNum, + routineClosed: make(chan struct{}), }, nil } func (s *StateAnalyzer) Run() { - + defer s.cancel() // Get init time s.initTime = time.Now() @@ -155,17 +165,27 @@ func (s *StateAnalyzer) Run() { wgProcess.Wait() processFinishedFlag = true - close(s.EpochTaskChan) log.Info("Beacon State Processing finished") wgWorkers.Wait() - close(s.ValTaskChan) log.Info("All validator workers finished") s.dbClient.DoneTasks() <-s.dbClient.FinishSignalChan + + close(s.ValTaskChan) totalTime += int64(time.Since(start).Seconds()) analysisDuration := time.Since(s.initTime).Seconds() log.Info("State Analyzer finished in ", analysisDuration) + if s.finishDownload { + s.routineClosed <- struct{}{} + } +} + +func (s *StateAnalyzer) Close() { + log.Info("Sudden closed detected, closing StateAnalyzer") + s.finishDownload = true + <-s.routineClosed + s.cancel() } // diff --git a/pkg/analyzer/download_state.go b/pkg/analyzer/download_state.go index 991fe828..87dfefab 100644 --- a/pkg/analyzer/download_state.go +++ b/pkg/analyzer/download_state.go @@ -27,6 +27,11 @@ func (s *StateAnalyzer) runDownloadStates(wgDownload *sync.WaitGroup) { return default: + if s.finishDownload { + log.Info("sudden shutdown detected, state downloader routine") + close(s.EpochTaskChan) + return + } ticker.Reset(minReqTime) firstIteration := true secondIteration := true diff --git a/pkg/analyzer/validatorWorker.go b/pkg/analyzer/validatorWorker.go index a21233d7..76fc8eb7 100644 --- a/pkg/analyzer/validatorWorker.go +++ b/pkg/analyzer/validatorWorker.go @@ -118,11 +118,11 @@ loop: wlog.Debugf("Validator group processed, worker freed for next group. Took %f seconds", time.Since(snapshot).Seconds()) case <-s.ctx.Done(): - log.Info("context has died, closing state processer routine") + log.Info("context has died, closing state worker routine") return default: } } - wlog.Infof("Validator worker finished, no mroe tasks to process") + wlog.Infof("Validator worker finished, no more tasks to process") }