Skip to content

Commit

Permalink
Fixed PruneOutdatedChains
Browse files Browse the repository at this point in the history
  • Loading branch information
greymistcube committed Nov 28, 2024
1 parent 71b6280 commit f74330a
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 83 deletions.
188 changes: 188 additions & 0 deletions src/Libplanet.RocksDBStore/RocksDBStore.Prune.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
#nullable disable
using System;
using System.Collections.Generic;
using System.Linq;
using Libplanet.Common;
using Libplanet.Store;
using Libplanet.Types.Blocks;
using RocksDbSharp;

namespace Libplanet.RocksDBStore
{
public partial class RocksDBStore : BaseStore
{
/// <inheritdoc cref="IStore.PruneOutdatedChains"/>
public override void PruneOutdatedChains(bool noopWithoutCanon = false)
{
if (!(GetCanonicalChainId() is { } ccid))
{
if (noopWithoutCanon)
{
return;
}

throw new InvalidOperationException("Canonical chain ID is not assigned.");
}

using var batch = new WriteBatch();

// Copy indexes from previous chains.
// TxNonce is not copied because it is copied during .ForkTxNonces().
long index = 0;
foreach (var hash in IterateIndexesForPrune(ccid))
{
batch.Put(
IndexKey(ccid, RocksDBStoreBitConverter.GetBytes(index)),
hash.ToByteArray());

if (batch.Count() > 10000)
{
_chainDb.Write(batch);
batch.Clear();
}

index++;
}

_chainDb.Write(batch);
batch.Clear();

batch.Delete(PreviousChainIdKey(ccid));
batch.Delete(PreviousChainIndexKey(ccid));
batch.Delete(DeletedChainKey(ccid));
_chainDb.Write(batch);
batch.Clear();

int guidLength = ccid.ToByteArray().Length;
using Iterator it = _chainDb.NewIterator();
for (it.SeekToFirst();
it.Valid();
it.Next())
{
if (it.Key().StartsWith(CanonicalChainIdIdKey))
{
continue;
}

try
{
var id = new Guid(it.Key().Skip(1).Take(guidLength).ToArray());
if (id.Equals(ccid))
{
continue;
}

batch.Delete(it.Key());
}
catch (Exception)
{
// Key is corrupted, delete.
batch.Delete(it.Key());
}

if (batch.Count() > 10000)
{
_chainDb.Write(batch);
batch.Clear();
}
}

_chainDb.Write(batch);
batch.Clear();
}

private IEnumerable<BlockHash> IterateIndexesForPrune(Guid chainId)
{
Stack<(Guid Id, long Count)> chainInfos =
new Stack<(Guid Id, long Count)>();

chainInfos.Push((chainId, CountIndex(chainId)));
while (GetPreviousChainInfo(chainInfos.Peek().Id) is { } chainInfo)
{
chainInfos.Push(chainInfo);
}

List<BlockHash> hashes = new List<BlockHash>();

while (chainInfos.Count > 0)
{
var chainInfo = chainInfos.Pop();

foreach ((BlockHash hash, int i) in
IterateIndexesInnerForPrune(chainInfo.Id).Select((hash, i) => (hash, i)))
{
if (i == 0)
{
BlockDigest digest = GetBlockDigest(hash) ??
throw new InvalidOperationException(
$"Could not find a block corresponding to {hash} in storage.");

// NOTE: This means there is a gap between two chain ids.
if (digest.Index > hashes.Count)
{
throw new InvalidOperationException(
$"Next block is expected to be of index #{hashes.Count} but " +
$"got #{digest.Index} {digest.Hash}.");
}

// NOTE: This means there is an overlap between two chain ids.
// The newer one should overwrite the old.
if (digest.Index < hashes.Count)
{
// NOTE: Make sure it can be overwritten by checking continuity.
if (digest.PreviousHash is { } previousHash)
{
BlockHash targetHash = hashes[(int)digest.Index - 1];
if (!previousHash.Equals(targetHash))
{
throw new InvalidOperationException(
$"The previous hash {previousHash} of a retrieved " +
$"block #{digest.Index} {digest.Hash} " +
$"does not match the one iterated so far {targetHash}");
}
}

// NOTE: Truncate the iterated list so far.
_logger.Debug(
"Truncating hashes iterated so far from " +
"{IteratedCount} to {TargetCount}",
hashes.Count,
digest.Index);
hashes.RemoveRange(
(int)digest.Index, (int)(hashes.Count - digest.Index));
}
}

// NOTE: We assume non-first hashes are sequential for a chain id.
hashes.Add(hash);
}

Console.WriteLine($"Number of hashes iterated so far: {hashes.Count}");
}

BlockHash lastHash = hashes.Last();
BlockDigest lastDigest = GetBlockDigest(lastHash) ??
throw new InvalidOperationException(
$"Could not find a block corresponding to {lastHash} in storage.");

if (lastDigest.Index != hashes.Count - 1)
{
throw new InvalidOperationException(
$"The last iterated block is #{lastDigest.Index} {lastDigest.Hash} when " +
$"its expected index is {hashes.Count}");
}

return hashes;
}

private IEnumerable<BlockHash> IterateIndexesInnerForPrune(Guid chainId)
{
byte[] prefix = Concat(IndexKeyPrefix, chainId.ToByteArray());
foreach (Iterator it in IterateDb(_chainDb, prefix))
{
byte[] value = it.Value();
yield return new BlockHash(value);
}
}
}
}
82 changes: 1 addition & 81 deletions src/Libplanet.RocksDBStore/RocksDBStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace Libplanet.RocksDBStore
/// </list>
/// </summary>
/// <seealso cref="IStore"/>
public class RocksDBStore : BaseStore
public partial class RocksDBStore : BaseStore
{
private const string BlockDbRootPathName = "block";
private const string BlockIndexDbName = "blockindex";
Expand Down Expand Up @@ -1173,86 +1173,6 @@ public override void ForkTxNonces(Guid sourceChainId, Guid destinationChainId)
}
}

