Skip to content

Commit

Permalink
wait driver endpoint start up in manger initialization
Browse files Browse the repository at this point in the history
Signed-off-by: zizhao <[email protected]>
  • Loading branch information
JeynmannZ committed Jan 5, 2024
1 parent e5a6742 commit d3445af
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

import org.apache.spark.rpc.RpcEnv
import org.apache.spark.SparkException
import org.apache.spark.rpc.{RpcEnv, RpcEndpointRef}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.shuffle.ucx.rpc.{UcxDriverRpcEndpoint, UcxExecutorRpcEndpoint}
import org.apache.spark.shuffle.ucx.rpc.UcxRpcMessages.{ExecutorAdded, IntroduceAllExecutors}
Expand Down Expand Up @@ -81,18 +82,28 @@ abstract class CommonUcxShuffleManager(val conf: SparkConf, isDriver: Boolean) e
val address = transport.init()
ucxTransport = transport
latch.countDown()
val rpcEnv = RpcEnv.create("ucx-rpc-env", blockManager.host, blockManager.port,
conf, new SecurityManager(conf), clientMode = false)
val rpcEnv = SparkEnv.get.rpcEnv
executorEndpoint = new UcxExecutorRpcEndpoint(rpcEnv, ucxTransport, setupThread)
val endpoint = rpcEnv.setupEndpoint(
s"ucx-shuffle-executor-${blockManager.executorId}",
executorEndpoint)
val driverEndpoint = RpcUtils.makeDriverRef(driverRpcName, conf, rpcEnv)
var driverCost = 0
var driverEndpoint: RpcEndpointRef = null
while (driverEndpoint == null) {
try {
driverEndpoint = RpcUtils.makeDriverRef(driverRpcName, conf, rpcEnv)
} catch {
case e: SparkException => {
Thread.sleep(5)
driverCost += 5
}
}
}
driverEndpoint.ask[IntroduceAllExecutors](ExecutorAdded(blockManager.executorId.toLong, endpoint,
new SerializableDirectBuffer(address)))
.andThen {
case Success(msg) =>
logInfo(s"Receive reply $msg")
logInfo(s"Driver take $driverCost ms. Receive reply ${msg.asInstanceOf[IntroduceAllExecutors].executorIdToAddress.keys}")
executorEndpoint.receive(msg)
}
}
Expand Down

0 comments on commit d3445af

Please sign in to comment.