Skip to content

Commit

Permalink
consensus: add HeaderWithTime
Browse files Browse the repository at this point in the history
- Define HeaderWithTime.

- Maintain time annotations on the ChainSync candidates.

- Maintain a parallel selection with time annotations since the
  ConsensusBlockFetch interface uses the same type argument for ChainSync
  candidates and the current selection.

- Note that HeaderWithTime is hidden from the tracer events, at least for now.

Co-authored-by: Damian Nadales <[email protected]>
Co-authored-by: amesgen <[email protected]>
  • Loading branch information
2 people authored and nfrisby committed Dec 18, 2024
1 parent e9bc172 commit df3b2a4
Show file tree
Hide file tree
Showing 34 changed files with 649 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import Data.Void (Void)
import Network.TypedProtocol.Codec
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config (DiffusionPipeliningSupport (..))
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime)
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
Expand Down Expand Up @@ -152,7 +153,7 @@ data Handlers m addr blk = Handlers {
:: NodeToNodeVersion
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> BlockFetchClient (Header blk) blk m ()
-> BlockFetchClient (HeaderWithTime blk) blk m ()

, hBlockFetchServer
:: ConnectionId addr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ module Ouroboros.Consensus.Node.Genesis (

import Control.Monad (join)
import Data.Traversable (for)
import Data.Typeable (Typeable)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime (..))
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(CSJConfig (..), CSJEnabledConfig (..),
ChainSyncLoPBucketConfig (..),
Expand Down Expand Up @@ -89,7 +91,7 @@ data GenesisNodeKernelArgs m blk = GenesisNodeKernelArgs {
-- 'ChainDB.GetLoEFragment' that will be replaced via 'setGetLoEFragment') and a
-- function to update the 'ChainDbArgs' accordingly.
mkGenesisNodeKernelArgs ::
forall m blk. (IOLike m, GetHeader blk)
forall m blk. (IOLike m, GetHeader blk, Typeable blk)
=> GenesisConfig
-> m ( GenesisNodeKernelArgs m blk
, Complete ChainDbArgs m blk -> Complete ChainDbArgs m blk
Expand All @@ -113,9 +115,9 @@ mkGenesisNodeKernelArgs gcfg = do
-- | Set 'gnkaGetLoEFragment' to the actual logic for determining the current
-- LoE fragment.
setGetLoEFragment ::
forall m blk. (IOLike m, GetHeader blk)
forall m blk. (IOLike m, GetHeader blk, Typeable blk)
=> STM m GSM.GsmState
-> STM m (AnchoredFragment (Header blk))
-> STM m (AnchoredFragment (HeaderWithTime blk))
-- ^ The LoE fragment.
-> StrictTVar m (ChainDB.GetLoEFragment m blk)
-> m ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}

module Ouroboros.Consensus.NodeKernel (
-- * Node kernel
Expand Down Expand Up @@ -45,6 +46,7 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.Maybe (isJust, mapMaybe)
import Data.Proxy
import qualified Data.Set as Set
import qualified Data.Text as Text
import Data.Void (Void)
import Ouroboros.Consensus.Block hiding (blockMatchesHeader)
Expand Down Expand Up @@ -94,6 +96,7 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (castTip, tipFromHeader)
import Ouroboros.Network.BlockFetch
import qualified Ouroboros.Network.BlockFetch.ClientState as BF
import Ouroboros.Network.Diffusion (PublicPeerSelectionState)
import Ouroboros.Network.NodeToNode (ConnectionId,
MiniProtocolParameters (..))
Expand Down Expand Up @@ -131,7 +134,7 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
, getTopLevelConfig :: TopLevelConfig blk

-- | The fetch client registry, used for the block fetch clients.
, getFetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m
, getFetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (HeaderWithTime blk) blk m

-- | The fetch mode, used by diffusion.
--
Expand Down Expand Up @@ -254,8 +257,8 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
, GSM.getChainSyncStates = fmap cschState <$> readTVar varChainSyncHandles
, GSM.getCurrentSelection = do
headers <- ChainDB.getCurrentChain chainDB
extLedgerState <- ChainDB.getCurrentLedger chainDB
headers <- ChainDB.getCurrentChainWithTime chainDB
extLedgerState <- ChainDB.getCurrentLedger chainDB
return (headers, ledgerState extLedgerState)
, GSM.minCaughtUpDuration = gsmMinCaughtUpDuration
, GSM.setCaughtUpPersistentMark = \upd ->
Expand Down Expand Up @@ -309,8 +312,8 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
-- 'addFetchedBlock' whenever a new block is downloaded.
void $ forkLinkedThread registry "NodeKernel.blockFetchLogic" $
blockFetchLogic
(blockFetchDecisionTracer tracers)
(blockFetchClientTracer tracers)
(contramap (map (fmap (fmap (map castPoint)))) $ blockFetchDecisionTracer tracers)
(contramap (fmap castTraceFetchClientState) $ blockFetchClientTracer tracers)
blockFetchInterface
fetchClientRegistry
blockFetchConfiguration
Expand Down Expand Up @@ -344,6 +347,45 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
blockForging' <- traverse (forkBlockForging st) blockForging
go blockForging'

castTraceFetchClientState ::
forall blk. HasHeader (Header blk)
=> TraceFetchClientState (HeaderWithTime blk) -> TraceFetchClientState (Header blk)
castTraceFetchClientState = mapTraceFetchClientState hwtHeader

mapTraceFetchClientState ::
(HeaderHash h1 ~ HeaderHash h2, HasHeader h2)
=> (h1 -> h2) -> TraceFetchClientState h1 -> TraceFetchClientState h2
mapTraceFetchClientState fheader = \case
AddedFetchRequest request inflight inflightLimits status -> AddedFetchRequest (frequest request) (finflight inflight) inflightLimits (fstatus status)

AcknowledgedFetchRequest request -> AcknowledgedFetchRequest (frequest request)

SendFetchRequest headers gsv -> SendFetchRequest (AF.mapAnchoredFragment fheader headers) gsv

StartedFetchBatch range inflight inflightLimits status -> StartedFetchBatch (frange range) (finflight inflight) inflightLimits (fstatus status)
CompletedBlockFetch point inflight inflightLimits status time size -> CompletedBlockFetch (fpoint point) (finflight inflight) inflightLimits (fstatus status) time size
CompletedFetchBatch range inflight inflightLimits status -> CompletedFetchBatch (frange range) (finflight inflight) inflightLimits (fstatus status)
RejectedFetchBatch range inflight inflightLimits status -> RejectedFetchBatch (frange range) (finflight inflight) inflightLimits (fstatus status)

ClientTerminating i -> ClientTerminating i
where
frequest (BF.FetchRequest headers) = BF.FetchRequest $ map (AF.mapAnchoredFragment fheader) headers

finflight inflight = inflight { BF.peerFetchBlocksInFlight = fpoints (BF.peerFetchBlocksInFlight inflight) }

fstatus = \case
BF.PeerFetchStatusShutdown -> BF.PeerFetchStatusShutdown
BF.PeerFetchStatusStarting -> BF.PeerFetchStatusStarting
BF.PeerFetchStatusAberrant -> BF.PeerFetchStatusAberrant
BF.PeerFetchStatusBusy -> BF.PeerFetchStatusBusy
BF.PeerFetchStatusReady points idle -> BF.PeerFetchStatusReady (fpoints points) idle

fpoints = Set.mapMonotonic fpoint

frange (BF.ChainRange p1 p2) = BF.ChainRange (fpoint p1) (fpoint p2)

fpoint = castPoint

{-------------------------------------------------------------------------------
Internal node components
-------------------------------------------------------------------------------}
Expand All @@ -354,8 +396,8 @@ data InternalState m addrNTN addrNTC blk = IS {
, registry :: ResourceRegistry m
, btime :: BlockchainTime m
, chainDB :: ChainDB m blk
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (HeaderWithTime blk) blk m
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (HeaderWithTime blk) blk m
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
, varGsmState :: StrictTVar m GSM.GsmState
, mempool :: Mempool m blk
Expand Down Expand Up @@ -394,7 +436,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg

fetchClientRegistry <- newFetchClientRegistry

let getCandidates :: STM m (Map (ConnectionId addrNTN) (AnchoredFragment (Header blk)))
let getCandidates :: STM m (Map (ConnectionId addrNTN) (AnchoredFragment (HeaderWithTime blk)))
getCandidates = viewChainSyncState varChainSyncHandles csCandidate

slotForgeTimeOracle <- BlockFetchClientInterface.initSlotForgeTimeOracle cfg chainDB
Expand All @@ -403,7 +445,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
(ChainDB.getCurrentChain chainDB)
getUseBootstrapPeers
(GSM.gsmStateToLedgerJudgement <$> readTVar varGsmState)
blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (HeaderWithTime blk) blk m
blockFetchInterface = BlockFetchClientInterface.mkBlockFetchConsensusInterface
(configBlock cfg)
(BlockFetchClientInterface.defaultChainDbView chainDB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Test.ThreadNet.Util.SimpleBlock (prop_validSimpleBlock) where

import Data.Typeable
import Data.Typeable (Typeable)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Mock.Ledger
import Ouroboros.Consensus.Util.Condense (condense)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import Ouroboros.Consensus.Config.SecurityParam
(SecurityParam (SecurityParam), maxRollbacks)
import Ouroboros.Consensus.Genesis.Governor (DensityBounds,
densityDisconnect, sharedCandidatePrefix)
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(ChainSyncClientException (DensityTooLow),
ChainSyncState (..))
Expand Down Expand Up @@ -59,13 +60,16 @@ import Test.QuickCheck
import Test.QuickCheck.Extras (unsafeMapSuchThatJust)
import Test.Tasty
import Test.Tasty.QuickCheck
import Test.Util.HeaderValidation (attachSlotTimeToFragment)
import Test.Util.Orphans.IOLike ()
import Test.Util.PartialAccessors
import Test.Util.TersePrinting (terseHFragment, terseHeader)
import Test.Util.TestBlock (TestBlock)
import Test.Util.TersePrinting (terseHFragment, terseHWTFragment,
terseHeader)
import Test.Util.TestBlock (TestBlock, singleNodeTestConfig)
import Test.Util.TestEnv (adjustQuickCheckMaxSize,
adjustQuickCheckTests)


tests :: TestTree
tests =
adjustQuickCheckTests (* 4) $
Expand All @@ -87,9 +91,9 @@ data StaticCandidates =
StaticCandidates {
k :: SecurityParam,
sgen :: GenesisWindow,
suffixes :: [(PeerId, AnchoredFragment (Header TestBlock))],
suffixes :: [(PeerId, AnchoredFragment (HeaderWithTime TestBlock))],
tips :: Map PeerId (Tip TestBlock),
loeFrag :: AnchoredFragment (Header TestBlock)
loeFrag :: AnchoredFragment (HeaderWithTime TestBlock)
}
deriving Show

Expand All @@ -112,7 +116,11 @@ staticCandidates GenesisTest {gtSecurityParam, gtGenesisWindow, gtBlockTree} =
}
where
(loeFrag, suffixes) =
sharedCandidatePrefix curChain (second toHeaders <$> candidates)
sharedCandidatePrefix
curChain
(second (attachTimeUsingTestConfig . toHeaders)
<$> candidates
)

selections = selection <$> branches

Expand All @@ -128,6 +136,15 @@ staticCandidates GenesisTest {gtSecurityParam, gtGenesisWindow, gtBlockTree} =

branches = btBranches gtBlockTree

-- | Attach a relative slot time to a fragment of headers using the
-- 'singleNodeTestConfig'. Since 'k' is not used for time conversions,
-- it is safe to use this configuration even if other 'k' values are
-- used in the tests that call this function.
attachTimeUsingTestConfig ::
AnchoredFragment (Header TestBlock) ->
AnchoredFragment (HeaderWithTime TestBlock)
attachTimeUsingTestConfig = attachSlotTimeToFragment singleNodeTestConfig

-- | Check that the GDD disconnects from some peers for each full Genesis window starting at any of a block tree's
-- intersections, and that it's not the honest peer.
prop_densityDisconnectStatic :: Property
Expand All @@ -139,7 +156,7 @@ prop_densityDisconnectStatic =
counterexample "it should not disconnect the honest peers"
(not $ any isHonestPeerId disconnect)
where
mkState :: AnchoredFragment (Header TestBlock) -> ChainSyncState TestBlock
mkState :: AnchoredFragment (HeaderWithTime TestBlock) -> ChainSyncState TestBlock
mkState frag =
ChainSyncState {
csCandidate = frag,
Expand Down Expand Up @@ -167,7 +184,7 @@ data EvolvingPeers =
k :: SecurityParam,
sgen :: GenesisWindow,
peers :: Peers EvolvingPeer,
loeFrag :: AnchoredFragment (Header TestBlock),
loeFrag :: AnchoredFragment (HeaderWithTime TestBlock),
fullTree :: BlockTree TestBlock
}
deriving Show
Expand Down Expand Up @@ -227,7 +244,7 @@ data UpdateEvent = UpdateEvent {
, bounds :: [(PeerId, DensityBounds TestBlock)]
-- | The current chains
, tree :: BlockTree (Header TestBlock)
, loeFrag :: AnchoredFragment (Header TestBlock)
, loeFrag :: AnchoredFragment (HeaderWithTime TestBlock)
, curChain :: AnchoredFragment (Header TestBlock)
}

Expand All @@ -240,7 +257,7 @@ prettyUpdateEvent UpdateEvent {target, added, killed, bounds, tree, loeFrag, cur
[
"Extended " ++ condense target ++ " with " ++ terseHeader added,
" disconnect: " ++ show killed,
" LoE frag: " ++ terseHFragment loeFrag,
" LoE frag: " ++ terseHWTFragment loeFrag,
" selection: " ++ terseHFragment curChain
]
++ prettyDensityBounds bounds
Expand Down Expand Up @@ -377,12 +394,17 @@ evolveBranches EvolvingPeers {k, sgen, peers = initialPeers, fullTree} =
states =
candidates <&> \ csCandidate ->
ChainSyncState {
csCandidate,
csCandidate = attachTimeUsingTestConfig csCandidate,
csIdling = False,
csLatestSlot = SJust (AF.headSlot csCandidate)
}
-- Run GDD.
(loeFrag, suffixes) = sharedCandidatePrefix curChain (Map.toList candidates)
(loeFrag, suffixes) =
sharedCandidatePrefix
curChain
(Map.toList $
fmap attachTimeUsingTestConfig candidates
)
(killedNow, bounds) = first Set.fromList $ densityDisconnect sgen k states suffixes loeFrag
event = UpdateEvent {
target,
Expand Down Expand Up @@ -415,7 +437,7 @@ peerInfo EvolvingPeers {k = SecurityParam k, sgen = GenesisWindow sgen, loeFrag}
[
"k: " <> show k,
"sgen: " <> show sgen,
"loeFrag: " <> terseHFragment loeFrag
"loeFrag: " <> terseHWTFragment loeFrag
]

-- | Tests that when GDD disconnects a peer, it continues to disconnect it when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import Network.TypedProtocol.Codec (ActiveState, AnyMessage,
import Ouroboros.Consensus.Block (HasHeader)
import Ouroboros.Consensus.Block.Abstract (Header, Point (..))
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime (..))
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
import Ouroboros.Consensus.Node.ProtocolInfo
(NumCoreNodes (NumCoreNodes))
Expand Down Expand Up @@ -77,8 +78,8 @@ startBlockFetchLogic ::
=> ResourceRegistry m
-> Tracer m (TraceEvent TestBlock)
-> ChainDB m TestBlock
-> FetchClientRegistry PeerId (Header TestBlock) TestBlock m
-> STM m (Map PeerId (AnchoredFragment (Header TestBlock)))
-> FetchClientRegistry PeerId (HeaderWithTime TestBlock) TestBlock m
-> STM m (Map PeerId (AnchoredFragment (HeaderWithTime TestBlock)))
-> m ()
startBlockFetchLogic registry tracer chainDb fetchClientRegistry getCandidates = do
let slotForgeTime :: BlockFetchClientInterface.SlotForgeTimeOracle m blk
Expand Down Expand Up @@ -130,10 +131,10 @@ startBlockFetchLogic registry tracer chainDb fetchClientRegistry getCandidates =
decisionTracer = TraceOther . ("BlockFetchLogic | " ++) . show >$< tracer

startKeepAliveThread ::
forall m peer blk.
forall m peer blk hdr.
(Ord peer, IOLike m)
=> ResourceRegistry m
-> FetchClientRegistry peer (Header blk) blk m
-> FetchClientRegistry peer hdr blk m
-> peer
-> m ()
startKeepAliveThread registry fetchClientRegistry peerId =
Expand All @@ -147,7 +148,7 @@ runBlockFetchClient ::
-> PeerId
-> BlockFetchTimeout
-> StateViewTracers blk m
-> FetchClientRegistry PeerId (Header blk) blk m
-> FetchClientRegistry PeerId (HeaderWithTime blk) blk m
-> ControlMessageSTM m
-> Channel m (AnyMessage (BlockFetch blk (Point blk)))
-- ^ Send and receive message via the given 'Channel'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import Control.Tracer (Tracer (..), traceWith)
import Data.Functor (void)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Typeable (Typeable)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config (TopLevelConfig (..))
import Ouroboros.Consensus.HeaderValidation (HeaderWithTime (..))
import Ouroboros.Consensus.Storage.ChainDB.API
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl as ChainDB
Expand Down Expand Up @@ -88,7 +90,7 @@ data LiveResources blk m = LiveResources {
, lrCdb :: NodeDBs (StrictTMVar m MockFS)

-- | The LoE fragment must be reset for each live interval.
, lrLoEVar :: LoE (StrictTVar m (AnchoredFragment (Header blk)))
, lrLoEVar :: LoE (StrictTVar m (AnchoredFragment (HeaderWithTime blk)))
}

data LiveInterval blk m = LiveInterval {
Expand Down Expand Up @@ -188,7 +190,7 @@ lifecycleStart start liResources liResult = do
-- | Shut down the node by killing all its threads after extracting the
-- persistent state used to restart the node later.
lifecycleStop ::
(IOLike m, GetHeader blk) =>
(IOLike m, GetHeader blk, Typeable blk) =>
LiveResources blk m ->
LiveNode blk m ->
m (LiveIntervalResult blk)
Expand Down
Loading

0 comments on commit df3b2a4

Please sign in to comment.