From 6d6ca1e14a1311a873c6ec5d34e3b74285e4df22 Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Wed, 23 Mar 2022 14:18:06 +0100 Subject: [PATCH] chore(consensus): stabilize consensus algorithm (#284) * fix: fix most of the compile errors * chore: update import section * fix: regenerate remote_client.go mock, some fixes by feedback * fix: remove unused global variables * fix: some fixes for 0.35 backport * fix: some fixes for 0.35 backport * fix(evidence): fix all failed tests in evidence package * fix(store): fix unit tests for internal/store package * fix(blocksync): fix unit tests for internal/blocksync package * fix(p2p): remove redundant sleep from TestMConnTransport_Listen * fix(blocksync): fix blocksync/v2 test * fix(rpc): allow reading "request_quorum_info" param for rpc "validators" handler * fix(state): fix internal/state package * fix(state): fix internal/statesync package * fix(test): fix internal/test/factory package * fix(privval): some fixes for unit tests * fix(privval): fix tests TestPrivvalVectors, TestStateSyncVectors * fix(consensus): fix consensus logic and tests * fix: remove generating AppHash from genesis * fix: remove debug printing * fix(mempool): fix TestSerialReap * fix(rpc): fix TestBroadcastEvidence_DuplicateVoteEvidence * fix: cycle dependency in "types" package * fix(types): modify and remove some tests in types package * refactor: remove unnecessary type conversion, update comment docs * fix: light client package * refactor: improve validator-set generator * fix: some modifications to be able to run unit tests * fix: increase MaxHeaderBytes to 646 bytes * fix: TestTxFilter unit test at tx_filter_test.go * fix: TestGCFifo, TestGCRandom, reduce a size of elements to 100 * fix: e2e generator, remove UnmixedP2P and Legacy test cases * refactor: remove a few unused variables and functions, clean up the code * fix: modify building of a docker image for e2e, update dashcore.toml * refactor: 
modification after PR feedback * fix: some modifications to fix e2e test * wip * fix: revert Client.getLightBlock from upstream to be able to handle "deadline exceeded" properly * chore: clearing code from temporary debug logs * refactor: reduce timeout for requesting node status from e2e runner * fix: update LastStateID and LastCoreChainLockedBlockHeight during state synchronization * fix: add a task for compiling tests to main tasks in e2e Makefile * fix: modify dashcore.toml * fix: modify a light client to use default height as 0 * fix: update code style * chore: revert logging to upstream version * refactor: increased timeout from 20 sec to 30 sec because a node sometimes needs a bit more time to sync * refactor: revert Client.compareNewHeaderWithWitness to upstream version * fix: remove redeclared imported package * fix: RPCStateProvider uses "trust-height" as initial height for downloading * refactor: sync dashcore.toml with ci.toml * fix(node): change TestMaxProposalBlockSize test to support tenderdash values and behaviour * fix(ci): modify coverage.yml * fix: modify app/app/test.sh after backport v0.35 * fix: update coverage.yml git workflow * fix: make changes to fix lint issues using the golangci-lint report form * fix: make changes to fix lint issues using the golangci-lint report form * wip * Fix compiling of tenderdash for arm architecture (#257) * fix: remove bls-signatures from deps of build-linux task * fix: add "post setup go" task to replace installed go version in /bin directory * fix: add sudo to install tenderdash * chore(backport): revert changes * chore: replace default go version in /bin on go 1.17 * fix: use a copy of "Precommit" to fix a race condition * chore: remove condition at "post-setup go 1.17" * chore: revert to previous version * chore: reduce timeout to 20 sec for waiting for nodes * chore: increased wait-for-height timeout to 1 min * chore: update dashcore.toml * chore: remove duplicated debug log entry * chore: modify of counting 
validators for e2e testnet * chore: fix minor lint issue * chore: modify rotate.toml * chore: update e2e.yml CI settings * chore: change log level for e2e * refactor: change an approach of updating a node keys and proTxHash (for validator) * chore: update dashcore.toml and rotate.toml * refactor: update preparing testnet settings for e2e * refactor: declare Validator interface, add implementation of this interface for PubKey, refactor public key validation in validateValidatorUpdates function * chore: fix a couple of lint issues and remove redundant debug printing * chore: add missed code to control the proof block creation algorithm. * refactor: change a type of node proTxHash from pointer * feat: define NodeInfoRepository and implement it in PeerManager to be able to store NodeInfo * fix: p2p/switch_test.go * chore: change names of field and options function related to node-info repository * fix: decode peer-state into json byte slice manually to prevent data race * fix: extend MockPV synchronization for several exportable methods * chore: modify after merge * chore: update timeouts and entrypoint for running e2e tests * fix: modify and fix e2e implementation * fix: some stabilization changes for light client implementation * refactor: implement a validation of public key as a part of a key itself, that is used in block execution process * fix: some modifications for statesync * fix: update NodeInfo.ProTxHash for validator node during node initialization * refactor: add "ProTxHashValidate" function * refactor: add proTxHash field into PeerInfo protobuf * refactor: add proTxHash in peerInfo * refactor: move the code that routerDashDialer is not responsible for into PeerManager * refactor: define DashDialerOptions struct to be able to provide NodeAddress and ProTxHash, change a signature of the method ConnectAsync in DashDialer * chore: update doc blocks for DashDialer(s) * refactor: support ProTxHash in p2ptest * refactor: improve HexByte.ShortString, if the length is 
not enough then return an empty string * refactor: revert DashDialer and ValidatorConnExecutor to initial version * feat: retrieve proTxHash from NodeInfo struct after handshake, add proTxHash to PeerUpdate and PeerState * fix: check peer-info for existence before retrieving proTxHash for persisting * refactor: remove all references to NodeInfoRepository * chore: remove a couple of redundant methods after the merge * chore: fix typo * chore: temporarily disable tcp NodeID resolver as it breaks running ValidatorConnExecutor component * chore: disable updating-connections for genesis/init-chain validators * fix(consensus): Decrease memory used by debug logs * refactor: enhance performance of getLogFields func * chore(consensus): further optimization of logging mem usage * fix: modify after merge * fix(dash/quorum): tendermint stops when remote validator node id lookup fails * fix(p2p): remove noisy logs from p2p package In DEBUG log level, we created tons of P2P logs that severely affect performance and disk usage. 
* fix: peer proTxHash is empty in PeerState * chore: remove redundant log file * fix: vote channel size too small for commit messages * fix: deadlock in PeerState * fix(consensus): fix race condition in processPeerUpdate * fix(dash/quorum): don't try to connect to myself * fix(p2p): proTxHash not updated for incoming connections * build(e2e): enable deadlock detection in e2e tests * refactor(consensus): improve logging * test(e2e): enable debug logs in dashcore * chore: try to fix commit exchange (WIP) * fix(types): support zero ValidatorAddress * test(p2p): fix p2p network tests * fix(consensus): fix race conditions and deadlocks * feat(sync): add lock timeout to deadlock detection library * fix(reactor): panic fixed * chore(sync): mutex debugging helper * chore(types): Improve logging of validator sets * chore(sync): set lock timeout to 30s * refactor(dashcore): use Logger in dash core rpc client * refactor(logging): log label "peer" instead of "peer_id" * chore(statesync): logging improvements * refactor(statesync): publish a commit event after successful sync operation * fix(consensus): deep copy proTxHash in peer events * fix(consensus): nodeID and proTxHash mismatch in consensus tests * fix(sync): revert deadlock logic changes * refactor(consensus): cleanup reactor gossip logic * fix: detect and fix data race when reading validator's proTxHash for logging * fix(consensus): don't panic on invalid vote * fix(consensus): don't process votes when not a validator * fix(p2p): dash dialer concurrent map access * fix(e2e): e2e test of manifest generation * refactor(e2e): add a feature to wait until a mockcoreserver is ready * fix(consensus): detect and fix data race during update "round state" of a peer in gossipDataRoutine * fix(typo): modify of an error log * fix(consensus): the test TestReactorInvalidBlockChainLock has wrong order of clean up functions * refactor(bits-array): fix potential deadlock in bit_array.go * fix(consensus): don't try to send commit for height 
0 * chore(types): improve logging * chore(consensus): improve logging * test(types): add proposal verification test for hardcoded proposal * test(consensus): enable prevoting NIL in TestReactorValidatorSetChanges * refactor(privval): remove some unused functions from file pv * chore(p2p): apply peer review feedback * refactor(statesync): publish a commit event after successful sync operation (#289) * refactor(bits-array): fix potential deadlock in bit_array.go (#294) * fix(ci): broken arm build (#295) * fix(ci): update apt cache before install deps * chore: remove a few files that were committed by accident * chore: rollback error initializing * chore: rollback one line function * chore(consensus): apply code review feedback Co-authored-by: shotonoff Co-authored-by: Dmitrii Golubev Co-authored-by: Dmitrii Golubev --- .dockerignore | 1 + .github/workflows/coverage.yml | 1 + Makefile | 5 + cmd/tenderdash/commands/light.go | 3 +- crypto/bls12381/bls12381.go | 1 + dash/quorum/validator_conn_executor.go | 5 + dashcore/rpc/client.go | 18 +- go.sum | 8 - internal/blocksync/v0/reactor_test.go | 2 +- internal/blocksync/v2/reactor.go | 2 +- internal/consensus/byzantine_test.go | 2 +- internal/consensus/common_test.go | 2 +- internal/consensus/core_chainlock_test.go | 2 +- internal/consensus/peer_state.go | 42 ++- internal/consensus/reactor.go | 335 +++++++++++------- internal/consensus/reactor_test.go | 16 +- internal/consensus/replay.go | 5 +- internal/consensus/replay_file.go | 5 +- internal/consensus/state.go | 184 ++++++---- internal/consensus/ticker.go | 4 +- internal/consensus/types/round_state.go | 3 +- internal/consensus/wal.go | 2 +- internal/evidence/reactor_test.go | 1 + internal/mempool/v1/mempool.go | 2 +- internal/p2p/conn/connection.go | 9 +- internal/p2p/dash_dialer.go | 6 +- internal/p2p/p2ptest/network.go | 29 +- internal/p2p/peermanager.go | 21 +- internal/p2p/pex/reactor_test.go | 2 +- internal/p2p/router.go | 3 +- internal/state/execution.go | 11 +- 
internal/statesync/reactor.go | 17 + internal/statesync/reactor_test.go | 1 + internal/statesync/snapshots.go | 5 +- internal/statesync/syncer.go | 2 +- libs/bits/bit_array.go | 74 ++-- libs/bytes/bytes.go | 10 + node/node.go | 14 +- node/setup.go | 2 +- privval/file.go | 103 ++++-- rpc/client/evidence_test.go | 3 +- test/e2e/Makefile | 2 +- test/e2e/docker/Dockerfile | 3 - test/e2e/networks/dashcore.toml | 1 + test/e2e/node/main.go | 1 + test/e2e/pkg/mockcoreserver/core_server.go | 3 +- test/e2e/pkg/mockcoreserver/methods.go | 4 +- test/e2e/pkg/mockcoreserver/server.go | 17 +- test/e2e/pkg/mockcoreserver/server_test.go | 20 +- test/e2e/runner/evidence.go | 3 +- .../internal/test_harness_test.go | 7 +- tools/tm-signer-harness/main.go | 2 +- types/block.go | 14 +- types/node_info.go | 6 +- types/priv_validator.go | 5 +- types/proposal.go | 16 + types/proposal_test.go | 49 +++ types/validator.go | 21 +- types/validator_address.go | 9 +- types/validator_address_test.go | 54 ++- types/validator_set.go | 13 + types/validator_set_test.go | 1 + types/vote.go | 13 +- types/vote_set_test.go | 2 +- types/vote_test.go | 1 + 65 files changed, 810 insertions(+), 420 deletions(-) diff --git a/.dockerignore b/.dockerignore index 54674378c8..b907f2fd04 100644 --- a/.dockerignore +++ b/.dockerignore @@ -4,3 +4,4 @@ test/e2e/networks test/logs test/p2p/data third_party/bls-signatures/build +*.log diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 3cb875a5eb..3358ffacac 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -106,6 +106,7 @@ jobs: if: matrix.goarch == 'amd64' && env.GIT_DIFF != '' - name: build run: | + sudo apt-get update sudo apt-get install gcc-arm-linux-gnueabihf g++-8-arm-linux-gnueabihf export CC=arm-linux-gnueabihf-gcc export CXX=arm-linux-gnueabihf-g++-8 diff --git a/Makefile b/Makefile index 3db3dcc69c..79f5f15efe 100644 --- a/Makefile +++ b/Makefile @@ -54,6 +54,11 @@ ifeq (boltdb,$(findstring 
boltdb,$(TENDERMINT_BUILD_OPTIONS))) BUILD_TAGS += boltdb endif +# handle deadlock +ifeq (deadlock,$(findstring deadlock,$(TENDERMINT_BUILD_OPTIONS))) + BUILD_TAGS += deadlock +endif + # allow users to pass additional flags via the conventional LDFLAGS variable LD_FLAGS += $(LDFLAGS) diff --git a/cmd/tenderdash/commands/light.go b/cmd/tenderdash/commands/light.go index ac66842042..6b01e9c412 100644 --- a/cmd/tenderdash/commands/light.go +++ b/cmd/tenderdash/commands/light.go @@ -139,7 +139,8 @@ func runProxy(cmd *cobra.Command, args []string) error { light.DashCoreVerification(), } - dashCoreRPCClient, _ := dashcore.NewRPCClient(dashCoreRPCHost, dashCoreRPCUser, dashCoreRPCPass) + rpcLogger := logger.With("module", dashcore.ModuleName) + dashCoreRPCClient, _ := dashcore.NewRPCClient(dashCoreRPCHost, dashCoreRPCUser, dashCoreRPCPass, rpcLogger) c, err := light.NewHTTPClient( context.Background(), diff --git a/crypto/bls12381/bls12381.go b/crypto/bls12381/bls12381.go index 9c597727d3..193d52abef 100644 --- a/crypto/bls12381/bls12381.go +++ b/crypto/bls12381/bls12381.go @@ -12,6 +12,7 @@ import ( "sort" bls "github.com/dashpay/bls-signatures/go-bindings" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/tmhash" tmjson "github.com/tendermint/tendermint/libs/json" diff --git a/dash/quorum/validator_conn_executor.go b/dash/quorum/validator_conn_executor.go index e4bda0a6b8..2339c05866 100644 --- a/dash/quorum/validator_conn_executor.go +++ b/dash/quorum/validator_conn_executor.go @@ -381,6 +381,11 @@ func (vc *ValidatorConnExecutor) updateConnections() error { func (vc *ValidatorConnExecutor) filterAddresses(validators validatorMap) validatorMap { filtered := make(validatorMap, len(validators)) for id, validator := range validators { + if vc.proTxHash != nil && string(id) == vc.proTxHash.String() { + vc.Logger.Debug("validator is ourself", "id", id, "address", validator.NodeAddress.String()) + continue + } + if err := 
validator.ValidateBasic(); err != nil { vc.Logger.Debug("validator address is invalid", "id", id, "address", validator.NodeAddress.String()) continue diff --git a/dashcore/rpc/client.go b/dashcore/rpc/client.go index f53c6aac27..3f3546fd9e 100644 --- a/dashcore/rpc/client.go +++ b/dashcore/rpc/client.go @@ -5,10 +5,14 @@ import ( "github.com/dashevo/dashd-go/btcjson" rpc "github.com/dashevo/dashd-go/rpcclient" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/libs/bytes" + "github.com/tendermint/tendermint/libs/log" ) +const ModuleName = "rpcclient" + type Client interface { // QuorumInfo returns quorum info QuorumInfo(quorumType btcjson.LLMQType, quorumHash crypto.QuorumHash) (*btcjson.QuorumInfoResult, error) @@ -43,11 +47,12 @@ type Client interface { // Handles connection to the underlying dashd instance type RPCClient struct { endpoint *rpc.Client + logger log.Logger } // NewRPCClient returns an instance of Client. // it will start the endpoint (if not already started) -func NewRPCClient(host string, username string, password string) (*RPCClient, error) { +func NewRPCClient(host string, username string, password string, logger log.Logger) (*RPCClient, error) { if host == "" { return nil, fmt.Errorf("unable to establish connection to the Dash Core node") } @@ -67,7 +72,14 @@ func NewRPCClient(host string, username string, password string) (*RPCClient, er return nil, err } - dashCoreClient := RPCClient{endpoint: client} + if logger == nil { + return nil, fmt.Errorf("logger must be set") + } + + dashCoreClient := RPCClient{ + endpoint: client, + logger: logger, + } return &dashCoreClient, nil } @@ -137,7 +149,7 @@ func (rpcClient *RPCClient) QuorumVerify( signature bytes.HexBytes, quorumHash crypto.QuorumHash, ) (bool, error) { - fmt.Printf("quorum verify sig %v quorumhash %s", signature, quorumHash) + rpcClient.logger.Debug("quorum verify", "sig", signature, "quorumhash", quorumHash) return rpcClient.endpoint.QuorumVerify( quorumType, 
requestID.String(), diff --git a/go.sum b/go.sum index 312219d193..a0df1a7e4b 100644 --- a/go.sum +++ b/go.sum @@ -95,7 +95,6 @@ github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSa github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/adlio/schema v1.2.3 h1:GfKThfEsjS9cCz7gaF8zdXv4cpTdUqdljkKGDTbJjys= github.com/adlio/schema v1.2.3/go.mod h1:nD7ZWmMMbwU12Pqwg+qL0rTvHBrBXfNz+5UQxTfy38M= -github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= @@ -156,14 +155,11 @@ github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce/go.mod h1:0DVlH github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= -github.com/btcsuite/goleveldb v1.0.0 h1:Tvd0BfvqX9o823q1j2UZ/epQo09eJh6dTcRp79ilIN4= github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I= github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= -github.com/btcsuite/snappy-go v1.0.0 h1:ZxaA6lo2EpxGddsA8JwWOcxlzRybb444sgmeJQMJGQE= github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod 
h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= -github.com/btcsuite/winsvc v1.0.0 h1:J9B4L7e3oqhXOcm+2IuNApwzQec85lE+QaikUcCs+dk= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/butuzov/ireturn v0.1.1 h1:QvrO2QF2+/Cx1WA/vETCIYBKtRjc30vesdoPUNo1EbY= github.com/butuzov/ireturn v0.1.1/go.mod h1:Wh6Zl3IMtTpaIKbmwzqi6olnM9ptYQxxVacMsOEFPoc= @@ -242,7 +238,6 @@ github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= -github.com/decred/dcrd/lru v1.0.0 h1:Kbsb1SFDsIlaupWPwsPp+dkxiBY1frcS07PCPgotKz8= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/denis-tingajkin/go-header v0.4.2 h1:jEeSF4sdv8/3cT/WY8AgDHUoItNSoEZ7qg9dX7pc218= github.com/denis-tingajkin/go-header v0.4.2/go.mod h1:eLRHAVXzE5atsKAnNRDB90WHCFFnBUn4RN0nRcs1LJA= @@ -588,7 +583,6 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jgautheron/goconst v1.5.1 h1:HxVbL1MhydKs8R8n/HE5NPvzfaYmQJA3o879lE4+WcM= github.com/jgautheron/goconst v1.5.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= @@ -607,7 +601,6 @@ 
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22 github.com/jonboulle/clockwork v0.2.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/josharian/txtarfs v0.0.0-20210218200122-0702f000015a/go.mod h1:izVPOvVRsHiKkeGCT6tYBNWyDVuzj9wAaBb5R9qamfw= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= -github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -633,7 +626,6 @@ github.com/kisielk/errcheck v1.6.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= -github.com/kkdai/bstream v1.0.0 h1:Se5gHwgp2VT2uHfDrkbbgbgEvV9cimLELwrPJctSjg8= github.com/kkdai/bstream v1.0.0/go.mod h1:FDnDOHt5Yx4p3FaHcioFT0QjDOtgUpvjeZqAs+NVZZA= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= diff --git a/internal/blocksync/v0/reactor_test.go b/internal/blocksync/v0/reactor_test.go index eeac8be614..b551daa17d 100644 --- a/internal/blocksync/v0/reactor_test.go +++ b/internal/blocksync/v0/reactor_test.go @@ -343,7 +343,7 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) { newNode := rts.network.MakeNode(t, nil, p2ptest.NodeOptions{ MaxPeers: uint16(len(rts.nodes) + 1), MaxConnected: uint16(len(rts.nodes) + 1), - }) + }, log.TestingLogger()) rts.addNode(t, newNode.NodeID, otherGenDoc, otherPrivVals[0], 
maxBlockHeight) // add a fake peer just so we do not wait for the consensus ticker to timeout diff --git a/internal/blocksync/v2/reactor.go b/internal/blocksync/v2/reactor.go index 04be814e9e..bc2a317952 100644 --- a/internal/blocksync/v2/reactor.go +++ b/internal/blocksync/v2/reactor.go @@ -510,7 +510,7 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { return } - r.logger.Debug("received", "msg", msgProto) + // r.logger.Debug("received", "msg", msgProto) switch msg := msgProto.Sum.(type) { case *bcproto.Message_StatusRequest: diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 88d1f92832..c48d5eaeb4 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -232,7 +232,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { lazyNodeState.sendInternalMessage(msgInfo{&BlockPartMessage{lazyNodeState.Height, lazyNodeState.Round, part}, ""}) } lazyNodeState.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal) - lazyNodeState.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block)) + lazyNodeState.Logger.Debug("signed proposal block", "block", block) } else if !lazyNodeState.replayMode { lazyNodeState.Logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err) } diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 48e5d32455..a45eccbc88 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -1017,7 +1017,7 @@ func randConsensusNetWithPeers( proTxHash, _ := privVal.GetProTxHash(context.Background()) css[i] = newStateWithConfig(thisConfig, state, privVal, app) - css[i].SetLogger(logger.With("validator", i, "proTxHash", proTxHash.ShortString(), "module", "consensus")) + css[i].SetLogger(logger.With("validator", i, "node_proTxHash", proTxHash.ShortString(), "module", "consensus")) 
css[i].SetTimeoutTicker(tickerFunc()) } return css, genDoc, peer0Config, func() { diff --git a/internal/consensus/core_chainlock_test.go b/internal/consensus/core_chainlock_test.go index 9b2a808498..156455f5e4 100644 --- a/internal/consensus/core_chainlock_test.go +++ b/internal/consensus/core_chainlock_test.go @@ -119,7 +119,7 @@ func TestReactorInvalidBlockChainLock(t *testing.T) { newMockTickerFunc(true), newCounterWithBackwardsCoreChainLocks, ) - defer cleanup() + t.Cleanup(cleanup) for i := 0; i < nVals; i++ { ticker := NewTimeoutTicker() diff --git a/internal/consensus/peer_state.go b/internal/consensus/peer_state.go index 496d027c4b..8ad6e8f79f 100644 --- a/internal/consensus/peer_state.go +++ b/internal/consensus/peer_state.go @@ -49,7 +49,7 @@ type PeerState struct { } // NewPeerState returns a new PeerState for the given node ID. -func NewPeerState(logger log.Logger, peerID types.NodeID, proTxHash types.ProTxHash) *PeerState { +func NewPeerState(logger log.Logger, peerID types.NodeID) *PeerState { return &PeerState{ peerID: peerID, logger: logger, @@ -60,8 +60,7 @@ func NewPeerState(logger log.Logger, peerID types.NodeID, proTxHash types.ProTxH LastCommitRound: -1, CatchupCommitRound: -1, }, - Stats: &peerStateStats{}, - ProTxHash: proTxHash, + Stats: &peerStateStats{}, } } @@ -85,13 +84,20 @@ func (ps *PeerState) IsRunning() bool { // GetRoundState returns a shallow copy of the PeerRoundState. There's no point // in mutating it since it won't change PeerState. func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState { - ps.mtx.Lock() - defer ps.mtx.Unlock() + ps.mtx.RLock() + defer ps.mtx.RUnlock() prs := ps.PRS.Copy() return &prs } +// UpdateRoundState ensures that the update function is called using the blocking mechanism +func (ps *PeerState) UpdateRoundState(fn func(prs *cstypes.PeerRoundState)) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + fn(&ps.PRS) +} + // ToJSON returns a json of PeerState. 
func (ps *PeerState) ToJSON() ([]byte, error) { ps.mtx.Lock() @@ -151,14 +157,15 @@ func (ps *PeerState) InitProposalBlockParts(partSetHeader types.PartSetHeader) { // SetHasProposalBlockPart sets the given block part index as known for the peer. func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index int) { - ps.mtx.Lock() - defer ps.mtx.Unlock() - - if ps.PRS.Height != height || ps.PRS.Round != round { + prs := ps.GetRoundState() + if prs.Height != height || prs.Round != round { + ps.logger.Debug("SetHasProposalBlockPart height/round mismatch", + "height", height, "round", round, "peer_height", prs.Height, "peer_round", prs.Round) return } - + ps.mtx.Lock() ps.PRS.ProposalBlockParts.SetIndex(index, true) + ps.mtx.Unlock() } // PickVoteToSend picks a vote to send to the peer. It will return true if a @@ -368,10 +375,17 @@ func (ps *PeerState) RecordBlockPart() int { // BlockPartsSent returns the number of useful block parts the peer has sent us. func (ps *PeerState) BlockPartsSent() int { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + return ps.Stats.BlockParts +} + +func (ps *PeerState) SetProTxHash(proTxHash types.ProTxHash) { ps.mtx.Lock() defer ps.mtx.Unlock() - return ps.Stats.BlockParts + ps.ProTxHash = proTxHash.Copy() } // SetHasVote sets the given vote as known by the peer @@ -386,7 +400,7 @@ func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.Sign ps.logger.Debug( "peerState setHasVote", - "peer_id", ps.peerID, + "peer", ps.peerID, "height", height, "round", round, "peer_height", ps.PRS.Height, @@ -579,8 +593,8 @@ func (ps *PeerState) String() string { // StringIndented returns a string representation of the PeerState func (ps *PeerState) StringIndented(indent string) string { - ps.mtx.Lock() - defer ps.mtx.Unlock() + ps.mtx.RLock() + defer ps.mtx.RUnlock() return fmt.Sprintf(`PeerState{ %s Key %v %s RoundState %v diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 
c39e218857..7692433fc4 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -61,9 +61,9 @@ var ( ID: byte(VoteChannel), Priority: 10, SendQueueCapacity: 64, - RecvBufferCapacity: 128, + RecvBufferCapacity: 4096, RecvMessageCapacity: maxMsgSize, - MaxSendBytes: 150, + MaxSendBytes: 4096, }, }, VoteSetBitsChannel: { @@ -289,7 +289,7 @@ func (r *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) { // NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a // NewRoundStepMessage. - r.state.updateToState(state, nil, r.Logger) + r.state.updateToState(state, nil) r.mtx.Lock() r.waitSync = false @@ -384,6 +384,7 @@ func (r *Reactor) broadcastHasVoteMessage(vote *types.Vote) { Index: vote.ValidatorIndex, }, } + r.Logger.Debug("sent HasVoteMessage broadcast", "vote", vote) } // Broadcasts HasCommitMessage to peers that care. @@ -395,6 +396,8 @@ func (r *Reactor) broadcastHasCommitMessage(commit *types.Commit) { Round: commit.Round, }, } + + r.Logger.Debug("sent HasCommitMessage broadcast", "commit", commit) } // subscribeToBroadcastEvents subscribes for new round steps and votes using the @@ -509,22 +512,20 @@ func (r *Reactor) gossipDataForCatchup(rs *cstypes.RoundState, prs *cstypes.Peer return } - partProto, err := part.ToProto() - if err != nil { - logger.Error("failed to convert block part to proto", "err", err) - + if err := r.sendProposalBlockPart(ps, part, prs.Height, prs.Round); err != nil { + logger.Error("cannot send proposal block part to the peer", "error", err) time.Sleep(r.state.config.PeerGossipSleepDuration) - return } - logger.Debug("sending block part for catchup", "round", prs.Round, "index", index) - r.dataCh.Out <- p2p.Envelope{ - To: ps.peerID, - Message: &tmcons.BlockPart{ - Height: prs.Height, // not our height, so it does not matter. - Round: prs.Round, // not our height, so it does not matter - Part: *partProto, - }, + return + } + + // block parts already delivered - send commits? 
+ if rs.Height > 0 && !prs.HasCommit { + if err := r.gossipCommit(rs, ps, prs); err != nil { + logger.Error("cannot gossip commit to peer", "error", err) + } else { + time.Sleep(r.state.config.PeerGossipSleepDuration) } return @@ -556,35 +557,25 @@ OUTER_LOOP: rs := r.state.GetRoundState() prs := ps.GetRoundState() - isValidator := rs.Validators.HasProTxHash(ps.ProTxHash) + isValidator := r.isValidator(ps.ProTxHash) // Send proposal Block parts? if (isValidator && rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader)) || (prs.HasCommit && rs.ProposalBlockParts != nil) { if !isValidator && prs.HasCommit && prs.ProposalBlockParts == nil { // We can assume if they have the commit then they should have the same part set header - ps.PRS.ProposalBlockPartSetHeader = rs.ProposalBlockParts.Header() - ps.PRS.ProposalBlockParts = bits.NewBitArray(int(rs.ProposalBlockParts.Header().Total)) + ps.UpdateRoundState(func(prs *cstypes.PeerRoundState) { + prs.ProposalBlockPartSetHeader = rs.ProposalBlockParts.Header() + prs.ProposalBlockParts = bits.NewBitArray(int(rs.ProposalBlockParts.Header().Total)) + }) } if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { part := rs.ProposalBlockParts.GetPart(index) - partProto, err := part.ToProto() - if err != nil { - logger.Error("failed to convert block part to proto", "err", err) - return - } - logger.Debug("sending block part", "height", prs.Height, "round", prs.Round) - r.dataCh.Out <- p2p.Envelope{ - To: ps.peerID, - Message: &tmcons.BlockPart{ - Height: rs.Height, // this tells peer that this part applies to us - Round: rs.Round, // this tells peer that this part applies to us - Part: *partProto, - }, + if err := r.sendProposalBlockPart(ps, part, prs.Height, prs.Round); err != nil { + logger.Error("cannot send proposal block part to the peer", "error", err) + time.Sleep(r.state.config.PeerGossipSleepDuration) } - - ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) 
continue OUTER_LOOP } } @@ -676,25 +667,48 @@ OUTER_LOOP: } } +func (r *Reactor) sendProposalBlockPart(ps *PeerState, part *types.Part, height int64, round int32) error { + partProto, err := part.ToProto() + if err != nil { + return fmt.Errorf("failed to convert block part to proto, error: %w", err) + } + + r.Logger.Debug("sending block part for catchup", "round", round, "height", height, "index", part.Index, "peer", ps.peerID) + r.dataCh.Out <- p2p.Envelope{ + To: ps.peerID, + Message: &tmcons.BlockPart{ + Height: height, // not our height, so it does not matter. + Round: round, // not our height, so it does not matter + Part: *partProto, + }, + } + + ps.SetHasProposalBlockPart(height, round, int(part.Index)) + return nil +} + // pickSendVote picks a vote and sends it to the peer. It will return true if // there is a vote to send and false otherwise. func (r *Reactor) pickSendVote(ps *PeerState, votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { psJSON, _ := ps.ToJSON() + voteProto := vote.ToProto() ps.logger.Debug( - "Sending vote message", + "sending vote message", "ps", psJSON, - "peer_id", ps.peerID, + "peer", ps.peerID, "vote", vote, "peer_proTxHash", ps.ProTxHash.ShortString(), "val_proTxHash", vote.ValidatorProTxHash.ShortString(), "height", vote.Height, "round", vote.Round, + "size", voteProto.Size(), + "isValidator", r.isValidator(vote.ValidatorProTxHash), ) r.voteCh.Out <- p2p.Envelope{ To: ps.peerID, Message: &tmcons.Vote{ - Vote: vote.ToProto(), + Vote: voteProto, }, } @@ -705,19 +719,19 @@ func (r *Reactor) pickSendVote(ps *PeerState, votes types.VoteSetReader) bool { return false } -func (r *Reactor) sendCommit(ps *PeerState, commit *types.Commit) bool { +func (r *Reactor) sendCommit(ps *PeerState, commit *types.Commit) error { if commit == nil { - return false + return fmt.Errorf("attempt to send nil commit to peer %s", ps.peerID) } - r.Logger.Debug("sending commit message", "ps", ps, "commit", commit) + protoCommit := 
commit.ToProto() + r.Logger.Debug("sending commit message", "height", commit.Height, "round", commit.Round, "peer", ps.peerID) r.voteCh.Out <- p2p.Envelope{ To: ps.peerID, Message: &tmcons.Commit{ - Commit: commit.ToProto(), + Commit: protoCommit, }, } - ps.SetHasCommit(commit) - return true + return nil } func (r *Reactor) gossipVotesForHeight(rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) bool { @@ -778,6 +792,32 @@ func (r *Reactor) gossipVotesForHeight(rs *cstypes.RoundState, prs *cstypes.Peer return false } +// gossipCommit sends a commit to the peer +func (r *Reactor) gossipCommit(rs *cstypes.RoundState, ps *PeerState, prs *cstypes.PeerRoundState) error { + // logger := r.Logger.With("height", rs.Height, "peer_height", prs.Height, "peer", ps.peerID) + var commit *types.Commit + blockStoreBase := r.state.blockStore.Base() + + if prs.Height+1 == rs.Height && !prs.HasCommit { + commit = rs.LastCommit + } else if rs.Height >= prs.Height+2 && prs.Height >= blockStoreBase && !prs.HasCommit { + // Load the block commit for prs.Height, which contains precommit + // signatures for prs.Height. 
+ commit = r.state.blockStore.LoadBlockCommit(prs.Height) + } + + if commit == nil { + return fmt.Errorf("commit at height %d not found", prs.Height) + } + + if err := r.sendCommit(ps, commit); err != nil { + return fmt.Errorf("failed to send commit to peer: %w", err) + } + + ps.SetHasCommit(commit) + return nil // success +} + func (r *Reactor) gossipVotesAndCommitRoutine(ps *PeerState) { logger := r.Logger.With("peer", ps.peerID) @@ -804,8 +844,7 @@ OUTER_LOOP: rs := r.state.GetRoundState() prs := ps.GetRoundState() - isValidator := rs.Validators.HasProTxHash(ps.ProTxHash) - wasValidator := rs.LastValidators.HasProTxHash(ps.ProTxHash) + isValidator := r.isValidator(ps.ProTxHash) switch logThrottle { case 1: // first sleep @@ -814,42 +853,31 @@ OUTER_LOOP: logThrottle = 0 } - // if height matches, then send LastCommit, Prevotes, and Precommits - if rs.Height == prs.Height { - if !wasValidator { - // If there are lastCommits to send... - if prs.Step == cstypes.RoundStepNewHeight && prs.Height+1 == rs.Height && !prs.HasCommit { - if r.sendCommit(ps, rs.LastCommit) { - logger.Info("Sending LastCommit to non-validator node") - continue OUTER_LOOP - } - } - } - if isValidator { - if r.gossipVotesForHeight(rs, prs, ps) { - continue OUTER_LOOP - } + // If there are lastCommits to send... 
+ //prs.Step == cstypes.RoundStepNewHeight && + if prs.Height > 0 && prs.Height+1 == rs.Height && !prs.HasCommit { + if err := r.gossipCommit(rs, ps, prs); err != nil { + logger.Error("cannot send LastCommit to peer node", "error", err) + } else { + logger.Info("sending LastCommit to peer node", "peer_height", prs.Height) } + continue OUTER_LOOP } - // special catchup logic -- if peer is lagging by height 1, send LastCommit - if prs.Height != 0 && rs.Height == prs.Height+1 && wasValidator { - if r.pickSendVote(ps, rs.LastPrecommits) { - logger.Debug("picked rs.LastCommit to send", "height", prs.Height) + // if height matches, then send LastCommit, Prevotes, and Precommits + if isValidator && rs.Height == prs.Height { + if r.gossipVotesForHeight(rs, prs, ps) { continue OUTER_LOOP } } // catchup logic -- if peer is lagging by more than 1, send Commit + // note that peer can ignore a commit if it doesn't have a complete block, + // so we might need to resend it until it notifies us that it's all right blockStoreBase := r.state.blockStore.Base() - if blockStoreBase > 0 && prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= blockStoreBase { - // Load the block commit for prs.Height, which contains precommit - // signatures for prs.Height. 
- if commit := r.state.blockStore.LoadBlockCommit(prs.Height); commit != nil { - if r.sendCommit(ps, commit) { - logger.Debug("picked Catchup commit to send", "height", prs.Height) - continue OUTER_LOOP - } + if rs.Height >= prs.Height+2 && prs.Height >= blockStoreBase && !prs.HasCommit { + if err := r.gossipCommit(rs, ps, prs); err != nil { + logger.Error("cannot gossip commit to peer", "error", err) } } @@ -858,10 +886,13 @@ OUTER_LOOP: logThrottle = 1 logger.Debug( "no votes to send; sleeping", + "peer_protxhash", ps.ProTxHash, "rs.Height", rs.Height, "prs.Height", prs.Height, "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits, + "isValidator", isValidator, + "validators", rs.Validators, ) } else if logThrottle == 2 { logThrottle = 1 @@ -892,6 +923,12 @@ OUTER_LOOP: default: } + // If peer is not a validator, we do nothing + if !r.isValidator(ps.ProTxHash) { + time.Sleep(r.state.config.PeerQueryMaj23SleepDuration) + continue OUTER_LOOP + } + // maybe send Height/Round/Prevotes { rs := r.state.GetRoundState() @@ -988,16 +1025,18 @@ OUTER_LOOP: } } +func (r *Reactor) isValidator(proTxHash types.ProTxHash) bool { + _, vset := r.state.GetValidatorSet() + return vset.HasProTxHash(proTxHash) +} + // processPeerUpdate process a peer update message. For new or reconnected peers, // we create a peer state if one does not exist for the peer, which should always // be the case, and we spawn all the relevant goroutine to broadcast messages to // the peer. During peer removal, we remove the peer for our set of peers and // signal to all spawned goroutines to gracefully exit in a non-blocking manner. 
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { - r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) - - r.mtx.Lock() - defer r.mtx.Unlock() + r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status, "peer_protxhash", peerUpdate.ProTxHash.ShortString()) switch peerUpdate.Status { case p2p.PeerStatusUp: @@ -1008,59 +1047,91 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { if !r.IsRunning() { return } + r.peerUp(peerUpdate, 3) + case p2p.PeerStatusDown: + r.peerDown(peerUpdate) + } +} - var ( - ps *PeerState - ok bool - ) +// peerUp starts the gossip goroutines for the peer. If the peer is being closed, it waits for them to exit and retries, at most `retries` times. +func (r *Reactor) peerUp(peerUpdate p2p.PeerUpdate, retries int) { + if retries < 1 { + r.Logger.Error("peer up failed: max retries exceeded", "peer", peerUpdate.NodeID) + return + } - ps, ok = r.peers[peerUpdate.NodeID] - if !ok { - ps = NewPeerState(r.Logger, peerUpdate.NodeID, peerUpdate.ProTxHash) - r.peers[peerUpdate.NodeID] = ps - } + r.mtx.Lock() + defer r.mtx.Unlock() - if !ps.IsRunning() { - // Set the peer state's closer to signal to all spawned goroutines to exit - // when the peer is removed. We also set the running state to ensure we - // do not spawn multiple instances of the same goroutines and finally we - // set the waitgroup counter so we know when all goroutines have exited. - ps.broadcastWG.Add(3) - ps.SetRunning(true) - - // start goroutines for this peer - go r.gossipDataRoutine(ps) - go r.gossipVotesAndCommitRoutine(ps) - go r.queryMaj23Routine(ps) - - // Send our state to the peer. If we're block-syncing, broadcast a - // RoundStepMessage later upon SwitchToConsensus().
- if !r.waitSync { - go r.sendNewRoundStepMessage(ps.peerID) - } - } + var ( + ps *PeerState + ok bool + ) + ps, ok = r.peers[peerUpdate.NodeID] + if !ok { + ps = NewPeerState(r.Logger, peerUpdate.NodeID) + ps.SetProTxHash(peerUpdate.ProTxHash) + r.peers[peerUpdate.NodeID] = ps + } else if len(peerUpdate.ProTxHash) > 0 { + ps.SetProTxHash(peerUpdate.ProTxHash) + } - case p2p.PeerStatusDown: - ps, ok := r.peers[peerUpdate.NodeID] - if ok && ps.IsRunning() { - // signal to all spawned goroutines for the peer to gracefully exit - ps.closer.Close() - - go func() { - // Wait for all spawned broadcast goroutines to exit before marking the - // peer state as no longer running and removal from the peers map. - ps.broadcastWG.Wait() - - r.mtx.Lock() - delete(r.peers, peerUpdate.NodeID) - r.mtx.Unlock() - - ps.SetRunning(false) - }() + select { + case <-ps.closer.Done(): + // Hmm, someone is closing this peer right now, let's wait and retry + // Note: we run this in a goroutine to not block main goroutine in ps.broadcastWG.Wait() + go func() { + ps.broadcastWG.Wait() + r.peerUp(peerUpdate, retries-1) + }() + return + default: + } + + if !ps.IsRunning() { + // Set the peer state's closer to signal to all spawned goroutines to exit + // when the peer is removed. We also set the running state to ensure we + // do not spawn multiple instances of the same goroutines and finally we + // set the waitgroup counter so we know when all goroutines have exited. + ps.broadcastWG.Add(3) + ps.SetRunning(true) + + // start goroutines for this peer + go r.gossipDataRoutine(ps) + go r.gossipVotesAndCommitRoutine(ps) + go r.queryMaj23Routine(ps) + + // Send our state to the peer. If we're block-syncing, broadcast a + // RoundStepMessage later upon SwitchToConsensus(). 
+ if !r.waitSync { + go r.sendNewRoundStepMessage(ps.peerID) } } } +func (r *Reactor) peerDown(peerUpdate p2p.PeerUpdate) { + r.mtx.Lock() + defer r.mtx.Unlock() + + ps, ok := r.peers[peerUpdate.NodeID] + if ok && ps.IsRunning() { + // signal to all spawned goroutines for the peer to gracefully exit + ps.closer.Close() + + go func() { + // Wait for all spawned broadcast goroutines to exit before marking the + // peer state as no longer running and removal from the peers map. + ps.broadcastWG.Wait() + + r.mtx.Lock() + delete(r.peers, peerUpdate.NodeID) + r.mtx.Unlock() + + ps.SetRunning(false) + }() + } +} + // handleStateMessage handles envelopes sent from peers on the StateChannel. // An error is returned if the message is unrecognized or if validation fails. // If we fail to find the peer state for the envelope sender, we perform a no-op @@ -1164,6 +1235,8 @@ func (r *Reactor) handleDataMessage(envelope p2p.Envelope, msgI Message) error { return nil } + logger.Debug("data channel processing", "msg", envelope.Message, "type", fmt.Sprintf("%T", envelope.Message)) + switch msg := envelope.Message.(type) { case *tmcons.Proposal: pMsg := msgI.(*ProposalMessage) @@ -1197,7 +1270,7 @@ func (r *Reactor) handleVoteMessage(envelope p2p.Envelope, msgI Message) error { ps, ok := r.GetPeerState(envelope.From) if !ok || ps == nil { - r.Logger.Debug("failed to find peer state") + logger.Debug("failed to find peer state") return nil } @@ -1206,7 +1279,7 @@ func (r *Reactor) handleVoteMessage(envelope p2p.Envelope, msgI Message) error { return nil } - logger.Debug("vote channel processing", "msg", envelope.Message) + logger.Debug("vote channel processing", "msg", envelope.Message, "type", fmt.Sprintf("%T", envelope.Message)) switch msg := envelope.Message.(type) { case *tmcons.Commit: c, err := types.CommitFromProto(msg.Commit) @@ -1219,17 +1292,19 @@ func (r *Reactor) handleVoteMessage(envelope p2p.Envelope, msgI Message) error { r.state.peerMsgQueue <- msgInfo{cMsg, 
envelope.From} case *tmcons.Vote: r.state.mtx.RLock() + isValidator := r.state.Validators.HasProTxHash(r.state.privValidatorProTxHash) height, valSize, lastCommitSize := r.state.Height, r.state.Validators.Size(), r.state.LastPrecommits.Size() r.state.mtx.RUnlock() - vMsg := msgI.(*VoteMessage) + if isValidator { // ignore votes on non-validator nodes; TODO don't even send it + vMsg := msgI.(*VoteMessage) - ps.EnsureVoteBitArrays(height, valSize) - ps.EnsureVoteBitArrays(height-1, lastCommitSize) - ps.SetHasVote(vMsg.Vote) - - r.state.peerMsgQueue <- msgInfo{vMsg, envelope.From} + ps.EnsureVoteBitArrays(height, valSize) + ps.EnsureVoteBitArrays(height-1, lastCommitSize) + ps.SetHasVote(vMsg.Vote) + r.state.peerMsgQueue <- msgInfo{vMsg, envelope.From} + } default: return fmt.Errorf("received unknown message on VoteChannel: %T", msg) } @@ -1326,7 +1401,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err return err } - // r.Logger.Debug("received message", "ch_id", chID, "msg", msgI, "peer", envelope.From) + // r.Logger.Debug("received message on channel", "ch_id", chID, "msg", msgI, "peer", envelope.From, "type", fmt.Sprintf("%T", msgI)) switch chID { case StateChannel: diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 0f77572369..80eaed32e2 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -24,6 +24,7 @@ import ( "github.com/tendermint/tendermint/crypto/bls12381" "github.com/tendermint/tendermint/crypto/encoding" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + "github.com/tendermint/tendermint/internal/mempool" mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0" "github.com/tendermint/tendermint/internal/p2p" @@ -82,10 +83,11 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu ctx, cancel := context.WithCancel(context.Background()) - i := 0 - for nodeID, node := range rts.network.Nodes { + for 
i := 0; i < numNodes; i++ { state := states[i] - + node := rts.network.NodeByProTxHash(state.privValidatorProTxHash) + require.NotNil(t, node) + nodeID := node.NodeID reactor := NewReactor( state.Logger.With("node", nodeID), state, @@ -96,6 +98,7 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu node.MakePeerUpdates(t), true, ) + state.timeoutTicker.SetLogger(state.Logger.With("impl", "TimeoutTicker")) reactor.SetEventBus(state.eventBus) @@ -117,8 +120,6 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu require.NoError(t, reactor.Start()) require.True(t, reactor.IsRunning()) - - i++ } require.Len(t, rts.reactors, numNodes) @@ -526,6 +527,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) { func TestReactorValidatorSetChanges(t *testing.T) { cfg := configSetup(t) + cfg.Consensus.TimeoutPropose = 2 * time.Second nPeers := 7 nVals := 4 @@ -534,7 +536,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { nVals, nPeers, "consensus_val_set_changes_test", - newMockTickerFunc(true), + func() TimeoutTicker { return NewTimeoutTicker() }, newPersistentKVStoreWithPath, ) t.Cleanup(cleanup) @@ -721,6 +723,7 @@ func (u *validatorUpdater) removeValidatorsAt(height int64, count int) (*privVal func (u *validatorUpdater) updateStatePrivVals(privValUpdate *privValUpdate, height int64) { for i, proTxHash := range privValUpdate.newProTxHashes { j := u.stateIndexMap[proTxHash.String()] + u.states[j].mtx.Lock() u.states[j].privValidator.UpdatePrivateKey( context.Background(), privValUpdate.privKeys[i], @@ -728,6 +731,7 @@ func (u *validatorUpdater) updateStatePrivVals(privValUpdate *privValUpdate, hei privValUpdate.thresholdPubKey, height, ) + u.states[j].mtx.Unlock() } u.lastProTxHashes = privValUpdate.newProTxHashes } diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 5fcff54781..98220aee37 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -16,6 
+16,7 @@ import ( "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" + tmbytes "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" ) @@ -84,7 +85,7 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscr cs.handleMsg(m, true) case timeoutInfo: - cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) + cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step.String(), "dur", m.Duration) cs.handleTimeout(m, cs.RoundState) default: return fmt.Errorf("replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg)) @@ -279,7 +280,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) (uint64, error) { if blockHeight < 0 { return 0, fmt.Errorf("got a negative last block height (%d) from the app", blockHeight) } - appHash := res.LastBlockAppHash + appHash := tmbytes.HexBytes(res.LastBlockAppHash) h.logger.Info("ABCI Handshake App Info", "height", blockHeight, diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 440426cdc8..7535bd25e7 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -130,9 +130,8 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error return err } pb.cs.Wait() - - newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, - pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) + newCS := NewStateWithLogger(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, + pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, pb.cs.Logger, 0) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() diff --git a/internal/consensus/state.go b/internal/consensus/state.go index f0ab538b3c..489561f125 100644 --- a/internal/consensus/state.go +++ 
b/internal/consensus/state.go @@ -18,6 +18,7 @@ import ( "github.com/tendermint/tendermint/internal/libs/fail" tmsync "github.com/tendermint/tendermint/internal/libs/sync" sm "github.com/tendermint/tendermint/internal/state" + tmbytes "github.com/tendermint/tendermint/libs/bytes" tmevents "github.com/tendermint/tendermint/libs/events" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" @@ -213,7 +214,7 @@ func NewStateWithLogger( cs.reconstructLastCommit(state) } - cs.updateToState(state, nil, logger) + cs.updateToState(state, nil) // NOTE: we do not call scheduleRound0 yet, we do that upon Start() cs.BaseService = *service.NewBaseService(logger, "State", cs) @@ -674,7 +675,7 @@ func (cs *State) reconstructLastCommit(state sm.State) { // Updates State and increments height to match that of state. // The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight. -func (cs *State) updateToState(state sm.State, commit *types.Commit, logger log.Logger) { +func (cs *State) updateToState(state sm.State, commit *types.Commit) { if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { panic(fmt.Sprintf( "updateToState() expected state height of %v but found %v", @@ -704,11 +705,8 @@ func (cs *State) updateToState(state sm.State, commit *types.Commit, logger log. // signal the new round step, because other services (eg. txNotifier) // depend on having an up-to-date peer state! if state.LastBlockHeight <= cs.state.LastBlockHeight { - if logger == nil { - logger = cs.Logger - } - if logger != nil { - logger.Debug( + if cs.Logger != nil { + cs.Logger.Debug( "ignoring updateToState()", "new_height", state.LastBlockHeight+1, "old_height", cs.state.LastBlockHeight+1, @@ -757,8 +755,8 @@ func (cs *State) updateToState(state sm.State, commit *types.Commit, logger log. 
height = state.InitialHeight } - if logger != nil { - logger.Debug("updating state height", "newHeight", height) + if cs.Logger != nil { + cs.Logger.Debug("updating state height", "newHeight", height) } // RoundState fields @@ -935,7 +933,6 @@ func (cs *State) handleMsg(mi msgInfo, fromReplay bool) { ) msg, peerID := mi.Msg, mi.PeerID - switch msg := msg.(type) { case *ProposalMessage: // will not cause transition. @@ -954,11 +951,24 @@ func (cs *State) handleMsg(mi msgInfo, fromReplay bool) { "received block part from wrong round", "height", cs.Height, "cs_round", cs.Round, + "block_height", msg.Height, "block_round", msg.Round, ) err = nil } + cs.Logger.Debug( + "received block part", + "height", cs.Height, + "round", cs.Round, + "block_height", msg.Height, + "block_round", msg.Round, + "added", added, + "peer", peerID, + "index", msg.Part.Index, + "error", err, + ) + case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition @@ -981,6 +991,16 @@ func (cs *State) handleMsg(mi msgInfo, fromReplay bool) { // TODO: If rs.Height == vote.Height && rs.Round < vote.Round, // the peer is sending us CatchupCommit precommits. // We could make note of this and help filter in broadcastHasVoteMessage(). 
+ cs.Logger.Debug( + "received vote", + "height", cs.Height, + "cs_round", cs.Round, + "vote_height", msg.Vote.Height, + "vote_round", msg.Vote.Round, + "added", added, + "peer", peerID, + "error", err, + ) case *CommitMessage: // attempt to add the commit and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition @@ -988,7 +1008,16 @@ func (cs *State) handleMsg(mi msgInfo, fromReplay bool) { if added { cs.statsMsgQueue <- mi } - + cs.Logger.Debug( + "received commit", + "height", cs.Height, + "cs_round", cs.Round, + "commit_height", msg.Commit.Height, + "commit_round", msg.Commit.Round, + "added", added, + "peer", peerID, + "error", err, + ) default: cs.Logger.Error("unknown msg type", "type", fmt.Sprintf("%T", msg)) return @@ -1007,11 +1036,11 @@ func (cs *State) handleMsg(mi msgInfo, fromReplay bool) { } func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) { - cs.Logger.Debug("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) + cs.Logger.Debug("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step.String()) // timeouts must be for current height, round, step if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) { - cs.Logger.Debug("ignoring tock because we are ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) + cs.Logger.Debug("ignoring tock because we are ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step.String()) return } @@ -1192,6 +1221,7 @@ func (cs *State) needProofBlock(height int64) bool { // Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): // after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval // Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool +// Caller should hold cs.mtx lock func (cs *State) enterPropose(height int64, round int32) { logger := 
cs.Logger.With("height", height, "round", round) @@ -1237,12 +1267,12 @@ func (cs *State) enterPropose(height int64, round int32) { // if not a validator, we're done if !cs.Validators.HasProTxHash(proTxHash) { - logger.Debug("propose step; this node is not a validator", "proTxHash", proTxHash, "vals", cs.Validators) + logger.Debug("propose step; this node is not a validator", "proTxHash", proTxHash.ShortString(), "vals", cs.Validators) return } if cs.isProposer(proTxHash) { - logger.Debug("propose step; our turn to propose", "proposer", proTxHash, "privValidator", + logger.Debug("propose step; our turn to propose", "proposer", proTxHash.ShortString(), "privValidator", cs.privValidator) cs.decideProposal(height, round) } else { @@ -1289,6 +1319,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { proposal := types.NewProposal(height, proposedChainLockHeight, round, cs.ValidRound, propBlockID) p := proposal.ToProto() validatorsAtProposalHeight := cs.state.ValidatorsAtHeight(p.Height) + quorumHash := validatorsAtProposalHeight.QuorumHash proTxHash, err := cs.privValidator.GetProTxHash(context.Background()) if err != nil { @@ -1300,31 +1331,34 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { ) return } - pubKey, err := cs.privValidator.GetPubKey(context.Background(), validatorsAtProposalHeight.QuorumHash) + pubKey, err := cs.privValidator.GetPubKey(context.Background(), quorumHash) if err != nil { cs.Logger.Error( "propose step; failed signing proposal; couldn't get pubKey", - "height", - height, - "round", - round, - "err", - err, + "height", height, + "round", round, + "err", err, ) return } messageBytes := types.ProposalBlockSignBytes(cs.state.ChainID, p) - cs.Logger.Debug("signing proposal", "height", proposal.Height, "round", proposal.Round, - "proposerProTxHash", proTxHash.ShortString(), "publicKey", pubKey.Bytes(), - "proposalBytes", messageBytes, "quorumType", - validatorsAtProposalHeight.QuorumType, "quorumHash", 
validatorsAtProposalHeight.QuorumHash) + cs.Logger.Debug( + "signing proposal", + "height", proposal.Height, + "round", proposal.Round, + "proposer_ProTxHash", proTxHash.ShortString(), + "publicKey", tmbytes.HexBytes(pubKey.Bytes()).ShortString(), + "proposalBytes", tmbytes.HexBytes(messageBytes).ShortString(), + "quorumType", validatorsAtProposalHeight.QuorumType, + "quorumHash", quorumHash.ShortString(), + ) // wait the max amount we would wait for a proposal ctx, cancel := context.WithTimeout(context.TODO(), cs.config.TimeoutPropose) defer cancel() if _, err := cs.privValidator.SignProposal(ctx, cs.state.ChainID, validatorsAtProposalHeight.QuorumType, - validatorsAtProposalHeight.QuorumHash, + quorumHash, p, ); err == nil { proposal.Signature = p.Signature @@ -1337,7 +1371,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) } - cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal) + cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal, "pubKey", pubKey.HexString()) } else if !cs.replayMode { cs.Logger.Error("propose step; failed signing proposal", "height", height, "round", round, "err", err) } @@ -1955,14 +1989,19 @@ func (cs *State) verifyCommit( block, blockParts := cs.ProposalBlock, cs.ProposalBlockParts if !blockParts.HasHeader(commit.BlockID.PartSetHeader) { - panic("expected ProposalBlockParts header to be commit header") + return false, fmt.Errorf("expected ProposalBlockParts header to be commit header") } if !block.HashesTo(commit.BlockID.Hash) { - panic("cannot finalize commit; proposal block does not hash to commit hash") + cs.Logger.Error("proposal block does not hash to commit hash", + "block", block, + "commit", commit, + "complete_proposal", cs.isProposalComplete(), + ) + return false, fmt.Errorf("cannot finalize commit; proposal block does not hash to commit hash") } if 
err := cs.blockExec.ValidateBlock(cs.state, block); err != nil { - panic(fmt.Errorf("+2/3 committed an invalid block: %w", err)) + return false, fmt.Errorf("+2/3 committed an invalid block: %w", err) } return true, nil } @@ -1980,10 +2019,9 @@ func (cs *State) addCommit(commit *types.Commit) (added bool, err error) { cs.applyCommit(commit, cs.Logger) // This will relay the commit to peers - if err := cs.eventBus.PublishEventCommit(types.EventDataCommit{Commit: commit}); err != nil { + if err := cs.PublishCommitEvent(commit); err != nil { return added, err } - cs.evsw.FireEvent(types.EventCommitValue, commit) if cs.config.SkipTimeoutCommit { cs.enterNewRound(cs.Height, 0) @@ -1992,6 +2030,16 @@ func (cs *State) addCommit(commit *types.Commit) (added bool, err error) { return added, err } +// PublishCommitEvent ... +func (cs *State) PublishCommitEvent(commit *types.Commit) error { + cs.Logger.Debug("publish commit event", "commit", commit) + if err := cs.eventBus.PublishEventCommit(types.EventDataCommit{Commit: commit}); err != nil { + return err + } + cs.evsw.FireEvent(types.EventCommitValue, commit) + return nil +} + func (cs *State) applyCommit(commit *types.Commit, logger log.Logger) { logger.Info("applying commit", "commit", commit) @@ -2057,7 +2105,7 @@ func (cs *State) applyCommit(commit *types.Commit, logger log.Logger) { cs.RecordMetrics(height, block) // NewHeightStep! 
- cs.updateToState(stateCopy, commit, logger) + cs.updateToState(stateCopy, commit) fail.Fail() // XXX @@ -2163,7 +2211,9 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error { cs.state.Validators.QuorumHash, ) - proposer := cs.Validators.GetProposer() + vset := cs.Validators + height := cs.Height + proposer := vset.GetProposer() // fmt.Printf("verifying request Id %s signID %s quorum hash %s proposalBlockSignBytes %s\n", // hex.EncodeToString(proposalRequestId), @@ -2175,10 +2225,17 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error { case proposer.PubKey != nil: // We are part of the validator set if !proposer.PubKey.VerifySignatureDigest(proposalBlockSignID, proposal.Signature) { - cs.Logger.Debug("error verifying signature", "height", proposal.Height, - "round", proposal.Round, "proposer", proposer.ProTxHash.ShortString(), "signature", proposal.Signature, "pubkey", - proposer.PubKey.Bytes(), "quorumType", cs.state.Validators.QuorumType, - "quorumHash", cs.state.Validators.QuorumHash, "proposalSignId", proposalBlockSignID) + cs.Logger.Debug( + "error verifying signature", + "height", proposal.Height, + "cs_height", height, + "round", proposal.Round, + "proposal", proposal, + "proposer", proposer.ProTxHash.ShortString(), + "pubkey", proposer.PubKey.HexString(), + "quorumType", cs.state.Validators.QuorumType, + "quorumHash", cs.state.Validators.QuorumHash, + "proposalSignId", tmbytes.HexBytes(proposalBlockSignID)) return ErrInvalidProposalSignature } case cs.Commit != nil && cs.Commit.Height == proposal.Height && cs.Commit.Round == proposal.Round: @@ -2220,7 +2277,12 @@ func (cs *State) addProposalBlockPart( // Blocks might be reused, so round mismatch is OK if cs.Height != height { - cs.Logger.Debug("received block part from wrong height", "height", height, "round", round) + cs.Logger.Debug( + "received block part from wrong height", + "height", cs.Height, + "round", cs.Round, + "msg_height", height, + "msg_round", round) 
return false, nil } @@ -2230,8 +2292,10 @@ func (cs *State) addProposalBlockPart( // then receive parts from the previous round - not necessarily a bad peer. cs.Logger.Debug( "received a block part when we are not expecting any", - "height", height, - "round", round, + "height", cs.Height, + "round", cs.Round, + "block_height", height, + "block_round", round, "index", part.Index, "peer", peerID, ) @@ -2392,10 +2456,9 @@ func (cs *State) tryAddVote(vote *types.Vote, peerID types.NodeID) (bool, error) func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err error) { cs.Logger.Debug( "adding vote", - "vote_height", vote.Height, - "vote_type", vote.Type, - "val_index", vote.ValidatorIndex, - "cs_height", cs.Height, + "vote", vote, + "height", cs.Height, + "round", cs.Round, ) // A precommit for the previous height? @@ -2452,22 +2515,16 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err // Ignore vote if we do not have public keys to verify votes if !cs.Validators.HasPublicKeys { added = false - cs.Logger.Debug("vote received on non-validator, ignoring it", "vote_height", vote.Height, + cs.Logger.Debug("vote received on non-validator, ignoring it", "vote", vote, "cs_height", cs.Height, "peer", peerID) return } cs.Logger.Debug( - "adding vote", - "height", vote.Height, - "round", vote.Round, - "type", vote.Type, - "val_proTxHash", vote.ValidatorProTxHash.ShortString(), - "vote_block_key", vote.BlockID.Key(), - "vote_block_signature", vote.BlockSignature, - "vote_state_signature", vote.StateSignature, - "val_index", vote.ValidatorIndex, - "cs_height", cs.Height, + "adding vote to vote set", + "height", cs.Height, + "round", cs.Round, + "vote", vote, ) height := cs.Height @@ -2476,15 +2533,9 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err if err != nil { cs.Logger.Error( "error adding vote", - "vote_height", vote.Height, - "vote_round", vote.Round, - "vote_type", vote.Type, - 
"val_proTxHash", vote.ValidatorProTxHash.ShortString(), - "vote_block_key", vote.BlockID.Key(), - "vote_block_signature", vote.BlockSignature, - "vote_state_signature", vote.StateSignature, - "val_index", vote.ValidatorIndex, + "vote", vote, "cs_height", cs.Height, + "error", err, ) } // Either duplicate, or error upon cs.Votes.AddByIndex() @@ -2689,10 +2740,11 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header } // TODO: pass pubKey to signVote + start := time.Now() vote, err := cs.signVote(msgType, hash, header) if err == nil { cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""}) - cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote) + cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "took", time.Since(start).String()) return vote } diff --git a/internal/consensus/ticker.go b/internal/consensus/ticker.go index fb3571ac86..7cc7198f58 100644 --- a/internal/consensus/ticker.go +++ b/internal/consensus/ticker.go @@ -119,9 +119,9 @@ func (t *timeoutTicker) timeoutRoutine() { // NOTE time.Timer allows duration to be non-positive ti = newti t.timer.Reset(ti.Duration) - t.Logger.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) + t.Logger.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step.String()) case <-t.timer.C: - t.Logger.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) + t.Logger.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step.String()) // go routine here guarantees timeoutRoutine doesn't block. // Determinism comes from playback in the receiveRoutine. 
// We can eliminate it by merging the timeoutRoutine into receiveRoutine diff --git a/internal/consensus/types/round_state.go b/internal/consensus/types/round_state.go index c542ffcdd0..1a3331b9e4 100644 --- a/internal/consensus/types/round_state.go +++ b/internal/consensus/types/round_state.go @@ -3,6 +3,7 @@ package types import ( "encoding/json" "fmt" + "strconv" "time" "github.com/tendermint/tendermint/libs/bytes" @@ -55,7 +56,7 @@ func (rs RoundStepType) String() string { case RoundStepApplyCommit: return "RoundStepApplyCommit" default: - return "RoundStepUnknown" // Cannot panic. + return "RoundStepUnknown(" + strconv.Itoa(int(rs)) + ")" // Cannot panic. } } diff --git a/internal/consensus/wal.go b/internal/consensus/wal.go index 0d9efb839f..7d22c4b5fe 100644 --- a/internal/consensus/wal.go +++ b/internal/consensus/wal.go @@ -209,7 +209,7 @@ func (wal *BaseWAL) WriteSync(msg WALMessage) error { if err := wal.FlushAndSync(); err != nil { wal.Logger.Error(`WriteSync failed to flush consensus wal. 
- WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted`, + WARNING: may result in creating alternative proposals / votes for the current height if the node restarted`, "err", err) return err } diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index dd04f09b0b..059679c5c3 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -15,6 +15,7 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/dashevo/dashd-go/btcjson" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/bls12381" "github.com/tendermint/tendermint/crypto/tmhash" diff --git a/internal/mempool/v1/mempool.go b/internal/mempool/v1/mempool.go index 32e2885057..06e9cff2b2 100644 --- a/internal/mempool/v1/mempool.go +++ b/internal/mempool/v1/mempool.go @@ -533,7 +533,7 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo "rejected bad transaction", "priority", wtx.priority, "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "peer_id", txInfo.SenderNodeID, + "peer", txInfo.SenderNodeID, "code", checkTxRes.CheckTx.Code, "post_check_err", err, ) diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 13a767f058..a024cb13f2 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -2,7 +2,6 @@ package conn import ( "bufio" - "encoding/base64" "errors" "fmt" "io" @@ -326,7 +325,7 @@ func (c *MConnection) String() string { } func (c *MConnection) flush() { - c.Logger.Debug("Flush", "conn", c) + // c.Logger.Debug("Flush", "conn", c) err := c.bufConnWriter.Flush() if err != nil { c.Logger.Debug("MConnection flush failed", "err", err) @@ -358,7 +357,7 @@ func (c *MConnection) Send(chID byte, msgBytes []byte) bool { return false } - c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", base64.StdEncoding.EncodeToString(msgBytes)) + // c.Logger.Debug("Send", "channel", chID, 
"conn", c, "msgBytes", base64.StdEncoding.EncodeToString(msgBytes)) // Send message to channel. channel, ok := c.channelsIdx[chID] if !ok { @@ -646,7 +645,7 @@ FOR_LOOP: break FOR_LOOP } if msgBytes != nil { - c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", base64.StdEncoding.EncodeToString(msgBytes)) + // c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", base64.StdEncoding.EncodeToString(msgBytes)) // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine c.onReceive(channelID, msgBytes) } @@ -871,7 +870,7 @@ func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { // complete, which is owned by the caller and will not be modified. // Not goroutine-safe func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { - ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packetData", base64.StdEncoding.EncodeToString(packet.Data)) + // ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packetData", base64.StdEncoding.EncodeToString(packet.Data)) var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data) if recvCap < recvReceived { return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived) diff --git a/internal/p2p/dash_dialer.go b/internal/p2p/dash_dialer.go index 6d5c7fdf2f..e787626a24 100644 --- a/internal/p2p/dash_dialer.go +++ b/internal/p2p/dash_dialer.go @@ -106,8 +106,10 @@ func (cm *routerDashDialer) Resolve(va types.ValidatorAddress) (nodeAddress Node } func (cm *routerDashDialer) lookupIPPort(ctx context.Context, ip net.IP, port uint16) (NodeAddress, error) { - for nodeID, peer := range cm.peerManager.store.peers { - for addr := range peer.AddressInfo { + peers := cm.peerManager.Peers() + for _, nodeID := range peers { + addresses := cm.peerManager.Addresses(nodeID) + for _, addr := range addresses { if endpoints, err := addr.Resolve(ctx); err != nil { for _, item := range endpoints { if 
item.IP.Equal(ip) && item.Port == port { diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 8d0840f0fe..efabe96574 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -1,6 +1,7 @@ package p2ptest import ( + "bytes" "context" "math/rand" "testing" @@ -57,13 +58,12 @@ func MakeNetwork(t *testing.T, opts NetworkOptions) *Network { logger: logger, memoryNetwork: p2p.NewMemoryNetwork(logger, opts.BufferSize), } - for i := 0; i < opts.NumNodes; i++ { var proTxHash crypto.ProTxHash if i < len(opts.ProTxHashes) { proTxHash = opts.ProTxHashes[i] } - node := network.MakeNode(t, proTxHash, opts.NodeOpts) + node := network.MakeNode(t, proTxHash, opts.NodeOpts, network.logger.With("validator", i)) network.Nodes[node.NodeID] = node } @@ -94,6 +94,7 @@ func (n *Network) Start(t *testing.T) { for _, targetAddress := range dialQueue[i+1:] { // nodes 0 { + pu.SetProTxHash(peer.ProTxHash) } m.broadcast(pu) } @@ -785,8 +793,8 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) { Status: PeerStatusDown, } peer, ok := m.store.Get(peerID) - if ok { - pu.ProTxHash = peer.ProTxHash + if ok && len(peer.ProTxHash) > 0 { + pu.SetProTxHash(peer.ProTxHash) } m.broadcast(pu) } @@ -1079,7 +1087,7 @@ func (m *PeerManager) SetHeight(peerID types.NodeID, height int64) error { // SetProTxHashToPeerInfo sets a proTxHash in peerInfo.proTxHash to keep this value in a store func SetProTxHashToPeerInfo(proTxHash types.ProTxHash) func(info *peerInfo) { return func(info *peerInfo) { - info.ProTxHash = proTxHash + info.ProTxHash = proTxHash.Copy() } } @@ -1301,6 +1309,7 @@ func (p *peerInfo) Copy() peerInfo { addressInfoCopy := addressInfo.Copy() c.AddressInfo[i] = &addressInfoCopy } + c.ProTxHash = p.ProTxHash.Copy() return c } diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index a00eedce11..9b6b2c3595 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -443,7 
+443,7 @@ func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) { node := r.network.MakeNode(t, nil, p2ptest.NodeOptions{ MaxPeers: r.opts.MaxPeers, MaxConnected: r.opts.MaxConnected, - }) + }, r.logger.With("validator", i)) r.network.Nodes[node.NodeID] = node nodeID := node.NodeID r.pexChannels[nodeID] = node.MakeChannelNoCleanup( diff --git a/internal/p2p/router.go b/internal/p2p/router.go index c9451708ad..80b9b48a66 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -633,7 +633,8 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) { return } - if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil { + proTxHashSetter := SetProTxHashToPeerInfo(peerInfo.ProTxHash) + if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID, proTxHashSetter) }); err != nil { r.logger.Error("failed to accept connection", "op", "incoming/accepted", "peer", peerInfo.NodeID, "err", err) return diff --git a/internal/state/execution.go b/internal/state/execution.go index f2044d36de..0a320fa0a8 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -13,6 +13,7 @@ import ( "github.com/tendermint/tendermint/internal/libs/fail" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/proxy" + tmbytes "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/libs/log" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" "github.com/tendermint/tendermint/types" @@ -274,12 +275,10 @@ func (blockExec *BlockExecutor) ApplyBlockWithLogger( if len(validators) > 0 { blockExec.logger.Debug( "updates to validators", - "quorumHash", - quorumHash, - "thresholdPublicKey", - thresholdPublicKey, - "updates", - types.ValidatorListString(validators), + "height", block.Height, + "quorumHash", quorumHash.ShortString(), + "thresholdPublicKey", 
tmbytes.HexBytes(thresholdPublicKey.Bytes()).ShortString(), + "updates", types.ValidatorListString(validators), ) } diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index f1dead6033..c67c201c0d 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -12,6 +12,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" dashcore "github.com/tendermint/tendermint/dashcore/rpc" + "github.com/tendermint/tendermint/internal/consensus" tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/proxy" @@ -175,6 +176,8 @@ type Reactor struct { backfilledBlocks int64 dashCoreClient dashcore.Client + + csState *consensus.State } // NewReactor returns a reference to a new state sync reactor, which implements @@ -195,6 +198,7 @@ func NewReactor( tempDir string, ssMetrics *Metrics, client dashcore.Client, + csState *consensus.State, ) *Reactor { r := &Reactor{ chainID: chainID, @@ -216,6 +220,7 @@ func NewReactor( providers: make(map[types.NodeID]*BlockProvider), metrics: ssMetrics, dashCoreClient: client, + csState: csState, } r.BaseService = *service.NewBaseService(logger, "StateSync", r) @@ -327,6 +332,11 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { return sm.State{}, err } + err = r.publishCommitEvent(commit) + if err != nil { + return state, err + } + err = r.stateStore.Bootstrap(state) if err != nil { return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err) @@ -345,6 +355,13 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { return state, nil } +func (r *Reactor) publishCommitEvent(commit *types.Commit) error { + if r.csState == nil { + return nil + } + return r.csState.PublishCommitEvent(commit) +} + // Backfill sequentially fetches, verifies and stores light blocks in reverse // order. 
It does not stop verifying blocks until reaching a block with a height // and time that is less or equal to the stopHeight and stopTime. The diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 85d9ff030c..e68c929414 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -176,6 +176,7 @@ func setup( "", m, rts.dashcoreClient, + nil, ) rts.syncer = newSyncer( diff --git a/internal/statesync/snapshots.go b/internal/statesync/snapshots.go index 973d29e392..4c0f4966b1 100644 --- a/internal/statesync/snapshots.go +++ b/internal/statesync/snapshots.go @@ -8,6 +8,7 @@ import ( "strings" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + tmbytes "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/types" ) @@ -20,10 +21,10 @@ type snapshot struct { CoreChainLockedHeight uint32 Format uint32 Chunks uint32 - Hash []byte + Hash tmbytes.HexBytes Metadata []byte - trustedAppHash []byte // populated by light client + trustedAppHash tmbytes.HexBytes // populated by light client } // Key generates a snapshot key, used for lookups. 
It takes into account not only the height and diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index b4212961a3..a4398e9687 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -134,7 +134,7 @@ func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, err if added { s.metrics.TotalSnapshots.Add(1) s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format, - "hash", snapshot.Hash) + "hash", snapshot.Hash.ShortString()) } return added, nil } diff --git a/libs/bits/bit_array.go b/libs/bits/bit_array.go index cf3ad712c7..ffe192614e 100644 --- a/libs/bits/bit_array.go +++ b/libs/bits/bit_array.go @@ -8,8 +8,8 @@ import ( mrand "math/rand" "regexp" "strings" - "sync" + tmsync "github.com/tendermint/tendermint/internal/libs/sync" tmmath "github.com/tendermint/tendermint/libs/math" tmrand "github.com/tendermint/tendermint/libs/rand" tmprotobits "github.com/tendermint/tendermint/proto/tendermint/libs/bits" @@ -17,7 +17,7 @@ import ( // BitArray is a thread-safe implementation of a bit array. 
type BitArray struct { - mtx sync.Mutex + mtx tmsync.RWMutex Bits int `json:"bits"` // NOTE: persisted via reflect, must be exported Elems []uint64 `json:"elems"` // NOTE: persisted via reflect, must be exported } @@ -50,8 +50,8 @@ func (bA *BitArray) GetIndex(i int) bool { if bA == nil { return false } - bA.mtx.Lock() - defer bA.mtx.Unlock() + bA.mtx.RLock() + defer bA.mtx.RUnlock() return bA.getIndex(i) } @@ -90,8 +90,8 @@ func (bA *BitArray) Copy() *BitArray { if bA == nil { return nil } - bA.mtx.Lock() - defer bA.mtx.Unlock() + bA.mtx.RLock() + defer bA.mtx.RUnlock() return bA.copy() } @@ -126,15 +126,14 @@ func (bA *BitArray) Or(o *BitArray) *BitArray { if o == nil { return bA.Copy() } - bA.mtx.Lock() - o.mtx.Lock() + o = o.Copy() + bA.mtx.RLock() + defer bA.mtx.RUnlock() c := bA.copyBits(tmmath.MaxInt(bA.Bits, o.Bits)) smaller := tmmath.MinInt(len(bA.Elems), len(o.Elems)) for i := 0; i < smaller; i++ { c.Elems[i] |= o.Elems[i] } - bA.mtx.Unlock() - o.mtx.Unlock() return c } @@ -145,12 +144,9 @@ func (bA *BitArray) And(o *BitArray) *BitArray { if bA == nil || o == nil { return nil } - bA.mtx.Lock() - o.mtx.Lock() - defer func() { - bA.mtx.Unlock() - o.mtx.Unlock() - }() + o = o.Copy() + bA.mtx.RLock() + defer bA.mtx.RUnlock() return bA.and(o) } @@ -167,8 +163,8 @@ func (bA *BitArray) Not() *BitArray { if bA == nil { return nil // Degenerate } - bA.mtx.Lock() - defer bA.mtx.Unlock() + bA.mtx.RLock() + defer bA.mtx.RUnlock() return bA.not() } @@ -189,8 +185,9 @@ func (bA *BitArray) Sub(o *BitArray) *BitArray { // TODO: Decide if we should do 1's complement here? return nil } - bA.mtx.Lock() - o.mtx.Lock() + o = o.Copy() + bA.mtx.RLock() + defer bA.mtx.RUnlock() // output is the same size as bA c := bA.copyBits(bA.Bits) // Only iterate to the minimum size between the two. 
@@ -202,8 +199,6 @@ func (bA *BitArray) Sub(o *BitArray) *BitArray { // &^ is and not in golang c.Elems[i] &^= o.Elems[i] } - bA.mtx.Unlock() - o.mtx.Unlock() return c } @@ -212,8 +207,8 @@ func (bA *BitArray) IsEmpty() bool { if bA == nil { return true // should this be opposite? } - bA.mtx.Lock() - defer bA.mtx.Unlock() + bA.mtx.RLock() + defer bA.mtx.RUnlock() for _, e := range bA.Elems { if e > 0 { return false @@ -227,8 +222,8 @@ func (bA *BitArray) IsFull() bool { if bA == nil { return true } - bA.mtx.Lock() - defer bA.mtx.Unlock() + bA.mtx.RLock() + defer bA.mtx.RUnlock() // Check all elements except the last for _, elem := range bA.Elems[:len(bA.Elems)-1] { @@ -251,9 +246,9 @@ func (bA *BitArray) PickRandom() (int, bool) { return 0, false } - bA.mtx.Lock() + bA.mtx.RLock() trueIndices := bA.getTrueIndices() - bA.mtx.Unlock() + bA.mtx.RUnlock() if len(trueIndices) == 0 { // no bits set to true return 0, false @@ -308,12 +303,14 @@ func (bA *BitArray) StringIndented(indent string) string { if bA == nil { return "nil-BitArray" } - bA.mtx.Lock() - defer bA.mtx.Unlock() + bA.mtx.RLock() + defer bA.mtx.RUnlock() return bA.stringIndented(indent) } func (bA *BitArray) CountTrueBits() int { + bA.mtx.RLock() + defer bA.mtx.RUnlock() bits := 0 for i := 0; i < bA.Bits; i++ { if bA.getIndex(i) { @@ -351,8 +348,8 @@ func (bA *BitArray) stringIndented(indent string) string { // Bytes returns the byte representation of the bits within the bitarray. 
func (bA *BitArray) Bytes() []byte { - bA.mtx.Lock() - defer bA.mtx.Unlock() + bA.mtx.RLock() + defer bA.mtx.RUnlock() numBytes := (bA.Bits + 7) / 8 bytes := make([]byte, numBytes) @@ -370,11 +367,9 @@ func (bA *BitArray) Update(o *BitArray) { if bA == nil || o == nil { return } - + o = o.Copy() bA.mtx.Lock() - o.mtx.Lock() copy(bA.Elems, o.Elems) - o.mtx.Unlock() bA.mtx.Unlock() } @@ -385,8 +380,8 @@ func (bA *BitArray) MarshalJSON() ([]byte, error) { return []byte("null"), nil } - bA.mtx.Lock() - defer bA.mtx.Unlock() + bA.mtx.RLock() + defer bA.mtx.RUnlock() bits := `"` for i := 0; i < bA.Bits; i++ { @@ -441,10 +436,7 @@ func (bA *BitArray) ToProto() *tmprotobits.BitArray { return nil } - bA.mtx.Lock() - defer bA.mtx.Unlock() - - bc := bA.copy() + bc := bA.Copy() return &tmprotobits.BitArray{Bits: int64(bc.Bits), Elems: bc.Elems} } diff --git a/libs/bytes/bytes.go b/libs/bytes/bytes.go index 7aabe98997..a83a04e09f 100644 --- a/libs/bytes/bytes.go +++ b/libs/bytes/bytes.go @@ -101,3 +101,13 @@ func (bz HexBytes) Format(s fmt.State, verb rune) { s.Write([]byte(fmt.Sprintf("%X", []byte(bz)))) } } + +// Copy creates a deep copy of HexBytes. It allocates new buffer and copies data into it. +func (bz HexBytes) Copy() HexBytes { + if bz == nil { + return nil + } + copied := make(HexBytes, len(bz)) + copy(copied, bz) + return copied +} diff --git a/node/node.go b/node/node.go index f96bb5ad88..305e52db80 100644 --- a/node/node.go +++ b/node/node.go @@ -192,7 +192,7 @@ func makeNode(cfg *config.Config, llmqType = btcjson.LLMQType_100_67 }*/ if dashCoreRPCClient == nil { - rpcClient, err := DefaultDashCoreRPCClient(cfg) + rpcClient, err := DefaultDashCoreRPCClient(cfg, logger.With("module", dashcore.ModuleName)) if err != nil { return nil, fmt.Errorf("failed to create Dash Core RPC client %w", err) } @@ -285,7 +285,7 @@ func makeNode(cfg *config.Config, // and replays any blocks as necessary to sync tendermint with the app. 
consensusLogger := logger.With("module", "consensus") if len(proTxHash) > 0 { - consensusLogger = consensusLogger.With("proTxHash", proTxHash.ShortString()) + consensusLogger = consensusLogger.With("node_proTxHash", proTxHash.ShortString()) } proposedAppVersion := uint64(0) if !stateSync { @@ -435,6 +435,7 @@ func makeNode(cfg *config.Config, cfg.StateSync.TempDir, nodeMetrics.statesync, dashCoreRPCClient, + csState, ) // add the channel descriptors to both the transports @@ -492,12 +493,12 @@ func makeNode(cfg *config.Config, } if cfg.P2P.PexReactor { - pexReactor = createPEXReactorAndAddToSwitch(addrBook, cfg, sw, logger) + pexReactor = createPEXReactorAndAddToSwitch(addrBook, cfg, sw, logger.With("module", "pex")) } } else { addrBook = nil if cfg.P2P.PexReactor { - pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router) + pexReactor, err = createPEXReactorV2(cfg, logger.With("module", "pex"), peerManager, router) if err != nil { return nil, err } @@ -513,7 +514,7 @@ func makeNode(cfg *config.Config, // Start Dash connection executor var validatorConnExecutor *dashquorum.ValidatorConnExecutor if len(proTxHash) > 0 { - vcLogger := logger.With("proTxHash", proTxHash.ShortString(), "module", "ValidatorConnExecutor") + vcLogger := logger.With("node_proTxHash", proTxHash.ShortString(), "module", "ValidatorConnExecutor") dcm := p2p.NewRouterDashDialer(peerManager, vcLogger) validatorConnExecutor, err = dashquorum.NewValidatorConnExecutor( proTxHash, @@ -599,11 +600,12 @@ func makeNode(cfg *config.Config, } // DefaultDashCoreRPCClient returns RPC client for the Dash Core node -func DefaultDashCoreRPCClient(cfg *config.Config) (dashcore.Client, error) { +func DefaultDashCoreRPCClient(cfg *config.Config, logger log.Logger) (dashcore.Client, error) { return dashcore.NewRPCClient( cfg.PrivValidatorCoreRPCHost, cfg.BaseConfig.PrivValidatorCoreRPCUsername, cfg.BaseConfig.PrivValidatorCoreRPCPassword, + logger, ) } diff --git a/node/setup.go b/node/setup.go 
index b14c9efb98..604b424ca0 100644 --- a/node/setup.go +++ b/node/setup.go @@ -724,7 +724,7 @@ func makeNodeInfo( TxIndex: txIndexerStatus, RPCAddress: cfg.RPC.ListenAddress, }, - ProTxHash: proTxHash, + ProTxHash: proTxHash.Copy(), } if cfg.P2P.PexReactor { diff --git a/privval/file.go b/privval/file.go index 7bb14d9665..1ab7a02d44 100644 --- a/privval/file.go +++ b/privval/file.go @@ -20,6 +20,7 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/internal/libs/protoio" + tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/libs/tempfile" tmbytes "github.com/tendermint/tendermint/libs/bytes" tmjson "github.com/tendermint/tendermint/libs/json" @@ -81,20 +82,6 @@ func (pvKey FilePVKey) Save() { } -func (pvKey FilePVKey) PrivateKeyForQuorumHash(quorumHash crypto.QuorumHash) (crypto.PrivKey, error) { - if keys, ok := pvKey.PrivateKeys[quorumHash.String()]; ok { - return keys.PrivKey, nil - } - return nil, fmt.Errorf("no private key for quorum hash %v", quorumHash) -} - -func (pvKey FilePVKey) PublicKeyForQuorumHash(quorumHash crypto.QuorumHash) (crypto.PubKey, error) { - if keys, ok := pvKey.PrivateKeys[quorumHash.String()]; ok { - return keys.PubKey, nil - } - return nil, fmt.Errorf("no public key for quorum hash %v", quorumHash) -} - func (pvKey FilePVKey) ThresholdPublicKeyForQuorumHash(quorumHash crypto.QuorumHash) (crypto.PubKey, error) { if keys, ok := pvKey.PrivateKeys[quorumHash.String()]; ok { return keys.ThresholdPublicKey, nil @@ -190,6 +177,7 @@ func (lss *FilePVLastSignState) Save() { type FilePV struct { Key FilePVKey LastSignState FilePVLastSignState + mtx tmsync.RWMutex } // FilePVOption ... @@ -303,12 +291,12 @@ func WithUpdateHeights(updateHeights map[string]crypto.QuorumHash) FilePVOption } // WithProTxHash ... 
-func WithProTxHash(proTxHash []byte) FilePVOption { +func WithProTxHash(proTxHash types.ProTxHash) FilePVOption { return func(filePV *FilePV) error { if len(proTxHash) != crypto.ProTxHashSize { return fmt.Errorf("error setting incorrect proTxHash size in NewFilePV") } - filePV.Key.ProTxHash = proTxHash + filePV.Key.ProTxHash = proTxHash.Copy() return nil } } @@ -403,6 +391,9 @@ func LoadOrGenFilePV(keyFilePath, stateFilePath string) (*FilePV, error) { // GetPubKey returns the public key of the validator. // Implements PrivValidator. func (pv *FilePV) GetPubKey(context context.Context, quorumHash crypto.QuorumHash) (crypto.PubKey, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + if keys, ok := pv.Key.PrivateKeys[quorumHash.String()]; ok { return keys.PubKey, nil } @@ -412,6 +403,9 @@ func (pv *FilePV) GetPubKey(context context.Context, quorumHash crypto.QuorumHas // GetFirstPubKey returns the first public key of the validator. // Implements PrivValidator. func (pv *FilePV) GetFirstPubKey(context context.Context) (crypto.PubKey, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + for _, quorumKeys := range pv.Key.PrivateKeys { return quorumKeys.PubKey, nil } @@ -419,6 +413,9 @@ func (pv *FilePV) GetFirstPubKey(context context.Context) (crypto.PubKey, error) } func (pv *FilePV) GetQuorumHashes(context context.Context) ([]crypto.QuorumHash, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + quorumHashes := make([]crypto.QuorumHash, len(pv.Key.PrivateKeys)) i := 0 for quorumHashString := range pv.Key.PrivateKeys { @@ -433,6 +430,9 @@ func (pv *FilePV) GetQuorumHashes(context context.Context) ([]crypto.QuorumHash, } func (pv *FilePV) GetFirstQuorumHash(context context.Context) (crypto.QuorumHash, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + for quorumHashString := range pv.Key.PrivateKeys { return hex.DecodeString(quorumHashString) } @@ -441,6 +441,9 @@ func (pv *FilePV) GetFirstQuorumHash(context context.Context) (crypto.QuorumHash // 
GetThresholdPublicKey ... func (pv *FilePV) GetThresholdPublicKey(context context.Context, quorumHash crypto.QuorumHash) (crypto.PubKey, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + if keys, ok := pv.Key.PrivateKeys[quorumHash.String()]; ok { return keys.ThresholdPublicKey, nil } @@ -449,14 +452,40 @@ func (pv *FilePV) GetThresholdPublicKey(context context.Context, quorumHash cryp // GetPrivateKey ... func (pv *FilePV) GetPrivateKey(context context.Context, quorumHash crypto.QuorumHash) (crypto.PrivKey, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + + return pv.getPrivateKey(context, quorumHash) +} + +func (pv *FilePV) getPrivateKey(context context.Context, quorumHash crypto.QuorumHash) (crypto.PrivKey, error) { if keys, ok := pv.Key.PrivateKeys[quorumHash.String()]; ok { return keys.PrivKey, nil } - return nil, fmt.Errorf("no private key for quorum hash %v", quorumHash) + hashes := make([]string, 0, len(pv.Key.PrivateKeys)) + for hash := range pv.Key.PrivateKeys { + hashes = append(hashes, hash[:8]) + } + return nil, fmt.Errorf("no private key for quorum hash %v, supported are: %v", quorumHash, hashes) +} + +func (pv *FilePV) GetPublicKey(context context.Context, quorumHash crypto.QuorumHash) (crypto.PubKey, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + + privateKey, err := pv.getPrivateKey(context, quorumHash) + if err != nil { + return nil, fmt.Errorf("no public key for quorum hash %v", quorumHash) + } + + return privateKey.PubKey(), nil } // GetHeight ... func (pv *FilePV) GetHeight(context context.Context, quorumHash crypto.QuorumHash) (int64, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + if intString, ok := pv.Key.FirstHeightOfQuorums[quorumHash.String()]; ok { return strconv.ParseInt(intString, 10, 64) } @@ -465,6 +494,9 @@ func (pv *FilePV) GetHeight(context context.Context, quorumHash crypto.QuorumHas // ExtractIntoValidator ... 
func (pv *FilePV) ExtractIntoValidator(context context.Context, quorumHash crypto.QuorumHash) *types.Validator { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + pubKey, _ := pv.GetPubKey(context, quorumHash) if len(pv.Key.ProTxHash) != crypto.DefaultHashSize { panic("proTxHash wrong length") @@ -479,6 +511,9 @@ func (pv *FilePV) ExtractIntoValidator(context context.Context, quorumHash crypt // GetProTxHash returns the pro tx hash of the validator. // Implements PrivValidator. func (pv *FilePV) GetProTxHash(context context.Context) (crypto.ProTxHash, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + if len(pv.Key.ProTxHash) != crypto.ProTxHashSize { return nil, fmt.Errorf("file proTxHash is invalid size") } @@ -490,6 +525,9 @@ func (pv *FilePV) GetProTxHash(context context.Context) (crypto.ProTxHash, error func (pv *FilePV) SignVote( ctx context.Context, chainID string, quorumType btcjson.LLMQType, quorumHash crypto.QuorumHash, vote *tmproto.Vote, stateID types.StateID, logger log.Logger) error { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + if err := pv.signVote(chainID, quorumType, quorumHash, vote, stateID); err != nil { return fmt.Errorf("error signing vote: %v", err) } @@ -501,6 +539,9 @@ func (pv *FilePV) SignVote( func (pv *FilePV) SignProposal( ctx context.Context, chainID string, quorumType btcjson.LLMQType, quorumHash crypto.QuorumHash, proposal *tmproto.Proposal, ) ([]byte, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + signID, err := pv.signProposal(chainID, quorumType, quorumHash, proposal) if err != nil { return signID, fmt.Errorf("error signing proposal: %v", err) @@ -510,6 +551,9 @@ func (pv *FilePV) SignProposal( // Save persists the FilePV to disk. func (pv *FilePV) Save() { + pv.mtx.Lock() + defer pv.mtx.Unlock() + pv.Key.Save() pv.LastSignState.Save() } @@ -517,6 +561,9 @@ func (pv *FilePV) Save() { // Reset resets all fields in the FilePV. // NOTE: Unsafe! 
func (pv *FilePV) Reset() { + pv.mtx.Lock() + defer pv.mtx.Unlock() + var blockSig []byte var stateSig []byte pv.LastSignState.Height = 0 @@ -531,6 +578,9 @@ func (pv *FilePV) Reset() { // String returns a string representation of the FilePV. func (pv *FilePV) String() string { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + return fmt.Sprintf( "PrivValidator{%v LH:%v, LR:%v, LS:%v}", pv.Key.ProTxHash, @@ -547,6 +597,9 @@ func (pv *FilePV) UpdatePrivateKey( thresholdPublicKey crypto.PubKey, height int64, ) { + pv.mtx.Lock() + defer pv.mtx.Unlock() + pv.Key.PrivateKeys[quorumHash.String()] = crypto.QuorumKeys{ PrivKey: privateKey, PubKey: privateKey.PubKey(), @@ -605,11 +658,9 @@ func (pv *FilePV) signVote( return err } - var privKey crypto.PrivKey - if quorumKeys, ok := pv.Key.PrivateKeys[quorumHash.String()]; ok { - privKey = quorumKeys.PrivKey - } else { - return fmt.Errorf("file private validator could not sign vote for quorum hash %v", quorumHash) + privKey, err := pv.getPrivateKey(context.TODO(), quorumHash) + if err != nil { + return err } sigBlock, err := privKey.SignDigest(blockSignID) @@ -677,11 +728,9 @@ func (pv *FilePV) signProposal( return blockSignID, err } - var privKey crypto.PrivKey - if quorumKeys, ok := pv.Key.PrivateKeys[quorumHash.String()]; ok { - privKey = quorumKeys.PrivKey - } else { - return blockSignID, fmt.Errorf("file private validator could not sign vote for quorum hash %v", quorumHash) + privKey, err := pv.getPrivateKey(context.TODO(), quorumHash) + if err != nil { + return blockSignID, err } // It passed the checks. 
SignDigest the proposal diff --git a/rpc/client/evidence_test.go b/rpc/client/evidence_test.go index 5e4c8457eb..3ec23bab41 100644 --- a/rpc/client/evidence_test.go +++ b/rpc/client/evidence_test.go @@ -9,6 +9,7 @@ import ( "github.com/dashevo/dashd-go/btcjson" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/libs" @@ -38,7 +39,7 @@ func newEvidence(t *testing.T, val *privval.FilePV, v := vote.ToProto() v2 := vote2.ToProto() - privKey, err := val.Key.PrivateKeyForQuorumHash(quorumHash) + privKey, err := val.GetPrivateKey(context.TODO(), quorumHash) require.NoError(t, err) vote.BlockSignature, err = privKey.SignDigest(types.VoteBlockSignID(chainID, v, quorumType, quorumHash)) diff --git a/test/e2e/Makefile b/test/e2e/Makefile index ba98be0587..f73554ef8c 100644 --- a/test/e2e/Makefile +++ b/test/e2e/Makefile @@ -6,7 +6,7 @@ docker: docker build --tag tenderdash/e2e-node -f docker/Dockerfile ../.. node: - go build -gcflags="all=-N -l" -o build/node -tags badgerdb,boltdb,cleveldb,rocksdb ./node + GOTRACEBACK=crash go build -gcflags="all=-N -l" -o build/node -tags badgerdb,boltdb,cleveldb,rocksdb ./node e2e/app/compile: docker run --rm -it --entrypoint "/src/tenderdash/test/e2e/entrypoint.sh" -w "/src/tenderdash/test/e2e" -v $(ROOT_PATH):/src/tenderdash tenderdash/e2e-node diff --git a/test/e2e/docker/Dockerfile b/test/e2e/docker/Dockerfile index efd0390536..c79a3082cc 100644 --- a/test/e2e/docker/Dockerfile +++ b/test/e2e/docker/Dockerfile @@ -7,7 +7,6 @@ RUN apt-get -qq update -y && apt-get -qq upgrade -y >/dev/null RUN apt-get -qq install -y cmake sudo libgmp-dev libleveldb-dev librocksdb-dev >/dev/null # Set up build directory /src/tenderdash -ENV TENDERMINT_BUILD_OPTIONS badgerdb,boltdb,cleveldb,rocksdb,nostrip WORKDIR /src/tenderdash # Install DLV debugger @@ -27,8 +26,6 @@ RUN make install-bls # Copy Tenderdash source COPY . . 
-# Build Tenderdash and install into /usr/bin/tenderdash -RUN make build && cp build/tenderdash /usr/bin/tenderdash COPY test/e2e/docker/entrypoint* /usr/bin/ RUN cd test/e2e && make node && cp build/node /usr/bin/app diff --git a/test/e2e/networks/dashcore.toml b/test/e2e/networks/dashcore.toml index b6ff845d79..a2fd98bd4d 100644 --- a/test/e2e/networks/dashcore.toml +++ b/test/e2e/networks/dashcore.toml @@ -5,6 +5,7 @@ initial_height = 1000 initial_state = {initial01 = "a", initial02 = "b", initial03 = "c"} initial_core_chain_locked_height = 3400 queue_type = "priority" +log_level = "debug" [chainlock_updates] 1000 = 3450 diff --git a/test/e2e/node/main.go b/test/e2e/node/main.go index 07a6f70985..65b51d004e 100644 --- a/test/e2e/node/main.go +++ b/test/e2e/node/main.go @@ -99,6 +99,7 @@ func run(configFile string) error { cfg.PrivValServer, tmcfg.BaseConfig.PrivValidatorCoreRPCUsername, tmcfg.BaseConfig.PrivValidatorCoreRPCPassword, + logger.With("module", dashcore.ModuleName), ) if err != nil { return fmt.Errorf("connection to Dash Core RPC failed: %w", err) diff --git a/test/e2e/pkg/mockcoreserver/core_server.go b/test/e2e/pkg/mockcoreserver/core_server.go index 363eb3a8b5..6d1705b1d3 100644 --- a/test/e2e/pkg/mockcoreserver/core_server.go +++ b/test/e2e/pkg/mockcoreserver/core_server.go @@ -7,6 +7,7 @@ import ( "strconv" "github.com/dashevo/dashd-go/btcjson" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/bls12381" "github.com/tendermint/tendermint/privval" @@ -99,7 +100,7 @@ func (c *MockCoreServer) QuorumSign(_ context.Context, cmd btcjson.QuorumCmd) bt bls12381.ReverseBytes(reqID), bls12381.ReverseBytes(msgHash), ) - privateKey, err := c.FilePV.Key.PrivateKeyForQuorumHash(quorumHash) + privateKey, err := c.FilePV.GetPrivateKey(context.TODO(), quorumHash) if err != nil { panic(err) } diff --git a/test/e2e/pkg/mockcoreserver/methods.go b/test/e2e/pkg/mockcoreserver/methods.go index 2eb1db0fb4..8ce7abb08b 100644 --- 
a/test/e2e/pkg/mockcoreserver/methods.go +++ b/test/e2e/pkg/mockcoreserver/methods.go @@ -3,7 +3,6 @@ package mockcoreserver import ( "context" "encoding/json" - "fmt" "github.com/dashevo/dashd-go/btcjson" ) @@ -56,7 +55,7 @@ func WithQuorumSignMethod(cs CoreServer, times int) MethodFunc { func WithQuorumVerifyMethod(cs CoreServer, times int) MethodFunc { call := OnMethod(func(ctx context.Context, req btcjson.Request) (interface{}, error) { cmd := btcjson.QuorumCmd{} - fmt.Printf("request is %v\n", req) + // fmt.Printf("request is %v\n", req) err := unmarshalCmd( req, &cmd.SubCmd, @@ -66,7 +65,6 @@ func WithQuorumVerifyMethod(cs CoreServer, times int) MethodFunc { &cmd.QuorumHash, &cmd.Signature, ) - fmt.Printf("cmd is %v\n", cmd) if err != nil { return nil, err } diff --git a/test/e2e/pkg/mockcoreserver/server.go b/test/e2e/pkg/mockcoreserver/server.go index 8fe4c0f95d..824a7187fe 100644 --- a/test/e2e/pkg/mockcoreserver/server.go +++ b/test/e2e/pkg/mockcoreserver/server.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "log" + "net" "net/http" "sync" @@ -30,6 +31,7 @@ type HTTPServer struct { guard sync.Mutex handlers map[string]*handler httpSrv *http.Server + readyCh chan struct{} } // On returns a call structure to setup afterwards @@ -58,12 +60,24 @@ func (s *HTTPServer) Start() { Handler: s.mux, } s.guard.Unlock() - err := s.httpSrv.ListenAndServe() + l, err := net.Listen("tcp", s.addr) + if err != nil { + log.Fatalf("unable to listen an address %s: %v", s.addr, err) + } + go func() { + s.readyCh <- struct{}{} + }() + err = s.httpSrv.Serve(l) if err != nil && err != http.ErrServerClosed { log.Fatalf("unexpected stop a server: %v", err) } } +// WaitReady waits a signal in a channel that a server is ready +func (s *HTTPServer) WaitReady() { + <-s.readyCh +} + // Stop stops http server func (s *HTTPServer) Stop(ctx context.Context) { s.guard.Lock() @@ -80,6 +94,7 @@ func NewHTTPServer(addr string) *HTTPServer { srv := &HTTPServer{ mux: mux, addr: addr, + readyCh: 
make(chan struct{}), handlers: make(map[string]*handler), } return srv diff --git a/test/e2e/pkg/mockcoreserver/server_test.go b/test/e2e/pkg/mockcoreserver/server_test.go index dcb22b2a76..517b3b313e 100644 --- a/test/e2e/pkg/mockcoreserver/server_test.go +++ b/test/e2e/pkg/mockcoreserver/server_test.go @@ -4,29 +4,29 @@ import ( "context" "encoding/hex" "io/ioutil" - "log" "net/http" "net/url" "testing" - "time" - - dashcore "github.com/tendermint/tendermint/dashcore/rpc" "github.com/dashevo/dashd-go/btcjson" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/bls12381" + dashcore "github.com/tendermint/tendermint/dashcore/rpc" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/privval" ) func TestServer(t *testing.T) { ctx := context.Background() srv := NewHTTPServer(":9981") + logger := log.TestingLogger() go func() { srv.Start() }() - time.Sleep(10 * time.Microsecond) + srv.WaitReady() testCases := []struct { url string e string @@ -50,14 +50,14 @@ func TestServer(t *testing.T) { srv. On("/test"). Expect(func(req *http.Request) error { - log.Println(req.URL.String()) + logger.Debug("mock core server request received", "url", req.URL.String()) return nil }). Expect(And(BodyShouldBeEmpty(), QueryShouldHave(tc.query))). Once(). 
Respond(JSONBody(tc.e), JSONContentType()) resp, err := http.Get(tc.url) - assert.NoError(t, err) + require.NoError(t, err) data, err := ioutil.ReadAll(resp.Body) _ = resp.Body.Close() assert.NoError(t, err) @@ -80,7 +80,8 @@ func TestDashCoreSignerPingMethod(t *testing.T) { go func() { srv.Start() }() - dashCoreRPCClient, err := dashcore.NewRPCClient(addr, "root", "root") + logger := log.TestingLogger() + dashCoreRPCClient, err := dashcore.NewRPCClient(addr, "root", "root", logger) assert.NoError(t, err) client, err := privval.NewDashCoreSignerClient(dashCoreRPCClient, btcjson.LLMQType_5_60) assert.NoError(t, err) @@ -126,7 +127,8 @@ func TestGetPubKey(t *testing.T) { srv.Start() }() - dashCoreRPCClient, err := dashcore.NewRPCClient(addr, "root", "root") + logger := log.TestingLogger() + dashCoreRPCClient, err := dashcore.NewRPCClient(addr, "root", "root", logger) assert.NoError(t, err) client, err := privval.NewDashCoreSignerClient(dashCoreRPCClient, btcjson.LLMQType_5_60) assert.NoError(t, err) diff --git a/test/e2e/runner/evidence.go b/test/e2e/runner/evidence.go index 5ec742ea52..02a7af0de3 100644 --- a/test/e2e/runner/evidence.go +++ b/test/e2e/runner/evidence.go @@ -203,8 +203,7 @@ func readPrivKey(keyFilePath string, quorumHash crypto.QuorumHash) (crypto.PrivK if err != nil { return nil, fmt.Errorf("error reading PrivValidator key from %v: %w", keyFilePath, err) } - - return pvKey.PrivateKeyForQuorumHash(quorumHash) + return pvKey.PrivateKeys[quorumHash.String()].PrivKey, nil } func makeRandomBlockID() types.BlockID { diff --git a/tools/tm-signer-harness/internal/test_harness_test.go b/tools/tm-signer-harness/internal/test_harness_test.go index b5d0f33124..349314fa07 100644 --- a/tools/tm-signer-harness/internal/test_harness_test.go +++ b/tools/tm-signer-harness/internal/test_harness_test.go @@ -1,6 +1,7 @@ package internal import ( + "context" "fmt" "io/ioutil" "os" @@ -104,7 +105,7 @@ func TestRemoteSignerTestHarnessSuccessfulRun(t *testing.T) { 
harnessTest( t, func(th *TestHarness) *privval.SignerServer { - privKey, err := th.fpv.Key.PrivateKeyForQuorumHash(th.quorumHash) + privKey, err := th.fpv.GetPrivateKey(context.TODO(), th.quorumHash) if err != nil { panic(err) } @@ -138,7 +139,7 @@ func TestRemoteSignerProposalSigningFailed(t *testing.T) { harnessTest( t, func(th *TestHarness) *privval.SignerServer { - privKey, err := th.fpv.Key.PrivateKeyForQuorumHash(th.quorumHash) + privKey, err := th.fpv.GetPrivateKey(context.TODO(), th.quorumHash) if err != nil { panic(err) } @@ -157,7 +158,7 @@ func TestRemoteSignerVoteSigningFailed(t *testing.T) { harnessTest( t, func(th *TestHarness) *privval.SignerServer { - privKey, err := th.fpv.Key.PrivateKeyForQuorumHash(th.quorumHash) + privKey, err := th.fpv.GetPrivateKey(context.TODO(), th.quorumHash) if err != nil { panic(err) } diff --git a/tools/tm-signer-harness/main.go b/tools/tm-signer-harness/main.go index 06b9d21f05..28d81d97f8 100644 --- a/tools/tm-signer-harness/main.go +++ b/tools/tm-signer-harness/main.go @@ -150,7 +150,7 @@ func extractKey(tmhome, outputPath string) { os.Exit(1) } quorumHash, _ := fpv.GetFirstQuorumHash(context.Background()) - privKey, _ := fpv.Key.PrivateKeyForQuorumHash(quorumHash) + privKey, _ := fpv.GetPrivateKey(context.TODO(), quorumHash) pkb := privKey.Bytes() if err := ioutil.WriteFile(internal.ExpandPath(outputPath), pkb[:32], 0600); err != nil { logger.Info("Failed to write private key", "output", outputPath, "err", err) diff --git a/types/block.go b/types/block.go index 5a435f7336..e502a64d14 100644 --- a/types/block.go +++ b/types/block.go @@ -845,12 +845,14 @@ func (commit *Commit) StringIndented(indent string) string { // MarshalZerologObject formats this object for logging purposes func (commit *Commit) MarshalZerologObject(e *zerolog.Event) { - e.Int64("height", commit.Height) - e.Int32("round", commit.Round) - e.Str("BlockID.Hash", commit.BlockID.Hash.String()) - e.Str("StateID", commit.StateID.String()) - 
e.Str("BlockSignature", hex.EncodeToString(commit.ThresholdBlockSignature)) - e.Str("StateSignature", hex.EncodeToString(commit.ThresholdStateSignature)) + if commit != nil { + e.Int64("height", commit.Height) + e.Int32("round", commit.Round) + e.Str("BlockID.Hash", commit.BlockID.Hash.String()) + e.Str("StateID", commit.StateID.String()) + e.Str("BlockSignature", hex.EncodeToString(commit.ThresholdBlockSignature)) + e.Str("StateSignature", hex.EncodeToString(commit.ThresholdStateSignature)) + } } // ToProto converts Commit to protobuf diff --git a/types/node_info.go b/types/node_info.go index c28722966e..7e1616e6ce 100644 --- a/types/node_info.go +++ b/types/node_info.go @@ -66,7 +66,7 @@ func (info NodeInfo) ID() NodeID { } func (info NodeInfo) GetProTxHash() crypto.ProTxHash { - return info.ProTxHash + return info.ProTxHash.Copy() } // Validate checks the self-reported DefaultNodeInfo is safe. @@ -203,7 +203,7 @@ func (info NodeInfo) Copy() NodeInfo { Channels: info.Channels, Moniker: info.Moniker, Other: info.Other, - ProTxHash: info.ProTxHash, + ProTxHash: info.ProTxHash.Copy(), } } @@ -222,7 +222,7 @@ func (info NodeInfo) ToProto() *tmp2p.NodeInfo { dni.Version = info.Version dni.Channels = info.Channels dni.Moniker = info.Moniker - dni.ProTxHash = info.ProTxHash + dni.ProTxHash = info.ProTxHash.Copy() dni.Other = tmp2p.NodeInfoOther{ TxIndex: info.Other.TxIndex, RPCAddress: info.Other.RPCAddress, diff --git a/types/priv_validator.go b/types/priv_validator.go index c87cbe192f..26b797f247 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -18,7 +18,7 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" ) -// PrivValidatorType defines the implemtation types. +// PrivValidatorType defines the implementation types. type PrivValidatorType uint8 const ( @@ -189,6 +189,9 @@ func (pv *MockPV) GetPubKey(ctx context.Context, quorumHash crypto.QuorumHash) ( // GetProTxHash implements PrivValidator. 
func (pv *MockPV) GetProTxHash(ctx context.Context) (crypto.ProTxHash, error) { + pv.mtx.RLock() + defer pv.mtx.RUnlock() + if len(pv.ProTxHash) != crypto.ProTxHashSize { return nil, fmt.Errorf("mock proTxHash is invalid size") } diff --git a/types/proposal.go b/types/proposal.go index 2a50b2b962..b35532970b 100644 --- a/types/proposal.go +++ b/types/proposal.go @@ -8,6 +8,8 @@ import ( "time" "github.com/dashevo/dashd-go/btcjson" + "github.com/rs/zerolog" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/bls12381" "github.com/tendermint/tendermint/internal/libs/protoio" @@ -112,6 +114,20 @@ func (p *Proposal) String() string { CanonicalTime(p.Timestamp)) } +// MarshalZerologObject implements zerolog.LogObjectMarshaler +func (p *Proposal) MarshalZerologObject(e *zerolog.Event) { + if p == nil { + return + } + + e.Int64("height", p.Height) + e.Int32("round", p.Round) + e.Str("block_id", p.BlockID.String()) + e.Int32("pol_round", p.POLRound) + e.Str("signature", tmbytes.HexBytes(p.Signature).String()) + e.Str("timestamp", CanonicalTime(p.Timestamp)) +} + // ProposalBlockSignBytes returns the proto-encoding of the canonicalized Proposal, // for signing. Panics if the marshaling fails. 
// diff --git a/types/proposal_test.go b/types/proposal_test.go index b6d67c49d5..a349769b1b 100644 --- a/types/proposal_test.go +++ b/types/proposal_test.go @@ -2,6 +2,7 @@ package types import ( "context" + "encoding/hex" "math" "testing" "time" @@ -14,8 +15,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/crypto/bls12381" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/internal/libs/protoio" + tmbytes "github.com/tendermint/tendermint/libs/bytes" tmrand "github.com/tendermint/tendermint/libs/rand" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" ) @@ -101,6 +104,52 @@ func TestProposalVerifySignature(t *testing.T) { require.True(t, valid) } +func decodeHex(s string) tmbytes.HexBytes { + decoded, err := hex.DecodeString(s) + if err != nil { + panic("cannot decode hex: " + err.Error()) + } + return decoded +} + +func TestProposalVerifySignatureHardcoded(t *testing.T) { + testCases := []struct { + quorumHash tmbytes.HexBytes + publicKey tmbytes.HexBytes + signID tmbytes.HexBytes + signature tmbytes.HexBytes + valid bool + }{ + { + quorumHash: decodeHex("3F8093C7DAD7FAD40651B186AE11B433101DB0318B4180E4E15EBD02EC035DB2"), + publicKey: decodeHex("972d08a96ce38e7f2de4d5186d84c7a8236854f396500fb1de963fc79464c0968150" + + "8c0b8a86177d219514e6c55bd223"), + signID: decodeHex("B907E1B1D6CF171B310DBC6120F3104379DE3CD3C5B5F1E5FAD9C3F7E4D5D3C0"), + signature: decodeHex("8C4CEC8DE50D990831009B62704AC6A9E76837DFD0D492D195F2417AEFFEEC299ADE" + + "5F8FCB96ADBCD539ED7A67BD3C0513F42FC3C6F82D5B8854D859539EBABF6D371DE98EBAC3DEBDFDD5226" + + "9478F85968B27EFF14B2873736271D7808DEF35"), + valid: true, + }, + { + publicKey: decodeHex("0eb0efcf0090d407a1c4339c0713a3be30852bc8274bc217d0ba59d12f5796af1be06" + + "f82b59cf3f3f598d542e2816148"), + signID: decodeHex("03A85F77715E4314B861F13405ACB2B0A9CE2A4174DD26E7BCD09F38513166D4"), + signature: 
decodeHex("941BA28D7AFE968FF0570835498623F3B2C89A695F23035468E06AB622F5CD72F53C9" + + "8951F9022C89C5A56E3BD023BF00BF04E1A6D0101179FE0B27E7594E4EC4CC56C1A8D6BBFD4E10E01752B2" + + "7BF2B6FCC24FF07854C6DFE6F27B662C4128B"), + valid: true, + }, + } + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + pubKey := bls12381.PubKey(tc.publicKey) + valid := pubKey.VerifySignatureDigest(tc.signID, tc.signature) + assert.Equal(t, tc.valid, valid, "signature validation result") + }) + } +} + func BenchmarkProposalWriteSignBytes(b *testing.B) { for i := 0; i < b.N; i++ { ProposalBlockSignBytes("test_chain_id", pbp) diff --git a/types/validator.go b/types/validator.go index 851dff40ae..9a5cfe1505 100644 --- a/types/validator.go +++ b/types/validator.go @@ -6,6 +6,8 @@ import ( "fmt" "strings" + "github.com/rs/zerolog" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/bls12381" ce "github.com/tendermint/tendermint/crypto/encoding" @@ -46,7 +48,7 @@ func NewValidatorDefaultVotingPower(pubKey crypto.PubKey, proTxHash []byte) *Val } // NewValidator returns a new validator with the given pubkey and voting power. 
-func NewValidator(pubKey crypto.PubKey, votingPower int64, proTxHash []byte, address string) *Validator { +func NewValidator(pubKey crypto.PubKey, votingPower int64, proTxHash ProTxHash, address string) *Validator { var ( addr ValidatorAddress err error @@ -163,6 +165,23 @@ func (v *Validator) ShortStringBasic() string { v.PubKey) } +// MarshalZerologObject implements zerolog.LogObjectMarshaler +func (v *Validator) MarshalZerologObject(e *zerolog.Event) { + e.Str("protxhash", v.ProTxHash.ShortString()) + e.Int64("voting_power", v.VotingPower) + e.Int64("proposer_priority", v.ProposerPriority) + e.Str("address", v.NodeAddress.String()) + + if v.PubKey != nil { + pubkey := v.PubKey.HexString() + if len(pubkey) > 8 { + pubkey = pubkey[:8] + } + e.Str("pub_key", pubkey) + e.Str("pub_key_type", v.PubKey.Type()) + } +} + // ValidatorListString returns a prettified validator list for logging purposes. func ValidatorListString(vals []*Validator) string { chunks := make([]string, len(vals)) diff --git a/types/validator_address.go b/types/validator_address.go index 0ac58f1910..4fab1317cf 100644 --- a/types/validator_address.go +++ b/types/validator_address.go @@ -31,6 +31,8 @@ var ( ErrNoHostname = errors.New("no hostname") // ErrNoPort is returned when no valid port is set for the validator address ErrNoPort = errors.New("no port") + + errEmptyAddress = errors.New("address is empty") ) // ParseValidatorAddress parses provided address, which should be in `proto://nodeID@host:port` form. @@ -51,6 +53,9 @@ func stringHasScheme(str string) bool { // ParseValidatorAddressWithoutValidation parses a node address URL into a ValidatorAddress, normalizing it. // It does NOT validate parsed address func parseValidatorAddressString(urlString string) (ValidatorAddress, error) { + if urlString == "" { + return ValidatorAddress{}, nil + } // url.Parse requires a scheme, so if it fails to parse a scheme-less URL // we try to apply a default scheme. 
url, err := url.Parse(urlString) @@ -91,7 +96,9 @@ func parseValidatorAddressString(urlString string) (ValidatorAddress, error) { // Validate ensures the validator address is correct. // It ignores missing node IDs. func (va ValidatorAddress) Validate() error { - + if va.Zero() { + return errEmptyAddress + } if va.Hostname == "" { return ErrNoHostname } diff --git a/types/validator_address_test.go b/types/validator_address_test.go index c6384a9baf..ea794715dc 100644 --- a/types/validator_address_test.go +++ b/types/validator_address_test.go @@ -5,6 +5,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/rand" ) @@ -70,13 +72,12 @@ func TestValidatorAddress_HostPortProto(t *testing.T) { nodeID := randNodeID() tests := []struct { - uri string - wantHost string - wantPort uint16 - wantProto string - wantNodeID string - wantValid bool - wantParseErr bool + uri string + wantHost string + wantPort uint16 + wantProto string + wantNodeID string + wantError bool }{ { uri: "tcp://" + nodeID + "@fqdn.address.com:1234", @@ -84,45 +85,40 @@ func TestValidatorAddress_HostPortProto(t *testing.T) { wantPort: 1234, wantProto: "tcp", wantNodeID: nodeID, - wantValid: true, }, { - uri: "tcp://test@fqdn.address.com:1234", - wantHost: "fqdn.address.com", - wantPort: 1234, - wantProto: "tcp", - wantValid: false, - wantParseErr: true, + uri: "tcp://test@fqdn.address.com:1234", + wantHost: "fqdn.address.com", + wantPort: 1234, + wantProto: "tcp", + wantError: true, }, { uri: "tcp://127.0.0.1:22", wantHost: "127.0.0.1", wantPort: 22, wantProto: "tcp", - wantValid: true, }, { - uri: "", - wantValid: false, - wantParseErr: true, + uri: "", + wantError: true, }, { - uri: "tcp://127.0.0.1", - wantHost: "127.0.0.1", - wantPort: 0, - wantProto: "tcp", - wantValid: false, - wantParseErr: true, + uri: "tcp://127.0.0.1", + wantHost: "127.0.0.1", + wantPort: 0, + wantProto: "tcp", + wantError: true, }, } for _, 
tt := range tests { t.Run(tt.uri, func(t *testing.T) { va, err := ParseValidatorAddress(tt.uri) - if tt.wantParseErr { + if tt.wantError { assert.Error(t, err) } else { - assert.NoError(t, err) + require.NoError(t, err) assert.EqualValues(t, tt.wantHost, va.Hostname) assert.EqualValues(t, tt.wantPort, va.Port) @@ -131,10 +127,10 @@ func TestValidatorAddress_HostPortProto(t *testing.T) { assert.EqualValues(t, tt.wantNodeID, nodeID) } err = va.Validate() - if tt.wantValid { - assert.NoError(t, err) - } else { + if tt.wantError { assert.Error(t, err) + } else { + require.NoError(t, err) } } }) diff --git a/types/validator_set.go b/types/validator_set.go index cf23c53183..dfff92cdcb 100644 --- a/types/validator_set.go +++ b/types/validator_set.go @@ -10,6 +10,8 @@ import ( "strings" "github.com/dashevo/dashd-go/btcjson" + "github.com/rs/zerolog" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/bls12381" @@ -1090,6 +1092,17 @@ func (vals *ValidatorSet) StringIndentedBasic(indent string) string { } +// MarshalZerologObject implements zerolog.LogObjectMarshaler +func (vals *ValidatorSet) MarshalZerologObject(e *zerolog.Event) { + e.Str("proposer", vals.GetProposer().ProTxHash.ShortString()) + e.Str("quorum_hash", vals.QuorumHash.ShortString()) + validators := zerolog.Arr() + for _, item := range vals.Validators { + validators.Object(item) + } + e.Array("validators", validators) +} + //------------------------------------- // ValidatorsByVotingPower implements sort.Interface for []*Validator based on diff --git a/types/validator_set_test.go b/types/validator_set_test.go index 8e53c11328..a4621a9f7f 100644 --- a/types/validator_set_test.go +++ b/types/validator_set_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/bls12381" tmmath 
"github.com/tendermint/tendermint/libs/math" diff --git a/types/vote.go b/types/vote.go index 11cb484edb..2c6f7ac5d9 100644 --- a/types/vote.go +++ b/types/vote.go @@ -77,8 +77,8 @@ type Vote struct { BlockID BlockID `json:"block_id"` // zero if vote is nil. ValidatorProTxHash ProTxHash `json:"validator_pro_tx_hash"` ValidatorIndex int32 `json:"validator_index"` - BlockSignature []byte `json:"block_signature"` - StateSignature []byte `json:"state_signature"` + BlockSignature tmbytes.HexBytes `json:"block_signature"` + StateSignature tmbytes.HexBytes `json:"state_signature"` } // VoteBlockSignBytes returns the proto-encoding of the canonicalized Vote, for @@ -216,7 +216,7 @@ func (vote *Vote) Verify( if !pubKey.VerifySignatureDigest(signID, vote.BlockSignature) { return nil, nil, fmt.Errorf( "%s proTxHash %s pubKey %v vote %v sign bytes %s block signature %s", ErrVoteInvalidBlockSignature.Error(), - proTxHash, pubKey, vote, hex.EncodeToString(voteBlockSignBytes), hex.EncodeToString(vote.BlockSignature)) + proTxHash.ShortString(), pubKey, vote, hex.EncodeToString(voteBlockSignBytes), hex.EncodeToString(vote.BlockSignature)) } stateSignID := []byte(nil) @@ -323,6 +323,13 @@ func (vote *Vote) MarshalZerologObject(e *zerolog.Event) { e.Str("vote", vote.String()) e.Int64("height", vote.Height) e.Int32("round", vote.Round) + e.Str("type", vote.Type.String()) + e.Str("block_key", vote.BlockID.String()) + e.Str("block_signature", vote.BlockSignature.ShortString()) + e.Str("state_signature", vote.StateSignature.ShortString()) + e.Str("val_proTxHash", vote.ValidatorProTxHash.ShortString()) + e.Int32("val_index", vote.ValidatorIndex) + e.Bool("nil", vote.BlockID.IsZero()) } // FromProto converts a proto generetad type to a handwritten type diff --git a/types/vote_set_test.go b/types/vote_set_test.go index 608cc370a7..e141026301 100644 --- a/types/vote_set_test.go +++ b/types/vote_set_test.go @@ -677,7 +677,7 @@ func randVoteSetWithLLMQType( } // Convenience: Return new vote 
with different validator address/index -func withValidator(vote *Vote, proTxHash []byte, idx int32) *Vote { +func withValidator(vote *Vote, proTxHash ProTxHash, idx int32) *Vote { vote = vote.Copy() vote.ValidatorProTxHash = proTxHash vote.ValidatorIndex = idx diff --git a/types/vote_test.go b/types/vote_test.go index 30b3df5264..b45b017f73 100644 --- a/types/vote_test.go +++ b/types/vote_test.go @@ -10,6 +10,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/crypto/bls12381" "github.com/tendermint/tendermint/crypto"