diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index 8330d6367e..6f123cf192 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -30,7 +30,7 @@ func (s *Service) Start() error { } func (s *Service) run() { - s.c.WaitForConsensusReadyV2(s.stopChan, s.stoppedChan) + s.c.StartCheckingForNewProposals(s.stopChan, s.stoppedChan) } // Stop stops block proposal service. diff --git a/p2p/stream/types/safe_map.go b/common/types/safe_map.go similarity index 99% rename from p2p/stream/types/safe_map.go rename to common/types/safe_map.go index e4a5e559c5..001438da76 100644 --- a/p2p/stream/types/safe_map.go +++ b/common/types/safe_map.go @@ -1,4 +1,4 @@ -package sttypes +package types import ( "sync" diff --git a/consensus/consensus.go b/consensus/consensus.go index f341319941..cd0b2597c1 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -32,25 +32,6 @@ const ( var errLeaderPriKeyNotFound = errors.New("leader private key not found locally") -type Proposal struct { - Type ProposalType - Caller string -} - -// NewProposal creates a new proposal -func NewProposal(t ProposalType) Proposal { - return Proposal{Type: t, Caller: utils.GetCallStackInfo(2)} -} - -// ProposalType is to indicate the type of signal for new block proposal -type ProposalType byte - -// Constant of the type of new block proposal -const ( - SyncProposal ProposalType = iota - AsyncProposal -) - type DownloadAsync interface { DownloadAsync() } @@ -74,6 +55,8 @@ type Consensus struct { prepareBitmap *bls_cosi.Mask commitBitmap *bls_cosi.Mask + proposalManager *ProposalManager + multiSigBitmap *bls_cosi.Mask // Bitmap for parsing multisig bitmap from validators pendingCXReceipts map[utils.CXKey]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus @@ -101,8 +84,6 @@ type Consensus struct { mutex *sync.RWMutex // ViewChange struct vc *viewChange - // Signal channel for proposing a new block and start new consensus - readySignal chan Proposal // Channel to send full commit signatures to finish new block proposal commitSigChannel chan []byte // verified block to state sync broadcast @@ -162,16 +143,27 @@ func (consensus *Consensus) ChainReader() engine.ChainReader { return consensus.Blockchain() } -func (consensus *Consensus) ReadySignal(p Proposal, signalSource string, signalReason string) { +func (consensus *Consensus) AddProposal(t ProposalType, source string, reason string) error { + bn := consensus.Blockchain().CurrentBlock().NumberU64() + v := consensus.GetViewChangingID() + p := NewProposal(t, v, bn, source, reason) + consensus.proposalManager.AddProposal(p) + return nil +} + +func (consensus *Consensus) ReadySignal(t ProposalType, signalSource string, signalReason string) { + if err := consensus.AddProposal(t, signalSource, signalReason); err != nil { + utils.Logger().Debug().Err(err). + Str("signalSource", signalSource). + Str("signalReason", signalReason). + Msg("ReadySignal is failed to add a new proposal") + return + } utils.Logger().Info(). + Str("ProposalType", t.String()). Str("signalSource", signalSource). Str("signalReason", signalReason). - Msg("ReadySignal is called to propose new block") - consensus.readySignal <- p -} - -func (consensus *Consensus) GetReadySignal() chan Proposal { - return consensus.readySignal + Msg("ReadySignal is called to propose a new block") } func (consensus *Consensus) GetCommitSigChannel() chan []byte { @@ -287,17 +279,18 @@ func New( Decider quorum.Decider, minPeers int, aggregateSig bool, ) (*Consensus, error) { consensus := Consensus{ - mutex: &sync.RWMutex{}, - ShardID: shard, - fBFTLog: NewFBFTLog(), - phase: FBFTAnnounce, - current: NewState(Normal), - decider: Decider, - registry: registry, - MinPeers: minPeers, - AggregateSig: aggregateSig, - host: host, - msgSender: NewMessageSender(host), + mutex: &sync.RWMutex{}, + ShardID: shard, + fBFTLog: NewFBFTLog(), + phase: FBFTAnnounce, + current: NewState(Normal), + decider: Decider, + registry: registry, + proposalManager: NewProposalManager(), + MinPeers: minPeers, + AggregateSig: aggregateSig, + host: host, + msgSender: NewMessageSender(host), // FBFT timeout consensusTimeout: createTimeout(), dHelper: downloadAsync{}, @@ -318,7 +311,6 @@ func New( // displayed on explorer as Height right now consensus.setCurBlockViewID(0) consensus.SlashChan = make(chan slash.Record) - consensus.readySignal = make(chan Proposal) consensus.commitSigChannel = make(chan []byte) // channel for receiving newly generated VDF consensus.RndChannel = make(chan [vdfAndSeedSize]byte) diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 42e295573e..b25bf0f378 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -498,7 +498,7 @@ func (consensus *Consensus) updateConsensusInformation(reason string) Mode { consensus.GetLogger().Info(). Str("myKey", myPubKeys.SerializeToHexStr()). Msg("[UpdateConsensusInformation] I am the New Leader") - consensus.ReadySignal(NewProposal(SyncProposal), "updateConsensusInformation", "leader changed and I am the new leader") + consensus.ReadySignal(SyncProposal, "updateConsensusInformation", "leader changed and I am the new leader") }() } return Normal diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 3567a324e0..47470738d9 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -278,7 +278,7 @@ func (consensus *Consensus) finalCommit(isLeader bool) { // No pipelining go func() { consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal") - consensus.ReadySignal(NewProposal(SyncProposal), "finalCommit", "I am leader and it's the last block in epoch") + consensus.ReadySignal(SyncProposal, "finalCommit", "I am leader and it's the last block in epoch") }() } else { // pipelining @@ -354,7 +354,7 @@ func (consensus *Consensus) StartChannel() { consensus.start = true consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") consensus.mutex.Unlock() - consensus.ReadySignal(NewProposal(SyncProposal), "StartChannel", "consensus channel is started") + consensus.ReadySignal(SyncProposal, "StartChannel", "consensus channel is started") return } consensus.mutex.Unlock() @@ -606,7 +606,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { // Send signal to Node to propose the new block for consensus consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal") consensus.mutex.Unlock() - consensus.ReadySignal(NewProposal(AsyncProposal), "preCommitAndPropose", "proposing new block which will wait on the full commit signatures to finish") + consensus.ReadySignal(AsyncProposal, "preCommitAndPropose", "proposing new block which will wait on the full commit signatures to finish") }() return nil @@ -848,7 +848,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg blockPeriod := consensus.BlockPeriod go func() { <-time.After(blockPeriod) - consensus.ReadySignal(NewProposal(SyncProposal), "setupForNewConsensus", "I am the new leader") + consensus.ReadySignal(SyncProposal, "setupForNewConsensus", "I am the new leader") }() } } diff --git a/consensus/proposal.go b/consensus/proposal.go new file mode 100644 index 0000000000..1a3e6b31f9 --- /dev/null +++ b/consensus/proposal.go @@ -0,0 +1,134 @@ +package consensus + +import ( + "sync" + "time" + + "github.com/harmony-one/harmony/internal/utils" +) + +// ProposalType is to indicate the type of signal for new block proposal +type ProposalType byte + +// Constant of the type of new block proposal +const ( + SyncProposal ProposalType = iota + AsyncProposal +) + +func (pt ProposalType) String() string { + if pt == SyncProposal { + return "SyncProposal" + } + return "AsyncProposal" +} + +// Proposal represents a new block proposal with associated metadata +type Proposal struct { + Type ProposalType + Caller string + Height uint64 + ViewID uint64 + Source string + Reason string + CreatedAt time.Time + lock *sync.RWMutex +} + +// NewProposal creates a new proposal +func NewProposal(t ProposalType, viewID uint64, height uint64, source string, reason string) *Proposal { + return &Proposal{ + Type: t, + Caller: utils.GetCallStackInfo(2), + ViewID: 0, + Height: 0, + Source: source, + Reason: reason, + CreatedAt: time.Now(), + lock: &sync.RWMutex{}, + } +} + +// Clone returns a copy of proposal +func (p *Proposal) Clone() *Proposal { + p.lock.RLock() + defer p.lock.RUnlock() + return &Proposal{ + Type: p.Type, + Caller: p.Caller, + ViewID: p.ViewID, + Height: p.Height, + CreatedAt: p.CreatedAt, + lock: &sync.RWMutex{}, + } +} + +// GetType retrieves the Proposal type +func (p *Proposal) GetType() ProposalType { + p.lock.RLock() + defer p.lock.RUnlock() + return p.Type +} + +// SetType updates the Proposal type +func (p *Proposal) SetType(t ProposalType) { + p.lock.Lock() + defer p.lock.Unlock() + p.Type = t +} + +// GetCaller retrieves the Proposal caller +func (p *Proposal) GetCaller() string { + p.lock.RLock() + defer p.lock.RUnlock() + return p.Caller +} + +// SetCaller updates the Proposal caller +func (p *Proposal) SetCaller(caller string) { + p.lock.Lock() + defer p.lock.Unlock() + p.Caller = caller +} + +// GetHeight retrieves the Proposal height +func (p *Proposal) GetHeight() uint64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.Height +} + +// SetHeight updates the Proposal height +func (p *Proposal) SetHeight(height uint64) { + p.lock.Lock() + defer p.lock.Unlock() + p.Height = height +} + +// GetViewID retrieves the Proposal view ID +func (p *Proposal) GetViewID() uint64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.ViewID +} + +// SetViewID updates the Proposal view ID +func (p *Proposal) SetViewID(viewID uint64) { + p.lock.Lock() + defer p.lock.Unlock() + p.ViewID = viewID +} + +// GetCreatedAt retrieves the Proposal creation time +func (p *Proposal) GetCreatedAt() time.Time { + p.lock.RLock() + defer p.lock.RUnlock() + return p.CreatedAt +} + +// SetCreatedAt updates the Proposal creation time +func (p *Proposal) SetCreatedAt(createdAt time.Time) { + p.lock.Lock() + defer p.lock.Unlock() + p.CreatedAt = createdAt +} diff --git a/consensus/proposal_manager.go b/consensus/proposal_manager.go new file mode 100644 index 0000000000..b9f954d91a --- /dev/null +++ b/consensus/proposal_manager.go @@ -0,0 +1,175 @@ +package consensus + +import ( + "sync" + + types "github.com/harmony-one/harmony/common/types" +) + +type ProposalCreationStatus int + +const ( + // Ready indicates the consensus is prepared to create a new proposal. + // No ongoing proposal or dependencies are blocking the proposal process. + Ready ProposalCreationStatus = iota + + // WaitingForCommitSigs signifies the consensus is currently waiting for commit signatures + // from the previous block or process. This state can persist for an extended duration, + // typically up to 8 seconds, depending on proposal type and processing time. + WaitingForCommitSigs + + // CreatingNewProposal indicates the consensus is already engaged in creating a new proposal. + // During this state, no additional proposals can be initiated until the current one completes. + CreatingNewPropsal +) + +func (pt ProposalCreationStatus) String() string { + switch pt { + case Ready: + return "Ready" + case WaitingForCommitSigs: + return "WaitingForCommitSigs" + case CreatingNewPropsal: + return "CreatingNewProposal" + default: + return "Unknown" + } +} + +type ProposalManager struct { + history *types.SafeMap[ProposalType, *Proposal] + lastHeight uint64 + status ProposalCreationStatus + lock *sync.RWMutex +} + +// NewProposalManager initializes a new ProposalManager. +func NewProposalManager() *ProposalManager { + return &ProposalManager{ + history: types.NewSafeMap[ProposalType, *Proposal](), + lastHeight: 0, + status: Ready, + lock: &sync.RWMutex{}, + } +} + +// SetLastHeight updates the last processed proposal height. +func (pm *ProposalManager) SetLastHeight(h uint64) { + pm.lock.Lock() + defer pm.lock.Unlock() + if h > pm.lastHeight { + pm.lastHeight = h + } +} + +// GetLastHeight retrieves the last processed proposal height. +func (pm *ProposalManager) GetLastHeight() uint64 { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.lastHeight +} + +// SetStatus sets new proposal creation status. +func (pm *ProposalManager) SetStatus(newStatus ProposalCreationStatus) { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = newStatus +} + +// StartWaitingForCommitSigs sets isWaitingForCommitSigs. +func (pm *ProposalManager) SetToWaitingForCommitSigsMode() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = WaitingForCommitSigs +} + +// IsWaitingForCommitSigs returns true if current proposal is waiting for commit sigs. +func (pm *ProposalManager) IsWaitingForCommitSigs() bool { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.status == WaitingForCommitSigs +} + +// StartWaitingForCommitSigs sets isWaitingForCommitSigs. +func (pm *ProposalManager) SetToCreatingNewProposalMode() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = CreatingNewPropsal +} + +// IsCreatingNewProposal returns true if consensus is busy with proposal creation. +func (pm *ProposalManager) IsCreatingNewProposal() bool { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.status == CreatingNewPropsal +} + +// Done sets status to ready. +func (pm *ProposalManager) Done() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = Ready +} + +// IsWaitingForCommitSigs returns true if current proposal is waiting for commit sigs. +func (pm *ProposalManager) IsReady() bool { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.status == Ready +} + +// AddProposal adds a new proposal if valid or updates an existing one if better. +func (pm *ProposalManager) AddProposal(p *Proposal) bool { + pm.lock.Lock() + defer pm.lock.Unlock() + existingProposal, exists := pm.history.Get(p.Type) + if exists { + if p.Height > existingProposal.Height || (p.Height == existingProposal.Height && p.ViewID > existingProposal.ViewID) { + pm.history.Set(p.Type, p) + return true + } + return false + } + pm.history.Set(p.Type, p) + return true +} + +// GetNextProposal retrieves and removes the next proposal based on priority. +func (pm *ProposalManager) GetNextProposal() (*Proposal, error) { + pm.lock.Lock() + defer pm.lock.Unlock() + + syncProposal, syncExist := pm.history.Get(SyncProposal) + asyncProposal, asyncExist := pm.history.Get(AsyncProposal) + + var nextProposal *Proposal + if syncExist { + nextProposal = syncProposal.Clone() + pm.history.Delete(SyncProposal) + } else if asyncExist { + nextProposal = asyncProposal.Clone() + pm.history.Delete(AsyncProposal) + } + + if nextProposal == nil { + // no proposals available + return nil, nil + } + + pm.lastHeight = nextProposal.Height + return nextProposal, nil +} + +// ClearHistory clears all proposals from the history. +func (pm *ProposalManager) ClearHistory() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.history.Clear() +} + +// Length returns the number of proposals in the history. +func (pm *ProposalManager) Length() int { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.history.Length() +} diff --git a/consensus/proposer.go b/consensus/proposer.go index 6f60887ecf..7026abff2e 100644 --- a/consensus/proposer.go +++ b/consensus/proposer.go @@ -16,9 +16,9 @@ func NewProposer(consensus *Consensus) *Proposer { return &Proposer{consensus} } -// WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus. +// StartCheckingForNewProposals checks the proposal queue and generate new block for consensus. // only leader will receive the ready signal -func (p *Proposer) WaitForConsensusReadyV2(stopChan chan struct{}, stoppedChan chan struct{}) { +func (p *Proposer) StartCheckingForNewProposals(stopChan chan struct{}, stoppedChan chan struct{}) { consensus := p.consensus go func() { // Setup stoppedChan @@ -39,65 +39,116 @@ func (p *Proposer) WaitForConsensusReadyV2(stopChan chan struct{}, stoppedChan c utils.Logger().Warn(). Msg("Consensus new block proposal: STOPPED!") return - case proposal := <-consensus.GetReadySignal(): - for retryCount := 0; retryCount < 3 && consensus.IsLeader(); retryCount++ { - time.Sleep(SleepPeriod) - utils.Logger().Info(). - Uint64("blockNum", consensus.Blockchain().CurrentBlock().NumberU64()+1). - Bool("asyncProposal", proposal.Type == AsyncProposal). - Str("called", proposal.Caller). - Msg("PROPOSING NEW BLOCK ------------------------------------------------") - - // Prepare last commit signatures - newCommitSigsChan := make(chan []byte) - - go func() { - waitTime := 0 * time.Second - if proposal.Type == AsyncProposal { - waitTime = worker.CommitSigReceiverTimeout - } - select { - case <-time.After(waitTime): - if waitTime == 0 { - utils.Logger().Info().Msg("[ProposeNewBlock] Sync block proposal, reading commit sigs directly from DB") - } else { - utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB") - } - sigs, err := consensus.BlockCommitSigs(consensus.Blockchain().CurrentBlock().NumberU64()) - - if err != nil { - utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block") - } else { - newCommitSigsChan <- sigs - } - case commitSigs := <-consensus.GetCommitSigChannel(): - utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously") - if len(commitSigs) > bls.BLSSignatureSizeInBytes { - newCommitSigsChan <- commitSigs - } - } - }() - newBlock, err := consensus.ProposeNewBlock(newCommitSigsChan) - if err == nil { - utils.Logger().Info(). - Uint64("blockNum", newBlock.NumberU64()). - Uint64("epoch", newBlock.Epoch().Uint64()). - Uint64("viewID", newBlock.Header().ViewID().Uint64()). - Int("numTxs", newBlock.Transactions().Len()). - Int("numStakingTxs", newBlock.StakingTransactions().Len()). - Int("crossShardReceipts", newBlock.IncomingReceipts().Len()). - Msgf("=========Successfully Proposed New Block, shard: %d epoch: %d number: %d ==========", newBlock.ShardID(), newBlock.Epoch().Uint64(), newBlock.NumberU64()) - - // Send the new block to Consensus so it can be confirmed. - consensus.BlockChannel(newBlock) - break - } else { - utils.Logger().Err(err).Int("retryCount", retryCount). - Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") - continue - } + //case proposal := <-consensus.GetReadySignal(): + + case <-time.NewTicker(100 * time.Millisecond).C: + if !consensus.proposalManager.IsReady() { + continue + } + numProposalsInQueue := consensus.proposalManager.Length() + if numProposalsInQueue == 0 { + continue + } + // Send signal every 100ms + proposal, errNewProposal := consensus.proposalManager.GetNextProposal() + if errNewProposal != nil { + utils.Logger().Debug().Err(errNewProposal).Msg("[ProposeNewBlock] Cannot get next proposal") + } + if proposal == nil { + continue + } + if err := p.CreateProposal(proposal); err != nil { + utils.Logger().Warn().Err(err). + Str("Caller", proposal.Caller). + Uint64("Height", proposal.Height). + Uint64("ViewID", proposal.ViewID). + Msg("[ProposeNewBlock] proposal creation failed") + } + if !consensus.proposalManager.IsWaitingForCommitSigs() { + consensus.proposalManager.Done() } + } } }() } + +func (p *Proposer) CreateProposal(proposal *Proposal) error { + + consensus := p.consensus + if consensus == nil { + utils.Logger().Warn().Msg("[CreateProposal] trying to create a new block proposal while consensus is not initialized yet") + return nil + } + + // set proposal manager status status + consensus.proposalManager.SetToCreatingNewProposalMode() + + for retryCount := 0; retryCount < 3 && consensus.IsLeader(); retryCount++ { + time.Sleep(SleepPeriod) + utils.Logger().Info(). + Uint64("blockNum", consensus.Blockchain().CurrentBlock().NumberU64()+1). + Bool("asyncProposal", proposal.Type == AsyncProposal). + Str("called", proposal.Caller). + Msg("PROPOSING NEW BLOCK ------------------------------------------------") + + // Prepare last commit signatures + newCommitSigsChan := make(chan []byte) + + go func() { + waitTime := 0 * time.Second + if proposal.Type == AsyncProposal { + waitTime = worker.CommitSigReceiverTimeout + } + if waitTime > 0 { + consensus.proposalManager.SetToWaitingForCommitSigsMode() + } + select { + case <-time.After(waitTime): + if waitTime == 0 { + utils.Logger().Info().Msg("[ProposeNewBlock] Sync block proposal, reading commit sigs directly from DB") + } else { + utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB") + } + sigs, err := consensus.BlockCommitSigs(consensus.Blockchain().CurrentBlock().NumberU64()) + + if err != nil { + utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block") + } else { + newCommitSigsChan <- sigs + } + if waitTime > 0 { + consensus.proposalManager.Done() + } + case commitSigs := <-consensus.GetCommitSigChannel(): + utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously") + if len(commitSigs) > bls.BLSSignatureSizeInBytes { + newCommitSigsChan <- commitSigs + if waitTime > 0 { + consensus.proposalManager.Done() + } + } + } + }() + newBlock, err := consensus.ProposeNewBlock(newCommitSigsChan) + if err == nil { + utils.Logger().Info(). + Uint64("blockNum", newBlock.NumberU64()). + Uint64("epoch", newBlock.Epoch().Uint64()). + Uint64("viewID", newBlock.Header().ViewID().Uint64()). + Int("numTxs", newBlock.Transactions().Len()). + Int("numStakingTxs", newBlock.StakingTransactions().Len()). + Int("crossShardReceipts", newBlock.IncomingReceipts().Len()). + Msgf("=========Successfully Proposed New Block, shard: %d epoch: %d number: %d ==========", newBlock.ShardID(), newBlock.Epoch().Uint64(), newBlock.NumberU64()) + + // Send the new block to Consensus so it can be confirmed. + consensus.BlockChannel(newBlock) + break + } else { + utils.Logger().Err(err).Int("retryCount", retryCount). + Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") + continue + } + } + return nil +} diff --git a/consensus/view_change.go b/consensus/view_change.go index 6fb6def705..51e7912bd8 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -440,7 +440,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { consensus.getLogger().Error().Err(err).Msg("[onViewChange] startNewView failed") return } - go consensus.ReadySignal(NewProposal(SyncProposal), "onViewChange", "quorum is achieved by mask and is view change mode and M1 payload is empty") + go consensus.ReadySignal(SyncProposal, "onViewChange", "quorum is achieved by mask and is view change mode and M1 payload is empty") return } diff --git a/p2p/stream/common/requestmanager/interface_test.go b/p2p/stream/common/requestmanager/interface_test.go index 6fc087103e..1449198c0a 100644 --- a/p2p/stream/common/requestmanager/interface_test.go +++ b/p2p/stream/common/requestmanager/interface_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" + types "github.com/harmony-one/harmony/common/types" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" sttypes "github.com/harmony-one/harmony/p2p/stream/types" ) @@ -138,8 +139,8 @@ func makeDummyTestStreams(indexes []int) []sttypes.Stream { return sts } -func makeDummyStreamSets(indexes []int) *sttypes.SafeMap[sttypes.StreamID, *stream] { - m := sttypes.NewSafeMap[sttypes.StreamID, *stream]() +func makeDummyStreamSets(indexes []int) *types.SafeMap[sttypes.StreamID, *stream] { + m := types.NewSafeMap[sttypes.StreamID, *stream]() for _, index := range indexes { st := &testStream{ diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 923668a152..e128244fcb 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "github.com/ethereum/go-ethereum/event" + types "github.com/harmony-one/harmony/common/types" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" sttypes "github.com/harmony-one/harmony/p2p/stream/types" @@ -20,10 +21,10 @@ import ( // TODO: each peer is able to have a queue of requests instead of one request at a time. // TODO: add QoS evaluation for each stream type requestManager struct { - streams *sttypes.SafeMap[sttypes.StreamID, *stream] // All streams - available *sttypes.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request - pendings *sttypes.SafeMap[uint64, *request] // requests that are sent but not received response - waitings requestQueues // double linked list of requests that are on the waiting list + streams *types.SafeMap[sttypes.StreamID, *stream] // All streams + available *types.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request + pendings *types.SafeMap[uint64, *request] // requests that are sent but not received response + waitings requestQueues // double linked list of requests that are on the waiting list // Stream events sm streammanager.Reader @@ -56,9 +57,9 @@ func newRequestManager(sm streammanager.ReaderSubscriber) *requestManager { logger := utils.Logger().With().Str("module", "request manager").Logger() return &requestManager{ - streams: sttypes.NewSafeMap[sttypes.StreamID, *stream](), - available: sttypes.NewSafeMap[sttypes.StreamID, struct{}](), - pendings: sttypes.NewSafeMap[uint64, *request](), + streams: types.NewSafeMap[sttypes.StreamID, *stream](), + available: types.NewSafeMap[sttypes.StreamID, struct{}](), + pendings: types.NewSafeMap[uint64, *request](), waitings: newRequestQueues(), sm: sm, @@ -355,7 +356,7 @@ func (rm *requestManager) refreshStreams() { } } -func checkStreamUpdates(exists *sttypes.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) { +func checkStreamUpdates(exists *types.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) { targetM := make(map[sttypes.StreamID]sttypes.Stream) for _, target := range targets { @@ -401,9 +402,9 @@ func (rm *requestManager) close() { rm.pendings.Iterate(func(key uint64, req *request) { req.doneWithResponse(responseData{err: ErrClosed}) }) - rm.streams = sttypes.NewSafeMap[sttypes.StreamID, *stream]() - rm.available = sttypes.NewSafeMap[sttypes.StreamID, struct{}]() - rm.pendings = sttypes.NewSafeMap[uint64, *request]() + rm.streams = types.NewSafeMap[sttypes.StreamID, *stream]() + rm.available = types.NewSafeMap[sttypes.StreamID, struct{}]() + rm.pendings = types.NewSafeMap[uint64, *request]() rm.waitings = newRequestQueues() close(rm.stopC) } diff --git a/p2p/stream/common/requestmanager/requestmanager_test.go b/p2p/stream/common/requestmanager/requestmanager_test.go index 64ad4458d7..ac6c2333dd 100644 --- a/p2p/stream/common/requestmanager/requestmanager_test.go +++ b/p2p/stream/common/requestmanager/requestmanager_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + types "github.com/harmony-one/harmony/common/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/pkg/errors" ) @@ -368,7 +369,7 @@ func TestRequestManager_Concurrency(t *testing.T) { func TestGenReqID(t *testing.T) { retry := 100000 rm := &requestManager{ - pendings: sttypes.NewSafeMap[uint64, *request](), + pendings: types.NewSafeMap[uint64, *request](), } for i := 0; i != retry; i++ { @@ -382,7 +383,7 @@ func TestGenReqID(t *testing.T) { func TestCheckStreamUpdates(t *testing.T) { tests := []struct { - exists *sttypes.SafeMap[sttypes.StreamID, *stream] + exists *types.SafeMap[sttypes.StreamID, *stream] targets []sttypes.Stream expAddedIndexes []int expRemovedIndexes []int