Merge pull request #552 from Ceng23333/pick_compact_update
[Spark][CherryPick] Compaction supports limiting file number and changing bucket number
Ceng23333 authored Oct 26, 2024
2 parents de9c8fa + df4a71e commit 5412ac8
Showing 30 changed files with 1,283 additions and 367 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flink-cdc-hdfs-test.yml
@@ -126,7 +126,7 @@ jobs:
- name: Start compaction task
run: |
cd ./script/benchmark/work-dir
nohup docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --proxy-user flink --driver-memory 2G --executor-memory 2G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --conf spark.dmetasoul.lakesoul.native.io.enable=true --class com.dmetasoul.lakesoul.spark.compaction.CompactionTask --master local[4] /opt/spark/work-dir/$SPARK_JAR_NAME --threadpool.size=10 --database="" > compaction.log 2>&1 &
nohup docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --proxy-user flink --driver-memory 2G --executor-memory 2G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --conf spark.dmetasoul.lakesoul.native.io.enable=true --class com.dmetasoul.lakesoul.spark.compaction.CompactionTask --master local[4] /opt/spark/work-dir/$SPARK_JAR_NAME --threadpool.size=10 --database="" --file_num_limit=5 > compaction.log 2>&1 &
- name: Start flink mysql cdc task-1
run: |
docker exec -t -u flink lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.entry.MysqlCdc /opt/flink/work-dir/$FLINK_JAR_NAME --source_db.host mysql --source_db.port 3306 --source_db.db_name test_cdc --source_db.user root --source_db.password root --source.parallelism 2 --sink.parallelism 4 --use.cdc true --warehouse_path hdfs://172.17.0.1:9000/lakesoul-test-bucket/data/ --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/chk --flink.savepoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/svp --job.checkpoint_interval 5000 --server_time_zone UTC
2 changes: 1 addition & 1 deletion .github/workflows/flink-cdc-test.yml
@@ -115,7 +115,7 @@ jobs:
- name: Start compaction task
run: |
cd ./script/benchmark/work-dir
nohup docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 2G --executor-memory 2G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.path.style.access=true --conf spark.hadoop.fs.s3a.endpoint=http://minio:9000 --conf spark.hadoop.fs.s3a.access.key=minioadmin1 --conf spark.hadoop.fs.s3a.secret.key=minioadmin1 --conf spark.sql.warehouse.dir=s3://lakesoul-test-bucket/ --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --conf spark.dmetasoul.lakesoul.native.io.enable=true --class com.dmetasoul.lakesoul.spark.compaction.CompactionTask --master local[4] /opt/spark/work-dir/$SPARK_JAR_NAME --threadpool.size=10 --database="" > compaction.log 2>&1 &
nohup docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 2G --executor-memory 2G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.path.style.access=true --conf spark.hadoop.fs.s3a.endpoint=http://minio:9000 --conf spark.hadoop.fs.s3a.access.key=minioadmin1 --conf spark.hadoop.fs.s3a.secret.key=minioadmin1 --conf spark.sql.warehouse.dir=s3://lakesoul-test-bucket/ --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --conf spark.dmetasoul.lakesoul.native.io.enable=true --class com.dmetasoul.lakesoul.spark.compaction.CompactionTask --master local[4] /opt/spark/work-dir/$SPARK_JAR_NAME --threadpool.size=10 --database="" --file_num_limit=5 > compaction.log 2>&1 &
- name: Start flink mysql cdc task-1
run: |
docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.entry.MysqlCdc /opt/flink/work-dir/$FLINK_JAR_NAME --source_db.host mysql --source_db.port 3306 --source_db.db_name test_cdc --source_db.user root --source_db.password root --source.parallelism 2 --sink.parallelism 4 --use.cdc true --warehouse_path s3://lakesoul-test-bucket/data/ --flink.checkpoint s3://lakesoul-test-bucket/chk --flink.savepoint s3://lakesoul-test-bucket/svp --job.checkpoint_interval 5000 --server_time_zone UTC
4 changes: 2 additions & 2 deletions .github/workflows/rust-ci.yml
@@ -58,6 +58,6 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
- name: Run tests
run: cd rust && sudo cargo test --package lakesoul-datafusion
# - name: Run tests
# run: cd rust && sudo cargo test --package lakesoul-datafusion

@@ -22,6 +22,8 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.dmetasoul.lakesoul.meta.DBConfig.*;
@@ -373,4 +375,25 @@ public void report() {
}

}
}

public static long parseMemoryExpression(String memoryExpression) {
Pattern pattern = Pattern.compile("(\\d+)(\\w+)");
Matcher matcher = pattern.matcher(memoryExpression);
if (matcher.find()) {
long value = Long.parseLong(matcher.group(1));
String unit = matcher.group(2);
switch (unit) {
case "KB":
return value * 1024;
case "MB":
return value * 1024 * 1024;
case "GB":
return value * 1024 * 1024 * 1024;
default:
return value;
}
}
return 0;
}

}
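The size parser added above is what later resolves the new file_size_limit option; further down in this diff it is invoked as DBUtil.parseMemoryExpression. A minimal sketch of the expected behaviour, with illustrative values only (it assumes the method above is indeed that DBUtil helper):

// Illustrative only — assumes the method above is the DBUtil.parseMemoryExpression
// referenced later in this change set.
assert(DBUtil.parseMemoryExpression("100KB") == 100L * 1024)
assert(DBUtil.parseMemoryExpression("5MB") == 5L * 1024 * 1024)
assert(DBUtil.parseMemoryExpression("2GB") == 2L * 1024 * 1024 * 1024)
// An unrecognized unit falls back to the raw numeric value.
assert(DBUtil.parseMemoryExpression("42bytes") == 42L)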
@@ -163,4 +163,9 @@ private void createLakeSoulSourceTableWithoutPK(TableEnvironment tEnvs)
tEnvs.executeSql(createOrderSql);
tEnvs.executeSql("INSERT INTO order_noPK VALUES (1,'apple',20), (2,'tomato',10), (3,'water',15)").await();
}

public static void main(String[] args) {
TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
tEnv.executeSql("select * from compaction_limit_table order by `id`, `date`").print();
}
}
@@ -142,7 +142,11 @@ public void initialize(InputSplit[] inputSplits, TaskAttemptContext taskAttemptC
initialize(inputSplits, taskAttemptContext, null, requestSchema, null);
}

public void initialize(InputSplit[] inputSplits, TaskAttemptContext taskAttemptContext, String[] primaryKeys, StructType requestSchema, Map<String, String> mergeOperatorInfo)
public void initialize(InputSplit[] inputSplits,
TaskAttemptContext taskAttemptContext,
String[] primaryKeys,
StructType requestSchema,
Map<String, String> mergeOperatorInfo)
throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplits[0], taskAttemptContext);
FileSplit split = (FileSplit) inputSplits[0];
@@ -220,6 +224,10 @@ public void setPrefetchBufferSize(int prefetchBufferSize) {
this.prefetchBufferSize = prefetchBufferSize;
}

public void setOptions(Map<String, String> options) {
this.options = options;
}

public void setThreadNum(int threadNum) {
this.threadNum = threadNum;
}
@@ -249,6 +257,12 @@ private void recreateNativeReader() throws IOException {
reader.setBufferSize(prefetchBufferSize);
reader.setThreadNum(threadNum);

if (options != null) {
for (Map.Entry<String, String> kv : options.entrySet()) {
reader.setOption(kv.getKey(), kv.getValue());
}
}

NativeIOUtils.setNativeIOOptions(reader, this.nativeIOOptions);

if (filter != null) {
@@ -371,6 +385,7 @@ private void initializeInternal() throws IOException, UnsupportedOperationExcept
private NativeIOOptions nativeIOOptions;

private Map<String, String> mergeOps = null;
private Map<String, String> options = null;

private final FilterPredicate filter;
}
@@ -5,13 +5,14 @@
package com.dmetasoul.lakesoul.meta

import com.alibaba.fastjson.JSONObject
import com.dmetasoul.lakesoul.meta.entity.{FileOp, Uuid}
import com.dmetasoul.lakesoul.meta.entity.Uuid
import org.apache.spark.internal.Logging
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
import org.apache.spark.sql.lakesoul.utils._

import java.util
import scala.collection.JavaConverters
import scala.collection.JavaConverters.asScalaBufferConverter

object MetaCommit extends Logging {
//meta commit process
@@ -94,21 +95,21 @@ object MetaCommit extends Logging {
for (dataCommitInfo <- dataCommitInfoArray) {
val metaDataCommitInfo = entity.DataCommitInfo.newBuilder
metaDataCommitInfo.setTableId(table_id)
metaDataCommitInfo.setPartitionDesc(dataCommitInfo.range_value)
metaDataCommitInfo.setPartitionDesc(dataCommitInfo.getPartitionDesc)
metaDataCommitInfo.setCommitOp(entity.CommitOp.valueOf(commitType))
val uuid = dataCommitInfo.commit_id
metaDataCommitInfo.setCommitId(Uuid.newBuilder.setHigh(uuid.getMostSignificantBits).setLow(uuid.getLeastSignificantBits).build)
val uuid = dataCommitInfo.getCommitId
metaDataCommitInfo.setCommitId(uuid)
val fileOps = new util.ArrayList[entity.DataFileOp]()
for (file_info <- dataCommitInfo.file_ops) {
for (file_info <- dataCommitInfo.getFileOpsList.asScala) {
val metaDataFileInfo = entity.DataFileOp.newBuilder
metaDataFileInfo.setPath(file_info.path)
metaDataFileInfo.setFileOp(FileOp.valueOf(file_info.file_op))
metaDataFileInfo.setSize(file_info.size)
metaDataFileInfo.setFileExistCols(file_info.file_exist_cols)
metaDataFileInfo.setPath(file_info.getPath)
metaDataFileInfo.setFileOp(file_info.getFileOp)
metaDataFileInfo.setSize(file_info.getSize)
metaDataFileInfo.setFileExistCols(file_info.getFileExistCols)
fileOps.add(metaDataFileInfo.build)
}
metaDataCommitInfo.addAllFileOps(fileOps)
metaDataCommitInfo.setTimestamp(dataCommitInfo.modification_time)
metaDataCommitInfo.setTimestamp(dataCommitInfo.getTimestamp)
metaDataCommitInfoList.add(metaDataCommitInfo.build)
}
SparkMetaVersion.dbManager.batchCommitDataCommitInfo(metaDataCommitInfoList)
@@ -79,7 +79,7 @@ object CleanOldCompaction {
|""".stripMargin

sqlToDataframe(sql, spark).rdd.collect().foreach(p => {
pathSet.add(getPath(p.get(0).toString))
pathSet.add(splitCompactFilePath(p.get(0).toString)._1)

})
pathSet.foreach(p => {
@@ -92,16 +92,18 @@
})
}

def getPath(filePath: String): String = {
def splitCompactFilePath(filePath: String): (String, String) = {
val targetString = "compact_"
var directoryPath = ""
var basePath = ""
val lastCompactIndex = filePath.lastIndexOf(targetString)
if (lastCompactIndex != -1) {
val nextDirectoryIndex = filePath.indexOf("/", lastCompactIndex)
if (nextDirectoryIndex != -1) {
directoryPath = filePath.substring(0, nextDirectoryIndex)
basePath = filePath.substring(nextDirectoryIndex + 1)
}
}
directoryPath
directoryPath -> basePath
}
}
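splitCompactFilePath replaces the old getPath helper and returns both the compacted directory and the file name under it, so callers can reuse either half. A hedged trace with a hypothetical path (the real layout is whatever the compaction writer produces):

// Hypothetical path; the splitter keys on the last "compact_" segment.
val (compactDir, fileName) = CleanOldCompaction.splitCompactFilePath(
  "s3://bucket/warehouse/orders/range=2024-10-26/compact_0123abcd/part-00000.parquet")
// compactDir == "s3://bucket/warehouse/orders/range=2024-10-26/compact_0123abcd"
// fileName   == "part-00000.parquet"
// A path without any "compact_" segment yields ("", "").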
@@ -22,18 +22,30 @@ object CompactionTask {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
val THREADPOOL_SIZE_PARAMETER = "threadpool.size"
val DATABASE_PARAMETER = "database"
val CLEAN_OLD_COMPACTION = "clean_old_compaction"
val FILE_NUM_LIMIT_PARAMETER = "file_num_limit"
val FILE_SIZE_LIMIT_PARAMETER = "file_size_limit"

val NOTIFY_CHANNEL_NAME = "lakesoul_compaction_notify"
val threadMap: java.util.Map[String, Integer] = new ConcurrentHashMap

var threadPoolSize = 8
var database = ""
var cleanOldCompaction: Option[Boolean] = Some(false)
var fileNumLimit: Option[Int] = None
var fileSizeLimit: Option[String] = None

def main(args: Array[String]): Unit = {

val parameter = ParametersTool.fromArgs(args)
threadPoolSize = parameter.getInt(THREADPOOL_SIZE_PARAMETER, 8)
database = parameter.get(DATABASE_PARAMETER, "")
if (parameter.has(FILE_NUM_LIMIT_PARAMETER)) {
fileNumLimit = Some(parameter.getInt(FILE_NUM_LIMIT_PARAMETER))
}
if (parameter.has(FILE_SIZE_LIMIT_PARAMETER)) {
fileSizeLimit = Some(parameter.get(FILE_SIZE_LIMIT_PARAMETER))
}

val builder = SparkSession.builder()
.config("spark.sql.parquet.mergeSchema", value = true)
@@ -94,14 +106,14 @@
try {
val table = LakeSoulTable.forPath(path)
if (partitionDesc == "") {
table.compaction()
table.compaction(cleanOldCompaction = cleanOldCompaction.get, fileNumLimit = fileNumLimit, fileSizeLimit = fileSizeLimit, force = fileSizeLimit.isEmpty)
} else {
val partitions = partitionDesc.split(",").map(
partition => {
partition.replace("=", "='") + "'"
}
).mkString(" and ")
table.compaction(partitions, true)
table.compaction(partitions, cleanOldCompaction = cleanOldCompaction.get, fileNumLimit = fileNumLimit, fileSizeLimit = fileSizeLimit, force = fileSizeLimit.isEmpty)
}
} catch {
case e: Exception => throw e
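The standalone compaction service gains two pass-through flags. It is normally launched through spark-submit exactly as in the CI workflows near the top of this diff; the sketch below only enumerates the new arguments with placeholder values (the service is long-running and waits on the lakesoul_compaction_notify channel):

// Sketch with placeholder values; mirrors the spark-submit invocation used in CI.
CompactionTask.main(Array(
  "--threadpool.size=10",
  "--database=",           // left empty, as in the CI jobs above
  "--file_num_limit=5",    // new: forwarded to table.compaction(fileNumLimit = ...)
  "--file_size_limit=1GB"  // new: forwarded as fileSizeLimit; when set, force is disabled
))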
@@ -5,7 +5,7 @@
package com.dmetasoul.lakesoul.tables

import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_HASH_PARTITION_SPLITTER, LAKESOUL_RANGE_PARTITION_SPLITTER}
import com.dmetasoul.lakesoul.meta.SparkMetaVersion
import com.dmetasoul.lakesoul.meta.{DBUtil, SparkMetaVersion}
import com.dmetasoul.lakesoul.tables.execution.LakeSoulTableOperations
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
@@ -306,64 +306,16 @@ class LakeSoulTable(df: => Dataset[Row], snapshotManagement: SnapshotManagement)
}

//by default, force perform compaction on whole table
def compaction(): Unit = {
compaction("", true, Map.empty[String, Any], "", "", false)
}

def compaction(cleanOldCompaction: Boolean): Unit = {
compaction("", true, Map.empty[String, Any], "", "", cleanOldCompaction)
}

def compaction(condition: String): Unit = {
compaction(condition, true, Map.empty[String, Any], "", "", false)
}

def compaction(condition: String, cleanOldCompaction: Boolean): Unit = {
compaction(condition, true, Map.empty[String, Any], "", "", cleanOldCompaction)
}

def compaction(mergeOperatorInfo: Map[String, Any]): Unit = {
compaction("", true, mergeOperatorInfo, "", "", false)
}

def compaction(condition: String,
mergeOperatorInfo: Map[String, Any]): Unit = {
compaction(condition, true, mergeOperatorInfo, "", "", false)
}

def compaction(condition: String, hiveTableName: String): Unit = {
compaction(condition, true, Map.empty[String, Any], hiveTableName, "", false)
}

def compaction(condition: String, hiveTableName: String, hivePartitionName: String): Unit = {
compaction(condition, true, Map.empty[String, Any], hiveTableName, hivePartitionName, false)
}

def compaction(force: Boolean,
mergeOperatorInfo: Map[String, Any] = Map.empty[String, Any],
cleanOldCompaction: Boolean): Unit = {
compaction("", force, mergeOperatorInfo, "", "", cleanOldCompaction)
}

def compaction(condition: String,
force: Boolean,
cleanOldCompaction: Boolean): Unit = {
compaction(condition, true, Map.empty[String, Any], "", "", cleanOldCompaction)
}

def compaction(condition: String,
force: Boolean,
mergeOperatorInfo: java.util.Map[String, Any],
cleanOldCompaction: Boolean): Unit = {
compaction(condition, force, mergeOperatorInfo.asScala.toMap, "", "", cleanOldCompaction)
}

def compaction(condition: String,
force: Boolean,
mergeOperatorInfo: Map[String, Any],
hiveTableName: String,
hivePartitionName: String,
cleanOldCompaction: Boolean): Unit = {
def compaction(condition: String = "",
force: Boolean = true,
mergeOperatorInfo: Map[String, Any] = Map.empty,
hiveTableName: String = "",
hivePartitionName: String = "",
cleanOldCompaction: Boolean = false,
fileNumLimit: Option[Int] = None,
newBucketNum: Option[Int] = None,
fileSizeLimit: Option[String] = None,
): Unit = {
val newMergeOpInfo = mergeOperatorInfo.map(m => {
val key =
if (!m._1.startsWith(LakeSoulUtils.MERGE_OP_COL)) {
@@ -379,7 +331,8 @@ class LakeSoulTable(df: => Dataset[Row], snapshotManagement: SnapshotManagement)
(key, value)
})

executeCompaction(df, snapshotManagement, condition, force, newMergeOpInfo, hiveTableName, hivePartitionName, cleanOldCompaction)
val parsedFileSizeLimit = fileSizeLimit.map(DBUtil.parseMemoryExpression)
executeCompaction(df, snapshotManagement, condition, force, newMergeOpInfo, hiveTableName, hivePartitionName, cleanOldCompaction, fileNumLimit, newBucketNum, parsedFileSizeLimit)
}

def setCompactionTtl(days: Int): LakeSoulTable = {
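With the many fixed-arity overloads collapsed into one default-argument signature, the new limits are opted into by name. A minimal usage sketch (the table path and values are hypothetical; the precise effect of each limit lives in CompactionCommand, which is outside this excerpt):

val table = LakeSoulTable.forPath("s3://lakesoul-test-bucket/data/orders") // hypothetical table
table.compaction(
  fileNumLimit = Some(5),         // new: bound the number of files handled per compaction
  fileSizeLimit = Some("128MB"),  // new: parsed to bytes via DBUtil.parseMemoryExpression
  newBucketNum = Some(4),         // new: change the table's hash bucket number
  cleanOldCompaction = true       // also remove data left by previous compactions
)

Calling table.compaction() with no arguments keeps the old behaviour, a forced whole-table compaction, matching the removed zero-argument overload.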
@@ -173,15 +173,22 @@ trait LakeSoulTableOperations extends AnalysisHelper {
mergeOperatorInfo: Map[String, String],
hiveTableName: String = "",
hivePartitionName: String = "",
cleanOldCompaction: Boolean): Unit = {
cleanOldCompaction: Boolean,
fileNumLimit: Option[Int],
newBucketNum: Option[Int],
fileSizeLimit: Option[Long]): Unit = {
toDataset(sparkSession, CompactionCommand(
snapshotManagement,
condition,
force,
mergeOperatorInfo,
hiveTableName,
hivePartitionName,
cleanOldCompaction))
cleanOldCompaction,
fileNumLimit,
newBucketNum,
fileSizeLimit
))
}

protected def executeSetCompactionTtl(snapshotManagement: SnapshotManagement, days: Int): Unit = {