From 87a9469e68cc6f4ca6de8541366863c97414438b Mon Sep 17 00:00:00 2001 From: "Xu Han@AutoMQ" Date: Mon, 4 Mar 2024 11:15:24 +0800 Subject: [PATCH] feat(kafka_issues871): add 1min interval between stream object compaction (#952) Signed-off-by: Robin Han --- .../com/automq/stream/s3/S3StreamClient.java | 42 +++++++++++-------- .../stream/s3/StreamObjectCompactor.java | 15 +++---- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java index b534f3e77..78499d4b6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -45,6 +46,7 @@ public class S3StreamClient implements StreamClient { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class); + private static final long STREAM_OBJECT_COMPACTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final ScheduledExecutorService streamObjectCompactionScheduler = Threads.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("stream-object-compaction-scheduler", true), LOGGER, true); private final Map openedStreams; @@ -153,7 +155,8 @@ public void shutdown() { class StreamWrapper implements Stream { private final S3Stream stream; - private volatile boolean compacting = false; + private final Semaphore trimCompactionSemaphore = new Semaphore(1); + private volatile long lastCompactionTimestamp = 0; public StreamWrapper(S3Stream stream) { this.stream = stream; @@ -198,13 +201,18 @@ public CompletableFuture fetch(FetchContext context, long startOffs @Override public 
CompletableFuture trim(long newStartOffset) { return stream.trim(newStartOffset).whenComplete((nil, ex) -> { - if (compacting) { - // skip compacting if the stream is compacting - // to avoid streamObjectCompactionScheduler task queue overflow. + if (!trimCompactionSemaphore.tryAcquire()) { + // ensure only one trim-triggered compaction task at a time return; } - // trigger compaction after trim to clean up the expired stream objects. - streamObjectCompactionScheduler.execute(this::cleanupStreamObject); + streamObjectCompactionScheduler.execute(() -> { + try { + // trigger compaction after trim to clean up the expired stream objects. + this.cleanupStreamObject(); + } finally { + trimCompactionSemaphore.release(); + } + }); }); } @@ -237,18 +245,18 @@ public void compactStreamObject0(boolean onlyCleanup) { // so we need to check if the stream is closed before starting the compaction. return; } - try { - compacting = true; - StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager).stream(this) - .s3Operator(s3Operator).maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes()).build(); - if (onlyCleanup) { - task.cleanup(); - } else { - task.compact(); - } - } finally { - compacting = false; + if (System.currentTimeMillis() - lastCompactionTimestamp < STREAM_OBJECT_COMPACTION_INTERVAL_MS) { + // skip compaction if the last compaction is within the interval. 
+ return; + } + StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager).stream(this) + .s3Operator(s3Operator).maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes()).build(); + if (onlyCleanup) { + task.cleanup(); + } else { + task.compact(); } + lastCompactionTimestamp = System.currentTimeMillis(); } } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java index 261f53446..d0218a5c9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java @@ -57,6 +57,7 @@ public class StreamObjectCompactor { private final ObjectManager objectManager; private final S3Operator s3Operator; private final int dataBlockGroupSizeThreshold; + private CompactStreamObjectRequest request; private StreamObjectCompactor(ObjectManager objectManager, S3Operator s3Operator, Stream stream, long maxStreamObjectSize, int dataBlockGroupSizeThreshold) { @@ -73,7 +74,7 @@ public void compact() { try { compact0(false); } catch (Throwable e) { - handleCompactException(e); + handleCompactException(false, e); } } @@ -84,15 +85,15 @@ public void cleanup() { try { compact0(true); } catch (Throwable e) { - handleCompactException(e); + handleCompactException(true, e); } } - private void handleCompactException(Throwable e) { + private void handleCompactException(boolean onlyCleanup, Throwable e) { if (stream instanceof S3StreamClient.StreamWrapper && ((S3StreamClient.StreamWrapper) stream).isClosed()) { - LOGGER.warn("[STREAM_OBJECT_COMPACT_FAIL],[STREAM_CLOSED],{}", stream.streamId(), e); + LOGGER.warn("[STREAM_OBJECT_COMPACT_FAIL],[STREAM_CLOSED],{},onlyCleanup={},req={}", stream.streamId(), onlyCleanup, request, e); } else { - LOGGER.error("[STREAM_OBJECT_COMPACT_FAIL],[UNEXPECTED],{}", stream.streamId(), e); + 
LOGGER.error("[STREAM_OBJECT_COMPACT_FAIL],[UNEXPECTED],{},onlyCleanup={},req={}", stream.streamId(), onlyCleanup, request, e); } } @@ -114,7 +115,7 @@ void compact0(boolean onlyCleanup) throws ExecutionException, InterruptedExcepti // clean up the expired objects if (!expiredObjects.isEmpty()) { List compactedObjectIds = expiredObjects.stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()); - CompactStreamObjectRequest request = new CompactStreamObjectRequest(NOOP_OBJECT_ID, 0, + request = new CompactStreamObjectRequest(NOOP_OBJECT_ID, 0, streamId, stream.streamEpoch(), NOOP_OFFSET, NOOP_OFFSET, compactedObjectIds); objectManager.compactStreamObject(request).get(); if (s3ObjectLogger.isTraceEnabled()) { @@ -137,7 +138,7 @@ void compact0(boolean onlyCleanup) throws ExecutionException, InterruptedExcepti Optional requestOpt = new StreamObjectGroupCompactor(streamId, stream.streamEpoch(), startOffset, objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact(); if (requestOpt.isPresent()) { - CompactStreamObjectRequest request = requestOpt.get(); + request = requestOpt.get(); objectManager.compactStreamObject(request).get(); if (s3ObjectLogger.isTraceEnabled()) { s3ObjectLogger.trace("{}", request);