Merge pull request #441 from Ceng23333/dev/consistency_ci
[Rust/CI] Add consistency-ci
Ceng23333 authored Feb 7, 2024
2 parents 1d0d150 + b5f1f74 commit 8842480
Showing 18 changed files with 627 additions and 17 deletions.
136 changes: 136 additions & 0 deletions .github/workflows/consistency-ci.yml
@@ -0,0 +1,136 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

on:
  push:
    paths:
      - "rust/**"
    branches:
      - 'main'
  pull_request:
    paths:
      - "rust/**"
    branches:
      - 'main'
      - 'release/**'
  workflow_dispatch:

name: Consistency CI

env:
  RUSTFLAGS: "-Awarnings"

permissions:
  actions: write

jobs:
  verify-hash-consistency:
    runs-on: ubuntu-latest
    services:
      # Label used to access the service container
      postgres:
        # Docker Hub image
        image: postgres:14.5
        # Provide the password for postgres
        env:
          POSTGRES_PASSWORD: lakesoul_test
          POSTGRES_USER: lakesoul_test
          POSTGRES_DB: lakesoul_test
        # Set health checks to wait until postgres has started
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
          --name lakesoul-test-pg
        ports:
          # Maps tcp port 5432 on service container to the host
          - 5432:5432
    steps:
      - uses: actions/checkout@v4
      - name: Install requirement
        uses: ConorMacBride/install-package@v1
        with:
          apt: postgresql-client-14 cargo
      - name: Init PG
        run: |
          ./script/meta_init_for_local_test.sh -j 2
      - name: Set up JDK 8
        uses: actions/setup-java@v4
        with:
          java-version: '8'
          distribution: 'temurin'
          cache: maven
      - name: Set up Python 3.9
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip setuptools wheel
          pip install pymysql cryptography jproperties --no-cache-dir
      - name: Install Protoc
        uses: arduino/setup-protoc@v2
        with:
          version: "23.x"
          repo-token: ${{ secrets.GITHUB_TOKEN }}
      - uses: vemonet/setup-spark@v1
        with:
          spark-version: '3.3.1'
          hadoop-version: '3'
      - run: spark-submit --version

      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: nightly-2023-05-20
          default: true
      - uses: Swatinem/rust-cache@v2
        with:
          workspaces: "./rust -> target"
      - uses: actions-rs/cargo@v1
        with:
          use-cross: true
          command: build
          args: '--manifest-path rust/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features'
      - name: Build with Maven
        run: |
          mkdir -p rust/target/release
          cp rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_io_c.so rust/target/release
          cp rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_metadata_c.so rust/target/release
          MAVEN_OPTS="-Xmx4000m" mvn -q -B package -f pom.xml -Pcross-build -DskipTests
      - name: Get jar names
        run: |
          echo "FLINK_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink)" >> $GITHUB_ENV
          echo "FLINK_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink | sed -e 's/.jar/-tests.jar/g')" >> $GITHUB_ENV
          echo "SPARK_JAR_NAME=$(python script/get_jar_name.py lakesoul-spark)" >> $GITHUB_ENV
          echo "SPARK_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-spark | sed -e 's/.jar/-tests.jar/g')" >> $GITHUB_ENV
      - name: Copy built jar to work-dir
        run: |
          cp ./lakesoul-flink/target/$FLINK_JAR_NAME ./script/ci/work-dir
          cp ./lakesoul-flink/target/$FLINK_TEST_JAR_NAME ./script/ci/work-dir
          cp ./lakesoul-spark/target/$SPARK_JAR_NAME ./script/ci/work-dir
          cp ./lakesoul-spark/target/$SPARK_TEST_JAR_NAME ./script/ci/work-dir
      - name: Generate benchmark data and expected query results
        run: |
          mkdir -p lakesoul/test_files/tpch/data
          git clone https://github.com/databricks/tpch-dbgen.git
          cd tpch-dbgen
          make
          ./dbgen -f -s 0.001
          mv *.tbl ../lakesoul/test_files/tpch/data
      - name: Verify that benchmark queries return expected results
        run: |
          export TPCH_DATA=`realpath lakesoul/test_files/tpch/data`
          cd ./rust
          # use release build for plan verification because debug build causes stack overflow
          cargo test load_tpch_data --package lakesoul-datafusion --features=ci -- --nocapture
          PGPASSWORD='lakesoul_test' psql -U lakesoul_test -h 127.0.0.1 -p 5432 -d lakesoul_test -c "select * from table_info;"
          PGPASSWORD='lakesoul_test' psql -U lakesoul_test -h 127.0.0.1 -p 5432 -d lakesoul_test -c "select * from data_commit_info;"
      - name: Hash-Consistency Verification task
        run: |
          export TPCH_DATA=`realpath lakesoul/test_files/tpch/data`
          export lakesoul_home=`realpath script/ci/work-dir/lakesoul.properties`
          spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars script/ci/work-dir/$SPARK_JAR_NAME --class org.apache.spark.sql.lakesoul.benchmark.ConsistencyCI --master local[4] script/ci/work-dir/$SPARK_TEST_JAR_NAME
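
Taken together, the job wires the pieces end to end: it cross-builds the Rust libraries, packages them into the Spark and Flink jars via Maven, generates a small TPC-H dataset (scale factor 0.001), loads it through the Rust lakesoul-datafusion path with cargo test load_tpch_data, and finally runs the ConsistencyCI Spark job (shown below) to load the same data through Spark and compare the two writers.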
10 changes: 5 additions & 5 deletions .github/workflows/rust-ci.yml
@@ -47,10 +47,10 @@ jobs:

    steps:
      - uses: actions/checkout@v4
-      - name: Install psql
-        run: sudo apt-get install -y postgresql-client-14
-      - name: Install cargo
-        run: sudo apt-get install -y cargo
+      - name: Install requirement
+        uses: ConorMacBride/install-package@v1
+        with:
+          apt: postgresql-client-14 cargo
      - name: Init PG
        run: |
          ./script/meta_init_for_local_test.sh -j 2
@@ -59,5 +59,5 @@
        with:
          version: "23.x"
      - name: Run tests
-        run: cd rust && sudo cargo test --all-features --package lakesoul-datafusion
+        run: cd rust && sudo cargo test --package lakesoul-datafusion

@@ -156,7 +156,7 @@ public List<String> listTablePathsByNamespace(String tableNamespace) {
        return tablePathIdDao.listAllPathByNamespace(tableNamespace);
    }

-    public List<TableInfo> getTableInfosByNamespace(String tableNamespace){
+    public List<TableInfo> getTableInfosByNamespace(String tableNamespace) {
        return tableInfoDao.selectByNamespace(tableNamespace);
    }

@@ -233,7 +233,7 @@ public void deleteSinglePartitionMetaInfo(String tableId, String partitionDesc,
    }

    private void getSnapshotAndFilePathInfo(String tableId, String partitionDesc, List<DataFileOp> fileOps, List<String> deleteFilePathList,
-                                           List<PartitionInfo> filterPartitionInfo, Set<Uuid> snapshotList) {
+                                            List<PartitionInfo> filterPartitionInfo, Set<Uuid> snapshotList) {
        filterPartitionInfo.forEach(p -> snapshotList.addAll(p.getSnapshotList()));
        List<DataCommitInfo> filterDataCommitInfo =
                dataCommitInfoDao.selectByTableIdPartitionDescCommitList(tableId, partitionDesc, snapshotList.stream().collect(Collectors.toList()));
@@ -95,13 +95,13 @@ object SparkMetaVersion {
    import scala.util.parsing.json.JSON
    val configuration = JSON.parseFull(properties)
    val configurationMap = configuration match {
-      case Some(map: collection.immutable.Map[String, String]) => map
+      case Some(map: collection.immutable.Map[String, Any]) => map.toSeq.map(kv => (kv._1, kv._2.toString)).toMap
    }

    // table may have no partition at all or only have range or hash partition
-    val partitionCols = DBUtil.parseTableInfoPartitions(partitions);
-    val bucket_num = configurationMap.get("hashBucketNum") match {
-      case Some(value) => value.toInt
+    val partitionCols = DBUtil.parseTableInfoPartitions(partitions)
+    val bucket_num = configurationMap.get(LakeSoulOptions.HASH_BUCKET_NUM) match {
+      case Some(value) => value.toDouble.toInt
      case _ => -1
    }
    TableInfo(
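
A note on the hunk above: scala.util.parsing.json.JSON.parseFull returns JSON numbers as Double, so the old Map[String, String] pattern did not reflect the actual values and a bucket count written as 5 comes back as 5.0. A minimal standalone sketch of that behavior (not part of this commit; it assumes the same scala-parser-combinators dependency the file itself imports):

import scala.util.parsing.json.JSON

// JSON numbers parse as Double: a bucket count stored as 5 comes back as 5.0.
val parsed = JSON.parseFull("""{"hashBucketNum": 5}""")
// parsed == Some(Map("hashBucketNum" -> 5.0))
val configurationMap = parsed match {
  case Some(map: collection.immutable.Map[String, Any] @unchecked) =>
    map.toSeq.map(kv => (kv._1, kv._2.toString)).toMap
  case _ => Map.empty[String, String]
}
// "5.0".toInt would throw NumberFormatException; toDouble.toInt succeeds.
assert(configurationMap("hashBucketNum").toDouble.toInt == 5)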
@@ -32,7 +32,7 @@ trait ImplicitMetadataOperation extends Logging {
    if (partitionColumns.equalsIgnoreCase("")) {
      Seq.empty[String]
    } else {
-      partitionColumns.split(LAKESOUL_RANGE_PARTITION_SPLITTER).toSeq
+      partitionColumns.split(LAKESOUL_RANGE_PARTITION_SPLITTER).map(s => s.trim).toSeq
    }
  }

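The added .map(s => s.trim) matters because partition specs are commonly written with spaces after the separator, e.g. "date, region"; without trimming, the second column name keeps its leading space and later lookups against the schema miss it. A tiny standalone sketch, using "," in place of LAKESOUL_RANGE_PARTITION_SPLITTER:

val partitionColumns = "date, region"
// Without trim the second entry is " region", with a leading space.
assert(partitionColumns.split(",").toSeq == Seq("date", " region"))
// Trimming each segment yields clean column names.
assert(partitionColumns.split(",").map(s => s.trim).toSeq == Seq("date", "region"))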
@@ -0,0 +1,167 @@
package org.apache.spark.sql.lakesoul.benchmark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
import org.apache.spark.sql.types._

object ConsistencyCI {


  val tpchTable = Seq(
    ("customer",
      StructType(Array(
        StructField("c_custkey", LongType, nullable = false),
        StructField("c_name", StringType, nullable = false),
        StructField("c_address", StringType, nullable = false),
        StructField("c_nationkey", LongType, nullable = false),
        StructField("c_phone", StringType, nullable = false),
        StructField("c_acctbal", DecimalType(15, 2), nullable = false),
        StructField("c_mktsegment", StringType, nullable = false),
        StructField("c_comment", StringType, nullable = false),
      )),
      "c_custkey, c_name"),
    ("part",
      StructType(Array(
        StructField("p_partkey", LongType, nullable = false),
        StructField("p_name", StringType, nullable = false),
        StructField("p_mfgr", StringType, nullable = false),
        StructField("p_brand", StringType, nullable = false),
        StructField("p_type", StringType, nullable = false),
        StructField("p_size", IntegerType, nullable = false),
        StructField("p_container", StringType, nullable = false),
        StructField("p_retailprice", DecimalType(15, 2), nullable = false),
        StructField("p_comment", StringType, nullable = false),
      )),
      "p_partkey, p_name"),
    ("supplier",
      StructType(Array(
        StructField("s_suppkey", LongType, nullable = false),
        StructField("s_name", StringType, nullable = false),
        StructField("s_address", StringType, nullable = false),
        StructField("s_nationkey", LongType, nullable = false),
        StructField("s_phone", StringType, nullable = false),
        StructField("s_acctbal", DecimalType(15, 2), nullable = false),
        StructField("s_comment", StringType, nullable = false),
      )),
      "s_suppkey, s_name"),
    ("partsupp",
      StructType(Array(
        StructField("ps_partkey", LongType, nullable = false),
        StructField("ps_suppkey", LongType, nullable = false),
        StructField("ps_availqty", IntegerType, nullable = false),
        StructField("ps_supplycost", DecimalType(15, 2), nullable = false),
        StructField("ps_comment", StringType, nullable = false),
      )),
      "ps_partkey, ps_suppkey"),
    ("orders",
      StructType(Array(
        StructField("o_orderkey", LongType, nullable = false),
        StructField("o_custkey", LongType, nullable = false),
        StructField("o_orderstatus", StringType, nullable = false),
        StructField("o_totalprice", DecimalType(15, 2), nullable = false),
        StructField("o_orderdate", DateType, nullable = false),
        StructField("o_orderpriority", StringType, nullable = false),
        StructField("o_clerk", StringType, nullable = false),
        StructField("o_shippriority", IntegerType, nullable = false),
        StructField("o_comment", StringType, nullable = false),
      )),
      "o_orderkey, o_custkey"),

    ("nation",
      StructType(Array(
        StructField("n_nationkey", LongType, nullable = false),
        StructField("n_name", StringType, nullable = false),
        StructField("n_regionkey", LongType, nullable = false),
        StructField("n_comment", StringType, nullable = false),
      )),
      "n_nationkey, n_name"),
    ("region",
      StructType(Array(
        StructField("r_regionkey", LongType, nullable = false),
        StructField("r_name", StringType, nullable = false),
        StructField("r_comment", StringType, nullable = false),
      )),
      "r_regionkey, r_name"),
    ("lineitem",
      StructType(Array(
        StructField("l_orderkey", LongType, nullable = false),
        StructField("l_partkey", LongType, nullable = false),
        StructField("l_suppkey", LongType, nullable = false),
        StructField("l_linenumber", IntegerType, nullable = false),
        StructField("l_quantity", DecimalType(15, 2), nullable = false),
        StructField("l_extendedprice", DecimalType(15, 2), nullable = false),
        StructField("l_discount", DecimalType(15, 2), nullable = false),
        StructField("l_tax", DecimalType(15, 2), nullable = false),
        StructField("l_returnflag", StringType, nullable = false),
        StructField("l_linestatus", StringType, nullable = false),
        StructField("l_shipdate", DateType, nullable = false),
        StructField("l_commitdate", DateType, nullable = false),
        StructField("l_receiptdate", DateType, nullable = false),
        StructField("l_shipinstruct", StringType, nullable = false),
        StructField("l_shipmode", StringType, nullable = false),
        StructField("l_comment", StringType, nullable = false),
      )),
      "l_orderkey, l_partkey"),
  )

  def load_data(spark: SparkSession): Unit = {

    val tpchPath = System.getenv("TPCH_DATA")
    val lakeSoulPath = "/tmp/lakesoul/tpch"
    tpchTable.foreach(tup => {
      val (name, schema, hashPartitions) = tup
      val df = spark.read.option("delimiter", "|")
        .schema(schema)
        .csv(s"$tpchPath/$name.tbl")
      // df.show
      df.write.format("lakesoul")
        .option("shortTableName", name)
        .option("hashPartitions", hashPartitions)
        .option("hashBucketNum", 5)
        .mode("Overwrite")
        .save(s"$lakeSoulPath/$name")
    })

  }

  def main(args: Array[String]): Unit = {
    val builder = SparkSession.builder()
      .appName("Consistency CI")
      .master("local[4]")
      .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
      .config("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName)
      .config(SQLConf.DEFAULT_CATALOG.key, LakeSoulCatalog.CATALOG_NAME)
      .config("spark.default.parallelism", "16")
      .config("spark.sql.parquet.binaryAsString", "true")

    val spark = builder.getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    spark.sql("show tables")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS tpch")
    spark.sql("USE tpch")

    load_data(spark)

    tpchTable.foreach(tup => {
      val sparkDF = spark.sql(s"select * from ${tup._1}")
      val rustDF = spark.sql(s"select * from default.${tup._1}")
      println(s"${tup._1} sparkDF: ")
      sparkDF.show
      println(s"${tup._1} rustDF: ")
      rustDF.show
      // val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
      // val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
      // val result = diff1.count() == 0 && diff2.count() == 0
      // if (!result) {
      //   println("sparkDF: ")
      //   println(sparkDF.collectAsList())
      //   println("rustDF: ")
      //   println(rustDF.collectAsList())
      //   System.exit(1)
      // }
    })

  }
}
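
Note that the row-level comparison in main is still commented out, so the job currently only prints both tables. A hedged sketch of what re-enabling it could look like follows; assertSameRows is a hypothetical helper, not part of this commit, and RDD.subtract compares rows as a set, so duplicate-row multiplicities would go unchecked:

import org.apache.spark.sql.DataFrame

// Hypothetical helper mirroring the commented-out diff1/diff2 logic above:
// fail the job when the Spark-written and Rust-written tables disagree.
def assertSameRows(name: String, sparkDF: DataFrame, rustDF: DataFrame): Unit = {
  val onlyInSpark = sparkDF.rdd.subtract(rustDF.rdd) // rows only Spark produced
  val onlyInRust = rustDF.rdd.subtract(sparkDF.rdd)  // rows only Rust produced
  if (onlyInSpark.count() != 0 || onlyInRust.count() != 0) {
    println(s"$name spark-only rows: " + onlyInSpark.take(10).mkString(", "))
    println(s"$name rust-only rows: " + onlyInRust.take(10).mkString(", "))
    System.exit(1)
  }
}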
5 changes: 4 additions & 1 deletion rust/lakesoul-datafusion/Cargo.toml
@@ -36,4 +36,7 @@ anyhow = { workspace = true }
ctor = "0.2"
test-log = { version = "0.2.14", features = ["trace"] }
rand = "0.8.5"
-rand_chacha = "0.3.1"
+rand_chacha = "0.3.1"
+
+[features]
+ci = []
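
The new ci feature is deliberately empty: judging from the workflows above, it serves purely as a compile-time gate, so the TPC-H consistency tests only run when consistency-ci.yml passes --features=ci, while rust-ci now drops --all-features and skips them.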
5 changes: 5 additions & 0 deletions rust/lakesoul-datafusion/src/benchmarks/mod.rs
@@ -0,0 +1,5 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0

pub mod tpch;