Skip to content

Commit

Permalink
[BugFix] Fix several problems for cluster snapshot backup
Browse files Browse the repository at this point in the history
Signed-off-by: srlch <[email protected]>
  • Loading branch information
srlch committed Jan 17, 2025
1 parent 3f85637 commit d0ad547
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
{"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"SNAPSHOT_TYPE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"FE_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"STARMGR_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"PROPERTIES", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
Expand Down Expand Up @@ -59,8 +58,6 @@ Status SchemaClusterSnapshotsScanner::_fill_chunk(ChunkPtr* chunk) {

TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()),

TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()),

info.fe_jouranl_id,
info.starmgr_jouranl_id,
Slice(info.properties),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public static SystemTable create() {
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN))
Expand Down
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/HdfsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public static void copyFromLocal(String srcPath, String destPath, Map<String, St
hdfsService.copyFromLocal(srcPath, destPath, properties);
}

/**
 * Lists the entries at {@code path} by delegating to the shared {@code hdfsService}.
 *
 * @param path         path to list
 * @param fileNameOnly passed through unchanged to {@code hdfsService.listPath}
 * @param properties   passed through unchanged to {@code hdfsService.listPath}
 * @return whatever {@code hdfsService.listPath} returns for these arguments
 * @throws StarRocksException propagated from the underlying service call
 */
public static List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties)
        throws StarRocksException {
    final List<TBrokerFileStatus> statuses = hdfsService.listPath(path, fileNameOnly, properties);
    return statuses;
}

/**
* Parse file status in path with broker, except directory
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public void listPath(TBrokerListPathRequest request, List<TBrokerFileStatus> fil
}
}

/**
 * Lists the entries at {@code path} by delegating to {@code fileSystemManager}.
 *
 * @param path         path to list
 * @param fileNameOnly passed through unchanged to {@code fileSystemManager.listPath}
 * @param properties   passed through unchanged to {@code fileSystemManager.listPath}
 * @return whatever {@code fileSystemManager.listPath} returns for these arguments
 * @throws StarRocksException propagated from the file system manager
 */
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties)
        throws StarRocksException {
    final List<TBrokerFileStatus> statuses = fileSystemManager.listPath(path, fileNameOnly, properties);
    return statuses;
}

