From 280320822c3805b84b27f208bd32660b9a0fdb4c Mon Sep 17 00:00:00 2001 From: "sunny.xl" Date: Wed, 8 Jan 2025 15:13:40 +0800 Subject: [PATCH] lake compaction scheduler optimize in fe restart secinarios Signed-off-by: drake_wang --- .../lake/compaction/CompactionMgr.java | 56 ++++++++++++++++++- .../lake/compaction/CompactionScheduler.java | 24 +------- .../transaction/DatabaseTransactionMgr.java | 16 ++++++ .../transaction/GlobalTransactionMgr.java | 25 +++++++++ .../transaction/VisibleStateWaiter.java | 2 +- 5 files changed, 98 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java index 686cf01ba3d5d..7b81d8f3a6594 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java @@ -29,6 +29,8 @@ import com.starrocks.persist.metablock.SRMetaBlockWriter; import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.common.MetaUtils; +import com.starrocks.transaction.TransactionState; +import com.starrocks.transaction.TransactionStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -56,9 +58,25 @@ public class CompactionMgr implements MemoryTrackable { private Sorter sorter; private CompactionScheduler compactionScheduler; + /** + * In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is + * necessary to ensure that the compaction task of the same partition is executed serially, that is, the next + * compaction task can be executed only after the status of the previous compaction task changes to visible or + * canceled. + * + * We use `activeCompactionTransactionMap` to track all lake compaction txns that are not published on FE restart. + * The key of the map is the transaction txn state related to the compaction task, and the value is table id of the + * compaction task. It's possible that multiple keys have the same value, + * because there might be multiple compaction jobs on different partitions with the same table id. + * + * Note that, this will prevent all partitions whose tableId is maintained in the map from being compacted + */ + private final ConcurrentHashMap activeCompactionTransactionMap = new ConcurrentHashMap<>(); + public CompactionMgr() { try { init(); + rebuildActiveCompactionTransactionMapOnRestart(); } catch (Exception e) { throw new RuntimeException(e); } @@ -90,6 +108,39 @@ public void start() { } } + /** + * iterate all transactions and find those with LAKE_COMPACTION labels and are not finished yet. + **/ + protected synchronized void rebuildActiveCompactionTransactionMapOnRestart() { + List activeTxnStates = + GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLakeCompactionActiveTxnIds(); + for (TransactionState txnState : activeTxnStates) { + // for lake compaction txn, there can only be one table id for each txn state + Long tableId = txnState.getTableIdList().get(0); + activeCompactionTransactionMap.put(txnState, tableId); + } + } + + protected synchronized void checkActiveCompactionTransactionMap() { + if (activeCompactionTransactionMap.isEmpty()) { + return; + } + for (Map.Entry entry : activeCompactionTransactionMap.entrySet()) { + TransactionState txnState = entry.getKey(); + if (txnState.getTransactionStatus() == TransactionStatus.VISIBLE) { + activeCompactionTransactionMap.remove(txnState); + } + } + } + + protected synchronized boolean checkIfTableInActiveCompactionTransactionMap(long tableId) { + if (activeCompactionTransactionMap.isEmpty()) { + return false; + } + return activeCompactionTransactionMap.containsValue(tableId); + } + + public void handleLoadingFinished(PartitionIdentifier partition, long version, long versionTime, Quantiles compactionScore) { PartitionVersion currentVersion = new PartitionVersion(version, versionTime); @@ -106,7 +157,7 @@ public void handleLoadingFinished(PartitionIdentifier partition, long version, l } } - public void handleCompactionFinished(PartitionIdentifier partition, long version, long versionTime, + public synchronized void handleCompactionFinished(PartitionIdentifier partition, long version, long versionTime, Quantiles compactionScore) { PartitionVersion compactionVersion = new PartitionVersion(version, versionTime); PartitionStatistics statistics = partitionStatisticsHashMap.compute(partition, (k, v) -> { @@ -124,11 +175,12 @@ public void handleCompactionFinished(PartitionIdentifier partition, long version } @NotNull - List choosePartitionsToCompact(@NotNull Set excludes, + synchronized List choosePartitionsToCompact(@NotNull Set excludes, @NotNull Set excludeTables) { return choosePartitionsToCompact(excludeTables) .stream() .filter(p -> !excludes.contains(p.getPartition())) + .filter(p -> checkIfTableInActiveCompactionTransactionMap(p.getPartition().getTableId())) .collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java index 18a9452c138ab..fd9dcfe815882 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java @@ -79,8 +79,6 @@ public class CompactionScheduler extends Daemon { private final GlobalStateMgr stateMgr; private final ConcurrentHashMap runningCompactions; private final SynchronizedCircularQueue history; - private boolean finishedWaiting = false; - private long waitTxnId = -1; private long lastPartitionCleanTime; private Set disabledTables; // copy-on-write @@ -105,31 +103,13 @@ protected void runOneCycle() { cleanPhysicalPartition(); // Schedule compaction tasks only when this is a leader FE and all edit logs have finished replay. - // In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is - // necessary to ensure that the compaction task of the same partition is executed serially, that is, the next - // compaction task can be executed only after the status of the previous compaction task changes to visible or - // canceled. - if (stateMgr.isLeader() && stateMgr.isReady() && allCommittedCompactionsBeforeRestartHaveFinished()) { + if (stateMgr.isLeader() && stateMgr.isReady()) { + compactionManager.checkActiveCompactionTransactionMap(); schedule(); history.changeMaxSize(Config.lake_compaction_history_size); } } - // Returns true if all compaction transactions committed before this restart have finished(i.e., of VISIBLE state). - private boolean allCommittedCompactionsBeforeRestartHaveFinished() { - if (finishedWaiting) { - return true; - } - // Note: must call getMinActiveCompactionTxnId() before getNextTransactionId(), otherwise if there are - // no running transactions waitTxnId <= minActiveTxnId will always be false. - long minActiveTxnId = transactionMgr.getMinActiveCompactionTxnId(); - if (waitTxnId < 0) { - waitTxnId = transactionMgr.getTransactionIDGenerator().getNextTransactionId(); - } - finishedWaiting = waitTxnId <= minActiveTxnId; - return finishedWaiting; - } - private void schedule() { // Check whether there are completed compaction jobs. for (Iterator> iterator = runningCompactions.entrySet().iterator(); diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java index fed05d1bb36ea..5790596635b41 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java @@ -776,6 +776,22 @@ public List getCommittedTxnList() { } } + public List getLakeCompactionCommittedTxnList() { + readLock(); + try { + // only send task to committed transaction + return idToRunningTransactionState.values().stream() + .filter(transactionState -> (transactionState.getTransactionStatus() == + TransactionStatus.COMMITTED)) + .filter(transactionState -> transactionState.getSourceType() == + TransactionState.LoadJobSourceType.LAKE_COMPACTION) + .sorted(Comparator.comparing(TransactionState::getCommitTime)) + .collect(Collectors.toList()); + } finally { + readUnlock(); + } + } + // Check whether there is committed txns on partitionId. public boolean hasCommittedTxnOnPartition(long tableId, long partitionId) { readLock(); diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java index 9274436366d7f..a6331554ff335 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java @@ -647,6 +647,19 @@ public long getMinActiveCompactionTxnId() { return minId; } + /** + * Get the list of active txn ids of compaction transactions. + * @return the list of active txn ids of compaction transactions. + */ + public List getLakeCompactionActiveTxnIds() { + List txnStateList = new ArrayList<>(); + for (Map.Entry entry : dbIdToDatabaseTransactionMgrs.entrySet()) { + DatabaseTransactionMgr dbTransactionMgr = entry.getValue(); + txnStateList.addAll(dbTransactionMgr.getLakeCompactionCommittedTxnList()); + } + return txnStateList; + } + /** * Get the smallest transaction ID of active transactions in a database. * If there are no active transactions in the database, return the transaction ID that will be assigned to the @@ -675,6 +688,18 @@ public TransactionState getTransactionState(long dbId, long transactionId) { } } + public TransactionState getTransactionState(long transactionId) { + TransactionState transactionState = null; + for (Map.Entry entry : dbIdToDatabaseTransactionMgrs.entrySet()) { + DatabaseTransactionMgr dbTransactionMgr = entry.getValue(); + transactionState = dbTransactionMgr.getTransactionState(transactionId); + if (transactionState != null) { + return transactionState; + } + } + return transactionState; + } + // for replay idToTransactionState // check point also run transaction cleaner, the cleaner maybe concurrently modify id to public void replayUpsertTransactionState(TransactionState transactionState) { diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/VisibleStateWaiter.java b/fe/fe-core/src/main/java/com/starrocks/transaction/VisibleStateWaiter.java index 85157fff19149..419b3467e15e3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/VisibleStateWaiter.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/VisibleStateWaiter.java @@ -24,7 +24,7 @@ public final class VisibleStateWaiter { private final TransactionState txnState; - VisibleStateWaiter(@NotNull TransactionState txnState) { + public VisibleStateWaiter(@NotNull TransactionState txnState) { this.txnState = txnState; }