From 0123f9056117a18c5b21c42ae8f348c82ce865fd Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 22 Feb 2024 11:29:43 +0800 Subject: [PATCH] fix(s3stream): add compaction max throttle rate limit (#941) Signed-off-by: Shichao Nie --- .../java/com/automq/stream/s3/Config.java | 2 +- .../stream/s3/compact/CompactionManager.java | 20 +++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/Config.java b/s3stream/src/main/java/com/automq/stream/s3/Config.java index 8e77e4e17..596d73290 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/Config.java +++ b/s3stream/src/main/java/com/automq/stream/s3/Config.java @@ -45,7 +45,7 @@ public class Config { private long streamSetObjectCompactionStreamSplitSize = 16 * 1024 * 1024; private int streamSetObjectCompactionForceSplitPeriod = 120; private int streamSetObjectCompactionMaxObjectNum = 500; - private int maxStreamNumPerStreamSetObject = 10000; + private int maxStreamNumPerStreamSetObject = 100000; private int maxStreamObjectNumPerCommit = 10000; private boolean mockEnable = false; private boolean objectLogEnable = false; diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index d5994b6c4..0546399e7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -59,6 +59,8 @@ public class CompactionManager { private static final int MIN_COMPACTION_DELAY_MS = 60000; + // Max refill rate for Bucket: 1 token per nanosecond + private static final int MAX_THROTTLE_BYTES_PER_SEC = 1000000000; private final Logger logger; private final Logger s3ObjectLogger; private final ObjectManager objectManager; @@ -209,13 +211,19 @@ private void compact(List streamMetadataList, long expectReadBytesPerSec; if (expectCompleteTime > 0) { expectReadBytesPerSec = totalSize / expectCompleteTime / 60; - compactionBucket = Bucket.builder().addLimit(limit -> limit - .capacity(expectReadBytesPerSec) - .refillIntervally(expectReadBytesPerSec, Duration.ofSeconds(1))).build(); - logger.info("Throttle compaction read to {} bytes/s, expect to complete in no less than {}min", - expectReadBytesPerSec, expectCompleteTime); + if (expectReadBytesPerSec < MAX_THROTTLE_BYTES_PER_SEC) { + compactionBucket = Bucket.builder().addLimit(limit -> limit + .capacity(expectReadBytesPerSec) + .refillIntervally(expectReadBytesPerSec, Duration.ofSeconds(1))).build(); + logger.info("Throttle compaction read to {} bytes/s, expect to complete in no less than {}min", + expectReadBytesPerSec, expectCompleteTime); + } else { + logger.warn("Compaction throttle rate {} bytes/s exceeds bucket refill limit, there will be no throttle for compaction this time", expectReadBytesPerSec); + compactionBucket = null; + } } else { - logger.warn("Compaction interval {}min is too small, there will be no throttle for compaction", compactionInterval); + logger.warn("Compaction interval {}min is too small, there will be no throttle for compaction this time", compactionInterval); + compactionBucket = null; } if (!objectsToForceSplit.isEmpty()) {