/// <inheritdoc />
public override void PruneOutdatedChains(bool noopWithoutCanon = false)
{
if (!(GetCanonicalChainId() is { } ccid))
{
if (noopWithoutCanon)
{
return;
}

throw new InvalidOperationException("Canonical chain ID is not assigned.");
}

using var batch = new WriteBatch();

// Copy indexes from previous chains.
// TxNonce is not copied because it is copied during .ForkTxNonces().
long index = 0;
foreach (var hash in IterateIndexes(ccid, 0, null))
{
batch.Put(
IndexKey(ccid, RocksDBStoreBitConverter.GetBytes(index)),
hash.ToByteArray());

if (batch.Count() > 10000)
{
_chainDb.Write(batch);
batch.Clear();
}

index++;
}

_chainDb.Write(batch);
batch.Clear();

batch.Delete(PreviousChainIdKey(ccid));
batch.Delete(PreviousChainIndexKey(ccid));
batch.Delete(DeletedChainKey(ccid));
_chainDb.Write(batch);
batch.Clear();

int guidLength = ccid.ToByteArray().Length;
using Iterator it = _chainDb.NewIterator();
for (it.SeekToFirst();
it.Valid();
it.Next())
{
if (it.Key().StartsWith(CanonicalChainIdIdKey))
{
continue;
}

try
{
var id = new Guid(it.Key().Skip(1).Take(guidLength).ToArray());
if (id.Equals(ccid))
{
continue;
}

batch.Delete(it.Key());
}
catch (Exception)
{
// Key is corrupted, delete.
batch.Delete(it.Key());
}

if (batch.Count() > 10000)
{
_chainDb.Write(batch);
batch.Clear();
}
}

_chainDb.Write(batch);
batch.Clear();
}

/// <inheritdoc />
public override BlockCommit? GetChainBlockCommit(Guid chainId)
{
Expand Down
20 changes: 18 additions & 2 deletions src/Libplanet.Store/IStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,25 @@ public interface IStore : IDisposable
/// there is no canonical chain.
/// </param>
/// <exception cref="InvalidOperationException">
/// Thrown when there is no canonical chain and
/// <paramref name="noopWithoutCanon"/> is false.
/// Thrown for any of the following reasons:
/// <list type="bullet">
/// <item><description>
/// Thrown when there is no canonical chain and <paramref name="noopWithoutCanon"/>
/// is false.
/// </description></item>
/// <item><description>
/// A <see cref="BlockHash"/> for a non-existant <see cref="Block"/> is encountered

Check warning on line 304 in src/Libplanet.Store/IStore.cs

View workflow job for this annotation

GitHub Actions / typos

"existant" should be "existent".
/// while iterating.
/// </description></item>
/// <item><description>
/// If the chain in question is found to be "un-prunable" due to data corruption.
/// </description></item>
/// </list>
/// </exception>
/// <remarks>
/// If an <see cref="InvalidOperationException"/> is thrown, this results in
/// no-op for the <see cref="IStore"/>.
/// </remarks>
void PruneOutdatedChains(bool noopWithoutCanon = false);

/// <summary>
Expand Down

0 comments on commit f74330a

Please sign in to comment.