Skip to content

Commit 247ae2b

Browse files
committed
Address comments.
1 parent 4d22a10 commit 247ae2b

File tree

9 files changed

+39
-25
lines changed

9 files changed

+39
-25
lines changed

flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ record -> {
290290
};
291291
}
292292
if (isRateLimited) {
293-
rateLimiter.notifyStatusChange(
293+
rateLimiter.notifyAddingSplit(
294294
this.toSplitType(
295295
this.currentSplitContext.splitId, this.currentSplitContext.state));
296296
}
@@ -516,7 +516,7 @@ private static final class RateLimitingSourceOutputWrapper<T> implements SourceO
516516
final SourceOutput<T> sourceOutput;
517517

518518
/** Count of records handled during the current rate-limiting window. */
519-
int currentWindowRecordCount;
519+
private int currentWindowRecordCount;
520520

521521
/**
522522
* Creates a new RecordCountingSourceOutputWrapper.

flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ void testLimitingRatePerCheckpointInSplitReader() throws Exception {
205205
}
206206

207207
@Test
208-
void testLimitingRateWithStatusChangeInSplitReader() throws Exception {
208+
void testLimitingRateWithAddingSplitInSplitReader() throws Exception {
209209
String[] recordArr = new String[60];
210210
for (int i = 0; i < recordArr.length; i++) {
211211
recordArr[i] = "value" + i;
@@ -237,7 +237,7 @@ void testLimitingRateWithStatusChangeInSplitReader() throws Exception {
237237
/** A rate limiter that reduce the maxPerSecond for specific splits. */
238238
private static class SplitAwaredRateLimiter
239239
implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<
240-
org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit> {
240+
TestingSourceSplit> {
241241

242242
private final Executor limiter =
243243
Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-rate-limiter"));
@@ -255,8 +255,7 @@ public CompletionStage<Void> acquire(int requestSize) {
255255
}
256256

257257
@Override
258-
public void notifyStatusChange(
259-
org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit split) {
258+
public void notifyAddingSplit(TestingSourceSplit split) {
260259
if (!split.splitId().equals("test-split1")) {
261260
this.rateLimiter = RateLimiter.create(maxPerSecond / 2);
262261
}

flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.api.connector.source.util.ratelimit;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.connector.source.SourceSplit;
2223

2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CompletionStage;
@@ -31,7 +32,7 @@
3132
* external notifications.
3233
*/
3334
@Internal
34-
public class GatedRateLimiter<S> implements RateLimiter<S> {
35+
public class GatedRateLimiter<Split extends SourceSplit> implements RateLimiter<Split> {
3536

3637
private final int capacityPerCycle;
3738
private int capacityLeft;

flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.api.connector.source.util.ratelimit;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.connector.source.SourceSplit;
2223
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
2324

2425
import org.apache.flink.shaded.guava33.com.google.common.util.concurrent.RateLimiter;
@@ -30,8 +31,8 @@
3031

3132
/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */
3233
@Internal
33-
public class GuavaRateLimiter<S>
34-
implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<S> {
34+
public class GuavaRateLimiter<Split extends SourceSplit>
35+
implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<Split> {
3536

3637
private final Executor limiter =
3738
Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-rate-limiter"));

flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
package org.apache.flink.api.connector.source.util.ratelimit;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.connector.source.SourceSplit;
2223
import org.apache.flink.util.concurrent.FutureUtils;
2324

2425
import java.util.concurrent.CompletionStage;
2526

2627
/** A convenience implementation of {@link RateLimiter} that does not throttle requests. */
2728
@Internal
28-
public class NoOpRateLimiter<S> implements RateLimiter<S> {
29+
public class NoOpRateLimiter<Split extends SourceSplit> implements RateLimiter<Split> {
2930

3031
@Override
3132
public CompletionStage<Void> acquire(int requestSize) {

flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,20 @@
1919
package org.apache.flink.api.connector.source.util.ratelimit;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.connector.source.SourceSplit;
2223

2324
import javax.annotation.concurrent.NotThreadSafe;
2425

2526
import java.util.concurrent.CompletionStage;
2627

27-
/** The interface to rate limit execution of methods. */
28+
/**
29+
* The interface to rate limit execution of methods.
30+
*
31+
* @param <SplitT> The type of the source splits.
32+
*/
2833
@NotThreadSafe
2934
@Experimental
30-
public interface RateLimiter<S> {
35+
public interface RateLimiter<SplitT extends SourceSplit> {
3136

3237
/**
3338
* Returns a future that is completed once another event would not exceed the rate limit. For
@@ -57,10 +62,8 @@ default CompletionStage<Void> acquire() {
5762
default void notifyCheckpointComplete(long checkpointId) {}
5863

5964
/**
60-
* Notifies this {@code RateLimiter} that the status has changed. This can be used to adjust the
61-
* rate limiting behavior based on the new status.
62-
*
63-
* @param status The new status.
65+
* Notifies this {@code RateLimiter} that a new split has been added. the result of before
66+
* {@link #acquire(int)} method call should be ensured to be completed when calling this method.
6467
*/
65-
default void notifyStatusChange(S status) {}
68+
default void notifyAddingSplit(SplitT split) {}
6669
}

flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,26 @@
1818
package org.apache.flink.api.connector.source.util.ratelimit;
1919

2020
import org.apache.flink.annotation.Experimental;
21+
import org.apache.flink.api.connector.source.SourceSplit;
2122

2223
import java.io.Serializable;
2324

2425
import static org.apache.flink.util.Preconditions.checkArgument;
2526

2627
/**
2728
* A factory for {@link RateLimiter RateLimiters} which apply rate-limiting to a source sub-task.
29+
*
30+
* @param <SplitT> The type of the source splits.
2831
*/
2932
@Experimental
30-
public interface RateLimiterStrategy<S> extends Serializable {
33+
public interface RateLimiterStrategy<SplitT extends SourceSplit> extends Serializable {
3134

3235
/**
3336
* Creates a {@link RateLimiter} that limits the rate of records going through. When there is
3437
* parallelism, the limiting rate is evenly reduced per subtask, such that all the sub-tasks
3538
* limiting rates equals the cumulative limiting rate.
3639
*/
37-
RateLimiter<S> createRateLimiter(int parallelism);
40+
RateLimiter<SplitT> createRateLimiter(int parallelism);
3841

3942
/**
4043
* Creates a {@code RateLimiterStrategy} that is limiting the number of records per second.

flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2222
import org.apache.flink.api.common.typeinfo.Types;
23+
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
2324
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
2425
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
2526
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
@@ -87,7 +88,8 @@ private List<Long> range(int startInclusive, int endInclusive) {
8788
.collect(Collectors.toList());
8889
}
8990

90-
private static final class MockRateLimiter implements RateLimiter {
91+
private static final class MockRateLimiter
92+
implements RateLimiter<NumberSequenceSource.NumberSequenceSplit> {
9193

9294
int callCount;
9395

@@ -102,13 +104,15 @@ public int getCallCount() {
102104
}
103105
}
104106

105-
private static class MockRateLimiterStrategy implements RateLimiterStrategy {
107+
private static class MockRateLimiterStrategy
108+
implements RateLimiterStrategy<NumberSequenceSource.NumberSequenceSplit> {
106109

107110
private static final List<MockRateLimiter> rateLimiters =
108111
Collections.synchronizedList(new ArrayList<>());
109112

110113
@Override
111-
public RateLimiter createRateLimiter(int parallelism) {
114+
public RateLimiter<NumberSequenceSource.NumberSequenceSplit> createRateLimiter(
115+
int parallelism) {
112116
MockRateLimiter mockRateLimiter = new MockRateLimiter();
113117
rateLimiters.add(mockRateLimiter);
114118
return mockRateLimiter;

flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
2626
import org.apache.flink.api.connector.sink2.Committer;
2727
import org.apache.flink.api.connector.source.Source;
28+
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
2829
import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
2930
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
3031
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
@@ -365,7 +366,7 @@ private void assertUnalignedCheckpointInNonSink(StreamGraph streamGraph) {
365366
* for another two checkpoints and 5) exiting.
366367
*/
367368
private Source<Integer, ?, ?> createStreamingSource() {
368-
RateLimiterStrategy<Void> rateLimiterStrategy =
369+
RateLimiterStrategy<NumberSequenceSource.NumberSequenceSplit> rateLimiterStrategy =
369370
parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 2);
370371
return new DataGeneratorSource<>(
371372
l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()),
@@ -374,8 +375,9 @@ private void assertUnalignedCheckpointInNonSink(StreamGraph streamGraph) {
374375
IntegerTypeInfo.INT_TYPE_INFO);
375376
}
376377

377-
private static class BurstingRateLimiter implements RateLimiter<Void> {
378-
private final RateLimiter<Void> rateLimiter;
378+
private static class BurstingRateLimiter
379+
implements RateLimiter<NumberSequenceSource.NumberSequenceSplit> {
380+
private final RateLimiter<NumberSequenceSource.NumberSequenceSplit> rateLimiter;
379381
private final int numCheckpointCooldown;
380382
private int cooldown;
381383

0 commit comments

Comments
 (0)