[Feature] Support Cluster Snapshot Backup: checkpoint and image backup (part3) (#54695)

Signed-off-by: srlch <[email protected]>
(cherry picked from commit e332116)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java
#	fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java
srlch authored and mergify[bot] committed Jan 13, 2025
1 parent 0a074ee commit b4ba25c
Showing 22 changed files with 929 additions and 53 deletions.
be/src/exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.cpp
@@ -25,8 +25,8 @@ namespace starrocks {
 SchemaScanner::ColumnDesc SchemaClusterSnapshotJobsScanner::_s_columns[] = {
         {"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
         {"JOB_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
-        {"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
-        {"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
+        {"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
+        {"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
         {"STATE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
         {"DETAIL_INFO", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
         {"ERROR_MESSAGE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
@@ -52,8 +52,16 @@ Status SchemaClusterSnapshotJobsScanner::_fill_chunk(ChunkPtr* chunk) {
     auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
     const TClusterSnapshotJobsItem& info = _result.items[_index];
     DatumArray datum_array{
-            Slice(info.snapshot_name), info.job_id, info.created_time,
-            info.finished_time, Slice(info.state), Slice(info.detail_info),
+            Slice(info.snapshot_name),
+            info.job_id,
+
+            TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()),
+
+            TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()),
+
+            Slice(info.state),
+            Slice(info.detail_info),
+
             Slice(info.error_message),
     };
     for (const auto& [slot_id, index] : slot_id_map) {
16 changes: 12 additions & 4 deletions be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp
@@ -25,8 +25,8 @@ namespace starrocks {
 SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
         {"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
         {"SNAPSHOT_TYPE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
-        {"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
-        {"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
+        {"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
+        {"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
         {"FE_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
         {"STARMGR_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
         {"PROPERTIES", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
@@ -54,9 +54,17 @@ Status SchemaClusterSnapshotsScanner::_fill_chunk(ChunkPtr* chunk) {
     auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
     const TClusterSnapshotsItem& info = _result.items[_index];
     DatumArray datum_array{
-            Slice(info.snapshot_name), Slice(info.snapshot_type), info.created_time, info.finished_time,
-            info.fe_jouranl_id, info.starmgr_jouranl_id, Slice(info.properties), Slice(info.storage_volume),
+            Slice(info.snapshot_name),
+            Slice(info.snapshot_type),
+
+            TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()),
+
+            TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()),
+
+            info.fe_jouranl_id,
+            info.starmgr_jouranl_id,
+            Slice(info.properties),
+            Slice(info.storage_volume),
+
             Slice(info.storage_path),
     };
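Both BE scanners above previously exposed CREATED_TIME and FINISHED_TIME as raw Unix timestamps (TYPE_BIGINT); they now fill TYPE_DATETIME columns via TimestampValue::create_from_unixtime, interpreting the epoch seconds in the session time zone. A rough Java analogue of that conversion, for illustration only (the helper below is hypothetical, not a StarRocks API):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class UnixtimeToDatetime {
    // Mirrors what TimestampValue::create_from_unixtime does on the BE:
    // epoch seconds plus the session time zone yield a wall-clock DATETIME.
    static LocalDateTime fromUnixtime(long epochSeconds, ZoneId sessionZone) {
        return Instant.ofEpochSecond(epochSeconds).atZone(sessionZone).toLocalDateTime();
    }

    public static void main(String[] args) {
        long ts = 1736726400L; // 2025-01-13T00:00:00Z
        System.out.println(fromUnixtime(ts, ZoneId.of("UTC")));           // 2025-01-13T00:00
        System.out.println(fromUnixtime(ts, ZoneId.of("Asia/Shanghai"))); // 2025-01-13T08:00
    }
}
```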
@@ -33,8 +33,8 @@ public static SystemTable create() {
         builder()
                 .column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
                 .column("JOB_ID", ScalarType.createType(PrimitiveType.BIGINT))
-                .column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
-                .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
+                .column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
+                .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
                 .column("STATE", ScalarType.createVarchar(NAME_CHAR_LEN))
                 .column("DETAIL_INFO", ScalarType.createVarchar(NAME_CHAR_LEN))
                 .column("ERROR_MESSAGE", ScalarType.createVarchar(NAME_CHAR_LEN))
@@ -33,8 +33,8 @@ public static SystemTable create() {
         builder()
                 .column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
                 .column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN))
-                .column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
-                .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
+                .column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
+                .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
                 .column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
                 .column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
                 .column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN))
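These FE table definitions must stay in lock-step with the BE scanner columns above: same column names, same order, and a matching type on each side, or result decoding breaks. A hedged sketch of that invariant (the mapping below is illustrative, not StarRocks code):

```java
import java.util.Map;

public class SchemaConsistencyCheck {
    // Expected FE-to-BE type pairing for the columns touched by this commit.
    static final Map<String, String> EXPECTED_BE_TYPE = Map.of(
            "CREATED_TIME", "TYPE_DATETIME",
            "FINISHED_TIME", "TYPE_DATETIME",
            "FE_JOURNAL_ID", "TYPE_BIGINT",
            "STARMGR_JOURNAL_ID", "TYPE_BIGINT");

    static void verify(Map<String, String> beColumnTypes) {
        EXPECTED_BE_TYPE.forEach((column, type) -> {
            if (!type.equals(beColumnTypes.get(column))) {
                throw new IllegalStateException("FE/BE type mismatch for " + column);
            }
        });
    }
}
```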
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -3379,4 +3379,10 @@ public class Config extends ConfigBase {
 
     @ConfField(mutable = false)
     public static int query_deploy_threadpool_size = max(50, getRuntime().availableProcessors() * 10);
+
+    @ConfField(mutable = true)
+    public static long automated_cluster_snapshot_interval_seconds = 1800;
+
+    @ConfField(mutable = false)
+    public static int max_historical_automated_cluster_snapshot_jobs = 100;
 }
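The two new knobs are presumably consumed by the automated snapshot scheduler: automated_cluster_snapshot_interval_seconds paces how often an automated snapshot job starts, and max_historical_automated_cluster_snapshot_jobs bounds how many finished jobs are retained for listing. A minimal sketch under those assumptions (the scheduler class and its fields are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Deque;

import com.starrocks.common.Config;

public class AutomatedSnapshotScheduler {
    private final Deque<Long> historicalJobIds = new ArrayDeque<>();
    private long lastSnapshotTimeMs = 0;

    // Hypothetical polling hook: start a new automated snapshot only once the
    // configured interval has elapsed since the previous one.
    void maybeTriggerSnapshot(long nowMs) {
        long intervalMs = Config.automated_cluster_snapshot_interval_seconds * 1000L;
        if (nowMs - lastSnapshotTimeMs < intervalMs) {
            return;
        }
        lastSnapshotTimeMs = nowMs;
        historicalJobIds.addLast(nowMs); // stand-in for a real job id

        // Evict the oldest records once the history cap is exceeded.
        while (historicalJobIds.size() > Config.max_historical_automated_cluster_snapshot_jobs) {
            historicalJobIds.removeFirst();
        }
    }
}
```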
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/HdfsUtil.java
@@ -70,6 +70,11 @@ public static void copyToLocal(String srcPath, String destPath, Map<String, String> properties)
         hdfsService.copyToLocal(srcPath, destPath, properties);
     }
 
+    public static void copyFromLocal(String srcPath, String destPath, Map<String, String> properties)
+            throws StarRocksException {
+        hdfsService.copyFromLocal(srcPath, destPath, properties);
+    }
+
     /**
      * Parse file status in path with broker, except directory
      *
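A minimal caller sketch for the new helper, e.g. shipping a locally written image file to remote storage (both paths and the empty properties map are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

import com.starrocks.fs.HdfsUtil;

public class CopyImageExample {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>(); // e.g. credentials for the target FS
        try {
            // Hypothetical paths: push a local checkpoint image into a snapshot directory.
            HdfsUtil.copyFromLocal("/opt/starrocks/fe/meta/image/image.1234",
                    "hdfs://nameservice/starrocks/snapshots/snap_1/image.1234",
                    properties);
        } catch (Exception e) {
            // StarRocksException wraps the underlying I/O failure; retry or surface it.
            e.printStackTrace();
        }
    }
}
```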
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -47,6 +48,7 @@
 import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
 import software.amazon.awssdk.regions.Region;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -1212,7 +1214,27 @@ public void copyToLocal(String srcPath, String destPath, Map<String, String> properties)
         }
     }

+    public void copyFromLocal(String srcPath, String destPath, Map<String, String> properties) throws StarRocksException {
+        HdfsFs fileSystem = getFileSystem(destPath, properties, null);
+        try {
+            WildcardURI destPathUri = new WildcardURI(destPath);
+            File srcFile = new File(srcPath);
+            FileUtil.copy(srcFile, fileSystem.getDFSFileSystem(), new Path(destPathUri.getPath()), false, new Configuration());
+        } catch (InterruptedIOException e) {
+            Thread.interrupted(); // clear the interrupted flag
+            LOG.error("Interrupted while copying local {} to {}", srcPath, destPath, e);
+            throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath, e);
+        } catch (Exception e) {
+            LOG.error("Exception while copying local {} to {}", srcPath, destPath, e);
+            throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath, e);
+        }
+    }
+
     public List<FileStatus> listFileMeta(String path, Map<String, String> properties) throws UserException {
         WildcardURI pathUri = new WildcardURI(path);
         HdfsFs fileSystem = getFileSystem(path, properties, null);
         Path pathPattern = new Path(pathUri.getPath());
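The implementation delegates to Hadoop's org.apache.hadoop.fs.FileUtil.copy(File, FileSystem, Path, boolean, Configuration) with deleteSource = false, so the local source file is kept after a successful upload. For reference, a standalone use of that call outside StarRocks (the destination URI is illustrative):

```java
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class FileUtilCopyExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path dest = new Path("hdfs://nameservice/tmp/example/file.bin"); // illustrative target
        try (FileSystem dfs = FileSystem.get(dest.toUri(), conf)) {
            // deleteSource = false keeps the local file after the copy succeeds.
            boolean done = FileUtil.copy(new File("/tmp/file.bin"), dfs, dest, false, conf);
            System.out.println("copied: " + done);
        }
    }
}
```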
@@ -59,6 +59,11 @@ public void copyToLocal(String srcPath, String destPath, Map<String, String> properties)
         LOG.info("Copied {} to local {}", srcPath, destPath);
     }
 
+    public void copyFromLocal(String srcPath, String destPath, Map<String, String> properties) throws StarRocksException {
+        fileSystemManager.copyFromLocal(srcPath, destPath, properties);
+        LOG.info("Copied local {} to {}", srcPath, destPath);
+    }
+
     public void listPath(TBrokerListPathRequest request, List<TBrokerFileStatus> fileStatuses, boolean skipDir,
                          boolean fileNameOnly) throws UserException {
         LOG.info("receive a list path request, path: {}", request.path);
@@ -80,9 +80,11 @@ protected void runAfterCatalogReady() {
         if (nextPoint == null) {
             return;
         }
+
         if (nextPoint.journalId <= getImageJournalId()) {
             return;
         }
+
         if (nextPoint.epoch != servingGlobalState.getEpoch()) {
             return;
         }
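The guards above abandon a checkpoint in three cases: nothing is scheduled, the scheduled point's journal id is already covered by the current image, or the point was scheduled under a different leader epoch. Restated as a standalone predicate (a sketch; the real controller operates on its own types):

```java
public class CheckpointGuard {
    // Returns true only if the scheduled checkpoint is still worth creating.
    static boolean shouldCreateCheckpoint(Long nextPointJournalId, long imageJournalId,
                                          long nextPointEpoch, long servingEpoch) {
        if (nextPointJournalId == null) {
            return false; // nothing scheduled
        }
        if (nextPointJournalId <= imageJournalId) {
            return false; // the current image already covers this journal id
        }
        if (nextPointEpoch != servingEpoch) {
            return false; // leadership changed since the point was scheduled
        }
        return true;
    }
}
```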
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
@@ -140,6 +140,10 @@ public void getServiceId() {
         LOG.info("get serviceId {} from starMgr", serviceId);
     }
 
+    public String getRawServiceId() {
+        return serviceId;
+    }
+
     public String addFileStore(FileStoreInfo fsInfo) throws DdlException {
         try {
             return client.addFileStore(fsInfo, serviceId);
fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java (new file)
@@ -0,0 +1,117 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.lake.snapshot;

import com.google.gson.annotations.SerializedName;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.StorageVolumeMgr;
import com.starrocks.storagevolume.StorageVolume;
import com.starrocks.thrift.TClusterSnapshotsItem;

public class ClusterSnapshot {
public enum ClusterSnapshotType { AUTOMATED, MANUAL, INCREMENTAL }

@SerializedName(value = "id")
private long id;
@SerializedName(value = "snapshotName")
private String snapshotName;
@SerializedName(value = "type")
private ClusterSnapshotType type;
@SerializedName(value = "storageVolumeName")
private String storageVolumeName;
@SerializedName(value = "createdTime")
private long createdTime;
@SerializedName(value = "finishedTime")
private long finishedTime;
@SerializedName(value = "feJournalId")
private long feJournalId;
@SerializedName(value = "starMgrJournal")
private long starMgrJournalId;

public ClusterSnapshot() {}

public ClusterSnapshot(long id, String snapshotName, String storageVolumeName, long createdTime,
long finishedTime, long feJournalId, long starMgrJournalId) {
this.id = id;
this.snapshotName = snapshotName;
this.type = ClusterSnapshotType.AUTOMATED;
this.storageVolumeName = storageVolumeName;
this.createdTime = createdTime;
this.finishedTime = finishedTime;
this.feJournalId = feJournalId;
this.starMgrJournalId = starMgrJournalId;
}

public void setJournalIds(long feJournalId, long starMgrJournalId) {
this.feJournalId = feJournalId;
this.starMgrJournalId = starMgrJournalId;
}

public void setFinishedTime(long finishedTime) {
this.finishedTime = finishedTime;
}

public String getSnapshotName() {
return snapshotName;
}

public String getStorageVolumeName() {
return storageVolumeName;
}

public long getCreatedTime() {
return createdTime;
}

public long getFinishedTime() {
return finishedTime;
}

public long getFeJournalId() {
return feJournalId;
}

public long getStarMgrJournalId() {
return starMgrJournalId;
}

public long getId() {
return id;
}

public TClusterSnapshotsItem getInfo() {
TClusterSnapshotsItem item = new TClusterSnapshotsItem();
item.setSnapshot_name(snapshotName);
item.setSnapshot_type(type.name());
item.setCreated_time(createdTime / 1000);
item.setFinished_time(finishedTime / 1000);
item.setFe_jouranl_id(feJournalId);
item.setStarmgr_jouranl_id(starMgrJournalId);
item.setProperties("");
item.setStorage_volume(storageVolumeName);

StorageVolumeMgr storageVolumeMgr = GlobalStateMgr.getCurrentState().getStorageVolumeMgr();
try {
StorageVolume sv = storageVolumeMgr.getStorageVolumeByName(storageVolumeName);
if (sv == null) {
throw new Exception("Unknown storage volume: " + storageVolumeName);
}
item.setStorage_path(ClusterSnapshotUtils.getSnapshotImagePath(sv, snapshotName));
} catch (Exception e) {
item.setStorage_path("");
}
return item;
}
}
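A small usage sketch of the lifecycle this class implies, assuming it runs inside a live FE (getInfo() consults GlobalStateMgr): construct an AUTOMATED record with a creation timestamp in milliseconds, fill in journal ids and the finish time when the backup completes, then export it over Thrift. Note that getInfo() divides by 1000, so the Thrift item carries Unix seconds, which is exactly what the BE scanners feed into create_from_unixtime. All concrete values below are made up:

```java
import com.starrocks.lake.snapshot.ClusterSnapshot;
import com.starrocks.thrift.TClusterSnapshotsItem;

public class ClusterSnapshotExample {
    public static void main(String[] args) {
        long createdMs = System.currentTimeMillis();
        // Journal ids are unknown at creation time and are filled in on completion.
        ClusterSnapshot snapshot = new ClusterSnapshot(
                1L, "automated_cluster_snapshot_1736726400", "builtin_storage_volume",
                createdMs, /* finishedTime */ 0L, /* feJournalId */ 0L, /* starMgrJournalId */ 0L);

        // ... checkpoint runs and the image upload happens here ...

        snapshot.setJournalIds(123456L, 7890L);
        snapshot.setFinishedTime(System.currentTimeMillis());

        TClusterSnapshotsItem item = snapshot.getInfo(); // times exported as Unix seconds
        System.out.println(item.getSnapshot_name() + " finished at " + item.getFinished_time());
    }
}
```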