Skip to content

Commit

Permalink
feat(kafka_issues871): add 1min interval between stream object compaction (#952)
Browse files Browse the repository at this point in the history

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Mar 4, 2024
1 parent 04c6b29 commit 87a9469
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 24 deletions.
42 changes: 25 additions & 17 deletions s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3StreamClient implements StreamClient {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);
private static final long STREAM_OBJECT_COMPACTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
private final ScheduledExecutorService streamObjectCompactionScheduler = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-object-compaction-scheduler", true), LOGGER, true);
private final Map<Long, StreamWrapper> openedStreams;
Expand Down Expand Up @@ -153,7 +155,8 @@ public void shutdown() {

class StreamWrapper implements Stream {
private final S3Stream stream;
private volatile boolean compacting = false;
private final Semaphore trimCompactionSemaphore = new Semaphore(1);
private volatile long lastCompactionTimestamp = 0;

public StreamWrapper(S3Stream stream) {
this.stream = stream;
Expand Down Expand Up @@ -198,13 +201,18 @@ public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffs
@Override
public CompletableFuture<Void> trim(long newStartOffset) {
return stream.trim(newStartOffset).whenComplete((nil, ex) -> {
if (compacting) {
// skip compacting if the stream is compacting
// to avoid streamObjectCompactionScheduler task queue overflow.
if (!trimCompactionSemaphore.tryAcquire()) {
// ensure only one compaction task which trim triggers
return;
}
// trigger compaction after trim to clean up the expired stream objects.
streamObjectCompactionScheduler.execute(this::cleanupStreamObject);
streamObjectCompactionScheduler.execute(() -> {
try {
// trigger compaction after trim to clean up the expired stream objects.
this.cleanupStreamObject();
} finally {
trimCompactionSemaphore.release();
}
});
});

}
Expand Down Expand Up @@ -237,18 +245,18 @@ public void compactStreamObject0(boolean onlyCleanup) {
// so we need to check if the stream is closed before starting the compaction.
return;
}
try {
compacting = true;
StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager).stream(this)
.s3Operator(s3Operator).maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes()).build();
if (onlyCleanup) {
task.cleanup();
} else {
task.compact();
}
} finally {
compacting = false;
if (System.currentTimeMillis() - lastCompactionTimestamp > STREAM_OBJECT_COMPACTION_INTERVAL_MS) {
// skip compaction if the last compaction is within the interval.
return;
}
StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager).stream(this)
.s3Operator(s3Operator).maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes()).build();
if (onlyCleanup) {
task.cleanup();
} else {
task.compact();
}
lastCompactionTimestamp = System.currentTimeMillis();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class StreamObjectCompactor {
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final int dataBlockGroupSizeThreshold;
private CompactStreamObjectRequest request;

private StreamObjectCompactor(ObjectManager objectManager, S3Operator s3Operator, Stream stream,
long maxStreamObjectSize, int dataBlockGroupSizeThreshold) {
Expand All @@ -73,7 +74,7 @@ public void compact() {
try {
compact0(false);
} catch (Throwable e) {
handleCompactException(e);
handleCompactException(false, e);
}
}

Expand All @@ -84,15 +85,15 @@ public void cleanup() {
try {
compact0(true);
} catch (Throwable e) {
handleCompactException(e);
handleCompactException(true, e);
}
}

private void handleCompactException(Throwable e) {
private void handleCompactException(boolean onlyCleanup, Throwable e) {
if (stream instanceof S3StreamClient.StreamWrapper && ((S3StreamClient.StreamWrapper) stream).isClosed()) {
LOGGER.warn("[STREAM_OBJECT_COMPACT_FAIL],[STREAM_CLOSED],{}", stream.streamId(), e);
LOGGER.warn("[STREAM_OBJECT_COMPACT_FAIL],[STREAM_CLOSED],{},onlyCleanup={},req={}", stream.streamId(), onlyCleanup, request, e);
} else {
LOGGER.error("[STREAM_OBJECT_COMPACT_FAIL],[UNEXPECTED],{}", stream.streamId(), e);
LOGGER.error("[STREAM_OBJECT_COMPACT_FAIL],[UNEXPECTED],{},onlyCleanup={},req={}", stream.streamId(), onlyCleanup, request, e);
}
}

Expand All @@ -114,7 +115,7 @@ void compact0(boolean onlyCleanup) throws ExecutionException, InterruptedExcepti
// clean up the expired objects
if (!expiredObjects.isEmpty()) {
List<Long> compactedObjectIds = expiredObjects.stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList());
CompactStreamObjectRequest request = new CompactStreamObjectRequest(NOOP_OBJECT_ID, 0,
request = new CompactStreamObjectRequest(NOOP_OBJECT_ID, 0,
streamId, stream.streamEpoch(), NOOP_OFFSET, NOOP_OFFSET, compactedObjectIds);
objectManager.compactStreamObject(request).get();
if (s3ObjectLogger.isTraceEnabled()) {
Expand All @@ -137,7 +138,7 @@ void compact0(boolean onlyCleanup) throws ExecutionException, InterruptedExcepti
Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, stream.streamEpoch(),
startOffset, objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact();
if (requestOpt.isPresent()) {
CompactStreamObjectRequest request = requestOpt.get();
request = requestOpt.get();
objectManager.compactStreamObject(request).get();
if (s3ObjectLogger.isTraceEnabled()) {
s3ObjectLogger.trace("{}", request);
Expand Down

0 comments on commit 87a9469

Please sign in to comment.