Skip to content

Commit

Permalink
Optimized IterateIndexes() for pruned RocksDBStore
Browse files Browse the repository at this point in the history
  • Loading branch information
greymistcube committed Nov 29, 2024
1 parent 883dc22 commit f5362fe
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/Libplanet.RocksDBStore/RocksDBStore.Fork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ BlockHash branchpoint
}

using var batch = new WriteBatch();
foreach (Iterator k in IterateDb(_chainDb, IndexKey(destinationChainId)))
foreach (Iterator k in IterateDbUnpruned(_chainDb, IndexKey(destinationChainId)))
{
batch.Delete(k.Key());
}
Expand Down Expand Up @@ -81,7 +81,7 @@ public void ForkTxNonces(Guid sourceChainId, Guid destinationChainId)
try
{
byte[] prefix = TxNonceKey(sourceChainId);
foreach (Iterator it in IterateDb(_chainDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_chainDb, prefix))
{
exist = true;
Address address = new Address(it.Key().Skip(prefix.Length).ToArray());
Expand Down
99 changes: 84 additions & 15 deletions src/Libplanet.RocksDBStore/RocksDBStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public partial class RocksDBStore : BaseStore
private readonly ReaderWriterLockSlim _rwBlockCommitLock;
private readonly ReaderWriterLockSlim _rwNextStateRootHashLock;
private readonly ReaderWriterLockSlim _rwEvidenceLock;
private bool _pruned = false;
private bool _disposed = false;
private object _chainForkDeleteLock = new object();
private LruCache<Guid, LruCache<(int, int?), List<BlockHash>>> _indexCache;
Expand Down Expand Up @@ -283,6 +284,8 @@ public RocksDBStore(
new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
_rwEvidenceLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

_pruned = ListChainIds().Count() <= 1;

_blockDbCache = new LruCache<string, RocksDb>(dbConnectionCacheSize);
_blockDbCache.SetPreRemoveDataMethod(db =>
{
Expand Down Expand Up @@ -310,7 +313,7 @@ public static bool MigrateChainDBFromColumnFamilies(string path)
}

RocksDb db = RocksDb.Open(opt, path, cfs);
if (cfs.Count() == 1 && IterateDb(db, ChainIdKeyPrefix).Any())
if (cfs.Count() == 1 && IterateDbUnpruned(db, ChainIdKeyPrefix).Any())
{
// Already migrated.
db.Dispose();
Expand Down Expand Up @@ -419,7 +422,7 @@ void CopyIndexes(ColumnFamilyHandle cfh, long? limit)
/// <inheritdoc/>
public override IEnumerable<Guid> ListChainIds()
{
foreach (var it in IterateDb(_chainDb, ChainIdKeyPrefix))
foreach (var it in IterateDbUnpruned(_chainDb, ChainIdKeyPrefix))
{
var guid = new Guid(it.Value());
if (IsDeletionMarked(guid) && HasFork(guid))
Expand All @@ -444,7 +447,7 @@ public override void DeleteChainId(Guid chainId)
// FIXME: We should remove this code after adjusting .ForkTxNonces().
using var batch = new WriteBatch();
byte[] prefix = TxNonceKey(chainId);
foreach (Iterator k in IterateDb(_chainDb, prefix))
foreach (Iterator k in IterateDbUnpruned(_chainDb, prefix))
{
batch.Delete(k.Key());
}
Expand All @@ -460,12 +463,12 @@ public override void DeleteChainId(Guid chainId)
try
{
using var batch = new WriteBatch();
foreach (Iterator it in IterateDb(_chainDb, IndexKey(chainId)))
foreach (Iterator it in IterateDbUnpruned(_chainDb, IndexKey(chainId)))
{
batch.Delete(it.Key());
}

foreach (Iterator it in IterateDb(_chainDb, TxNonceKey(chainId)))
foreach (Iterator it in IterateDbUnpruned(_chainDb, TxNonceKey(chainId)))
{
batch.Delete(it.Key());
}
Expand Down Expand Up @@ -767,7 +770,7 @@ public override IEnumerable<BlockHash> IterateBlockHashes()
{
byte[] prefix = BlockKeyPrefix;

foreach (Iterator it in IterateDb(_blockIndexDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_blockIndexDb, prefix))
{
byte[] key = it.Key();
byte[] hashBytes = key.Skip(prefix.Length).ToArray();
Expand Down Expand Up @@ -964,7 +967,7 @@ public override void DeleteTxIdBlockHashIndex(TxId txId, BlockHash blockHash)
public override IEnumerable<BlockHash> IterateTxIdBlockHashIndex(TxId txId)
{
var prefix = TxIdBlockHashIndexTxIdKey(txId);
foreach (var it in IterateDb(_txIdBlockHashIndexDb, prefix))
foreach (var it in IterateDbUnpruned(_txIdBlockHashIndexDb, prefix))
{
yield return new BlockHash(it.Value());
}
Expand Down Expand Up @@ -993,7 +996,7 @@ public override void PutTxExecution(TxExecution txExecution) =>
public override IEnumerable<KeyValuePair<Address, long>> ListTxNonces(Guid chainId)
{
byte[] prefix = TxNonceKey(chainId);
foreach (Iterator it in IterateDb(_chainDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_chainDb, prefix))
{
byte[] addressBytes = it.Key()
.Skip(prefix.Length)
Expand Down Expand Up @@ -1290,7 +1293,8 @@ public override IEnumerable<BlockHash> GetBlockCommitHashes()
{
try
{
IEnumerable<Iterator> iterators = IterateDb(_blockCommitDb, Array.Empty<byte>());
IEnumerable<Iterator> iterators = IterateDbUnpruned(
_blockCommitDb, Array.Empty<byte>());

// FIXME: Somehow key value comes with 0x76 prefix at the first index of
// byte array.
Expand Down Expand Up @@ -1390,7 +1394,7 @@ public override void DeleteNextStateRootHash(BlockHash blockHash)
/// <inheritdoc/>
public override IEnumerable<EvidenceId> IteratePendingEvidenceIds()
{
foreach (Iterator it in IterateDb(_pendingEvidenceDb, PendingEvidenceKeyPrefix))
foreach (Iterator it in IterateDbUnpruned(_pendingEvidenceDb, PendingEvidenceKeyPrefix))
{
byte[] key = it.Key();
byte[] idBytes = key.Skip(PendingEvidenceKeyPrefix.Length).ToArray();
Expand Down Expand Up @@ -1750,7 +1754,21 @@ private static byte[] PendingEvidenceKey(in EvidenceId evidenceId) =>
private static byte[] CommittedEvidenceKey(in EvidenceId evidenceId) =>
CommittedEvidenceKeyPrefix.Concat(evidenceId.ByteArray).ToArray();

private static IEnumerable<Iterator> IterateDb(RocksDb db, byte[] prefix)
private static IEnumerable<Iterator> IterateDbPruned(
RocksDb db,
byte[] start,
byte[] prefix)
{
using Iterator it = db.NewIterator();
for (it.Seek(start); it.Valid() && it.Key().StartsWith(prefix); it.Next())
{
yield return it;
}
}

private static IEnumerable<Iterator> IterateDbUnpruned(
RocksDb db,
byte[] prefix)
{
using Iterator it = db.NewIterator();
for (it.Seek(prefix); it.Valid() && it.Key().StartsWith(prefix); it.Next())
Expand Down Expand Up @@ -1807,6 +1825,38 @@ private void LogUnexpectedException(string methodName, Exception e)
}

private IEnumerable<BlockHash> IterateIndexes(
Guid chainId,
long offset,
long? limit,
bool includeDeleted) => _pruned
? IterateIndexesPruned(chainId, offset, limit, includeDeleted)
: IterateIndexesUnpruned(chainId, offset, limit, includeDeleted);

private IEnumerable<BlockHash> IterateIndexesPruned(
Guid chainId,
long offset,
long? limit,
bool includeDeleted)
{
if (!includeDeleted && IsDeletionMarked(chainId))
{
yield break;
}

long count = 0;
foreach (BlockHash hash in IterateIndexesInnerPruned(chainId, offset))
{
if (count >= limit)
{
yield break;
}

yield return hash;
count += 1;
}
}

private IEnumerable<BlockHash> IterateIndexesUnpruned(
Guid chainId,
long offset,
long? limit,
Expand Down Expand Up @@ -1853,7 +1903,7 @@ bool includeDeleted
long expectedCount = chainTipIndex - previousChainTipIndex +
(GetPreviousChainInfo(cid) is null ? 1 : 0);

foreach (BlockHash hash in IterateIndexesInner(cid, expectedCount))
foreach (BlockHash hash in IterateIndexesInnerUnpruned(cid, expectedCount))
{
if (offset > 0)
{
Expand All @@ -1874,11 +1924,30 @@ bool includeDeleted
}
}

private IEnumerable<BlockHash> IterateIndexesInner(Guid chainId, long expectedCount)
private IEnumerable<BlockHash> IterateIndexesInnerPruned(
Guid chainId,
long offset)
{
byte[] start = Concat(
IndexKeyPrefix,
chainId.ToByteArray(),
RocksDBStoreBitConverter.GetBytes(offset));
byte[] prefix = Concat(IndexKeyPrefix, chainId.ToByteArray());

long count = 0;
foreach (Iterator it in IterateDbPruned(_chainDb, start, prefix))
{
byte[] value = it.Value();
yield return new BlockHash(value);
count += 1;
}
}

private IEnumerable<BlockHash> IterateIndexesInnerUnpruned(Guid chainId, long expectedCount)
{
long count = 0;
byte[] prefix = Concat(IndexKeyPrefix, chainId.ToByteArray());
foreach (Iterator it in IterateDb(_chainDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_chainDb, prefix))
{
if (count >= expectedCount)
{
Expand All @@ -1904,7 +1973,7 @@ private void RemoveFork(Guid chainId, Guid forkedChainId)
private bool HasFork(Guid chainId)
{
byte[] prefix = Concat(ForkedChainsKeyPrefix, chainId.ToByteArray());
return IterateDb(_chainDb, prefix).Any();
return IterateDbUnpruned(_chainDb, prefix).Any();
}

private bool IsDeletionMarked(Guid chainId)
Expand Down

0 comments on commit f5362fe

Please sign in to comment.