From 5abb350196e47bc0cca4ddad0ac7351ea26bf589 Mon Sep 17 00:00:00 2001 From: zizhao Date: Mon, 3 Jun 2024 10:27:31 +0300 Subject: [PATCH] client + server: add configuration for new memory pool --- .../spark/shuffle/ucx/external/ExternalUcxConf.scala | 4 ++++ .../ucx/external/client/ExternalUcxClientConf.scala | 8 ++++++++ .../ucx/external/server/ExternalUcxServerConf.scala | 4 ++++ .../spark/shuffle/ucx/memory/UcxLimitedMemPool.scala | 9 +++++---- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/main/scala/org/apache/spark/shuffle/ucx/external/ExternalUcxConf.scala b/src/main/scala/org/apache/spark/shuffle/ucx/external/ExternalUcxConf.scala index cf7285c0..dae1d3d2 100644 --- a/src/main/scala/org/apache/spark/shuffle/ucx/external/ExternalUcxConf.scala +++ b/src/main/scala/org/apache/spark/shuffle/ucx/external/ExternalUcxConf.scala @@ -12,6 +12,7 @@ trait ExternalUcxConf { lazy val preallocateBuffersMap: Map[Long, Int] = ExternalUcxConf.preAllocateConfToMap(ExternalUcxConf.PREALLOCATE_BUFFERS_DEFAULT) lazy val memoryLimit: Boolean = ExternalUcxConf.MEMORY_LIMIT_DEFAULT + lazy val memoryGroupSize: Int = ExternalUcxConf.MEMORY_GROUP_SIZE_DEFAULT lazy val minBufferSize: Long = ExternalUcxConf.MIN_BUFFER_SIZE_DEFAULT lazy val maxBufferSize: Long = ExternalUcxConf.MAX_BUFFER_SIZE_DEFAULT lazy val minRegistrationSize: Long = ExternalUcxConf.MIN_REGISTRATION_SIZE_DEFAULT @@ -36,6 +37,9 @@ object ExternalUcxConf { lazy val MEMORY_LIMIT_KEY = getUcxConf("memory.limit") lazy val MEMORY_LIMIT_DEFAULT = false + lazy val MEMORY_GROUP_SIZE_KEY = getUcxConf("memory.groupSize") + lazy val MEMORY_GROUP_SIZE_DEFAULT = 3 + lazy val MIN_BUFFER_SIZE_KEY = getUcxConf("memory.minBufferSize") lazy val MIN_BUFFER_SIZE_DEFAULT = 4096L diff --git a/src/main/scala/org/apache/spark/shuffle/ucx/external/client/ExternalUcxClientConf.scala b/src/main/scala/org/apache/spark/shuffle/ucx/external/client/ExternalUcxClientConf.scala index 3f839404..bffcac4a 100644 --- 
a/src/main/scala/org/apache/spark/shuffle/ucx/external/client/ExternalUcxClientConf.scala +++ b/src/main/scala/org/apache/spark/shuffle/ucx/external/client/ExternalUcxClientConf.scala @@ -34,6 +34,14 @@ class ExternalUcxClientConf(val sparkConf: SparkConf) extends SparkConf with Ext override lazy val memoryLimit: Boolean = sparkConf.getBoolean(MEMORY_LIMIT.key, MEMORY_LIMIT.defaultValue.get) + private lazy val MEMORY_GROUP_SIZE = ConfigBuilder(ExternalUcxConf.MEMORY_GROUP_SIZE_KEY) + .doc("Memory group size.") + .intConf + .createWithDefault(ExternalUcxConf.MEMORY_GROUP_SIZE_DEFAULT) + + override lazy val memoryGroupSize: Int = sparkConf.getInt(MEMORY_GROUP_SIZE.key, + MEMORY_GROUP_SIZE.defaultValue.get) + private lazy val MIN_BUFFER_SIZE = ConfigBuilder(ExternalUcxConf.MIN_BUFFER_SIZE_KEY) .doc("Minimal buffer size in memory pool.") .bytesConf(ByteUnit.BYTE) diff --git a/src/main/scala/org/apache/spark/shuffle/ucx/external/server/ExternalUcxServerConf.scala b/src/main/scala/org/apache/spark/shuffle/ucx/external/server/ExternalUcxServerConf.scala index f149498f..398236d5 100644 --- a/src/main/scala/org/apache/spark/shuffle/ucx/external/server/ExternalUcxServerConf.scala +++ b/src/main/scala/org/apache/spark/shuffle/ucx/external/server/ExternalUcxServerConf.scala @@ -22,6 +22,10 @@ class ExternalUcxServerConf(val yarnConf: Configuration) extends ExternalUcxConf ExternalUcxConf.MEMORY_LIMIT_KEY, ExternalUcxConf.MEMORY_LIMIT_DEFAULT) + override lazy val memoryGroupSize: Int = yarnConf.getInt( + ExternalUcxConf.MEMORY_GROUP_SIZE_KEY, + ExternalUcxConf.MEMORY_GROUP_SIZE_DEFAULT) + override lazy val minBufferSize: Long = yarnConf.getLong( ExternalUcxConf.MIN_BUFFER_SIZE_KEY, ExternalUcxConf.MIN_BUFFER_SIZE_DEFAULT) diff --git a/src/main/scala/org/apache/spark/shuffle/ucx/memory/UcxLimitedMemPool.scala b/src/main/scala/org/apache/spark/shuffle/ucx/memory/UcxLimitedMemPool.scala index a50447f6..b40a3091 100644 --- 
a/src/main/scala/org/apache/spark/shuffle/ucx/memory/UcxLimitedMemPool.scala +++ b/src/main/scala/org/apache/spark/shuffle/ucx/memory/UcxLimitedMemPool.scala @@ -166,8 +166,6 @@ case class UcxLinkedMemAllocator(length: Long, minRegistrationSize: Long, case class UcxLimitedMemPool(ucxContext: UcpContext) extends Closeable with UcxLogging { private[memory] val allocatorMap = new ConcurrentHashMap[Long, UcxMemoryAllocator]() - private[memory] val memGroupSize = 3 - private[memory] val maxMemFactor = 1.0 - 1.0 / (1 << (memGroupSize - 1)) private[memory] var minBufferSize: Long = 4096L private[memory] var maxBufferSize: Long = 2L * 1024 * 1024 * 1024 private[memory] var minRegistrationSize: Long = 1024L * 1024 @@ -186,8 +184,11 @@ case class UcxLimitedMemPool(ucxContext: UcpContext) } } - def init(minBufSize: Long, maxBufSize: Long, minRegSize: Long, maxRegSize: Long, preAllocMap: Map[Long, Int], limit: Boolean): + def init(minBufSize: Long, maxBufSize: Long, minRegSize: Long, maxRegSize: Long, + preAllocMap: Map[Long, Int], limit: Boolean, memGroupSize: Int = 3): Unit = { + /* require is not elidable, unlike assert; sizes of 32 and above would overflow the Int shift used for maxMemFactor. */ require(memGroupSize > 2 && memGroupSize < 32, s"Invalid memGroupSize. Expect 3..31. Actual $memGroupSize") + val maxMemFactor = 1.0 - 1.0 / (1 << (memGroupSize - 1)) minBufferSize = roundUpToTheNextPowerOf2(minBufSize) maxBufferSize = roundUpToTheNextPowerOf2(maxBufSize) minRegistrationSize = roundUpToTheNextPowerOf2(minRegSize) @@ -213,7 +214,7 @@ case class UcxLimitedMemPool(ucxContext: UcpContext) .min(Int.MaxValue) logInfo(s"mem $memSize limit $memLimit") current.setLimit(memLimit.toInt) - shift += 1 + shift += memGroupSize - 2 } superAllocator = current allocatorMap.put(memSize, current)