Merge pull request #352 from F-PHantam/optimize_incease_read
[Spark] Optimize incremental read and fix a column disorder bug caused by the compact operation
F-PHantam authored Oct 23, 2023
2 parents 3c566ca + e8c1090 commit e2f5113
Showing 5 changed files with 53 additions and 24 deletions.
@@ -200,7 +200,7 @@ public List<String> getDeleteFilePath(String tableId, String partitionDesc, long
         if (StringUtils.isNotBlank(partitionDesc)) {
             deleteSinglePartitionMetaInfo(tableId, partitionDesc, utcMills, fileOps, deleteFilePathList);
         } else {
-            List<String> allPartitionDesc = partitionInfoDao.getAllPartitionDescByTableId(tableId);
+            List<String> allPartitionDesc = getTableAllPartitionDesc(tableId);
             allPartitionDesc.forEach(partition -> deleteSinglePartitionMetaInfo(tableId, partition, utcMills, fileOps,
                     deleteFilePathList));
         }
@@ -882,6 +882,10 @@ public void updateNamespaceProperties(String namespace, String properties) {
         namespaceDao.updatePropertiesByNamespace(namespace, newProperties.toJSONString());
     }
 
+    public List<String> getTableAllPartitionDesc(String tableId) {
+        return partitionInfoDao.getAllPartitionDescByTableId(tableId);
+    }
+
     public void deleteNamespace(String namespace) {
         namespaceDao.deleteByNamespace(namespace);
     }
@@ -158,21 +158,40 @@ object DataOperation

   def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long,
                                  endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = {
-    if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ) || readType
-      .equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) {
+    val files_all_partitions_buf = new ArrayBuffer[DataFileInfo]()
+    if (readType.equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) {
       if (null == partition_desc || "".equals(partition_desc)) {
-        val partitions = dbManager.getAllPartitionInfo(table_id)
-        val files_all_partitions_buf = new ArrayBuffer[DataFileInfo]()
+        val partitions = dbManager.getTableAllPartitionDesc(table_id)
         partitions.forEach(partition => {
-          val preVersionTimestamp = dbManager
-            .getLastedVersionTimestampUptoTime(table_id, partition.getPartitionDesc, startTimestamp)
-          files_all_partitions_buf ++= getSinglePartitionIncrementalDataInfos(table_id, partition.getPartitionDesc,
-            preVersionTimestamp, endTimestamp)
+          val version = dbManager.getLastedVersionUptoTime(table_id, partition, endTimestamp)
+          files_all_partitions_buf ++= getSinglePartitionDataInfo(table_id, partition, version)
         })
         files_all_partitions_buf
       } else {
+        val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
+        getSinglePartitionDataInfo(table_id, partition_desc, version)
+      }
+    } else if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ)) {
+      if (null == partition_desc || "".equals(partition_desc)) {
+        val partitions = dbManager.getTableAllPartitionDesc(table_id)
+        partitions.forEach(partition => {
+          val preVersionTimestamp = dbManager.getLastedVersionTimestampUptoTime(table_id, partition, startTimestamp)
+          if (preVersionTimestamp == 0) {
+            val version = dbManager.getLastedVersionUptoTime(table_id, partition, endTimestamp)
+            files_all_partitions_buf ++= getSinglePartitionDataInfo(table_id, partition, version)
+          } else {
+            files_all_partitions_buf ++= getSinglePartitionIncrementalDataInfos(table_id, partition, preVersionTimestamp, endTimestamp)
+          }
+        })
+        files_all_partitions_buf
+      } else {
         val preVersionTimestamp = dbManager.getLastedVersionTimestampUptoTime(table_id, partition_desc, startTimestamp)
-        getSinglePartitionIncrementalDataInfos(table_id, partition_desc, preVersionTimestamp, endTimestamp)
+        if (preVersionTimestamp == 0) {
+          val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
+          getSinglePartitionDataInfo(table_id, partition_desc, version)
+        } else {
+          getSinglePartitionIncrementalDataInfos(table_id, partition_desc, preVersionTimestamp, endTimestamp)
+        }
+      }
     } else {
       val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
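For readers who find the interleaved diff above hard to follow, here is a condensed, self-contained sketch of the dispatch logic as read from the new code: a snapshot read resolves each partition to its latest version no later than endTimestamp, while an incremental read falls back to a snapshot read for any partition that has no version before startTimestamp (preVersionTimestamp == 0). MetaStore and its method names are stand-ins, not the real dbManager API.

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-ins for the real metadata layer; names and signatures are assumptions.
case class DataFileInfo(path: String)

trait MetaStore {
  def allPartitionDescs(tableId: String): Seq[String]
  def latestVersionUpTo(tableId: String, partition: String, ts: Long): Int
  def latestVersionTimestampUpTo(tableId: String, partition: String, ts: Long): Long
  def filesAtVersion(tableId: String, partition: String, version: Int): Seq[DataFileInfo]
  def incrementalFiles(tableId: String, partition: String, fromTs: Long, toTs: Long): Seq[DataFileInfo]
}

object ReadDispatchSketch {
  // Per-partition dispatch: snapshot reads take the latest version up to endTs;
  // incremental reads fall back to a snapshot when no version exists before startTs.
  def partitionFiles(meta: MetaStore, tableId: String, partition: String,
                     startTs: Long, endTs: Long, incremental: Boolean): Seq[DataFileInfo] = {
    if (!incremental) {
      meta.filesAtVersion(tableId, partition, meta.latestVersionUpTo(tableId, partition, endTs))
    } else {
      val preTs = meta.latestVersionTimestampUpTo(tableId, partition, startTs)
      if (preTs == 0) { // no version before the window: behave like a snapshot read
        meta.filesAtVersion(tableId, partition, meta.latestVersionUpTo(tableId, partition, endTs))
      } else {
        meta.incrementalFiles(tableId, partition, preTs, endTs)
      }
    }
  }

  // An empty partition_desc means "all partitions", mirroring the code above.
  def allFiles(meta: MetaStore, tableId: String, partitionDesc: String,
               startTs: Long, endTs: Long, incremental: Boolean): Seq[DataFileInfo] = {
    val buf = new ArrayBuffer[DataFileInfo]()
    val partitions =
      if (partitionDesc == null || partitionDesc.isEmpty) meta.allPartitionDescs(tableId)
      else Seq(partitionDesc)
    partitions.foreach(p => buf ++= partitionFiles(meta, tableId, p, startTs, endTs, incremental))
    buf.toSeq
  }
}
```

The preVersionTimestamp == 0 branch is what makes an incremental read whose window starts before a partition's first commit behave like a snapshot read instead of returning nothing.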
@@ -10,13 +10,17 @@ import org.apache.spark.sql.SparkSession
 import com.dmetasoul.lakesoul.spark.clean.CleanUtils.sqlToDataframe
 import org.apache.hadoop.fs.Path
 
-import java.time.{LocalDateTime, Period, ZoneOffset}
+import java.time.{LocalDateTime, Period, ZoneId}
+import java.util.TimeZone
 
 object CleanExpiredData {
 
   private val conn = DBConnector.getConn
+  var serverTimeZone = TimeZone.getDefault.getID
 
   def main(args: Array[String]): Unit = {
     val parameter = ParametersTool.fromArgs(args)
+    serverTimeZone = parameter.get("server.time.zone", serverTimeZone)
 
     val spark: SparkSession = SparkSession.builder
       .getOrCreate()
@@ -210,11 +214,11 @@
   }
 
   def getExpiredDateZeroTimeStamp(days: Int): Long = {
-    val currentTime = LocalDateTime.now()
+    val currentTime = LocalDateTime.now(ZoneId.of(serverTimeZone))
     val period = Period.ofDays(days)
     val timeLine = currentTime.minus(period)
-    val zeroTime = timeLine.toLocalDate.atStartOfDay()
-    val expiredDateTimestamp = zeroTime.toInstant(ZoneOffset.UTC).toEpochMilli
+    val zeroTime = timeLine.toLocalDate.atStartOfDay(ZoneId.of(serverTimeZone))
+    val expiredDateTimestamp = zeroTime.toInstant().toEpochMilli
     expiredDateTimestamp
   }
 }
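The net effect of this file's change is that expiration cutoffs are computed at local midnight in a configurable server.time.zone instead of being pinned to UTC. Below is a minimal, standalone illustration of the same java.time pattern; the zone strings and the condensed LocalDate form are illustrative, not the job's actual parameter handling.

```scala
import java.time.{LocalDate, Period, ZoneId}

object ExpiredTimestampSketch {
  // Millisecond epoch timestamp of local midnight `days` days ago in `zone`.
  def expiredDateZeroTimestamp(days: Int, zone: String): Long =
    LocalDate.now(ZoneId.of(zone))
      .minus(Period.ofDays(days))
      .atStartOfDay(ZoneId.of(zone)) // midnight in the given zone, not UTC
      .toInstant
      .toEpochMilli

  def main(args: Array[String]): Unit = {
    // Midnight three days ago, in UTC vs. Asia/Shanghai.
    println(expiredDateZeroTimestamp(3, "UTC"))
    println(expiredDateZeroTimestamp(3, "Asia/Shanghai"))
  }
}
```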
@@ -7,18 +7,19 @@ package org.apache.spark.sql.lakesoul
 import com.dmetasoul.lakesoul.meta.{CommitType, DataFileInfo}
 import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{Column, Dataset}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, EqualTo, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, EqualNullSafe, EqualTo, Literal, Not}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
-import org.apache.spark.sql.execution.{ProjectExec, QueryExecution, SQLExecution}
-import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, QueryExecution, SQLExecution}
+import org.apache.spark.sql.functions.{col, expr, lit}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
 import org.apache.spark.sql.lakesoul.schema.{InvariantCheckerExec, Invariants, SchemaUtils}
 import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
 import org.apache.spark.sql.lakesoul.utils.SparkUtil
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{BooleanType, StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 import scala.collection.mutable
@@ -149,12 +150,13 @@ trait TransactionalWrite {
     if (cdcCol.nonEmpty) {
       val tmpSparkPlan = queryExecution.executedPlan
       val outColumns = outputSpec.outputColumns
-      val nonCdcAttrCols = outColumns.filter(p => (!p.name.equalsIgnoreCase(cdcCol.get)))
       val cdcAttrCol = outColumns.filter(p => p.name.equalsIgnoreCase(cdcCol.get))
+      val pos = outColumns.indexOf(cdcAttrCol(0))
       val cdcCaseWhen = CaseWhen.createFromParser(Seq(EqualTo(cdcAttrCol(0), Literal("update")), Literal("insert"), cdcAttrCol(0)))
       val alias = Alias(cdcCaseWhen, cdcCol.get)()
-      val allAttrCols = nonCdcAttrCols :+ alias
-      ProjectExec(allAttrCols, tmpSparkPlan)
+      val allAttrCols = outColumns.updated(pos, alias)
+      val filterCdcAdd = FilterExec(Not(EqualTo(cdcAttrCol(0), Literal("delete"))), tmpSparkPlan)
+      ProjectExec(allAttrCols, filterCdcAdd)
     } else {
       queryExecution.executedPlan
     }
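A DataFrame-level sketch of what this change appears to encode at the physical-plan level (FilterExec + ProjectExec): rows whose CDC column is "delete" are dropped, "update" is rewritten to "insert", and the CDC column is replaced at its original position (outColumns.updated(pos, alias)) rather than filtered out and appended. The column name rowKinds and the tiny dataset are illustrative only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

object CdcCompactSketch {
  // Drop "delete" rows and rewrite "update" to "insert" in the CDC column,
  // replacing the column in place so the output schema keeps its original order.
  def normalizeCdc(df: DataFrame, cdcCol: String): DataFrame =
    df.filter(col(cdcCol) =!= "delete")
      .withColumn(cdcCol, when(col(cdcCol) === "update", "insert").otherwise(col(cdcCol)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[1]").appName("cdc-compact-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a", "insert"), (2, "b", "update"), (3, "c", "delete"))
      .toDF("id", "value", "rowKinds")
    normalizeCdc(df, "rowKinds").show() // rows 1 and 2 survive; row 2's rowKinds becomes "insert"
    spark.stop()
  }
}
```

Replacing the CDC column at its original index, instead of appending the rewritten column at the end, is what addresses the column-disorder bug mentioned in the commit message.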
4 changes: 2 additions & 2 deletions rust/lakesoul-metadata/src/lib.rs
@@ -217,7 +217,7 @@ fn get_prepared_statement(
         // Select PartitionInfo
         DaoType::SelectPartitionVersionByTableIdAndDescAndVersion =>
             "select table_id, partition_desc, version, commit_op, snapshot, expression, domain
-            from partition_info from partition_info
+            from partition_info
             where table_id = $1::TEXT and partition_desc = $2::TEXT and version = $3::INT",
         DaoType::SelectOnePartitionVersionByTableIdAndDesc =>
             "select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression, m.domain from (
@@ -1287,7 +1287,7 @@ pub fn execute_query_scalar(
     });
     match result {
         Ok(Some(row)) => {
-            let ts = row.get::<_, Option<i64>>(0);
+            let ts = row.get::<_, Option<i32>>(0);
             match ts {
                 Some(ts) => Ok(Some(format!("{}", ts))),
                 None => Ok(None)
