diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index f7061e81e544b..9e32937f7095e 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -25,7 +25,7 @@ on: java: required: false type: string - default: 17 + default: 21 branch: description: Branch to run the build against required: false @@ -372,7 +372,7 @@ jobs: # Hive "other tests" test needs larger metaspace size based on experiment. if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi # SPARK-46283: should delete the following env replacement after SPARK 3.x EOL - if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then + if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then MODULES_TO_TEST=${MODULES_TO_TEST//streaming-kinesis-asl, /} fi export SERIAL_SBT_TESTS=1 diff --git a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 5ea3c9afa9c10..c9bd09abe28ed 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -418,22 +418,26 @@ private[spark] object ClosureCleaner extends Logging { logDebug(s" + fields accessed by starting closure: ${accessedFields.size} classes") accessedFields.foreach { f => logDebug(" " + f) } + // Instead of cloning and trying to update lambda reference, directly modify the original + // REPL object to null out unused fields. This works because REPL objects are not hidden + // classes and their fields can be modified. if (accessedFields(capturingClass).size < capturingClass.getDeclaredFields.length) { - // clone and clean the enclosing `this` only when there are fields to null out - logDebug(s" + cloning instance of REPL class ${capturingClass.getName}") - val clonedOuterThis = cloneAndSetFields( - parent = null, outerThis, capturingClass, accessedFields) - - val outerField = func.getClass.getDeclaredField("arg$1") - // SPARK-37072: When Java 17 is used and `outerField` is read-only, - // the content of `outerField` cannot be set by reflect api directly. - // But we can remove the `final` modifier of `outerField` before set value - // and reset the modifier after set value. - setFieldAndIgnoreModifiers(func, outerField, clonedOuterThis) + val fieldsToNull = capturingClass.getDeclaredFields.filterNot { field => + accessedFields(capturingClass).contains(field.getName) + } + + for (field <- fieldsToNull) { + try { + field.setAccessible(true) + field.set(outerThis, null) + } catch { + case _: Exception => + // Ignore failures to set fields - this is a best-effort cleanup + } + } } } - /** * Cleans up Ammonite closures and nulls out fields captured from cmd & cmd$Helper objects * but not actually accessed by the closure. To achieve this, it does: @@ -499,72 +503,45 @@ private[spark] object ClosureCleaner extends Logging { s"${accessedAmmCmdFields.size} classes") accessedAmmCmdFields.foreach { f => logTrace(" " + f) } - val cmdClones = Map[Class[_], AnyRef]() - for ((cmdClass, _) <- ammCmdInstances if !cmdClass.getName.contains("Helper")) { - logDebug(s" + Cloning instance of Ammonite command class ${cmdClass.getName}") - cmdClones(cmdClass) = instantiateClass(cmdClass, enclosingObject = null) - } - for ((cmdHelperClass, cmdHelperInstance) <- ammCmdInstances - if cmdHelperClass.getName.contains("Helper")) { - val cmdHelperOuter = cmdHelperClass.getDeclaredFields - .find(_.getName == "$outer") - .map { field => - field.setAccessible(true) - field.get(cmdHelperInstance) - } - val outerClone = cmdHelperOuter.flatMap(o => cmdClones.get(o.getClass)).orNull - logDebug(s" + Cloning instance of Ammonite command helper class ${cmdHelperClass.getName}") - cmdClones(cmdHelperClass) = - instantiateClass(cmdHelperClass, enclosingObject = outerClone) - } + // Instead of cloning and trying to update lambda reference, directly modify the original + // REPL object to null out unused fields. This works because REPL objects are not hidden + // classes and their fields can be modified. + val capturingClass = outerThis.getClass + if (accessedFields(capturingClass).size < capturingClass.getDeclaredFields.length) { + val fieldsToNull = capturingClass.getDeclaredFields.filterNot { field => + accessedFields(capturingClass).contains(field.getName) + } - // set accessed fields - for ((_, cmdClone) <- cmdClones) { - val cmdClass = cmdClone.getClass - val accessedFields = accessedAmmCmdFields(cmdClass) - for (field <- cmdClone.getClass.getDeclaredFields - // outer fields were initialized during clone construction - if accessedFields.contains(field.getName) && field.getName != "$outer") { - // get command clone if exists, otherwise use an original field value - val value = cmdClones.getOrElse(field.getType, { + for (field <- fieldsToNull) { + try { field.setAccessible(true) - field.get(ammCmdInstances(cmdClass)) - }) - setFieldAndIgnoreModifiers(cmdClone, field, value) + field.set(outerThis, null) + } catch { + case _: Exception => + // Ignore failures to set fields - this is a best-effort cleanup + } } } - val outerThisClone = if (!isAmmoniteCommandOrHelper(outerThis.getClass)) { - // if outer class is not Ammonite helper / command object then is was not cloned - // in the code above. We still need to clone it and update accessed fields - logDebug(s" + Cloning instance of lambda capturing class ${outerThis.getClass.getName}") - val clone = cloneAndSetFields(parent = null, outerThis, outerThis.getClass, accessedFields) - // making sure that the code below will update references to Ammonite objects if they exist - for (field <- outerThis.getClass.getDeclaredFields) { - field.setAccessible(true) - cmdClones.get(field.getType).foreach { value => - setFieldAndIgnoreModifiers(clone, field, value) + // Also clean up Ammonite command fields if any + for ((cmdClass, cmdInstance) <- ammCmdInstances) { + val cmdAccessedFields = accessedAmmCmdFields.getOrElse(cmdClass, Set.empty) + if (cmdAccessedFields.size < cmdClass.getDeclaredFields.length) { + val fieldsToNull = cmdClass.getDeclaredFields.filterNot { field => + cmdAccessedFields.contains(field.getName) || field.getName == "$outer" + } + + for (field <- fieldsToNull) { + try { + field.setAccessible(true) + field.set(cmdInstance, null) + } catch { + case _: Exception => + // Ignore failures to set fields - this is a best-effort cleanup + } } } - clone - } else { - cmdClones(outerThis.getClass) } - - val outerField = func.getClass.getDeclaredField("arg$1") - // update lambda capturing class reference - setFieldAndIgnoreModifiers(func, outerField, outerThisClone) - } - - private def setFieldAndIgnoreModifiers(obj: AnyRef, field: Field, value: AnyRef): Unit = { - val modifiersField = getFinalModifiersFieldForJava17(field) - modifiersField - .foreach(m => m.setInt(field, field.getModifiers & ~Modifier.FINAL)) - field.setAccessible(true) - field.set(obj, value) - - modifiersField - .foreach(m => m.setInt(field, field.getModifiers | Modifier.FINAL)) } /** diff --git a/launcher/src/main/java/org/apache/spark/launcher/JavaModuleOptions.java b/launcher/src/main/java/org/apache/spark/launcher/JavaModuleOptions.java index cef426035e630..e0cfdb21dbbe3 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/JavaModuleOptions.java +++ b/launcher/src/main/java/org/apache/spark/launcher/JavaModuleOptions.java @@ -43,7 +43,6 @@ public class JavaModuleOptions { "--add-opens=java.base/sun.security.action=ALL-UNNAMED", "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED", - "-Djdk.reflect.useDirectMethodHandle=false", "-Dio.netty.tryReflectionSetAccessible=true", "--enable-native-access=ALL-UNNAMED"}; diff --git a/pom.xml b/pom.xml index 077bbea4380a7..196f7ea57415d 100644 --- a/pom.xml +++ b/pom.xml @@ -334,7 +334,6 @@ --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED - -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true --enable-native-access=ALL-UNNAMED diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 77001e6bdf227..5737f0cca77bc 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1669,7 +1669,6 @@ object TestSettings { "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", "--add-opens=java.base/sun.security.action=ALL-UNNAMED", "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", - "-Djdk.reflect.useDirectMethodHandle=false", "-Dio.netty.tryReflectionSetAccessible=true", "--enable-native-access=ALL-UNNAMED").mkString(" ") s"-Xmx$heapSize -Xss4m -XX:MaxMetaspaceSize=$metaspaceSize -XX:ReservedCodeCacheSize=128m -Dfile.encoding=UTF-8 $extraTestJavaArgs" diff --git a/sql/connect/bin/spark-connect-scala-client b/sql/connect/bin/spark-connect-scala-client index 31fe0217c6e70..d344c683742ed 100755 --- a/sql/connect/bin/spark-connect-scala-client +++ b/sql/connect/bin/spark-connect-scala-client @@ -68,7 +68,6 @@ JVM_ARGS="-XX:+IgnoreUnrecognizedVMOptions \ --add-opens=java.base/sun.security.action=ALL-UNNAMED \ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED \ --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED \ - -Djdk.reflect.useDirectMethodHandle=false \ -Dio.netty.tryReflectionSetAccessible=true \ --enable-native-access=ALL-UNNAMED"