Skip to content

Commit

Permalink
client + server: add configuration for new memory pool
Browse files Browse the repository at this point in the history
  • Loading branch information
JeynmannZ committed Jun 3, 2024
1 parent 16e29d5 commit 5abb350
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ trait ExternalUcxConf {
lazy val preallocateBuffersMap: Map[Long, Int] =
ExternalUcxConf.preAllocateConfToMap(ExternalUcxConf.PREALLOCATE_BUFFERS_DEFAULT)
lazy val memoryLimit: Boolean = ExternalUcxConf.MEMORY_LIMIT_DEFAULT
lazy val memoryGroupSize: Int = ExternalUcxConf.MEMORY_GROUP_SIZE_DEFAULT
lazy val minBufferSize: Long = ExternalUcxConf.MIN_BUFFER_SIZE_DEFAULT
lazy val maxBufferSize: Long = ExternalUcxConf.MAX_BUFFER_SIZE_DEFAULT
lazy val minRegistrationSize: Long = ExternalUcxConf.MIN_REGISTRATION_SIZE_DEFAULT
Expand All @@ -36,6 +37,9 @@ object ExternalUcxConf {
lazy val MEMORY_LIMIT_KEY = getUcxConf("memory.limit")
lazy val MEMORY_LIMIT_DEFAULT = false

lazy val MEMORY_GROUP_SIZE_KEY = getUcxConf("memory.groupSize")
lazy val MEMORY_GROUP_SIZE_DEFAULT = 3

lazy val MIN_BUFFER_SIZE_KEY = getUcxConf("memory.minBufferSize")
lazy val MIN_BUFFER_SIZE_DEFAULT = 4096L

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ class ExternalUcxClientConf(val sparkConf: SparkConf) extends SparkConf with Ext
override lazy val memoryLimit: Boolean = sparkConf.getBoolean(MEMORY_LIMIT.key,
MEMORY_LIMIT.defaultValue.get)

private lazy val MEMORY_GROUP_SIZE = ConfigBuilder(ExternalUcxConf.MEMORY_GROUP_SIZE_KEY)
.doc("Memory group size.")
.intConf
.createWithDefault(ExternalUcxConf.MEMORY_GROUP_SIZE_DEFAULT)

override lazy val memoryGroupSize: Int = sparkConf.getInt(MEMORY_GROUP_SIZE.key,
MEMORY_GROUP_SIZE.defaultValue.get)

private lazy val MIN_BUFFER_SIZE = ConfigBuilder(ExternalUcxConf.MIN_BUFFER_SIZE_KEY)
.doc("Minimal buffer size in memory pool.")
.bytesConf(ByteUnit.BYTE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class ExternalUcxServerConf(val yarnConf: Configuration) extends ExternalUcxConf
ExternalUcxConf.MEMORY_LIMIT_KEY,
ExternalUcxConf.MEMORY_LIMIT_DEFAULT)

override lazy val memoryGroupSize: Int = yarnConf.getInt(
ExternalUcxConf.MEMORY_GROUP_SIZE_KEY,
ExternalUcxConf.MEMORY_GROUP_SIZE_DEFAULT)

override lazy val minBufferSize: Long = yarnConf.getLong(
ExternalUcxConf.MIN_BUFFER_SIZE_KEY,
ExternalUcxConf.MIN_BUFFER_SIZE_DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ case class UcxLinkedMemAllocator(length: Long, minRegistrationSize: Long,
case class UcxLimitedMemPool(ucxContext: UcpContext)
extends Closeable with UcxLogging {
private[memory] val allocatorMap = new ConcurrentHashMap[Long, UcxMemoryAllocator]()
private[memory] val memGroupSize = 3
private[memory] val maxMemFactor = 1.0 - 1.0 / (1 << (memGroupSize - 1))
private[memory] var minBufferSize: Long = 4096L
private[memory] var maxBufferSize: Long = 2L * 1024 * 1024 * 1024
private[memory] var minRegistrationSize: Long = 1024L * 1024
Expand All @@ -186,8 +184,11 @@ case class UcxLimitedMemPool(ucxContext: UcpContext)
}
}

def init(minBufSize: Long, maxBufSize: Long, minRegSize: Long, maxRegSize: Long, preAllocMap: Map[Long, Int], limit: Boolean):
def init(minBufSize: Long, maxBufSize: Long, minRegSize: Long, maxRegSize: Long,
preAllocMap: Map[Long, Int], limit: Boolean, memGroupSize: Int = 3):
Unit = {
assert(memGroupSize > 2, s"Invalid memGroupSize. Expect > 2. Actual $memGroupSize")
val maxMemFactor = 1.0 - 1.0 / (1 << (memGroupSize - 1))
minBufferSize = roundUpToTheNextPowerOf2(minBufSize)
maxBufferSize = roundUpToTheNextPowerOf2(maxBufSize)
minRegistrationSize = roundUpToTheNextPowerOf2(minRegSize)
Expand All @@ -213,7 +214,7 @@ case class UcxLimitedMemPool(ucxContext: UcpContext)
.min(Int.MaxValue)
logInfo(s"mem $memSize limit $memLimit")
current.setLimit(memLimit.toInt)
shift += 1
shift += memGroupSize - 2
}
superAllocator = current
allocatorMap.put(memSize, current)
Expand Down

0 comments on commit 5abb350

Please sign in to comment.