From 3b3d7046a9cad35dadde8e7413141d41cd03e8a4 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 4 Sep 2025 11:53:26 +0800 Subject: [PATCH 01/16] chore: Add hdfs feature test job --- .github/actions/java-test/action.yaml | 10 +++++++- .github/workflows/pr_build_linux.yml | 35 ++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index 1f7899f8ac..2260196fdb 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -29,6 +29,10 @@ inputs: description: 'Maven options passed to the mvn command' required: false default: '' + features: + description: 'Build native features (like hdfs e.g)' + required: false + default: '' scan_impl: description: 'The default Parquet scan implementation' required: false @@ -45,7 +49,11 @@ runs: shell: bash run: | cd native - cargo build + FEATURES_ARG="" + if [ -n "${{ inputs.features }}" ]; then + FEATURES_ARG="--features=${{ inputs.features }}" + fi + cargo build $FEATURES_ARG - name: Cache Maven dependencies uses: actions/cache@v4 diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index c45355978e..bc6462d8b8 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -170,4 +170,37 @@ jobs: suites: ${{ matrix.suite.value }} maven_opts: ${{ matrix.profile.maven_opts }} scan_impl: ${{ matrix.profile.scan_impl }} - upload-test-reports: true \ No newline at end of file + upload-test-reports: true + + # Java tests with native features + linux-test-features: + strategy: + matrix: + os: [ubuntu-latest] + java_version: [17] + features: + - value: "hdfs-opendal" + suites: | + org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + fail-fast: false + name: ${{ matrix.os }}/java ${{ matrix.java_version }}-features [${{ matrix.features.value }}] + runs-on: ${{ matrix.os }} + container: + image: amd64/rust + env: + JAVA_TOOL_OPTIONS: ${{ matrix.profile.java_version == '17' && '--add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED' || '' }} + + steps: + - uses: actions/checkout@v5 + - name: Setup Rust & Java toolchain + uses: ./.github/actions/setup-builder + with: + rust-version: ${{env.RUST_VERSION}} + jdk-version: ${{ matrix.java_version }} + - name: Java test steps + uses: ./.github/actions/java-test + with: + artifact_name: ${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} + features: ${{ matrix.features.value }} + suites: ${{ matrix.features.suites }} + upload-test-reports: true From 44c866b0a7d9ac7001d9e50e91920d0bb6b0ea7c Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 9 Sep 2025 14:16:26 +0800 Subject: [PATCH 02/16] fix and add hdfs suite --- .github/workflows/pr_build_linux.yml | 1 + .../org/apache/comet/WithHdfsCluster.scala | 4 +- .../ParquetReadFromFakeHadoopFsSuite.scala | 5 +- .../parquet/ParquetReadFromHdfsSuite.scala | 73 +++++++++++++++++++ 4 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index bc6462d8b8..b0727ec9e0 100644 --- a/.github/workflows/pr_build_linux.yml 
+++ b/.github/workflows/pr_build_linux.yml @@ -182,6 +182,7 @@ jobs: - value: "hdfs-opendal" suites: | org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + org.apache.comet.parquet.ParquetReadFromHdfsSuite fail-fast: false name: ${{ matrix.os }}/java ${{ matrix.java_version }}-features [${{ matrix.features.value }}] runs-on: ${{ matrix.os }} diff --git a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala index 49124d63e5..68734a8c33 100644 --- a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala +++ b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala @@ -21,7 +21,6 @@ package org.apache.comet import java.io.{File, FileWriter} import java.net.InetAddress -import java.nio.file.Files import java.util.UUID import scala.collection.JavaConverters._ @@ -32,6 +31,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hdfs.MiniDFSCluster import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys import org.apache.spark.internal.Logging +import org.apache.spark.network.util.JavaUtils /** * Trait for starting and stopping a MiniDFSCluster for testing. @@ -65,7 +65,7 @@ trait WithHdfsCluster extends Logging { "NameNode address in configuration is " + s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}") hadoopConfDir = - Files.createTempDirectory(s"comet_hdfs_conf_${UUID.randomUUID().toString}").toFile + JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "comet_hdfs_conf") saveHadoopConf(hadoopConfDir) fileSystem = hdfsCluster.getFileSystem diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala index f4a8b5ed82..bd6c0f30bc 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala @@ -20,11 +20,10 @@ package org.apache.comet.parquet import java.io.File -import java.nio.file.Files -import java.util.UUID import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf +import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} import org.apache.spark.sql.comet.CometNativeScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -46,7 +45,7 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP override def beforeAll(): Unit = { // Initialize fake root dir - fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile + fake_root_dir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "comet_test") // Initialize Spark session super.beforeAll() } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala new file mode 100644 index 0000000000..0f80d5d044 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet + +import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} +import org.apache.spark.sql.comet.CometNativeScanExec +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.{col, sum} + +import org.apache.comet.{CometConf, WithHdfsCluster} + +class ParquetReadFromHdfsSuite + extends CometTestBase + with AdaptiveSparkPlanHelper + with WithHdfsCluster { + + override protected def createSparkSession: SparkSessionType = { + // start HDFS cluster and add hadoop conf + startHdfsCluster() + val sparkSession = super.createSparkSession + sparkSession.sparkContext.hadoopConfiguration.addResource(getHadoopConfFile) + sparkSession + } + + private def writeTestParquetFile(filePath: String): Unit = { + val df = spark.range(0, 1000) + df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath) + } + + private def assertCometNativeScanOnHDFS(df: DataFrame): Unit = { + val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec => + p + } + assert(scans.size == 1) + assert( + scans.head.nativeOp.getNativeScan + .getFilePartitions(0) + .getPartitionedFile(0) + .getFilePath + .startsWith("hdfs://")) + } + + test("test native_datafusion scan on fake fs") { + withTmpHdfsDir { dir => + { + val testFilePath = dir.toString + writeTestParquetFile(testFilePath) + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { + val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id"))) + assertCometNativeScanOnHDFS(df) + assert(df.first().getLong(0) == 499500) + } + } + } + } +} From 8c2736bccc9e779904a636ea7f420f001c342a78 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 9 Sep 2025 14:34:59 +0800 Subject: [PATCH 03/16] skip java test --- .github/workflows/pr_build_linux.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index b0727ec9e0..e74b617378 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -203,5 +203,6 @@ jobs: with: artifact_name: ${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} features: ${{ matrix.features.value }} + maven_opts: "-Dtest=none" suites: ${{ matrix.features.suites }} upload-test-reports: true From 1863538a41a0410942968cfc34fbd40a8eb87d1a Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 9 Sep 2025 15:10:21 +0800 Subject: [PATCH 04/16] fix --- .../org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala index 0f80d5d044..3bb58fd11a 100644 --- 
a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala @@ -39,6 +39,11 @@ class ParquetReadFromHdfsSuite sparkSession } + protected override def afterAll(): Unit = { + super.afterAll() + stopHdfsCluster() + } + private def writeTestParquetFile(filePath: String): Unit = { val df = spark.range(0, 1000) df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath) From 4f341f5c240a71ad4bfb14c7ada3c0a6eead6a13 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 9 Sep 2025 17:56:04 +0800 Subject: [PATCH 05/16] disable hdrs async_file feature --- native/core/Cargo.toml | 9 +++++---- .../apache/comet/parquet/ParquetReadFromHdfsSuite.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 16a8a7316d..e5f862cd88 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -74,8 +74,9 @@ aws-credential-types = { workspace = true } parking_lot = "0.12.3" datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] } -object_store_opendal = {version = "0.54.0", optional = true} -hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]} +object_store_opendal = { version = "0.54.0", optional = true } +hdrs = { version = "0.3.2", optional = true, default-features = false } +hdfs-sys = { version = "0.3", optional = true, features = ["hdfs_3_3"] } opendal = { version ="0.54.0", optional = true, features = ["services-hdfs"] } [target.'cfg(target_os = "linux")'.dependencies] @@ -96,12 +97,12 @@ datafusion-functions-nested = { version = "49.0.2" } [features] default = [] hdfs = ["datafusion-comet-objectstore-hdfs"] -hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"] +hdfs-opendal = ["opendal", "object_store_opendal", "hdrs", "hdfs-sys"] jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] # exclude optional packages from cargo machete verifications [package.metadata.cargo-machete] -ignored = ["datafusion-comet-objectstore-hdfs", "hdfs-sys"] +ignored = ["datafusion-comet-objectstore-hdfs", "hdrs", "hdfs-sys"] [lib] name = "comet" diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala index 3bb58fd11a..fafb67d7f6 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala @@ -62,7 +62,7 @@ class ParquetReadFromHdfsSuite .startsWith("hdfs://")) } - test("test native_datafusion scan on fake fs") { + test("test native_datafusion scan on hdfs") { withTmpHdfsDir { dir => { val testFilePath = dir.toString From 832288b19c8b876fe4c0a69457c906d2292499c2 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 11 Sep 2025 11:51:43 +0800 Subject: [PATCH 06/16] wait for hdfs async blocking thread exit --- spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala index 68734a8c33..a9201a9932 100644 --- a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala +++ b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala @@ -74,6 +74,9 @@ trait WithHdfsCluster extends Logging { } def stopHdfsCluster(): Unit = { + // wait 
for hdfs async blocking thread exit to avoid jvm crash, see: + // https://github.com/apache/datafusion-comet/issues/2354 + Thread.sleep(2000) if (hdfsCluster != null) hdfsCluster.shutdown(true) if (hadoopConfDir != null) FileUtils.deleteDirectory(hadoopConfDir) } From fb8e8082e0774fb21e476b8479798bd85c7858bb Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 11 Sep 2025 11:54:21 +0800 Subject: [PATCH 07/16] revert disable hdrs async_file --- native/core/Cargo.toml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index e5f862cd88..e6828237ae 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -75,7 +75,6 @@ parking_lot = "0.12.3" datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] } object_store_opendal = { version = "0.54.0", optional = true } -hdrs = { version = "0.3.2", optional = true, default-features = false } hdfs-sys = { version = "0.3", optional = true, features = ["hdfs_3_3"] } opendal = { version ="0.54.0", optional = true, features = ["services-hdfs"] } @@ -97,12 +96,12 @@ datafusion-functions-nested = { version = "49.0.2" } [features] default = [] hdfs = ["datafusion-comet-objectstore-hdfs"] -hdfs-opendal = ["opendal", "object_store_opendal", "hdrs", "hdfs-sys"] +hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"] jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] # exclude optional packages from cargo machete verifications [package.metadata.cargo-machete] -ignored = ["datafusion-comet-objectstore-hdfs", "hdrs", "hdfs-sys"] +ignored = ["datafusion-comet-objectstore-hdfs", "hdfs-sys"] [lib] name = "comet" From ac894808ddd65006d6823a1ffcb5456a1e46bc67 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 11 Sep 2025 14:28:36 +0800 Subject: [PATCH 08/16] fix --- spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala | 4 ++-- .../comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala | 5 +++-- .../org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala index a9201a9932..30d1581696 100644 --- a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala +++ b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala @@ -21,6 +21,7 @@ package org.apache.comet import java.io.{File, FileWriter} import java.net.InetAddress +import java.nio.file.Files import java.util.UUID import scala.collection.JavaConverters._ @@ -31,7 +32,6 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hdfs.MiniDFSCluster import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys import org.apache.spark.internal.Logging -import org.apache.spark.network.util.JavaUtils /** * Trait for starting and stopping a MiniDFSCluster for testing. 
@@ -65,7 +65,7 @@ trait WithHdfsCluster extends Logging { "NameNode address in configuration is " + s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}") hadoopConfDir = - JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "comet_hdfs_conf") + Files.createTempDirectory(s"comet_hdfs_conf_${UUID.randomUUID().toString}").toFile saveHadoopConf(hadoopConfDir) fileSystem = hdfsCluster.getFileSystem diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala index bd6c0f30bc..f4a8b5ed82 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala @@ -20,10 +20,11 @@ package org.apache.comet.parquet import java.io.File +import java.nio.file.Files +import java.util.UUID import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf -import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} import org.apache.spark.sql.comet.CometNativeScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -45,7 +46,7 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP override def beforeAll(): Unit = { // Initialize fake root dir - fake_root_dir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "comet_test") + fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile // Initialize Spark session super.beforeAll() } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala index fafb67d7f6..27d37a3551 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala @@ -32,9 +32,9 @@ class ParquetReadFromHdfsSuite with WithHdfsCluster { override protected def createSparkSession: SparkSessionType = { + val sparkSession = super.createSparkSession // start HDFS cluster and add hadoop conf startHdfsCluster() - val sparkSession = super.createSparkSession sparkSession.sparkContext.hadoopConfiguration.addResource(getHadoopConfFile) sparkSession } From a1fd391681ed8b7a0bb9a8ff7e288eae33733f39 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 11 Sep 2025 14:47:14 +0800 Subject: [PATCH 09/16] clean tmp dir --- .github/actions/java-test/action.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index 2260196fdb..41bbad7c26 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -90,6 +90,9 @@ runs: MAVEN_SUITES="$(echo "${{ inputs.suites }}" | paste -sd, -)" echo "Running with MAVEN_SUITES=$MAVEN_SUITES" MAVEN_OPTS="-Xmx4G -Xms2G -DwildcardSuites=$MAVEN_SUITES -XX:+UnlockDiagnosticVMOptions -XX:+ShowMessageBoxOnError -XX:+HeapDumpOnOutOfMemoryError -XX:ErrorFile=./hs_err_pid%p.log" SPARK_HOME=`pwd` ./mvnw -B clean install ${{ inputs.maven_opts }} + // clean tmp dir to avoid hashFiles error + ls spark/target/tmp + rm -rf spark/target/tmp - name: Upload crash logs if: failure() uses: actions/upload-artifact@v4 From 5004d8f3b096d2f09f12c7ff15228a9980f53095 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 
11 Sep 2025 15:06:33 +0800 Subject: [PATCH 10/16] fix comment --- .github/actions/java-test/action.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index 41bbad7c26..24af6908ce 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -90,7 +90,8 @@ runs: MAVEN_SUITES="$(echo "${{ inputs.suites }}" | paste -sd, -)" echo "Running with MAVEN_SUITES=$MAVEN_SUITES" MAVEN_OPTS="-Xmx4G -Xms2G -DwildcardSuites=$MAVEN_SUITES -XX:+UnlockDiagnosticVMOptions -XX:+ShowMessageBoxOnError -XX:+HeapDumpOnOutOfMemoryError -XX:ErrorFile=./hs_err_pid%p.log" SPARK_HOME=`pwd` ./mvnw -B clean install ${{ inputs.maven_opts }} - // clean tmp dir to avoid hashFiles error + # Hadoop mini cluster may interfere with spark shutdown hook, causing the tmp directory to not be fully cleaned up. + # Clean tmp dir to avoid hashFiles error ls spark/target/tmp rm -rf spark/target/tmp - name: Upload crash logs From 2e54b7c04c0881e2f8568d4d9a1f1d735a4b8169 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 11 Sep 2025 16:09:24 +0800 Subject: [PATCH 11/16] add hdfs test --- .github/workflows/pr_build_linux.yml | 5 ++++- .../org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala | 2 ++ .../src/test/scala/org/apache/spark/sql/CometTestBase.scala | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index e74b617378..b4fa851725 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -183,6 +183,9 @@ jobs: suites: | org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite org.apache.comet.parquet.ParquetReadFromHdfsSuite + - value: "hdfs" + suites: | + org.apache.comet.parquet.ParquetReadFromHdfsSuite fail-fast: false name: ${{ matrix.os }}/java ${{ matrix.java_version }}-features [${{ matrix.features.value }}] runs-on: ${{ matrix.os }} @@ -203,6 +206,6 @@ jobs: with: artifact_name: ${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} features: ${{ matrix.features.value }} - maven_opts: "-Dtest=none" + maven_opts: "-Dtest=none -Dfeatures=${{ matrix.features.value }}" suites: ${{ matrix.features.suites }} upload-test-reports: true diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala index 27d37a3551..ad438e215a 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala @@ -63,6 +63,8 @@ class ParquetReadFromHdfsSuite } test("test native_datafusion scan on hdfs") { + assume(featureEnabled("hdfs") || featureEnabled("hdfs-opendal")) + withTmpHdfsDir { dir => { val testFilePath = dir.toString diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index b896943314..2fb00066b1 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -1147,4 +1147,9 @@ abstract class CometTestBase usingDataSourceExec(conf) && !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf) } + + def featureEnabled(feature: String): Boolean = { + System.getProperty("feature", 
"").split(",").contains(feature) + } + } From 8e51b924d744753f72f9bef444bedbcb0cedc8d0 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 11 Sep 2025 16:23:15 +0800 Subject: [PATCH 12/16] fix --- .../comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala index f4a8b5ed82..2ae777d317 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala @@ -45,10 +45,10 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP } override def beforeAll(): Unit = { - // Initialize fake root dir - fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile // Initialize Spark session super.beforeAll() + // Initialize fake root dir + fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile } protected override def afterAll(): Unit = { From 50f58734f875bdbf953e82ac755b03619adef111 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Fri, 12 Sep 2025 10:47:56 +0800 Subject: [PATCH 13/16] add native_iceberg_compat test --- .../ParquetReadFromFakeHadoopFsSuite.scala | 34 ++++++++++++------- .../parquet/ParquetReadFromHdfsSuite.scala | 33 +++++++++++------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala index 2ae777d317..03f9437b29 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala @@ -26,7 +26,7 @@ import java.util.UUID import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} -import org.apache.spark.sql.comet.CometNativeScanExec +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.{col, sum} @@ -62,16 +62,21 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP } private def assertCometNativeScanOnFakeFs(df: DataFrame): Unit = { - val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec => - p + val scans = collect(df.queryExecution.executedPlan) { + case p: CometNativeScanExec => + assert( + p.nativeOp.getNativeScan + .getFilePartitions(0) + .getPartitionedFile(0) + .getFilePath + .startsWith(FakeHDFSFileSystem.PREFIX)) + p + case p: CometScanExec if p.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT => + assert(p.toString().contains(FakeHDFSFileSystem.PREFIX)) + p } assert(scans.size == 1) - assert( - scans.head.nativeOp.getNativeScan - .getFilePartitions(0) - .getPartitionedFile(0) - .getFilePath - .startsWith(FakeHDFSFileSystem.PREFIX)) + } // This test fails for 'hdfs' but succeeds for 'open-dal'. 
'hdfs' requires this fix @@ -82,10 +87,13 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP val testFilePath = s"${FakeHDFSFileSystem.PREFIX}${fake_root_dir.getAbsolutePath}/data/test-file.parquet" writeTestParquetFile(testFilePath) - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { - val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id"))) - assertCometNativeScanOnFakeFs(df) - assert(df.first().getLong(0) == 499500) + Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { + scanImpl => + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl) { + val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id"))) + assertCometNativeScanOnFakeFs(df) + assert(df.first().getLong(0) == 499500) + } } } } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala index ad438e215a..8ca5af30aa 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala @@ -20,7 +20,7 @@ package org.apache.comet.parquet import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} -import org.apache.spark.sql.comet.CometNativeScanExec +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.{col, sum} @@ -50,16 +50,20 @@ class ParquetReadFromHdfsSuite } private def assertCometNativeScanOnHDFS(df: DataFrame): Unit = { - val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec => - p + val scans = collect(df.queryExecution.executedPlan) { + case p: CometNativeScanExec => + assert( + p.nativeOp.getNativeScan + .getFilePartitions(0) + .getPartitionedFile(0) + .getFilePath + .startsWith("hdfs://")) + p + case p: CometScanExec if p.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT => + assert(p.toString().contains("hdfs://")) + p } assert(scans.size == 1) - assert( - scans.head.nativeOp.getNativeScan - .getFilePartitions(0) - .getPartitionedFile(0) - .getFilePath - .startsWith("hdfs://")) } test("test native_datafusion scan on hdfs") { @@ -69,10 +73,13 @@ class ParquetReadFromHdfsSuite { val testFilePath = dir.toString writeTestParquetFile(testFilePath) - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { - val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id"))) - assertCometNativeScanOnHDFS(df) - assert(df.first().getLong(0) == 499500) + Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { + scanImpl => + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl) { + val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id"))) + assertCometNativeScanOnHDFS(df) + assert(df.first().getLong(0) == 499500) + } } } } From c2f665fc3ed00bbd2fbe9221baebf6866d6aeaf4 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 15 Sep 2025 19:46:44 +0800 Subject: [PATCH 14/16] test macos --- .github/workflows/pr_build_macos.yml | 36 ++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 6c71006e5f..cf9a59f471 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -131,3 
+131,39 @@ jobs: artifact_name: ${{ matrix.os }}-${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} suites: ${{ matrix.suite.value }} maven_opts: ${{ matrix.profile.maven_opts }} + + # Java tests with native features + macos-aarch64-test-features: + strategy: + matrix: + os: [macos-14] + java_version: [17] + features: + - value: "hdfs-opendal" + suites: | + org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + org.apache.comet.parquet.ParquetReadFromHdfsSuite + - value: "hdfs" + suites: | + org.apache.comet.parquet.ParquetReadFromHdfsSuite + fail-fast: false + name: ${{ matrix.os }}/java ${{ matrix.java_version }}-features [${{ matrix.features.value }}] + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v5 + - name: Setup Rust & Java toolchain + uses: ./.github/actions/setup-macos-builder + with: + rust-version: ${{env.RUST_VERSION}} + jdk-version: ${{ matrix.profile.java_version }} + jdk-architecture: aarch64 + protoc-architecture: aarch_64 + - name: Java test steps + uses: ./.github/actions/java-test + with: + artifact_name: ${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} + features: ${{ matrix.features.value }} + maven_opts: "-Dtest=none -Dfeatures=${{ matrix.features.value }}" + suites: ${{ matrix.features.suites }} + upload-test-reports: true From 7211891abd33f16254905ed31ef495029f8d1a94 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 15 Sep 2025 19:57:50 +0800 Subject: [PATCH 15/16] fix --- .github/workflows/pr_build_macos.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index cf9a59f471..c446ae1818 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -156,7 +156,7 @@ jobs: uses: ./.github/actions/setup-macos-builder with: rust-version: ${{env.RUST_VERSION}} - jdk-version: ${{ matrix.profile.java_version }} + jdk-version: ${{ matrix.java_version }} jdk-architecture: aarch64 protoc-architecture: aarch_64 - name: Java test steps From cc529cb17b4d1ec54674599517e7f8f2b3ee90dd Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 16 Sep 2025 09:59:34 +0800 Subject: [PATCH 16/16] rebase --- .github/workflows/pr_build_linux.yml | 2 +- .github/workflows/pr_build_macos.yml | 2 +- .../org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala | 2 +- .../src/test/scala/org/apache/spark/sql/CometTestBase.scala | 5 ----- 4 files changed, 3 insertions(+), 8 deletions(-) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index b4fa851725..c5651dc101 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -206,6 +206,6 @@ jobs: with: artifact_name: ${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} features: ${{ matrix.features.value }} - maven_opts: "-Dtest=none -Dfeatures=${{ matrix.features.value }}" + maven_opts: "-Dtest=none" suites: ${{ matrix.features.suites }} upload-test-reports: true diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index c446ae1818..7dab979b36 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -164,6 +164,6 @@ jobs: with: artifact_name: 
${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} features: ${{ matrix.features.value }} - maven_opts: "-Dtest=none -Dfeatures=${{ matrix.features.value }}" + maven_opts: "-Dtest=none" suites: ${{ matrix.features.suites }} upload-test-reports: true diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala index 8ca5af30aa..719e03d6e9 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala @@ -67,7 +67,7 @@ class ParquetReadFromHdfsSuite } test("test native_datafusion scan on hdfs") { - assume(featureEnabled("hdfs") || featureEnabled("hdfs-opendal")) + assume(isFeatureEnabled("hdfs") || isFeatureEnabled("hdfs-opendal")) withTmpHdfsDir { dir => { diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 2fb00066b1..b896943314 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -1147,9 +1147,4 @@ abstract class CometTestBase usingDataSourceExec(conf) && !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf) } - - def featureEnabled(feature: String): Boolean = { - System.getProperty("feature", "").split(",").contains(feature) - } - }
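
For reference, a rough local approximation of the feature-test job these patches define might look like the following. This is a simplified sketch, assumed to be run from the repository root: the composite action's caching, JDK 17 --add-opens flags, and diagnostic JVM options are omitted, and "hdfs-opendal" is just one of the two matrix feature values ("hdfs" is the other).

  # Build the native library with the HDFS feature under test,
  # mirroring the action's "cargo build $FEATURES_ARG" step.
  cd native
  cargo build --features=hdfs-opendal
  cd ..

  # Run only the HDFS-related Scala suites, mirroring the action's
  # -DwildcardSuites / "-Dtest=none" invocation.
  MAVEN_OPTS="-Xmx4G -DwildcardSuites=org.apache.comet.parquet.ParquetReadFromHdfsSuite" \
    SPARK_HOME=$(pwd) ./mvnw -B clean install -Dtest=none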