From 1890bf671c1ff2268e6cf2f9f85aad4ac05788b9 Mon Sep 17 00:00:00 2001
From: chenxu
Date: Mon, 13 Jan 2025 19:33:08 +0800
Subject: [PATCH] add compaction test

Signed-off-by: chenxu
---
 .../lakesoul/commands/CompactionSuite.scala   | 67 ++++++++++++++++++-
 .../lakesoul/LakeSoulArrowReader.scala        |  2 +-
 rust/lakesoul-io/src/constant.rs              |  6 +-
 3 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala
index 810f10db9..95e2389fe 100644
--- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala
@@ -23,6 +23,8 @@ import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterEach
 import org.scalatestplus.junit.JUnitRunner
+import java.math.MathContext
+
 @RunWith(classOf[JUnitRunner])
 class CompactionSuite extends QueryTest
   with SharedSparkSession
   with BeforeAndAfterEach
@@ -904,11 +906,74 @@ class CompactionSuite extends QueryTest
     }
   }
 
-  // Auxiliary method: Get the bucket number of table
   def getTableInfo(tablePath: String): TableInfo = {
     val sm = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(tablePath)).toString)
     sm.getTableInfoOnly
   }
+
+  test("Compaction with schema change") {
+    withSQLConf(LakeSoulSQLConf.SCHEMA_AUTO_MIGRATE.key -> "true") {
+      withTempDir { tempDir =>
+        val tablePath = tempDir.getCanonicalPath
+        val spark = SparkSession.active
+
+        val hashBucketNum = 2
+        val mathContext = new MathContext(20)
+
+        {
+          // Create test data
+          val df = Seq(
+            (1, "2023-01-01", 10, BigDecimal(10, 2, mathContext), "insert"),
+            (2, "2023-01-02", 20, BigDecimal(20, 2, mathContext), "insert"),
+            (3, "2023-01-03", 30, BigDecimal(30, 2, mathContext), "insert"),
+            (4, "2023-01-04", 40, BigDecimal(40, 2, mathContext), "insert"),
+            (5, "2023-01-05", 50, BigDecimal(50, 2, mathContext), "insert")
+          ).toDF("id", "date", "value", "range", "op")
+          df.printSchema()
+
+          // Write initial data
+          df.write
+            .format("lakesoul")
+            .option("hashPartitions", "id")
+            .option(SHORT_TABLE_NAME, "compaction_test_table")
+            .option("lakesoul_cdc_change_column", "op")
+            .option("hashBucketNum", hashBucketNum.toString)
+            .save(tablePath)
+        }
+
+        {
+          // change schema
+          val df = Seq(
+            (1, "1", 1, "insert"),
+            (2, "2", 1, "update"),
+            (3, "3", 1, "delete"),
+            (4, "4", 1, "insert"),
+            (5, "5", 1, "update")
+          ).toDF("id", "value", "range", "op")
+
+          val table = LakeSoulTable.forPath(tablePath)
+          table.upsert(df)
+        }
+
+        {
+          // compaction
+          val table = LakeSoulTable.forPath(tablePath)
+          LakeSoulTable.uncached(tablePath)
+          table.compaction()
+        }
+
+        val table = LakeSoulTable.forPath(tablePath)
+        LakeSoulTable.uncached(tablePath)
+        val tableDF = table.toDF
+
+        checkAnswer(tableDF, Seq(
+          Row(1, "2023-01-01", 1, 1.0, "insert"),
+          Row(2, "2023-01-02", 2, 1.0, "insert"),
+          Row(4, "2023-01-04", 4, 1.0, "insert"),
+          Row(5, "2023-01-05", 5, 1.0, "insert"))
+        )
+      }
+    }
+  }
 }
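Note on the expected result above: the test relies on the CDC semantics it exercises — the "op" column tags every upserted row, the latest write per primary key wins, and a key whose latest op is "delete" (id 3) no longer appears in reads after compaction. A standalone sketch of that merge rule, in plain Rust with no LakeSoul dependency (the merge strategy is inferred from the test's inputs and its checkAnswer, not taken from LakeSoul's implementation):

    // Last write per key wins; keys whose latest op is "delete" are dropped.
    use std::collections::BTreeMap;

    fn main() {
        let base = [(1, "insert"), (2, "insert"), (3, "insert"), (4, "insert"), (5, "insert")];
        let upsert = [(1, "insert"), (2, "update"), (3, "delete"), (4, "insert"), (5, "update")];

        let mut merged: BTreeMap<i32, &str> = BTreeMap::new();
        for (id, op) in base.into_iter().chain(upsert) {
            merged.insert(id, op); // a later write overwrites the earlier one for the same key
        }
        let visible: Vec<i32> = merged
            .into_iter()
            .filter(|&(_, op)| op != "delete") // tombstoned keys disappear from reads
            .map(|(id, _)| id)
            .collect();
        assert_eq!(visible, vec![1, 2, 4, 5]); // exactly the ids asserted in checkAnswer above
    }
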
diff --git a/native-io/lakesoul-io-java/src/main/scala/com/dmetasoul/lakesoul/LakeSoulArrowReader.scala b/native-io/lakesoul-io-java/src/main/scala/com/dmetasoul/lakesoul/LakeSoulArrowReader.scala
index 7e31fbfe9..52a368017 100644
--- a/native-io/lakesoul-io-java/src/main/scala/com/dmetasoul/lakesoul/LakeSoulArrowReader.scala
+++ b/native-io/lakesoul-io-java/src/main/scala/com/dmetasoul/lakesoul/LakeSoulArrowReader.scala
@@ -33,8 +33,8 @@ case class LakeSoulArrowReader(reader: NativeIOReader,
   override def hasNext: Boolean = {
     if (!finished) {
       val consumerArray = ArrowArray.allocateNew(reader.getAllocator)
-      val rowCount = reader.nextBatchBlocked(consumerArray.memoryAddress());
       try {
+        val rowCount = reader.nextBatchBlocked(consumerArray.memoryAddress());
         if (rowCount > 0) {
           Data.importIntoVectorSchemaRoot(reader.getAllocator, consumerArray, root, provider)
           root.setRowCount(rowCount)
diff --git a/rust/lakesoul-io/src/constant.rs b/rust/lakesoul-io/src/constant.rs
index c4a446658..6531653cf 100644
--- a/rust/lakesoul-io/src/constant.rs
+++ b/rust/lakesoul-io/src/constant.rs
@@ -7,6 +7,7 @@ use std::collections::HashMap;
 use arrow::array::ArrayRef;
 use arrow::compute::CastOptions;
 use arrow_array::{new_empty_array, new_null_array};
+use arrow_cast::display::FormatOptions;
 use arrow_schema::DataType;
 use lazy_static::lazy_static;
 
@@ -30,7 +31,10 @@ pub static TIMESTAMP_NANOSECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.9f";
 pub static NUM_COLUMN_OPTIMIZE_THRESHOLD: usize = 200;
 
 lazy_static! {
-    pub static ref ARROW_CAST_OPTIONS: CastOptions<'static> = CastOptions::default();
+    pub static ref ARROW_CAST_OPTIONS: CastOptions<'static> = CastOptions {
+        safe: false,
+        format_options: FormatOptions::default(),
+    };
 }
 
 #[derive(Debug, Default)]
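Two behavioral notes on the non-test hunks. In LakeSoulArrowReader, moving the nextBatchBlocked call inside the try block means a failure in the native call is now seen by that try's error handling (not shown in this hunk) instead of propagating out of hasNext before it applies. In constant.rs, switching ARROW_CAST_OPTIONS from CastOptions::default() (safe: true) to safe: false makes Arrow casts surface errors instead of silently turning uncastable values into NULLs, presumably to fail fast during the schema migration the new compaction test exercises. A minimal sketch of the difference, using only the arrow crate (not LakeSoul code):

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, StringArray};
    use arrow::compute::{cast_with_options, CastOptions};
    use arrow::datatypes::DataType;
    use arrow::util::display::FormatOptions;

    fn main() {
        let input: ArrayRef = Arc::new(StringArray::from(vec!["1", "not-a-number"]));

        // Default options (safe: true): a value that fails to cast becomes NULL.
        let lenient = CastOptions::default();
        let out = cast_with_options(&input, &DataType::Int32, &lenient).unwrap();
        assert!(out.is_null(1));

        // safe: false, as ARROW_CAST_OPTIONS is defined after this patch:
        // the same cast returns an error instead of hiding the bad value.
        let strict = CastOptions {
            safe: false,
            format_options: FormatOptions::default(),
        };
        assert!(cast_with_options(&input, &DataType::Int32, &strict).is_err());
    }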