Skip to content

Add heap memory adaptation #15297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public static class Builder {
private long checkpointGap = 500;
private IMemoryBlock consensusMemoryBlock =
new AtomicLongMemoryBlock(
"Consensus-Default", null, Runtime.getRuntime().maxMemory() / 10);
"Consensus-Default", null, Runtime.getRuntime().totalMemory() / 10);
private double maxMemoryRatioForQueue = 0.6;
private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class IoTConsensusMemoryManager {
private final AtomicLong queueMemorySizeInByte = new AtomicLong(0);
private final AtomicLong syncMemorySizeInByte = new AtomicLong(0);
private IMemoryBlock memoryBlock =
new AtomicLongMemoryBlock("Consensus-Default", null, Runtime.getRuntime().maxMemory() / 10);
new AtomicLongMemoryBlock("Consensus-Default", null, Runtime.getRuntime().totalMemory() / 10);
private Double maxMemoryRatioForQueue = 0.6;

private IoTConsensusMemoryManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.memory.MemoryConfig;
import org.apache.iotdb.commons.memory.MemoryManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCompactionEstimator;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.MemUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -60,8 +61,9 @@ public class DataNodeMemoryConfig {
private int queryThreadCount = Runtime.getRuntime().availableProcessors();

/** Max bytes of each FragmentInstance for DataExchange */
// TODO @spricoder : influence dynamic change of memory size
private long maxBytesPerFragmentInstance =
Runtime.getRuntime().maxMemory() * 3 / 10 * 200 / 1001 / queryThreadCount;
Runtime.getRuntime().totalMemory() * 3 / 10 * 200 / 1001 / queryThreadCount;

/** The memory manager of on heap */
private MemoryManager onHeapMemoryManager;
Expand Down Expand Up @@ -152,18 +154,18 @@ public void init(TrimProperties properties) {
}
}

long storageEngineMemorySize = Runtime.getRuntime().maxMemory() * 3 / 10;
long queryEngineMemorySize = Runtime.getRuntime().maxMemory() * 3 / 10;
long schemaEngineMemorySize = Runtime.getRuntime().maxMemory() / 10;
long consensusMemorySize = Runtime.getRuntime().maxMemory() / 10;
long pipeMemorySize = Runtime.getRuntime().maxMemory() / 10;
long storageEngineMemorySize = Runtime.getRuntime().totalMemory() * 3 / 10;
long queryEngineMemorySize = Runtime.getRuntime().totalMemory() * 3 / 10;
long schemaEngineMemorySize = Runtime.getRuntime().totalMemory() / 10;
long consensusMemorySize = Runtime.getRuntime().totalMemory() / 10;
long pipeMemorySize = Runtime.getRuntime().totalMemory() / 10;
if (memoryAllocateProportion != null) {
String[] proportions = memoryAllocateProportion.split(":");
int proportionSum = 0;
for (String proportion : proportions) {
proportionSum += Integer.parseInt(proportion.trim());
}
long maxMemoryAvailable = Runtime.getRuntime().maxMemory();
long maxMemoryAvailable = Runtime.getRuntime().totalMemory();

if (proportionSum != 0) {
storageEngineMemorySize =
Expand All @@ -190,9 +192,12 @@ public void init(TrimProperties properties) {
}
}
onHeapMemoryManager =
MemoryConfig.global().getOrCreateMemoryManager("OnHeap", Runtime.getRuntime().maxMemory());
MemoryConfig.global()
.getOrCreateMemoryManager("OnHeap", Runtime.getRuntime().totalMemory());
storageEngineMemoryManager =
onHeapMemoryManager.getOrCreateMemoryManager("StorageEngine", storageEngineMemorySize);
onHeapMemoryManager
.getOrCreateMemoryManager("StorageEngine", storageEngineMemorySize)
.setMemoryUpdateCallback((before, after) -> SystemInfo.getInstance().loadWriteMemory());
queryEngineMemoryManager =
onHeapMemoryManager.getOrCreateMemoryManager("QueryEngine", queryEngineMemorySize);
schemaEngineMemoryManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
import org.apache.iotdb.external.api.IPropertiesLoader;
Expand Down Expand Up @@ -2338,6 +2337,7 @@ private void loadUDFProps(TrimProperties properties) {
for (String proportion : proportions) {
proportionSum += Integer.parseInt(proportion.trim());
}
// TODO @spricoder: consider whether this part need change when total memory is dynamic
float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB();
try {
conf.setUdfReaderMemoryBudgetInMB(
Expand Down Expand Up @@ -2635,12 +2635,12 @@ public void reclaimConsensusMemory() {
// first we need to release the memory allocated for consensus
MemoryManager storageEngineMemoryManager = memoryConfig.getStorageEngineMemoryManager();
MemoryManager consensusMemoryManager = memoryConfig.getConsensusMemoryManager();
long originSize = storageEngineMemoryManager.getInitialAllocatedMemorySizeInBytes();
long newSize =
storageEngineMemoryManager.getTotalMemorySizeInBytes()
+ consensusMemoryManager.getTotalMemorySizeInBytes();
consensusMemoryManager.setTotalMemorySizeInBytes(0);
storageEngineMemoryManager.setTotalMemorySizeInBytesWithReload(newSize);
SystemInfo.getInstance().loadWriteMemory();
storageEngineMemoryManager.getInitialAllocatedMemorySizeInBytes()
+ consensusMemoryManager.getInitialAllocatedMemorySizeInBytes();
storageEngineMemoryManager.resizeByRatio((double) newSize / originSize);
consensusMemoryManager.resizeByRatio(0);
}

private static class IoTDBDescriptorHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private static ConsensusConfig buildConsensusConfig() {
IMemoryBlock memoryBlock =
MEMORY_CONFIG
.getConsensusMemoryManager()
.exactAllocate("Consensus", MemoryBlockType.DYNAMIC);
.exactAllocate("Consensus", MemoryBlockType.STATIC);
return ConsensusConfig.newBuilder()
.setThisNodeId(CONF.getDataNodeId())
.setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ public class PipeMemoryManager {
PipeConfig.getInstance().getPipeMemoryManagementEnabled();

// TODO @spricoder: consider combine memory block and used MemorySizeInBytes
private IMemoryBlock memoryBlock =
private final IMemoryBlock memoryBlock =
IoTDBDescriptor.getInstance()
.getMemoryConfig()
.getPipeMemoryManager()
.exactAllocate("Stream", MemoryBlockType.DYNAMIC);
.exactAllocate("Stream", MemoryBlockType.STATIC);

private static final double EXCEED_PROTECT_THRESHOLD = 0.95;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.thrift.TException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
Expand Down Expand Up @@ -120,12 +121,22 @@ public class PartitionCache {

private final CacheMetrics cacheMetrics;
private final IMemoryBlock memoryBlock;
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);

public PartitionCache() {
this.memoryBlock =
memoryConfig
.getPartitionCacheMemoryManager()
.exactAllocate("PartitionCache", MemoryBlockType.STATIC);
.exactAllocate("PartitionCache", MemoryBlockType.STATIC)
.setMemoryUpdateCallback(
(oldMemory, newMemory) -> {
memoryUsageCheatFactor.updateAndGet(
factor -> factor / ((double) newMemory / oldMemory));
logger.debug(
"[MemoryUsageCheatFactor] PartitionCache has updated from {} to {}.",
oldMemory,
newMemory);
});
this.memoryBlock.allocate(this.memoryBlock.getTotalMemorySizeInBytes());
// TODO @spricoder: PartitionCache need to be controlled according to memory
this.schemaPartitionCache =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
import com.google.common.util.concurrent.AtomicDouble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,21 +40,34 @@ public class DataNodeDevicePathCache {
private static final DataNodeMemoryConfig memoryConfig =
IoTDBDescriptor.getInstance().getMemoryConfig();
private final IMemoryBlock devicePathCacheMemoryBlock;
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);

private final Cache<String, PartialPath> devicePathCache;

private DataNodeDevicePathCache() {
devicePathCacheMemoryBlock =
memoryConfig
.getDevicePathCacheMemoryManager()
.exactAllocate("DevicePathCache", MemoryBlockType.STATIC);
.exactAllocate("DevicePathCache", MemoryBlockType.STATIC)
.setMemoryUpdateCallback(
(oldMemory, newMemory) -> {
memoryUsageCheatFactor.updateAndGet(
factor -> factor / ((double) newMemory / oldMemory));
LOGGER.debug(
"[MemoryUsageCheatFactor]DataNodeDevicePathCache has updated from {} to {}.",
oldMemory,
newMemory);
});
;
// TODO @spricoder: later we can find a way to get the byte size of cache
devicePathCacheMemoryBlock.allocate(devicePathCacheMemoryBlock.getTotalMemorySizeInBytes());
devicePathCache =
Caffeine.newBuilder()
.maximumWeight(devicePathCacheMemoryBlock.getTotalMemorySizeInBytes())
.weigher(
(Weigher<String, PartialPath>) (key, val) -> (PartialPath.estimateSize(val) + 32))
(Weigher<String, PartialPath>)
(key, val) ->
(int) ((PartialPath.estimateSize(val) + 32) * memoryUsageCheatFactor.get()))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegion;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;

import com.google.common.util.concurrent.AtomicDouble;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.read.TimeValuePair;
Expand Down Expand Up @@ -106,20 +107,34 @@ public class TableDeviceSchemaCache {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);

private final IMemoryBlock memoryBlock;
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);

private TableDeviceSchemaCache() {
memoryBlock =
memoryConfig
.getSchemaCacheMemoryManager()
.exactAllocate("TableDeviceSchemaCache", MemoryBlockType.STATIC);
.exactAllocate("TableDeviceSchemaCache", MemoryBlockType.STATIC)
.setMemoryUpdateCallback(
(oldMemory, newMemory) -> {
memoryUsageCheatFactor.updateAndGet(
factor -> factor / ((double) newMemory / oldMemory));
logger.debug(
"[MemoryUsageCheatFactor]TableDeviceSchemaCache has updated from {} to {}.",
oldMemory,
newMemory);
});
dualKeyCache =
new DualKeyCacheBuilder<TableId, IDeviceID, TableDeviceCacheEntry>()
.cacheEvictionPolicy(
DualKeyCachePolicy.valueOf(config.getDataNodeSchemaCacheEvictionPolicy()))
.memoryCapacity(memoryBlock.getTotalMemorySizeInBytes())
.firstKeySizeComputer(TableId::estimateSize)
.secondKeySizeComputer(deviceID -> (int) deviceID.ramBytesUsed())
.valueSizeComputer(TableDeviceCacheEntry::estimateSize)
.firstKeySizeComputer(
tableId -> (int) (tableId.estimateSize() * memoryUsageCheatFactor.get()))
.secondKeySizeComputer(
deviceID -> (int) (deviceID.ramBytesUsed() * memoryUsageCheatFactor.get()))
.valueSizeComputer(
tableDeviceCacheEntry ->
(int) (tableDeviceCacheEntry.estimateSize() * memoryUsageCheatFactor.get()))
.build();
memoryBlock.allocate(memoryBlock.getTotalMemorySizeInBytes());
MetricService.getInstance().addMetricSet(new TableDeviceSchemaCacheMetrics(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.memory.MemoryRuntimeController;
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.service.JMXService;
Expand Down Expand Up @@ -245,6 +246,9 @@ protected void start() {
// Serialize mutable system properties
IoTDBStartCheck.getInstance().serializeMutableSystemPropertiesIfNecessary();

// Setup memory controller
registerManager.register(MemoryRuntimeController.getInstance());

logger.info("IoTDB configuration: {}", config.getConfigMessage());
logger.info("Congratulations, IoTDB DataNode is set up successfully. Now, enjoy yourself!");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.BloomFilter;
import org.apache.tsfile.utils.RamUsageEstimator;
Expand All @@ -51,32 +52,44 @@ public class BloomFilterCache {
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
private static final DataNodeMemoryConfig MEMORY_CONFIG =
IoTDBDescriptor.getInstance().getMemoryConfig();
private static final IMemoryBlock CACHE_MEMORY_BLOCK;
private static final IMemoryBlock cacheMemoryBlock;
private static final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
private static final boolean CACHE_ENABLE = MEMORY_CONFIG.isMetaDataCacheEnable();
private final AtomicLong entryAverageSize = new AtomicLong(0);

private final Cache<BloomFilterCacheKey, BloomFilter> lruCache;

static {
CACHE_MEMORY_BLOCK =
cacheMemoryBlock =
MEMORY_CONFIG
.getBloomFilterCacheMemoryManager()
.exactAllocate("BloomFilterCache", MemoryBlockType.STATIC);
.exactAllocate("BloomFilterCache", MemoryBlockType.STATIC)
.setMemoryUpdateCallback(
(oldMemory, newMemory) -> {
memoryUsageCheatFactor.updateAndGet(
factor -> factor / ((double) newMemory / oldMemory));
LOGGER.debug(
"[MemoryUsageCheatFactor]BloomFilterCache has updated from {} to {}.",
oldMemory,
newMemory);
});
// TODO @spricoder: find a way to get the size of the BloomFilterCache
CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
cacheMemoryBlock.allocate(cacheMemoryBlock.getTotalMemorySizeInBytes());
}

private BloomFilterCache() {
if (CACHE_ENABLE) {
LOGGER.info("BloomFilterCache size = {}", CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
LOGGER.info("BloomFilterCache size = {}", cacheMemoryBlock.getTotalMemorySizeInBytes());
}
lruCache =
Caffeine.newBuilder()
.maximumWeight(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes())
.maximumWeight(cacheMemoryBlock.getTotalMemorySizeInBytes())
.weigher(
(Weigher<BloomFilterCacheKey, BloomFilter>)
(key, bloomFilter) ->
(int) (key.getRetainedSizeInBytes() + bloomFilter.getRetainedSizeInBytes()))
(int)
((key.getRetainedSizeInBytes() + bloomFilter.getRetainedSizeInBytes())
* memoryUsageCheatFactor.get()))
.recordStats()
.build();
}
Expand Down Expand Up @@ -131,7 +144,7 @@ public long getEvictionCount() {
}

public long getMaxMemory() {
return CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes();
return cacheMemoryBlock.getTotalMemorySizeInBytes();
}

public double getAverageLoadPenalty() {
Expand Down
Loading
Loading