diff --git a/.github/workflows/paimon.yml b/.github/workflows/paimon.yml new file mode 100644 index 000000000..e86e8e266 --- /dev/null +++ b/.github/workflows/paimon.yml @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: Paimon + +on: + workflow_dispatch: + push: + branches: + - master + - branch-* + pull_request: + branches: + - master + - branch-* + +concurrency: + group: paimon-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + test-paimon: + name: Test Paimon ${{ matrix.paimon }} + runs-on: ubuntu-24.04 + strategy: + fail-fast: false + matrix: + paimon: [ "1.2" ] + javaver: [ "8" ] + scalaver: [ "2.12" ] + module: [ "thirdparty/auron-paimon" ] + sparkver: [ "spark-3.5" ] + + steps: + - name: Checkout Auron + uses: actions/checkout@v4 + + - name: Setup Java and Maven cache + uses: actions/setup-java@v4 + with: + distribution: 'adopt-hotspot' + java-version: ${{ matrix.javaver }} + cache: 'maven' + + - name: Test Paimon Module + run: ./build/mvn -B test -pl ${{ matrix.module }} -am -Pscala-${{ matrix.scalaver }} -Ppaimon-${{ matrix.paimon }} -P${{ matrix.sparkver }} -Prelease + + - name: Upload reports + if: failure() + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.module }}-test-report + path: ${{ matrix.module }}/target/surefire-reports \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 83557970d..0998d37dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -348,8 +348,8 @@ version = "0.1.0" dependencies = [ "arrow", "auron-jni-bridge", + "auron-memmgr", "auron-serde", - "bytesize", "chrono", "datafusion", "datafusion-ext-commons", @@ -381,6 +381,23 @@ dependencies = [ "paste", ] +[[package]] +name = "auron-memmgr" +version = "0.1.0" +dependencies = [ + "async-trait", + "auron-jni-bridge", + "bytesize", + "datafusion", + "datafusion-ext-commons", + "jni", + "log", + "once_cell", + "parking_lot", + "procfs", + "tempfile", +] + [[package]] name = "auron-serde" version = "0.1.0" @@ -388,12 +405,10 @@ dependencies = [ "arrow", "base64", "datafusion", - "datafusion-ext-commons", "datafusion-ext-exprs", "datafusion-ext-functions", "datafusion-ext-plans", "datafusion-spark", - "log", "object_store", "parking_lot", "prost 0.14.1", @@ -452,9 +467,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bigdecimal" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a22f228ab7a1b23027ccc6c350b72868017af7ea8356fbdf19f8d991c690013" +checksum = "560f42649de9fa436b73517378a147ec21f6c997a546581df4b4b31677828934" dependencies = [ "autocfg", "libm", @@ -1162,26 +1177,24 @@ version = "0.1.0" dependencies = [ "arrow", "arrow-schema", - "async-trait",
"auron-jni-bridge", "bigdecimal", "byteorder", - "bytes 1.10.1", "chrono", "datafusion", - "futures", "itertools 0.14.0", "jni", "log", + "lz4_flex", "num", "once_cell", "paste", "rand", "smallvec 2.0.0-alpha.11", - "tempfile", "tokio", "transpose", "unchecked-index", + "zstd", ] [[package]] @@ -1189,14 +1202,12 @@ name = "datafusion-ext-exprs" version = "0.1.0" dependencies = [ "arrow", - "async-trait", "auron-jni-bridge", "datafusion", "datafusion-ext-commons", "itertools 0.14.0", "jni", "log", - "num", "once_cell", "parking_lot", ] @@ -1206,7 +1217,6 @@ name = "datafusion-ext-functions" version = "0.1.0" dependencies = [ "arrow", - "async-trait", "auron-jni-bridge", "datafusion", "datafusion-ext-commons", @@ -1226,6 +1236,7 @@ dependencies = [ "arrow-schema", "async-trait", "auron-jni-bridge", + "auron-memmgr", "base64", "bitvec", "byteorder", @@ -1238,14 +1249,13 @@ dependencies = [ "datafusion-ext-commons", "datafusion-ext-exprs", "datafusion-ext-functions", - "foldhash", + "foldhash 0.2.0", "futures", "futures-util", "hashbrown 0.14.5", "itertools 0.14.0", "jni", "log", - "lz4_flex", "num", "object_store", "once_cell", @@ -1256,10 +1266,8 @@ dependencies = [ "procfs", "rand", "smallvec 2.0.0-alpha.11", - "tempfile", "tokio", "unchecked-index", - "zstd", ] [[package]] @@ -1723,6 +1731,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1940,7 +1954,7 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -3091,23 +3105,22 @@ dependencies = [ [[package]] name = "procfs" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" +checksum = "25485360a54d6861439d60facef26de713b1e126bf015ec8f98239467a2b82f7" dependencies = [ "bitflags 2.9.1", "chrono", "flate2", - "hex", "procfs-core", - "rustix 0.38.44", + "rustix 1.0.8", ] [[package]] name = "procfs-core" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" +checksum = "e6401bf7b6af22f78b563665d15a22e9aef27775b79b149a66ca022468a4e405" dependencies = [ "bitflags 2.9.1", "chrono", @@ -3966,7 +3979,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.0.8", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -4022,9 +4035,9 @@ dependencies = [ [[package]] name = "tikv-jemalloc-ctl" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b" +checksum = "661f1f6a57b3a36dc9174a2c10f19513b4866816e13425d3e418b11cc37bc24c" dependencies = [ "libc", "paste", @@ -4033,9 +4046,9 @@ dependencies = [ [[package]] name = "tikv-jemalloc-sys" -version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d" +checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b" dependencies = [ "cc", "libc", @@ -4043,9 +4056,9 @@ dependencies = [ [[package]] name = "tikv-jemallocator" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865" +checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a" dependencies = [ "libc", "tikv-jemalloc-sys", diff --git a/Cargo.toml b/Cargo.toml index b2bec6e02..085e2febb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "native-engine/auron", "native-engine/auron-jni-bridge", "native-engine/auron-serde", + "native-engine/auron-memmgr", ] [profile.release] @@ -48,6 +49,7 @@ overflow-checks = false auron = { path = "./native-engine/auron" } auron-jni-bridge = { path = "./native-engine/auron-jni-bridge" } auron-serde = { path = "./native-engine/auron-serde" } +auron-memmgr = { path = "./native-engine/auron-memmgr" } datafusion-ext-commons = { path = "./native-engine/datafusion-ext-commons" } datafusion-ext-exprs = { path = "./native-engine/datafusion-ext-exprs" } datafusion-ext-functions = { path = "./native-engine/datafusion-ext-functions" } @@ -75,14 +77,14 @@ serde_json = { version = "1.0.96" } # other dependencies async-trait = "0.1.89" base64 = "0.22.1" -bigdecimal = "0.4.8" +bigdecimal = "0.4.9" bitvec = "1.0.1" byteorder = "1.5.0" bytes = "1.10.1" bytesize = "2.1.0" chrono = "0.4.42" count-write = "0.1.0" -foldhash = "0.1.5" +foldhash = "0.2.0" futures = "0.3" futures-util = "0.3.31" hashbrown = "0.14.5" @@ -96,7 +98,7 @@ once_cell = "1.21.3" panic-message = "0.3.0" parking_lot = "0.12.5" paste = "1.0.15" -procfs = "0.17.0" +procfs = "0.18.0" prost = "0.14.1" rand = "0.9.2" smallvec = "2.0.0-alpha.11" diff --git a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java index 71a6349e3..07326670a 100644 --- a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java +++ b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java @@ -16,7 +16,6 @@ */ package org.apache.auron.jni; -import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.auron.configuration.AuronConfiguration; @@ -84,29 +83,21 @@ public boolean isTaskRunning() { * @return Absolute path of the created temporary file. * @throws IOException If the temporary file cannot be created. */ - public String getDirectWriteSpillToDiskFile() throws IOException { - File tempFile = File.createTempFile("auron-spill-", ".tmp"); - tempFile.deleteOnExit(); - return tempFile.getAbsolutePath(); - } + public abstract String getDirectWriteSpillToDiskFile() throws IOException; /** * Retrieves the context classloader of the current thread. * - * @return The context classloader of the current thread. + * @return For Spark, return TaskContext of the current thread. */ - public Object getThreadContext() { - return Thread.currentThread().getContextClassLoader(); - } + public abstract Object getThreadContext(); /** - * Sets the context classloader for the current thread. + * Sets the context for the current thread. * - * @param context The classloader to set as the context classloader. + * @param context For spark is TaskContext. 
*/ - public void setThreadContext(Object context) { - Thread.currentThread().setContextClassLoader((ClassLoader) context); - } + public abstract void setThreadContext(Object context); /** * Retrieves the on-heap spill manager implementation. diff --git a/auron-core/src/main/java/org/apache/auron/jni/AuronCallNativeWrapper.java b/auron-core/src/main/java/org/apache/auron/jni/AuronCallNativeWrapper.java index cbdf1f86a..661d8070e 100644 --- a/auron-core/src/main/java/org/apache/auron/jni/AuronCallNativeWrapper.java +++ b/auron-core/src/main/java/org/apache/auron/jni/AuronCallNativeWrapper.java @@ -33,6 +33,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A wrapper class for calling native functions in the Auron project. + * It handles initialization, loading data batches, and error handling. + * Provides methods to interact with the native execution runtime and process data batches. + */ public class AuronCallNativeWrapper { private static final Logger LOG = LoggerFactory.getLogger(AuronCallNativeWrapper.class); @@ -106,6 +111,7 @@ public AuronCallNativeWrapper( * @throws RuntimeException If the native runtime encounters an error during batch processing. */ public boolean loadNextBatch(Consumer batchConsumer) { + checkError(); // load next batch try { this.batchConsumer = batchConsumer; @@ -132,6 +138,10 @@ protected void importSchema(long ffiSchemaPtr) { } } + public Schema getArrowSchema() { + return arrowSchema; + } + protected void importBatch(long ffiArrayPtr) { if (nativeRuntimePtr == 0) { throw new RuntimeException("Native runtime is finalized"); @@ -172,15 +182,11 @@ protected byte[] getRawTaskDefinition() { return taskDefinition.toByteArray(); } - private synchronized void close() { + public synchronized void close() { if (nativeRuntimePtr != 0) { JniBridge.finalizeNative(nativeRuntimePtr); nativeRuntimePtr = 0; - try { - dictionaryProvider.close(); - } catch (Exception e) { - LOG.error("Error closing dictionary provider", e); - } + dictionaryProvider.close(); checkError(); } } diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java index 6155cce10..4d7edbf8f 100644 --- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java +++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java @@ -35,7 +35,7 @@ */ @SuppressWarnings("unused") public class JniBridge { - public static final ConcurrentHashMap<String, Object> resourcesMap = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, Object> resourcesMap = new ConcurrentHashMap<>(); private static final List<BufferPoolMXBean> directMXBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); @@ -60,6 +60,10 @@ public static Object getResource(String key) { return resourcesMap.remove(key); } + public static void putResource(String key, Object value) { + resourcesMap.put(key, value); + } + public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception { // the path is a URI string, so we need to convert it to a URI object return FSDataInputWrapper.wrap(fs.open(new Path(new URI(path)))); diff --git a/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java b/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java index 5f98f8e11..3627101a6 100644 --- a/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java +++ b/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java @@ -22,21 +22,21 @@ * Interface for managing on-heap spill
operations. * This interface provides methods to handle memory spilling to disk when on-heap memory is insufficient. */ -public abstract class OnHeapSpillManager { +public interface OnHeapSpillManager { /** * Check if on-heap memory is available for allocation. * * @return true if on-heap memory is available, false otherwise */ - abstract boolean isOnHeapAvailable(); + boolean isOnHeapAvailable(); /** * Create a new spill operation and return its identifier. * * @return spill identifier for the newly created spill */ - abstract int newSpill(); + int newSpill(); /** * Write data from a ByteBuffer to the spill identified by spillId. @@ -44,7 +44,7 @@ public abstract class OnHeapSpillManager { * @param spillId the identifier of the spill to write to * @param buffer the ByteBuffer containing data to be written */ - abstract void writeSpill(int spillId, ByteBuffer buffer); + void writeSpill(int spillId, ByteBuffer buffer); /** * Read data from the spill identified by spillId into the provided ByteBuffer. @@ -53,7 +53,7 @@ public abstract class OnHeapSpillManager { * @param buffer the ByteBuffer to read data into * @return the number of bytes actually read */ - abstract int readSpill(int spillId, ByteBuffer buffer); + int readSpill(int spillId, ByteBuffer buffer); /** * Get the disk usage in bytes for the spill identified by spillId. @@ -61,7 +61,7 @@ public abstract class OnHeapSpillManager { * @param spillId the identifier of the spill * @return the disk usage in bytes */ - abstract long getSpillDiskUsage(int spillId); + long getSpillDiskUsage(int spillId); /** * Get the total disk I/O time in nanoseconds for the spill identified by spillId. @@ -69,14 +69,14 @@ public abstract class OnHeapSpillManager { * @param spillId the identifier of the spill * @return the disk I/O time in nanoseconds */ - abstract long getSpillDiskIOTime(int spillId); + long getSpillDiskIOTime(int spillId); /** * Release and clean up resources associated with the spill identified by spillId. * * @param spillId the identifier of the spill to release */ - abstract void releaseSpill(int spillId); + void releaseSpill(int spillId); /** * Get the disabled on-heap spill manager instance. 
@@ -87,37 +87,37 @@ public static OnHeapSpillManager getDisabledOnHeapSpillManager() { return new OnHeapSpillManager() { @Override - boolean isOnHeapAvailable() { + public boolean isOnHeapAvailable() { return false; } @Override - int newSpill() { + public int newSpill() { throw new UnsupportedOperationException(); } @Override - void writeSpill(int spillId, ByteBuffer buffer) { + public void writeSpill(int spillId, ByteBuffer buffer) { throw new UnsupportedOperationException(); } @Override - int readSpill(int spillId, ByteBuffer buffer) { + public int readSpill(int spillId, ByteBuffer buffer) { throw new UnsupportedOperationException(); } @Override - long getSpillDiskUsage(int spillId) { + public long getSpillDiskUsage(int spillId) { throw new UnsupportedOperationException(); } @Override - long getSpillDiskIOTime(int spillId) { + public long getSpillDiskIOTime(int spillId) { throw new UnsupportedOperationException(); } @Override - void releaseSpill(int spillId) { + public void releaseSpill(int spillId) { throw new UnsupportedOperationException(); } }; diff --git a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java index 9d37486f5..6cb2407f3 100644 --- a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java +++ b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java @@ -16,6 +16,8 @@ */ package org.apache.auron.jni; +import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.auron.configuration.AuronConfiguration; import org.apache.auron.configuration.MockAuronConfiguration; @@ -31,6 +33,23 @@ public void loadAuronLib() { // Mock implementation, no need to load auron library } + @Override + public String getDirectWriteSpillToDiskFile() throws IOException { + File tempFile = File.createTempFile("auron-spill-", ".tmp"); + tempFile.deleteOnExit(); + return tempFile.getAbsolutePath(); + } + + @Override + public Object getThreadContext() { + return null; + } + + @Override + public void setThreadContext(Object context) { + // Mock implementation, no need to set thread context + } + @Override public AuronConfiguration getAuronConfiguration() { return new MockAuronConfiguration(); diff --git a/dev/docker-build/docker-compose.yml b/dev/docker-build/docker-compose.yml index 27bbc9229..2db0005ce 100644 --- a/dev/docker-build/docker-compose.yml +++ b/dev/docker-build/docker-compose.yml @@ -28,7 +28,7 @@ services: - ./../../:/auron:rw - ./../../target-docker:/auron/target:rw - ./../../target-docker/spark-extension-target:/auron/spark-extension/target:rw - - ./../../target-docker/spark-extension-shims-spark3-target:/auron/spark-extension-shims-spark3/target:rw + - ./../../target-docker/spark-extension-shims-spark-target:/auron/spark-extension-shims-spark/target:rw - ./../../target-docker/build-helper-proto-target:/auron/dev/mvn-build-helper/proto/target:rw - ./../../target-docker/build-helper-assembly-target:/auron/dev/mvn-build-helper/assembly/target:rw environment: diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs b/native-engine/auron-jni-bridge/src/jni_bridge.rs index 85cb4d0c6..90614cc23 100644 --- a/native-engine/auron-jni-bridge/src/jni_bridge.rs +++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs @@ -553,18 +553,16 @@ pub struct JniBridge<'a> { pub method_getContextClassLoader_ret: ReturnType, pub method_setContextClassLoader: JStaticMethodID, pub method_setContextClassLoader_ret: ReturnType, - pub method_getSparkEnvConfAsString: 
JStaticMethodID, - pub method_getSparkEnvConfAsString_ret: ReturnType, pub method_getResource: JStaticMethodID, pub method_getResource_ret: ReturnType, - pub method_getTaskContext: JStaticMethodID, - pub method_getTaskContext_ret: ReturnType, + pub method_getThreadContext: JStaticMethodID, + pub method_getThreadContext_ret: ReturnType, + pub method_setThreadContext: JStaticMethodID, + pub method_setThreadContext_ret: ReturnType, pub method_getTaskOnHeapSpillManager: JStaticMethodID, pub method_getTaskOnHeapSpillManager_ret: ReturnType, pub method_isTaskRunning: JStaticMethodID, pub method_isTaskRunning_ret: ReturnType, - pub method_isDriverSide: JStaticMethodID, - pub method_isDriverSide_ret: ReturnType, pub method_openFileAsDataInputWrapper: JStaticMethodID, pub method_openFileAsDataInputWrapper_ret: ReturnType, pub method_createFileAsDataOutputWrapper: JStaticMethodID, @@ -575,14 +573,12 @@ pub struct JniBridge<'a> { pub method_getTotalMemoryLimited_ret: ReturnType, pub method_getDirectWriteSpillToDiskFile: JStaticMethodID, pub method_getDirectWriteSpillToDiskFile_ret: ReturnType, - pub method_initNativeThread: JStaticMethodID, - pub method_initNativeThread_ret: ReturnType, pub method_getAuronUDFWrapperContext: JStaticMethodID, pub method_getAuronUDFWrapperContext_ret: ReturnType, } impl<'a> JniBridge<'a> { - pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/JniBridge"; + pub const SIG_TYPE: &'static str = "org/apache/auron/jni/JniBridge"; pub fn new(env: &JNIEnv<'a>) -> JniResult> { let class = get_global_jclass(env, Self::SIG_TYPE)?; @@ -600,33 +596,32 @@ impl<'a> JniBridge<'a> { "(Ljava/lang/ClassLoader;)V", )?, method_setContextClassLoader_ret: ReturnType::Primitive(Primitive::Void), - method_getSparkEnvConfAsString: env.get_static_method_id( - class, - "getSparkEnvConfAsString", - "(Ljava/lang/String;)Ljava/lang/String;", - )?, - method_getSparkEnvConfAsString_ret: ReturnType::Object, method_getResource: env.get_static_method_id( class, "getResource", "(Ljava/lang/String;)Ljava/lang/Object;", )?, method_getResource_ret: ReturnType::Object, - method_getTaskContext: env.get_static_method_id( + method_getThreadContext: env.get_static_method_id( class, - "getTaskContext", - "()Lorg/apache/spark/TaskContext;", + "getThreadContext", + "()Ljava/lang/Object;", )?, - method_getTaskContext_ret: ReturnType::Object, + method_getThreadContext_ret: ReturnType::Object, + method_setThreadContext: env.get_static_method_id( + class, + "setThreadContext", + "(Ljava/lang/Object;)V", + )?, + method_setThreadContext_ret: ReturnType::Primitive(Primitive::Void), method_getTaskOnHeapSpillManager: env.get_static_method_id( class, "getTaskOnHeapSpillManager", - "()Lorg/apache/spark/sql/auron/memory/OnHeapSpillManager;", + "()Lorg/apache/auron/memory/OnHeapSpillManager;", )?, method_getTaskOnHeapSpillManager_ret: ReturnType::Object, method_isTaskRunning: env.get_static_method_id(class, "isTaskRunning", "()Z")?, method_isTaskRunning_ret: ReturnType::Primitive(Primitive::Boolean), - method_isDriverSide: env.get_static_method_id(class, "isDriverSide", "()Z")?, method_openFileAsDataInputWrapper: env.get_static_method_id( class, "openFileAsDataInputWrapper", @@ -639,7 +634,6 @@ impl<'a> JniBridge<'a> { "(Lorg/apache/hadoop/fs/FileSystem;Ljava/lang/String;)Lorg/apache/auron/hadoop/fs/FSDataOutputWrapper;", )?, method_createFileAsDataOutputWrapper_ret: ReturnType::Object, - method_isDriverSide_ret: ReturnType::Primitive(Primitive::Boolean), method_getDirectMemoryUsed: env.get_static_method_id( class, 
"getDirectMemoryUsed", @@ -658,12 +652,6 @@ impl<'a> JniBridge<'a> { "()Ljava/lang/String;", )?, method_getDirectWriteSpillToDiskFile_ret: ReturnType::Object, - method_initNativeThread: env.get_static_method_id( - class, - "initNativeThread", - "(Ljava/lang/ClassLoader;Lorg/apache/spark/TaskContext;)V", - )?, - method_initNativeThread_ret: ReturnType::Primitive(Primitive::Void), method_getAuronUDFWrapperContext: env.get_static_method_id( class, @@ -1420,7 +1408,7 @@ pub struct AuronCallNativeWrapper<'a> { pub method_setError_ret: ReturnType, } impl<'a> AuronCallNativeWrapper<'a> { - pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/AuronCallNativeWrapper"; + pub const SIG_TYPE: &'static str = "org/apache/auron/jni/AuronCallNativeWrapper"; pub fn new(env: &JNIEnv<'a>) -> JniResult> { let class = get_global_jclass(env, Self::SIG_TYPE)?; @@ -1467,7 +1455,7 @@ pub struct AuronOnHeapSpillManager<'a> { pub method_releaseSpill_ret: ReturnType, } impl<'a> AuronOnHeapSpillManager<'a> { - pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/memory/OnHeapSpillManager"; + pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager"; pub fn new(env: &JNIEnv<'a>) -> JniResult> { let class = get_global_jclass(env, Self::SIG_TYPE)?; diff --git a/native-engine/auron-memmgr/Cargo.toml b/native-engine/auron-memmgr/Cargo.toml new file mode 100644 index 000000000..a5aff2ae6 --- /dev/null +++ b/native-engine/auron-memmgr/Cargo.toml @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +[package] +name = "auron-memmgr" +version = "0.1.0" +edition = "2024" + +[dependencies] +auron-jni-bridge = { workspace = true } +datafusion = { workspace = true } +datafusion-ext-commons = { workspace = true } + +async-trait = { workspace = true } +bytesize = { workspace = true } +jni = { workspace = true } +log = { workspace = true } +once_cell = { workspace = true } +tempfile = { workspace = true } +parking_lot = { workspace = true } + +[target.'cfg(target_os = "linux")'.dependencies] +procfs = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/src/memmgr/mod.rs b/native-engine/auron-memmgr/src/lib.rs similarity index 99% rename from native-engine/datafusion-ext-plans/src/memmgr/mod.rs rename to native-engine/auron-memmgr/src/lib.rs index cb28f09c3..e9cb90311 100644 --- a/native-engine/datafusion-ext-plans/src/memmgr/mod.rs +++ b/native-engine/auron-memmgr/src/lib.rs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#![feature(get_mut_unchecked)] + pub mod metrics; pub mod spill; diff --git a/native-engine/datafusion-ext-plans/src/memmgr/metrics.rs b/native-engine/auron-memmgr/src/metrics.rs similarity index 100% rename from native-engine/datafusion-ext-plans/src/memmgr/metrics.rs rename to native-engine/auron-memmgr/src/metrics.rs diff --git a/native-engine/datafusion-ext-plans/src/memmgr/spill.rs b/native-engine/auron-memmgr/src/spill.rs similarity index 98% rename from native-engine/datafusion-ext-plans/src/memmgr/spill.rs rename to native-engine/auron-memmgr/src/spill.rs index 54f706309..3e7ef0b45 100644 --- a/native-engine/datafusion-ext-plans/src/memmgr/spill.rs +++ b/native-engine/auron-memmgr/src/spill.rs @@ -27,14 +27,12 @@ use auron_jni_bridge::{ jni_get_string, jni_new_direct_byte_buffer, jni_new_global_ref, }; use datafusion::{common::Result, parquet::file::reader::Length, physical_plan::metrics::Time}; +use datafusion_ext_commons::io::ipc_compression::{IoCompressionReader, IoCompressionWriter}; use jni::{objects::GlobalRef, sys::jlong}; use log::warn; use once_cell::sync::OnceCell; -use crate::{ - common::ipc_compression::{IoCompressionReader, IoCompressionWriter}, - memmgr::metrics::SpillMetrics, -}; +use crate::metrics::SpillMetrics; pub type SpillCompressedReader<'a> = IoCompressionReader>>; pub type SpillCompressedWriter<'a> = IoCompressionWriter>>; @@ -89,7 +87,8 @@ fn spill_compression_codec() -> &'static str { } pub fn try_new_spill(spill_metrics: &SpillMetrics) -> Result<Box<dyn Spill>> { - if !is_jni_bridge_inited() || jni_call_static!(JniBridge.isDriverSide() -> bool)? { + if !is_jni_bridge_inited() { + // JNI bridge not initialized: we are on the driver side, so spill directly to file Ok(Box::new(FileSpill::try_new(spill_metrics)?)) } else { // use on heap spill if on-heap memory is available, otherwise use file spill diff --git a/native-engine/auron-serde/Cargo.toml b/native-engine/auron-serde/Cargo.toml index 12037e03d..adb63dbff 100644 --- a/native-engine/auron-serde/Cargo.toml +++ b/native-engine/auron-serde/Cargo.toml @@ -26,17 +26,15 @@ default = ["prost/no-recursion-limit"] [dependencies] arrow = { workspace = true } datafusion = { workspace = true } -datafusion-ext-commons = { workspace = true } datafusion-ext-exprs = { workspace = true } datafusion-ext-functions = { workspace = true } datafusion-ext-plans = { workspace = true } datafusion-spark = { workspace = true } base64 = { workspace = true } -log = { workspace = true } object_store = { workspace = true } -prost = { workspace = true } parking_lot = { workspace = true } +prost = { workspace = true } [build-dependencies] tonic-build = { workspace = true } diff --git a/native-engine/auron/Cargo.toml b/native-engine/auron/Cargo.toml index bb173ff15..9e3f1353c 100644 --- a/native-engine/auron/Cargo.toml +++ b/native-engine/auron/Cargo.toml @@ -33,12 +33,12 @@ http-service = [] [dependencies] arrow = { workspace = true } auron-jni-bridge = { workspace = true } +auron-memmgr = { workspace = true } auron-serde = { workspace = true } datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } datafusion-ext-plans = { workspace = true } -bytesize = { workspace = true } futures = { workspace = true } jni = { workspace = true } log = { workspace = true } @@ -49,7 +49,7 @@ tokio = { workspace = true } chrono = { workspace = true } [dependencies.tikv-jemalloc-ctl] -version = "0.6.0" +version = "0.6.1" optional = true features = ["use_std"] @@ -59,7 +59,7 @@ optional = true features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms", "disable_initial_exec_tls"]
[dependencies.tikv-jemallocator] -version = "0.6.0" +version = "0.6.1" optional = true features = ["disable_initial_exec_tls"] diff --git a/native-engine/auron/src/exec.rs b/native-engine/auron/src/exec.rs index d8e4bdf8f..de8d372a3 100644 --- a/native-engine/auron/src/exec.rs +++ b/native-engine/auron/src/exec.rs @@ -18,6 +18,7 @@ use auron_jni_bridge::{ jni_bridge::JavaClasses, *, }; +use auron_memmgr::MemManager; use datafusion::{ common::Result, error::DataFusionError, @@ -28,7 +29,6 @@ use datafusion::{ }, prelude::{SessionConfig, SessionContext}, }; -use datafusion_ext_plans::memmgr::MemManager; use jni::{ JNIEnv, objects::{JClass, JObject, JString}, @@ -39,7 +39,7 @@ use crate::{handle_unwinded_scope, logging::init_logging, rt::NativeExecutionRun #[allow(non_snake_case)] #[unsafe(no_mangle)] -pub extern "system" fn Java_org_apache_spark_sql_auron_JniBridge_callNative( +pub extern "system" fn Java_org_apache_auron_jni_JniBridge_callNative( env: JNIEnv, _: JClass, executor_memory_overhead: i64, @@ -113,7 +113,7 @@ pub extern "system" fn Java_org_apache_spark_sql_auron_JniBridge_callNative( #[allow(non_snake_case)] #[unsafe(no_mangle)] -pub extern "system" fn Java_org_apache_spark_sql_auron_JniBridge_nextBatch( +pub extern "system" fn Java_org_apache_auron_jni_JniBridge_nextBatch( _: JNIEnv, _: JClass, raw_ptr: i64, @@ -124,7 +124,7 @@ pub extern "system" fn Java_org_apache_spark_sql_auron_JniBridge_nextBatch( #[allow(non_snake_case)] #[unsafe(no_mangle)] -pub extern "system" fn Java_org_apache_spark_sql_auron_JniBridge_finalizeNative( +pub extern "system" fn Java_org_apache_auron_jni_JniBridge_finalizeNative( _: JNIEnv, _: JClass, raw_ptr: i64, @@ -135,7 +135,7 @@ pub extern "system" fn Java_org_apache_spark_sql_auron_JniBridge_finalizeNative( #[allow(non_snake_case)] #[unsafe(no_mangle)] -pub extern "system" fn Java_org_apache_spark_sql_auron_JniBridge_onExit(_: JNIEnv, _: JClass) { +pub extern "system" fn Java_org_apache_auron_jni_JniBridge_onExit(_: JNIEnv, _: JClass) { log::info!("exiting native environment"); if MemManager::initialized() { MemManager::get().dump_status(); diff --git a/native-engine/auron/src/rt.rs b/native-engine/auron/src/rt.rs index 4b33b6d83..7389e79a9 100644 --- a/native-engine/auron/src/rt.rs +++ b/native-engine/auron/src/rt.rs @@ -26,10 +26,8 @@ use arrow::{ }; use auron_jni_bridge::{ conf::{IntConf, SPARK_TASK_CPUS, TOKIO_WORKER_THREADS_PER_CPU}, - is_task_running, - jni_bridge::JavaClasses, - jni_call, jni_call_static, jni_convert_byte_array, jni_exception_check, jni_exception_occurred, - jni_new_global_ref, jni_new_object, jni_new_string, + is_task_running, jni_call, jni_call_static, jni_convert_byte_array, jni_exception_check, + jni_exception_occurred, jni_new_global_ref, jni_new_object, jni_new_string, }; use auron_serde::protobuf::TaskDefinition; use datafusion::{ @@ -105,17 +103,22 @@ impl NativeExecutionRuntime { // create tokio runtime // propagate classloader and task context to spawned children threads - let spark_task_context = jni_call_static!(JniBridge.getTaskContext() -> JObject)?; - let spark_task_context_global = jni_new_global_ref!(spark_task_context.as_obj())?; + let thread_context = jni_call_static!(JniBridge.getThreadContext() -> JObject)?; + let thread_context_global = jni_new_global_ref!(thread_context.as_obj())?; + // classloader + let classloader = jni_call_static!(JniBridge.getContextClassLoader() -> JObject)?; + let classloader_global = jni_new_global_ref!(classloader.as_obj())?; let mut tokio_runtime_builder = 
tokio::runtime::Builder::new_multi_thread(); tokio_runtime_builder .thread_name(format!( "auron-native-stage-{stage_id}-part-{partition_id}-tid-{tid}" )) .on_thread_start(move || { - let classloader = JavaClasses::get().classloader; let _ = jni_call_static!( - JniBridge.initNativeThread(classloader,spark_task_context_global.as_obj()) -> () + JniBridge.setContextClassLoader(classloader_global.as_obj()) -> () + ); + let _ = jni_call_static!( + JniBridge.setThreadContext(thread_context_global.as_obj()) -> () ); THREAD_STAGE_ID.set(stage_id); THREAD_PARTITION_ID.set(partition_id); diff --git a/native-engine/datafusion-ext-commons/Cargo.toml b/native-engine/datafusion-ext-commons/Cargo.toml index 3dc0c1765..7ca7fe6a0 100644 --- a/native-engine/datafusion-ext-commons/Cargo.toml +++ b/native-engine/datafusion-ext-commons/Cargo.toml @@ -29,24 +29,21 @@ arrow = { workspace = true } arrow-schema = { workspace = true } auron-jni-bridge = { workspace = true } datafusion = { workspace = true } - -async-trait = { workspace = true } bigdecimal = { workspace = true } byteorder = { workspace = true } -bytes = { workspace = true } chrono = { workspace = true } -futures = { workspace = true } itertools = { workspace = true } jni = { workspace = true } log = { workspace = true } +lz4_flex = { workspace = true } num = { workspace = true } once_cell = { workspace = true } paste = { workspace = true } smallvec = { workspace = true } -tempfile = { workspace = true } -transpose = { workspace = true } tokio = { workspace = true } +transpose = { workspace = true } unchecked-index = { workspace = true } +zstd = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs b/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs similarity index 99% rename from native-engine/datafusion-ext-plans/src/common/ipc_compression.rs rename to native-engine/datafusion-ext-commons/src/io/ipc_compression.rs index 6f0ebcff8..d3ea0b293 100644 --- a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs +++ b/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs @@ -25,11 +25,12 @@ use auron_jni_bridge::{ }; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use datafusion::common::Result; -use datafusion_ext_commons::{ +use once_cell::sync::OnceCell; + +use crate::{ df_execution_err, io::{read_one_batch, write_one_batch}, }; -use once_cell::sync::OnceCell; pub struct IpcCompressionWriter { output: W, diff --git a/native-engine/datafusion-ext-commons/src/io/mod.rs b/native-engine/datafusion-ext-commons/src/io/mod.rs index 243ee57d3..a81601742 100644 --- a/native-engine/datafusion-ext-commons/src/io/mod.rs +++ b/native-engine/datafusion-ext-commons/src/io/mod.rs @@ -27,6 +27,7 @@ pub use scalar_serde::{read_scalar, write_scalar}; use crate::{UninitializedInit, arrow::cast::cast}; mod batch_serde; +pub mod ipc_compression; mod scalar_serde; pub fn write_one_batch(num_rows: usize, cols: &[ArrayRef], mut output: impl Write) -> Result<()> { diff --git a/native-engine/datafusion-ext-exprs/Cargo.toml b/native-engine/datafusion-ext-exprs/Cargo.toml index f9d60fc2e..4387bbdf7 100644 --- a/native-engine/datafusion-ext-exprs/Cargo.toml +++ b/native-engine/datafusion-ext-exprs/Cargo.toml @@ -23,7 +23,6 @@ resolver = "1" [dependencies] arrow = { workspace = true } -async-trait = "0.1.89" auron-jni-bridge = { workspace = true } datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } @@ -31,6 +30,5 @@ 
datafusion-ext-commons = { workspace = true } itertools = { workspace = true } jni = { workspace = true } log = { workspace = true } -num = { workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true } diff --git a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs index 3c1a94fd5..e3c43e6e6 100644 --- a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs +++ b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs @@ -27,7 +27,7 @@ use arrow::{ record_batch::{RecordBatch, RecordBatchOptions}, }; use auron_jni_bridge::{ - is_task_running, jni_call, jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_object, + is_task_running, jni_call, jni_call_static, jni_new_direct_byte_buffer, jni_new_global_ref, }; use datafusion::{ error::Result, @@ -94,7 +94,7 @@ impl SparkUDFWrapperExpr { .get_or_try_init(|| { let serialized_buf = jni_new_direct_byte_buffer!(&self.serialized)?; let jcontext_local = - jni_new_object!(SparkAuronUDFWrapperContext(serialized_buf.as_obj()))?; + jni_call_static!(JniBridge.getAuronUDFWrapperContext(serialized_buf.as_obj()) -> JObject)?; jni_new_global_ref!(jcontext_local.as_obj()) }) .cloned() diff --git a/native-engine/datafusion-ext-functions/Cargo.toml b/native-engine/datafusion-ext-functions/Cargo.toml index 495e4c7a5..98de05f8f 100644 --- a/native-engine/datafusion-ext-functions/Cargo.toml +++ b/native-engine/datafusion-ext-functions/Cargo.toml @@ -23,7 +23,6 @@ resolver = "1" [dependencies] arrow = { workspace = true } -async-trait = "0.1.89" auron-jni-bridge = { workspace = true } datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } @@ -33,4 +32,4 @@ log = { workspace = true } num = { workspace = true } paste = { workspace = true } serde_json = { workspace = true } -sonic-rs = { workspace = true } \ No newline at end of file +sonic-rs = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml index d452ffe01..e6dff4195 100644 --- a/native-engine/datafusion-ext-plans/Cargo.toml +++ b/native-engine/datafusion-ext-plans/Cargo.toml @@ -28,6 +28,7 @@ default = ["tokio/rt-multi-thread"] arrow = { workspace = true } arrow-schema = { workspace = true } auron-jni-bridge = { workspace = true } +auron-memmgr = { workspace = true } datafusion = { workspace = true } datafusion-datasource = { workspace = true } datafusion-datasource-parquet = { workspace = true } @@ -50,7 +51,6 @@ hashbrown = { workspace = true } itertools = { workspace = true } jni = { workspace = true } log = { workspace = true } -lz4_flex = { workspace = true } num = { workspace = true } object_store = { workspace = true } once_cell = { workspace = true } @@ -58,10 +58,8 @@ panic-message = { workspace = true } parking_lot = { workspace = true } paste = { workspace = true } smallvec = { workspace = true } -tempfile = { workspace = true } tokio = { workspace = true } unchecked-index = { workspace = true } -zstd = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] procfs = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs b/native-engine/datafusion-ext-plans/src/agg/acc.rs index 1292c0c87..d3ce80493 100644 --- a/native-engine/datafusion-ext-plans/src/agg/acc.rs +++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs @@ -23,6 +23,7 @@ use arrow::{ array::*, datatypes::{DataType, *}, }; +use auron_memmgr::spill::{SpillCompressedReader, 
SpillCompressedWriter}; use bitvec::{bitvec, vec::BitVec}; use byteorder::{ReadBytesExt, WriteBytesExt}; use datafusion::common::{Result, ScalarValue, utils::proxy::VecAllocExt}; @@ -33,11 +34,7 @@ use datafusion_ext_commons::{ }; use smallvec::SmallVec; -use crate::{ - agg::agg::IdxSelection, - idx_for, idx_with_iter, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, -}; +use crate::{agg::agg::IdxSelection, idx_for, idx_with_iter}; pub trait AccColumn: Send { fn as_any(&self) -> &dyn Any; diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs index d2adb51bf..2e4333912 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs @@ -21,6 +21,10 @@ use std::{ use arrow::{record_batch::RecordBatch, row::Rows}; use async_trait::async_trait; +use auron_memmgr::{ + MemConsumer, MemConsumerInfo, MemManager, + spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill}, +}; use bytesize::ByteSize; use datafusion::{ common::{DataFusionError, Result}, @@ -52,10 +56,6 @@ use crate::{ execution_context::{ExecutionContext, WrappedRecordBatchSender}, timer_helper::TimerHelper, }, - memmgr::{ - MemConsumer, MemConsumerInfo, MemManager, - spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill}, - }, }; pub type OwnedKey = SmallVec; diff --git a/native-engine/datafusion-ext-plans/src/agg/avg.rs b/native-engine/datafusion-ext-plans/src/agg/avg.rs index f8f702166..008dd34bd 100644 --- a/native-engine/datafusion-ext-plans/src/agg/avg.rs +++ b/native-engine/datafusion-ext-plans/src/agg/avg.rs @@ -21,6 +21,7 @@ use std::{ }; use arrow::{array::*, datatypes::*}; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{ common::{ Result, @@ -30,15 +31,12 @@ use datafusion::{ }; use datafusion_ext_commons::downcast_any; -use crate::{ - agg::{ - Agg, - acc::{AccColumn, AccColumnRef}, - agg::IdxSelection, - count::AggCount, - sum::AggSum, - }, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, +use crate::agg::{ + Agg, + acc::{AccColumn, AccColumnRef}, + agg::IdxSelection, + count::AggCount, + sum::AggSum, }; pub struct AggAvg { @@ -165,9 +163,12 @@ impl Agg for AggAvg { Ok(Arc::new(avgs.with_precision_and_scale(prec, scale)?)) } else { let counts = counts_zero_free; - Ok(arrow::compute::kernels::numeric::div( - &arrow::compute::cast(&sums, &DataType::Float64)?, - &arrow::compute::cast(&counts, &DataType::Float64)?, + Ok(arrow::compute::cast( + &arrow::compute::kernels::numeric::div( + &arrow::compute::cast(&sums, &DataType::Float64)?, + &arrow::compute::cast(&counts, &DataType::Float64)?, + )?, + &self.data_type, )?) 
} } diff --git a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs index c7b4dd9f8..7c6f1da33 100644 --- a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs +++ b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs @@ -24,6 +24,7 @@ use arrow::{ array::{ArrayRef, AsArray, BinaryBuilder}, datatypes::{DataType, Int64Type}, }; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use byteorder::{ReadBytesExt, WriteBytesExt}; use datafusion::{common::Result, physical_expr::PhysicalExprRef}; use datafusion_ext_commons::{ @@ -37,7 +38,6 @@ use crate::{ agg::IdxSelection, }, idx_for, idx_for_zipped, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub struct AggBloomFilter { diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs b/native-engine/datafusion-ext-plans/src/agg/collect.rs index 0472eedef..6ddeed832 100644 --- a/native-engine/datafusion-ext-plans/src/agg/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs @@ -23,6 +23,7 @@ use std::{ }; use arrow::{array::*, datatypes::*}; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{ common::{Result, ScalarValue}, physical_expr::PhysicalExprRef, @@ -41,7 +42,6 @@ use crate::{ agg::{Agg, IdxSelection}, }, idx_for, idx_for_zipped, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub type AggCollectSet = AggGenericCollect; @@ -650,10 +650,10 @@ fn acc_hash(value: impl AsRef<[u8]>) -> u64 { #[cfg(test)] mod tests { use arrow::datatypes::DataType; + use auron_memmgr::spill::Spill; use datafusion::common::ScalarValue; use super::*; - use crate::memmgr::spill::Spill; #[test] fn test_acc_set_append() { diff --git a/native-engine/datafusion-ext-plans/src/agg/count.rs b/native-engine/datafusion-ext-plans/src/agg/count.rs index 253bcf582..ab17c5d36 100644 --- a/native-engine/datafusion-ext-plans/src/agg/count.rs +++ b/native-engine/datafusion-ext-plans/src/agg/count.rs @@ -21,6 +21,7 @@ use std::{ }; use arrow::{array::*, datatypes::*}; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{common::Result, physical_expr::PhysicalExprRef}; use datafusion_ext_commons::{ downcast_any, @@ -33,7 +34,6 @@ use crate::{ agg::{Agg, IdxSelection}, }, idx_for, idx_for_zipped, idx_with_iter, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub struct AggCount { diff --git a/native-engine/datafusion-ext-plans/src/agg/first.rs b/native-engine/datafusion-ext-plans/src/agg/first.rs index 494631c9e..b4bbc1159 100644 --- a/native-engine/datafusion-ext-plans/src/agg/first.rs +++ b/native-engine/datafusion-ext-plans/src/agg/first.rs @@ -21,6 +21,7 @@ use std::{ }; use arrow::{array::*, datatypes::*}; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{ common::{Result, ScalarValue}, physical_expr::PhysicalExprRef, @@ -37,7 +38,6 @@ use crate::{ agg::IdxSelection, }, idx_for_zipped, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub struct AggFirst { diff --git a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs index 2eda77663..deba5d344 100644 --- a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs +++ b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs @@ -30,6 +30,7 @@ use auron_jni_bridge::{ jni_bridge::LocalRef, jni_call, 
jni_get_byte_array_len, jni_get_byte_array_region, jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_object, jni_new_prim_array, }; +use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{ common::{DataFusionError, Result}, physical_expr::PhysicalExprRef, @@ -47,7 +48,6 @@ use crate::{ agg::{Agg, IdxSelection}, }, idx_for_zipped, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, }; pub struct SparkUDAFWrapper { diff --git a/native-engine/datafusion-ext-plans/src/agg_exec.rs b/native-engine/datafusion-ext-plans/src/agg_exec.rs index 80c828dc3..a77babd04 100644 --- a/native-engine/datafusion-ext-plans/src/agg_exec.rs +++ b/native-engine/datafusion-ext-plans/src/agg_exec.rs @@ -24,6 +24,7 @@ use arrow::{ datatypes::SchemaRef, }; use auron_jni_bridge::conf::{IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG}; +use auron_memmgr::MemManager; use datafusion::{ common::{Result, Statistics}, error::DataFusionError, @@ -50,7 +51,6 @@ use crate::{ }, common::{execution_context::ExecutionContext, timer_helper::TimerHelper}, expand_exec::ExpandExec, - memmgr::MemManager, project_exec::ProjectExec, sort_exec::create_default_ascending_sort_exec, }; @@ -418,6 +418,7 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ assert_batches_sorted_eq, common::{Result, ScalarValue}, @@ -435,7 +436,6 @@ mod test { agg::create_agg, }, agg_exec::AggExec, - memmgr::MemManager, }; fn build_table_i32( @@ -691,6 +691,7 @@ mod fuzztest { datatypes::{DataType, Float64Type, Int64Type}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ common::Result, physical_expr::expressions as phys_expr, @@ -708,7 +709,6 @@ mod fuzztest { sum::AggSum, }, agg_exec::AggExec, - memmgr::MemManager, }; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs index b79b13811..15b6e69d0 100644 --- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs +++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs @@ -30,6 +30,7 @@ use arrow::{ }; use arrow_schema::Schema; use auron_jni_bridge::{conf, conf::BooleanConf, is_task_running}; +use auron_memmgr::metrics::SpillMetrics; use datafusion::{ common::{DataFusionError, Result}, execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}, @@ -62,7 +63,6 @@ use crate::{ }, timer_helper::TimerHelper, }, - memmgr::metrics::SpillMetrics, sort_exec::SortExec, }; diff --git a/native-engine/datafusion-ext-plans/src/common/mod.rs b/native-engine/datafusion-ext-plans/src/common/mod.rs index 2bbe09fcc..def20537b 100644 --- a/native-engine/datafusion-ext-plans/src/common/mod.rs +++ b/native-engine/datafusion-ext-plans/src/common/mod.rs @@ -16,7 +16,6 @@ pub mod cached_exprs_evaluator; pub mod column_pruning; pub mod execution_context; -pub mod ipc_compression; pub mod key_rows_output; pub mod offsetted; pub mod row_null_checker; diff --git a/native-engine/datafusion-ext-plans/src/expand_exec.rs b/native-engine/datafusion-ext-plans/src/expand_exec.rs index 8521eddca..9030e3cdf 100644 --- a/native-engine/datafusion-ext-plans/src/expand_exec.rs +++ b/native-engine/datafusion-ext-plans/src/expand_exec.rs @@ -195,6 +195,7 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ 
assert_batches_eq, common::{Result, ScalarValue}, @@ -204,7 +205,7 @@ mod test { prelude::SessionContext, }; - use crate::{expand_exec::ExpandExec, memmgr::MemManager}; + use crate::expand_exec::ExpandExec; // build i32 table fn build_table_i32(a: (&str, &Vec)) -> RecordBatch { diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs index a1e391d8e..255e0e1f6 100644 --- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs +++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs @@ -51,13 +51,15 @@ use datafusion_ext_commons::{ array_size::{ArraySize, BatchSize}, coalesce::coalesce_arrays_unchecked, }, - batch_size, df_execution_err, suggested_batch_mem_size, + batch_size, df_execution_err, + io::ipc_compression::IpcCompressionReader, + suggested_batch_mem_size, }; use jni::objects::{GlobalRef, JObject}; use once_cell::sync::OnceCell; use parking_lot::Mutex; -use crate::common::{execution_context::ExecutionContext, ipc_compression::IpcCompressionReader}; +use crate::common::execution_context::ExecutionContext; #[derive(Debug, Clone)] pub struct IpcReaderExec { diff --git a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs index a28dfc77f..7c840fd71 100644 --- a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs +++ b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs @@ -32,14 +32,12 @@ use datafusion::{ stream::RecordBatchStreamAdapter, }, }; +use datafusion_ext_commons::io::ipc_compression::IpcCompressionWriter; use futures::{StreamExt, TryStreamExt, stream::once}; use jni::objects::{GlobalRef, JObject}; use once_cell::sync::OnceCell; -use crate::common::{ - execution_context::ExecutionContext, ipc_compression::IpcCompressionWriter, - timer_helper::TimerHelper, -}; +use crate::common::{execution_context::ExecutionContext, timer_helper::TimerHelper}; #[derive(Debug)] pub struct IpcWriterExec { diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index 06d2ad59b..9125ed53e 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -25,6 +25,7 @@ mod tests { datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ assert_batches_sorted_eq, common::JoinSide, @@ -38,7 +39,6 @@ mod tests { broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec, broadcast_join_exec::BroadcastJoinExec, joins::join_utils::{JoinType, JoinType::*}, - memmgr::MemManager, sort_merge_join_exec::SortMergeJoinExec, }; diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs index 6d510ce20..f85f3b26a 100644 --- a/native-engine/datafusion-ext-plans/src/lib.rs +++ b/native-engine/datafusion-ext-plans/src/lib.rs @@ -48,9 +48,6 @@ pub mod sort_merge_join_exec; pub mod union_exec; pub mod window_exec; -// memory management -pub mod memmgr; - // helper modules pub mod common; pub mod generate; diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs b/native-engine/datafusion-ext-plans/src/limit_exec.rs index 811104566..1b9f5892e 100644 --- a/native-engine/datafusion-ext-plans/src/limit_exec.rs +++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs @@ -153,6 +153,7 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use 
datafusion::{ assert_batches_eq, common::{Result, stats::Precision}, @@ -160,7 +161,7 @@ mod test { prelude::SessionContext, }; - use crate::{limit_exec::LimitExec, memmgr::MemManager}; + use crate::limit_exec::LimitExec; fn build_table_i32( a: (&str, &Vec), diff --git a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs index 22c9687f0..bad864131 100644 --- a/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs +++ b/native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs @@ -19,6 +19,7 @@ use std::{any::Any, fmt::Debug, sync::Arc}; use async_trait::async_trait; use auron_jni_bridge::{jni_call_static, jni_new_global_ref, jni_new_string}; +use auron_memmgr::MemManager; use datafusion::{ arrow::datatypes::SchemaRef, error::{DataFusionError, Result}, @@ -36,7 +37,6 @@ use once_cell::sync::OnceCell; use crate::{ common::execution_context::ExecutionContext, - memmgr::MemManager, shuffle::{ Partitioning, ShuffleRepartitioner, rss_single_repartitioner::RssSingleShuffleRepartitioner, diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs index d6df4f191..6b88ba8fd 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs @@ -27,6 +27,7 @@ use datafusion_ext_commons::{ selection::{BatchInterleaver, create_batch_interleaver}, }, compute_suggested_batch_size_for_output, df_execution_err, + io::ipc_compression::IpcCompressionWriter, }; use itertools::Itertools; use jni::objects::GlobalRef; @@ -35,7 +36,6 @@ use parking_lot::Mutex; use crate::{ common::{ - ipc_compression::IpcCompressionWriter, offsetted::{Offsetted, OffsettedMergeIterator}, timer_helper::TimerHelper, }, diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs index ca7ee73d8..a41f55167 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs @@ -17,14 +17,11 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::{arrow::record_batch::RecordBatch, common::Result, physical_plan::metrics::Time}; -use datafusion_ext_commons::df_execution_err; +use datafusion_ext_commons::{df_execution_err, io::ipc_compression::IpcCompressionWriter}; use jni::objects::GlobalRef; use parking_lot::Mutex; -use crate::{ - common::ipc_compression::IpcCompressionWriter, - shuffle::{ShuffleRepartitioner, rss::RssWriter}, -}; +use crate::shuffle::{ShuffleRepartitioner, rss::RssWriter}; pub struct RssSingleShuffleRepartitioner { rss_partition_writer: Arc>>, diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs index 58d3ed0d2..b0b2c825c 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs @@ -17,15 +17,13 @@ use std::sync::Weak; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use auron_memmgr::{MemConsumer, MemConsumerInfo, MemManager}; use datafusion::{common::Result, physical_plan::metrics::Time}; use datafusion_ext_commons::arrow::array_size::BatchSize; use futures::lock::Mutex; use jni::objects::GlobalRef; -use 
crate::{ - memmgr::{MemConsumer, MemConsumerInfo, MemManager}, - shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData}, -}; +use crate::shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData}; pub struct RssSortShuffleRepartitioner { mem_consumer_info: Option<Weak<MemConsumerInfo>>, diff --git a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs index bfe87512c..d5f4f31dc 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs @@ -22,13 +22,11 @@ use std::{ use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::{common::Result, physical_plan::metrics::Time}; +use datafusion_ext_commons::io::ipc_compression::IpcCompressionWriter; use tokio::sync::Mutex; use crate::{ - common::{ - ipc_compression::IpcCompressionWriter, - timer_helper::{TimedWriter, TimerHelper}, - }, + common::timer_helper::{TimedWriter, TimerHelper}, shuffle::{ShuffleRepartitioner, open_shuffle_file}, }; diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index 169ad212c..b76ed6314 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -20,6 +20,10 @@ use std::{ use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use auron_memmgr::{ + MemConsumer, MemConsumerInfo, MemManager, + spill::{OwnedSpillBufReader, Spill, try_new_spill}, +}; use bytesize::ByteSize; use datafusion::{ common::{DataFusionError, Result}, @@ -34,10 +38,6 @@ use crate::{ offsetted::{Offsetted, OffsettedMergeIterator}, timer_helper::TimerHelper, }, - memmgr::{ - MemConsumer, MemConsumerInfo, MemManager, - spill::{OwnedSpillBufReader, Spill, try_new_spill}, - }, shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData, open_shuffle_file}, }; diff --git a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs index afa16fd4b..32c2dc14e 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs @@ -19,6 +19,7 @@ use std::{any::Any, fmt::Debug, sync::Arc}; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use auron_memmgr::MemManager; use datafusion::{ error::Result, execution::context::TaskContext, @@ -36,7 +37,6 @@ use once_cell::sync::OnceCell; use crate::{ common::execution_context::ExecutionContext, - memmgr::MemManager, shuffle::{ Partitioning, ShuffleRepartitioner, single_repartitioner::SingleShuffleRepartitioner, sort_repartitioner::SortShuffleRepartitioner, diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs index 2a201cdfd..24d0beb6b 100644 --- a/native-engine/datafusion-ext-plans/src/sort_exec.rs +++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs @@ -33,6 +33,10 @@ use arrow::{ row::{RowConverter, Rows, SortField}, }; use async_trait::async_trait; +use auron_memmgr::{ + MemConsumer, MemConsumerInfo, MemManager, + spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill}, +}; use bytesize::ByteSize; use datafusion::{ common::{DataFusionError, Result, Statistics, utils::proxy::VecAllocExt}, @@ -62,19 +66,13 @@ use
itertools::Itertools; use once_cell::sync::OnceCell; use parking_lot::Mutex; -use crate::{ - common::{ - execution_context::{ - ExecutionContext, WrappedRecordBatchSender, WrappedRecordBatchWithKeyRowsSender, - WrappedSenderTrait, - }, - key_rows_output::{RecordBatchWithKeyRows, SendableRecordBatchWithKeyRowsStream}, - timer_helper::TimerHelper, - }, - memmgr::{ - MemConsumer, MemConsumerInfo, MemManager, - spill::{Spill, SpillCompressedReader, SpillCompressedWriter, try_new_spill}, +use crate::common::{ + execution_context::{ + ExecutionContext, WrappedRecordBatchSender, WrappedRecordBatchWithKeyRowsSender, + WrappedSenderTrait, }, + key_rows_output::{RecordBatchWithKeyRows, SendableRecordBatchWithKeyRowsStream}, + timer_helper::TimerHelper, }; // reserve memory for each spill @@ -1412,6 +1410,7 @@ mod test { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ assert_batches_eq, common::Result, @@ -1420,7 +1419,7 @@ mod test { prelude::SessionContext, }; - use crate::{memmgr::MemManager, sort_exec::SortExec}; + use crate::sort_exec::SortExec; fn build_table_i32( a: (&str, &Vec<i32>), @@ -1499,6 +1498,7 @@ mod fuzztest { compute::{SortOptions, concat_batches}, record_batch::RecordBatch, }; + use auron_memmgr::MemManager; use datafusion::{ common::{Result, stats::Precision}, physical_expr::{LexOrdering, PhysicalSortExpr, expressions::Column}, @@ -1506,7 +1506,7 @@ mod fuzztest { prelude::{SessionConfig, SessionContext}, }; - use crate::{memmgr::MemManager, sort_exec::SortExec}; + use crate::sort_exec::SortExec; #[tokio::test] async fn fuzztest_in_mem_sorting() -> Result<()> { diff --git a/pom.xml b/pom.xml index a717d7101..7c295bb54 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 7.0.0-SNAPSHOT UTF-8 16.0.0 - 1.12.10 + 1.14.11 3.4.2 3.25.5 @@ -565,7 +565,7 @@ spark-3.0 spark-3.0 - spark-extension-shims-spark3 + spark-extension-shims-spark 3.0.8 3.0.3 @@ -575,7 +575,7 @@ spark-3.1 spark-3.1 - spark-extension-shims-spark3 + spark-extension-shims-spark 3.2.9 3.1.3 @@ -585,7 +585,7 @@ spark-3.2 spark-3.2 - spark-extension-shims-spark3 + spark-extension-shims-spark 3.2.9 3.2.4 @@ -595,7 +595,7 @@ spark-3.3 spark-3.3 - spark-extension-shims-spark3 + spark-extension-shims-spark 3.2.9 3.3.4 @@ -605,7 +605,7 @@ spark-3.4 spark-3.4 - spark-extension-shims-spark3 + spark-extension-shims-spark 3.2.9 3.4.4 @@ -615,7 +615,7 @@ spark-3.5 spark-3.5 - spark-extension-shims-spark3 + spark-extension-shims-spark 3.2.9 3.5.7 diff --git a/spark-extension-shims-spark3/pom.xml b/spark-extension-shims-spark/pom.xml similarity index 96% rename from spark-extension-shims-spark3/pom.xml rename to spark-extension-shims-spark/pom.xml index 267b6616b..20652d1b9 100644 --- a/spark-extension-shims-spark3/pom.xml +++ b/spark-extension-shims-spark/pom.xml @@ -25,9 +25,9 @@ <relativePath>../pom.xml</relativePath> - <artifactId>spark-extension-shims-spark3_${scalaVersion}</artifactId> + <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId> <packaging>jar</packaging> - <name>Apache Auron Spark Extension Shims Spark3_${scalaVersion}</name> + <name>Apache Auron Spark Extension Shims Spark ${sparkVersion}_${scalaVersion}</name> <description>Apache Auron Spark Extension Shims Project</description> diff --git a/spark-extension-shims-spark3/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInjector.java b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInjector.java similarity index 100% rename from spark-extension-shims-spark3/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInjector.java rename to
spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInjector.java diff --git a/spark-extension-shims-spark3/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java similarity index 100% rename from spark-extension-shims-spark3/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java rename to spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java diff --git a/spark-extension-shims-spark3/src/main/java/org/apache/spark/sql/auron/ValidateSparkPlanApplyInterceptor.java b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ValidateSparkPlanApplyInterceptor.java similarity index 100% rename from spark-extension-shims-spark3/src/main/java/org/apache/spark/sql/auron/ValidateSparkPlanApplyInterceptor.java rename to spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ValidateSparkPlanApplyInterceptor.java diff --git a/spark-extension-shims-spark3/src/main/java/org/apache/spark/sql/auron/ValidateSparkPlanInjector.java b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ValidateSparkPlanInjector.java similarity index 100% rename from spark-extension-shims-spark3/src/main/java/org/apache/spark/sql/auron/ValidateSparkPlanInjector.java rename to spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ValidateSparkPlanInjector.java diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala similarity index 99% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala index 68cf36985..0ff2f5a52 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala @@ -685,7 +685,7 @@ class ShimsImpl extends Shims with Logging { // store fetch iterator in jni resource before native compute val jniResourceId = s"NativeShuffleReadExec:${UUID.randomUUID().toString}" - JniBridge.resourcesMap.put( + org.apache.auron.jni.JniBridge.putResource( jniResourceId, () => { reader.asInstanceOf[AuronBlockStoreShuffleReaderBase[_, _]].readIpc() @@ -778,7 +778,7 @@ class ShimsImpl extends Shims with Logging { // store fetch iterator in jni resource before native compute val jniResourceId = s"NativeShuffleReadExec:${UUID.randomUUID().toString}" - JniBridge.resourcesMap.put( + org.apache.auron.jni.JniBridge.putResource( jniResourceId, () => { reader.asInstanceOf[AuronBlockStoreShuffleReaderBase[_, _]].readIpc() @@ -871,7 +871,7 @@ class ShimsImpl extends Shims with Logging { // store fetch iterator in jni resource before native compute val 
jniResourceId = s"NativeShuffleReadExec:${UUID.randomUUID().toString}" - JniBridge.resourcesMap.put( + org.apache.auron.jni.JniBridge.putResource( jniResourceId, () => { reader.asInstanceOf[AuronBlockStoreShuffleReaderBase[_, _]].readIpc() diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala similarity index 100% rename from 
spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectExecProvider.scala 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectExecProvider.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectExecProvider.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectExecProvider.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsExecProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsExecProvider.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsExecProvider.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsExecProvider.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowExec.scala diff --git 
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReader.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReader.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReader.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReader.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleManagerBase.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleManagerBase.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleManagerBase.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleManagerBase.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleManager.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleManager.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleManager.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleManager.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleWriter.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleWriter.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleWriter.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleWriter.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeShuffledHashJoinExecProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeShuffledHashJoinExecProvider.scala similarity index 100% rename from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeShuffledHashJoinExecProvider.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeShuffledHashJoinExecProvider.scala diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeSortMergeJoinExecProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeSortMergeJoinExecProvider.scala similarity index 100% rename 
from spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeSortMergeJoinExecProvider.scala rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeSortMergeJoinExecProvider.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronAdaptiveQueryExecSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronAdaptiveQueryExecSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronAdaptiveQueryExecSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronAdaptiveQueryExecSuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronCheckConvertBroadcastExchangeSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronCheckConvertBroadcastExchangeSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronCheckConvertBroadcastExchangeSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronCheckConvertBroadcastExchangeSuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronCheckConvertShuffleExchangeSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronCheckConvertShuffleExchangeSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronCheckConvertShuffleExchangeSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronCheckConvertShuffleExchangeSuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronEmptyNativeRddSuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronFunctionSuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronSQLTestHelper.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronSQLTestHelper.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/AuronSQLTestHelper.scala rename to 
spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronSQLTestHelper.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/BaseAuronSQLSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BaseAuronSQLSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/BaseAuronSQLSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BaseAuronSQLSuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/BuildInfoAuronSQLSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildInfoAuronSQLSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/BuildInfoAuronSQLSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildInfoAuronSQLSuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala similarity index 88% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala index 833ea7193..4b1cce199 100644 --- a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala @@ -17,12 +17,18 @@ package org.apache.spark.sql.auron import org.apache.spark.sql.execution.ui.AuronSQLAppStatusListener +import org.apache.spark.util.Utils class BuildinfoInSparkUISuite extends org.apache.spark.sql.QueryTest with BuildInfoAuronSQLSuite with AuronSQLTestHelper { + override protected def beforeAll(): Unit = { + super.beforeAll() + val eventLogDir = Utils.createTempDir("/tmp/spark-events") + } + test("test build info in spark UI ") { val listeners = spark.sparkContext.listenerBus.findListenersByClass[AuronSQLAppStatusListener] assert(listeners.size === 1) diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/EmptyNativeRddSuite.scala diff --git a/spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/NativeConvertersSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/NativeConvertersSuite.scala similarity index 100% rename from spark-extension-shims-spark3/src/test/scala/org/apache/spark/sql/auron/NativeConvertersSuite.scala rename to spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/NativeConvertersSuite.scala diff --git a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java new file mode 100644 index 000000000..c88f84570 --- /dev/null +++ b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.jni; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import org.apache.auron.configuration.AuronConfiguration; +import org.apache.auron.functions.AuronUDFWrapperContext; +import org.apache.auron.memory.OnHeapSpillManager; +import org.apache.auron.spark.configuration.SparkAuronConfiguration; +import org.apache.auron.spark.sql.SparkAuronUDFWrapperContext; +import org.apache.spark.SparkEnv; +import org.apache.spark.SparkEnv$; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; +import org.apache.spark.sql.auron.NativeHelper$; +import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager$; +import org.apache.spark.sql.auron.util.TaskContextHelper$; + +/** + * The adaptor for Spark to call the Auron native library. + */ +public class SparkAuronAdaptor extends AuronAdaptor { + @Override + public void loadAuronLib() { + String libName = System.mapLibraryName("auron"); + ClassLoader classLoader = AuronAdaptor.class.getClassLoader(); + try (InputStream libInputStream = classLoader.getResourceAsStream(libName)) { + if (libInputStream == null) { + throw new IllegalStateException("native library " + libName + " not found on classpath"); + } + File tempFile = File.createTempFile("libauron-", ".tmp"); + tempFile.deleteOnExit(); + Files.copy(libInputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + System.load(tempFile.getAbsolutePath()); + } catch (IOException e) { + throw new IllegalStateException("error loading native libraries", e); + } + } + + @Override + public Object getThreadContext() { + return TaskContext$.MODULE$.get(); + } + + @Override + public void setThreadContext(Object context) { + TaskContext$.MODULE$.setTaskContext((TaskContext) context); + TaskContextHelper$.MODULE$.setNativeThreadName(); + TaskContextHelper$.MODULE$.setHDFSCallerContext(); + } + + @Override + public long getJVMTotalMemoryLimited() { + return NativeHelper$.MODULE$.totalMemory(); + } + + @Override + public String getDirectWriteSpillToDiskFile() { + return SparkEnv.get() + .blockManager() + .diskBlockManager() + .createTempLocalBlock() + ._2 + .getPath(); + } + + @Override + public OnHeapSpillManager getOnHeapSpillManager() { + return SparkOnHeapSpillManager$.MODULE$.current(); + } + + @Override + public AuronConfiguration getAuronConfiguration() { + return new SparkAuronConfiguration(SparkEnv$.MODULE$.get().conf()); + } + + @Override + public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer udfSerialized) { + return new SparkAuronUDFWrapperContext(udfSerialized); + } +}
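The new SparkAuronAdaptor above is the Spark-side implementation of the AuronAdaptor SPI: loadAuronLib() extracts libauron from the classpath into a temporary file and System.load()s it, so executors need no pre-installed native library. A minimal sketch of how the adaptor is expected to be registered, mirroring the Shims.get change later in this patch (the AuronBootstrap object itself is hypothetical and not part of the patch):

    import org.apache.auron.jni.{AuronAdaptor, SparkAuronAdaptor}

    object AuronBootstrap {
      // Register the Spark adaptor exactly once, before the first native call;
      // afterwards native code resolves the task context, configuration and the
      // on-heap spill manager through SparkAuronAdaptor.
      lazy val ensureInitialized: Unit = AuronAdaptor.initInstance(new SparkAuronAdaptor)
    }

diff --git a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java index af035c7a2..3bea6e17a 100644 ---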
a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java @@ -27,51 +27,64 @@ import org.apache.auron.hadoop.fs.FSDataInputWrapper$; import org.apache.auron.hadoop.fs.FSDataOutputWrapper; import org.apache.auron.hadoop.fs.FSDataOutputWrapper$; +import org.apache.auron.memory.OnHeapSpillManager; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; -import org.apache.spark.sql.auron.memory.OnHeapSpillManager; -import org.apache.spark.sql.auron.memory.OnHeapSpillManager$; +import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager$; import org.apache.spark.sql.auron.util.TaskContextHelper$; +/** + * This class has been deprecated and migrated to {@link org.apache.auron.jni.JniBridge}. + * Will be removed in the future. + */ +@Deprecated @SuppressWarnings("unused") public class JniBridge { + + @Deprecated public static final ConcurrentHashMap<String, Object> resourcesMap = new ConcurrentHashMap<>(); + @Deprecated public static native long callNative(long initNativeMemory, String logLevel, AuronCallNativeWrapper wrapper); + @Deprecated public static native boolean nextBatch(long ptr); + @Deprecated public static native void finalizeNative(long ptr); + @Deprecated public static native void onExit(); + @Deprecated public static ClassLoader getContextClassLoader() { return Thread.currentThread().getContextClassLoader(); } + @Deprecated public static void setContextClassLoader(ClassLoader cl) { Thread.currentThread().setContextClassLoader(cl); } - public static String getSparkEnvConfAsString(String key) { - return SparkEnv.get().conf().get(key); - } - + @Deprecated public static Object getResource(String key) { return resourcesMap.remove(key); } + @Deprecated public static TaskContext getTaskContext() { return TaskContext$.MODULE$.get(); } + @Deprecated public static OnHeapSpillManager getTaskOnHeapSpillManager() { - return OnHeapSpillManager$.MODULE$.current(); + return SparkOnHeapSpillManager$.MODULE$.current(); } + @Deprecated public static boolean isTaskRunning() { TaskContext tc = getTaskContext(); if (tc == null) { // driver is always running @@ -80,34 +93,35 @@ public static boolean isTaskRunning() { return !tc.isCompleted() && !tc.isInterrupted(); } - public static boolean isDriverSide() { - TaskContext tc = getTaskContext(); - return tc == null; - } - + @Deprecated public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception { // the path is a URI string, so we need to convert it to a URI object, ref: // org.apache.spark.paths.SparkPath.toPath return FSDataInputWrapper$.MODULE$.wrap(fs.open(new Path(new URI(path)))); } + @Deprecated public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem fs, String path) throws Exception { return FSDataOutputWrapper$.MODULE$.wrap(fs.create(new Path(new URI(path)))); } + @Deprecated private static final List<BufferPoolMXBean> directMXBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + @Deprecated public static long getTotalMemoryLimited() { return NativeHelper$.MODULE$.totalMemory(); } + @Deprecated public static long getDirectMemoryUsed() { return directMXBeans.stream() .mapToLong(BufferPoolMXBean::getTotalCapacity) .sum(); } + @Deprecated public static String getDirectWriteSpillToDiskFile() { return SparkEnv.get() .blockManager() @@ -117,6 +131,7 @@ public static String
getDirectWriteSpillToDiskFile() { .getPath(); } + @Deprecated public static void initNativeThread(ClassLoader cl, TaskContext tc) { setContextClassLoader(cl); TaskContext$.MODULE$.setTaskContext(tc); @@ -124,6 +139,7 @@ public static void initNativeThread(ClassLoader cl, TaskContext tc) { TaskContextHelper$.MODULE$.setHDFSCallerContext(); } + @Deprecated public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer udfSerialized) { throw new UnsupportedOperationException("This API is designed to support next-generation multi-engine."); } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala index 5cf2131bb..bf0918c40 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala @@ -49,6 +49,11 @@ import org.apache.auron.protobuf.PartitionId import org.apache.auron.protobuf.PhysicalPlanNode import org.apache.auron.protobuf.TaskDefinition +/** + * This class has been deprecated and migrated to {@link + * org.apache.auron.jni.AuronCallNativeWrapper}. Will be removed in the future. + */ +@Deprecated case class AuronCallNativeWrapper( nativePlan: PhysicalPlanNode, partition: Partition, @@ -94,6 +99,7 @@ case class AuronCallNativeWrapper( false } + @Deprecated override def next(): InternalRow = { val batchRow = batchRows(batchCurRowIdx) batchCurRowIdx += 1 @@ -104,13 +110,16 @@ case class AuronCallNativeWrapper( context.foreach(_.addTaskCompletionListener[Unit]((_: TaskContext) => close())) context.foreach(_.addTaskFailureListener((_, _) => close())) + @Deprecated def getRowIterator: Iterator[InternalRow] = { CompletionIterator[InternalRow, Iterator[InternalRow]](rowIterator, close()) } + @Deprecated protected def getMetrics: MetricNode = metrics + @Deprecated protected def importSchema(ffiSchemaPtr: Long): Unit = { Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema => arrowSchema = Data.importSchema(ROOT_ALLOCATOR, ffiSchema, dictionaryProvider) @@ -119,6 +128,7 @@ case class AuronCallNativeWrapper( } } + @Deprecated protected def importBatch(ffiArrayPtr: Long): Unit = { if (nativeRuntimePtr == 0) { throw new RuntimeException("Native runtime is finalized") @@ -137,10 +147,12 @@ case class AuronCallNativeWrapper( } } + @Deprecated protected def setError(error: Throwable): Unit = { this.error.set(error) } + @Deprecated protected def checkError(): Unit = { val throwable = error.getAndSet(null) if (throwable != null) { @@ -149,6 +161,7 @@ case class AuronCallNativeWrapper( } } + @Deprecated protected def getRawTaskDefinition: Array[Byte] = { val partitionId: PartitionId = PartitionId .newBuilder() diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index db3b710be..6c7f348a9 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -347,7 +347,11 @@ object AuronConverters extends Logging { } catch { case e @ (_: NotImplementedError | _: AssertionError | _: Exception) => - logWarning(s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}") + if (log.isDebugEnabled()) { + logWarning(s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}", e) + } else 
{ + logWarning(s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}") + } val neverConvertReason = e match { case _: AssertionError => exec match { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala index eabe7ff6d..7e6bfdfc8 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.auron import scala.collection.immutable.TreeMap +import scala.collection.mutable.ArrayBuffer +import org.apache.arrow.vector.types.pojo.Schema import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.Partition import org.apache.spark.SparkConf @@ -27,9 +29,15 @@ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils +import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils.ROOT_ALLOCATOR +import org.apache.spark.sql.execution.auron.columnar.ColumnarHelper import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.CompletionIterator import org.apache.auron.metric.SparkMetricNode import org.apache.auron.protobuf.PhysicalPlanNode @@ -89,7 +97,71 @@ object NativeHelper extends Logging { if (nativePlan == null) { return Iterator.empty } - AuronCallNativeWrapper(nativePlan, partition, context, metrics).getRowIterator + var auronCallNativeWrapper = new org.apache.auron.jni.AuronCallNativeWrapper( + ROOT_ALLOCATOR, + nativePlan, + metrics, + partition.index, + context.map(_.stageId()).getOrElse(0), + context.map(_.taskAttemptId().toInt).getOrElse(0), + NativeHelper.nativeMemory) + + context.foreach( + _.addTaskCompletionListener[Unit]((_: TaskContext) => auronCallNativeWrapper.close())) + context.foreach(_.addTaskFailureListener((_, _) => auronCallNativeWrapper.close())) + + val rowIterator = new Iterator[InternalRow] { + private var arrowSchema: Schema = _ + private var schema: StructType = _ + private var toUnsafe: UnsafeProjection = _ + private val batchRows: ArrayBuffer[InternalRow] = ArrayBuffer() + private var batchCurRowIdx = 0 + + override def hasNext: Boolean = { + // if current batch is not empty, return true + if (batchCurRowIdx < batchRows.length) { + return true + } + + // clear current batch + batchRows.clear() + batchCurRowIdx = 0 + + if (auronCallNativeWrapper.loadNextBatch(root => { + if (arrowSchema == null) { + arrowSchema = auronCallNativeWrapper.getArrowSchema + schema = ArrowUtils.fromArrowSchema(arrowSchema) + toUnsafe = UnsafeProjection.create(schema) + } + batchRows.append( + ColumnarHelper + .rootRowsIter(root) + .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow]) + .toSeq: _*) + })) { + return hasNext + } + // clear current batch + arrowSchema = null + batchRows.clear() + batchCurRowIdx = 0 + false + } + + override def next(): InternalRow = { + val batchRow = batchRows(batchCurRowIdx) + batchCurRowIdx += 1 + batchRow + } + } + + CompletionIterator[InternalRow, Iterator[InternalRow]]( + rowIterator, + () -> { + synchronized { + 
auronCallNativeWrapper.close() + } + }) } def getDefaultNativeMetrics(sc: SparkContext): Map[String, SQLMetric] = { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala index fbac6a929..7ce1452af 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala @@ -55,6 +55,7 @@ import org.apache.spark.storage.BlockManagerId import org.apache.spark.storage.FileSegment import org.apache.auron.{protobuf => pb} +import org.apache.auron.jni.{AuronAdaptor, SparkAuronAdaptor} abstract class Shims { @@ -262,6 +263,7 @@ abstract class Shims { object Shims { lazy val get: Shims = { + AuronAdaptor.initInstance(new SparkAuronAdaptor) classOf[Shims].getClassLoader .loadClass("org.apache.spark.sql.auron.ShimsImpl") .newInstance() diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala index a2be0ecc4..83b4bc8da 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala @@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.io.SnappyCompressionCodec import org.apache.spark.memory.MemoryConsumer import org.apache.spark.memory.MemoryMode -import org.apache.spark.sql.auron.memory.OnHeapSpillManager +import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager import org.apache.spark.sql.auron.util.Using import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, JoinedRow, Nondeterministic, UnsafeProjection, UnsafeRow} @@ -211,7 +211,7 @@ trait AggregateEvaluator[B, R <: BufferRowsColumn[B]] extends Logging { rows: R, indices: Iterator[Int], spillIdx: Long): Int = { - val hsm = OnHeapSpillManager.current + val hsm = SparkOnHeapSpillManager.current val spillId = memTracker.getSpill(spillIdx) val byteBuffer = ByteBuffer.wrap(serializeRows(rows, indices, spillCodec.compressedOutputStream)) @@ -224,7 +224,7 @@ trait AggregateEvaluator[B, R <: BufferRowsColumn[B]] extends Logging { memTracker: SparkUDAFMemTracker, spillBlockSize: Int, spillIdx: Long): BufferRowsColumn[B] = { - val hsm = OnHeapSpillManager.current + val hsm = SparkOnHeapSpillManager.current val spillId = memTracker.getSpill(spillIdx) val byteBuffer = ByteBuffer.allocate(spillBlockSize) val readSize = hsm.readSpill(spillId, byteBuffer).toLong @@ -539,7 +539,7 @@ class SparkUDAFMemTracker def getSpill(spillIdx: Long): Int = { this.spills.getOrElseUpdate( spillIdx, { - OnHeapSpillManager.current.newSpill() + SparkOnHeapSpillManager.current.newSpill() }) } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala index d634e6edf..541f4b395 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala @@ -20,7 +20,7 @@ import java.nio.ByteBuffer import org.apache.spark.internal.Logging -case class OnHeapSpill(hsm: OnHeapSpillManager, id: Int) extends Logging { +case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: Int) extends Logging { private 
var spillBuf: SpillBuf = new MemBasedSpillBuf def memUsed: Long = spillBuf.memUsed diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala similarity index 87% rename from spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala rename to spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala index 2a898f491..01f037123 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala @@ -32,13 +32,16 @@ import org.apache.spark.sql.auron.AuronConf import org.apache.spark.storage.BlockManager import org.apache.spark.util.Utils -class OnHeapSpillManager(taskContext: TaskContext) +import org.apache.auron.memory.OnHeapSpillManager + +class SparkOnHeapSpillManager(taskContext: TaskContext) extends MemoryConsumer( taskContext.taskMemoryManager, taskContext.taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP) + with OnHeapSpillManager with Logging { - import org.apache.spark.sql.auron.memory.OnHeapSpillManager._ + import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager._ private val _blockManager = SparkEnv.get.blockManager private val spills = ArrayBuffer[Option[OnHeapSpill]]() @@ -62,7 +65,11 @@ class OnHeapSpillManager(taskContext: TaskContext) * @return */ @SuppressWarnings(Array("unused")) - def isOnHeapAvailable: Boolean = { + override def isOnHeapAvailable: Boolean = { + // if driver, tc always null. + if (taskContext == null) { + return false + } val memoryPool = OnHeapSpillManagerHelper.getOnHeapExecutionMemoryPool val memoryUsed = memoryPool.memoryUsed val memoryFree = memoryPool.memoryFree @@ -92,7 +99,7 @@ class OnHeapSpillManager(taskContext: TaskContext) * @return * allocated spill id */ - def newSpill(): Int = { + override def newSpill(): Int = { synchronized { val spill = OnHeapSpill(this, spills.length) spills.append(Some(spill)) @@ -104,7 +111,7 @@ class OnHeapSpillManager(taskContext: TaskContext) } } - def writeSpill(spillId: Int, data: ByteBuffer): Unit = { + override def writeSpill(spillId: Int, data: ByteBuffer): Unit = { spills(spillId) .getOrElse( throw new RuntimeException( @@ -112,7 +119,7 @@ class OnHeapSpillManager(taskContext: TaskContext) .write(data) } - def readSpill(spillId: Int, buf: ByteBuffer): Int = { + override def readSpill(spillId: Int, buf: ByteBuffer): Int = { spills(spillId) .getOrElse( throw new RuntimeException( @@ -124,15 +131,15 @@ class OnHeapSpillManager(taskContext: TaskContext) spills(spillId).map(_.size).getOrElse(0) } - def getSpillDiskUsage(spillId: Int): Long = { + override def getSpillDiskUsage(spillId: Int): Long = { spills(spillId).map(_.diskUsed).getOrElse(0) } - def getSpillDiskIOTime(spillId: Int): Long = { + override def getSpillDiskIOTime(spillId: Int): Long = { spills(spillId).map(_.diskIOTime).getOrElse(0) // time unit: ns } - def releaseSpill(spillId: Int): Unit = { + override def releaseSpill(spillId: Int): Unit = { spills(spillId) match { case Some(spill) => spill.release() @@ -181,11 +188,11 @@ class OnHeapSpillManager(taskContext: TaskContext) } } -object OnHeapSpillManager extends Logging { +object SparkOnHeapSpillManager extends Logging { val all: mutable.Map[Long, OnHeapSpillManager] = concurrent.TrieMap[Long, OnHeapSpillManager]() def current: OnHeapSpillManager = 
 {
     val tc = TaskContext.get
-    all.getOrElseUpdate(tc.taskAttemptId(), new OnHeapSpillManager(tc))
+    all.getOrElseUpdate(tc.taskAttemptId(), new SparkOnHeapSpillManager(tc))
   }
 }
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
index 24adfbaa4..ba2a57492 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
@@ -81,7 +81,7 @@ class MemBasedSpillBuf extends SpillBuf with Logging {
   override def diskIOTime: Long = 0
   override def size: Long = numWrittenBytes

-  def spill(hsm: OnHeapSpillManager): FileBasedSpillBuf = {
+  def spill(hsm: SparkOnHeapSpillManager): FileBasedSpillBuf = {
     logWarning(s"spilling in-mem spill buffer to disk, size=${Utils.bytesToString(size)}")
     val startTimeNs = System.nanoTime()
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
index 6ea291a24..0d48ad275 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
@@ -21,7 +21,6 @@ import java.util.UUID
 import scala.collection.immutable.SortedMap

 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.JniBridge
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -37,6 +36,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType

+import org.apache.auron.jni.JniBridge
 import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.FFIReaderExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
@@ -75,7 +75,7 @@ abstract class ConvertToNativeBase(override val child: SparkPlan)
       (partition, context) => {
         val inputRowIter = inputRDD.compute(partition, context)
         val resourceId = s"ConvertToNativeExec:${UUID.randomUUID().toString}"
-        JniBridge.resourcesMap.put(resourceId, new ArrowFFIExporter(inputRowIter, renamedSchema))
+        JniBridge.putResource(resourceId, new ArrowFFIExporter(inputRowIter, renamedSchema))

         PhysicalPlanNode
           .newBuilder()
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
index b727a9299..b37dae194 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
@@ -36,7 +36,6 @@ import org.apache.spark.broadcast
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.auron.JniBridge
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -65,6 +64,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.BinaryType

 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.jni.JniBridge
 import org.apache.auron.metric.SparkMetricNode

 abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val child: SparkPlan)
@@ -178,7 +178,7 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi
       })
     }

-    JniBridge.resourcesMap.put(resourceId, () => provideIpcIterator())
+    JniBridge.putResource(resourceId, () => provideIpcIterator())
     pb.PhysicalPlanNode
       .newBuilder()
       .setIpcReader(
@@ -211,7 +211,7 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi
       override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
         val resourceId = s"ArrowBroadcastExchangeExec.input:${UUID.randomUUID()}"
         val bos = new ByteArrayOutputStream()
-        JniBridge.resourcesMap.put(
+        JniBridge.putResource(
           resourceId,
           (byteBuffer: ByteBuffer) => {
             val byteArray = new Array[Byte](byteBuffer.capacity())
@@ -322,7 +322,7 @@ object NativeBroadcastExchangeBase {
         override def close(): Unit = {}
       })
     }
-    JniBridge.resourcesMap.put(readerIpcProviderResourceId, () => provideIpcIterator())
+    JniBridge.putResource(readerIpcProviderResourceId, () => provideIpcIterator())

     // output
     val bos = new ByteArrayOutputStream()
@@ -331,7 +331,7 @@ object NativeBroadcastExchangeBase {
       byteBuffer.get(byteArray)
       bos.write(byteArray)
     }
-    JniBridge.resourcesMap.put(writerIpcProviderResourceId, consumeIpc)
+    JniBridge.putResource(writerIpcProviderResourceId, consumeIpc)

     // execute
     val singlePartition = new Partition {
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala
index 726c19c54..ad633a869 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala
@@ -25,7 +25,6 @@ import org.apache.commons.lang3.reflect.MethodUtils
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.MapPartitionsRDD
-import org.apache.spark.sql.auron.JniBridge
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeSupports
@@ -44,6 +43,7 @@ import org.apache.spark.sql.types.{DecimalType, NullType, StructField, StructTyp
 import org.apache.spark.util.SerializableConfiguration

 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.jni.JniBridge

 abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
     extends LeafExecNode
@@ -131,7 +131,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
       resourceId: String,
       broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = {
     val sharedConf = broadcastedHadoopConf.value.value
-    JniBridge.resourcesMap.put(
+    JniBridge.putResource(
       resourceId,
       (location: String) => {
         val getFsTimeMetric = metrics("io_time_getfs")
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
index c2bc49ca1..c66360b0a 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
@@ -37,7 +37,6 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.OneToOneDependency
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.auron.JniBridge
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
 import org.apache.spark.sql.auron.NativeSupports
@@ -52,6 +51,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.hive.auron.HiveClientHelper
 import org.apache.spark.util.SerializableConfiguration

+import org.apache.auron.jni.JniBridge
 import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.ParquetProp
 import org.apache.auron.protobuf.ParquetSinkExecNode
@@ -104,7 +104,7 @@ abstract class NativeParquetSinkBase(

     // init hadoop fs
     val resourceId = s"NativeParquetSinkExec:${UUID.randomUUID().toString}"
-    JniBridge.resourcesMap.put(
+    JniBridge.putResource(
       resourceId,
       (location: String) => {
         NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] {
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
index 47457a418..552dcff37 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
@@ -28,7 +28,6 @@ import org.apache.spark.{OneToOneDependency, Partitioner, RangePartitioner, Shuf
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleWriteProcessor
-import org.apache.spark.sql.auron.JniBridge
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -49,6 +48,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.util.{CompletionIterator, MutablePair}

+import org.apache.auron.jni.JniBridge
 import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.{IpcReaderExecNode, PhysicalExprNode, PhysicalHashRepartition, PhysicalPlanNode, PhysicalRangeRepartition, PhysicalRepartition, PhysicalRoundRobinRepartition, PhysicalSingleRepartition, PhysicalSortExprNode, Schema, SortExecNode}
@@ -163,7 +163,7 @@ abstract class NativeShuffleExchangeBase(
     val ipcIterator = CompletionIterator[Object, Iterator[Object]](
       reader.readIpc(),
       taskContext.taskMetrics().mergeShuffleReadMetrics())
-    JniBridge.resourcesMap.put(jniResourceId, () => ipcIterator)
+    JniBridge.putResource(jniResourceId, () => ipcIterator)

     PhysicalPlanNode
       .newBuilder()
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleWriterBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleWriterBase.scala
index 19ba9bdfb..ef8267467 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleWriterBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleWriterBase.scala
@@ -22,8 +22,9 @@ import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteMetricsReporter}
 import org.apache.spark.shuffle.ShuffleWriter
-import org.apache.spark.sql.auron.{JniBridge, NativeHelper, NativeRDD, Shims}
+import org.apache.spark.sql.auron.{NativeHelper, NativeRDD, Shims}

+import org.apache.auron.jni.JniBridge
 import org.apache.auron.protobuf.{PhysicalPlanNode, RssShuffleWriterExecNode}
 import org.apache.auron.sparkver

@@ -53,7 +54,7 @@ abstract class AuronRssShuffleWriterBase[K, V](metrics: ShuffleWriteMetricsRepor
     try {
       val jniResourceId = s"RssPartitionWriter:${UUID.randomUUID().toString}"
-      JniBridge.resourcesMap.put(jniResourceId, rpw)
+      JniBridge.putResource(jniResourceId, rpw)
       val nativeRssShuffleWriterExec = PhysicalPlanNode
         .newBuilder()
         .setRssShuffleWriter(
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala
index ce4b048f6..4c104d178 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._

 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.auron.JniBridge
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeSupports
@@ -43,6 +42,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration

 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.jni.JniBridge
 import org.apache.auron.sparkver

 abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec)
@@ -117,7 +117,7 @@ abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec)
       resourceId: String,
       broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = {
     val sharedConf = broadcastedHadoopConf.value.value
-    JniBridge.resourcesMap.put(
+    JniBridge.putResource(
       resourceId,
       (location: String) => {
         val getFsTimeMetric = metrics("io_time_getfs")
diff --git a/thirdparty/auron-celeborn-0.5/pom.xml b/thirdparty/auron-celeborn-0.5/pom.xml
index d29bd0bfd..5c98b8f0e 100644
--- a/thirdparty/auron-celeborn-0.5/pom.xml
+++ b/thirdparty/auron-celeborn-0.5/pom.xml
@@ -32,7 +32,7 @@
       <groupId>org.apache.auron</groupId>
-      <artifactId>spark-extension-shims-spark3_${scalaVersion}</artifactId>
+      <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
       <version>${project.version}</version>
diff --git a/thirdparty/auron-celeborn-0.6/pom.xml b/thirdparty/auron-celeborn-0.6/pom.xml
index 1dc21dbab..b676e5cef 100644
--- a/thirdparty/auron-celeborn-0.6/pom.xml
+++ b/thirdparty/auron-celeborn-0.6/pom.xml
@@ -32,7 +32,7 @@
       <groupId>org.apache.auron</groupId>
-      <artifactId>spark-extension-shims-spark3_${scalaVersion}</artifactId>
+      <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
       <version>${project.version}</version>
diff --git a/thirdparty/auron-uniffle/pom.xml b/thirdparty/auron-uniffle/pom.xml
index 04d00666a..9c5b95aba 100644
--- a/thirdparty/auron-uniffle/pom.xml
+++ b/thirdparty/auron-uniffle/pom.xml
@@ -32,7 +32,7 @@
       <groupId>org.apache.auron</groupId>
-      <artifactId>spark-extension-shims-spark3_${scalaVersion}</artifactId>
+      <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
       <version>${project.version}</version>
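
Reviewer note on the recurring change above: every call site that used to write into the bridge's raw concurrent map (`JniBridge.resourcesMap.put`) now goes through `putResource` on the relocated `org.apache.auron.jni.JniBridge`. A minimal sketch of the pattern, assuming only what the diff shows — `putResource(id, value)` registers a JVM object under a string id so the native side can later look it up; the object name and iterator value here are illustrative placeholders, not part of the patch:

    import java.util.UUID

    import org.apache.auron.jni.JniBridge

    object PutResourceSketch {
      def register(): String = {
        // Unique id, keyed the same way the plan nodes above key theirs.
        val resourceId = s"ExampleExec:${UUID.randomUUID().toString}"
        // New style: go through the bridge's API instead of its internal map.
        JniBridge.putResource(resourceId, () => Iterator.empty)
        // Old style, removed by this patch:
        //   JniBridge.resourcesMap.put(resourceId, () => Iterator.empty)
        resourceId
      }
    }

Routing all registrations through one method rather than exposing the map keeps the bridge free to change its storage without touching the many call sites this diff had to update.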