Skip to content

Commit

Permalink
Support cdc column value change in compaction (#336)
Browse files Browse the repository at this point in the history
* 1.support delete and update in flink batch mode
2.support drop|create|select view
3.add test cases

Signed-off-by: maosen <[email protected]>

* add flink view type symbol into properties

Signed-off-by: maosen <[email protected]>

* add view in catalog

Signed-off-by: maosen <[email protected]>

* fix hdfs permission bug

Signed-off-by: maosen <[email protected]>

* 1、remove "del" type files in compaction
2、convert cdc column value from "update" into "insert" in compaction

Signed-off-by: maosen <[email protected]>

---------

Signed-off-by: maosen <[email protected]>
Co-authored-by: maosen <[email protected]>
  • Loading branch information
moresun and maosen authored Sep 21, 2023
1 parent acbbf80 commit c03015d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, EqualTo, Literal}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.{ProjectExec, QueryExecution, SQLExecution}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
Expand Down Expand Up @@ -145,7 +145,19 @@ trait TransactionalWrite {
output)

val physicalPlan = if (isCompaction) {
queryExecution.executedPlan
val cdcCol = snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
if(cdcCol.nonEmpty){
val tmpSparkPlan = queryExecution.executedPlan
val outColumns = outputSpec.outputColumns
val nonCdcAttrCols = outColumns.filter(p=>(!p.name.equalsIgnoreCase(cdcCol.get)))
val cdcAttrCol = outColumns.filter(p=>p.name.equalsIgnoreCase(cdcCol.get))
val cdcCaseWhen = CaseWhen.createFromParser(Seq(EqualTo(cdcAttrCol(0),Literal("update")),Literal("insert"),cdcAttrCol(0)))
val alias = Alias(cdcCaseWhen,cdcCol.get)()
val allAttrCols = nonCdcAttrCols :+ alias
ProjectExec(allAttrCols,tmpSparkPlan)
}else{
queryExecution.executedPlan
}
} else {
InvariantCheckerExec(queryExecution.executedPlan, invariants)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement,
tc.setReadFiles(newReadFiles)
tc.setCommitType("compaction")
val (newFiles, path) = tc.writeFiles(compactDF, isCompaction = true)
tc.commit(newFiles, newReadFiles, readPartitionInfo)
tc.commit(newFiles, Seq.empty, readPartitionInfo)
val partitionStr = escapeSingleBackQuotedString(conditionString)
if (hiveTableName.nonEmpty) {
val spark = SparkSession.active
Expand Down

0 comments on commit c03015d

Please sign in to comment.