diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java index 64a43ff49dfec..023a7d0c50db2 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java @@ -22,10 +22,10 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; -import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import javax.annotation.Nullable; @@ -84,7 +84,27 @@ public SingleThreadMultiplexSourceReaderBase( /** * This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier, * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link - * FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}. + * RateLimiterStrategy}. 
+ */ + public SingleThreadMultiplexSourceReaderBase( + Supplier> splitReaderSupplier, + RecordEmitter recordEmitter, + Configuration config, + SourceReaderContext context, + @Nullable RateLimiterStrategy rateLimiterStrategy) { + super( + new SingleThreadFetcherManager<>(splitReaderSupplier, config), + recordEmitter, + null, + config, + context, + rateLimiterStrategy); + } + + /** + * This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier, + * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link + * SingleThreadFetcherManager}. */ public SingleThreadMultiplexSourceReaderBase( SingleThreadFetcherManager splitFetcherManager, @@ -97,8 +117,7 @@ public SingleThreadMultiplexSourceReaderBase( /** * This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier, * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link - * FutureCompletingBlockingQueue}, {@link SingleThreadFetcherManager} and {@link - * RecordEvaluator}. + * SingleThreadFetcherManager} and {@link RecordEvaluator}. */ public SingleThreadMultiplexSourceReaderBase( SingleThreadFetcherManager splitFetcherManager, @@ -108,4 +127,26 @@ public SingleThreadMultiplexSourceReaderBase( SourceReaderContext context) { super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context); } + + /** + * This constructor behaves like {@link + * #SingleThreadMultiplexSourceReaderBase(SingleThreadFetcherManager, RecordEmitter, + * RecordEvaluator, Configuration, SourceReaderContext)}, but accepts a specific {@link + * RateLimiterStrategy}. 
+ */ + public SingleThreadMultiplexSourceReaderBase( + SingleThreadFetcherManager splitFetcherManager, + RecordEmitter recordEmitter, + @Nullable RecordEvaluator eofRecordEvaluator, + Configuration config, + SourceReaderContext context, + @Nullable RateLimiterStrategy rateLimiterStrategy) { + super( + splitFetcherManager, + recordEmitter, + eofRecordEvaluator, + config, + context, + rateLimiterStrategy); + } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index f205b53a6c012..9fecaa41f4d83 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -26,6 +26,8 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -107,6 +109,15 @@ public abstract class SourceReaderBase eofRecordEvaluator; + /** Indicates whether rate limiting is enabled for this SourceReader. */ + private final boolean rateLimitingEnabled; + + /** The {@link RateLimiter} used for rate limiting. */ + @Nullable private final RateLimiter rateLimiter; + + /** Future that tracks the result of acquiring permission from {@link #rateLimiter}. 
*/ + @Nullable private CompletableFuture rateLimitPermissionFuture; + /** * The primary constructor for the source reader. * @@ -118,7 +129,7 @@ public SourceReaderBase( RecordEmitter recordEmitter, Configuration config, SourceReaderContext context) { - this(splitFetcherManager, recordEmitter, null, config, context); + this(splitFetcherManager, recordEmitter, null, config, context, null); } public SourceReaderBase( @@ -127,6 +138,16 @@ public SourceReaderBase( @Nullable RecordEvaluator eofRecordEvaluator, Configuration config, SourceReaderContext context) { + this(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context, null); + } + + public SourceReaderBase( + SplitFetcherManager splitFetcherManager, + RecordEmitter recordEmitter, + @Nullable RecordEvaluator eofRecordEvaluator, + Configuration config, + SourceReaderContext context, + @Nullable RateLimiterStrategy rateLimiterStrategy) { this.elementsQueue = splitFetcherManager.getQueue(); this.splitFetcherManager = splitFetcherManager; this.recordEmitter = recordEmitter; @@ -136,8 +157,17 @@ public SourceReaderBase( this.context = context; this.noMoreSplitsAssignment = false; this.eofRecordEvaluator = eofRecordEvaluator; - + this.rateLimitingEnabled = rateLimiterStrategy != null; numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); + + rateLimiter = + rateLimitingEnabled + ? rateLimiterStrategy.createRateLimiter(context.currentParallelism()) + : null; + LOG.info( + "Rate limiting of SourceReader is {}", + rateLimitingEnabled ? 
"enabled" : "disabled"); + rateLimitPermissionFuture = CompletableFuture.completedFuture(null); } @Override @@ -153,9 +183,47 @@ public InputStatus pollNext(ReaderOutput output) throws Exception { return trace(finishedOrAvailableLater()); } } + if (rateLimitingEnabled) { + return pollNextWithRateLimiting(recordsWithSplitId, output); + } else { + return pollNextWithoutRateLimiting(recordsWithSplitId, output); + } + } + + private InputStatus pollNextWithoutRateLimiting( + RecordsWithSplitIds recordsWithSplitId, ReaderOutput output) throws Exception { + // we need to loop here, in case we may have to go across splits + while (true) { + // Process one record. + final E record = recordsWithSplitId.nextRecordFromSplit(); + if (record != null) { + // emit the record. + numRecordsInCounter.inc(1); + recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state); + LOG.trace("Emitted record: {}", record); + // We always emit MORE_AVAILABLE here, even though we do not strictly know whether + // more is available. If nothing more is available, the next invocation will find + // this out and return the correct status. + // That means we emit the occasional 'false positive' for availability, but this + // saves us doing checks for every record. Ultimately, this is cheaper. + return trace(InputStatus.MORE_AVAILABLE); + } else if (!moveToNextSplit(recordsWithSplitId, output)) { + // The fetch is done and we just discovered that and have not emitted anything, yet. + // We need to move to the next fetch. As a shortcut, we call pollNext() here again, + // rather than emitting nothing and waiting for the caller to call us again. 
+ return pollNext(output); + } + } + } - // we need to loop here, because we may have to go across splits + private InputStatus pollNextWithRateLimiting( + RecordsWithSplitIds recordsWithSplitId, ReaderOutput output) throws Exception { + // make sure we have a fetch we are working on, or move to the next while (true) { + // Check if the previous record count reached the limit of rateLimiter. + if (!rateLimitPermissionFuture.isDone()) { + return trace(InputStatus.MORE_AVAILABLE); + } // Process one record. final E record = recordsWithSplitId.nextRecordFromSplit(); if (record != null) { @@ -163,6 +231,18 @@ public InputStatus pollNext(ReaderOutput output) throws Exception { numRecordsInCounter.inc(1); recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state); LOG.trace("Emitted record: {}", record); + RateLimitingSourceOutputWrapper rateLimitingSourceOutputWrapper = + (RateLimitingSourceOutputWrapper) currentSplitOutput; + if (rateLimitingSourceOutputWrapper.getRecordsOfCurrentWindow() > 0) { + // Acquire permit from rateLimiter. + rateLimitPermissionFuture = + rateLimiter + .acquire( + rateLimitingSourceOutputWrapper + .getRecordsOfCurrentWindow()) + .toCompletableFuture(); + } + rateLimitingSourceOutputWrapper.resetWindowRecordCount(); // We always emit MORE_AVAILABLE here, even though we do not strictly know whether // more is available. 
If nothing more is available, the next invocation will find @@ -245,7 +325,14 @@ record -> { return true; }; } - currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output, eofRecordHandler); + if (rateLimitingEnabled) { + rateLimiter.notifyAddingSplit( + this.toSplitType( + this.currentSplitContext.splitId, this.currentSplitContext.state)); + } + currentSplitOutput = + currentSplitContext.getOrCreateSplitOutput( + output, eofRecordHandler, rateLimitingEnabled); LOG.trace("Emitting records from fetch for split {}", nextSplitId); return true; } @@ -264,6 +351,16 @@ public List snapshotState(long checkpointId) { return splits; } + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + splitStates.forEach( + (id, context) -> { + if (rateLimitingEnabled) { + rateLimiter.notifyCheckpointComplete(checkpointId); + } + }); + } + @Override public void addSplits(List splits) { LOG.info("Adding split(s) to reader: {}", splits); @@ -364,7 +461,9 @@ private SplitContext(String splitId, SplitStateT state) { } SourceOutput getOrCreateSplitOutput( - ReaderOutput mainOutput, @Nullable Function eofRecordHandler) { + ReaderOutput mainOutput, + @Nullable Function eofRecordHandler, + boolean rateLimitingEnabled) { if (sourceOutput == null) { // The split output should have been created when AddSplitsEvent was processed in // SourceOperator. Here we just use this method to get the previously created @@ -373,6 +472,9 @@ SourceOutput getOrCreateSplitOutput( if (eofRecordHandler != null) { sourceOutput = new SourceOutputWrapper<>(sourceOutput, eofRecordHandler); } + if (rateLimitingEnabled) { + sourceOutput = new RateLimitingSourceOutputWrapper<>(sourceOutput); + } } return sourceOutput; } @@ -435,4 +537,73 @@ private boolean isEndOfStreamReached(T record) { return isStreamEnd; } } + + /** + * A wrapper around {@link SourceOutput} that counts the number of records during the current + * rate-limiting window. + * + *

This wrapper is used when rate limiting is enabled to track how many records have been + * emitted since the last rate limit check, allowing the reader to properly apply backpressure + * when the rate limit is exceeded. + * + * @param <T> The type of records being emitted + */ + private static final class RateLimitingSourceOutputWrapper implements SourceOutput { + /** The underlying source output to delegate to. */ + final SourceOutput sourceOutput; + + /** Number of records handled during the current rate-limiting window. */ + private int recordsOfCurrentWindow; + + /** + * Creates a new RateLimitingSourceOutputWrapper. + * + * @param sourceOutput The underlying source output to wrap + */ + public RateLimitingSourceOutputWrapper(SourceOutput sourceOutput) { + this.sourceOutput = sourceOutput; + this.recordsOfCurrentWindow = 0; + } + + @Override + public void emitWatermark(Watermark watermark) { + sourceOutput.emitWatermark(watermark); + } + + @Override + public void markIdle() { + sourceOutput.markIdle(); + } + + @Override + public void markActive() { + sourceOutput.markActive(); + } + + @Override + public void collect(T record) { + sourceOutput.collect(record); + recordsOfCurrentWindow++; + } + + @Override + public void collect(T record, long timestamp) { + sourceOutput.collect(record, timestamp); + recordsOfCurrentWindow++; + } + + /** + * Gets the recordsOfCurrentWindow. + * + * @return the number of records collected in the current window. + */ + public int getRecordsOfCurrentWindow() { + return recordsOfCurrentWindow; + } + + /** Resets the recordsOfCurrentWindow to 0. 
*/ + public void resetWindowRecordCount() { + recordsOfCurrentWindow = 0; + } + } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 53913362e1d87..56a67193e0f6b 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader; @@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.apache.flink.shaded.guava33.com.google.common.util.concurrent.RateLimiter; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -63,6 +67,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -133,18 
+140,137 @@ public void close() {} void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception { final TestingRecordsWithSplitIds records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2"); - final SourceReader reader = createReaderAndAwaitAvailable("test-split", records); + final SourceReader reader = + createReaderAndAwaitAvailable( + Collections.singletonList("test-split"), + Collections.singletonList(records), + RateLimiterStrategy.noOp()); reader.pollNext(new TestingReaderOutput<>()); assertThat(records.isRecycled()).isFalse(); } + @Test + void testLimitingRateInSplitReader() throws Exception { + String[] recordArr = new String[60]; + for (int i = 0; i < recordArr.length; i++) { + recordArr[i] = "value" + i; + } + final TestingRecordsWithSplitIds records = + new TestingRecordsWithSplitIds<>("test-split", recordArr); + final SourceReader reader = + createReaderAndAwaitAvailable( + Collections.singletonList("test-split"), + Collections.singletonList(records), + RateLimiterStrategy.perSecond(2)); + TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>(); + long startTime = System.currentTimeMillis(); + while (testingReaderOutput.getEmittedRecords().size() < recordArr.length) { + reader.pollNext(testingReaderOutput); + } + // Expected time: 60/2 ("test-split1") = 30 seconds. + // The first few seconds require preheating, there may be a deviation of a few seconds. 
+ assertThat(System.currentTimeMillis() - startTime) + .isGreaterThan(Duration.ofSeconds(25).toMillis()) + .isLessThan(Duration.ofSeconds(35).toMillis()); + } + + @Test + void testLimitingRatePerCheckpointInSplitReader() throws Exception { + String[] recordArr = new String[30]; + for (int i = 0; i < recordArr.length; i++) { + recordArr[i] = "value" + i; + } + final TestingRecordsWithSplitIds records = + new TestingRecordsWithSplitIds<>("test-split", recordArr); + int recordsPerCheckpoint = 2; + final SourceReader reader = + createReaderAndAwaitAvailable( + Collections.singletonList("test-split"), + Collections.singletonList(records), + RateLimiterStrategy.perCheckpoint(recordsPerCheckpoint)); + TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>(); + for (int i = 1; i <= recordArr.length / recordsPerCheckpoint; i++) { + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < Duration.ofSeconds(2).toMillis()) { + reader.pollNext(testingReaderOutput); + } + assertThat(testingReaderOutput.getEmittedRecords().size()) + .isGreaterThanOrEqualTo(1 + recordsPerCheckpoint * (i - 1)) + .isLessThanOrEqualTo(1 + recordsPerCheckpoint * i); + reader.notifyCheckpointComplete(i); + } + } + + @Test + void testLimitingRateWithAddingSplitInSplitReader() throws Exception { + String[] recordArr = new String[60]; + for (int i = 0; i < recordArr.length; i++) { + recordArr[i] = "value" + i; + } + final TestingRecordsWithSplitIds firstRecords = + new TestingRecordsWithSplitIds<>("test-split1", recordArr); + final TestingRecordsWithSplitIds secondRecords = + new TestingRecordsWithSplitIds<>("test-split2", recordArr); + int maxPerSecond = 2; + // SplitAwaredRateLimiter will reduce the maxPerSecond for splits whose splitId is not + // "test-split1". 
+ final SourceReader reader = + createReaderAndAwaitAvailable( + Arrays.asList("test-split1", "test-split2"), + Arrays.asList(firstRecords, secondRecords), + parallelism -> + new SplitAwaredRateLimiter((double) maxPerSecond / parallelism)); + TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>(); + long startTime = System.currentTimeMillis(); + while (testingReaderOutput.getEmittedRecords().size() < 2 * recordArr.length) { + reader.pollNext(testingReaderOutput); + } + // Expected time: 60/2 ("test-split1") + 60/1 ("test-split2") = 90 seconds. + // The first few seconds require preheating, so there may be a deviation of a few seconds. + assertThat(System.currentTimeMillis() - startTime) + .isGreaterThan(Duration.ofSeconds(85).toMillis()) + .isLessThan(Duration.ofSeconds(95).toMillis()); + } + + /** A rate limiter that reduces the maxPerSecond for specific splits. */ + private static class SplitAwaredRateLimiter + implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter< + TestingSourceSplit> { + + private final Executor limiter = + Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-rate-limiter")); + private RateLimiter rateLimiter; + private final double maxPerSecond; + + public SplitAwaredRateLimiter(double maxPerSecond) { + this.maxPerSecond = maxPerSecond; + this.rateLimiter = RateLimiter.create(maxPerSecond); + } + + @Override + public CompletionStage acquire(int numberOfEvents) { + return CompletableFuture.runAsync(() -> rateLimiter.acquire(numberOfEvents), limiter); + } + + @Override + public void notifyAddingSplit(TestingSourceSplit split) { + if (!split.splitId().equals("test-split1")) { + this.rateLimiter = RateLimiter.create(maxPerSecond / 2); + } + } + } + @Test void testRecordsWithSplitsRecycledWhenEmpty() throws Exception { final TestingRecordsWithSplitIds records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2"); - final SourceReader reader = createReaderAndAwaitAvailable("test-split", 
records); + final SourceReader reader = + createReaderAndAwaitAvailable( + Collections.singletonList("test-split"), + Collections.singletonList(records), + RateLimiterStrategy.noOp()); // poll thrice: twice to get all records, one more to trigger recycle and moving to the next // split @@ -414,18 +540,24 @@ private Configuration getConfig() { // ------------------------------------------------------------------------ private static SourceReader createReaderAndAwaitAvailable( - final String splitId, final RecordsWithSplitIds records) throws Exception { + final List splitIds, + final List> records, + RateLimiterStrategy rateLimiterStrategy) + throws Exception { final SourceReader reader = new SingleThreadMultiplexSourceReaderBase< E, E, TestingSourceSplit, TestingSourceSplit>( - () -> new TestingSplitReader<>(records), + () -> new TestingSplitReader<>(records.toArray(new RecordsWithSplitIds[0])), new PassThroughRecordEmitter<>(), new Configuration(), - new TestingReaderContext()) { + new TestingReaderContext(), + rateLimiterStrategy) { @Override - public void notifyCheckpointComplete(long checkpointId) {} + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + } @Override protected void onSplitFinished( @@ -446,7 +578,7 @@ protected TestingSourceSplit toSplitType( reader.start(); final List splits = - Collections.singletonList(new TestingSourceSplit(splitId)); + splitIds.stream().map(TestingSourceSplit::new).collect(Collectors.toList()); reader.addSplits(splits); reader.isAvailable().get(); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java index 87bc7810c3144..5926020f314db 100644 --- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java @@ -20,12 +20,15 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import javax.annotation.Nullable; + import java.util.Map; import java.util.function.Supplier; @@ -65,6 +68,21 @@ public MockSourceReader( context); } + public MockSourceReader( + SingleThreadFetcherManager splitSplitFetcherManager, + Configuration config, + SourceReaderContext context, + RecordEvaluator recordEvaluator, + @Nullable RateLimiterStrategy rateLimiterStrategy) { + super( + splitSplitFetcherManager, + new MockRecordEmitter(context.metricGroup()), + recordEvaluator, + config, + context, + rateLimiterStrategy); + } + @Override protected void onSplitFinished(Map finishedSplitIds) {} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java index 914c58d43b751..2fa991af0138b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java @@ -19,6 +19,7 @@ package 
org.apache.flink.api.connector.source.util.ratelimit; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -31,7 +32,7 @@ * external notifications. */ @Internal -public class GatedRateLimiter implements RateLimiter { +public class GatedRateLimiter implements RateLimiter { private final int capacityPerCycle; private int capacityLeft; @@ -50,14 +51,14 @@ public GatedRateLimiter(int capacityPerCycle) { transient CompletableFuture gatingFuture = null; @Override - public CompletionStage acquire() { + public CompletionStage acquire(int numberOfEvents) { if (gatingFuture == null) { gatingFuture = CompletableFuture.completedFuture(null); } if (capacityLeft <= 0) { gatingFuture = new CompletableFuture<>(); } - return gatingFuture.thenRun(() -> capacityLeft -= 1); + return gatingFuture.thenRun(() -> capacityLeft -= numberOfEvents); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java index d8ea41eccd864..77fb6d6273655 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java @@ -19,6 +19,7 @@ package org.apache.flink.api.connector.source.util.ratelimit; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.shaded.guava33.com.google.common.util.concurrent.RateLimiter; @@ -30,8 +31,8 @@ /** An implementation of {@link RateLimiter} based on Guava's RateLimiter. 
*/ @Internal -public class GuavaRateLimiter - implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter { +public class GuavaRateLimiter + implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter { private final Executor limiter = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-rate-limiter")); @@ -42,7 +43,7 @@ public GuavaRateLimiter(double maxPerSecond) { } @Override - public CompletionStage acquire() { - return CompletableFuture.runAsync(rateLimiter::acquire, limiter); + public CompletionStage acquire(int numberOfEvents) { + return CompletableFuture.runAsync(() -> rateLimiter.acquire(numberOfEvents), limiter); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java index 15938bbb81d6b..d7839cc5a3c15 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java @@ -19,16 +19,17 @@ package org.apache.flink.api.connector.source.util.ratelimit; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.util.concurrent.FutureUtils; import java.util.concurrent.CompletionStage; /** A convenience implementation of {@link RateLimiter} that does not throttle requests. 
*/ @Internal -public class NoOpRateLimiter implements RateLimiter { +public class NoOpRateLimiter implements RateLimiter { @Override - public CompletionStage acquire() { + public CompletionStage acquire(int numberOfEvents) { return FutureUtils.completedVoidFuture(); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java index aff9b5c266eb8..60c89571ae1a0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java @@ -35,7 +35,7 @@ public class RateLimitedSourceReader implements SourceReader { private final SourceReader sourceReader; - private final RateLimiter rateLimiter; + private final RateLimiter rateLimiter; private CompletableFuture availabilityFuture = null; /** @@ -44,7 +44,8 @@ public class RateLimitedSourceReader * @param sourceReader The actual source reader. * @param rateLimiter The rate limiter. 
*/ - public RateLimitedSourceReader(SourceReader sourceReader, RateLimiter rateLimiter) { + public RateLimitedSourceReader( + SourceReader sourceReader, RateLimiter rateLimiter) { checkNotNull(sourceReader); checkNotNull(rateLimiter); this.sourceReader = sourceReader; diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java index a6bd004b894b1..e0aecedb23a10 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java @@ -19,22 +19,38 @@ package org.apache.flink.api.connector.source.util.ratelimit; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.connector.source.SourceSplit; import javax.annotation.concurrent.NotThreadSafe; import java.util.concurrent.CompletionStage; -/** The interface to rate limit execution of methods. */ +/** + * The interface to rate limit execution of methods. + * + * @param The type of the source splits. + */ @NotThreadSafe @Experimental -public interface RateLimiter { +public interface RateLimiter { /** * Returns a future that is completed once another event would not exceed the rate limit. For * correct functioning, the next invocation of this method should only happen after the * previously returned future has been completed. */ - CompletionStage acquire(); + default CompletionStage acquire() { + return acquire(1); + } + + /** + * Returns a future that is completed once other events would not exceed the rate limit. For + * correct functioning, the next invocation of this method should only happen after the + * previously returned future has been completed. + * + * @param numberOfEvents The number of events. 
+ */ + CompletionStage acquire(int numberOfEvents); /** * Notifies this {@code RateLimiter} that the checkpoint with the given {@code checkpointId} @@ -44,4 +60,13 @@ public interface RateLimiter { * @param checkpointId The ID of the checkpoint that has been completed. */ default void notifyCheckpointComplete(long checkpointId) {} + + /** + * Notifies this {@code RateLimiter} that a new split has been added. For correct functioning, + * this method should only be invoked after the returned future of previous {@link + * #acquire(int)} method invocation has been completed. + * + * @param split The split that has been added. + */ + default void notifyAddingSplit(SplitT split) {} } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java index 684e919a50016..5cb4ffa1217ed 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java @@ -18,6 +18,7 @@ package org.apache.flink.api.connector.source.util.ratelimit; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.connector.source.SourceSplit; import java.io.Serializable; @@ -25,16 +26,18 @@ /** * A factory for {@link RateLimiter RateLimiters} which apply rate-limiting to a source sub-task. + * + * @param The type of the source splits. */ @Experimental -public interface RateLimiterStrategy extends Serializable { +public interface RateLimiterStrategy extends Serializable { /** - * Creates a {@link RateLimiter} that lets records through with rate proportional to the - * parallelism. This method will be called once per source subtask. The cumulative rate over all - * rate limiters for a source must not exceed the rate limit configured for the strategy. 
+ * Creates a {@link RateLimiter} that limits the rate of records going through. When there is + * parallelism, the limiting rate is evenly reduced per subtask, such that all the sub-tasks + * limiting rates equals the cumulative limiting rate. */ - RateLimiter createRateLimiter(int parallelism); + RateLimiter createRateLimiter(int parallelism); /** * Creates a {@code RateLimiterStrategy} that is limiting the number of records per second. @@ -44,7 +47,7 @@ public interface RateLimiterStrategy extends Serializable { * among the parallel instances. */ static RateLimiterStrategy perSecond(double recordsPerSecond) { - return parallelism -> new GuavaRateLimiter(recordsPerSecond / parallelism); + return parallelism -> new GuavaRateLimiter<>(recordsPerSecond / parallelism); } /** @@ -62,7 +65,7 @@ static RateLimiterStrategy perCheckpoint(int recordsPerCheckpoint) { "recordsPerCheckpoint has to be greater or equal to parallelism. " + "Either decrease the parallelism or increase the number of " + "recordsPerCheckpoint."); - return new GatedRateLimiter(recordsPerSubtask); + return new GatedRateLimiter<>(recordsPerSubtask); }; } diff --git a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java index 0660253810355..f624fb754fabd 100644 --- a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; 
import org.apache.flink.connector.datagen.source.DataGeneratorSource; @@ -87,12 +88,13 @@ private List range(int startInclusive, int endInclusive) { .collect(Collectors.toList()); } - private static final class MockRateLimiter implements RateLimiter { + private static final class MockRateLimiter + implements RateLimiter { int callCount; @Override - public CompletableFuture acquire() { + public CompletableFuture acquire(int numberOfEvents) { callCount++; return CompletableFuture.completedFuture(null); } @@ -102,13 +104,15 @@ public int getCallCount() { } } - private static class MockRateLimiterStrategy implements RateLimiterStrategy { + private static class MockRateLimiterStrategy + implements RateLimiterStrategy { private static final List rateLimiters = Collections.synchronizedList(new ArrayList<>()); @Override - public RateLimiter createRateLimiter(int parallelism) { + public RateLimiter createRateLimiter( + int parallelism) { MockRateLimiter mockRateLimiter = new MockRateLimiter(); rateLimiters.add(mockRateLimiter); return mockRateLimiter; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java index f8499cc6fb8cc..1f2155bc9bf44 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.IntegerTypeInfo; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; @@ -365,7 +366,7 @@ private void 
assertUnalignedCheckpointInNonSink(StreamGraph streamGraph) { * for another two checkpoints and 5) exiting. */ private Source createStreamingSource() { - RateLimiterStrategy rateLimiterStrategy = + RateLimiterStrategy rateLimiterStrategy = parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 2); return new DataGeneratorSource<>( l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()), @@ -374,19 +375,20 @@ private void assertUnalignedCheckpointInNonSink(StreamGraph streamGraph) { IntegerTypeInfo.INT_TYPE_INFO); } - private static class BurstingRateLimiter implements RateLimiter { - private final RateLimiter rateLimiter; + private static class BurstingRateLimiter + implements RateLimiter { + private final RateLimiter rateLimiter; private final int numCheckpointCooldown; private int cooldown; public BurstingRateLimiter(int recordPerCycle, int numCheckpointCooldown) { - rateLimiter = new GatedRateLimiter(recordPerCycle); + rateLimiter = new GatedRateLimiter<>(recordPerCycle); this.numCheckpointCooldown = numCheckpointCooldown; } @Override - public CompletionStage acquire() { - CompletionStage stage = rateLimiter.acquire(); + public CompletionStage acquire(int numberOfEvents) { + CompletionStage stage = rateLimiter.acquire(numberOfEvents); cooldown = numCheckpointCooldown; return stage; }