Skip to content

Commit

Permalink
[Enhancement] Optimize LoadsHistorySyncer (StarRocks#53864)
Browse files Browse the repository at this point in the history
## Why I'm doing:
1. Even if there are no load jobs, LoadsHistorySyncer still runs.
2. LoadsHistorySyncer does not ignore its own load jobs, which causes an infinite loop.

## What I'm doing:
1. Record the last sync time; if there are no new load jobs, do not run the sync SQL.
2. Ignore the DML SQL triggered by LoadsHistorySyncer itself.

Signed-off-by: gengjun-git <[email protected]>
  • Loading branch information
gengjun-git authored Dec 16, 2024
1 parent 019de3a commit 884981f
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,8 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
/**
 * No-op override: this implementation performs no work when invoked.
 *
 * @param txnState the transaction state passed by the caller (unused here)
 */
@Override
public void replayOnVisible(TransactionState txnState) {
}

/**
 * Returns the value of this object's {@code tableId} field.
 *
 * @return the stored table id
 */
public long getTableId() {
    return this.tableId;
}
}
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,27 @@ public List<LoadJob> getLoadJobs(String labelValue) {
}
}

/**
 * Finds the largest finish timestamp among final load jobs, skipping every
 * {@link InsertLoadJob} whose database id and table id match the given pair.
 *
 * <p>The scan over {@code idToLoadJob} is performed while holding the read lock.
 *
 * @param dbId    database id of the table to exclude
 * @param tableId id of the table whose insert load jobs are ignored
 * @return the largest finish timestamp of the remaining final jobs,
 *         or {@code -1} when no such job exists
 */
public long getLatestFinishTimeExcludeTable(long dbId, long tableId) {
    long newest = -1L;
    readLock();
    try {
        for (LoadJob job : idToLoadJob.values()) {
            // Insert jobs that write into the excluded table must not advance the result.
            boolean writesExcludedTable = job instanceof InsertLoadJob
                    && job.getDbId() == dbId
                    && ((InsertLoadJob) job).getTableId() == tableId;
            if (writesExcludedTable || !job.isFinal()) {
                continue;
            }
            long finishedAt = job.getFinishTimestamp();
            if (finishedAt > newest) {
                newest = finishedAt;
            }
        }
    } finally {
        readUnlock();
    }
    return newest;
}

public List<LoadJob> getLoadJobsByDb(long dbId, String labelValue, boolean accurateMatch) {
List<LoadJob> loadJobList = Lists.newArrayList();
readLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
package com.starrocks.load.loadv2;

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.scheduler.history.TableKeeper;
import com.starrocks.server.GlobalStateMgr;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -83,6 +87,8 @@ public class LoadsHistorySyncer extends FrontendDaemon {
new TableKeeper(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME, LOADS_HISTORY_TABLE_CREATE,
() -> Math.max(1, Config.loads_history_retained_days));

// Finish-time watermark recorded after a sync round; starts at -1.
// runAfterCatalogReady only triggers a sync when the latest load finish time exceeds this value.
private long syncedLoadFinishTime = -1L;

/**
 * Returns the {@code KEEPER} instance held by this class.
 *
 * <p>Despite the "create" name, this method does not build a new object;
 * every call returns the same {@code KEEPER} reference.
 *
 * @return the {@link TableKeeper} stored in {@code KEEPER}
 */
public static TableKeeper createKeeper() {
return KEEPER;
}
Expand Down Expand Up @@ -114,7 +120,15 @@ protected void runAfterCatalogReady() {
firstSync = false;
return;
}
syncData();

long latestFinishTime = getLatestFinishTime();
if (syncedLoadFinishTime < latestFinishTime) {
// refer to SQL:LOADS_HISTORY_SYNC. Only sync loads that completed more than 1 minute ago
long oneMinAgo = System.currentTimeMillis() - 60000;
syncData();
// use (oneMinAgo - 10000) to cover the clock skew between FE and BE
syncedLoadFinishTime = Math.min(latestFinishTime, oneMinAgo - 10000);
}
} catch (Throwable e) {
LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e);
}
Expand All @@ -131,4 +145,27 @@ public static String buildSyncSql() {
}
}

/**
 * Looks up the ids of the loads-history database and table.
 *
 * @return a pair of (database id, table id), or {@code null} when either the
 *         database or the table cannot be found
 */
private Pair<Long, Long> getTargetDbTableId() {
    Database historyDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(LOADS_HISTORY_DB_NAME);
    // Only look up the table when the database itself was found.
    Table historyTable = historyDb == null ? null : historyDb.getTable(LOADS_HISTORY_TABLE_NAME);
    if (historyTable == null) {
        return null;
    }
    return Pair.create(historyDb.getId(), historyTable.getId());
}

/**
 * Computes the most recent finish time across load jobs and stream load tasks,
 * leaving out insert load jobs that target the loads-history table itself.
 *
 * @return the larger of the two managers' latest finish times, or {@code -1}
 *         (after logging a warning) when the history database or table is missing
 */
private long getLatestFinishTime() {
    Pair<Long, Long> ids = getTargetDbTableId();
    if (ids != null) {
        GlobalStateMgr globalState = GlobalStateMgr.getCurrentState();
        long loadJobFinish = globalState.getLoadMgr().getLatestFinishTimeExcludeTable(ids.first, ids.second);
        long streamLoadFinish = globalState.getStreamLoadMgr().getLatestFinishTime();
        return Math.max(loadJobFinish, streamLoadFinish);
    }
    LOG.warn("failed to get db: {}, table: {}", LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME);
    return -1L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -700,4 +700,19 @@ public List<Pair<List<Object>, Long>> getSamples() {
.collect(Collectors.toList());
return Lists.newArrayList(Pair.create(samples, (long) idToStreamLoadTask.size()));
}

/**
 * Finds the largest finish timestamp (in milliseconds) among final stream load tasks.
 *
 * <p>The scan over {@code idToStreamLoadTask} is performed while holding the read lock.
 *
 * @return the largest finish timestamp of any final task, or {@code -1} when there is none
 */
public long getLatestFinishTime() {
    long newest = -1L;
    readLock();
    try {
        for (StreamLoadTask streamTask : idToStreamLoadTask.values()) {
            // Tasks that are not final yet do not contribute a finish time.
            if (!streamTask.isFinal()) {
                continue;
            }
            long finishedAt = streamTask.getFinishTimestampMs();
            if (finishedAt > newest) {
                newest = finishedAt;
            }
        }
    } finally {
        readUnlock();
    }
    return newest;
}
}

0 comments on commit 884981f

Please sign in to comment.