diff --git a/p2p/stream/types/safe_map.go b/common/types/safe_map.go similarity index 99% rename from p2p/stream/types/safe_map.go rename to common/types/safe_map.go index e4a5e559c5..001438da76 100644 --- a/p2p/stream/types/safe_map.go +++ b/common/types/safe_map.go @@ -1,4 +1,4 @@ -package sttypes +package types import ( "sync" diff --git a/consensus/leader.go b/consensus/leader.go index d97bb17e1b..27d7d3e563 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -93,19 +93,6 @@ func (consensus *Consensus) announce(block *types.Block) { consensus.switchPhase("Announce", FBFTPrepare) } -func (consensus *Consensus) checkFirstReceivedSignature(signerCount int64, phase quorum.Phase) (bool, bool) { - hasMultiBlsKeys := len(consensus.priKey) > 0 - if hasMultiBlsKeys { - var myPubkeys []bls.SerializedPublicKey - for _, key := range consensus.priKey { - myPubkeys = append(myPubkeys, key.Pub.Bytes) - } - mySignsCount := consensus.decider.GetBallotsCount(phase, myPubkeys) - return true, signerCount == mySignsCount - } - return false, false -} - // this method is called for each validator sent their vote message func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { // TODO(audit): make FBFT lookup using map instead of looping through all items. @@ -135,20 +122,6 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { } signerCount := consensus.decider.SignersCount(quorum.Prepare) - - // check if it is first received signatures - // it may multi bls key validators can achieve quorum on first signature - hasMultiBlsKeys, isFirstReceivedSignature := consensus.checkFirstReceivedSignature(signerCount, quorum.Prepare) - - quorumPreExisting := consensus.decider.IsQuorumAchieved(quorum.Prepare) - //// Read - End - - if quorumPreExisting { - // already have enough signatures - consensus.getLogger().Debug(). - Interface("validatorPubKeys", recvMsg.SenderPubkeys). - Msg("[OnPrepare] Received Additional Prepare Message") - } //// Read - End consensus.UpdateLeaderMetrics(float64(signerCount), float64(consensus.getBlockNum())) @@ -203,15 +176,14 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { //// Write - End //// Read - Start - quorumFromInitialSignature := hasMultiBlsKeys && isFirstReceivedSignature && quorumPreExisting - quorumPostNewSignatures := consensus.decider.IsQuorumAchieved(quorum.Prepare) - quorumFromNewSignatures := !quorumPreExisting && quorumPostNewSignatures - - if quorumFromInitialSignature || quorumFromNewSignatures { + quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Prepare) + lastQuorumAchievedBlock := consensus.current.GetLastQuorumAchievedBlock(quorum.Prepare) + if quorumIsMet && recvMsg.BlockNum > lastQuorumAchievedBlock { // NOTE Let it handle its own logs if err := consensus.didReachPrepareQuorum(); err != nil { return } + consensus.current.SetLastQuorumAchievedBlock(quorum.Prepare, recvMsg.BlockNum) consensus.switchPhase("onPrepare", FBFTCommit) } //// Read - End @@ -236,15 +208,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { commitBitmap := consensus.commitBitmap - // has to be called before verifying signature - quorumWasMet := consensus.decider.IsQuorumAchieved(quorum.Commit) - signerCount := consensus.decider.SignersCount(quorum.Commit) - - // check if it is first received commit - // it may multi bls key validators can achieve quorum on first commit - hasMultiBlsKeys, isFirstReceivedSignature := consensus.checkFirstReceivedSignature(signerCount, quorum.Commit) - //// Read - End // Verify the signature on commitPayload is correct @@ -318,13 +282,11 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Commit) //// Read - End - - quorumAchievedByFirstCommit := hasMultiBlsKeys && isFirstReceivedSignature && quorumWasMet - quorumAchievedByThisCommit := !quorumWasMet && quorumIsMet - - if quorumAchievedByFirstCommit || quorumAchievedByThisCommit { + lastQuorumAchievedBlock := consensus.current.GetLastQuorumAchievedBlock(quorum.Commit) + if quorumIsMet && blockObj.NumberU64() > lastQuorumAchievedBlock { logger.Info().Msg("[OnCommit] 2/3 Enough commits received") consensus.fBFTLog.MarkBlockVerified(blockObj) + consensus.current.SetLastQuorumAchievedBlock(quorum.Commit, blockObj.NumberU64()) if !blockObj.IsLastBlockInEpoch() { // only do early commit if it's not epoch block to avoid problems diff --git a/consensus/view_change.go b/consensus/view_change.go index 6fb6def705..36ef534e2b 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" msg_pb "github.com/harmony-one/harmony/api/proto/message" + types "github.com/harmony-one/harmony/common/types" "github.com/harmony-one/harmony/consensus/quorum" "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/chain" @@ -32,11 +33,14 @@ type State struct { // view changing id is used during view change mode // it is the next view id viewChangingID uint64 + + quorumAchievedBlock *types.SafeMap[quorum.Phase, uint64] } func NewState(mode Mode) State { return State{ - mode: uint32(mode), + mode: uint32(mode), + quorumAchievedBlock: types.NewSafeMap[quorum.Phase, uint64](), } } @@ -73,6 +77,23 @@ func (pm *State) SetViewChangingID(id uint64) { atomic.StoreUint64(&pm.viewChangingID, id) } +// GetLastQuorumAchievedBlock retrieves the block number of the last block +// that achieved quorum for the specified phase. +// If no quorum has been achieved for the given phase, it returns 0. +func (pm *State) GetLastQuorumAchievedBlock(p quorum.Phase) uint64 { + lqab, exists := pm.quorumAchievedBlock.Get(p) + if !exists { + return 0 + } + return lqab +} + +// SetLastQuorumAchievedBlock updates the block number of the last block +// that achieved quorum for the specified phase. +func (pm *State) SetLastQuorumAchievedBlock(p quorum.Phase, blockNum uint64) { + pm.quorumAchievedBlock.Set(p, blockNum) +} + // GetViewChangeDuraion return the duration of the current view change // It increase in the power of difference betweeen view changing ID and current view ID func (pm *State) GetViewChangeDuraion() time.Duration { diff --git a/p2p/stream/common/requestmanager/interface_test.go b/p2p/stream/common/requestmanager/interface_test.go index 6fc087103e..1449198c0a 100644 --- a/p2p/stream/common/requestmanager/interface_test.go +++ b/p2p/stream/common/requestmanager/interface_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" + types "github.com/harmony-one/harmony/common/types" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" sttypes "github.com/harmony-one/harmony/p2p/stream/types" ) @@ -138,8 +139,8 @@ func makeDummyTestStreams(indexes []int) []sttypes.Stream { return sts } -func makeDummyStreamSets(indexes []int) *sttypes.SafeMap[sttypes.StreamID, *stream] { - m := sttypes.NewSafeMap[sttypes.StreamID, *stream]() +func makeDummyStreamSets(indexes []int) *types.SafeMap[sttypes.StreamID, *stream] { + m := types.NewSafeMap[sttypes.StreamID, *stream]() for _, index := range indexes { st := &testStream{ diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 923668a152..e128244fcb 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "github.com/ethereum/go-ethereum/event" + types "github.com/harmony-one/harmony/common/types" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" sttypes "github.com/harmony-one/harmony/p2p/stream/types" @@ -20,10 +21,10 @@ import ( // TODO: each peer is able to have a queue of requests instead of one request at a time. // TODO: add QoS evaluation for each stream type requestManager struct { - streams *sttypes.SafeMap[sttypes.StreamID, *stream] // All streams - available *sttypes.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request - pendings *sttypes.SafeMap[uint64, *request] // requests that are sent but not received response - waitings requestQueues // double linked list of requests that are on the waiting list + streams *types.SafeMap[sttypes.StreamID, *stream] // All streams + available *types.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request + pendings *types.SafeMap[uint64, *request] // requests that are sent but not received response + waitings requestQueues // double linked list of requests that are on the waiting list // Stream events sm streammanager.Reader @@ -56,9 +57,9 @@ func newRequestManager(sm streammanager.ReaderSubscriber) *requestManager { logger := utils.Logger().With().Str("module", "request manager").Logger() return &requestManager{ - streams: sttypes.NewSafeMap[sttypes.StreamID, *stream](), - available: sttypes.NewSafeMap[sttypes.StreamID, struct{}](), - pendings: sttypes.NewSafeMap[uint64, *request](), + streams: types.NewSafeMap[sttypes.StreamID, *stream](), + available: types.NewSafeMap[sttypes.StreamID, struct{}](), + pendings: types.NewSafeMap[uint64, *request](), waitings: newRequestQueues(), sm: sm, @@ -355,7 +356,7 @@ func (rm *requestManager) refreshStreams() { } } -func checkStreamUpdates(exists *sttypes.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) { +func checkStreamUpdates(exists *types.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) { targetM := make(map[sttypes.StreamID]sttypes.Stream) for _, target := range targets { @@ -401,9 +402,9 @@ func (rm *requestManager) close() { rm.pendings.Iterate(func(key uint64, req *request) { req.doneWithResponse(responseData{err: ErrClosed}) }) - rm.streams = sttypes.NewSafeMap[sttypes.StreamID, *stream]() - rm.available = sttypes.NewSafeMap[sttypes.StreamID, struct{}]() - rm.pendings = sttypes.NewSafeMap[uint64, *request]() + rm.streams = types.NewSafeMap[sttypes.StreamID, *stream]() + rm.available = types.NewSafeMap[sttypes.StreamID, struct{}]() + rm.pendings = types.NewSafeMap[uint64, *request]() rm.waitings = newRequestQueues() close(rm.stopC) } diff --git a/p2p/stream/common/requestmanager/requestmanager_test.go b/p2p/stream/common/requestmanager/requestmanager_test.go index 64ad4458d7..ac6c2333dd 100644 --- a/p2p/stream/common/requestmanager/requestmanager_test.go +++ b/p2p/stream/common/requestmanager/requestmanager_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + types "github.com/harmony-one/harmony/common/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/pkg/errors" ) @@ -368,7 +369,7 @@ func TestRequestManager_Concurrency(t *testing.T) { func TestGenReqID(t *testing.T) { retry := 100000 rm := &requestManager{ - pendings: sttypes.NewSafeMap[uint64, *request](), + pendings: types.NewSafeMap[uint64, *request](), } for i := 0; i != retry; i++ { @@ -382,7 +383,7 @@ func TestGenReqID(t *testing.T) { func TestCheckStreamUpdates(t *testing.T) { tests := []struct { - exists *sttypes.SafeMap[sttypes.StreamID, *stream] + exists *types.SafeMap[sttypes.StreamID, *stream] targets []sttypes.Stream expAddedIndexes []int expRemovedIndexes []int