diff --git a/.github/workflows/maven-test.yml b/.github/workflows/maven-test.yml index fe83929fd..a603b92aa 100644 --- a/.github/workflows/maven-test.yml +++ b/.github/workflows/maven-test.yml @@ -277,7 +277,7 @@ jobs: continue-on-error: true uses: actions/upload-artifact@v4 with: - name: maven-test-report-artifact-spark-2 + name: maven-test-report-artifact-spark-3 path: lakesoul-spark/target/site retention-days: 5 if-no-files-found: error @@ -432,7 +432,7 @@ jobs: continue-on-error: true uses: actions/upload-artifact@v4 with: - name: maven-test-report-artifact-flink-1 + name: maven-test-report-artifact-flink-2 path: lakesoul-flink/target/site retention-days: 5 if-no-files-found: error diff --git a/.github/workflows/presto-cdc-test.yml b/.github/workflows/presto-cdc-test.yml index 89b5dcbbb..9e3a089ad 100644 --- a/.github/workflows/presto-cdc-test.yml +++ b/.github/workflows/presto-cdc-test.yml @@ -100,7 +100,7 @@ jobs: echo "FLINK_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink | sed -e 's/.jar/-tests.jar/g')" >> $GITHUB_ENV echo "SPARK_JAR_NAME=$(python script/get_jar_name.py lakesoul-spark)" >> $GITHUB_ENV echo "SPARK_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-spark | sed -e 's/.jar/-tests.jar/g')" >> $GITHUB_ENV - echo "PRESTO_JAR_NAME=$(python script/get_jar_name.py lakesoul-presto | sed -e 's/.jar/-jar-with-dependencies.jar/g')" >> $GITHUB_ENV + echo "PRESTO_JAR_NAME=$(python script/get_jar_name.py lakesoul-presto)" >> $GITHUB_ENV echo "PRESTO_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-presto | sed -e 's/.jar/-tests.jar/g')" >> $GITHUB_ENV - name: Copy built jar to work-dir run: | @@ -131,13 +131,11 @@ jobs: run: | docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulSourceToSinkTable -C file:///opt/flink/work-dir/$FLINK_JAR_NAME /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --source.database.name test_cdc --source.table.name default_init --sink.database.name flink_sink --sink.table.name default_init --use.cdc true --hash.bucket.number 2 --job.checkpoint_interval 10000 --server_time_zone UTC --warehouse.path s3://lakesoul-test-bucket/flink-sink/data --flink.checkpoint s3://lakesoul-test-bucket/flink-sink/chk sleep 30s - # - name: Start flink DataGenSource without primary key task-3 - # run: | - # docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulDataGenSourceTable -C file:///opt/flink/work-dir/$FLINK_JAR_NAME /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --sink.database.name flink --sink.table.name sink_table --job.checkpoint_interval 10000 --server_time_zone UTC --warehouse.path s3://lakesoul-test-bucket/flink/ --flink.checkpoint s3://lakesoul-test-bucket/flink/chk --sink.parallel 2 --data.size 1000 --write.time 5 - name: Download mysql driver jar run: | cd ./script/benchmark/work-dir if [ ! -e mysql-connector-java-8.0.30.jar ]; then wget -q https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar; fi + if [ ! -e presto-jdbc-0.282.jar ]; then wget -q https://repo1.maven.org/maven2/com/facebook/presto/presto-jdbc/0.282/presto-jdbc-0.282.jar; fi - name: Create table and insert data run: | cd ./script/benchmark @@ -150,11 +148,11 @@ jobs: - name: "[Check] Mysql cdc data accuracy verification task" run: | cd ./script/benchmark - docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME com.facebook.presto.benchmark.Benchmark + docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME:/root/mysql-connector-java-8.0.30.jar:/root/presto-jdbc-0.282.jar com.facebook.presto.benchmark.Benchmark - name: "[Check] Presto source to sink data accuracy verification task" run: | cd ./script/benchmark - docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME com.facebook.presto.benchmark.Benchmark --cdc.contract false --single.table.contract true + docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME:/root/mysql-connector-java-8.0.30.jar:/root/presto-jdbc-0.282.jar com.facebook.presto.benchmark.Benchmark --cdc.contract false --single.table.contract true - name: Adding columns for tables and deleting some data from tables run: | cd ./script/benchmark @@ -165,11 +163,11 @@ jobs: - name: "[Check] Mysql cdc data accuracy verification task" run: | cd ./script/benchmark - docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME com.facebook.presto.benchmark.Benchmark + docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME:/root/mysql-connector-java-8.0.30.jar:/root/presto-jdbc-0.282.jar com.facebook.presto.benchmark.Benchmark - name: "[Check] Presto source to sink data accuracy verification task" run: | cd ./script/benchmark - docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME com.facebook.presto.benchmark.Benchmark --cdc.contract false --single.table.contract true + docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME:/root/mysql-connector-java-8.0.30.jar:/root/presto-jdbc-0.282.jar com.facebook.presto.benchmark.Benchmark --cdc.contract false --single.table.contract true - name: Updating data in tables run: | cd ./script/benchmark @@ -178,11 +176,11 @@ jobs: - name: "[Check] Mysql cdc data accuracy verification task" run: | cd ./script/benchmark - docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME com.facebook.presto.benchmark.Benchmark + docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME:/root/mysql-connector-java-8.0.30.jar:/root/presto-jdbc-0.282.jar com.facebook.presto.benchmark.Benchmark - name: "[Check] Presto source to sink data accuracy verification task" run: | cd ./script/benchmark - docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME com.facebook.presto.benchmark.Benchmark --cdc.contract false --single.table.contract true + docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME:/root/mysql-connector-java-8.0.30.jar:/root/presto-jdbc-0.282.jar com.facebook.presto.benchmark.Benchmark --cdc.contract false --single.table.contract true - name: Dropping columns and deleting some data in tables run: | cd ./script/benchmark @@ -193,15 +191,11 @@ jobs: - name: "[Check] Mysql cdc data accuracy verification task" run: | cd ./script/benchmark - docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME com.facebook.presto.benchmark.Benchmark + docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME:/root/mysql-connector-java-8.0.30.jar:/root/presto-jdbc-0.282.jar com.facebook.presto.benchmark.Benchmark - name: "[Check] Presto source to sink data accuracy verification task" run: | cd ./script/benchmark - docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME com.facebook.presto.benchmark.Benchmark --cdc.contract false --single.table.contract true - # - name: "[Check] Table without primary key data accuracy verification task" - # run: | - # cd ./script/benchmark - # docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.FlinkWriteDataCheck --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --csv.path s3://lakesoul-test-bucket/flink/csv --lakesoul.table.path s3://lakesoul-test-bucket/flink/sink_table --server.time.zone UTC + docker run --cpus 2 -m 5000m --net container:presto --rm -t -v ${PWD}/work-dir:/root openjdk:11 java -cp /root/$PRESTO_TEST_JAR_NAME:/root/$PRESTO_JAR_NAME:/root/mysql-connector-java-8.0.30.jar:/root/presto-jdbc-0.282.jar com.facebook.presto.benchmark.Benchmark --cdc.contract false --single.table.contract true - name: Print Flink Log if: always() run: | diff --git a/lakesoul-common/pom.xml b/lakesoul-common/pom.xml index dd1a93c5b..93515dc8a 100644 --- a/lakesoul-common/pom.xml +++ b/lakesoul-common/pom.xml @@ -332,7 +332,6 @@ SPDX-License-Identifier: Apache-2.0 3.22.0 - com.github.jnr @@ -342,7 +341,7 @@ SPDX-License-Identifier: Apache-2.0 org.apache.hadoop hadoop-client-api - 3.3.6 + 3.3.2 ${local.scope} diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml index c292107e7..1a9d24282 100644 --- a/lakesoul-flink/pom.xml +++ b/lakesoul-flink/pom.xml @@ -103,6 +103,7 @@ SPDX-License-Identifier: Apache-2.0 org.apache.flink flink-formats ${flink.version} + ${local.scope} pom @@ -130,6 +131,7 @@ SPDX-License-Identifier: Apache-2.0 com.ververica flink-sql-connector-sqlserver-cdc ${cdc.version} + ${local.scope} com.ververica @@ -140,36 +142,43 @@ SPDX-License-Identifier: Apache-2.0 com.ververica flink-sql-connector-oracle-cdc ${cdc.version} + ${local.scope} com.ververica flink-sql-connector-postgres-cdc ${cdc.version} + ${local.scope} com.ververica flink-sql-connector-mongodb-cdc ${cdc.version} + ${local.scope} org.apache.flink flink-connector-mongodb 1.0.1-1.17 + ${local.scope} org.mongodb bson - 4.3.4 + 4.7.2 + ${local.scope} org.apache.doris flink-doris-connector-1.17 1.5.0 + ${local.scope} org.apache.flink flink-connector-jdbc 3.1.1-1.17 + ${local.scope} org.apache.flink @@ -273,6 +282,16 @@ SPDX-License-Identifier: Apache-2.0 org.furyio fury-core 0.4.1 + + + org.slf4j + * + + + org.checkerframework + checker-qual + + @@ -328,6 +347,18 @@ SPDX-License-Identifier: Apache-2.0 ${flink.version} ${local.scope} + + org.apache.hadoop + hadoop-client-api + 3.3.2 + ${local.scope} + + + org.apache.hadoop + hadoop-client-runtime + 3.3.2 + ${local.scope} + @@ -413,19 +444,15 @@ SPDX-License-Identifier: Apache-2.0 com.dmetasoul:lakesoul-flink com.ververica:flink-sql-connector-mysql-cdc - com.ververica:flink-sql-connector-postgres-cdc - com.ververica:flink-sql-connector-oracle-cdc - org.apache.flink:flink-connector-jdbc - org.apache.flink:flink-doris-connector-1.17 com.dmetasoul:lakesoul-common com.dmetasoul:lakesoul-io-java com.github.jnr:* org.ow2.asm:* org.apache.arrow:* + org.eclipse.collections:* org.apache.parquet:* org.apache.yetus:* - io.netty:netty-buffer - io.netty:netty-common + io.netty:* com.google.flatbuffers:* com.zaxxer:HikariCP org.postgresql:postgresql @@ -476,16 +503,19 @@ SPDX-License-Identifier: Apache-2.0 - com.ververica:flink-sql-connector-mysql-cdc + *:* - org/apache/flink/shaded/guava30/** - META-INF/maven/com.google.guava/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/versions/** - org.apache.arrow:arrow-c-data + com.ververica:flink-sql-connector-mysql-cdc - org/apache/arrow/c/jni/JniLoader.class + org/apache/flink/shaded/guava30/** + META-INF/maven/com.google.guava/** @@ -554,10 +584,58 @@ SPDX-License-Identifier: Apache-2.0 shaded.parquet com.lakesoul.shaded.shaded.parquet + + com.fasterxml.jackson + com.lakesoul.shaded.com.fasterxml.jackson + + + org.json4s + com.lakesoul.shaded.org.json4s + + + dev.failsafe + com.lakesoul.shaded.dev.failsafe + + + org.aspectj + com.lakesoul.shaded.org.aspectj + + + org.checkerframework + com.lakesoul.shaded.org.checkerframework + org.yaml.snakeyaml com.lakesoul.shaded.org.yaml.snakeyaml + + org.antlr + com.lakesoul.shaded.org.antlr + + + io.substrait + com.lakesoul.shaded.io.substrait + + + org.stringtemplate + com.lakesoul.shaded.org.stringtemplate + + + org.abego + com.lakesoul.shaded.org.abego + + + org.antlr + com.lakesoul.shaded.org.antlr + + + org.ow2.asm + com.lakesoul.shaded.org.ow2.asm + + + org.objectweb.asm + com.lakesoul.shaded.org.objectweb.asm + UTF-8 0.282 provided - 8.1.0 @@ -63,13 +62,6 @@ ${presto.version} test - - com.facebook.presto - presto-jdbc - compile - ${presto.version} - - org.apache.parquet parquet-column @@ -83,10 +75,20 @@ - com.mysql - mysql-connector-j - compile - ${mysql.version} + com.facebook.presto + presto-jdbc + ${presto.version} + ${local.scope} + + + org.apache.hadoop + hadoop-client-api + 3.3.2 + + + org.apache.hadoop + hadoop-client-runtime + 3.3.2 @@ -100,27 +102,35 @@ org.apache.maven.plugins - maven-assembly-plugin - 3.1.0 - - - - - - - - jar-with-dependencies - - + maven-shade-plugin + 3.5.2 - make-assembly - package + package - single + shade + + true + + + ** + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/versions/** + + + + net.alchim31.maven diff --git a/lakesoul-spark/pom.xml b/lakesoul-spark/pom.xml index 70eb53793..ff20be2c8 100644 --- a/lakesoul-spark/pom.xml +++ b/lakesoul-spark/pom.xml @@ -58,6 +58,12 @@ SPDX-License-Identifier: Apache-2.0 com.dmetasoul lakesoul-common ${revision} + + + com.fasterxml.jackson.core + * + + com.dmetasoul @@ -84,6 +90,14 @@ SPDX-License-Identifier: Apache-2.0 io.netty netty-common + + com.fasterxml.jackson.core + * + + + org.antlr + * + @@ -211,7 +225,7 @@ SPDX-License-Identifier: Apache-2.0 com.fasterxml.jackson.core - jackson-core + * @@ -226,6 +240,10 @@ SPDX-License-Identifier: Apache-2.0 com.google.protobuf protobuf-java + + com.fasterxml.jackson.core + * + @@ -284,7 +302,7 @@ SPDX-License-Identifier: Apache-2.0 org.apache.hadoop hadoop-aws - 3.3.3 + 3.3.6 provided @@ -296,49 +314,49 @@ SPDX-License-Identifier: Apache-2.0 - - io.glutenproject - backends-velox - ${gluten.version} - ${local.scope} - - - io.glutenproject - gluten-core - ${gluten.version} - - - io.glutenproject - spark-sql-columnar-shims-spark32 - - - ${local.scope} - - - io.glutenproject - gluten-core - ${gluten.version} - - - io.glutenproject - spark-sql-columnar-shims-spark32 - - - tests - test - - - io.glutenproject - gluten-data - ${gluten.version} - ${local.scope} - - - io.glutenproject - spark-sql-columnar-shims-common - ${gluten.version} - ${local.scope} - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -473,6 +491,7 @@ SPDX-License-Identifier: Apache-2.0 com.github.jnr:* org.ow2.asm:* org.apache.arrow:* + org.eclipse.collections:* com.google.flatbuffers:* com.zaxxer:HikariCP org.postgresql:postgresql @@ -497,9 +516,12 @@ SPDX-License-Identifier: Apache-2.0 - org.apache.arrow:arrow-c-data + *:* - org/apache/arrow/c/jni/JniLoader.class + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/versions/** @@ -540,6 +562,38 @@ SPDX-License-Identifier: Apache-2.0 com.google.protobuf com.lakesoul.shaded.com.google.protobuf + + dev.failsafe + com.lakesoul.shaded.dev.failsafe + + + org.aspectj + com.lakesoul.shaded.org.aspectj + + + org.checkerframework + com.lakesoul.shaded.org.checkerframework + + + io.substrait + com.lakesoul.shaded.io.substrait + + + org.stringtemplate + com.lakesoul.shaded.org.stringtemplate + + + org.abego + com.lakesoul.shaded.org.abego + + + org.ow2.asm + com.lakesoul.shaded.org.ow2.asm + + + org.objectweb.asm + com.lakesoul.shaded.org.objectweb.asm + diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/MetaCommit.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/MetaCommit.scala index 1ec14acb1..98429cd4f 100644 --- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/MetaCommit.scala +++ b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/MetaCommit.scala @@ -30,7 +30,7 @@ object MetaCommit extends Logging { tableInfo.setTableId(table_info.table_id) tableInfo.setTableNamespace(table_info.namespace) - tableInfo.setTablePath(table_info.table_path.toString) + tableInfo.setTablePath(table_info.table_path.toUri.toString) tableInfo.setTableSchema(table_info.table_schema) tableInfo.setPartitions(DBUtil.formatTableInfoPartitionsField(table_info.hash_column, table_info.range_column)) val json = new JSONObject() @@ -75,7 +75,7 @@ object MetaCommit extends Logging { } if (!result) { throw LakeSoulErrors.commitFailedReachLimit( - meta_info.table_info.table_path.toString, + meta_info.table_info.table_path.toUri.toString, "", MetaUtils.MAX_COMMIT_ATTEMPTS) } diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala index 8ad706bba..9854b7fe6 100644 --- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala +++ b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/SparkMetaVersion.scala @@ -84,7 +84,7 @@ object SparkMetaVersion { } def getTableInfo(namespace: String, table_path: String): TableInfo = { - val path = SparkUtil.makeQualifiedPath(table_path).toString + val path = SparkUtil.makeQualifiedPath(table_path).toUri.toString val info = dbManager.getTableInfoByPath(path) if (info == null) { return null diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/tables/LakeSoulTable.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/tables/LakeSoulTable.scala index c630548a9..7e0e82df5 100644 --- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/tables/LakeSoulTable.scala +++ b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/tables/LakeSoulTable.scala @@ -471,7 +471,7 @@ object LakeSoulTable { if (path.equals("")) { SnapshotManagement.clearCache() } else { - val p = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val p = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString if (!LakeSoulSourceUtils.isLakeSoulTableExists(p)) { println("table not in lakesoul. Please check table path") return @@ -517,7 +517,7 @@ object LakeSoulTable { * Create a LakeSoulTableRel for the data at the given `path` using the given SparkSession. */ def forPath(sparkSession: SparkSession, path: String): LakeSoulTable = { - val p = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val p = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString if (LakeSoulUtils.isLakeSoulTable(sparkSession, new Path(p))) { new LakeSoulTable(sparkSession.read.format(LakeSoulSourceUtils.SOURCENAME).load(p), SnapshotManagement(p)) @@ -527,7 +527,7 @@ object LakeSoulTable { } def forPath(sparkSession: SparkSession, path: String, partitionDesc: String, partitionVersion: Int): LakeSoulTable = { - val p = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val p = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString if (LakeSoulUtils.isLakeSoulTable(sparkSession, new Path(p))) { new LakeSoulTable(sparkSession.read.format(LakeSoulSourceUtils.SOURCENAME).load(p), SnapshotManagement(p, partitionDesc, partitionVersion)) @@ -544,7 +544,7 @@ object LakeSoulTable { val timeZoneID = if (timeZone.equals("") || !TimeZone.getAvailableIDs.contains(timeZone)) TimeZone.getDefault.getID else timeZone val startTime = TimestampFormatter.apply(TimeZone.getTimeZone(timeZoneID)).parse(startTimeStamp) val endTime = TimestampFormatter.apply(TimeZone.getTimeZone(timeZoneID)).parse(endTimeStamp) - val p = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val p = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString if (LakeSoulUtils.isLakeSoulTable(sparkSession, new Path(p))) { if (endTime < 0) { println("No version found in Table before time") diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulUtils.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulUtils.scala index f3a5a2a5d..a8491ca9d 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulUtils.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulUtils.scala @@ -19,7 +19,7 @@ import org.apache.spark.sql.lakesoul.catalog.{LakeSoulCatalog, LakeSoulTableV2} import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.rules.LakeSoulRelation import org.apache.spark.sql.lakesoul.sources.{LakeSoulBaseRelation, LakeSoulSourceUtils} -import org.apache.spark.sql.lakesoul.utils.TableInfo +import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo} import org.apache.spark.sql.sources.{EqualTo, Filter, Not} import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.util.Utils @@ -73,7 +73,7 @@ object LakeSoulUtils extends PredicateHelper { def findTableRootPath(spark: SparkSession, path: Path): Option[Path] = { var current_path = path while (current_path != null) { - if (LakeSoulSourceUtils.isLakeSoulTableExists(current_path.toString)) { + if (LakeSoulSourceUtils.isLakeSoulTableExists(SparkUtil.makeQualifiedTablePath(current_path).toUri.toString)) { return Option(current_path) } current_path = current_path.getParent diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala index 7e1d0a328..b16f18238 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala @@ -161,7 +161,7 @@ object SnapshotManagement { def forTable(spark: SparkSession, tableName: TableIdentifier): SnapshotManagement = { val path = LakeSoulSourceUtils.getLakeSoulPathByTableIdentifier(tableName) - apply(new Path(path.getOrElse(SparkUtil.getDefaultTablePath(tableName).toString))) + apply(new Path(path.getOrElse(SparkUtil.getDefaultTablePath(tableName).toUri.toString))) } def forTable(dataPath: File): SnapshotManagement = { @@ -176,7 +176,7 @@ object SnapshotManagement { def apply(path: String, namespace: String): SnapshotManagement = { try { - val qualifiedPath = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val qualifiedPath = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString snapshotManagementCache.get(qualifiedPath, () => { AnalysisHelper.allowInvokingTransformsInAnalyzer { new SnapshotManagement(qualifiedPath, namespace) @@ -190,7 +190,7 @@ object SnapshotManagement { //no cache just for snapshot def apply(path: String, partitionDesc: String, partitionVersion: Long): SnapshotManagement = { - val qualifiedPath = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val qualifiedPath = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString if (LakeSoulSourceUtils.isLakeSoulTableExists(qualifiedPath)) { val sm = apply(qualifiedPath) sm.updateSnapshotForVersion(partitionDesc, 0, partitionVersion, ReadType.SNAPSHOT_READ) @@ -201,7 +201,7 @@ object SnapshotManagement { } def apply(path: String, partitionDesc: String, startPartitionVersion: Long, endPartitionVersion: Long, readType: String): SnapshotManagement = { - val qualifiedPath = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val qualifiedPath = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString if (LakeSoulSourceUtils.isLakeSoulTableExists(qualifiedPath)) { val sm = apply(qualifiedPath) sm.updateSnapshotForVersion(partitionDesc, startPartitionVersion, endPartitionVersion, readType) @@ -212,7 +212,7 @@ object SnapshotManagement { } def invalidateCache(path: String): Unit = { - val qualifiedPath = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val qualifiedPath = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString snapshotManagementCache.invalidate(qualifiedPath) } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala index 617545fe2..2866ac3ea 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala @@ -35,7 +35,7 @@ trait TransactionalWrite { protected var hasWritten = false protected def getCommitter(outputPath: Path): DelayedCommitProtocol = - new DelayedCommitProtocol("lakesoul", outputPath.toString, None) + new DelayedCommitProtocol("lakesoul", outputPath.toUri.toString, None) /** * Normalize the schema of the query, and return the QueryExecution to execute. The output @@ -127,7 +127,7 @@ trait TransactionalWrite { val hashPartitionSchema = tableInfo.hash_partition_schema var outputPath = SparkUtil.makeQualifiedTablePath(tableInfo.table_path) if (isCompaction) { - outputPath = SparkUtil.makeQualifiedTablePath(new Path(tableInfo.table_path.toString + "/compact_" + System.currentTimeMillis())) + outputPath = SparkUtil.makeQualifiedTablePath(new Path(tableInfo.table_path.toUri.toString + "/compact_" + System.currentTimeMillis())) } val dc = if (isCompaction) { val cdcCol = snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey) @@ -182,7 +182,7 @@ trait TransactionalWrite { SQLExecution.withNewExecutionId(queryExecution) { val outputSpec = LakeSoulFileWriter.OutputSpec( - outputPath.toString, + outputPath.toUri.toString, Map.empty, output) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulCatalog.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulCatalog.scala index f3db803af..032c48f69 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulCatalog.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulCatalog.scala @@ -114,7 +114,7 @@ class LakeSoulCatalog(val spark: SparkSession) extends TableCatalog DataSourceUtils.checkFieldNames(new ParquetFileFormat(), tableDesc.schema) CreateTableCommand( withDb, - existingLocation.map(SparkUtil.makeQualifiedPath(_).toString), + existingLocation.map(SparkUtil.makeQualifiedPath(_).toUri.toString), operation.mode, sourceQuery, operation, @@ -138,7 +138,7 @@ class LakeSoulCatalog(val spark: SparkSession) extends TableCatalog spark, new Path(ident.name()), None, - Some(Identifier.of(ident.namespace(), tableInfo.short_table_name.getOrElse(tableInfo.table_path.toString)).toString) + Some(Identifier.of(ident.namespace(), tableInfo.short_table_name.getOrElse(tableInfo.table_path.toUri.toString)).toString) ) } else if (isNameIdentifier(ident)) { val tablePath = SparkMetaVersion.getTablePathFromShortTableName(ident.name, ident.namespace().mkString(".")) @@ -615,7 +615,7 @@ object LakeSoulCatalog { def listTables(namespaces: Array[String]): Array[Identifier] = { SparkMetaVersion.listTables(namespaces).asScala.map(tablePath => { val tableInfo = SparkMetaVersion.getTableInfo(tablePath) - Identifier.of(namespaces, tableInfo.short_table_name.getOrElse(tableInfo.table_path.toString)) + Identifier.of(namespaces, tableInfo.short_table_name.getOrElse(tableInfo.table_path.toUri.toString)) }).toArray } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulTableV2.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulTableV2.scala index d35dda5d7..5b070a6de 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulTableV2.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulTableV2.scala @@ -58,7 +58,7 @@ case class LakeSoulTableV2(spark: SparkSession, // Fast path for reducing path munging overhead (SparkUtil.makeQualifiedTablePath(new Path(catalogTable.get.location)), Nil) } else { - LakeSoulDataSource.parsePathIdentifier(spark, path.toString) + LakeSoulDataSource.parsePathIdentifier(spark, path.toUri.toString) } } @@ -102,7 +102,7 @@ case class LakeSoulTableV2(spark: SparkSession, } } base.put(TableCatalog.PROP_PROVIDER, "lakesoul") - base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri)) + base.put(TableCatalog.PROP_LOCATION, path.toUri.toString) // Option(snapshot.getTableInfo.description).foreach(base.put(TableCatalog.PROP_COMMENT, _)) base } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala index ec06d5887..0f6e5b3a2 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala @@ -68,12 +68,12 @@ case class CreateTableCommand(var table: CatalogTable, assert(existingTablePath.isDefined) val existingPath = existingTablePath.get table.storage.locationUri match { - case Some(location) if SparkUtil.makeQualifiedPath(location.getPath).toString != existingPath => + case Some(location) if SparkUtil.makeQualifiedPath(location.getPath).toUri.toString != existingPath => val tableName = table.identifier.quotedString throw new AnalysisException( s"The location of the existing table $tableName is " + s"`$existingPath`. It doesn't match the specified location " + - s"`${SparkUtil.makeQualifiedPath(location.getPath).toString}`.") + s"`${SparkUtil.makeQualifiedPath(location.getPath).toUri.toString}`.") case _ => table.copy(storage = table.storage.copy(locationUri = Some(new URI(existingPath)))) } @@ -103,7 +103,7 @@ case class CreateTableCommand(var table: CatalogTable, table.storage.properties ++ externalOptions, sparkSession.sessionState.conf) - val snapshotManagement = SnapshotManagement(modifiedPath.toString, table.database) + val snapshotManagement = SnapshotManagement(modifiedPath.toUri.toString, table.database) // don't support replace table operation match { diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/kafka/KafkaStream.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/kafka/KafkaStream.scala index 689350e9c..60587e5ad 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/kafka/KafkaStream.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/kafka/KafkaStream.scala @@ -47,7 +47,7 @@ object KafkaStream { val tableName = info._1 val schema = info._2.json val path = warehouse + "/" + namespace + "/" + tableName - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString val tableExists = dbManager.isTableExistsByTableName(tableName, namespace) if (!tableExists) { val tableId = KAFKA_TABLE_PREFIX + UUID.randomUUID().toString @@ -154,7 +154,7 @@ object KafkaStream { for (topic <- topicAndSchema.keySet) { val path = warehouse + "/" + namespace + "/" + topic - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(path)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString val topicDF = batchDF.filter(col("topic").equalTo(topic)) if (!topicDF.rdd.isEmpty()) { val rows = topicDF.withColumn("payload", from_json(col("value"), topicAndSchema.get(topic).get)) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/schema/ImplicitMetadataOperation.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/schema/ImplicitMetadataOperation.scala index 579613cec..7673ca54e 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/schema/ImplicitMetadataOperation.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/schema/ImplicitMetadataOperation.scala @@ -145,7 +145,7 @@ trait ImplicitMetadataOperation extends Logging { tc.updateTableInfo( TableInfo( namespace = table_info.namespace, - table_path_s = Option(SparkUtil.makeQualifiedTablePath(new Path(table_info.table_path_s.get)).toString), + table_path_s = Option(SparkUtil.makeQualifiedTablePath(new Path(table_info.table_path_s.get)).toUri.toString), table_id = table_info.table_id, table_schema = ArrowUtils.toArrowSchema(dataSchema).toJson, range_column = normalizedRangePartitionCols.mkString(LAKESOUL_RANGE_PARTITION_SPLITTER), diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/SparkUtil.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/SparkUtil.scala index d08113411..5888612fa 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/SparkUtil.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/SparkUtil.scala @@ -53,7 +53,17 @@ object SparkUtil { def makeQualifiedTablePath(tablePath: Path): Path = { val spark = SparkSession.active - tablePath.getFileSystem(spark.sessionState.newHadoopConf()).makeQualified(tablePath) + val uri = tablePath.toUri.toString + if (uri.startsWith("file:///")) { + tablePath + } else if (uri.startsWith("file:/")) { + // make local file path always starts with file:/// + tablePath.getFileSystem(spark.sessionState.newHadoopConf()).makeQualified( + new Path(uri.substring(5)) + ) + } else { + tablePath.getFileSystem(spark.sessionState.newHadoopConf()).makeQualified(tablePath) + } } def makeQualifiedPath(tablePath: String): Path = { diff --git a/lakesoul-spark/src/test/scala/com/dmetasoul/lakesoul/tables/LakeSoulTableSuite.scala b/lakesoul-spark/src/test/scala/com/dmetasoul/lakesoul/tables/LakeSoulTableSuite.scala index c5b5296a3..9dba502ea 100644 --- a/lakesoul-spark/src/test/scala/com/dmetasoul/lakesoul/tables/LakeSoulTableSuite.scala +++ b/lakesoul-spark/src/test/scala/com/dmetasoul/lakesoul/tables/LakeSoulTableSuite.scala @@ -102,14 +102,14 @@ class LakeSoulTableSuite extends QueryTest test("isLakeSoulTable - path") { withTempDir { dir => testData.write.format("lakesoul").save(dir.getAbsolutePath) - assert(LakeSoulUtils.isLakeSoulTable(SparkUtil.makeQualifiedTablePath(new Path(dir.getAbsolutePath)).toString)) + assert(LakeSoulUtils.isLakeSoulTable(SparkUtil.makeQualifiedTablePath(new Path(dir.getAbsolutePath)).toUri.toString)) } } test("isLakeSoulTable - with non-LakeSoul table path") { withTempDir { dir => testData.write.format("parquet").mode("overwrite").save(dir.getAbsolutePath) - assert(!LakeSoulUtils.isLakeSoulTable(SparkUtil.makeQualifiedTablePath(new Path(dir.getAbsolutePath)).toString)) + assert(!LakeSoulUtils.isLakeSoulTable(SparkUtil.makeQualifiedTablePath(new Path(dir.getAbsolutePath)).toUri.toString)) } } diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/execution/datasource/ParquetScanSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/execution/datasource/ParquetScanSuite.scala index c36296760..e515ea281 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/execution/datasource/ParquetScanSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/execution/datasource/ParquetScanSuite.scala @@ -65,7 +65,7 @@ class ParquetScanSuite extends QueryTest test("It should use MultiPartitionMergeScan when reading multi partition") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 1, 1)) .toDF("range", "hash", "value") .write diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala index 0e1636865..7d3d6f322 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala @@ -55,7 +55,7 @@ class CDCSuite } protected def getDefaultTablePath(tableName: String): String = { - SparkUtil.getDefaultTablePath(TableIdentifier(tableName, Some("default"))).toString + SparkUtil.getDefaultTablePath(TableIdentifier(tableName, Some("default"))).toUri.toString } protected def getPartitioningColumns(tableName: String): Seq[String] = { @@ -77,7 +77,7 @@ class CDCSuite test(s"test cdc with MultiPartitionMergeScan(native_io_enabled=$nativeIOEnabled) ") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString withSQLConf( LakeSoulSQLConf.NATIVE_IO_ENABLE.key -> nativeIOEnabled) { Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "hash2", "insert"), ("range4", "hash2", "insert"), ("range4", "hash4", "insert"), ("range3", "hash3", "insert")) @@ -105,7 +105,7 @@ class CDCSuite test(s"test cdc with OnePartitionMergeBucketScan(native_io_enabled=$nativeIOEnabled) ") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString withSQLConf( LakeSoulSQLConf.NATIVE_IO_ENABLE.key -> nativeIOEnabled) { Seq(("range1", "hash1", "insert"), ("range1", "hash2", "insert"), ("range1", "hash3", "insert"), ("range1", "hash4", "insert"), ("range1", "hash5", "insert")) @@ -133,7 +133,7 @@ class CDCSuite test(s"test cdc with MultiPartitionMergeBucketScan(native_io_enabled=$nativeIOEnabled)") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString withSQLConf( LakeSoulSQLConf.BUCKET_SCAN_MULTI_PARTITION_ENABLE.key -> "true", LakeSoulSQLConf.NATIVE_IO_ENABLE.key -> nativeIOEnabled) { @@ -167,7 +167,7 @@ class CDCSuite test("test cdc with snapshot") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString withSQLConf( LakeSoulSQLConf.BUCKET_SCAN_MULTI_PARTITION_ENABLE.key -> "true") { Seq(("range1", "hash1-1", "insert"), ("range2", "hash2-1", "insert")) @@ -218,7 +218,7 @@ class CDCSuite test("test cdc with incremental") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString withSQLConf( LakeSoulSQLConf.BUCKET_SCAN_MULTI_PARTITION_ENABLE.key -> "true") { Seq(("range1", "hash1-1", "insert"), ("range2", "hash2-1", "insert")) diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala index b2ac0485a..e0d8e6581 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala @@ -78,7 +78,7 @@ abstract class DDLTestBase extends QueryTest with SQLTestUtils { val location = LakeSoulSourceUtils.getLakeSoulPathByTableIdentifier( TableIdentifier("lakesoul_test", Some("default"))) assert(location.isDefined) - assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toString) + assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toUri.toString) Seq((1L, "a")).toDF("a", "b") .write.format("lakesoul").mode("append").save(location.get) @@ -231,7 +231,7 @@ abstract class DDLTestBase extends QueryTest with SQLTestUtils { val location = LakeSoulSourceUtils.getLakeSoulPathByTableIdentifier( TableIdentifier("lakesoul_test", Some("default"))) assert(location.isDefined) - assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toString) + assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toUri.toString) val schema = new StructType() .add("x", @@ -437,7 +437,7 @@ abstract class DDLTestBase extends QueryTest with SQLTestUtils { sql(s"CREATE TABLE lakesoul_test USING lakesoul LOCATION '$path'") verifyDescribeTable("lakesoul_test") - verifyDescribeTable(s"lakesoul.`$path`") + verifyDescribeTable(s"lakesoul.`${SparkUtil.makeQualifiedPath(path)}`") } } } diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/ReadSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/ReadSuite.scala index 03e8020ac..b8f032ebb 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/ReadSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/ReadSuite.scala @@ -63,7 +63,7 @@ class ReadSuite extends QueryTest test("test snapshot read with OnePartition") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString Seq(("range1", "hash1-1", "insert"), ("range2", "hash2-1", "insert")) .toDF("range", "hash", "op") .write @@ -112,7 +112,7 @@ class ReadSuite extends QueryTest test("test snapshot read with MultiPartition") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString Seq((1, "range1", "hash1-1", "insert"), (2, "range2", "hash2-1", "insert")) .toDF("id", "range", "hash", "op") .write @@ -160,7 +160,7 @@ class ReadSuite extends QueryTest test("test snapshot read without Partition") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString Seq(("range1", "hash1-1", "insert"), ("range2", "hash2-1", "insert")) .toDF("range", "hash", "op") .write @@ -202,7 +202,7 @@ class ReadSuite extends QueryTest test("test incremental read with OnePartition") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString withSQLConf( LakeSoulSQLConf.BUCKET_SCAN_MULTI_PARTITION_ENABLE.key -> "true") { Seq(("range1", "hash1-1", "insert"), ("range2", "hash2-1", "insert")) @@ -261,7 +261,7 @@ class ReadSuite extends QueryTest test("test incremental read with MultiPartition") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString Seq((1, "range1", "hash1-1", "insert"), (2, "range2", "hash2-1", "insert")) .toDF("id", "range", "hash", "op") .write @@ -308,7 +308,7 @@ class ReadSuite extends QueryTest test("test incremental read without Partition") { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString withSQLConf( LakeSoulSQLConf.BUCKET_SCAN_MULTI_PARTITION_ENABLE.key -> "true") { Seq(("range1", "hash1-1", "insert"), ("range2", "hash2-1", "insert")) @@ -358,7 +358,7 @@ class ReadSuite extends QueryTest override def run(): Unit = { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString Seq((1, "range1", "hash1-1", "insert"), (2, "range2", "hash2-1", "insert")) .toDF("id", "range", "hash", "op") .write @@ -395,7 +395,7 @@ class ReadSuite extends QueryTest override def run(): Unit = { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString Seq((1, "range1", "hash1-1", "insert"), (2, "range2", "hash2-1", "insert")) .toDF("id", "range", "hash", "op") .write @@ -432,7 +432,7 @@ class ReadSuite extends QueryTest override def run(): Unit = { withTable("tt") { withTempDir(dir => { - val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toString + val tablePath = SparkUtil.makeQualifiedTablePath(new Path(dir.getCanonicalPath)).toUri.toString Seq((1, "range1", "hash1-1", "insert")) .toDF("id", "range", "hash", "op") .write diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala index 3f1ca40c6..e0f68fbe0 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala @@ -74,7 +74,7 @@ trait TableCreationTests } protected def getDefaultTablePath(tableName: String): String = { - SparkUtil.getDefaultTablePath(TableIdentifier(tableName, Some("default"))).toString + SparkUtil.getDefaultTablePath(TableIdentifier(tableName, Some("default"))).toUri.toString } protected def getPartitioningColumns(tableName: String): Seq[String] = { @@ -150,7 +150,7 @@ trait TableCreationTests .saveAsTable(tbl) checkDatasetUnorderly(spark.table(tbl).as[(Long, String)], 1L -> "a") - assert(getTablePath(tbl) === new Path(dir.toURI).toString.stripSuffix("/"), + assert(getTablePath(tbl) === SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toUri.toString.stripSuffix("/"), "Table path is wrong") assert(getPartitioningColumns(tbl) === cols, "Partitioning columns don't match") } @@ -171,7 +171,7 @@ trait TableCreationTests .saveAsTable(tbl) checkDatasetUnorderly(spark.table(tbl).as[(Long, String)]) - assert(getTablePath(tbl) === new Path(dir.toURI).toString.stripSuffix("/"), + assert(getTablePath(tbl) === SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toUri.toString.stripSuffix("/"), "Table path is wrong") assert(getPartitioningColumns(tbl) === cols, "Partitioning columns don't match") } @@ -778,7 +778,7 @@ trait TableCreationTests val path = LakeSoulSourceUtils.getLakeSoulPathByTableIdentifier(TableIdentifier("t", Some("default"))) assert(path.isDefined) - assert(path.get == SparkUtil.makeQualifiedTablePath(new Path(dir.getAbsolutePath)).toString) + assert(path.get == SparkUtil.makeQualifiedTablePath(new Path(dir.getAbsolutePath)).toUri.toString) val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[LakeSoulCatalog] val ident = toIdentifier("t") @@ -805,7 +805,7 @@ trait TableCreationTests val ident = toIdentifier("t1") val location = catalog.getTableLocation(ident) assert(location.isDefined) - assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toString) + assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toUri.toString) Seq((1, 2)).toDF("a", "b") .write.format("lakesoul").mode("append").save(location.get) @@ -835,7 +835,7 @@ trait TableCreationTests val ident = toIdentifier("t") val location = catalog.getTableLocation(ident) assert(location.isDefined) - assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toString) + assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toUri.toString) // Query the data and the metadata directly via the SnapshotManagement val snapshotManagement = getSnapshotManagement(new Path(location.get)) @@ -870,7 +870,7 @@ trait TableCreationTests val ident = toIdentifier("t1") val location = catalog.getTableLocation(ident) assert(location.isDefined) - assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toString) + assert(location.get == SparkUtil.makeQualifiedPath(dir.getAbsolutePath).toUri.toString) // Query the data and the metadata directly via the SnapshotManagement val snapshotManagement = getSnapshotManagement(new Path(location.get)) @@ -1038,7 +1038,7 @@ trait TableCreationTests .option(LakeSoulOptions.SHORT_TABLE_NAME, "tt") .save(path) - val shortName = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toString).snapshot.getTableInfo.short_table_name + val shortName = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString).snapshot.getTableInfo.short_table_name assert(shortName.isDefined && shortName.get.equals("tt")) checkAnswer(sql(s"select i,p from $testDatabase.tt"), Seq((1, "a"), (2, "b")).toDF("i", "p")) @@ -1062,7 +1062,7 @@ trait TableCreationTests .format("lakesoul") .save(path) - val sm = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toString) + val sm = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString) var shortName = sm.snapshot.getTableInfo.short_table_name assert(shortName.isEmpty) sql(s"create table tt using lakesoul location '$path'") @@ -1107,7 +1107,7 @@ trait TableCreationTests .format("lakesoul") .save(path) - val sm = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toString) + val sm = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString) var shortName = sm.snapshot.getTableInfo.short_table_name assert(shortName.isEmpty) @@ -1158,7 +1158,7 @@ trait TableCreationTests .hashBucketNum(1) .create() - val tableInfo = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toString).getTableInfoOnly + val tableInfo = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString).getTableInfoOnly assert(tableInfo.short_table_name.get.equals("tt")) assert(tableInfo.range_partition_columns.equals(Seq("i"))) assert(tableInfo.hash_partition_columns.equals(Seq("p"))) @@ -1197,7 +1197,7 @@ trait TableCreationTests val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[LakeSoulCatalog] val table = catalog.loadTable(toIdentifier("test_table")) assert(table.properties.get("lakesoul_cdc_change_column") == "change_kind") - val tableInfo = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toString).getTableInfoOnly + val tableInfo = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString).getTableInfoOnly assert(tableInfo.short_table_name.get.equals(tableName)) assert(tableInfo.range_partition_columns.equals(Seq("date"))) assert(tableInfo.hash_partition_columns.equals(Seq("id"))) @@ -1237,7 +1237,7 @@ trait TableCreationTests .tableProperty("lakesoul_cdc_change_column" -> "change_kind") .create() - val tableInfo = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toString).getTableInfoOnly + val tableInfo = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString).getTableInfoOnly tableInfo.configuration should contain("lakesoul_cdc_change_column" -> "change_kind") }) } @@ -1253,7 +1253,7 @@ trait TableCreationTests .format("lakesoul") .option("lakesoul_cdc_change_column", "change_kind") .save(path) - val tableInfo = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toString).getTableInfoOnly + val tableInfo = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString).getTableInfoOnly tableInfo.configuration should contain("lakesoul_cdc_change_column" -> "change_kind") }) } diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/FlinkWriteDataCheck.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/FlinkWriteDataCheck.scala index cf39ae2b7..53dfa9219 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/FlinkWriteDataCheck.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/FlinkWriteDataCheck.scala @@ -70,8 +70,8 @@ object FlinkWriteDataCheck { val spark = builder.getOrCreate() spark.sparkContext.setLogLevel("ERROR") - val lakeSoulTablePath = SparkUtil.makeQualifiedTablePath(new Path(lakeSoulPath)).toString - val csvTablePath = SparkUtil.makeQualifiedTablePath(new Path(csvPath)).toString + val lakeSoulTablePath = SparkUtil.makeQualifiedTablePath(new Path(lakeSoulPath)).toUri.toString + val csvTablePath = SparkUtil.makeQualifiedTablePath(new Path(csvPath)).toUri.toString val lakeSoulDF = LakeSoulTable.forPath(lakeSoulTablePath).toDF val csvDF = spark.read.schema(lakeSoulDF.schema).format("parquet").load(csvTablePath) diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala index 918403980..796969afd 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala @@ -1134,7 +1134,7 @@ trait AlterTableByNameTests extends AlterTableTests { sql("ALTER TABLE lakesoul_test ADD COLUMNS (v3 long, v4 double)") - val snapshotManagement = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toString) + val snapshotManagement = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(path)).toUri.toString) assert(snapshotManagement.updateSnapshot().getTableInfo.schema == new StructType() .add("v1", "integer").add("v2", "string") .add("v3", "long").add("v4", "double")) @@ -1205,7 +1205,7 @@ trait AlterTableByPathTests extends AlterTableLakeSoulTestBase { override protected def getSnapshotManagement(identifier: String): SnapshotManagement = { SnapshotManagement( SparkUtil.makeQualifiedTablePath(new Path(identifier.split("\\.") - .last.stripPrefix("`").stripSuffix("`"))).toString) + .last.stripPrefix("`").stripSuffix("`"))).toUri.toString) } override protected def ddlTest(testName: String)(f: String => Unit): Unit = { diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala index 627836bb5..d21551c20 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala @@ -27,7 +27,6 @@ class CompactionSuite extends QueryTest override def sparkConf: SparkConf = { super.sparkConf - .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") .set("spark.network.timeout", "10000000") .set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName) .set(SQLConf.DEFAULT_CATALOG.key, LakeSoulCatalog.CATALOG_NAME) diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/DropTableSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/DropTableSuite.scala index 37d42f0fe..92243043c 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/DropTableSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/DropTableSuite.scala @@ -46,7 +46,7 @@ class DropTableSuite extends QueryTest val e1 = intercept[AnalysisException] { LakeSoulTable.forPath(tmpPath) } - assert(e1.getMessage().contains(s"Table ${SparkUtil.makeQualifiedPath(tmpPath).toString} doesn't exist.")) + assert(e1.getMessage().contains(s"Table ${SparkUtil.makeQualifiedPath(tmpPath).toUri.toString} doesn't exist.")) }) } diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/manual_execute_suites/CompactionDoNotChangeResult.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/manual_execute_suites/CompactionDoNotChangeResult.scala index 98f927b9c..4b7d5f66b 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/manual_execute_suites/CompactionDoNotChangeResult.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/manual_execute_suites/CompactionDoNotChangeResult.scala @@ -24,7 +24,7 @@ class CompactionDoNotChangeResult { import spark.implicits._ - val tableName = SparkUtil.makeQualifiedTablePath(new Path(Utils.createTempDir().getCanonicalPath)).toString + val tableName = SparkUtil.makeQualifiedTablePath(new Path(Utils.createTempDir().getCanonicalPath)).toUri.toString try { val allData = TestUtils.getData2(5000, onlyOnePartition) .toDF("hash", "name", "age", "range") diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulGlutenCompatSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulGlutenCompatSuite.scala index 1e3832625..28dc66aa0 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulGlutenCompatSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulGlutenCompatSuite.scala @@ -2,111 +2,111 @@ // // SPDX-License-Identifier: Apache-2.0 -package org.apache.spark.sql.lakesoul.rules - -import com.dmetasoul.lakesoul.tables.LakeSoulTable -import io.glutenproject.execution.WholeStageTransformerSuite -import org.apache.spark.SparkConf -import org.apache.spark.sql.execution.ExtendedMode -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog -import org.apache.spark.util.Utils -import org.junit.runner.RunWith -import org.scalatestplus.junit.JUnitRunner - -import java.io.File - -//@RunWith(classOf[JUnitRunner]) -class LakeSoulGlutenCompatSuite - extends WholeStageTransformerSuite { - - override def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.codegen.wholeStage", "false") - .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") - .set("spark.network.timeout", "10000000") - .set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName) - .set(SQLConf.DEFAULT_CATALOG.key, LakeSoulCatalog.CATALOG_NAME) - .set("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension") - } - - import testImplicits._ - - test("lakesoul write scan - nopk") { - withTempDir(dir => { - val tablePath = dir.getCanonicalPath - val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") - df.write - .mode("overwrite") - .format("lakesoul") - .option("rangePartitions","date") - .save(tablePath) - val dfRead = spark.read.format("lakesoul").load(tablePath).select("date", "id", "name") - assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) - assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) - checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) - }) - } - - test("lakesoul write scan - pk") { - withTempDir(dir => { - val tablePath = dir.getCanonicalPath - val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") - df.write - .mode("overwrite") - .format("lakesoul") - .option("hashPartitions","id") - .option("hashBucketNum","2") - .option("rangePartitions","date") - .save(tablePath) - val dfRead = spark.read.format("lakesoul").load(tablePath).select("date", "id", "name") - assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) - assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) - checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) - }) - } - - test("lakesoul write scan - table") { - withTable("temp")({ - val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") - df.write - .mode("overwrite") - .format("lakesoul") - .option("hashPartitions","id") - .option("hashBucketNum","2") - .option("rangePartitions","date") - .saveAsTable("temp") - val dfRead = spark.sql(s"select date, id, name from temp") - assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) - assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) - checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) - }) - } - - override def withTable(tableNames: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f) { - tableNames.foreach { name => - spark.sql(s"DROP TABLE IF EXISTS $name") - } - } - } - - override def withTempDir(f: File => Unit): Unit = { - val dir = Utils.createTempDir() - try { - f(dir) - waitForTasksToFinish() - } finally { - Utils.deleteRecursively(dir) - try { - LakeSoulTable.forPath(dir.getCanonicalPath).dropTable() - } catch { - case _: Exception => - } - } - } - - override protected val backend: String = "velox" - override protected val resourcePath: String = "/tpch-data-parquet-velox" - override protected val fileFormat: String = "parquet" -} +//package org.apache.spark.sql.lakesoul.rules +// +//import com.dmetasoul.lakesoul.tables.LakeSoulTable +//import io.glutenproject.execution.WholeStageTransformerSuite +//import org.apache.spark.SparkConf +//import org.apache.spark.sql.execution.ExtendedMode +//import org.apache.spark.sql.internal.SQLConf +//import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog +//import org.apache.spark.util.Utils +//import org.junit.runner.RunWith +//import org.scalatestplus.junit.JUnitRunner +// +//import java.io.File +// +////@RunWith(classOf[JUnitRunner]) +//class LakeSoulGlutenCompatSuite +// extends WholeStageTransformerSuite { +// +// override def sparkConf: SparkConf = { +// super.sparkConf +// .set("spark.sql.codegen.wholeStage", "false") +// .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") +// .set("spark.network.timeout", "10000000") +// .set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName) +// .set(SQLConf.DEFAULT_CATALOG.key, LakeSoulCatalog.CATALOG_NAME) +// .set("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension") +// } +// +// import testImplicits._ +// +// test("lakesoul write scan - nopk") { +// withTempDir(dir => { +// val tablePath = dir.getCanonicalPath +// val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") +// df.write +// .mode("overwrite") +// .format("lakesoul") +// .option("rangePartitions","date") +// .save(tablePath) +// val dfRead = spark.read.format("lakesoul").load(tablePath).select("date", "id", "name") +// assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) +// assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) +// checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) +// }) +// } +// +// test("lakesoul write scan - pk") { +// withTempDir(dir => { +// val tablePath = dir.getCanonicalPath +// val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") +// df.write +// .mode("overwrite") +// .format("lakesoul") +// .option("hashPartitions","id") +// .option("hashBucketNum","2") +// .option("rangePartitions","date") +// .save(tablePath) +// val dfRead = spark.read.format("lakesoul").load(tablePath).select("date", "id", "name") +// assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) +// assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) +// checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) +// }) +// } +// +// test("lakesoul write scan - table") { +// withTable("temp")({ +// val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") +// df.write +// .mode("overwrite") +// .format("lakesoul") +// .option("hashPartitions","id") +// .option("hashBucketNum","2") +// .option("rangePartitions","date") +// .saveAsTable("temp") +// val dfRead = spark.sql(s"select date, id, name from temp") +// assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) +// assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) +// checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) +// }) +// } +// +// override def withTable(tableNames: String*)(f: => Unit): Unit = { +// Utils.tryWithSafeFinally(f) { +// tableNames.foreach { name => +// spark.sql(s"DROP TABLE IF EXISTS $name") +// } +// } +// } +// +// override def withTempDir(f: File => Unit): Unit = { +// val dir = Utils.createTempDir() +// try { +// f(dir) +// waitForTasksToFinish() +// } finally { +// Utils.deleteRecursively(dir) +// try { +// LakeSoulTable.forPath(dir.getCanonicalPath).dropTable() +// } catch { +// case _: Exception => +// } +// } +// } +// +// override protected val backend: String = "velox" +// override protected val resourcePath: String = "/tpch-data-parquet-velox" +// override protected val fileFormat: String = "parquet" +//} diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala index d74a53d33..7d884ad40 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala @@ -47,7 +47,7 @@ class CaseSensitivitySuite extends QueryTest test("set range partition columns with option - rangePartitions") { withTempDir { tempDir => val p = tempDir.getCanonicalPath - val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toString + val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toUri.toString Seq((1, "a"), (2, "b")).toDF("Key", "val").write .option("rangePartitions", "key") .format("lakesoul").mode("append").save(path) @@ -69,7 +69,7 @@ class CaseSensitivitySuite extends QueryTest test("set range partition columns with partitionBy") { withTempDir { tempDir => val p = tempDir.getCanonicalPath - val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toString + val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toUri.toString Seq((1, "a"), (2, "b")).toDF("Key", "val").write .partitionBy("key") .format("lakesoul").mode("append").save(path) @@ -90,7 +90,7 @@ class CaseSensitivitySuite extends QueryTest test("set range partition columns - rangePartitions has higher priority than partitionBy") { withTempDir { tempDir => val p = tempDir.getCanonicalPath - val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toString + val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toUri.toString Seq((1, "a"), (2, "b")).toDF("Key", "val").write .option("rangePartitions", "val") .partitionBy("key") @@ -113,7 +113,7 @@ class CaseSensitivitySuite extends QueryTest test("set hash partition columns with option- hashPartitions and hashBucketNum") { withTempDir { tempDir => val p = tempDir.getCanonicalPath - val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toString + val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toUri.toString val e1 = intercept[AnalysisException] { Seq((1, "a"), (2, "d")).toDF("key", "val").write @@ -160,7 +160,7 @@ class CaseSensitivitySuite extends QueryTest test("set hash partition columns") { withTempDir { tempDir => val p = tempDir.getCanonicalPath - val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toString + val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toUri.toString Seq((1, "a", "1"), (2, "b", "2")).toDF("key", "val", "hash").write .partitionBy("key") @@ -215,7 +215,7 @@ class CaseSensitivitySuite extends QueryTest test("set partition columns - case sensitive") { withTempDir { tempDir => val p = tempDir.getCanonicalPath - val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toString + val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toUri.toString withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { Seq((1, "a", "1"), (2, "b", "2")).toDF("key", "val", "hash").write @@ -246,7 +246,7 @@ class CaseSensitivitySuite extends QueryTest test("set partition columns - case insensitive") { withTempDir { tempDir => val p = tempDir.getCanonicalPath - val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toString + val path = SparkUtil.makeQualifiedTablePath(new Path(p)).toUri.toString withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { Seq((1, "a", "1"), (2, "b", "2")).toDF("key", "val", "hash").write diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/test/TestUtils.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/test/TestUtils.scala index 6e7a4bbce..6a9454c2d 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/test/TestUtils.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/test/TestUtils.scala @@ -202,7 +202,7 @@ trait LakeSoulTestBeforeAndAfterEach extends BeforeAndAfterEach { var snapshotManagement: SnapshotManagement = _ - protected def tempPath: String = SparkUtil.makeQualifiedTablePath(new Path(tempDir.getCanonicalPath)).toString + protected def tempPath: String = SparkUtil.makeQualifiedTablePath(new Path(tempDir.getCanonicalPath)).toUri.toString protected def readLakeSoulTable(path: String): DataFrame = { spark.read.format("lakesoul").load(path) diff --git a/native-io/lakesoul-io-java/pom.xml b/native-io/lakesoul-io-java/pom.xml index 436a335ad..23ac2ca09 100644 --- a/native-io/lakesoul-io-java/pom.xml +++ b/native-io/lakesoul-io-java/pom.xml @@ -25,14 +25,13 @@ SPDX-License-Identifier: Apache-2.0 8 8 - 12.0.0 + 15.0.2 3.1.0 0.28.0 3.22.0 0.6.1 - com.dmetasoul @@ -49,21 +48,58 @@ SPDX-License-Identifier: Apache-2.0 org.apache.arrow arrow-vector ${arrow.version} + + + io.netty + * + + + org.slf4j + * + + org.apache.arrow arrow-memory-netty ${arrow.version} + + + io.netty + * + + + org.slf4j + * + + + + + io.netty + netty-buffer + 4.1.112.Final org.apache.arrow arrow-memory-core + + + org.slf4j + * + + ${arrow.version} org.apache.arrow arrow-c-data ${arrow.version} + + + org.slf4j + * + + @@ -110,7 +146,7 @@ SPDX-License-Identifier: Apache-2.0 com.fasterxml.jackson.core - jackson-databind + * ${substrait.version} @@ -129,6 +165,10 @@ SPDX-License-Identifier: Apache-2.0 spark-catalyst_${scala.binary.version} ${spark.version} + + org.apache.hadoop + * + org.apache.arrow * @@ -602,19 +642,14 @@ SPDX-License-Identifier: Apache-2.0 - org.apache.arrow:arrow-c-data + org.apache.spark:spark-core_2.12 - org/apache/arrow/c/jni/JniLoader.class + org/sparkproject/jetty/** + META-INF/maven/org.eclipse.jetty/** + jetty-dir.css + META-INF/services/org.eclipse.jetty.http.HttpFieldPreEncoder - - - **/*scala* - - **/*.class - - - @@ -622,8 +657,12 @@ SPDX-License-Identifier: Apache-2.0 com.lakesoul.shaded.scala. - org.apache.spark.sql - com.lakesoul.shaded.org.apache.spark.sql + com.zaxxer.hikari + com.lakesoul.shaded.com.zaxxer.hikari + + + org.postgresql + com.lakesoul.shaded.org.postgresql org.ow2.asm @@ -665,6 +704,26 @@ SPDX-License-Identifier: Apache-2.0 com.fasterxml.jackson com.lakesoul.shaded.com.fasterxml.jackson + + org.json4s + com.lakesoul.shaded.org.json4s + + + org.apache.commons + com.lakesoul.shaded.org.apache.commons + + + dev.failsafe + com.lakesoul.shaded.dev.failsafe + + + org.aspectj + com.lakesoul.shaded.org.aspectj + + + org.checkerframework + com.lakesoul.shaded.org.checkerframework + org.yaml.snakeyaml com.lakesoul.shaded.org.yaml.snakeyaml @@ -673,6 +732,34 @@ SPDX-License-Identifier: Apache-2.0 org.antlr com.lakesoul.shaded.org.antlr + + io.netty + com.lakesoul.shaded.io.netty + + + com.alibaba.fastjson + com.lakesoul.shaded.com.alibaba.fastjson + + + com.google.flatbuffers + com.lakesoul.shaded.com.google.flatbuffers + + + org.apache.spark + com.lakesoul.shaded.org.apache.spark + + + io.substrait + com.lakesoul.shaded.io.substrait + + + org.stringtemplate + com.lakesoul.shaded.org.stringtemplate + + + org.abego + com.lakesoul.shaded.org.abego + diff --git a/native-io/lakesoul-io-java/src/main/java/org/apache/arrow/c/jni/JniLoader.java b/native-io/lakesoul-io-java/src/main/java/org/apache/arrow/c/jni/JniLoader.java deleted file mode 100644 index 692dfff55..000000000 --- a/native-io/lakesoul-io-java/src/main/java/org/apache/arrow/c/jni/JniLoader.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * This file is modified from https://github.com/apache/arrow/blob/apache-arrow-11.0.0/java/c/src/main/java/org/apache/arrow/c/jni/JniLoader.java - * to fix loading dynamic libraries on Windows platform. - */ - -package org.apache.arrow.c.jni; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Set; - -/** - * The JniLoader for C Data Interface API's native implementation. - */ -public class JniLoader { - private static final JniLoader INSTANCE = new JniLoader(Collections.singletonList("arrow_cdata_jni")); - - public static JniLoader get() { - return INSTANCE; - } - - private final Set librariesToLoad; - - private JniLoader(List libraryNames) { - librariesToLoad = new HashSet<>(libraryNames); - } - - private boolean finished() { - return librariesToLoad.isEmpty(); - } - - /** - * If required JNI libraries are not loaded, then load them. - */ - public void ensureLoaded() { - if (finished()) { - return; - } - loadRemaining(); - } - - private synchronized void loadRemaining() { - // The method is protected by a mutex via synchronized, if more than one thread - // race to call - // loadRemaining, at same time only one will do the actual loading and the - // others will wait for - // the mutex to be acquired then check on the remaining list: if there are - // libraries that were not - // successfully loaded then the mutex owner will try to load them again. - if (finished()) { - return; - } - List libs = new ArrayList<>(librariesToLoad); - for (String lib : libs) { - load(lib); - librariesToLoad.remove(lib); - } - } - - private void load(String name) { - final String libraryToLoad = - getNormalizedArch() + "/" + System.mapLibraryName(name); - try { - File temp = File.createTempFile("jnilib-", ".tmp", new File(System.getProperty("java.io.tmpdir"))); - temp.deleteOnExit(); - try (final InputStream is = JniWrapper.class.getClassLoader().getResourceAsStream(libraryToLoad)) { - if (is == null) { - throw new FileNotFoundException(libraryToLoad); - } - Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); - System.load(temp.getAbsolutePath()); - } - } catch (IOException e) { - throw new IllegalStateException("error loading native libraries: " + e); - } - } - - private String getNormalizedArch() { - String arch = System.getProperty("os.arch").toLowerCase(Locale.US); - switch (arch) { - case "amd64": - arch = "x86_64"; - break; - case "aarch64": - arch = "aarch_64"; - break; - default: - break; - } - return arch; - } -} diff --git a/rust/lakesoul-datafusion/src/catalog/mod.rs b/rust/lakesoul-datafusion/src/catalog/mod.rs index 6f808e930..2a04a0c93 100644 --- a/rust/lakesoul-datafusion/src/catalog/mod.rs +++ b/rust/lakesoul-datafusion/src/catalog/mod.rs @@ -39,7 +39,7 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co table_id: format!("table_{}", uuid::Uuid::new_v4()), table_name: table_name.to_string(), table_path: format!( - "file:{}/default/{}", + "file://{}/default/{}", env::current_dir() .unwrap() .to_str() diff --git a/rust/lakesoul-datafusion/src/datasource/table_provider.rs b/rust/lakesoul-datafusion/src/datasource/table_provider.rs index 34126520d..6c5308a5a 100644 --- a/rust/lakesoul-datafusion/src/datasource/table_provider.rs +++ b/rust/lakesoul-datafusion/src/datasource/table_provider.rs @@ -8,26 +8,22 @@ use std::sync::Arc; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Schema, SchemaRef}; - use async_trait::async_trait; - -use datafusion::common::{project_schema, FileTypeWriterOptions, Statistics, ToDFSchema}; -use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::{execution::context::SessionState, logical_expr::Expr}; +use datafusion::common::{FileTypeWriterOptions, project_schema, Statistics, ToDFSchema}; use datafusion::datasource::file_format::FileFormat; -use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableUrl, PartitionedFile}; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::{ListingOptions, ListingTableUrl, PartitionedFile}; use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result}; -use datafusion::logical_expr::expr::Sort; use datafusion::logical_expr::{TableProviderFilterPushDown, TableType}; - +use datafusion::logical_expr::expr::Sort; use datafusion::optimizer::utils::conjunction; use datafusion::physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr}; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::scalar::ScalarValue; -use datafusion::{execution::context::SessionState, logical_expr::Expr}; - use futures::stream::FuturesUnordered; use futures::StreamExt; diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs index 5b77cd275..70e5d2371 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs @@ -94,11 +94,9 @@ impl LakeSoulTable { .build()?; let dataframe = DataFrame::new(sess_ctx.state(), logical_plan); - let results = dataframe - // .explain(true, false)? + dataframe .collect() .await?; - // dbg!(&results); Ok(()) } diff --git a/rust/lakesoul-io/src/hash_utils/mod.rs b/rust/lakesoul-io/src/hash_utils/mod.rs index d3dbcf860..5dce6018c 100644 --- a/rust/lakesoul-io/src/hash_utils/mod.rs +++ b/rust/lakesoul-io/src/hash_utils/mod.rs @@ -291,7 +291,6 @@ where /// /// The number of rows to hash is determined by `hashes_buffer.len()`. /// `hashes_buffer` should be pre-sized appropriately -#[cfg(not(feature = "force_hash_collisions"))] pub fn create_hashes<'a>( arrays: &[ArrayRef], // random_state: &RandomState, diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index 8f1f7e29a..64c65db52 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -24,6 +24,7 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::DataFusionError::{External, ObjectStore}; use datafusion_substrait::substrait::proto::Plan; use derivative::Derivative; +use log::info; use object_store::aws::AmazonS3Builder; use object_store::{ClientOptions, RetryConfig}; use url::{ParseError, Url}; @@ -495,11 +496,13 @@ pub fn create_session_context_with_planner( .cloned(); if let Some(fs) = default_fs { config.default_fs = fs.clone(); + info!("NativeIO register default fs {}", fs); register_object_store(&fs, config, &runtime)?; }; if !config.prefix.is_empty() { let prefix = config.prefix.clone(); + info!("NativeIO register prefix fs {}", prefix); let normalized_prefix = register_object_store(&prefix, config, &runtime)?; config.prefix = normalized_prefix; } @@ -512,6 +515,8 @@ pub fn create_session_context_with_planner( .map(|file_name| register_object_store(&file_name, config, &runtime)) .collect::>>()?; config.files = normalized_filenames; + info!("NativeIO normalized file names: {:?}", config.files); + info!("NativeIO final config: {:?}", config); // create session context let mut state = if let Some(planner) = planner {