diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java index 0621aee23ef0..c01553c13913 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java @@ -359,7 +359,7 @@ public static class Builder { private long checkpointGap = 500; private IMemoryBlock consensusMemoryBlock = new AtomicLongMemoryBlock( - "Consensus-Default", null, Runtime.getRuntime().maxMemory() / 10); + "Consensus-Default", null, Runtime.getRuntime().totalMemory() / 10); private double maxMemoryRatioForQueue = 0.6; private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java index 7bee00588cd7..4cb141f192ff 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java @@ -33,7 +33,7 @@ public class IoTConsensusMemoryManager { private final AtomicLong queueMemorySizeInByte = new AtomicLong(0); private final AtomicLong syncMemorySizeInByte = new AtomicLong(0); private IMemoryBlock memoryBlock = - new AtomicLongMemoryBlock("Consensus-Default", null, Runtime.getRuntime().maxMemory() / 10); + new AtomicLongMemoryBlock("Consensus-Default", null, Runtime.getRuntime().totalMemory() / 10); private Double maxMemoryRatioForQueue = 0.6; private IoTConsensusMemoryManager() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java index 695c25aeddb1..a7cc84f2eaa9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.memory.MemoryConfig; import org.apache.iotdb.commons.memory.MemoryManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCompactionEstimator; +import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.MemUtils; import org.slf4j.Logger; @@ -60,8 +61,9 @@ public class DataNodeMemoryConfig { private int queryThreadCount = Runtime.getRuntime().availableProcessors(); /** Max bytes of each FragmentInstance for DataExchange */ + // TODO @spricoder : influence dynamic change of memory size private long maxBytesPerFragmentInstance = - Runtime.getRuntime().maxMemory() * 3 / 10 * 200 / 1001 / queryThreadCount; + Runtime.getRuntime().totalMemory() * 3 / 10 * 200 / 1001 / queryThreadCount; /** The memory manager of on heap */ private MemoryManager onHeapMemoryManager; @@ -152,18 +154,18 @@ public void init(TrimProperties properties) { } } - long storageEngineMemorySize = Runtime.getRuntime().maxMemory() * 3 / 10; - long queryEngineMemorySize = Runtime.getRuntime().maxMemory() * 3 / 10; - long schemaEngineMemorySize = Runtime.getRuntime().maxMemory() / 10; - long consensusMemorySize = Runtime.getRuntime().maxMemory() / 10; - long pipeMemorySize = Runtime.getRuntime().maxMemory() / 10; + long storageEngineMemorySize = Runtime.getRuntime().totalMemory() * 3 / 10; + long queryEngineMemorySize = Runtime.getRuntime().totalMemory() * 3 / 10; + long schemaEngineMemorySize = Runtime.getRuntime().totalMemory() / 10; + long consensusMemorySize = Runtime.getRuntime().totalMemory() / 10; + long pipeMemorySize = Runtime.getRuntime().totalMemory() / 10; if (memoryAllocateProportion != null) { String[] proportions = memoryAllocateProportion.split(":"); int proportionSum = 0; for (String proportion : proportions) { proportionSum += Integer.parseInt(proportion.trim()); } - long maxMemoryAvailable = Runtime.getRuntime().maxMemory(); + long maxMemoryAvailable = Runtime.getRuntime().totalMemory(); if (proportionSum != 0) { storageEngineMemorySize = @@ -190,9 +192,12 @@ public void init(TrimProperties properties) { } } onHeapMemoryManager = - MemoryConfig.global().getOrCreateMemoryManager("OnHeap", Runtime.getRuntime().maxMemory()); + MemoryConfig.global() + .getOrCreateMemoryManager("OnHeap", Runtime.getRuntime().totalMemory()); storageEngineMemoryManager = - onHeapMemoryManager.getOrCreateMemoryManager("StorageEngine", storageEngineMemorySize); + onHeapMemoryManager + .getOrCreateMemoryManager("StorageEngine", storageEngineMemorySize) + .setMemoryUpdateCallback((before, after) -> SystemInfo.getInstance().loadWriteMemory()); queryEngineMemoryManager = onHeapMemoryManager.getOrCreateMemoryManager("QueryEngine", queryEngineMemorySize); schemaEngineMemoryManager = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index d9073afe1f07..845afde1cc71 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -51,7 +51,6 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; -import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm; import org.apache.iotdb.external.api.IPropertiesLoader; @@ -2338,6 +2337,7 @@ private void loadUDFProps(TrimProperties properties) { for (String proportion : proportions) { proportionSum += Integer.parseInt(proportion.trim()); } + // TODO @spricoder: consider whether this part need change when total memory is dynamic float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB(); try { conf.setUdfReaderMemoryBudgetInMB( @@ -2635,12 +2635,12 @@ public void reclaimConsensusMemory() { // first we need to release the memory allocated for consensus MemoryManager storageEngineMemoryManager = memoryConfig.getStorageEngineMemoryManager(); MemoryManager consensusMemoryManager = memoryConfig.getConsensusMemoryManager(); + long originSize = storageEngineMemoryManager.getInitialAllocatedMemorySizeInBytes(); long newSize = - storageEngineMemoryManager.getTotalMemorySizeInBytes() - + consensusMemoryManager.getTotalMemorySizeInBytes(); - consensusMemoryManager.setTotalMemorySizeInBytes(0); - storageEngineMemoryManager.setTotalMemorySizeInBytesWithReload(newSize); - SystemInfo.getInstance().loadWriteMemory(); + storageEngineMemoryManager.getInitialAllocatedMemorySizeInBytes() + + consensusMemoryManager.getInitialAllocatedMemorySizeInBytes(); + storageEngineMemoryManager.resizeByRatio((double) newSize / originSize); + consensusMemoryManager.resizeByRatio(0); } private static class IoTDBDescriptorHolder { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 0f71a1cece8d..ab0606295e36 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -118,7 +118,7 @@ private static ConsensusConfig buildConsensusConfig() { IMemoryBlock memoryBlock = MEMORY_CONFIG .getConsensusMemoryManager() - .exactAllocate("Consensus", MemoryBlockType.DYNAMIC); + .exactAllocate("Consensus", MemoryBlockType.STATIC); return ConsensusConfig.newBuilder() .setThisNodeId(CONF.getDataNodeId()) .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort())) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index 73252aa37975..9b589aa8d9fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -46,11 +46,11 @@ public class PipeMemoryManager { PipeConfig.getInstance().getPipeMemoryManagementEnabled(); // TODO @spricoder: consider combine memory block and used MemorySizeInBytes - private IMemoryBlock memoryBlock = + private final IMemoryBlock memoryBlock = IoTDBDescriptor.getInstance() .getMemoryConfig() .getPipeMemoryManager() - .exactAllocate("Stream", MemoryBlockType.DYNAMIC); + .exactAllocate("Stream", MemoryBlockType.STATIC); private static final double EXCEED_PROTECT_THRESHOLD = 0.95; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java index 15828538c7ff..e45ab7a5b3b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java @@ -62,6 +62,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.util.concurrent.AtomicDouble; import org.apache.thrift.TException; import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; @@ -120,12 +121,22 @@ public class PartitionCache { private final CacheMetrics cacheMetrics; private final IMemoryBlock memoryBlock; + private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1); public PartitionCache() { this.memoryBlock = memoryConfig .getPartitionCacheMemoryManager() - .exactAllocate("PartitionCache", MemoryBlockType.STATIC); + .exactAllocate("PartitionCache", MemoryBlockType.STATIC) + .setMemoryUpdateCallback( + (oldMemory, newMemory) -> { + memoryUsageCheatFactor.updateAndGet( + factor -> factor / ((double) newMemory / oldMemory)); + logger.debug( + "[MemoryUsageCheatFactor] PartitionCache has updated from {} to {}.", + oldMemory, + newMemory); + }); this.memoryBlock.allocate(this.memoryBlock.getTotalMemorySizeInBytes()); // TODO @spricoder: PartitionCache need to be controlled according to memory this.schemaPartitionCache = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java index 07293bc4ca32..480aee2b1799 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java @@ -29,6 +29,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.util.concurrent.AtomicDouble; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ public class DataNodeDevicePathCache { private static final DataNodeMemoryConfig memoryConfig = IoTDBDescriptor.getInstance().getMemoryConfig(); private final IMemoryBlock devicePathCacheMemoryBlock; + private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1); private final Cache devicePathCache; @@ -46,14 +48,26 @@ private DataNodeDevicePathCache() { devicePathCacheMemoryBlock = memoryConfig .getDevicePathCacheMemoryManager() - .exactAllocate("DevicePathCache", MemoryBlockType.STATIC); + .exactAllocate("DevicePathCache", MemoryBlockType.STATIC) + .setMemoryUpdateCallback( + (oldMemory, newMemory) -> { + memoryUsageCheatFactor.updateAndGet( + factor -> factor / ((double) newMemory / oldMemory)); + LOGGER.debug( + "[MemoryUsageCheatFactor]DataNodeDevicePathCache has updated from {} to {}.", + oldMemory, + newMemory); + }); + ; // TODO @spricoder: later we can find a way to get the byte size of cache devicePathCacheMemoryBlock.allocate(devicePathCacheMemoryBlock.getTotalMemorySizeInBytes()); devicePathCache = Caffeine.newBuilder() .maximumWeight(devicePathCacheMemoryBlock.getTotalMemorySizeInBytes()) .weigher( - (Weigher) (key, val) -> (PartialPath.estimateSize(val) + 32)) + (Weigher) + (key, val) -> + (int) ((PartialPath.estimateSize(val) + 32) * memoryUsageCheatFactor.get())) .build(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java index a724bc427c8f..7ef79e8272f7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegion; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import com.google.common.util.concurrent.AtomicDouble; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.read.TimeValuePair; @@ -106,20 +107,34 @@ public class TableDeviceSchemaCache { private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false); private final IMemoryBlock memoryBlock; + private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1); private TableDeviceSchemaCache() { memoryBlock = memoryConfig .getSchemaCacheMemoryManager() - .exactAllocate("TableDeviceSchemaCache", MemoryBlockType.STATIC); + .exactAllocate("TableDeviceSchemaCache", MemoryBlockType.STATIC) + .setMemoryUpdateCallback( + (oldMemory, newMemory) -> { + memoryUsageCheatFactor.updateAndGet( + factor -> factor / ((double) newMemory / oldMemory)); + logger.debug( + "[MemoryUsageCheatFactor]TableDeviceSchemaCache has updated from {} to {}.", + oldMemory, + newMemory); + }); dualKeyCache = new DualKeyCacheBuilder() .cacheEvictionPolicy( DualKeyCachePolicy.valueOf(config.getDataNodeSchemaCacheEvictionPolicy())) .memoryCapacity(memoryBlock.getTotalMemorySizeInBytes()) - .firstKeySizeComputer(TableId::estimateSize) - .secondKeySizeComputer(deviceID -> (int) deviceID.ramBytesUsed()) - .valueSizeComputer(TableDeviceCacheEntry::estimateSize) + .firstKeySizeComputer( + tableId -> (int) (tableId.estimateSize() * memoryUsageCheatFactor.get())) + .secondKeySizeComputer( + deviceID -> (int) (deviceID.ramBytesUsed() * memoryUsageCheatFactor.get())) + .valueSizeComputer( + tableDeviceCacheEntry -> + (int) (tableDeviceCacheEntry.estimateSize() * memoryUsageCheatFactor.get())) .build(); memoryBlock.allocate(memoryBlock.getTotalMemorySizeInBytes()); MetricService.getInstance().addMetricSet(new TableDeviceSchemaCacheMetrics(this)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index b554eb92419a..24c857eba8d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -38,6 +38,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.memory.MemoryRuntimeController; import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.service.JMXService; @@ -245,6 +246,9 @@ protected void start() { // Serialize mutable system properties IoTDBStartCheck.getInstance().serializeMutableSystemPropertiesIfNecessary(); + // Setup memory controller + registerManager.register(MemoryRuntimeController.getInstance()); + logger.info("IoTDB configuration: {}", config.getConfigMessage()); logger.info("Congratulations, IoTDB DataNode is set up successfully. Now, enjoy yourself!"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java index 7a41352098bb..f3f58a24414c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java @@ -31,6 +31,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.util.concurrent.AtomicDouble; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.BloomFilter; import org.apache.tsfile.utils.RamUsageEstimator; @@ -51,32 +52,44 @@ public class BloomFilterCache { private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); private static final DataNodeMemoryConfig MEMORY_CONFIG = IoTDBDescriptor.getInstance().getMemoryConfig(); - private static final IMemoryBlock CACHE_MEMORY_BLOCK; + private static final IMemoryBlock cacheMemoryBlock; + private static final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1); private static final boolean CACHE_ENABLE = MEMORY_CONFIG.isMetaDataCacheEnable(); private final AtomicLong entryAverageSize = new AtomicLong(0); private final Cache lruCache; static { - CACHE_MEMORY_BLOCK = + cacheMemoryBlock = MEMORY_CONFIG .getBloomFilterCacheMemoryManager() - .exactAllocate("BloomFilterCache", MemoryBlockType.STATIC); + .exactAllocate("BloomFilterCache", MemoryBlockType.STATIC) + .setMemoryUpdateCallback( + (oldMemory, newMemory) -> { + memoryUsageCheatFactor.updateAndGet( + factor -> factor / ((double) newMemory / oldMemory)); + LOGGER.debug( + "[MemoryUsageCheatFactor]BloomFilterCache has updated from {} to {}.", + oldMemory, + newMemory); + }); // TODO @spricoder: find a way to get the size of the BloomFilterCache - CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); + cacheMemoryBlock.allocate(cacheMemoryBlock.getTotalMemorySizeInBytes()); } private BloomFilterCache() { if (CACHE_ENABLE) { - LOGGER.info("BloomFilterCache size = {}", CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); + LOGGER.info("BloomFilterCache size = {}", cacheMemoryBlock.getTotalMemorySizeInBytes()); } lruCache = Caffeine.newBuilder() - .maximumWeight(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()) + .maximumWeight(cacheMemoryBlock.getTotalMemorySizeInBytes()) .weigher( (Weigher) (key, bloomFilter) -> - (int) (key.getRetainedSizeInBytes() + bloomFilter.getRetainedSizeInBytes())) + (int) + ((key.getRetainedSizeInBytes() + bloomFilter.getRetainedSizeInBytes()) + * memoryUsageCheatFactor.get())) .recordStats() .build(); } @@ -131,7 +144,7 @@ public long getEvictionCount() { } public long getMaxMemory() { - return CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes(); + return cacheMemoryBlock.getTotalMemorySizeInBytes(); } public double getAverageLoadPenalty() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java index 1c39b993a3c4..f84a1da7fe78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java @@ -35,6 +35,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.util.concurrent.AtomicDouble; import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.Chunk; @@ -63,7 +64,8 @@ public class ChunkCache { private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); private static final DataNodeMemoryConfig MEMORY_CONFIG = IoTDBDescriptor.getInstance().getMemoryConfig(); - private static final IMemoryBlock CACHE_MEMORY_BLOCK; + private static final IMemoryBlock cacheMemoryBlock; + private static final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1); private static final boolean CACHE_ENABLE = MEMORY_CONFIG.isMetaDataCacheEnable(); private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET = @@ -73,25 +75,37 @@ public class ChunkCache { private final Cache lruCache; static { - CACHE_MEMORY_BLOCK = + cacheMemoryBlock = MEMORY_CONFIG .getChunkCacheMemoryManager() - .exactAllocate("ChunkCache", MemoryBlockType.STATIC); + .exactAllocate("ChunkCache", MemoryBlockType.STATIC) + .setMemoryUpdateCallback( + (oldMemory, newMemory) -> { + memoryUsageCheatFactor.updateAndGet( + factor -> factor / ((double) newMemory / oldMemory)); + LOGGER.info( + "[MemoryUsageCheatFactor]ChunkCache has updated from {} to {}.", + oldMemory, + newMemory); + }); + ; // TODO @spricoder: find a way to get the size of the ChunkCache - CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); + cacheMemoryBlock.allocate(cacheMemoryBlock.getTotalMemorySizeInBytes()); } private ChunkCache() { if (CACHE_ENABLE) { - LOGGER.info("ChunkCache size = {}", CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); + LOGGER.info("ChunkCache size = {}", cacheMemoryBlock.getTotalMemorySizeInBytes()); } lruCache = Caffeine.newBuilder() - .maximumWeight(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()) + .maximumWeight(cacheMemoryBlock.getTotalMemorySizeInBytes()) .weigher( (Weigher) (key, chunk) -> - (int) (key.getRetainedSizeInBytes() + chunk.getRetainedSizeInBytes())) + (int) + ((key.getRetainedSizeInBytes() + chunk.getRetainedSizeInBytes()) + * memoryUsageCheatFactor.get())) .recordStats() .build(); @@ -202,7 +216,7 @@ public long getEvictionCount() { } public long getMaxMemory() { - return CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes(); + return cacheMemoryBlock.getTotalMemorySizeInBytes(); } public double getAverageLoadPenalty() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java index f2cd55c5e7d0..822f18eaec1d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java @@ -34,6 +34,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.util.concurrent.AtomicDouble; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.TimeseriesMetadata; @@ -67,7 +68,8 @@ public class TimeSeriesMetadataCache { private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); private static final DataNodeMemoryConfig memoryConfig = IoTDBDescriptor.getInstance().getMemoryConfig(); - private static final IMemoryBlock CACHE_MEMORY_BLOCK; + private static final IMemoryBlock cacheMemoryBlock; + private static final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1); private static final boolean CACHE_ENABLE = memoryConfig.isMetaDataCacheEnable(); private final Cache lruCache; @@ -79,26 +81,37 @@ public class TimeSeriesMetadataCache { private static final String SEPARATOR = "$"; static { - CACHE_MEMORY_BLOCK = + cacheMemoryBlock = memoryConfig .getTimeSeriesMetaDataCacheMemoryManager() - .exactAllocate("TimeSeriesMetadataCache", MemoryBlockType.STATIC); + .exactAllocate("TimeSeriesMetadataCache", MemoryBlockType.STATIC) + .setMemoryUpdateCallback( + (oldMemory, newMemory) -> { + memoryUsageCheatFactor.updateAndGet( + factor -> factor / ((double) newMemory / oldMemory)); + logger.info( + "[MemoryUsageCheatFactor]TimeSeriesMetadataCache has updated from {} to {}.", + oldMemory, + newMemory); + }); // TODO @spricoder find a better way to get the size of cache - CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); + cacheMemoryBlock.allocate(cacheMemoryBlock.getTotalMemorySizeInBytes()); } private TimeSeriesMetadataCache() { if (CACHE_ENABLE) { logger.info( - "TimeSeriesMetadataCache size = {}", CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); + "TimeSeriesMetadataCache size = {}", cacheMemoryBlock.getTotalMemorySizeInBytes()); } lruCache = Caffeine.newBuilder() - .maximumWeight(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()) + .maximumWeight(cacheMemoryBlock.getTotalMemorySizeInBytes()) .weigher( (Weigher) (key, value) -> - (int) (key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes())) + (int) + ((key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes()) + * memoryUsageCheatFactor.get())) .recordStats() .build(); // add metrics @@ -265,7 +278,7 @@ public long getEvictionCount() { } public long getMaxMemory() { - return CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes(); + return cacheMemoryBlock.getTotalMemorySizeInBytes(); } public double getAverageLoadPenalty() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionInfo.java index 0e8e4902c8b1..54e7dfda8b6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionInfo.java @@ -38,6 +38,7 @@ public class DataRegionInfo { */ private final AtomicLong memoryCost; + // TODO @spricoder dynamic threshold /** The threshold of reporting it's size to SystemInfo */ private final long storageGroupSizeReportThreshold = (long) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index ffad8a487da8..562180212a3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -89,6 +89,7 @@ public RewriteCrossSpaceCompactionSelector( this.dataRegionId = dataRegionId; this.timePartition = timePartition; this.tsFileManager = tsFileManager; + // TODO @spricoder update this compaction param this.memoryBudget = (long) ((double) SystemInfo.getInstance().getMemorySizeForCompaction() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java index 3d914cbeb3ed..ba5807466af4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority; @@ -42,6 +43,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -670,6 +672,13 @@ public void test5() throws IOException, MetadataException, InterruptedException */ @Test public void test6() throws IOException, MetadataException, InterruptedException { + logger.error( + "Total:" + + Runtime.getRuntime().totalMemory() + + ", Max:" + + Runtime.getRuntime().maxMemory()); + BatchCompactionPlan.setMaxCachedTimeChunksSize(1024); + TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(100); logger.warn("Running test6"); boolean prevEnableSeqSpaceCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CrossSpaceCompactionWithUnusualCasesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CrossSpaceCompactionWithUnusualCasesTest.java index 260068805fa9..76c8ec3400b8 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CrossSpaceCompactionWithUnusualCasesTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CrossSpaceCompactionWithUnusualCasesTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.RewriteCrossSpaceCompactionSelector; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossCompactionTaskResource; @@ -335,6 +336,8 @@ public void testUnSeqFileOverlapWithSeqFilesButOneDeviceNotExistInOverlapSeqFile @Test public void testMultiUnSeqFileOverlapWithSeqFilesButOneDeviceNotExistInOverlapSeqFiles() throws IOException, IllegalPathException { + BatchCompactionPlan.setMaxCachedTimeChunksSize(2 * 1024 * 1024 / 20); + TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(1024); // seq file 1 // device: d1, time: [150, 400] TsFileResource seqTsFileResource1 = createEmptyFileAndResource(true); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java index a17d52e75f88..1b7eea0ff69e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskQueue; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl; @@ -37,6 +38,7 @@ import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.exception.write.WriteProcessException; import org.junit.After; import org.junit.Assert; @@ -117,6 +119,8 @@ public void testSelectWithClosedSeqFileAndUnOverlapUnseqFile() @Test public void testSelectWithClosedSeqFileAndUncloseSeqFile() throws IOException, MetadataException, WriteProcessException { + BatchCompactionPlan.setMaxCachedTimeChunksSize(2 * 1024 * 1024 / 20); + TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(1024); createFiles(2, 2, 3, 50, 0, 10000, 50, 50, false, true); createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false); seqResources.get(1).setStatusForTest(TsFileResourceStatus.UNCLOSED); @@ -149,6 +153,8 @@ public void testSelectWithClosedSeqFileAndUncloseSeqFile() @Test public void testSelectWithMultiUnseqFilesOverlapWithOneSeqFile() throws IOException, MetadataException, WriteProcessException { + BatchCompactionPlan.setMaxCachedTimeChunksSize(2 * 1024 * 1024 / 20); + TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(1024); createFiles(3, 2, 3, 50, 0, 10000, 50, 50, false, true); createFiles(1, 2, 3, 50, 0, 10000, 50, 50, false, false); createFiles(1, 2, 3, 50, 0, 10000, 50, 50, false, false); @@ -727,6 +733,7 @@ public void testSeqFileWithFileIndexBeenDeletedDuringSelectionAndAfterCopyingLis @Test public void testSeqFileWithFileIndexBeenDeletedDuringSelectionAndBeforeSettingCandidate() throws IOException, MetadataException, WriteProcessException, InterruptedException { + BatchCompactionPlan.setMaxCachedTimeChunksSize(2 * 1024 * 1024 / 10); createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, true); createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false); tsFileManager.addAll(seqResources, true); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 894a28487fa5..205fa6d4269f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -386,6 +386,10 @@ public class CommonConfig { private volatile Pattern trustedUriPattern = Pattern.compile("file:.*"); + // Whtether to enable memory adaption + private boolean enableMemoryAdapt = false; + private long memoryAdaptIntervalInS = 20; + CommonConfig() { // Empty constructor } @@ -2266,4 +2270,20 @@ public Pattern getTrustedUriPattern() { public void setTrustedUriPattern(Pattern trustedUriPattern) { this.trustedUriPattern = trustedUriPattern; } + + public long getMemoryAdaptIntervalInS() { + return memoryAdaptIntervalInS; + } + + public void setMemoryAdaptIntervalInS(long memoryAdaptIntervalInS) { + this.memoryAdaptIntervalInS = memoryAdaptIntervalInS; + } + + public boolean isEnableMemoryAdapt() { + return enableMemoryAdapt; + } + + public void setEnableMemoryAdapt(boolean enableMemoryAdapt) { + this.enableMemoryAdapt = enableMemoryAdapt; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index ab9348f9de34..3fc610148860 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -250,6 +250,14 @@ public void loadCommonProps(TrimProperties properties) throws IOException { properties.getProperty( "cluster_device_limit_threshold", String.valueOf(config.getDeviceLimitThreshold())))); + config.setEnableMemoryAdapt( + Boolean.parseBoolean( + properties.getProperty( + "enable_memory_adapt", Boolean.toString(config.isEnableMemoryAdapt())))); + config.setMemoryAdaptIntervalInS( + Integer.parseInt( + properties.getProperty( + "memory_check_interval", String.valueOf(config.getMemoryAdaptIntervalInS())))); loadRetryProperties(properties); loadBinaryAllocatorProps(properties); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java index 1376d8ddb6d6..1fb4e0f17f71 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java @@ -52,45 +52,45 @@ public AtomicLongMemoryBlock( } @Override - public long forceAllocateWithoutLimitation(long sizeInByte) { - return usedMemoryInBytes.addAndGet(sizeInByte); + public long forceAllocateWithoutLimitation(long sizeInBytes) { + return usedMemoryInBytes.addAndGet(sizeInBytes); } @Override - public boolean allocate(long sizeInByte) { + public boolean allocate(long sizeInBytes) { AtomicBoolean result = new AtomicBoolean(false); usedMemoryInBytes.updateAndGet( memCost -> { - if (memCost + sizeInByte > totalMemorySizeInBytes) { + if (memCost + sizeInBytes > totalMemorySizeInBytes) { return memCost; } result.set(true); - return memCost + sizeInByte; + return memCost + sizeInBytes; }); return result.get(); } @Override - public boolean allocateIfSufficient(final long sizeInByte, final double maxRatio) { + public boolean allocateIfSufficient(final long sizeInBytes, final double maxRatio) { AtomicBoolean result = new AtomicBoolean(false); usedMemoryInBytes.updateAndGet( memCost -> { - if (memCost + sizeInByte > totalMemorySizeInBytes * maxRatio) { + if (memCost + sizeInBytes > totalMemorySizeInBytes * maxRatio) { return memCost; } result.set(true); - return memCost + sizeInByte; + return memCost + sizeInBytes; }); return result.get(); } @Override - public boolean allocateUntilAvailable(long sizeInByte, long retryIntervalInMillis) + public boolean allocateUntilAvailable(long sizeInBytes, long retryIntervalInMillis) throws InterruptedException { long originSize = usedMemoryInBytes.get(); while (true) { - boolean canUpdate = originSize + sizeInByte <= totalMemorySizeInBytes; - if (canUpdate && usedMemoryInBytes.compareAndSet(originSize, originSize + sizeInByte)) { + boolean canUpdate = originSize + sizeInBytes <= totalMemorySizeInBytes; + if (canUpdate && usedMemoryInBytes.compareAndSet(originSize, originSize + sizeInBytes)) { break; } Thread.sleep(TimeUnit.MILLISECONDS.toMillis(retryIntervalInMillis)); @@ -100,16 +100,16 @@ public boolean allocateUntilAvailable(long sizeInByte, long retryIntervalInMilli } @Override - public long release(long sizeInByte) { + public long release(long sizeInBytes) { return usedMemoryInBytes.updateAndGet( memCost -> { - if (sizeInByte > memCost) { + if (sizeInBytes > memCost) { LOGGER.warn( "The memory cost to be released is larger than the memory cost of memory block {}", this); return 0; } - return memCost - sizeInByte; + return memCost - sizeInBytes; }); } @@ -118,11 +118,12 @@ public void setUsedMemoryInBytes(long usedMemoryInBytes) { this.usedMemoryInBytes.set(usedMemoryInBytes); } + @Override public long getUsedMemoryInBytes() { return usedMemoryInBytes.get(); } - /** Get the free memory in byte of this memory block */ + @Override public long getFreeMemoryInBytes() { return totalMemorySizeInBytes - usedMemoryInBytes.get(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java index 416516bf2c7e..5d3bf164dfc1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java @@ -22,6 +22,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + public abstract class IMemoryBlock implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(IMemoryBlock.class); @@ -40,46 +43,50 @@ public abstract class IMemoryBlock implements AutoCloseable { /** The total memory size in byte of this memory block */ protected long totalMemorySizeInBytes; + /** Update related parameters when totalMemorySizeInBytes update */ + protected final AtomicReference> memoryUpdateCallback = + new AtomicReference<>(); + /** * Forcibly allocate memory without the limit of totalMemorySizeInBytes * - * @param sizeInByte the size of memory to be allocated, should be positive + * @param sizeInBytes the size of memory to be allocated, should be positive * @return the number of bytes actually allocated */ - public abstract long forceAllocateWithoutLimitation(final long sizeInByte); + public abstract long forceAllocateWithoutLimitation(final long sizeInBytes); /** * Allocate memory managed by this memory block * - * @param sizeInByte the size of memory to be allocated, should be positive + * @param sizeInBytes the size of memory to be allocated, should be positive */ - public abstract boolean allocate(final long sizeInByte); + public abstract boolean allocate(final long sizeInBytes); /** * Allocate memory managed by this memory block. if the currently used ratio is already above - * maxRatio, the allocation will fail". + * maxRatio, the allocation will fail. * - * @param sizeInByte the size of memory to be allocated, should be positive + * @param sizeInBytes the size of memory to be allocated, should be positive * @param maxRatio the maximum ratio of memory can be allocated */ - public abstract boolean allocateIfSufficient(final long sizeInByte, final double maxRatio); + public abstract boolean allocateIfSufficient(final long sizeInBytes, final double maxRatio); /** * Allocate memory managed by this memory block until the required memory is available * - * @param sizeInByte the size of memory to be allocated, should be positive + * @param sizeInBytes the size of memory to be allocated, should be positive * @param retryIntervalInMillis the time interval to wait after each allocation failure */ - public abstract boolean allocateUntilAvailable(final long sizeInByte, long retryIntervalInMillis) + public abstract boolean allocateUntilAvailable(final long sizeInBytes, long retryIntervalInMillis) throws InterruptedException; /** * Try to release memory managed by this memory block * - * @param sizeInByte the size of memory to be released, should be positive + * @param sizeInBytes the size of memory to be released, should be positive * @return the used size after release, zero if the release fails */ - public abstract long release(final long sizeInByte); + public abstract long release(final long sizeInBytes); /** * Try to set memory usage in byte of this memory block (for test only) @@ -99,6 +106,25 @@ public String getName() { return name; } + /** Update total memory size by ratio */ + public long resizeByRatio(double ratio) { + long before = this.totalMemorySizeInBytes; + this.totalMemorySizeInBytes = (long) (before * ratio); + if (memoryUpdateCallback.get() != null) { + try { + memoryUpdateCallback.get().accept(before, totalMemorySizeInBytes); + } catch (Exception e) { + LOGGER.warn("Failed to execute the update callback.", e); + } + } + return this.totalMemorySizeInBytes - before; + } + + public IMemoryBlock setMemoryUpdateCallback(BiConsumer memoryUpdateCallback) { + this.memoryUpdateCallback.set(memoryUpdateCallback); + return this; + } + /** Update maximum memory size in byte of this memory block */ public void setTotalMemorySizeInBytes(final long totalMemorySizeInBytes) { this.totalMemorySizeInBytes = totalMemorySizeInBytes; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java index 03fb10d393a0..072a45daac69 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.LongUnaryOperator; public class MemoryManager { @@ -46,10 +48,13 @@ public class MemoryManager { /** Whether memory management is enabled */ private final boolean enabled; - /** The total memory size in byte of memory manager */ + /** The memory size of this memory manager allocated by parent memory manager */ + private volatile long initialAllocatedMemorySizeInBytes = 0L; + + /** The max memory size of this memory manager */ private volatile long totalMemorySizeInBytes; - /** The allocated memory size */ + /** The allocated memory size of this memory manager */ private volatile long allocatedMemorySizeInBytes = 0L; /** The parent memory manager */ @@ -61,10 +66,15 @@ public class MemoryManager { /** The allocated memory blocks of this memory manager */ private final Map allocatedMemoryBlocks = new ConcurrentHashMap<>(); + /** Update related parameters when totalMemorySizeInBytes update */ + private final AtomicReference> memoryUpdateCallback = + new AtomicReference<>(); + @TestOnly public MemoryManager(long totalMemorySizeInBytes) { this.name = "Test"; this.parentMemoryManager = null; + this.initialAllocatedMemorySizeInBytes = totalMemorySizeInBytes; this.totalMemorySizeInBytes = totalMemorySizeInBytes; this.enabled = false; } @@ -72,6 +82,7 @@ public MemoryManager(long totalMemorySizeInBytes) { MemoryManager(String name, MemoryManager parentMemoryManager, long totalMemorySizeInBytes) { this.name = name; this.parentMemoryManager = parentMemoryManager; + this.initialAllocatedMemorySizeInBytes = totalMemorySizeInBytes; this.totalMemorySizeInBytes = totalMemorySizeInBytes; this.enabled = false; } @@ -83,6 +94,7 @@ private MemoryManager( boolean enabled) { this.name = name; this.parentMemoryManager = parentMemoryManager; + this.initialAllocatedMemorySizeInBytes = totalMemorySizeInBytes; this.totalMemorySizeInBytes = totalMemorySizeInBytes; this.enabled = enabled; } @@ -356,24 +368,6 @@ public synchronized MemoryManager getOrCreateMemoryManager( return getOrCreateMemoryManager(name, totalMemorySizeInBytes, false); } - /** - * Re-allocate memory according to ratio - * - * @param ratio the ratio of new total memory size to old total memory size - */ - private void reAllocateMemoryAccordingToRatio(double ratio) { - // first increase the total memory size of this memory manager - this.totalMemorySizeInBytes *= ratio; - // then re-allocate memory for all memory blocks - for (IMemoryBlock block : allocatedMemoryBlocks.values()) { - block.setTotalMemorySizeInBytes((long) (block.getTotalMemorySizeInBytes() * ratio)); - } - // finally re-allocate memory for all child memory managers - for (Map.Entry entry : children.entrySet()) { - entry.getValue().reAllocateMemoryAccordingToRatio(ratio); - } - } - /** * Get the memory manager with specified names in levels * @@ -458,12 +452,13 @@ public long getTotalMemorySizeInBytes() { return totalMemorySizeInBytes; } + @TestOnly public void setTotalMemorySizeInBytes(long totalMemorySizeInBytes) { this.totalMemorySizeInBytes = totalMemorySizeInBytes; } - public void setTotalMemorySizeInBytesWithReload(long totalMemorySizeInBytes) { - reAllocateMemoryAccordingToRatio((double) totalMemorySizeInBytes / this.totalMemorySizeInBytes); + public long getInitialAllocatedMemorySizeInBytes() { + return initialAllocatedMemorySizeInBytes; } /** Get available memory size in bytes of memory manager */ @@ -486,8 +481,45 @@ public long getUsedMemorySizeInBytes() { return memorySize; } + public MemoryManager setMemoryUpdateCallback(BiConsumer memoryUpdateCallback) { + this.memoryUpdateCallback.set(memoryUpdateCallback); + return this; + } + // endregion + /** + * Resize memory by ratio + * + * @param ratio the ratio to resize memory, values [0.0, 1.0] + */ + public synchronized void resizeByRatio(double ratio) { + // Update initial allocated memory size by ratio + long beforeInitialAllocatedMemorySizeInBytes = this.initialAllocatedMemorySizeInBytes; + this.initialAllocatedMemorySizeInBytes *= ratio; + // Update total memory size by actual size + long beforeTotalMemorySizeInBytes = this.totalMemorySizeInBytes; + this.totalMemorySizeInBytes += + (this.initialAllocatedMemorySizeInBytes - beforeInitialAllocatedMemorySizeInBytes); + // Get actual ratio of re-allocate memory size + double actualRatio = (double) this.totalMemorySizeInBytes / beforeTotalMemorySizeInBytes; + // Re-allocate memory for all memory blocks + for (IMemoryBlock block : allocatedMemoryBlocks.values()) { + this.allocatedMemorySizeInBytes += block.resizeByRatio(actualRatio); + } + // Re-allocate memory for all child memory managers + for (Map.Entry entry : children.entrySet()) { + entry.getValue().resizeByRatio(ratio); + } + if (memoryUpdateCallback.get() != null) { + try { + memoryUpdateCallback.get().accept(beforeTotalMemorySizeInBytes, totalMemorySizeInBytes); + } catch (Exception e) { + LOGGER.warn("Failed to execute the update callback.", e); + } + } + } + @Override public String toString() { return "MemoryManager{" diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java new file mode 100644 index 000000000000..59662dc5a293 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.memory; + +import org.apache.iotdb.commons.concurrent.WrappedRunnable; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.utils.TestOnly; + +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class MemoryPeriodicalJobExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPeriodicalJobExecutor.class); + + private final ScheduledExecutorService executorService; + private final long minIntervalSeconds; + + private long rounds; + private Future executorFuture; + + private final List> periodicalJobs = new CopyOnWriteArrayList<>(); + + public MemoryPeriodicalJobExecutor( + final ScheduledExecutorService executorService, final long minIntervalSeconds) { + this.executorService = executorService; + this.minIntervalSeconds = minIntervalSeconds; + } + + public void register(String id, Runnable periodicalJob, long intervalInSeconds) { + periodicalJobs.add( + new Pair<>( + new WrappedRunnable() { + @Override + public void runMayThrow() { + try { + periodicalJob.run(); + } catch (Exception e) { + LOGGER.warn("Periodical job {} failed.", id, e); + } + } + }, + Math.max(intervalInSeconds / minIntervalSeconds, 1))); + LOGGER.info( + "Memory periodical job {} is registered successfully. Interval: {} seconds.", + id, + Math.max(intervalInSeconds / minIntervalSeconds, 1) * minIntervalSeconds); + } + + public synchronized void start() { + if (executorFuture == null) { + rounds = 0; + + executorFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + executorService, + this::execute, + minIntervalSeconds, + minIntervalSeconds, + TimeUnit.SECONDS); + LOGGER.info("Memory periodical job executor is started successfully."); + } + } + + protected void execute() { + ++rounds; + + for (final Pair periodicalJob : periodicalJobs) { + if (rounds % periodicalJob.right == 0) { + periodicalJob.left.run(); + } + } + } + + public synchronized void stop() { + if (executorFuture != null) { + executorFuture.cancel(false); + executorFuture = null; + LOGGER.info("Memory periodical job executor is stopped successfully."); + } + } + + @TestOnly + public void clear() { + periodicalJobs.clear(); + LOGGER.info("All memory periodical jobs are cleared successfully."); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeController.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeController.java new file mode 100644 index 000000000000..89b1976b00c2 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeController.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.memory; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.service.IService; +import org.apache.iotdb.commons.service.ServiceType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class MemoryRuntimeController implements IService { + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryRuntimeController.class); + private static final CommonConfig CONFIG = CommonDescriptor.getInstance().getConfig(); + private static final MemoryManager ON_HEAP_MEMORY_MANAGER = + MemoryConfig.global().getMemoryManager("OnHeap"); + private static final boolean ENABLE_MEMORY_ADAPT = CONFIG.isEnableMemoryAdapt(); + private static final long MEMORY_ADAPT_INTERVAL_IN_S = CONFIG.getMemoryAdaptIntervalInS(); + + private static final AtomicBoolean isShutdown = new AtomicBoolean(false); + + private static final MemoryPeriodicalJobExecutor memoryPeriodicalJobExecutor = + new MemoryPeriodicalJobExecutor( + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.MEMORY_PERIODICAL_JOB_EXECUTOR.getName()), + MEMORY_ADAPT_INTERVAL_IN_S); + + @Override + public void start() throws StartupException { + memoryPeriodicalJobExecutor.start(); + + // Try to adapt total memory size according to the JVM total memory size + if (ENABLE_MEMORY_ADAPT) { + LOGGER.info( + "Enable automatic memory adapt with an interval of {} s", MEMORY_ADAPT_INTERVAL_IN_S); + MemoryRuntimeController.getInstance() + .registerPeriodicalJob( + "MemoryRuntimeAgent#adaptTotalMemory()", + this::adaptTotalMemory, + MEMORY_ADAPT_INTERVAL_IN_S); + } + + isShutdown.set(false); + } + + private void adaptTotalMemory() { + long totalMemory = Runtime.getRuntime().totalMemory(); + if (ON_HEAP_MEMORY_MANAGER != null) { + long originMemorySize = ON_HEAP_MEMORY_MANAGER.getInitialAllocatedMemorySizeInBytes(); + if (originMemorySize != totalMemory) { + LOGGER.info("Total memory size changed from {} to {}", originMemorySize, totalMemory); + ON_HEAP_MEMORY_MANAGER.resizeByRatio((double) totalMemory / originMemorySize); + } + } + } + + @Override + public void stop() { + if (isShutdown.get()) { + return; + } + isShutdown.set(true); + + memoryPeriodicalJobExecutor.stop(); + } + + public void registerPeriodicalJob(String id, Runnable periodicalJob, long intervalInSeconds) { + memoryPeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds); + } + + @Override + public ServiceType getID() { + return ServiceType.MEMORY_RUNTIME_CONTROLLER; + } + + private static class MemoryRuntimeAgentHolder { + private static final MemoryRuntimeController HANDLE = new MemoryRuntimeController(); + } + + public static MemoryRuntimeController getInstance() { + return MemoryRuntimeAgentHolder.HANDLE; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java index 7267c79a6655..098891c2937c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java @@ -80,7 +80,7 @@ public enum ServiceType { PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE( "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"), AINode_RPC_SERVICE("Rpc Service for AINode", "AINodeRPCService"), - MEMORY_RUNTIME_AGENT("Memory Runtime Agent", "MemoryRuntimeAgent"), + MEMORY_RUNTIME_CONTROLLER("Memory Runtime Controller", "MemoryRuntimeController"), PIPE_RUNTIME_DATA_NODE_AGENT("Pipe Runtime Data Node Agent", "PipeRuntimeDataNodeAgent"), PIPE_RUNTIME_CONFIG_NODE_AGENT("Pipe Runtime Config Node Agent", "PipeRuntimeConfigNodeAgent"), SUBSCRIPTION_RUNTIME_AGENT("Subscription Runtime Agent", "SubscriptionRuntimeAgent"),