Skip to content

Commit

Permalink
fix(s3stream): add compaction max throttle rate limit (#941)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <niesc@automq.com>
SCNieh authored Feb 22, 2024
1 parent f7a57c6 commit 0123f90
Showing 2 changed files with 15 additions and 7 deletions.
2 changes: 1 addition & 1 deletion s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ public class Config {
private long streamSetObjectCompactionStreamSplitSize = 16 * 1024 * 1024;
private int streamSetObjectCompactionForceSplitPeriod = 120;
private int streamSetObjectCompactionMaxObjectNum = 500;
private int maxStreamNumPerStreamSetObject = 10000;
private int maxStreamNumPerStreamSetObject = 100000;
private int maxStreamObjectNumPerCommit = 10000;
private boolean mockEnable = false;
private boolean objectLogEnable = false;
Original file line number Diff line number Diff line change
@@ -59,6 +59,8 @@

public class CompactionManager {
private static final int MIN_COMPACTION_DELAY_MS = 60000;
// Max refill rate for Bucket: 1 token per nanosecond
private static final int MAX_THROTTLE_BYTES_PER_SEC = 1000000000;
private final Logger logger;
private final Logger s3ObjectLogger;
private final ObjectManager objectManager;
@@ -209,13 +211,19 @@ private void compact(List<StreamMetadata> streamMetadataList,
long expectReadBytesPerSec;
if (expectCompleteTime > 0) {
expectReadBytesPerSec = totalSize / expectCompleteTime / 60;
compactionBucket = Bucket.builder().addLimit(limit -> limit
.capacity(expectReadBytesPerSec)
.refillIntervally(expectReadBytesPerSec, Duration.ofSeconds(1))).build();
logger.info("Throttle compaction read to {} bytes/s, expect to complete in no less than {}min",
expectReadBytesPerSec, expectCompleteTime);
if (expectReadBytesPerSec < MAX_THROTTLE_BYTES_PER_SEC) {
compactionBucket = Bucket.builder().addLimit(limit -> limit
.capacity(expectReadBytesPerSec)
.refillIntervally(expectReadBytesPerSec, Duration.ofSeconds(1))).build();
logger.info("Throttle compaction read to {} bytes/s, expect to complete in no less than {}min",
expectReadBytesPerSec, expectCompleteTime);
} else {
logger.warn("Compaction throttle rate {} bytes/s exceeds bucket refill limit, there will be no throttle for compaction this time", expectReadBytesPerSec);
compactionBucket = null;
}
} else {
logger.warn("Compaction interval {}min is too small, there will be no throttle for compaction", compactionInterval);
logger.warn("Compaction interval {}min is too small, there will be no throttle for compaction this time", compactionInterval);
compactionBucket = null;
}

if (!objectsToForceSplit.isEmpty()) {

0 comments on commit 0123f90

Please sign in to comment.