diff --git a/dash/quorum/validator_conn_executor.go b/dash/quorum/validator_conn_executor.go index 082848361a..e4bda0a6b8 100644 --- a/dash/quorum/validator_conn_executor.go +++ b/dash/quorum/validator_conn_executor.go @@ -264,7 +264,6 @@ func (vc *ValidatorConnExecutor) resolveNodeID(va *types.ValidatorAddress) error ) allErrors = multierror.Append(allErrors, fmt.Errorf(method+" error: %w", err)) } - return allErrors } diff --git a/internal/consensus/peer_state.go b/internal/consensus/peer_state.go index d1d8f48d50..496d027c4b 100644 --- a/internal/consensus/peer_state.go +++ b/internal/consensus/peer_state.go @@ -468,6 +468,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { // we'll update the BitArray capacity later ps.PRS.Prevotes = nil ps.PRS.Precommits = nil + ps.PRS.HasCommit = false } if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound { @@ -479,7 +480,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { } if psHeight != msg.Height { - // shift Precommits to LastCommit + // Shift Precommits to LastPrecommits. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound { ps.PRS.LastCommitRound = msg.LastCommitRound ps.PRS.LastPrecommits = ps.PRS.Precommits.Copy() @@ -540,6 +541,16 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) } +// ApplyHasCommitMessage updates the peer state for the new commit. +func (ps *PeerState) ApplyHasCommitMessage(msg *HasCommitMessage) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + if ps.PRS.Height != msg.Height { + return + } + ps.setHasCommit(msg.Height, msg.Round) +} + // ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes // it claims to have for the corresponding BlockID. // `ourVotes` is a BitArray of votes we have for msg.BlockID diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 67c284b225..c39e218857 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -282,7 +282,7 @@ func ReactorMetrics(metrics *Metrics) ReactorOption { func (r *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) { r.Logger.Info("switching to consensus") - // we have no votes, so reconstruct LastCommit from SeenCommit + // We have no votes, so reconstruct LastPrecommits from SeenCommit. if state.LastBlockHeight > 0 { r.state.reconstructLastCommit(state) } @@ -386,6 +386,17 @@ func (r *Reactor) broadcastHasVoteMessage(vote *types.Vote) { } } +// Broadcasts HasCommitMessage to peers that care. +func (r *Reactor) broadcastHasCommitMessage(commit *types.Commit) { + r.stateCh.Out <- p2p.Envelope{ + Broadcast: true, + Message: &tmcons.HasCommit{ + Height: commit.Height, + Round: commit.Round, + }, + } +} + // subscribeToBroadcastEvents subscribes for new round steps and votes using the // internal pubsub defined in the consensus state to broadcast them to peers // upon receiving. @@ -426,6 +437,13 @@ func (r *Reactor) subscribeToBroadcastEvents() { if err != nil { r.Logger.Error("failed to add listener for events", "err", err) } + + if err := r.state.evsw.AddListenerForEvent(listenerIDConsensus, types.EventCommitValue, + func(data tmevents.EventData) { + r.broadcastHasCommitMessage(data.(*types.Commit)) + }); err != nil { + r.Logger.Error("Error adding listener for events", "err", err) + } } func (r *Reactor) unsubscribeFromBroadcastEvents() { @@ -538,8 +556,16 @@ OUTER_LOOP: rs := r.state.GetRoundState() prs := ps.GetRoundState() + isValidator := rs.Validators.HasProTxHash(ps.ProTxHash) + // Send proposal Block parts? - if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) { + if (isValidator && rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader)) || + (prs.HasCommit && rs.ProposalBlockParts != nil) { + if !isValidator && prs.HasCommit && prs.ProposalBlockParts == nil { + // We can assume if they have the commit then they should have the same part set header + ps.PRS.ProposalBlockPartSetHeader = rs.ProposalBlockParts.Header() + ps.PRS.ProposalBlockParts = bits.NewBitArray(int(rs.ProposalBlockParts.Header().Total)) + } if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { part := rs.ProposalBlockParts.GetPart(index) partProto, err := part.ToProto() @@ -604,7 +630,7 @@ OUTER_LOOP: // Now consider sending other things, like the Proposal itself. // Send Proposal && ProposalPOL BitArray? - if rs.Proposal != nil && !prs.Proposal { + if rs.Proposal != nil && !prs.Proposal && isValidator { // Proposal: share the proposal metadata with peer. { propProto := rs.Proposal.ToProto() @@ -654,11 +680,13 @@ OUTER_LOOP: // there is a vote to send and false otherwise. func (r *Reactor) pickSendVote(ps *PeerState, votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { + psJSON, _ := ps.ToJSON() ps.logger.Debug( "Sending vote message", - "ps", ps, + "ps", psJSON, "peer_id", ps.peerID, "vote", vote, + "peer_proTxHash", ps.ProTxHash.ShortString(), "val_proTxHash", vote.ValidatorProTxHash.ShortString(), "height", vote.Height, "round", vote.Round, @@ -695,10 +723,10 @@ func (r *Reactor) sendCommit(ps *PeerState, commit *types.Commit) bool { func (r *Reactor) gossipVotesForHeight(rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) bool { logger := r.Logger.With("height", prs.Height).With("peer", ps.peerID) - // if there are lastCommits to send... + // If there are lastPrecommits to send... if prs.Step == cstypes.RoundStepNewHeight { if r.pickSendVote(ps, rs.LastPrecommits) { - logger.Debug("picked rs.LastCommit to send") + logger.Debug("picked previous precommit vote to send") return true } } @@ -750,7 +778,7 @@ func (r *Reactor) gossipVotesForHeight(rs *cstypes.RoundState, prs *cstypes.Peer return false } -func (r *Reactor) gossipVotesRoutine(ps *PeerState) { +func (r *Reactor) gossipVotesAndCommitRoutine(ps *PeerState) { logger := r.Logger.With("peer", ps.peerID) defer ps.broadcastWG.Done() @@ -776,6 +804,9 @@ OUTER_LOOP: rs := r.state.GetRoundState() prs := ps.GetRoundState() + isValidator := rs.Validators.HasProTxHash(ps.ProTxHash) + wasValidator := rs.LastValidators.HasProTxHash(ps.ProTxHash) + switch logThrottle { case 1: // first sleep logThrottle = 2 @@ -785,14 +816,25 @@ OUTER_LOOP: // if height matches, then send LastCommit, Prevotes, and Precommits if rs.Height == prs.Height { - if r.gossipVotesForHeight(rs, prs, ps) { - continue OUTER_LOOP + if !wasValidator { + // If there are lastCommits to send... + if prs.Step == cstypes.RoundStepNewHeight && prs.Height+1 == rs.Height && !prs.HasCommit { + if r.sendCommit(ps, rs.LastCommit) { + logger.Info("Sending LastCommit to non-validator node") + continue OUTER_LOOP + } + } + } + if isValidator { + if r.gossipVotesForHeight(rs, prs, ps) { + continue OUTER_LOOP + } } } // special catchup logic -- if peer is lagging by height 1, send LastCommit - if prs.Height != 0 && rs.Height == prs.Height+1 { - if r.sendCommit(ps, rs.LastCommit) { + if prs.Height != 0 && rs.Height == prs.Height+1 && wasValidator { + if r.pickSendVote(ps, rs.LastPrecommits) { logger.Debug("picked rs.LastCommit to send", "height", prs.Height) continue OUTER_LOOP } @@ -988,7 +1030,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { // start goroutines for this peer go r.gossipDataRoutine(ps) - go r.gossipVotesRoutine(ps) + go r.gossipVotesAndCommitRoutine(ps) go r.queryMaj23Routine(ps) // Send our state to the peer. If we're block-syncing, broadcast a @@ -1045,6 +1087,9 @@ func (r *Reactor) handleStateMessage(envelope p2p.Envelope, msgI Message) error case *tmcons.NewValidBlock: ps.ApplyNewValidBlockMessage(msgI.(*NewValidBlockMessage)) + case *tmcons.HasCommit: + ps.ApplyHasCommitMessage(msgI.(*HasCommitMessage)) + case *tmcons.HasVote: ps.ApplyHasVoteMessage(msgI.(*HasVoteMessage)) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 9262201493..0f77572369 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -67,7 +67,6 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu for i, state := range states { privProTxHashes[i] = state.privValidatorProTxHash } - rts := &reactorTestSuite{ network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes, ProTxHashes: privProTxHashes}), states: make(map[types.NodeID]*State), diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 0b04f2c6f9..f0ab538b3c 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2499,7 +2499,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err switch vote.Type { case tmproto.PrevoteType: prevotes := cs.Votes.Prevotes(vote.Round) - cs.Logger.Debug("added vote to prevote", "vote", vote, "prevotes", prevotes.StringShort()) + cs.Logger.Debug("added vote to prevote", "vote", vote, "prevotes", prevotes.LogString()) // If +2/3 prevotes for a block or nil for *any* round: if blockID, ok := prevotes.TwoThirdsMajority(); ok { @@ -2584,6 +2584,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err "height", vote.Height, "round", vote.Round, "validator", vote.ValidatorProTxHash.String(), + "val_index", vote.ValidatorIndex, "data", precommits.LogString()) blockID, ok := precommits.TwoThirdsMajority() @@ -2627,7 +2628,6 @@ func (cs *State) signVote( if cs.privValidatorProTxHash == nil { return nil, errProTxHashIsNotSet } - proTxHash := cs.privValidatorProTxHash valIdx, _ := cs.Validators.GetByProTxHash(proTxHash) @@ -2684,6 +2684,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header // If the node not in the validator set, do nothing. if !cs.Validators.HasProTxHash(cs.privValidatorProTxHash) { + cs.Logger.Debug("do nothing, node is not a part of validator set") return nil } diff --git a/proto/tendermint/consensus/message.go b/proto/tendermint/consensus/message.go index c2b65523e0..7c3f4ef992 100644 --- a/proto/tendermint/consensus/message.go +++ b/proto/tendermint/consensus/message.go @@ -30,6 +30,9 @@ func (m *Message) Wrap(pb proto.Message) error { case *HasVote: m.Sum = &Message_HasVote{HasVote: msg} + case *HasCommit: + m.Sum = &Message_HasCommit{HasCommit: msg} + case *VoteSetMaj23: m.Sum = &Message_VoteSetMaj23{VoteSetMaj23: msg} @@ -77,6 +80,9 @@ func (m *Message) Unwrap() (proto.Message, error) { case *Message_VoteSetBits: return m.GetVoteSetBits(), nil + case *Message_HasCommit: + return m.GetHasCommit(), nil + case *Message_Commit: return m.GetCommit(), nil diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go index a4380b6d96..b1ebff1f74 100644 --- a/test/e2e/generator/generate.go +++ b/test/e2e/generator/generate.go @@ -145,7 +145,6 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er InitialState: opt["initialState"].(map[string]string), Validators: map[string]int64{}, ValidatorUpdates: map[string]map[string]int64{}, - ChainLockUpdates: map[string]int64{}, Nodes: map[string]*e2e.ManifestNode{}, KeyType: keyType.Choose(r).(string), Evidence: evidence.Choose(r).(int), diff --git a/types/node_info.go b/types/node_info.go index 4af4115e7e..c28722966e 100644 --- a/types/node_info.go +++ b/types/node_info.go @@ -203,6 +203,7 @@ func (info NodeInfo) Copy() NodeInfo { Channels: info.Channels, Moniker: info.Moniker, Other: info.Other, + ProTxHash: info.ProTxHash, } } @@ -250,6 +251,7 @@ func NodeInfoFromProto(pb *tmp2p.NodeInfo) (NodeInfo, error) { TxIndex: pb.Other.TxIndex, RPCAddress: pb.Other.RPCAddress, }, + ProTxHash: pb.ProTxHash, } return dni, nil }