add compaction test
Signed-off-by: chenxu <[email protected]>
dmetasoul01 committed Jan 13, 2025
1 parent 811b553 commit 1890bf6
Showing 3 changed files with 72 additions and 3 deletions.
@@ -23,6 +23,8 @@ import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
import org.scalatestplus.junit.JUnitRunner

import java.math.MathContext

@RunWith(classOf[JUnitRunner])
class CompactionSuite extends QueryTest
with SharedSparkSession with BeforeAndAfterEach
@@ -904,11 +906,74 @@ class CompactionSuite extends QueryTest
}
}


  // Auxiliary method: get the TableInfo of a table (which carries its hash bucket number)
  def getTableInfo(tablePath: String): TableInfo = {
    val sm = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(tablePath)).toString)
    sm.getTableInfoOnly
  }
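  // Hypothetical usage sketch — the exact TableInfo accessor name may differ:
  //   assert(getTableInfo(tablePath).bucket_num == hashBucketNum)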

  test("Compaction with schema change") {
    withSQLConf(LakeSoulSQLConf.SCHEMA_AUTO_MIGRATE.key -> "true") {
      withTempDir { tempDir =>
        val tablePath = tempDir.getCanonicalPath
        val spark = SparkSession.active

        val hashBucketNum = 2
        val mathContext = new MathContext(20)

        {
          // Create test data
          val df = Seq(
            (1, "2023-01-01", 10, BigDecimal(10, 2, mathContext), "insert"),
            (2, "2023-01-02", 20, BigDecimal(20, 2, mathContext), "insert"),
            (3, "2023-01-03", 30, BigDecimal(30, 2, mathContext), "insert"),
            (4, "2023-01-04", 40, BigDecimal(40, 2, mathContext), "insert"),
            (5, "2023-01-05", 50, BigDecimal(50, 2, mathContext), "insert")
          ).toDF("id", "date", "value", "range", "op")
          df.printSchema()

          // Write initial data
          df.write
            .format("lakesoul")
            .option("hashPartitions", "id")
            .option(SHORT_TABLE_NAME, "compaction_test_table")
            .option("lakesoul_cdc_change_column", "op")
            .option("hashBucketNum", hashBucketNum.toString)
            .save(tablePath)
        }
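        // Optional sanity-check sketch, using only APIs already exercised in this test:
        //   LakeSoulTable.uncached(tablePath)
        //   LakeSoulTable.forPath(tablePath).toDF.show()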

        {
          // Change schema: drop "date", retype "value" to string and "range" to int
          val df = Seq(
            (1, "1", 1, "insert"),
            (2, "2", 1, "update"),
            (3, "3", 1, "delete"),
            (4, "4", 1, "insert"),
            (5, "5", 1, "update")
          ).toDF("id", "value", "range", "op")

          val table = LakeSoulTable.forPath(tablePath)
          table.upsert(df)
        }
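        // With SCHEMA_AUTO_MIGRATE enabled, the upsert above merges this narrower,
        // retyped schema into the table instead of failing on the mismatch.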

        {
          // Run compaction on the upserted table
          val table = LakeSoulTable.forPath(tablePath)
          LakeSoulTable.uncached(tablePath)
          table.compaction()
        }
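        // LakeSoulTable.uncached(tablePath) presumably drops any cached snapshot, so
        // the compaction and the verification read below see the latest table state.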

val table = LakeSoulTable.forPath(tablePath)
LakeSoulTable.uncached(tablePath)
val tableDF = table.toDF

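        // Row 3 carried op = "delete" in the CDC upsert, so only four rows remain;
        // value/range reflect the upserted data while date keeps its original values.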
        checkAnswer(tableDF, Seq(
          Row(1, "2023-01-01", 1, 1.0, "insert"),
          Row(2, "2023-01-02", 2, 1.0, "insert"),
          Row(4, "2023-01-04", 4, 1.0, "insert"),
          Row(5, "2023-01-05", 5, 1.0, "insert"))
        )
      }
    }
  }
}
@@ -33,8 +33,8 @@ case class LakeSoulArrowReader(reader: NativeIOReader,
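// The change below moves the native nextBatchBlocked call inside the try block,
// presumably so a failure in the native read is caught by the surrounding
// error handling (not shown in full here) instead of escaping before it.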
  override def hasNext: Boolean = {
    if (!finished) {
      val consumerArray = ArrowArray.allocateNew(reader.getAllocator)
-     val rowCount = reader.nextBatchBlocked(consumerArray.memoryAddress());
      try {
+       val rowCount = reader.nextBatchBlocked(consumerArray.memoryAddress());
        if (rowCount > 0) {
          Data.importIntoVectorSchemaRoot(reader.getAllocator, consumerArray, root, provider)
          root.setRowCount(rowCount)
6 changes: 5 additions & 1 deletion rust/lakesoul-io/src/constant.rs
@@ -7,6 +7,7 @@ use std::collections::HashMap;
use arrow::array::ArrayRef;
use arrow::compute::CastOptions;
use arrow_array::{new_empty_array, new_null_array};
+use arrow_cast::display::FormatOptions;
use arrow_schema::DataType;

use lazy_static::lazy_static;
@@ -30,7 +31,10 @@ pub static TIMESTAMP_NANOSECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.9f";
pub static NUM_COLUMN_OPTIMIZE_THRESHOLD: usize = 200;

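// safe: false makes a failing cast return an error instead of silently producing NULL.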
lazy_static! {
-   pub static ref ARROW_CAST_OPTIONS: CastOptions<'static> = CastOptions::default();
+   pub static ref ARROW_CAST_OPTIONS: CastOptions<'static> = CastOptions {
+       safe: false,
+       format_options: FormatOptions::default(),
+   };
}

#[derive(Debug, Default)]
