Skip to content

Commit

Permalink
fix: backport e2e tests (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
shotonoff authored Mar 23, 2022
1 parent 39f9571 commit 8ccea22
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 18 deletions.
1 change: 0 additions & 1 deletion dash/quorum/validator_conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ func (vc *ValidatorConnExecutor) resolveNodeID(va *types.ValidatorAddress) error
)
allErrors = multierror.Append(allErrors, fmt.Errorf(method+" error: %w", err))
}

return allErrors
}

Expand Down
13 changes: 12 additions & 1 deletion internal/consensus/peer_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
// we'll update the BitArray capacity later
ps.PRS.Prevotes = nil
ps.PRS.Precommits = nil
ps.PRS.HasCommit = false
}

if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
Expand All @@ -479,7 +480,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
}

if psHeight != msg.Height {
// shift Precommits to LastCommit
// Shift Precommits to LastPrecommits.
if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
ps.PRS.LastCommitRound = msg.LastCommitRound
ps.PRS.LastPrecommits = ps.PRS.Precommits.Copy()
Expand Down Expand Up @@ -540,6 +541,16 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}

// ApplyHasCommitMessage updates the peer state for the new commit.
func (ps *PeerState) ApplyHasCommitMessage(msg *HasCommitMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != msg.Height {
return
}
ps.setHasCommit(msg.Height, msg.Round)
}

// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
// it claims to have for the corresponding BlockID.
// `ourVotes` is a BitArray of votes we have for msg.BlockID
Expand Down
69 changes: 57 additions & 12 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func ReactorMetrics(metrics *Metrics) ReactorOption {
func (r *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) {
r.Logger.Info("switching to consensus")

// we have no votes, so reconstruct LastCommit from SeenCommit
// We have no votes, so reconstruct LastPrecommits from SeenCommit.
if state.LastBlockHeight > 0 {
r.state.reconstructLastCommit(state)
}
Expand Down Expand Up @@ -386,6 +386,17 @@ func (r *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
}
}

// Broadcasts HasCommitMessage to peers that care.
func (r *Reactor) broadcastHasCommitMessage(commit *types.Commit) {
r.stateCh.Out <- p2p.Envelope{
Broadcast: true,
Message: &tmcons.HasCommit{
Height: commit.Height,
Round: commit.Round,
},
}
}

// subscribeToBroadcastEvents subscribes for new round steps and votes using the
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
Expand Down Expand Up @@ -426,6 +437,13 @@ func (r *Reactor) subscribeToBroadcastEvents() {
if err != nil {
r.Logger.Error("failed to add listener for events", "err", err)
}

if err := r.state.evsw.AddListenerForEvent(listenerIDConsensus, types.EventCommitValue,
func(data tmevents.EventData) {
r.broadcastHasCommitMessage(data.(*types.Commit))
}); err != nil {
r.Logger.Error("Error adding listener for events", "err", err)
}
}

func (r *Reactor) unsubscribeFromBroadcastEvents() {
Expand Down Expand Up @@ -538,8 +556,16 @@ OUTER_LOOP:
rs := r.state.GetRoundState()
prs := ps.GetRoundState()

isValidator := rs.Validators.HasProTxHash(ps.ProTxHash)

// Send proposal Block parts?
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if (isValidator && rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader)) ||
(prs.HasCommit && rs.ProposalBlockParts != nil) {
if !isValidator && prs.HasCommit && prs.ProposalBlockParts == nil {
// We can assume if they have the commit then they should have the same part set header
ps.PRS.ProposalBlockPartSetHeader = rs.ProposalBlockParts.Header()
ps.PRS.ProposalBlockParts = bits.NewBitArray(int(rs.ProposalBlockParts.Header().Total))
}
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
partProto, err := part.ToProto()
Expand Down Expand Up @@ -604,7 +630,7 @@ OUTER_LOOP:
// Now consider sending other things, like the Proposal itself.

// Send Proposal && ProposalPOL BitArray?
if rs.Proposal != nil && !prs.Proposal {
if rs.Proposal != nil && !prs.Proposal && isValidator {
// Proposal: share the proposal metadata with peer.
{
propProto := rs.Proposal.ToProto()
Expand Down Expand Up @@ -654,11 +680,13 @@ OUTER_LOOP:
// there is a vote to send and false otherwise.
func (r *Reactor) pickSendVote(ps *PeerState, votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
psJSON, _ := ps.ToJSON()
ps.logger.Debug(
"Sending vote message",
"ps", ps,
"ps", psJSON,
"peer_id", ps.peerID,
"vote", vote,
"peer_proTxHash", ps.ProTxHash.ShortString(),
"val_proTxHash", vote.ValidatorProTxHash.ShortString(),
"height", vote.Height,
"round", vote.Round,
Expand Down Expand Up @@ -695,10 +723,10 @@ func (r *Reactor) sendCommit(ps *PeerState, commit *types.Commit) bool {
func (r *Reactor) gossipVotesForHeight(rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) bool {
logger := r.Logger.With("height", prs.Height).With("peer", ps.peerID)

// if there are lastCommits to send...
// If there are lastPrecommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
if r.pickSendVote(ps, rs.LastPrecommits) {
logger.Debug("picked rs.LastCommit to send")
logger.Debug("picked previous precommit vote to send")
return true
}
}
Expand Down Expand Up @@ -750,7 +778,7 @@ func (r *Reactor) gossipVotesForHeight(rs *cstypes.RoundState, prs *cstypes.Peer
return false
}

func (r *Reactor) gossipVotesRoutine(ps *PeerState) {
func (r *Reactor) gossipVotesAndCommitRoutine(ps *PeerState) {
logger := r.Logger.With("peer", ps.peerID)

defer ps.broadcastWG.Done()
Expand All @@ -776,6 +804,9 @@ OUTER_LOOP:
rs := r.state.GetRoundState()
prs := ps.GetRoundState()

isValidator := rs.Validators.HasProTxHash(ps.ProTxHash)
wasValidator := rs.LastValidators.HasProTxHash(ps.ProTxHash)

switch logThrottle {
case 1: // first sleep
logThrottle = 2
Expand All @@ -785,14 +816,25 @@ OUTER_LOOP:

// if height matches, then send LastCommit, Prevotes, and Precommits
if rs.Height == prs.Height {
if r.gossipVotesForHeight(rs, prs, ps) {
continue OUTER_LOOP
if !wasValidator {
// If there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight && prs.Height+1 == rs.Height && !prs.HasCommit {
if r.sendCommit(ps, rs.LastCommit) {
logger.Info("Sending LastCommit to non-validator node")
continue OUTER_LOOP
}
}
}
if isValidator {
if r.gossipVotesForHeight(rs, prs, ps) {
continue OUTER_LOOP
}
}
}

// special catchup logic -- if peer is lagging by height 1, send LastCommit
if prs.Height != 0 && rs.Height == prs.Height+1 {
if r.sendCommit(ps, rs.LastCommit) {
if prs.Height != 0 && rs.Height == prs.Height+1 && wasValidator {
if r.pickSendVote(ps, rs.LastPrecommits) {
logger.Debug("picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
}
Expand Down Expand Up @@ -988,7 +1030,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {

// start goroutines for this peer
go r.gossipDataRoutine(ps)
go r.gossipVotesRoutine(ps)
go r.gossipVotesAndCommitRoutine(ps)
go r.queryMaj23Routine(ps)

// Send our state to the peer. If we're block-syncing, broadcast a
Expand Down Expand Up @@ -1045,6 +1087,9 @@ func (r *Reactor) handleStateMessage(envelope p2p.Envelope, msgI Message) error
case *tmcons.NewValidBlock:
ps.ApplyNewValidBlockMessage(msgI.(*NewValidBlockMessage))

case *tmcons.HasCommit:
ps.ApplyHasCommitMessage(msgI.(*HasCommitMessage))

case *tmcons.HasVote:
ps.ApplyHasVoteMessage(msgI.(*HasVoteMessage))

Expand Down
1 change: 0 additions & 1 deletion internal/consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu
for i, state := range states {
privProTxHashes[i] = state.privValidatorProTxHash
}

rts := &reactorTestSuite{
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes, ProTxHashes: privProTxHashes}),
states: make(map[types.NodeID]*State),
Expand Down
5 changes: 3 additions & 2 deletions internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2499,7 +2499,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
switch vote.Type {
case tmproto.PrevoteType:
prevotes := cs.Votes.Prevotes(vote.Round)
cs.Logger.Debug("added vote to prevote", "vote", vote, "prevotes", prevotes.StringShort())
cs.Logger.Debug("added vote to prevote", "vote", vote, "prevotes", prevotes.LogString())

// If +2/3 prevotes for a block or nil for *any* round:
if blockID, ok := prevotes.TwoThirdsMajority(); ok {
Expand Down Expand Up @@ -2584,6 +2584,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
"height", vote.Height,
"round", vote.Round,
"validator", vote.ValidatorProTxHash.String(),
"val_index", vote.ValidatorIndex,
"data", precommits.LogString())

blockID, ok := precommits.TwoThirdsMajority()
Expand Down Expand Up @@ -2627,7 +2628,6 @@ func (cs *State) signVote(
if cs.privValidatorProTxHash == nil {
return nil, errProTxHashIsNotSet
}

proTxHash := cs.privValidatorProTxHash
valIdx, _ := cs.Validators.GetByProTxHash(proTxHash)

Expand Down Expand Up @@ -2684,6 +2684,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header

// If the node not in the validator set, do nothing.
if !cs.Validators.HasProTxHash(cs.privValidatorProTxHash) {
cs.Logger.Debug("do nothing, node is not a part of validator set")
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions proto/tendermint/consensus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func (m *Message) Wrap(pb proto.Message) error {
case *HasVote:
m.Sum = &Message_HasVote{HasVote: msg}

case *HasCommit:
m.Sum = &Message_HasCommit{HasCommit: msg}

case *VoteSetMaj23:
m.Sum = &Message_VoteSetMaj23{VoteSetMaj23: msg}

Expand Down Expand Up @@ -77,6 +80,9 @@ func (m *Message) Unwrap() (proto.Message, error) {
case *Message_VoteSetBits:
return m.GetVoteSetBits(), nil

case *Message_HasCommit:
return m.GetHasCommit(), nil

case *Message_Commit:
return m.GetCommit(), nil

Expand Down
1 change: 0 additions & 1 deletion test/e2e/generator/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
InitialState: opt["initialState"].(map[string]string),
Validators: map[string]int64{},
ValidatorUpdates: map[string]map[string]int64{},
ChainLockUpdates: map[string]int64{},
Nodes: map[string]*e2e.ManifestNode{},
KeyType: keyType.Choose(r).(string),
Evidence: evidence.Choose(r).(int),
Expand Down
2 changes: 2 additions & 0 deletions types/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (info NodeInfo) Copy() NodeInfo {
Channels: info.Channels,
Moniker: info.Moniker,
Other: info.Other,
ProTxHash: info.ProTxHash,
}
}

Expand Down Expand Up @@ -250,6 +251,7 @@ func NodeInfoFromProto(pb *tmp2p.NodeInfo) (NodeInfo, error) {
TxIndex: pb.Other.TxIndex,
RPCAddress: pb.Other.RPCAddress,
},
ProTxHash: pb.ProTxHash,
}
return dni, nil
}

0 comments on commit 8ccea22

Please sign in to comment.