public List<FileStatus> listFileMeta(String path, Map<String, String> properties, boolean skipDir)
throws StarRocksException {
LOG.info("try to list file meta: {}", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public TClusterSnapshotsItem getInfo() {
item.setSnapshot_name(snapshotName);
item.setSnapshot_type(type.name());
item.setCreated_time(createdTime / 1000);
item.setFinished_time(finishedTime / 1000);
item.setFe_jouranl_id(feJournalId);
item.setStarmgr_jouranl_id(starMgrJournalId);
item.setProperties("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ public class ClusterSnapshotCheckpointScheduler extends FrontendDaemon {
private final CheckpointController feController;
private final CheckpointController starMgrController;

private boolean firstRun;

/**
 * Creates the scheduler daemon that drives automated cluster snapshot checkpoints.
 *
 * @param feController      checkpoint controller used for the FE image
 * @param starMgrController checkpoint controller used for the starMgr image
 */
public ClusterSnapshotCheckpointScheduler(CheckpointController feController, CheckpointController starMgrController) {
    // Interval is the configured seconds value scaled by 1000
    // (presumably milliseconds -- confirm against FrontendDaemon).
    super("cluster_snapshot_checkpoint_scheduler", Config.automated_cluster_snapshot_interval_seconds * 1000L);
    // The first scheduled round is skipped by runAfterCatalogReady(), which then clears this flag.
    this.firstRun = true;
    this.starMgrController = starMgrController;
    this.feController = feController;
}

@Override
Expand All @@ -45,6 +48,13 @@ protected void runAfterCatalogReady() {
return;
}


// skip the first run after the scheduler starts
if (firstRun) {
firstRun = false;
return;
}

CheckpointController.exclusiveLock();
try {
runCheckpointScheduler();
Expand Down Expand Up @@ -74,20 +84,28 @@ protected void runCheckpointScheduler() {
job.logJob();

Pair<Long, Long> getFEIdsRet = feController.getCheckpointJournalIds();
Pair<Boolean, String> createFEImageRet = feController.runCheckpointControllerWithIds(getFEIdsRet.first,
consistentIds.first);
if (!createFEImageRet.first) {
errMsg = "checkpoint failed for FE image: " + createFEImageRet.second;
break;
long feImageJournalId = getFEIdsRet.first;
long feCheckpointJournalId = consistentIds.first;
if (feImageJournalId < feCheckpointJournalId) {
Pair<Boolean, String> createFEImageRet = feController.runCheckpointControllerWithIds(feImageJournalId,
feCheckpointJournalId);
if (!createFEImageRet.first) {
errMsg = "checkpoint failed for FE image: " + createFEImageRet.second;
break;
}
}
LOG.info("Finished create image for FE image, version: {}", consistentIds.first);

Pair<Long, Long> getStarMgrIdsRet = starMgrController.getCheckpointJournalIds();
Pair<Boolean, String> createStarMgrImageRet =
starMgrController.runCheckpointControllerWithIds(getStarMgrIdsRet.first, consistentIds.second);
if (!createStarMgrImageRet.first) {
errMsg = "checkpoint failed for starMgr image: " + createStarMgrImageRet.second;
break;
long starMgrImageJournalId = getStarMgrIdsRet.first;
long starMgrCheckpointJournalId = consistentIds.second;
if (starMgrImageJournalId < starMgrCheckpointJournalId) {
Pair<Boolean, String> createStarMgrImageRet =
starMgrController.runCheckpointControllerWithIds(starMgrImageJournalId, starMgrCheckpointJournalId);
if (!createStarMgrImageRet.first) {
errMsg = "checkpoint failed for starMgr image: " + createStarMgrImageRet.second;
break;
}
}
LOG.info("Finished create image for starMgr image, version: {}", consistentIds.second);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public ClusterSnapshotJobState getState() {
}

/**
 * Reports whether this job is still in progress.
 *
 * <p>A job is unfinished while it is {@code INITIALIZING}, {@code SNAPSHOTING} or {@code UPLOADING}.
 * {@code FINISHED} must not be listed here: callers that reset unfinished jobs set their state to
 * {@code ERROR}, which would wrongly mark a successfully completed job as failed.
 *
 * @return {@code true} if the job has started but has not reached a terminal state
 */
public boolean isUnFinishedState() {
    return state == ClusterSnapshotJobState.INITIALIZING ||
            state == ClusterSnapshotJobState.SNAPSHOTING ||
            state == ClusterSnapshotJobState.UPLOADING;
}

public void logJob() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,18 @@ public boolean isAutomatedSnapshotOn() {

// Turn off automated snapshot, use stmt for extension in future
public void setAutomatedSnapshotOff(AdminSetAutomatedSnapshotOffStmt stmt) {
setAutomatedSnapshotOff();

ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setDropSnapshot(AUTOMATED_NAME_PREFIX);
GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log);

// avoid network communication when replay log
if (automatedSnapshot != null) {
try {
ClusterSnapshotUtils.clearAutomatedSnapshotFromRemote(automatedSnapshot.getSnapshotName());
} catch (StarRocksException e) {
LOG.warn("Cluster Snapshot: {} delete failed, err msg: {}", automatedSnapshot.getSnapshotName(), e.getMessage());
}
try {
ClusterSnapshotUtils.clearAllAutomatedSnapshotFromRemote(null);
} catch (StarRocksException e) {
LOG.warn("Cluster Snapshot delete failed, err msg: {}", e.getMessage());
}

setAutomatedSnapshotOff();
}

protected void setAutomatedSnapshotOff() {
Expand All @@ -105,12 +103,10 @@ protected void addAutomatedClusterSnapshot(ClusterSnapshot newAutomatedClusterSn
log.setCreateSnapshot(newAutomatedClusterSnapshot);
GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log);

if (automatedSnapshot != null && automatedSnapshot.getSnapshotName().startsWith(AUTOMATED_NAME_PREFIX)) {
try {
ClusterSnapshotUtils.clearAutomatedSnapshotFromRemote(automatedSnapshot.getSnapshotName());
} catch (StarRocksException e) {
LOG.warn("Cluster Snapshot: {} delete failed, err msg: {}", automatedSnapshot.getSnapshotName(), e.getMessage());
}
try {
ClusterSnapshotUtils.clearAllAutomatedSnapshotFromRemote(newAutomatedClusterSnapshot.getSnapshotName());
} catch (StarRocksException e) {
LOG.warn("Cluster Snapshot delete failed, err msg: {}", e.getMessage());
}

automatedSnapshot = newAutomatedClusterSnapshot;
Expand Down Expand Up @@ -155,6 +151,10 @@ public synchronized void addJob(ClusterSnapshotJob job) {
historyAutomatedSnapshotJobs.put(job.getId(), job);
}

/**
 * Exposes the history of automated snapshot jobs, keyed by job id.
 *
 * @return the manager's own map instance (not a copy)
 */
public TreeMap<Long, ClusterSnapshotJob> getHistoryAutomatedSnapshotJobs() {
    return this.historyAutomatedSnapshotJobs;
}

public TClusterSnapshotJobsResponse getAllJobsInfo() {
TClusterSnapshotJobsResponse response = new TClusterSnapshotJobsResponse();
for (Map.Entry<Long, ClusterSnapshotJob> entry : historyAutomatedSnapshotJobs.entrySet()) {
Expand Down Expand Up @@ -217,14 +217,6 @@ public void replayLog(ClusterSnapshotLog log) {
LOG.warn("Invalid Cluster Snapshot Job state {}", state);
}
}

// if a job do not finished/error but fe restart, we should reset the state as error
// when replaying the log during FE restart. Because the job is unretryable after restart
if (!GlobalStateMgr.getServingState().isReady() && job.isUnFinishedState()) {
job.setState(ClusterSnapshotJobState.ERROR);
job.setErrMsg("Snapshot job has been failed");
}
break;
}
default: {
LOG.warn("Invalid Cluster Snapshot Log Type {}", logType);
Expand All @@ -236,7 +228,7 @@ public void resetLastUnFinishedAutomatedSnapshotJob() {
if (!historyAutomatedSnapshotJobs.isEmpty()) {
ClusterSnapshotJob job = historyAutomatedSnapshotJobs.lastEntry().getValue();
if (job.isUnFinishedState()) {
job.setErrMsg("Snapshot job has been failed");
job.setErrMsg("Snapshot job has been failed because of FE restart or leader change");
job.setState(ClusterSnapshotJobState.ERROR);
job.logJob();
}
Expand All @@ -252,6 +244,10 @@ public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockExcepti
/**
 * Restores this manager's state from an image block.
 *
 * <p>Reads one JSON-serialized {@code ClusterSnapshotMgr} from {@code reader} and copies its
 * storage-volume name, current automated snapshot and job history into this instance.
 *
 * @param reader source of the serialized manager
 * @throws SRMetaBlockEOFException propagated from {@code reader.readJson}
 * @throws IOException             propagated from {@code reader.readJson}
 * @throws SRMetaBlockException    propagated from {@code reader.readJson}
 */
public void load(SRMetaBlockReader reader)
        throws SRMetaBlockEOFException, IOException, SRMetaBlockException {
    final ClusterSnapshotMgr restored = reader.readJson(ClusterSnapshotMgr.class);

    this.automatedSnapshotSvName = restored.getAutomatedSnapshotSvName();
    this.automatedSnapshot = restored.getAutomatedSnapshot();
    this.historyAutomatedSnapshotJobs = restored.getHistoryAutomatedSnapshotJobs();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@
import com.starrocks.fs.HdfsUtil;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.storagevolume.StorageVolume;
import com.starrocks.thrift.TBrokerFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

public class ClusterSnapshotUtils {
public static final Logger LOG = LogManager.getLogger(ClusterSnapshotUtils.class);

public static String getSnapshotImagePath(StorageVolume sv, String snapshotName) {
return String.join("/", sv.getLocations().get(0),
GlobalStateMgr.getCurrentState().getStarOSAgent().getRawServiceId(), "meta/image", snapshotName);
Expand All @@ -38,16 +46,22 @@ public static void uploadAutomatedSnapshotToRemote(String snapshotName) throws S
HdfsUtil.copyFromLocal(localImagePath, snapshotImagePath, sv.getProperties());
}

public static void clearAutomatedSnapshotFromRemote(String snapshotName) throws StarRocksException {
if (snapshotName == null || snapshotName.isEmpty()) {
return;
}

public static void clearAllAutomatedSnapshotFromRemote(String notAllowDelete) throws StarRocksException {
StorageVolume sv = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSv();
BrokerDesc brokerDesc = new BrokerDesc(sv.getProperties());
String snapshotImagePath = String.join("/", sv.getLocations().get(0),
GlobalStateMgr.getCurrentState().getStarOSAgent().getRawServiceId(), "meta/image", snapshotName);
String snapshotImageRootPath = getSnapshotImagePath(sv, "");
List<TBrokerFileStatus> statuses =
HdfsUtil.listPath(snapshotImageRootPath + ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX + "*",
false, sv.getProperties());

HdfsUtil.deletePath(snapshotImagePath, brokerDesc);
for (TBrokerFileStatus status : statuses) {
Path path = new Path(status.path);
if (notAllowDelete != null && path.getName().equals(notAllowDelete)) {
continue;
}
if (path.getName().startsWith(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX)) {
HdfsUtil.deletePath(status.path, brokerDesc);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,7 @@ public void saveImage(ImageWriter imageWriter, File curFile) throws IOException
keyMgr.save(imageWriter);
pipeManager.getRepo().save(imageWriter);
warehouseMgr.save(imageWriter);
clusterSnapshotMgr.save(imageWriter);
sqlBlackList.save(imageWriter);
} catch (SRMetaBlockException e) {
LOG.error("Save meta block failed ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testOperationOfAutomatedSnapshot() throws DdlException {
ExceptionChecker.expectThrowsNoException(() ->
ClusterSnapshotUtils.uploadAutomatedSnapshotToRemote(job.getSnapshotName()));
ExceptionChecker.expectThrowsNoException(() ->
ClusterSnapshotUtils.clearAutomatedSnapshotFromRemote(job.getSnapshotName()));
ClusterSnapshotUtils.clearAllAutomatedSnapshotFromRemote(null));
setAutomatedSnapshotOff(false);
}

Expand Down
11 changes: 5 additions & 6 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1955,12 +1955,11 @@ struct TClusterSnapshotsItem {
1: optional string snapshot_name;
2: optional string snapshot_type;
3: optional i64 created_time;
// 4: finished_time was removed. The id is retired: do not reuse it, and do not renumber the
// fields below, so that peers built against the previous definition still decode them correctly.
5: optional i64 fe_jouranl_id;
6: optional i64 starmgr_jouranl_id;
7: optional string properties;
8: optional string storage_volume;
9: optional string storage_path;
}

struct TClusterSnapshotsRequest {
Expand Down

0 comments on commit d0ad547

Please sign in to comment.