diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala
index 57c37d25c..aef544661 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala
@@ -9,9 +9,9 @@ import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, EqualTo, Literal}
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
-import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.{ProjectExec, QueryExecution, SQLExecution}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
@@ -145,7 +145,19 @@ trait TransactionalWrite {
         output)
 
     val physicalPlan = if (isCompaction) {
-      queryExecution.executedPlan
+      val cdcCol = snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
+      if(cdcCol.nonEmpty){
+        val tmpSparkPlan = queryExecution.executedPlan
+        val outColumns = outputSpec.outputColumns
+        val nonCdcAttrCols = outColumns.filter(p=>(!p.name.equalsIgnoreCase(cdcCol.get)))
+        val cdcAttrCol = outColumns.filter(p=>p.name.equalsIgnoreCase(cdcCol.get))
+        val cdcCaseWhen = CaseWhen.createFromParser(Seq(EqualTo(cdcAttrCol(0),Literal("update")),Literal("insert"),cdcAttrCol(0)))
+        val alias = Alias(cdcCaseWhen,cdcCol.get)()
+        val allAttrCols = nonCdcAttrCols :+ alias
+        ProjectExec(allAttrCols,tmpSparkPlan)
+      }else{
+        queryExecution.executedPlan
+      }
     } else {
       InvariantCheckerExec(queryExecution.executedPlan, invariants)
     }
diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala
index cff7165c2..73acb28e6 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala
@@ -86,7 +86,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement,
     tc.setReadFiles(newReadFiles)
     tc.setCommitType("compaction")
     val (newFiles, path) = tc.writeFiles(compactDF, isCompaction = true)
-    tc.commit(newFiles, newReadFiles, readPartitionInfo)
+    tc.commit(newFiles, Seq.empty, readPartitionInfo)
     val partitionStr = escapeSingleBackQuotedString(conditionString)
     if (hiveTableName.nonEmpty) {
       val spark = SparkSession.active