fix ci
Signed-off-by: zenghua <[email protected]>
zenghua committed Oct 26, 2024
1 parent 7697a34 commit df4a71e
Showing 4 changed files with 14 additions and 18 deletions.
@@ -171,7 +171,7 @@ abstract class MergeDeltaParquetScan(sparkSession: SparkSession,
     val nativeIOEnable = sparkSession.sessionState.conf.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE)
     if (nativeIOEnable) {
       NativeMergeParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-        dataSchema, readDataSchema, readPartitionSchema, pushedFilters, mergeOperatorInfo, defaultMergeOp)
+        dataSchema, readDataSchema, readPartitionSchema, pushedFilters, mergeOperatorInfo, defaultMergeOp, options.asScala.toMap)
     } else {
       MergeParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
         dataSchema, readDataSchema, readPartitionSchema, newFilters, mergeOperatorInfo, defaultMergeOp)
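This first change threads the scan's per-query read options into the native reader factory instead of relying on session-level SQLConf. A minimal sketch of the conversion involved, assuming the options are Spark's CaseInsensitiveStringMap as in the scan classes below (the helper name is illustrative):

import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Minimal sketch (helper name is illustrative): the scan's read options are a
// CaseInsensitiveStringMap; the factory's new parameter takes an immutable
// Scala Map, so the call site converts with asScala.toMap.
def scanOptionsAsMap(options: CaseInsensitiveStringMap): Map[String, String] =
  options.asScala.toMap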
@@ -52,7 +52,8 @@ case class NativeMergeParquetPartitionReaderFactory(sqlConf: SQLConf,
                                                     partitionSchema: StructType,
                                                     filters: Array[Filter],
                                                     mergeOperatorInfo: Map[String, MergeOperator[Any]],
-                                                    defaultMergeOp: MergeOperator[Any])
+                                                    defaultMergeOp: MergeOperator[Any],
+                                                    options: Map[String, String] = Map.empty)
   extends NativeMergeFilePartitionReaderFactory(mergeOperatorInfo, defaultMergeOp) with Logging {

   private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
@@ -70,8 +71,8 @@ case class NativeMergeParquetPartitionReaderFactory(sqlConf: SQLConf,
   private val nativeIOPrefecherBufferSize = sqlConf.getConf(NATIVE_IO_PREFETCHER_BUFFER_SIZE)
   private val nativeIOThreadNum = sqlConf.getConf(NATIVE_IO_THREAD_NUM)
   private val nativeIOAwaitTimeout = sqlConf.getConf(NATIVE_IO_READER_AWAIT_TIMEOUT)
-  private val nativeIOCdcColumn = sqlConf.getConf(NATIVE_IO_CDC_COLUMN)
-  private val nativeIOIsCompacted = sqlConf.getConf(NATIVE_IO_IS_COMPACTED)
+  private val nativeIOCdcColumn = options.getOrElse(NATIVE_IO_CDC_COLUMN.key, "")
+  private val nativeIOIsCompacted = options.getOrElse(NATIVE_IO_IS_COMPACTED.key, "false")

   // schema: path->schema source: path->file|path->file|path->file
   private val requestSchemaMap: mutable.Map[String, String] = broadcastedConf.value.value
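The factory gains an options map (defaulting to empty) and reads the CDC column and compaction flag from it per scan, rather than from the session conf. A small sketch of the lookup pattern with the defaults used in the diff ("" and "false"); it assumes both config entries live in LakeSoulSQLConf, as NATIVE_IO_CDC_COLUMN does, and the helper itself is illustrative:

import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{NATIVE_IO_CDC_COLUMN, NATIVE_IO_IS_COMPACTED}

// Illustrative helper: per-scan settings come from the options map passed to
// the reader factory, with the same string defaults as in the diff above.
def nativeReaderSettings(options: Map[String, String]): (String, Boolean) = {
  val cdcColumn = options.getOrElse(NATIVE_IO_CDC_COLUMN.key, "")
  val isCompacted = options.getOrElse(NATIVE_IO_IS_COMPACTED.key, "false").toBoolean
  (cdcColumn, isCompacted)
}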
@@ -114,25 +114,28 @@ case class LakeSoulScanBuilder(sparkSession: SparkSession,
     } else {
       hasNoDeltaFile = fileInfo.forall(f => f._2.size <= 1)
     }
+    val writableOptions = mutable.Map.empty[String, String] ++ options.asScala
     if (fileIndex.snapshotManagement.snapshot.getPartitionInfoArray.forall(p => p.commit_op.equals("CompactionCommit"))) {
-      sparkSession.sessionState.conf.setConfString(NATIVE_IO_IS_COMPACTED.key, "true")
-      println(s"set NATIVE_IO_IS_COMPACTED with ${fileIndex.snapshotManagement.snapshot.getPartitionInfoArray.mkString("Array(", ", ", ")")}")
+      writableOptions.put(NATIVE_IO_IS_COMPACTED.key, "true")
     }
+    val updatedOptions = new CaseInsensitiveStringMap(writableOptions.asJava)
     if (fileInfo.isEmpty) {
       EmptyParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, readDataSchema(),
-        readPartitionSchema(), pushedParquetFilters, options, partitionFilters, dataFilters)
+        readPartitionSchema(), pushedParquetFilters, updatedOptions, partitionFilters, dataFilters)
     } else if (tableInfo.hash_partition_columns.isEmpty) {
       parquetScan()
     } else if (onlyOnePartition) {
       OnePartitionMergeBucketScan(sparkSession, hadoopConf, fileIndex, dataSchema, mergeReadDataSchema(),
-        readPartitionSchema(), pushedParquetFilters, options, tableInfo, partitionFilters, dataFilters)
+        readPartitionSchema(), pushedParquetFilters, updatedOptions, tableInfo, partitionFilters, dataFilters)
     } else {
       if (sparkSession.sessionState.conf
         .getConf(LakeSoulSQLConf.BUCKET_SCAN_MULTI_PARTITION_ENABLE)) {
         MultiPartitionMergeBucketScan(sparkSession, hadoopConf, fileIndex, dataSchema, mergeReadDataSchema(),
-          readPartitionSchema(), pushedParquetFilters, options, tableInfo, partitionFilters, dataFilters)
+          readPartitionSchema(), pushedParquetFilters, updatedOptions, tableInfo, partitionFilters, dataFilters)
       } else {
         MultiPartitionMergeScan(sparkSession, hadoopConf, fileIndex, dataSchema, mergeReadDataSchema(),
-          readPartitionSchema(), pushedParquetFilters, options, tableInfo, partitionFilters, dataFilters)
+          readPartitionSchema(), pushedParquetFilters, updatedOptions, tableInfo, partitionFilters, dataFilters)
       }
     }
   }
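Instead of mutating the session conf (and printing) when every partition comes from a CompactionCommit, the scan builder now copies the incoming read options, adds the flag there, and hands the enriched CaseInsensitiveStringMap to whichever scan it builds. A compact sketch of that copy-and-extend step (the helper name and signature are illustrative):

import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative helper: derive a new options map with one extra flag, leaving
// both the caller's options and the session conf untouched.
def withFlag(options: CaseInsensitiveStringMap, key: String, enabled: Boolean): CaseInsensitiveStringMap = {
  val writable = mutable.Map.empty[String, String] ++ options.asScala
  if (enabled) writable.put(key, "true")
  new CaseInsensitiveStringMap(writable.asJava)
}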
@@ -18,7 +18,6 @@ import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.lakesoul._
 import org.apache.spark.sql.lakesoul.commands.WriteIntoTable
 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
-import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.NATIVE_IO_CDC_COLUMN
 import org.apache.spark.sql.lakesoul.sources.{LakeSoulDataSource, LakeSoulSQLConf, LakeSoulSourceUtils}
 import org.apache.spark.sql.lakesoul.utils.SparkUtil
 import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
@@ -65,14 +64,7 @@ case class LakeSoulTableV2(spark: SparkSession,

   // The loading of the SnapshotManagement is lazy in order to reduce the amount of FileSystem calls,
   // in cases where we will fall back to the V1 behavior.
-  lazy val snapshotManagement: SnapshotManagement = {
-    val mgr = SnapshotManagement(rootPath, namespace)
-    val cdcColumn = mgr.snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
-    if (cdcColumn.isDefined) {
-      spark.sessionState.conf.setConfString(NATIVE_IO_CDC_COLUMN.key, cdcColumn.get)
-    }
-    mgr
-  }
+  lazy val snapshotManagement: SnapshotManagement = SnapshotManagement(rootPath, namespace)

   override def name(): String = catalogTable.map(_.identifier.unquotedString)
     .orElse(tableIdentifier)
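Finally, LakeSoulTableV2 no longer sets NATIVE_IO_CDC_COLUMN on the session conf as a side effect of building the SnapshotManagement. Consumers that need the CDC column can still read it from the table metadata the removed block consulted; a hedged sketch of that lookup, reusing the identifiers from the deleted code (the standalone function is illustrative, and it assumes the LakeSoul types already imported in this file):

// Illustrative lookup: the CDC column stays available in the table's
// configuration under lakeSoulCDCChangePropKey; no session conf mutation needed.
def cdcColumnOf(mgr: SnapshotManagement): Option[String] =
  mgr.snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)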
