Skip to content

Commit

Permalink
lake compaction scheduler optimize in fe restart scenarios
Browse files Browse the repository at this point in the history
Signed-off-by: drake_wang <[email protected]>
  • Loading branch information
wxl24life committed Jan 12, 2025
1 parent 2491673 commit 2803208
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.starrocks.persist.metablock.SRMetaBlockWriter;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.common.MetaUtils;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -56,9 +58,25 @@ public class CompactionMgr implements MemoryTrackable {
private Sorter sorter;
private CompactionScheduler compactionScheduler;

/**
* To ensure that the input rowsets of a compaction still exist when its version is being published, the
* compaction tasks of the same partition must be executed serially; that is, the next compaction task can be
* executed only after the status of the previous compaction task has changed to visible or canceled.
*
* We use `activeCompactionTransactionMap` to track all lake compaction txns that are not yet published when the
* FE restarts. The key of the map is the transaction state related to the compaction task, and the value is the
* table id of the compaction task. Multiple keys may have the same value, because there might be multiple
* compaction jobs on different partitions of the same table.
*
* Note that this prevents all partitions whose table id is maintained in the map from being compacted.
*/
private final ConcurrentHashMap<TransactionState, Long> activeCompactionTransactionMap = new ConcurrentHashMap<>();

public CompactionMgr() {
try {
init();
rebuildActiveCompactionTransactionMapOnRestart();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -90,6 +108,39 @@ public void start() {
}
}

/**
 * Repopulates {@code activeCompactionTransactionMap} from the transaction manager. Invoked from the
 * constructor. Every transaction state returned by
 * {@code GlobalTransactionMgr#getLakeCompactionActiveTxnIds()} is stored as a key, mapped to the first
 * table id of that transaction.
 */
protected synchronized void rebuildActiveCompactionTransactionMapOnRestart() {
    for (TransactionState state
            : GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLakeCompactionActiveTxnIds()) {
        // A lake compaction txn touches exactly one table, so the first id is the only id.
        activeCompactionTransactionMap.put(state, state.getTableIdList().get(0));
    }
}

/**
 * Drops every tracked compaction transaction that has reached a terminal status, so that the tables they
 * belong to become eligible for compaction again.
 *
 * A transaction is considered finished when it is either published ({@code VISIBLE}) or canceled
 * ({@code ABORTED}). Removing only visible transactions would leave an aborted transaction in the map
 * forever and permanently block compaction on its table.
 */
protected synchronized void checkActiveCompactionTransactionMap() {
    // removeIf on the key-set view removes the matching entries from the backing map; it is a no-op on an
    // empty map, so no explicit emptiness check is needed.
    activeCompactionTransactionMap.keySet().removeIf(txnState -> {
        TransactionStatus status = txnState.getTransactionStatus();
        return status == TransactionStatus.VISIBLE || status == TransactionStatus.ABORTED;
    });
}

/**
 * Tells whether any transaction tracked in {@code activeCompactionTransactionMap} is mapped to the given
 * table id.
 *
 * @param tableId id of the table to look up among the map values
 * @return {@code true} if at least one entry has {@code tableId} as its value, {@code false} otherwise
 *         (including when the map is empty)
 */
protected synchronized boolean checkIfTableInActiveCompactionTransactionMap(long tableId) {
    return !activeCompactionTransactionMap.isEmpty()
            && activeCompactionTransactionMap.containsValue(tableId);
}


public void handleLoadingFinished(PartitionIdentifier partition, long version, long versionTime,
Quantiles compactionScore) {
PartitionVersion currentVersion = new PartitionVersion(version, versionTime);
Expand All @@ -106,7 +157,7 @@ public void handleLoadingFinished(PartitionIdentifier partition, long version, l
}
}

public void handleCompactionFinished(PartitionIdentifier partition, long version, long versionTime,
public synchronized void handleCompactionFinished(PartitionIdentifier partition, long version, long versionTime,
Quantiles compactionScore) {
PartitionVersion compactionVersion = new PartitionVersion(version, versionTime);
PartitionStatistics statistics = partitionStatisticsHashMap.compute(partition, (k, v) -> {
Expand All @@ -124,11 +175,12 @@ public void handleCompactionFinished(PartitionIdentifier partition, long version
}

@NotNull
List<PartitionStatisticsSnapshot> choosePartitionsToCompact(@NotNull Set<PartitionIdentifier> excludes,
synchronized List<PartitionStatisticsSnapshot> choosePartitionsToCompact(@NotNull Set<PartitionIdentifier> excludes,
@NotNull Set<Long> excludeTables) {
return choosePartitionsToCompact(excludeTables)
.stream()
.filter(p -> !excludes.contains(p.getPartition()))
.filter(p -> checkIfTableInActiveCompactionTransactionMap(p.getPartition().getTableId()))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public class CompactionScheduler extends Daemon {
private final GlobalStateMgr stateMgr;
private final ConcurrentHashMap<PartitionIdentifier, CompactionJob> runningCompactions;
private final SynchronizedCircularQueue<CompactionRecord> history;
private boolean finishedWaiting = false;
private long waitTxnId = -1;
private long lastPartitionCleanTime;
private Set<Long> disabledTables; // copy-on-write

Expand All @@ -105,31 +103,13 @@ protected void runOneCycle() {
cleanPhysicalPartition();

// Schedule compaction tasks only when this is a leader FE and all edit logs have finished replay.
// In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is
// necessary to ensure that the compaction task of the same partition is executed serially, that is, the next
// compaction task can be executed only after the status of the previous compaction task changes to visible or
// canceled.
if (stateMgr.isLeader() && stateMgr.isReady() && allCommittedCompactionsBeforeRestartHaveFinished()) {
if (stateMgr.isLeader() && stateMgr.isReady()) {
compactionManager.checkActiveCompactionTransactionMap();
schedule();
history.changeMaxSize(Config.lake_compaction_history_size);
}
}

// Reports whether every compaction transaction committed before this restart has finished (i.e., reached
// the VISIBLE state). Once the answer is true it is cached in `finishedWaiting` and returned directly.
private boolean allCommittedCompactionsBeforeRestartHaveFinished() {
    if (!finishedWaiting) {
        // Order matters: getMinActiveCompactionTxnId() has to be called before getNextTransactionId(),
        // otherwise `waitTxnId <= minActiveTxnId` can never hold when no transaction is running.
        final long minActiveTxnId = transactionMgr.getMinActiveCompactionTxnId();
        if (waitTxnId < 0) {
            // The id to wait for is captured only once, on the first call.
            waitTxnId = transactionMgr.getTransactionIDGenerator().getNextTransactionId();
        }
        finishedWaiting = waitTxnId <= minActiveTxnId;
    }
    return finishedWaiting;
}

private void schedule() {
// Check whether there are completed compaction jobs.
for (Iterator<Map.Entry<PartitionIdentifier, CompactionJob>> iterator = runningCompactions.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,22 @@ public List<TransactionState> getCommittedTxnList() {
}
}

/**
 * Collects the running transactions whose status is {@code COMMITTED} and whose source type is
 * {@code LAKE_COMPACTION}. The running-transaction map is read while holding the read lock.
 *
 * @return the matching transaction states, sorted by commit time
 */
public List<TransactionState> getLakeCompactionCommittedTxnList() {
    readLock();
    try {
        Comparator<TransactionState> byCommitTime = Comparator.comparing(TransactionState::getCommitTime);
        return idToRunningTransactionState.values().stream()
                .filter(state -> state.getTransactionStatus() == TransactionStatus.COMMITTED
                        && state.getSourceType() == TransactionState.LoadJobSourceType.LAKE_COMPACTION)
                .sorted(byCommitTime)
                .collect(Collectors.toList());
    } finally {
        readUnlock();
    }
}

// Check whether there is committed txns on partitionId.
public boolean hasCommittedTxnOnPartition(long tableId, long partitionId) {
readLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,19 @@ public long getMinActiveCompactionTxnId() {
return minId;
}

/**
 * Gathers, across all per-database transaction managers, the transaction states reported by
 * {@code DatabaseTransactionMgr#getLakeCompactionCommittedTxnList()}.
 *
 * Despite the method name, the result holds {@link TransactionState} objects rather than transaction ids.
 *
 * @return a new list with the collected transaction states; empty if there are none
 */
public List<TransactionState> getLakeCompactionActiveTxnIds() {
    List<TransactionState> collected = new ArrayList<>();
    for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
        collected.addAll(dbTransactionMgr.getLakeCompactionCommittedTxnList());
    }
    return collected;
}

/**
* Get the smallest transaction ID of active transactions in a database.
* If there are no active transactions in the database, return the transaction ID that will be assigned to the
Expand Down Expand Up @@ -675,6 +688,18 @@ public TransactionState getTransactionState(long dbId, long transactionId) {
}
}

/**
 * Looks up a transaction by id without knowing its database, by asking each per-database transaction
 * manager in turn.
 *
 * @param transactionId id of the transaction to find
 * @return the first non-null transaction state returned by a database transaction manager, or
 *         {@code null} if none of them knows the transaction
 */
public TransactionState getTransactionState(long transactionId) {
    for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
        TransactionState found = dbTransactionMgr.getTransactionState(transactionId);
        if (found != null) {
            return found;
        }
    }
    return null;
}

// for replay idToTransactionState
// check point also run transaction cleaner, the cleaner maybe concurrently modify id to
public void replayUpsertTransactionState(TransactionState transactionState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public final class VisibleStateWaiter {
private final TransactionState txnState;

// Creates a waiter bound to the given (non-null) transaction state; the reference is kept in `txnState`.
VisibleStateWaiter(@NotNull TransactionState txnState) {
public VisibleStateWaiter(@NotNull TransactionState txnState) {
this.txnState = txnState;
}

Expand Down

0 comments on commit 2803208

Please sign in to comment.