@lvyanquan
Contributor

@lvyanquan lvyanquan commented Oct 21, 2025

What is the purpose of the change

Provide a configuration solution for fine-grained read speed control in the Flink Source connector.

Brief change log

  • Add a new constructor to SourceReaderBase and SingleThreadMultiplexSourceReaderBase
  • Add the methods CompletionStage<Void> acquire(int requestSize) and void notifyStatusChange(S status) to RateLimiter

Verifying this change

This change added tests and can be verified as follows:

  • Added a test in SourceReaderBaseTest that validates that records are emitted under rate control

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot
Collaborator

flinkbot commented Oct 21, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@github-actions github-actions bot added the community-reviewed label (PR has been reviewed by the community) Oct 21, 2025
@lvyanquan
Contributor Author

Hi @leonardBang @ruanhang1993, could you help review this?

@leonardBang leonardBang self-requested a review October 28, 2025 15:10
Contributor

@leonardBang leonardBang left a comment

Thanks @lvyanquan for the contribution. I left some comments. You also need to open a task to add documentation for the new feature.

@Override
public CompletionStage<Void> acquire() {
CompletionStage<Void> stage = rateLimiter.acquire();
public CompletionStage<Void> acquire(int requestSize) {
Contributor

@davidradl davidradl Oct 29, 2025

When we say requestSize, does this mean the size of requests? Looking at the code, it seems to relate to the number of futures we want to acquire. If so, I would suggest renaming it, maybe to numberOfFuturesToAcquire.

Contributor Author

The returned Future here represents a single asynchronous execution result, and there is no scenario where multiple Future instances are returned. Therefore, the variable name numberOfFutures is not appropriate.

The input parameter here represents the number of requests, which could specifically refer to the number (or even size) of data records. Since this interface is not exclusively called by SourceReader, I used the more generic term "Request" to abstract the concept beyond data-specific contexts.
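
As a rough illustration of that point, here is a minimal sketch of an acquire call that takes a count yet returns exactly one stage for the whole batch. It is not Flink's RateLimiter: the class BudgetRateLimiterSketch, the fixed per-window budget, and startNewWindow() are hypothetical names made up for the example, and only the parameter name requestSize is taken from the diff under review.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical sketch, NOT Flink's RateLimiter: a fixed budget per window. */
final class BudgetRateLimiterSketch {

    /** One waiting caller: how many units it asked for and the single future it holds. */
    private static final class Pending {
        final int requestSize;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Pending(int requestSize) {
            this.requestSize = requestSize;
        }
    }

    private final int budgetPerWindow;
    private int remaining;
    private final Queue<Pending> waiting = new ArrayDeque<>();

    BudgetRateLimiterSketch(int budgetPerWindow) {
        if (budgetPerWindow <= 0) {
            throw new IllegalArgumentException("budgetPerWindow must be positive");
        }
        this.budgetPerWindow = budgetPerWindow;
        this.remaining = budgetPerWindow;
    }

    /** Returns ONE stage for the whole request, no matter how large requestSize is. */
    synchronized CompletionStage<Void> acquire(int requestSize) {
        if (requestSize <= 0 || requestSize > budgetPerWindow) {
            throw new IllegalArgumentException("requestSize must be in [1, budgetPerWindow]");
        }
        // Admit immediately only if nobody is queued ahead and the budget suffices.
        if (waiting.isEmpty() && requestSize <= remaining) {
            remaining -= requestSize;
            return CompletableFuture.completedFuture(null);
        }
        Pending pending = new Pending(requestSize);
        waiting.add(pending);
        return pending.future;
    }

    /** Refills the budget; a real limiter would call this from a timer (assumption). */
    synchronized void startNewWindow() {
        remaining = budgetPerWindow;
        // Admit queued callers in FIFO order while the fresh budget lasts.
        while (!waiting.isEmpty() && waiting.peek().requestSize <= remaining) {
            Pending next = waiting.poll();
            remaining -= next.requestSize;
            next.future.complete(null);
        }
    }

    public static void main(String[] args) {
        BudgetRateLimiterSketch limiter = new BudgetRateLimiterSketch(10);
        CompletionStage<Void> first = limiter.acquire(6);
        CompletionStage<Void> second = limiter.acquire(6);
        System.out.println(first.toCompletableFuture().isDone());  // true
        System.out.println(second.toCompletableFuture().isDone()); // false
        limiter.startNewWindow();
        System.out.println(second.toCompletableFuture().isDone()); // true
    }
}
```

In this sketch the second call asks for 6 units when only 4 remain, so its single future stays pending until the next window starts.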

Contributor

OK, thanks for the clarification. numberOfRequests seems better to me, as it is not to do with the size of individual requests. It is up to you whether you want to change it or not.

Contributor

@leonardBang leonardBang Oct 30, 2025

+1 to use numberOfRequests @lvyanquan

Contributor Author

@lvyanquan lvyanquan Oct 30, 2025

Agreed, numberOf is better than size.
Note that the original comment for the acquire() method states:

* Returns a future that is completed once another event would not exceed the rate limit. For

Event is more appropriate than request in this context, so the parameter is now named numberOfEvents.
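
To make the event reading concrete, a caller could pass the number of records in a batch as numberOfEvents and emit the batch only once the returned stage completes. The following is a minimal sketch under that assumption. GatedEmitSketch and emitAll are hypothetical names, the limiter is modelled as a plain IntFunction rather than Flink's RateLimiter interface, and this is not how SourceReaderBase is actually implemented.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.IntFunction;

/** Hypothetical sketch: gate each batch on one acquire(numberOfEvents) call. */
final class GatedEmitSketch {

    /**
     * Emits batches in order. For every batch, the limiter (modelled here as a
     * function from numberOfEvents to a stage) is asked once, with the batch
     * size as the event count, and the batch is appended to output only after
     * the returned stage completes.
     */
    static CompletionStage<Void> emitAll(
            List<List<String>> batches,
            IntFunction<CompletionStage<Void>> acquire,
            List<String> output) {
        CompletionStage<Void> chain = CompletableFuture.completedFuture(null);
        for (List<String> batch : batches) {
            chain = chain
                    // One acquire call per batch, counted in events (records).
                    .thenCompose(ignored -> acquire.apply(batch.size()))
                    // Emit only after the limiter has admitted the whole batch.
                    .thenRun(() -> output.addAll(batch));
        }
        return chain;
    }
}
```

Because the stages are chained, the limiter is not asked about the second batch until the first one has been admitted and emitted.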

@lvyanquan
Contributor Author

Thanks @davidradl and @leonardBang for the suggestions; updated.

@lvyanquan
Contributor Author

@flinkbot run azure

@lvyanquan
Contributor Author

lvyanquan commented Oct 30, 2025

Subtask for documentation: https://issues.apache.org/jira/browse/FLINK-38589.

@lvyanquan
Contributor Author

@flinkbot run azure

@lvyanquan
Contributor Author

Rebased onto master to fix the CI failure of StreamExecutionEnvironmentTests::test_set_requirements_with_cached_directory.

@lvyanquan
Contributor Author

Updated based on comments.

@lvyanquan
Contributor Author

@flinkbot run azure

4 participants