Skip to content

Commit

Permalink
update analyzer for safe-shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
cortze committed Oct 27, 2022
1 parent 6f01172 commit f4817f0
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 12 deletions.
10 changes: 10 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
STATE_ANALYZER_CMD="rewards"
STATE_ANALYZER_LOG_LEVEL="debug"
STATE_ANALYZER_BN_ENDPOINT="http://beacon_node_ip:api-port"
STATE_ANALYZER_OUTFOLDER="results"
STATE_ANALYZER_INIT_SLOT="10000"
STATE_ANALYZER_FINAL_SLOT="10050"
STATE_ANALYZER_VALIDATOR_INDEXES="test_validators.json"
STATE_ANALYZER_DB_URL="postgresql://db_user:db_password@db_ip:db_port/db_name"
STATE_ANALYZER_WORKERS_NUM="50"
STATE_ANALYZER_DB_WORKERS_NUM="4"
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ eth2-state-analyzer
.vscode/**
test*.json
.ipynb_checkpoints/**
metrika/**
metrika/**
.config
33 changes: 33 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!make

GOCC=go
MKDIR_P=mkdir -p

BIN_PATH=./build
BIN="./build/eth2-state-analyzer"

include .env

.PHONY: check build install run clean

build:
$(GOCC) build -o $(BIN)

install:
$(GOCC) install

run:
$(BIN) $(STATE_ANALYZER_CMD) \
--log-level=${STATE_ANALYZER_LOG_LEVEL} \
--bn-endpoint=${STATE_ANALYZER_BN_ENDPOINT} \
--outfolder=${STATE_ANALYZER_OUTFOLDER} \
--init-slot=${STATE_ANALYZER_INIT_SLOT} \
--final-slot=${STATE_ANALYZER_FINAL_SLOT} \
--validator-indexes=${STATE_ANALYZER_VALIDATOR_INDEXES} \
--db-url=${STATE_ANALYZER_DB_URL} \
--workers-num=${STATE_ANALYZER_WORKERS_NUM} \
--db-workers-num=${STATE_ANALYZER_DB_WORKERS_NUM}

clean:
rm -r $(BIN_PATH)

26 changes: 25 additions & 1 deletion cmd/reward_cmd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cmd

import (
"os"
"os/signal"
"syscall"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -113,12 +116,33 @@ func LaunchRewardsCalculator(c *cli.Context) error {
if err != nil {
return err
}

// generate the state analyzer
stateAnalyzer, err := analyzer.NewStateAnalyzer(c.Context, cli, initSlot, finalSlot, validatorIndexes, dbUrl, coworkers, dbWorkers)
if err != nil {
return err
}

stateAnalyzer.Run()
procDoneC := make(chan struct{})
sigtermC := make(chan os.Signal)

signal.Notify(sigtermC, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, syscall.SIGTERM)

go func() {
stateAnalyzer.Run()
procDoneC <- struct{}{}
}()

select {
case <-sigtermC:
logRewardsRewards.Info("Sudden shutdown detected, controlled shutdown of the cli triggered")
stateAnalyzer.Close()

case <-procDoneC:
logRewardsRewards.Info("Process successfully finish!")
}
close(sigtermC)
close(procDoneC)

return nil
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/jackc/pgproto3/v2 v2.3.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.11.0 // indirect
github.com/jackc/pgx/v4 v4.16.1 // indirect
github.com/jackc/puddle v1.2.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.11 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
Expand Down
28 changes: 24 additions & 4 deletions pkg/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (

type StateAnalyzer struct {
ctx context.Context
cancel context.CancelFunc
InitSlot uint64
FinalSlot uint64
ValidatorIndexes []uint64
Expand All @@ -42,11 +43,15 @@ type StateAnalyzer struct {
cli *clientapi.APIClient
dbClient *postgresql.PostgresDBService

// Control Variables
finishDownload bool
routineClosed chan struct{}

initTime time.Time
}

func NewStateAnalyzer(
ctx context.Context,
pCtx context.Context,
httpCli *clientapi.APIClient,
initSlot uint64,
finalSlot uint64,
Expand All @@ -55,6 +60,9 @@ func NewStateAnalyzer(
workerNum int,
dbWorkerNum int) (*StateAnalyzer, error) {
log.Infof("generating new State Analzyer from slots %d:%d, for validators %v", initSlot, finalSlot, valIdxs)
// gen new ctx from parent
ctx, cancel := context.WithCancel(pCtx)

// Check if the range of slots is valid
if !utils.IsValidRangeuint64(initSlot, finalSlot) {
return nil, errors.New("provided slot range isn't valid")
Expand Down Expand Up @@ -93,6 +101,7 @@ func NewStateAnalyzer(

return &StateAnalyzer{
ctx: ctx,
cancel: cancel,
InitSlot: initSlot,
FinalSlot: finalSlot,
ValidatorIndexes: valIdxs,
Expand All @@ -103,11 +112,12 @@ func NewStateAnalyzer(
cli: httpCli,
dbClient: i_dbClient,
validatorWorkerNum: workerNum,
routineClosed: make(chan struct{}),
}, nil
}

func (s *StateAnalyzer) Run() {

defer s.cancel()
// Get init time
s.initTime = time.Now()

Expand Down Expand Up @@ -155,17 +165,27 @@ func (s *StateAnalyzer) Run() {

wgProcess.Wait()
processFinishedFlag = true
close(s.EpochTaskChan)
log.Info("Beacon State Processing finished")

wgWorkers.Wait()
close(s.ValTaskChan)
log.Info("All validator workers finished")
s.dbClient.DoneTasks()
<-s.dbClient.FinishSignalChan

close(s.ValTaskChan)
totalTime += int64(time.Since(start).Seconds())
analysisDuration := time.Since(s.initTime).Seconds()
log.Info("State Analyzer finished in ", analysisDuration)
if s.finishDownload {
s.routineClosed <- struct{}{}
}
}

func (s *StateAnalyzer) Close() {
log.Info("Sudden closed detected, closing StateAnalyzer")
s.finishDownload = true
<-s.routineClosed
s.cancel()
}

//
Expand Down
5 changes: 5 additions & 0 deletions pkg/analyzer/download_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (s *StateAnalyzer) runDownloadStates(wgDownload *sync.WaitGroup) {
return

default:
if s.finishDownload {
log.Info("sudden shutdown detected, state downloader routine")
close(s.EpochTaskChan)
return
}
ticker.Reset(minReqTime)
firstIteration := true
secondIteration := true
Expand Down
5 changes: 3 additions & 2 deletions pkg/analyzer/process_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
func (s *StateAnalyzer) runProcessState(wgProcess *sync.WaitGroup, downloadFinishedFlag *bool) {
defer wgProcess.Done()

var suddenShutDown bool = false
epochBatch := pgx.Batch{}
log.Info("Launching Beacon State Pre-Processer")
loop:
for {
// in case the downloads have finished, and there are no more tasks to execute
if *downloadFinishedFlag && len(s.EpochTaskChan) == 0 {
if (*downloadFinishedFlag && len(s.EpochTaskChan) == 0) || (suddenShutDown && len(s.EpochTaskChan) == 0) {
log.Warn("the task channel has been closed, finishing epoch routine")
if epochBatch.Len() == 0 {
log.Debugf("Sending last epoch batch to be stored...")
Expand All @@ -35,7 +36,7 @@ loop:
select {
case <-s.ctx.Done():
log.Info("context has died, closing state processer routine")
return
suddenShutDown = true

case task, ok := <-s.EpochTaskChan:

Expand Down
4 changes: 2 additions & 2 deletions pkg/analyzer/validatorWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ loop:
wlog.Debugf("Validator group processed, worker freed for next group. Took %f seconds", time.Since(snapshot).Seconds())

case <-s.ctx.Done():
log.Info("context has died, closing state processer routine")
log.Info("context has died, closing state Worker routine")
return
default:
}

}
wlog.Infof("Validator worker finished, no mroe tasks to process")
wlog.Infof("Validator worker finished, no more tasks to process")
}
1 change: 0 additions & 1 deletion pkg/db/postgresql/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func (p *PostgresDBService) runWriters() {

case <-p.ctx.Done():
wlogWriter.Info("shutdown detected, closing persister")
break loop
default:
}

Expand Down

0 comments on commit f4817f0

Please sign in to comment.