From f4817f0baf8037c2ecc2d67c5331ac37ff131c48 Mon Sep 17 00:00:00 2001 From: cortze Date: Thu, 27 Oct 2022 11:01:27 +0200 Subject: [PATCH 1/2] update analyzer for safe-shutdown --- .env | 10 ++++++++++ .gitignore | 3 ++- Makefile | 33 +++++++++++++++++++++++++++++++++ cmd/reward_cmd.go | 26 +++++++++++++++++++++++++- go.mod | 1 - pkg/analyzer/analyzer.go | 28 ++++++++++++++++++++++++---- pkg/analyzer/download_state.go | 5 +++++ pkg/analyzer/process_state.go | 5 +++-- pkg/analyzer/validatorWorker.go | 4 ++-- pkg/db/postgresql/service.go | 1 - 10 files changed, 104 insertions(+), 12 deletions(-) create mode 100644 .env create mode 100644 Makefile diff --git a/.env b/.env new file mode 100644 index 00000000..bd9c31dc --- /dev/null +++ b/.env @@ -0,0 +1,10 @@ +STATE_ANALYZER_CMD="rewards" +STATE_ANALYZER_LOG_LEVEL="debug" +STATE_ANALYZER_BN_ENDPOINT="http://beacon_node_ip:api-port" +STATE_ANALYZER_OUTFOLDER="results" +STATE_ANALYZER_INIT_SLOT="10000" +STATE_ANALYZER_FINAL_SLOT="10050" +STATE_ANALYZER_VALIDATOR_INDEXES="test_validators.json" +STATE_ANALYZER_DB_URL="postgresql://db_user:db_password@db_ip:db_port/db_name" +STATE_ANALYZER_WORKERS_NUM="50" +STATE_ANALYZER_DB_WORKERS_NUM="4" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 418d82da..36680fb6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ eth2-state-analyzer .vscode/** test*.json .ipynb_checkpoints/** -metrika/** \ No newline at end of file +metrika/** +.config \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..206038a4 --- /dev/null +++ b/Makefile @@ -0,0 +1,33 @@ +#!make + +GOCC=go +MKDIR_P=mkdir -p + +BIN_PATH=./build +BIN="./build/eth2-state-analyzer" + +include .env + +.PHONY: check build install run clean + +build: + $(GOCC) build -o $(BIN) + +install: + $(GOCC) install + +run: + $(BIN) $(STATE_ANALYZER_CMD) \ + --log-level=${STATE_ANALYZER_LOG_LEVEL} \ + --bn-endpoint=${STATE_ANALYZER_BN_ENDPOINT} \ + --outfolder=${STATE_ANALYZER_OUTFOLDER} \ + --init-slot=${STATE_ANALYZER_INIT_SLOT} \ + --final-slot=${STATE_ANALYZER_FINAL_SLOT} \ + --validator-indexes=${STATE_ANALYZER_VALIDATOR_INDEXES} \ + --db-url=${STATE_ANALYZER_DB_URL} \ + --workers-num=${STATE_ANALYZER_WORKERS_NUM} \ + --db-workers-num=${STATE_ANALYZER_DB_WORKERS_NUM} + +clean: + rm -r $(BIN_PATH) + diff --git a/cmd/reward_cmd.go b/cmd/reward_cmd.go index fc12c27c..c25f48cb 100644 --- a/cmd/reward_cmd.go +++ b/cmd/reward_cmd.go @@ -1,6 +1,9 @@ package cmd import ( + "os" + "os/signal" + "syscall" "time" "github.com/pkg/errors" @@ -113,12 +116,33 @@ func LaunchRewardsCalculator(c *cli.Context) error { if err != nil { return err } + // generate the state analyzer stateAnalyzer, err := analyzer.NewStateAnalyzer(c.Context, cli, initSlot, finalSlot, validatorIndexes, dbUrl, coworkers, dbWorkers) if err != nil { return err } - stateAnalyzer.Run() + procDoneC := make(chan struct{}) + sigtermC := make(chan os.Signal) + + signal.Notify(sigtermC, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, syscall.SIGTERM) + + go func() { + stateAnalyzer.Run() + procDoneC <- struct{}{} + }() + + select { + case <-sigtermC: + logRewardsRewards.Info("Sudden shutdown detected, controlled shutdown of the cli triggered") + stateAnalyzer.Close() + + case <-procDoneC: + logRewardsRewards.Info("Process successfully finish!") + } + close(sigtermC) + close(procDoneC) + return nil } diff --git a/go.mod b/go.mod index 4f350496..5fe9cf31 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/jackc/pgproto3/v2 v2.3.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/pgtype v1.11.0 // indirect - github.com/jackc/pgx/v4 v4.16.1 // indirect github.com/jackc/puddle v1.2.1 // indirect github.com/klauspost/cpuid/v2 v2.0.11 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect diff --git a/pkg/analyzer/analyzer.go b/pkg/analyzer/analyzer.go index fb4571ef..1be7a324 100644 --- a/pkg/analyzer/analyzer.go +++ b/pkg/analyzer/analyzer.go @@ -30,6 +30,7 @@ var ( type StateAnalyzer struct { ctx context.Context + cancel context.CancelFunc InitSlot uint64 FinalSlot uint64 ValidatorIndexes []uint64 @@ -42,11 +43,15 @@ type StateAnalyzer struct { cli *clientapi.APIClient dbClient *postgresql.PostgresDBService + // Control Variables + finishDownload bool + routineClosed chan struct{} + initTime time.Time } func NewStateAnalyzer( - ctx context.Context, + pCtx context.Context, httpCli *clientapi.APIClient, initSlot uint64, finalSlot uint64, @@ -55,6 +60,9 @@ func NewStateAnalyzer( workerNum int, dbWorkerNum int) (*StateAnalyzer, error) { log.Infof("generating new State Analzyer from slots %d:%d, for validators %v", initSlot, finalSlot, valIdxs) + // gen new ctx from parent + ctx, cancel := context.WithCancel(pCtx) + // Check if the range of slots is valid if !utils.IsValidRangeuint64(initSlot, finalSlot) { return nil, errors.New("provided slot range isn't valid") @@ -93,6 +101,7 @@ func NewStateAnalyzer( return &StateAnalyzer{ ctx: ctx, + cancel: cancel, InitSlot: initSlot, FinalSlot: finalSlot, ValidatorIndexes: valIdxs, @@ -103,11 +112,12 @@ func NewStateAnalyzer( cli: httpCli, dbClient: i_dbClient, validatorWorkerNum: workerNum, + routineClosed: make(chan struct{}), }, nil } func (s *StateAnalyzer) Run() { - + defer s.cancel() // Get init time s.initTime = time.Now() @@ -155,17 +165,27 @@ func (s *StateAnalyzer) Run() { wgProcess.Wait() processFinishedFlag = true - close(s.EpochTaskChan) log.Info("Beacon State Processing finished") wgWorkers.Wait() - close(s.ValTaskChan) log.Info("All validator workers finished") s.dbClient.DoneTasks() <-s.dbClient.FinishSignalChan + + close(s.ValTaskChan) totalTime += int64(time.Since(start).Seconds()) analysisDuration := time.Since(s.initTime).Seconds() log.Info("State Analyzer finished in ", analysisDuration) + if s.finishDownload { + s.routineClosed <- struct{}{} + } +} + +func (s *StateAnalyzer) Close() { + log.Info("Sudden closed detected, closing StateAnalyzer") + s.finishDownload = true + <-s.routineClosed + s.cancel() } // diff --git a/pkg/analyzer/download_state.go b/pkg/analyzer/download_state.go index 991fe828..87dfefab 100644 --- a/pkg/analyzer/download_state.go +++ b/pkg/analyzer/download_state.go @@ -27,6 +27,11 @@ func (s *StateAnalyzer) runDownloadStates(wgDownload *sync.WaitGroup) { return default: + if s.finishDownload { + log.Info("sudden shutdown detected, state downloader routine") + close(s.EpochTaskChan) + return + } ticker.Reset(minReqTime) firstIteration := true secondIteration := true diff --git a/pkg/analyzer/process_state.go b/pkg/analyzer/process_state.go index 5d749014..89220463 100644 --- a/pkg/analyzer/process_state.go +++ b/pkg/analyzer/process_state.go @@ -16,12 +16,13 @@ import ( func (s *StateAnalyzer) runProcessState(wgProcess *sync.WaitGroup, downloadFinishedFlag *bool) { defer wgProcess.Done() + var suddenShutDown bool = false epochBatch := pgx.Batch{} log.Info("Launching Beacon State Pre-Processer") loop: for { // in case the downloads have finished, and there are no more tasks to execute - if *downloadFinishedFlag && len(s.EpochTaskChan) == 0 { + if (*downloadFinishedFlag && len(s.EpochTaskChan) == 0) || (suddenShutDown && len(s.EpochTaskChan) == 0) { log.Warn("the task channel has been closed, finishing epoch routine") if epochBatch.Len() == 0 { log.Debugf("Sending last epoch batch to be stored...") @@ -35,7 +36,7 @@ loop: select { case <-s.ctx.Done(): log.Info("context has died, closing state processer routine") - return + suddenShutDown = true case task, ok := <-s.EpochTaskChan: diff --git a/pkg/analyzer/validatorWorker.go b/pkg/analyzer/validatorWorker.go index a21233d7..f03bda6a 100644 --- a/pkg/analyzer/validatorWorker.go +++ b/pkg/analyzer/validatorWorker.go @@ -118,11 +118,11 @@ loop: wlog.Debugf("Validator group processed, worker freed for next group. Took %f seconds", time.Since(snapshot).Seconds()) case <-s.ctx.Done(): - log.Info("context has died, closing state processer routine") + log.Info("context has died, closing state Worker routine") return default: } } - wlog.Infof("Validator worker finished, no mroe tasks to process") + wlog.Infof("Validator worker finished, no more tasks to process") } diff --git a/pkg/db/postgresql/service.go b/pkg/db/postgresql/service.go index 10bff660..90f5a481 100644 --- a/pkg/db/postgresql/service.go +++ b/pkg/db/postgresql/service.go @@ -133,7 +133,6 @@ func (p *PostgresDBService) runWriters() { case <-p.ctx.Done(): wlogWriter.Info("shutdown detected, closing persister") - break loop default: } From de370a82da21122f6e1b0dda6c9891c8f8f33be7 Mon Sep 17 00:00:00 2001 From: cortze Date: Thu, 27 Oct 2022 11:11:12 +0200 Subject: [PATCH 2/2] clean code + typos --- pkg/analyzer/process_state.go | 5 ++--- pkg/analyzer/validatorWorker.go | 2 +- pkg/db/postgresql/service.go | 1 + 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/analyzer/process_state.go b/pkg/analyzer/process_state.go index 89220463..5d749014 100644 --- a/pkg/analyzer/process_state.go +++ b/pkg/analyzer/process_state.go @@ -16,13 +16,12 @@ import ( func (s *StateAnalyzer) runProcessState(wgProcess *sync.WaitGroup, downloadFinishedFlag *bool) { defer wgProcess.Done() - var suddenShutDown bool = false epochBatch := pgx.Batch{} log.Info("Launching Beacon State Pre-Processer") loop: for { // in case the downloads have finished, and there are no more tasks to execute - if (*downloadFinishedFlag && len(s.EpochTaskChan) == 0) || (suddenShutDown && len(s.EpochTaskChan) == 0) { + if *downloadFinishedFlag && len(s.EpochTaskChan) == 0 { log.Warn("the task channel has been closed, finishing epoch routine") if epochBatch.Len() == 0 { log.Debugf("Sending last epoch batch to be stored...") @@ -36,7 +35,7 @@ loop: select { case <-s.ctx.Done(): log.Info("context has died, closing state processer routine") - suddenShutDown = true + return case task, ok := <-s.EpochTaskChan: diff --git a/pkg/analyzer/validatorWorker.go b/pkg/analyzer/validatorWorker.go index f03bda6a..76fc8eb7 100644 --- a/pkg/analyzer/validatorWorker.go +++ b/pkg/analyzer/validatorWorker.go @@ -118,7 +118,7 @@ loop: wlog.Debugf("Validator group processed, worker freed for next group. Took %f seconds", time.Since(snapshot).Seconds()) case <-s.ctx.Done(): - log.Info("context has died, closing state Worker routine") + log.Info("context has died, closing state worker routine") return default: } diff --git a/pkg/db/postgresql/service.go b/pkg/db/postgresql/service.go index 90f5a481..10bff660 100644 --- a/pkg/db/postgresql/service.go +++ b/pkg/db/postgresql/service.go @@ -133,6 +133,7 @@ func (p *PostgresDBService) runWriters() { case <-p.ctx.Done(): wlogWriter.Info("shutdown detected, closing persister") + break loop default: }