diff --git a/src/Libplanet.Net/Swarm.BlockCandidate.cs b/src/Libplanet.Net/Swarm.BlockCandidate.cs
index 83bc085d4bf..92247d38efe 100644
--- a/src/Libplanet.Net/Swarm.BlockCandidate.cs
+++ b/src/Libplanet.Net/Swarm.BlockCandidate.cs
@@ -146,16 +146,14 @@ private BlockChain AppendPreviousBlocks(
             bool render,
             IProgress<BlockSyncState> progress)
         {
-            BlockChain workspace = blockChain;
+            BlockChain workspace;
             List<Guid> scope = new List<Guid>();
             bool forked = false;

-            Block oldTip = workspace.Tip;
-            Block newTip = candidate.Blocks.Last().Item1;
+            Block oldTip = blockChain.Tip;
             List<(Block, BlockCommit)> blocks = candidate.Blocks.ToList();
             Block branchpoint = FindBranchpoint(
                 oldTip,
-                newTip,
                 blocks.Select(pair => pair.Item1).ToList());

             if (branchpoint.Equals(oldTip))
@@ -163,8 +161,9 @@ private BlockChain AppendPreviousBlocks(
                 _logger.Debug(
                     "No need to fork. at {MethodName}()",
                     nameof(AppendPreviousBlocks));
+                workspace = blockChain;
             }
-            else if (!workspace.ContainsBlock(branchpoint.Hash))
+            else if (!blockChain.ContainsBlock(branchpoint.Hash))
             {
                 // FIXME: This behavior can unexpectedly terminate the swarm (and the game
                 // app) if it encounters a peer having a different blockchain, and therefore
@@ -179,7 +178,7 @@ private BlockChain AppendPreviousBlocks(
                 throw new InvalidGenesisBlockException(
                     msg,
                     branchpoint.Hash,
-                    workspace.Genesis.Hash);
+                    blockChain.Genesis.Hash);
             }
             else
             {
@@ -187,7 +186,7 @@ private BlockChain AppendPreviousBlocks(
                 _logger.Debug(
                     "Trying to fork... at {MethodName}()",
                     nameof(AppendPreviousBlocks)
                 );
-                workspace = workspace.Fork(branchpoint.Hash);
+                workspace = blockChain.Fork(branchpoint.Hash);
                 forked = true;
                 scope.Add(workspace.Id);
                 _logger.Debug(
@@ -274,8 +273,9 @@ private BlockChain AppendPreviousBlocks(
             return workspace;
         }

-        private Block FindBranchpoint(Block oldTip, Block newTip, List<Block> newBlocks)
+        private Block FindBranchpoint(Block oldTip, List<Block> newBlocks)
         {
+            var newTip = newBlocks.Last();
             while (oldTip.Index > newTip.Index &&
                 oldTip.PreviousHash is { } aPrev)
             {
@@ -440,14 +440,13 @@ private async Task BlockCandidateDownload(
             BlockLocator locator = blockChain.GetBlockLocator();
             Block tip = blockChain.Tip;

-            IAsyncEnumerable<Tuple<long, BlockHash>> hashesAsync = GetBlockHashes(
+            List<(long, BlockHash)> hashes = await GetBlockHashes(
                 peer: peer,
                 locator: locator,
                 stop: stop.Hash,
                 timeout: null,
                 logSessionIds: (logSessionId, subSessionId),
                 cancellationToken: cancellationToken);
-            IEnumerable<Tuple<long, BlockHash>> hashes = await hashesAsync.ToArrayAsync();

             if (!hashes.Any())
             {
diff --git a/src/Libplanet.Net/Swarm.BlockSync.cs b/src/Libplanet.Net/Swarm.BlockSync.cs
index 554f81f1f3c..9e1e8dcdcab 100644
--- a/src/Libplanet.Net/Swarm.BlockSync.cs
+++ b/src/Libplanet.Net/Swarm.BlockSync.cs
@@ -92,14 +92,13 @@ private async Task PullBlocksAsync(
                 completionPredicate: BlockChain.ContainsBlock,
                 window: InitialBlockDownloadWindow
             );
-            var demandBlockHashes = GetDemandBlockHashes(
+            var demandBlockHashes = await GetDemandBlockHashes(
                 BlockChain,
                 peersWithBlockExcerpt,
                 chunkSize,
                 progress,
-                cancellationToken
-            ).WithCancellation(cancellationToken);
-            await foreach ((long index, BlockHash hash) in demandBlockHashes)
+                cancellationToken);
+            foreach ((long index, BlockHash hash) in demandBlockHashes)
             {
                 cancellationToken.ThrowIfCancellationRequested();

diff --git a/src/Libplanet.Net/Swarm.cs b/src/Libplanet.Net/Swarm.cs
index 71cefc7001d..a6e68a6a08e 100644
--- a/src/Libplanet.Net/Swarm.cs
+++ b/src/Libplanet.Net/Swarm.cs
@@ -737,14 +737,13 @@ public Task AddPeersAsync(
         }

         // FIXME: This would be better if it's merged with GetDemandBlockHashes
-        internal async IAsyncEnumerable<Tuple<long, BlockHash>> GetBlockHashes(
+        internal async Task<List<(long, BlockHash)>> GetBlockHashes(
             BoundPeer peer,
             BlockLocator locator,
             BlockHash? stop,
             TimeSpan? timeout = null,
             (int, int)? logSessionIds = null,
-            [EnumeratorCancellation] CancellationToken cancellationToken = default
-        )
+            CancellationToken cancellationToken = default)
         {
             var sessionRandom = new System.Random();
             int logSessionId = logSessionIds is(int i, _) ? i : sessionRandom.Next();
@@ -762,7 +761,7 @@ internal async IAsyncEnumerable<Tuple<long, BlockHash>> GetBlockHashes(
                 sendMsg,
                 logSessionId,
                 subSessionId,
-                nameof(Messages.GetBlockHashesMsg),
+                nameof(GetBlockHashesMsg),
                 locator.FirstOrDefault(),
                 stop);

@@ -773,46 +772,46 @@ internal async IAsyncEnumerable<Tuple<long, BlockHash>> GetBlockHashes(
                     peer,
                     request,
                     timeout: transportTimeout,
-                    cancellationToken: cancellationToken
-                ).ConfigureAwait(false);
+                    cancellationToken: cancellationToken).ConfigureAwait(false);
             }
             catch (CommunicationFailException e) when (e.InnerException is TimeoutException)
             {
-                yield break;
+                return new List<(long, BlockHash)>();
             }

             if (parsedMessage.Content is BlockHashesMsg blockHashes)
             {
                 if (blockHashes.StartIndex is long idx)
                 {
-                    BlockHash[] hashes = blockHashes.Hashes.ToArray();
+                    List<(long, BlockHash)> hashes = blockHashes.Hashes
+                        .Select((hash, i) => (idx + i, hash))
+                        .ToList();
                     const string msg =
                         "{SessionId}/{SubSessionId}: Received a " +
-                        nameof(Messages.BlockHashesMsg) +
+                        nameof(BlockHashesMsg) +
                         " message with an offset index {OffsetIndex} (total {Length} hashes)";
-                    _logger.Debug(msg, logSessionId, subSessionId, idx, hashes.LongLength);
-                    foreach (BlockHash hash in hashes)
-                    {
-                        yield return new Tuple<long, BlockHash>(idx, hash);
-                        idx++;
-                    }
+                    _logger.Debug(msg, logSessionId, subSessionId, idx, hashes.LongCount());
+                    return hashes;
                 }
                 else
                 {
                     const string msg =
                         "{SessionId}/{SubSessionId}: Received a " +
-                        nameof(Messages.BlockHashesMsg) +
+                        nameof(BlockHashesMsg) +
                         " message, but it has zero hashes";
                     _logger.Debug(msg, logSessionId, subSessionId);
+                    return new List<(long, BlockHash)>();
                 }
-
-                yield break;
             }
-
-            string errorMessage =
-                $"The response of {nameof(GetBlockHashes)} is expected to be " +
-                $"{nameof(BlockHashesMsg)}, not {parsedMessage.GetType().Name}: {parsedMessage}";
-            throw new InvalidMessageContentException(errorMessage, parsedMessage.Content);
+            else
+            {
+                _logger.Debug(
+                    "A response for " + nameof(GetBlockHashesMsg) + " is expected to be " +
+                    "{ExpectedType}: {ReceivedType}",
+                    nameof(BlockHashesMsg),
+                    parsedMessage.GetType());
+                return new List<(long, BlockHash)>();
+            }
         }

         internal async IAsyncEnumerable<(Block, BlockCommit)> GetBlocksAsync(
@@ -828,7 +827,7 @@ [EnumeratorCancellation] CancellationToken cancellationToken
                 peer);

             var request = new GetBlocksMsg(blockHashesAsArray);
-            int hashCount = blockHashesAsArray.Count();
+            int hashCount = blockHashesAsArray.Length;

             if (hashCount < 1)
             {
@@ -980,8 +979,8 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
         /// whether to make a query in the first place.
         /// </param>
         /// <returns>
-        /// Returned list of tuples are simply concatenation of query results from different
-        /// <see cref="BoundPeer"/>s with possible duplicates.
+        /// Returned list of tuples is simply the first successful query result from a
+        /// <see cref="BoundPeer"/> among <paramref name="peersWithExcerpts"/>.
         /// </returns>
         /// <remarks>
         /// This implicitly assumes returned <see cref="BlockHash"/>es is properly
@@ -991,22 +990,16 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
         /// to download.
         /// </remarks>
         ///
-        internal async IAsyncEnumerable<(long, BlockHash)> GetDemandBlockHashes(
+        internal async Task<List<(long, BlockHash)>> GetDemandBlockHashes(
             BlockChain blockChain,
             IList<(BoundPeer, IBlockExcerpt)> peersWithExcerpts,
             int chunkSize = int.MaxValue,
             IProgress<BlockHashDownloadState> progress = null,
-            [EnumeratorCancellation] CancellationToken cancellationToken = default
-        )
+            CancellationToken cancellationToken = default)
         {
-            BlockLocator locator = blockChain.GetBlockLocator();
             var exceptions = new List<Exception>();
             foreach ((BoundPeer peer, IBlockExcerpt excerpt) in peersWithExcerpts)
             {
-                long peerIndex = excerpt.Index;
-                long branchingIndex = -1;
-                BlockHash branchingBlock = default;
-
                 if (!IsBlockNeeded(excerpt))
                 {
                     _logger.Verbose(
@@ -1015,147 +1008,172 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
                     continue;
                 }

-                int totalBlockHashesToDownload = -1;
-                int chunkBlockHashesToDownload = -1;
-                var pairsToYield = new List<Tuple<long, BlockHash>>();
-                Exception error = null;
                 try
                 {
-                    var downloaded = new List<BlockHash>();
-                    int previousDownloadedCount = -1;
-                    int stagnant = 0;
-                    const int stagnationLimit = 3;
-                    do
+                    List<(long, BlockHash)> downloadedHashes = await GetDemandBlockHashesFromPeer(
+                        blockChain,
+                        peer,
+                        excerpt,
+                        chunkSize,
+                        progress,
+                        cancellationToken);
+                    return downloadedHashes;
+                }
+                catch (Exception e)
+                {
+                    const string message =
+                        "Failed to fetch demand block hashes from {Peer}; " +
+                        "retry with another peer...";
+                    _logger.Debug(e, message, peer);
+                    exceptions.Add(e);
+                    continue;
+                }
+            }
+
+            BoundPeer[] peers = peersWithExcerpts.Select(p => p.Item1).ToArray();
+            _logger.Warning(
+                "Failed to fetch demand block hashes from peers: {Peers}",
+                peers);
+            throw new AggregateException(
+                "Failed to fetch demand block hashes from peers: " +
+                string.Join(", ", peers.Select(p => p.ToString())),
+                exceptions);
+        }
+
+        internal async Task<List<(long, BlockHash)>> GetDemandBlockHashesFromPeer(
+            BlockChain blockChain,
+            BoundPeer peer,
+            IBlockExcerpt excerpt,
+            int chunkSize = int.MaxValue,
+            IProgress<BlockHashDownloadState> progress = null,
+            CancellationToken cancellationToken = default)
+        {
+            BlockLocator locator = blockChain.GetBlockLocator();
+            long peerIndex = excerpt.Index;
+            long branchingIndex = -1;
+            BlockHash branchingBlock = default;
+
+            int totalBlockHashesToDownload = -1;
+            int chunkBlockHashesToDownload = -1;
+            var pairsToYield = new List<(long, BlockHash)>();
+
+            try
+            {
+                var downloaded = new List<BlockHash>();
+                int previousDownloadedCount = -1;
+                int stagnant = 0;
+                const int stagnationLimit = 3;
+
+                do
+                {
+                    if (previousDownloadedCount == downloaded.Count &&
+                        ++stagnant > stagnationLimit)
+                    {
+                        const string stagnationMessage =
+                            "Stagnation limit of {StagnationLimit} reached while " +
+                            "fetching hashes from {Peer}. Continuing operation with " +
+                            "{DownloadCount} hashes downloaded so far";
+                        _logger.Information(
+                            stagnationMessage,
+                            stagnationLimit,
+                            peer,
+                            downloaded.Count);
+                        break;
+                    }
+
+                    previousDownloadedCount = downloaded.Count;
+
+                    // FIXME: First value of totalBlocksToDownload is -1.
+                    _logger.Verbose(
+                        "Request block hashes to {Peer} (height: {PeerHeight}) using " +
+                        "locator [{LocatorHead}, ...] ({CurrentIndex}/{EstimatedTotalCount})",
+                        peer,
+                        peerIndex,
+                        locator.FirstOrDefault(),
+                        downloaded.Count,
+                        totalBlockHashesToDownload
+                    );
+
+                    List<(long, BlockHash)> blockHashes = await GetBlockHashes(
+                        peer: peer,
+                        locator: locator,
+                        stop: null,
+                        timeout: null,
+                        logSessionIds: null,
+                        cancellationToken: cancellationToken);
+
+                    // NOTE: Runs only on the first iteration when
+                    // downloading block hashes was successful.
+                    if (branchingIndex == -1 &&
+                        blockHashes.Any())
                     {
-                        if (previousDownloadedCount == downloaded.Count &&
-                            ++stagnant > stagnationLimit)
+                        branchingIndex = blockHashes.First().Item1;
+                        branchingBlock = blockHashes.First().Item2;
+                        try
+                        {
+                            totalBlockHashesToDownload = Convert.ToInt32(
+                                peerIndex - branchingIndex);
+                        }
+                        catch (OverflowException)
                         {
-                            const string stagnationMessage =
-                                "Stagnation limit of {StagnationLimit} reached while " +
-                                "fetching hashes from {Peer}. Continuing operation with " +
-                                "{DownloadCount} hashes downloaded so far";
-                            _logger.Information(
-                                stagnationMessage,
-                                stagnationLimit,
-                                peer,
-                                downloaded.Count);
-                            break;
+                            totalBlockHashesToDownload = int.MaxValue;
                         }

-                        previousDownloadedCount = downloaded.Count;
+                        chunkBlockHashesToDownload = Math.Min(
+                            totalBlockHashesToDownload, chunkSize);
+                    }

-                        // FIXME: First value of totalBlocksToDownload is -1.
+                    foreach (var pair in blockHashes)
+                    {
+                        long dlIndex = pair.Item1;
+                        BlockHash dlHash = pair.Item2;
                         _logger.Verbose(
-                            "Request block hashes to {Peer} (height: {PeerHeight}) using " +
-                            "locator [{LocatorHead}, ...] ({CurrentIndex}/{EstimatedTotalCount})",
+                            "Received a block hash from {Peer}: #{BlockIndex} {BlockHash}",
                             peer,
-                            peerIndex,
-                            locator.FirstOrDefault(),
-                            downloaded.Count,
-                            totalBlockHashesToDownload
-                        );
+                            dlIndex,
+                            dlHash);

-                        List<Tuple<long, BlockHash>> blockHashes =
-                            await GetBlockHashes(
-                                peer: peer,
-                                locator: locator,
-                                stop: null,
-                                timeout: null,
-                                logSessionIds: null,
-                                cancellationToken: cancellationToken)
-                            .ToListAsync(cancellationToken);
-
-                        if (branchingIndex == -1 &&
-                            blockHashes.FirstOrDefault() is { } t)
+                        // FIXME: Probably should check if dlIndex and dlHash is
+                        // a valid hash of possible branching block by checking whether
+                        // it is already stored locally.
+                        if (downloaded.Contains(dlHash) || dlHash.Equals(branchingBlock))
                         {
-                            t.Deconstruct(out branchingIndex, out branchingBlock);
-                            try
-                            {
-                                totalBlockHashesToDownload = Convert.ToInt32(
-                                    peerIndex - branchingIndex);
-                            }
-                            catch (OverflowException)
-                            {
-                                totalBlockHashesToDownload = int.MaxValue;
-                            }
-
-                            chunkBlockHashesToDownload = Math.Min(
-                                totalBlockHashesToDownload, chunkSize);
+                            continue;
                         }

-                        foreach (Tuple<long, BlockHash> pair in blockHashes)
-                        {
-                            pair.Deconstruct(out long dlIndex, out BlockHash dlHash);
-                            _logger.Verbose(
-                                "Received a block hash from {Peer}: #{BlockIndex} {BlockHash}",
-                                peer,
-                                dlIndex,
-                                dlHash
-                            );
-
-                            // FIXME: Probably should check if dlIndex and dlHash is
-                            // a valid hash of possible branching block by checking whether
-                            // it is already stored locally.
-                            if (downloaded.Contains(dlHash) || dlHash.Equals(branchingBlock))
+                        downloaded.Add(dlHash);
+
+                        // As C# disallows to yield return inside try-catch block,
+                        // we need to work around the limitation by having this buffer.
+                        pairsToYield.Add(pair);
+                        progress?.Report(
+                            new BlockHashDownloadState
                             {
-                                continue;
+                                EstimatedTotalBlockHashCount = Math.Max(
+                                    totalBlockHashesToDownload,
+                                    downloaded.Count),
+                                ReceivedBlockHashCount = downloaded.Count,
+                                SourcePeer = peer,
                             }
-
-                            downloaded.Add(dlHash);
-
-                            // As C# disallows to yield return inside try-catch block,
-                            // we need to work around the limitation by having this buffer.
-                            pairsToYield.Add(pair);
-                            progress?.Report(
-                                new BlockHashDownloadState
-                                {
-                                    EstimatedTotalBlockHashCount = Math.Max(
-                                        totalBlockHashesToDownload,
-                                        downloaded.Count),
-                                    ReceivedBlockHashCount = downloaded.Count,
-                                    SourcePeer = peer,
-                                }
-                            );
-                        }
-
-                        locator = downloaded.Count > 0
-                            ? BlockLocator.Create(tipHash: downloaded.Last())
-                            : locator;
+                        );
                     }
-                    while (downloaded.Count < chunkBlockHashesToDownload);
-                }
-                catch (Exception e)
-                {
-                    error = e;
-                }
-
-                foreach (Tuple<long, BlockHash> pair in pairsToYield)
-                {
-                    yield return pair.ToValueTuple();
-                }
-
-                if (error is null)
-                {
-                    yield break;
-                }
-                else
-                {
-                    const string message =
-                        "Failed to fetch demand block hashes from {Peer}; " +
-                        "retry with another peer...";
-                    _logger.Debug(error, message, peer);
-                    exceptions.Add(error);
+                    locator = downloaded.Count > 0
+                        ? BlockLocator.Create(downloaded.Last())
+                        : locator;
                 }
+                while (downloaded.Count < chunkBlockHashesToDownload);
+            }
+            catch (Exception e)
+            {
+                _logger.Error(
+                    e,
+                    "Failed to fetch demand block hashes from {Peer}",
+                    peer);
+                throw new Exception("Failed");
             }

-            BoundPeer[] peers = peersWithExcerpts.Select(p => p.Item1).ToArray();
-            _logger.Warning(
-                "Failed to fetch demand block hashes from peers: {Peers}",
-                peers);
-            throw new AggregateException(
-                "Failed to fetch demand block hashes from peers: " +
-                string.Join(", ", peers.Select(p => p.ToString())),
-                exceptions);
+            return pairsToYield;
         }

         private void BroadcastBlock(Address? except, Block block)
diff --git a/test/Libplanet.Net.Tests/SwarmTest.Preload.cs b/test/Libplanet.Net.Tests/SwarmTest.Preload.cs
index aee8c9b96f1..9f7e95b650b 100644
--- a/test/Libplanet.Net.Tests/SwarmTest.Preload.cs
+++ b/test/Libplanet.Net.Tests/SwarmTest.Preload.cs
@@ -812,13 +812,13 @@ public async Task GetDemandBlockHashes()
                 (minerSwarm.AsPeer, minerChain.Tip.Header),
             };

-            (long, BlockHash)[] demands = await receiverSwarm.GetDemandBlockHashes(
+            List<(long, BlockHash)> demands = await receiverSwarm.GetDemandBlockHashes(
                 receiverChain,
                 peersWithExcerpt,
                 chunkSize: int.MaxValue,
                 progress: null,
                 cancellationToken: CancellationToken.None
-            ).ToArrayAsync();
+            );

             IEnumerable<(long, BlockHash)> expectedBlocks = minerChain.IterateBlocks()
                 .Where(b => b.Index >= receiverChain.Count)
@@ -918,7 +918,7 @@ public async Task GetDemandBlockHashesDuringReorg()
             };

             long receivedCount = 0;
-            (long, BlockHash)[] demands = await receiverSwarm.GetDemandBlockHashes(
+            List<(long, BlockHash)> demands = await receiverSwarm.GetDemandBlockHashes(
                 receiverChain,
                 peersWithBlockExcerpt,
                 chunkSize: int.MaxValue,
@@ -934,10 +934,9 @@ public async Task GetDemandBlockHashesDuringReorg()
                         }
                     }
                 }),
-                cancellationToken: CancellationToken.None
-            ).ToArrayAsync();
+                cancellationToken: CancellationToken.None);

-            Assert.Equal(receivedCount, demands.LongLength);
+            Assert.Equal(receivedCount, demands.LongCount());

             CleaningSwarm(minerSwarm);
             CleaningSwarm(receiverSwarm);
diff --git a/test/Libplanet.Net.Tests/SwarmTest.cs b/test/Libplanet.Net.Tests/SwarmTest.cs
index 058a4b38431..e0154a15b4d 100644
--- a/test/Libplanet.Net.Tests/SwarmTest.cs
+++ b/test/Libplanet.Net.Tests/SwarmTest.cs
@@ -525,13 +525,10 @@ public async Task GetBlocks()

             await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null);

-            (long, BlockHash)[] inventories1 = (
-                await swarmB.GetBlockHashes(
-                    swarmA.AsPeer,
-                    new BlockLocator(new[] { genesis.Hash }),
-                    null
-                ).ToArrayAsync()
-            ).Select(p => p.ToValueTuple()).ToArray();
+            List<(long, BlockHash)> inventories1 = await swarmB.GetBlockHashes(
+                swarmA.AsPeer,
+                new BlockLocator(new[] { genesis.Hash }),
+                null);
             Assert.Equal(
                 new[]
                 {
@@ -541,13 +538,10 @@ await swarmB.GetBlockHashes(
                 },
                 inventories1);

-            (long, BlockHash)[] inventories2 = (
-                await swarmB.GetBlockHashes(
-                    swarmA.AsPeer,
-                    new BlockLocator(new[] { genesis.Hash }),
-                    block1.Hash
-                ).ToArrayAsync()
-            ).Select(p => p.ToValueTuple()).ToArray();
+            List<(long, BlockHash)> inventories2 = await swarmB.GetBlockHashes(
+                swarmA.AsPeer,
+                new BlockLocator(new[] { genesis.Hash }),
+                block1.Hash);
             Assert.Equal(
                 new[] { (genesis.Index, genesis.Hash), (block1.Index, block1.Hash) },
                 inventories2);
@@ -598,11 +592,10 @@ public async Task GetMultipleBlocksAtOnce()

             await swarmB.AddPeersAsync(new[] { peer }, null);

-            Tuple<long, BlockHash>[] hashes = await swarmB.GetBlockHashes(
+            List<(long, BlockHash)> hashes = await swarmB.GetBlockHashes(
                 peer,
                 new BlockLocator(new[] { genesis.Hash }),
-                null
-            ).ToArrayAsync();
+                null);

             ITransport transport = swarmB.Transport;
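For context, the calling-pattern change in this patch can be summarized as: callers no longer stream hashes with `await foreach` or collect them with `ToArrayAsync()`; they `await` a single `Task<List<(long, BlockHash)>>` and iterate the result synchronously. The snippet below is a minimal, self-contained sketch of that shape only; `BlockHash` and `GetBlockHashesAsync` here are placeholders, not Libplanet types or APIs.

```csharp
// Placeholder sketch of the new Task<List<...>> consumption pattern; the
// BlockHash struct and GetBlockHashesAsync method are stand-ins, not Libplanet APIs.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public readonly struct BlockHash
{
    private readonly int _value;

    public BlockHash(int value) => _value = value;

    public override string ToString() => $"hash-{_value}";
}

public static class Demo
{
    // New shape: the whole batch of (index, hash) pairs is materialized and
    // returned at once, instead of being yielded as an IAsyncEnumerable.
    public static async Task<List<(long Index, BlockHash Hash)>> GetBlockHashesAsync(
        CancellationToken cancellationToken = default)
    {
        await Task.Delay(10, cancellationToken); // stand-in for a network round trip
        return new List<(long, BlockHash)>
        {
            (1, new BlockHash(1)),
            (2, new BlockHash(2)),
        };
    }

    public static async Task Main()
    {
        // Old shape: `await foreach (var pair in GetBlockHashes(...))` over a stream.
        // New shape: await once, then iterate the list synchronously.
        List<(long Index, BlockHash Hash)> hashes = await GetBlockHashesAsync();
        foreach ((long index, BlockHash hash) in hashes)
        {
            Console.WriteLine($"#{index} {hash}");
        }
    }
}
```

Materializing the batch keeps an entire request/response exchange inside one `try`/`catch`, which is what allows the new `GetDemandBlockHashes` wrapper above to catch a failed query and fall back to another peer before giving up with an `AggregateException`.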