Skip to content

Redistribute request max bytes across remote fetch partitions in DelayedShareFetch#22740

Open
adixitconfluent wants to merge 1 commit into
apache:trunkfrom
adixitconfluent:remote_fetch_partition_max_bytes
Open

Redistribute request max bytes across remote fetch partitions in DelayedShareFetch#22740
adixitconfluent wants to merge 1 commit into
apache:trunkfrom
adixitconfluent:remote_fetch_partition_max_bytes

Conversation

@adixitconfluent

Copy link
Copy Markdown
Contributor

About

When a share fetch request spans a mix of local-log and remote (tiered) partitions, the
byte caps for the remote reads are computed while the budget is still divided across
all acquired partitions, and are never revised after the non-remote partitions drop out.

The sequence in DelayedShareFetch.tryComplete():

  1. maybeReadFromLog computes per-partition max bytes via PartitionMaxBytesStrategy
    as requestMaxBytes / N, where N = all acquired partitions (deliberately, so
    partitions fetched later don't starve).
  2. For a tiered partition, ReplicaManager bakes that value into
    RemoteStorageFetchInfo.fetchMaxBytes (and the nested PartitionData.maxBytes).
  3. maybeProcessRemoteFetch then releases the N−R non-remote partitions without
    fetching them — they consume none of the budget in this request — but the R remote
    reads proceed with their stale requestMaxBytes / N caps unchanged.

The leftover budget is only partially recovered by the compensating local read in
completeRemoteStorageShareFetchRequest (maxBytes − readableBytes). If the released
partitions cannot be re-acquired at completion time (e.g. grabbed by a concurrent share
fetch during the remote read — a wide window) or have no data, the response goes out
well below the request budget.

Example: maxBytes = 1 MB, 10 partitions acquired, 1 remote. The remote read is capped
at ~100 KB; if the 9 local partitions yield nothing at completion, the response carries
~100 KB of a 1 MB budget. For tiered-storage-heavy share groups this means more round
trips for the same data.

Change

In processRemoteFetchOrException, once the remote-only partition set is known,
recompute the per-partition budget with
partitionMaxBytesStrategy.maxBytes(requestMaxBytes, remotePartitions, remotePartitions.size())
and rebuild each RemoteStorageFetchInfo with the raised cap before scheduling the
RemoteLogManager.asyncRead.

Notes on the implementation:

  • Both caps are replaced. RemoteLogManager.read clamps the read at
    Math.min(fetchMaxBytes, fetchInfo.maxBytes), so raising only the top-level
    fetchMaxBytes would be silently clamped by the nested partition-level value. The
    nested PartitionData here is broker-synthesized (share fetch has no client-set
    per-partition max bytes), so raising it does not affect any client contract.
  • Caps are never lowered. If the recomputed value is not larger than the existing
    cap, the original RemoteStorageFetchInfo is returned untouched, so the
    redistribution can only grant a remote read more budget than before.
  • The response-size invariant is preserved. The resized remote caps sum to at most
    requestMaxBytes, and the follow-up local read in
    completeRemoteStorageShareFetchRequest already sizes itself from the actual remote
    bytes returned (maxBytes − readableBytes), so the total response cannot exceed the
    request budget.

Trade-off: RemoteLogManager.read eagerly allocates a buffer of the effective cap, so
mixed local/remote requests now allocate larger transient buffers (up to the full
request max bytes instead of a 1/N share). This does not raise the existing per-read
ceiling — a request whose only acquired partition is remote already received the full
budget — and concurrency remains bounded by the remote reader thread pool.

Testing

  • New test testRemoteStorageFetchMaxBytesResizedToRemoteFetchPartitions: 3 acquired
    partitions (2 local, 1 remote) with the real UNIFORM strategy, asserting the remote
    read is scheduled with both byte caps raised to the full request budget and all other
    fetch-info fields carried over unchanged.

@github-actions github-actions Bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka labels Jul 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants