[SPARK-52088][CORE] Refactor the ClosureCleaner#clean and avoid using the -Djdk.reflect.useDirectMethodHandle=false #51028

Open
wants to merge 2 commits into
base: master
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -25,7 +25,7 @@ on:
java:
required: false
type: string
- default: 17
+ default: 21
Contributor

The modifications made to this file need to be reverted before merging.

branch:
description: Branch to run the build against
required: false
@@ -372,7 +372,7 @@ jobs:
# Hive "other tests" test needs larger metaspace size based on experiment.
if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi
# SPARK-46283: should delete the following env replacement after SPARK 3.x EOL
if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then
if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then
MODULES_TO_TEST=${MODULES_TO_TEST//streaming-kinesis-asl, /}
fi
export SERIAL_SBT_TESTS=1
117 changes: 47 additions & 70 deletions common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -418,22 +418,26 @@ private[spark] object ClosureCleaner extends Logging {
logDebug(s" + fields accessed by starting closure: ${accessedFields.size} classes")
accessedFields.foreach { f => logDebug(" " + f) }

// Instead of cloning and trying to update lambda reference, directly modify the original
// REPL object to null out unused fields. This works because REPL objects are not hidden
// classes and their fields can be modified.
if (accessedFields(capturingClass).size < capturingClass.getDeclaredFields.length) {
// clone and clean the enclosing `this` only when there are fields to null out
logDebug(s" + cloning instance of REPL class ${capturingClass.getName}")
val clonedOuterThis = cloneAndSetFields(
parent = null, outerThis, capturingClass, accessedFields)

val outerField = func.getClass.getDeclaredField("arg$1")
// SPARK-37072: When Java 17 is used and `outerField` is read-only,
// the content of `outerField` cannot be set by reflect api directly.
// But we can remove the `final` modifier of `outerField` before set value
// and reset the modifier after set value.
setFieldAndIgnoreModifiers(func, outerField, clonedOuterThis)
val fieldsToNull = capturingClass.getDeclaredFields.filterNot { field =>
Contributor

@LuciferYang, May 28, 2025

cc @rednaxelafx FYI
It seems we've found a way to make all tests pass on Java 21 without the -Djdk.reflect.useDirectMethodHandle=false option (which no longer has any effect as of Java 22; see JDK-8309635 for details).

However, the new approach no longer involves cloning but instead operates on the original object. My understanding of this area isn't thorough. Do you have time to help review this PR? Thanks ~

also cc @cloud-fan @mridulm @srowen @JoshRosen @dongjoon-hyun @HyukjinKwon

Member

Isn't this going to break applications? You capture a reference to object A and, in order to serialize it, you trim out some of its fields. But that breaks object A itself in the REPL, just because it is being copied for the purpose of serializing a closure. Surely not?

Contributor Author

@srowen Thanks for the review!
Hmm... I was thinking I would need to save the original values of the fields being reset and set them back after serialization. However, all unit tests passed, so I did not add the restore step. I can add one if it is really necessary.

Member

If you're doing that, it seems easier to (shallow) copy the object as before. And safer. Is that possible?

Here's an example of what I'm worried about: a closure refers to class A. Class A has a reference to some non-serializable client object (that isn't needed by the closure). Here, you would null that reference, and then class A fails to work after this point in the REPL.

You can save and restore it, but, even then for a moment the object breaks in a really non-obvious way.

Do I misunderstand, or is that a real risk? If so, I think this different approach of modifying user program objects is too risky. Do we need to change this?
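
The scenario described above can be made concrete with a small sketch. `Client`, `ReplLine`, and both helper methods are hypothetical stand-ins, not Spark code, and the fields are declared as `var` so that plain reflection is guaranteed to be able to write them. The in-place variant nulls the unused field on the object the user still holds; the copy variant applies the same cleanup to a separate instance.

```scala
import java.lang.reflect.Modifier

object InPlaceCleaningRisk {
  // Hypothetical client that the closure does not need.
  final class Client { def ping(): String = "pong" }

  // Hypothetical REPL-line-like object: `factor` is the only field the closure reads.
  final class ReplLine {
    var client: Client = new Client
    var factor: Int = 3
    def call(): String = client.ping()
  }

  // Non-static, reference-typed fields are the only ones that can be set to null.
  private def instanceRefFields(cls: Class[_]) =
    cls.getDeclaredFields.toSeq.filter { f =>
      !Modifier.isStatic(f.getModifiers) && !f.getType.isPrimitive
    }

  /** In-place variant: nulls every reference field of `target` that is not in `keep`. */
  def nullOutUnused(target: AnyRef, keep: Set[String]): Unit =
    for (f <- instanceRefFields(target.getClass) if !keep.contains(f.getName)) {
      f.setAccessible(true)
      f.set(target, null)
    }

  /** Copy variant: cleans a separate instance and leaves `original` untouched. */
  def cleanedCopy(original: ReplLine, keep: Set[String]): ReplLine = {
    val copy = new ReplLine
    copy.factor = original.factor
    nullOutUnused(copy, keep)
    copy
  }
}
```

With the copy variant, `original.call()` keeps working after cleaning. After `nullOutUnused(original, Set("factor"))`, the same call fails with a `NullPointerException` because `client` is gone from the user's own object.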

Contributor

What if we consider a multi-threaded scenario? Is it possible to operate on object A in an intermediate state?

Contributor Author

@srowen @LuciferYang
Multi-threaded access would be bad, but I assumed outerThis is not accessed from other threads (?).
As for the object being temporarily broken: if there is no multi-threading issue, it should be fine, because in SparkClosureCleaner serialization is called immediately after the clean, so no other action happens in between.

Contributor

There is always a gap between two actions; without locking, we can't guarantee that nothing touches the object in between. Can we copy the object instead of updating the original object?

Contributor

@LuciferYang, May 29, 2025

The original code should presumably have been executed on the copied object, right?
Previously, I tried updating fields in the copied object using approaches like VarHandle and Unsafe.putObject. However, due to the limitations imposed by hidden classes and private final fields, all my attempts failed. We can continue to explore, but new ideas are needed.
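
The limitation mentioned above can be probed with a short sketch. It relies on two documented facts: since JDK 15 lambda proxy classes are hidden classes, and `Field.set` refuses to write a final field whose declaring class is hidden, even after `setAccessible(true)`. `Outer` and `tryRebind` are hypothetical names, and the outcome depends on the JDK and on how the Scala compiler emits the lambda, so the result is returned as a value instead of being assumed.

```scala
import scala.util.control.NonFatal

object LambdaCaptureProbe {
  // Hypothetical enclosing class; the lambda returned by `scale` captures `this`.
  final class Outer(val k: Int) {
    def scale: Int => Int = x => x * k
  }

  /**
   * Tries to rebind the captured enclosing instance (field `arg$1`) of the lambda `f`.
   * Returns Left with the reflection failure instead of throwing.
   */
  def tryRebind(f: AnyRef, replacement: AnyRef): Either[Throwable, Unit] =
    try {
      val captured = f.getClass.getDeclaredField("arg$1")
      captured.setAccessible(true)
      captured.set(f, replacement) // expected to be rejected when the class is hidden
      Right(())
    } catch {
      case NonFatal(e) => Left(e)
    }
}
```

On a recent JDK this is expected to yield a `Left` holding an `IllegalAccessException`, which is the situation that the removed `-Djdk.reflect.useDirectMethodHandle=false` flag used to work around.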

Contributor Author

@LuciferYang @srowen @cloud-fan Same here. I tried almost everything I could find to clone and modify, but everything failed on newer Java versions. That is why I started thinking about a save-modify-restore approach for outerThis.
As for the gap, the clean, serialize, and restore steps run back to back unless multiple threads are involved:

SparkClosureCleaner::clean  // simplified version
    if (ClosureCleaner.clean(closure, cleanTransitively, mutable.Map.empty)) {
      try {
        SparkEnv.get.closureSerializer.newInstance().serialize(closure)
      } catch {
        case ex: Exception => throw new SparkException("Task not serializable", ex)
      } finally {
        ClosureCleaner.restoreFieldValues(closure)
      }
    }

any other ideas?
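
The save-modify-restore idea proposed above could look roughly like the following sketch. `withFieldsNulled` and `Holder` are hypothetical names, not part of `ClosureCleaner`, and the sketch assumes that every field passed in is reference-typed and writable through reflection. It does nothing about the concurrency concern raised earlier in the thread: another thread can still observe the nulled state while `body` runs.

```scala
import java.lang.reflect.Field

object SaveNullRestore {
  // Hypothetical holder standing in for `outerThis`.
  final class Holder { var resource: AnyRef = "open" }

  /**
   * Nulls `fields` on `target`, evaluates `body`, and always restores the saved values.
   * Assumes every field is reference-typed and writable through reflection.
   */
  def withFieldsNulled[A](target: AnyRef, fields: Seq[Field])(body: => A): A = {
    fields.foreach(_.setAccessible(true))
    // Snapshot the current values before mutating anything.
    val saved = fields.map(f => f -> f.get(target))
    try {
      fields.foreach(_.set(target, null))
      body
    } finally {
      // Runs whether `body` returned normally or threw.
      saved.foreach { case (f, v) => f.set(target, v) }
    }
  }
}
```

A caller would wrap the serialization step in `withFieldsNulled(outerThis, fieldsToNull) { ... }`, so the restore cannot be skipped when serialization throws.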

accessedFields(capturingClass).contains(field.getName)
}

for (field <- fieldsToNull) {
try {
field.setAccessible(true)
field.set(outerThis, null)
} catch {
case _: Exception =>
// Ignore failures to set fields - this is a best-effort cleanup
Contributor

Logs should be added

}
}
}
}


/**
* Cleans up Ammonite closures and nulls out fields captured from cmd & cmd$Helper objects
* but not actually accessed by the closure. To achieve this, it does:
@@ -499,72 +503,45 @@ private[spark] object ClosureCleaner extends Logging {
s"${accessedAmmCmdFields.size} classes")
accessedAmmCmdFields.foreach { f => logTrace(" " + f) }

val cmdClones = Map[Class[_], AnyRef]()
for ((cmdClass, _) <- ammCmdInstances if !cmdClass.getName.contains("Helper")) {
logDebug(s" + Cloning instance of Ammonite command class ${cmdClass.getName}")
cmdClones(cmdClass) = instantiateClass(cmdClass, enclosingObject = null)
}
for ((cmdHelperClass, cmdHelperInstance) <- ammCmdInstances
if cmdHelperClass.getName.contains("Helper")) {
val cmdHelperOuter = cmdHelperClass.getDeclaredFields
.find(_.getName == "$outer")
.map { field =>
field.setAccessible(true)
field.get(cmdHelperInstance)
}
val outerClone = cmdHelperOuter.flatMap(o => cmdClones.get(o.getClass)).orNull
logDebug(s" + Cloning instance of Ammonite command helper class ${cmdHelperClass.getName}")
cmdClones(cmdHelperClass) =
instantiateClass(cmdHelperClass, enclosingObject = outerClone)
}
// Instead of cloning and trying to update lambda reference, directly modify the original
// REPL object to null out unused fields. This works because REPL objects are not hidden
// classes and their fields can be modified.
val capturingClass = outerThis.getClass
if (accessedFields(capturingClass).size < capturingClass.getDeclaredFields.length) {
val fieldsToNull = capturingClass.getDeclaredFields.filterNot { field =>
accessedFields(capturingClass).contains(field.getName)
}

// set accessed fields
for ((_, cmdClone) <- cmdClones) {
val cmdClass = cmdClone.getClass
val accessedFields = accessedAmmCmdFields(cmdClass)
for (field <- cmdClone.getClass.getDeclaredFields
// outer fields were initialized during clone construction
if accessedFields.contains(field.getName) && field.getName != "$outer") {
// get command clone if exists, otherwise use an original field value
val value = cmdClones.getOrElse(field.getType, {
for (field <- fieldsToNull) {
try {
field.setAccessible(true)
field.get(ammCmdInstances(cmdClass))
})
setFieldAndIgnoreModifiers(cmdClone, field, value)
field.set(outerThis, null)
} catch {
case _: Exception =>
// Ignore failures to set fields - this is a best-effort cleanup
Contributor

ditto

}
}
}

val outerThisClone = if (!isAmmoniteCommandOrHelper(outerThis.getClass)) {
// if outer class is not Ammonite helper / command object then is was not cloned
// in the code above. We still need to clone it and update accessed fields
logDebug(s" + Cloning instance of lambda capturing class ${outerThis.getClass.getName}")
val clone = cloneAndSetFields(parent = null, outerThis, outerThis.getClass, accessedFields)
// making sure that the code below will update references to Ammonite objects if they exist
for (field <- outerThis.getClass.getDeclaredFields) {
field.setAccessible(true)
cmdClones.get(field.getType).foreach { value =>
setFieldAndIgnoreModifiers(clone, field, value)
// Also clean up Ammonite command fields if any
for ((cmdClass, cmdInstance) <- ammCmdInstances) {
val cmdAccessedFields = accessedAmmCmdFields.getOrElse(cmdClass, Set.empty)
if (cmdAccessedFields.size < cmdClass.getDeclaredFields.length) {
val fieldsToNull = cmdClass.getDeclaredFields.filterNot { field =>
cmdAccessedFields.contains(field.getName) || field.getName == "$outer"
}

for (field <- fieldsToNull) {
Contributor

It seems that each modification has similar code. Can we extract a common function?
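
One possible shape for such a shared function, which also covers the earlier request to add logs, is sketched below. `nullOutFields` and `Sample` are hypothetical names, and the `log` parameter is a stand-in for whatever logging call the real code would use.

```scala
import java.lang.reflect.Field
import scala.util.control.NonFatal

object FieldNulling {
  // Hypothetical sample class used only to exercise the helper.
  final class Sample { var ref: AnyRef = "x"; var count: Int = 7 }

  /**
   * Best-effort cleanup: sets each field to null on `target`, reports every failure
   * through `log`, and returns the names of the fields that could not be nulled.
   */
  def nullOutFields(target: AnyRef, fields: Seq[Field], log: String => Unit): Seq[String] = {
    val failed = fields.filterNot { field =>
      try {
        field.setAccessible(true)
        field.set(target, null)
        true
      } catch {
        case NonFatal(e) =>
          log(s"Failed to null out ${field.getName} on ${target.getClass.getName}: $e")
          false
      }
    }
    failed.map(_.getName)
  }
}
```

Each of the three loops in the diff would then reduce to a single call that passes the target instance and its `fieldsToNull`. In the sample class, the primitive `count` field cannot hold null, so it is logged and returned as a failure while `ref` is cleared.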

try {
field.setAccessible(true)
field.set(cmdInstance, null)
} catch {
case _: Exception =>
// Ignore failures to set fields - this is a best-effort cleanup
Contributor

ditto

}
}
}
clone
} else {
cmdClones(outerThis.getClass)
}

val outerField = func.getClass.getDeclaredField("arg$1")
// update lambda capturing class reference
setFieldAndIgnoreModifiers(func, outerField, outerThisClone)
}

private def setFieldAndIgnoreModifiers(obj: AnyRef, field: Field, value: AnyRef): Unit = {
val modifiersField = getFinalModifiersFieldForJava17(field)
modifiersField
.foreach(m => m.setInt(field, field.getModifiers & ~Modifier.FINAL))
field.setAccessible(true)
field.set(obj, value)

modifiersField
.foreach(m => m.setInt(field, field.getModifiers | Modifier.FINAL))
}

/**
@@ -43,7 +43,6 @@ public class JavaModuleOptions {
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
"--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED",
- "-Djdk.reflect.useDirectMethodHandle=false",
"-Dio.netty.tryReflectionSetAccessible=true",
"--enable-native-access=ALL-UNNAMED"};

1 change: 0 additions & 1 deletion pom.xml
@@ -334,7 +334,6 @@
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
- -Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true
--enable-native-access=ALL-UNNAMED
</extraJavaTestArgs>
1 change: 0 additions & 1 deletion project/SparkBuild.scala
@@ -1669,7 +1669,6 @@ object TestSettings {
"--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
- "-Djdk.reflect.useDirectMethodHandle=false",
"-Dio.netty.tryReflectionSetAccessible=true",
"--enable-native-access=ALL-UNNAMED").mkString(" ")
s"-Xmx$heapSize -Xss4m -XX:MaxMetaspaceSize=$metaspaceSize -XX:ReservedCodeCacheSize=128m -Dfile.encoding=UTF-8 $extraTestJavaArgs"
1 change: 0 additions & 1 deletion sql/connect/bin/spark-connect-scala-client
@@ -68,7 +68,6 @@ JVM_ARGS="-XX:+IgnoreUnrecognizedVMOptions \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
- -Djdk.reflect.useDirectMethodHandle=false \
-Dio.netty.tryReflectionSetAccessible=true \
--enable-native-access=ALL-UNNAMED"
