Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -84,7 +84,27 @@ public SingleThreadMultiplexSourceReaderBase(
/**
* This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link
* FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}.
* RateLimiterStrategy}.
*/
public SingleThreadMultiplexSourceReaderBase(
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context,
@Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
super(
new SingleThreadFetcherManager<>(splitReaderSupplier, config),
recordEmitter,
null,
config,
context,
rateLimiterStrategy);
}

/**
* This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link
* SingleThreadFetcherManager}.
*/
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
Expand All @@ -97,8 +117,7 @@ public SingleThreadMultiplexSourceReaderBase(
/**
* This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link
* FutureCompletingBlockingQueue}, {@link SingleThreadFetcherManager} and {@link
* RecordEvaluator}.
* SingleThreadFetcherManager} and {@link RecordEvaluator}.
*/
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
Expand All @@ -108,4 +127,26 @@ public SingleThreadMultiplexSourceReaderBase(
SourceReaderContext context) {
super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context);
}

/**
* This constructor behaves like {@link
* #SingleThreadMultiplexSourceReaderBase(SingleThreadFetcherManager, RecordEmitter,
* RecordEvaluator, Configuration, SourceReaderContext)}, but accepts a specific {@link
* RateLimiterStrategy}.
*/
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context,
@Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
super(
splitFetcherManager,
recordEmitter,
eofRecordEvaluator,
config,
context,
rateLimiterStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
Expand Down Expand Up @@ -107,6 +109,15 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt

@Nullable protected final RecordEvaluator<T> eofRecordEvaluator;

/** Indicating whether the SourceReader supports rate limiting or not. */
private final boolean rateLimitingEnabled;

/** The {@link RateLimiter} uses for rate limiting. */
@Nullable private final RateLimiter<SplitT> rateLimiter;

/** Future that tracks the result of acquiring permission from {@link #rateLimiter}. */
@Nullable private CompletableFuture<Void> rateLimitPermissionFuture;

/**
* The primary constructor for the source reader.
*
Expand All @@ -118,7 +129,7 @@ public SourceReaderBase(
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
this(splitFetcherManager, recordEmitter, null, config, context);
this(splitFetcherManager, recordEmitter, null, config, context, null);
}

public SourceReaderBase(
Expand All @@ -127,6 +138,16 @@ public SourceReaderBase(
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context) {
this(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context, null);
}

public SourceReaderBase(
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context,
@Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
this.elementsQueue = splitFetcherManager.getQueue();
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
Expand All @@ -136,8 +157,17 @@ public SourceReaderBase(
this.context = context;
this.noMoreSplitsAssignment = false;
this.eofRecordEvaluator = eofRecordEvaluator;

this.rateLimitingEnabled = rateLimiterStrategy != null;
numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();

rateLimiter =
rateLimitingEnabled
? rateLimiterStrategy.createRateLimiter(context.currentParallelism())
: null;
LOG.info(
"Rate limiting of SourceReader is {}",
rateLimitingEnabled ? "enabled" : "disabled");
rateLimitPermissionFuture = CompletableFuture.completedFuture(null);
}

@Override
Expand All @@ -153,16 +183,66 @@ public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
return trace(finishedOrAvailableLater());
}
}
if (rateLimitingEnabled) {
return pollNextWithRateLimiting(recordsWithSplitId, output);
} else {
return pollNextWithoutRateLimiting(recordsWithSplitId, output);
}
}

private InputStatus pollNextWithoutRateLimiting(
RecordsWithSplitIds<E> recordsWithSplitId, ReaderOutput<T> output) throws Exception {
// we need to loop here, in case we may have to go across splits
while (true) {
// Process one record.
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
// emit the record.
numRecordsInCounter.inc(1);
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
LOG.trace("Emitted record: {}", record);
// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
// this out and return the correct status.
// That means we emit the occasional 'false positive' for availability, but this
// saves us doing checks for every record. Ultimately, this is cheaper.
return trace(InputStatus.MORE_AVAILABLE);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
// The fetch is done and we just discovered that and have not emitted anything, yet.
// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
// rather than emitting nothing and waiting for the caller to call us again.
return pollNext(output);
}
}
}

// we need to loop here, because we may have to go across splits
private InputStatus pollNextWithRateLimiting(
RecordsWithSplitIds<E> recordsWithSplitId, ReaderOutput<T> output) throws Exception {
// make sure we have a fetch we are working on, or move to the next
while (true) {
// Check if the previous record count reached the limit of rateLimiter.
if (!rateLimitPermissionFuture.isDone()) {
return trace(InputStatus.MORE_AVAILABLE);
}
// Process one record.
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
// emit the record.
numRecordsInCounter.inc(1);
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
LOG.trace("Emitted record: {}", record);
RateLimitingSourceOutputWrapper<T> rateLimitingSourceOutputWrapper =
(RateLimitingSourceOutputWrapper<T>) currentSplitOutput;
if (rateLimitingSourceOutputWrapper.getRecordsOfCurrentWindow() > 0) {
// Acquire permit from rateLimiter.
rateLimitPermissionFuture =
rateLimiter
.acquire(
rateLimitingSourceOutputWrapper
.getRecordsOfCurrentWindow())
.toCompletableFuture();
}
rateLimitingSourceOutputWrapper.resetWindowRecordCount();

// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
Expand Down Expand Up @@ -245,7 +325,14 @@ record -> {
return true;
};
}
currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output, eofRecordHandler);
if (rateLimitingEnabled) {
rateLimiter.notifyAddingSplit(
this.toSplitType(
this.currentSplitContext.splitId, this.currentSplitContext.state));
}
currentSplitOutput =
currentSplitContext.getOrCreateSplitOutput(
output, eofRecordHandler, rateLimitingEnabled);
LOG.trace("Emitting records from fetch for split {}", nextSplitId);
return true;
}
Expand All @@ -264,6 +351,16 @@ public List<SplitT> snapshotState(long checkpointId) {
return splits;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
splitStates.forEach(
(id, context) -> {
if (rateLimitingEnabled) {
rateLimiter.notifyCheckpointComplete(checkpointId);
}
});
}

@Override
public void addSplits(List<SplitT> splits) {
LOG.info("Adding split(s) to reader: {}", splits);
Expand Down Expand Up @@ -364,7 +461,9 @@ private SplitContext(String splitId, SplitStateT state) {
}

SourceOutput<T> getOrCreateSplitOutput(
ReaderOutput<T> mainOutput, @Nullable Function<T, Boolean> eofRecordHandler) {
ReaderOutput<T> mainOutput,
@Nullable Function<T, Boolean> eofRecordHandler,
boolean rateLimitingEnabled) {
if (sourceOutput == null) {
// The split output should have been created when AddSplitsEvent was processed in
// SourceOperator. Here we just use this method to get the previously created
Expand All @@ -373,6 +472,9 @@ SourceOutput<T> getOrCreateSplitOutput(
if (eofRecordHandler != null) {
sourceOutput = new SourceOutputWrapper<>(sourceOutput, eofRecordHandler);
}
if (rateLimitingEnabled) {
sourceOutput = new RateLimitingSourceOutputWrapper<>(sourceOutput);
}
}
return sourceOutput;
}
Expand Down Expand Up @@ -435,4 +537,73 @@ private boolean isEndOfStreamReached(T record) {
return isStreamEnd;
}
}

/**
* A wrapper around {@link SourceOutput} that counts the number of records during the current
* rate-limiting window.
*
* <p>This wrapper is used when rate limiting is enabled to track how many records have been
* emitted since the last rate limit check, allowing the reader to properly apply backpressure
* when the rate limit is exceeded.
*
* @param <T> The type of records being emitted
*/
private static final class RateLimitingSourceOutputWrapper<T> implements SourceOutput<T> {
/** The underlying source output to delegate to. */
final SourceOutput<T> sourceOutput;

/** Number of records handled during the current rate-limiting window. */
private int recordsOfCurrentWindow;

/**
* Creates a new RecordCountingSourceOutputWrapper.
*
* @param sourceOutput The underlying source output to wrap
*/
public RateLimitingSourceOutputWrapper(SourceOutput<T> sourceOutput) {
this.sourceOutput = sourceOutput;
this.recordsOfCurrentWindow = 0;
}

@Override
public void emitWatermark(Watermark watermark) {
sourceOutput.emitWatermark(watermark);
}

@Override
public void markIdle() {
sourceOutput.markIdle();
}

@Override
public void markActive() {
sourceOutput.markActive();
}

@Override
public void collect(T record) {
sourceOutput.collect(record);
recordsOfCurrentWindow++;
}

@Override
public void collect(T record, long timestamp) {
sourceOutput.collect(record, timestamp);
recordsOfCurrentWindow++;
}

/**
* Gets the recordsOfCurrentWindow.
*
* @return the number of current window.
*/
public int getRecordsOfCurrentWindow() {
return recordsOfCurrentWindow;
}

/** Resets the recordsOfCurrentWindow to 0. */
public void resetWindowRecordCount() {
recordsOfCurrentWindow = 0;
}
}
}
Loading