Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4a8a402
RATIS-2403. Support leader batch write to improve linearizable follow…
ivandika3 Feb 26, 2026
534240e
Move start daemon from constructor to start method to prevent race
ivandika3 Feb 26, 2026
5898f39
Try to adapt test to the new REPLIED_INDEX guarantee.
ivandika3 Feb 27, 2026
7f58060
Revert AtomicReference since it's not atomic and use synchronized lis…
ivandika3 Feb 27, 2026
36f8903
Introduce ReplyFlusher and update log
ivandika3 Mar 1, 2026
bfc5182
Remove unnecessary blank line
ivandika3 Mar 1, 2026
589117b
Fix findbugs
ivandika3 Mar 1, 2026
f1a2bae
Use appliedIndex during flush instead
ivandika3 Mar 2, 2026
e595eab
Fix replyFlusher never run issue
ivandika3 Mar 2, 2026
60380ee
Update documentation
ivandika3 Mar 2, 2026
5128716
Try to use read index as effective commit index
ivandika3 Mar 12, 2026
19321e3
Revert "Use appliedIndex during flush instead"
ivandika3 Mar 12, 2026
511e7df
Retain the original test
ivandika3 Mar 12, 2026
302b53e
Remove unnecessary parameter
ivandika3 Mar 12, 2026
c69b2b5
Assert read index type sync with the config
ivandika3 Mar 23, 2026
fb11a13
Address comments
ivandika3 Mar 27, 2026
57a7a64
Remove unused imports and fix tests
ivandika3 Mar 27, 2026
17d20ec
Update based on patch diff
ivandika3 Mar 31, 2026
676c52b
Add latch assertions to prevent test NPE
ivandika3 Mar 31, 2026
cea0549
Update repliedIndex to startupLogEntry if needed
ivandika3 Mar 31, 2026
2cf81b8
Update replyFlusher first to not blocked by notifyLeaderReady impleme…
ivandika3 Mar 31, 2026
85dd858
Initialize startupLogEntry once at ReplyFlusher start
ivandika3 Mar 31, 2026
bcec68e
Add some assertion logs
ivandika3 Apr 1, 2026
e9fe28b
Disable dummy request to not cause follower client to failover to leader
ivandika3 Apr 1, 2026
ece8ab6
Fix NoSuchElementException when dummy request is disabled
ivandika3 Apr 1, 2026
632e455
Implement Reply toString
ivandika3 Apr 1, 2026
78db698
Add logic to warm up clients to send first request
ivandika3 Apr 1, 2026
d48bcc9
Fix test
ivandika3 Apr 1, 2026
a078432
Update comments
ivandika3 Apr 1, 2026
45c1903
Merge branch 'master' into RATIS-2403
ivandika3 Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions ratis-docs/src/site/markdown/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,38 @@ if it fails to receive any RPC responses from this peer within this specified ti

### Read Index - Configurations related to ReadIndex used in linearizable read

| **Property** | `raft.server.read.read-index.applied-index.enabled` |
|:----------------|:----------------------------------------------------------------------|
| **Description** | whether applied index (instead of commit index) is used for ReadIndex |
| **Type** | boolean |
| **Default** | false |
| **Property** | `raft.server.read.read-index.type` |
|:----------------|:-----------------------------------------------------------------------------|
| **Description** | type of read index returned |
| **Type** | enum `Read.ReadIndex.Type` [`COMMIT_INDEX`, `APPLIED_INDEX`, `REPLIED_INDEX` |
| **Default** | `Read.ReadIndex.Type.COMMIT_INDEX` |

* `Read.ReadIndex.Type.COMMIT_INDEX` - Use leader's CommitIndex (see Raft Paper section 6.4)
* The safest type as it is specified in the Raft dissertation
* This ReadIndex type can be chosen if the base linearizable read from followers performance already meets expectations.

* `Read.ReadIndex.Type.APPLIED_INDEX` - Use leader's AppliedIndex
* Allow leader to return AppliedIndex (instead of CommitIndex) as the ReadIndex
* This reduces the time follower applying logs up to ReadIndex since AppliedIndex ≤ CommitIndex
* This ReadIndex type can be chosen `Read.ReadIndex.Type.COMMIT_INDEX` read latency is too high.

* `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex
* RepliedIndex is defined as the AppliedIndex of the last write request replied by the leader.
* Leader delays replying write requests and only reply them every write batch boundary configurable by `raft.server.read.read-index.replied-index.batch-interval`.
* This allows the ReadIndex to advance in a coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives.
* This is most effective on read-heavy, follower-read workloads which prioritizes overall read throughput without consistency sacrifice.
* There is a trade-off in increased write latency (up to one `raft.server.read.read-index.replied-index.batch-interval`) per write.
* RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied requests.
* If the RepliedIndex is set to 0, the behavior is identical to `Read.ReadIndex.Type.APPLIED_INDEX`

Note that theoretically all the ReadIndex types still guarantee linearizability,
but there are tradeoffs (e.g. Write and Read performance) between different types.

| **Property** | `raft.server.read.read-index.replied-index.batch-interval` |
|:----------------|:---------------------------------------------------------------------------------------------------------------------------------------------|
| **Description** | if `Read.ReadIndex.Type` is `REAPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced |
| **Type** | TimeDuration |
| **Default** | 10ms |

| **Property** | `raft.server.read.leader.heartbeat-check.enabled` |
|:----------------|:--------------------------------------------------|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,34 @@ static void setWriteIndexCacheExpiryTime(RaftProperties properties, TimeDuration
interface ReadIndex {
String PREFIX = Read.PREFIX + ".read-index";

String APPLIED_INDEX_ENABLED_KEY = PREFIX + ".applied-index.enabled";
boolean APPLIED_INDEX_ENABLED_DEFAULT = false;
static boolean appliedIndexEnabled(RaftProperties properties) {
return getBoolean(properties::getBoolean, APPLIED_INDEX_ENABLED_KEY,
APPLIED_INDEX_ENABLED_DEFAULT, getDefaultLog());
enum Type {
/** ReadIndex returns leader's commitIndex (see Raft Paper section 6.4). */
COMMIT_INDEX,

/** ReadIndex returns leader's appliedIndex to reduce the ReadIndex latency. */
APPLIED_INDEX,

/** ReadIndex returns leader's repliedIndex, the index of the last replied request. */
REPLIED_INDEX
}

String TYPE_KEY = PREFIX + ".type";
Type TYPE_DEFAULT = Type.COMMIT_INDEX;
static Type type(RaftProperties properties) {
return get(properties::getEnum, TYPE_KEY, TYPE_DEFAULT, getDefaultLog());
}
static void setType(RaftProperties properties, Type type) {
set(properties::setEnum, TYPE_KEY, type);
}

static void setAppliedIndexEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, APPLIED_INDEX_ENABLED_KEY, enabled);
String REPLIED_INDEX_BATCH_INTERVAL_KEY = PREFIX + ".replied-index.batch-interval";
TimeDuration REPLIED_INDEX_BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
static TimeDuration repliedIndexBatchInterval(RaftProperties properties) {
return getTimeDuration(properties.getTimeDuration(REPLIED_INDEX_BATCH_INTERVAL_DEFAULT.getUnit()),
REPLIED_INDEX_BATCH_INTERVAL_KEY, REPLIED_INDEX_BATCH_INTERVAL_DEFAULT, getDefaultLog());
}
static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) {
setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
Expand Down Expand Up @@ -80,8 +81,11 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -224,6 +228,19 @@ CompletableFuture<Void> stopAll() {
}
}

/** A write reply that has been built but not yet sent to the client */
private static class HeldReply {
private final PendingRequest pending;
private final RaftClientReply reply;
private final long index;

HeldReply(PendingRequest pending, RaftClientReply reply, long index) {
this.pending = pending;
this.reply = reply;
this.index = index;
}
}

/** For caching {@link FollowerInfo}s. This class is immutable. */
static class CurrentOldFollowerInfos {
private final RaftConfigurationImpl conf;
Expand Down Expand Up @@ -353,10 +370,23 @@ boolean isApplied() {
private final PendingStepDown pendingStepDown;

private final ReadIndexHeartbeats readIndexHeartbeats;
private final boolean readIndexAppliedIndexEnabled;
private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType;
private final Supplier<Long> readIndexSupplier;
private final MemoizedSupplier<String> readIndexLogPrefixSupplier;
private final boolean leaderHeartbeatCheckEnabled;
private final LeaderLease lease;

/** The interval at which held write replies are flushed. */
private final TimeDuration repliedIndexBatchInterval;
/** The highest log index for which a write reply has been flushed (sent to the client). */
private final AtomicLong repliedIndex;
/** Guards {@link #heldReplies}. */
private final Object heldRepliesLock = new Object();
/** Buffer holding write replies waiting to be flushed. Guarded by {@link #heldRepliesLock}. */
private List<HeldReply> heldReplies = new ArrayList<>();
/** Daemon thread that periodically flushes held replies. */
private volatile Daemon replyFlusher;

LeaderStateImpl(RaftServerImpl server) {
this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
this.server = server;
Expand Down Expand Up @@ -391,8 +421,30 @@ boolean isApplied() {
} else {
this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests);
}
this.readIndexAppliedIndexEnabled = RaftServerConfigKeys.Read.ReadIndex
.appliedIndexEnabled(properties);
this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties);

this.repliedIndexBatchInterval =
RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties);
this.repliedIndex = new AtomicLong(state.getLastAppliedIndex());

switch (readIndexType) {
case REPLIED_INDEX:
readIndexSupplier = repliedIndex::get;
readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "replied");
this.replyFlusher = Daemon.newBuilder()
.setName(name + "-ReplyFlusher")
.setRunnable(this::runReplyFlusher)
.build();
break;
case APPLIED_INDEX:
readIndexSupplier = () -> server.getState().getLastAppliedIndex();
readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "applied");
break;
case COMMIT_INDEX:
default:
readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex();
readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "commit");
}
this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read
.leaderHeartbeatCheckEnabled(properties);

Expand All @@ -419,6 +471,10 @@ void start() {
startupLogEntry.get();
processor.start();
senders.forEach(LogAppender::start);

if (replyFlusher != null) {
replyFlusher.start();
}
}

boolean isReady() {
Expand Down Expand Up @@ -453,6 +509,7 @@ CompletableFuture<Void> stop() {
startupLogEntry.get().getAppliedIndexFuture().completeExceptionally(
new ReadIndexException("failed to obtain read index since: ", nle));
server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
stopReplyFlusher();
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
pendingRequests.close();
Expand Down Expand Up @@ -1140,22 +1197,21 @@ public boolean checkLeadership() {
/**
* Obtain the current readIndex for read only requests. See Raft paper section 6.4.
* 1. Leader makes sure at least one log from current term is committed.
* 2. Leader record last committed index or applied index (depending on configuration) as readIndex.
* 2. Leader record last committed index or applied index or replied index (depending on configuration) as readIndex.
* 3. Leader broadcast heartbeats to followers and waits for acknowledgements.
* 4. If majority respond success, returns readIndex.
* @return current readIndex.
*/
CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) {
final long index = readIndexAppliedIndexEnabled ?
server.getState().getLastAppliedIndex() : server.getRaftLog().getLastCommittedIndex();
final long index = readIndexSupplier.get();
final long readIndex;
if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex > index) {
readIndex = readAfterWriteConsistentIndex;
} else {
readIndex = index;
}
LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})",
readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit",
readIndex, readIndexLogPrefixSupplier.get(),
index, readAfterWriteConsistentIndex);

// if group contains only one member, fast path
Expand Down Expand Up @@ -1218,9 +1274,73 @@ private boolean checkLeaderLease() {
}

void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
pendingRequests.replyPendingRequest(termIndex, reply);
if (readIndexType == Type.REPLIED_INDEX) {
// Remove from pending map but hold the reply for batch flushing.
final PendingRequest pending = pendingRequests.removePendingRequest(termIndex);
if (pending != null) {
holdReply(pending, reply, termIndex.getIndex());
}
} else {
pendingRequests.replyPendingRequest(termIndex, reply);
}
}

/** Hold a write reply for later batch flushing. */
private void holdReply(PendingRequest pending, RaftClientReply reply, long index) {
synchronized (heldRepliesLock) {
heldReplies.add(new HeldReply(pending, reply, index));
}
}

/** Flush all held replies and advance {@link #repliedIndex}. */
private void flushReplies() {
final List<HeldReply> toFlush;
synchronized (heldRepliesLock) {
if (heldReplies.isEmpty()) {
return;
}
toFlush = heldReplies;
heldReplies = new ArrayList<>();
}

long maxIndex = repliedIndex.get();
for (HeldReply held : toFlush) {
held.pending.setReply(held.reply);
maxIndex = Math.max(maxIndex, held.index);
}
repliedIndex.set(maxIndex);
LOG.debug("{}: flushed {} replies, repliedIndex={}", name, toFlush.size(), maxIndex);
}

/** The reply flusher daemon loop. */
private void runReplyFlusher() {
while (isRunning()) {
try {
Thread.sleep(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
flushReplies();
}
// Flush remaining on exit.
flushReplies();
}

/** Stop the reply flusher daemon. */
private void stopReplyFlusher() {
final Daemon flusher = this.replyFlusher;
if (flusher != null) {
flusher.interrupt();
try {
flusher.join(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS) * 2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}


TransactionContext getTransactionContext(TermIndex termIndex) {
return pendingRequests.getTransactionContext(termIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
}
}

/**
* Remove the {@link PendingRequest} for the given {@link TermIndex} without sending a reply.
* @return the removed {@link PendingRequest}, or null if not found.
*/
PendingRequest removePendingRequest(TermIndex termIndex) {
final PendingRequest pending = pendingRequests.remove(termIndex);
if (pending != null) {
Preconditions.assertEquals(termIndex, pending.getTermIndex(), "termIndex");
}
return pending;
}

/**
* The leader state is stopped. Send NotLeaderException to all the pending
* requests since they have not got applied to the state machine yet.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
Expand Down Expand Up @@ -60,7 +61,7 @@ public abstract class LinearizableReadTests<CLUSTER extends MiniRaftCluster>

public abstract boolean isLeaderLeaseEnabled();

public abstract boolean readIndexAppliedIndexEnabled();
public abstract Type readIndexType();

public abstract void assertRaftProperties(RaftProperties properties);

Expand All @@ -77,7 +78,7 @@ public void setup() {
CounterStateMachine.setProperties(p);
RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
RaftServerConfigKeys.Read.ReadIndex.setAppliedIndexEnabled(p, readIndexAppliedIndexEnabled());
RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType());
}

@Test
Expand Down Expand Up @@ -143,10 +144,12 @@ static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C cluste

@Test
public void testFollowerLinearizableReadParallel() throws Exception {
runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel);
final Type type = readIndexType();
runWithNewCluster(cluster -> runTestFollowerReadOnlyParallel(type, cluster));
}

static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallel(C cluster) throws Exception {
static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallel(Type readIndexType, C cluster)
throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();

final List<RaftServer.Division> followers = cluster.getFollowers();
Expand All @@ -169,8 +172,17 @@ static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallel(C cluste
writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT)));
Thread.sleep(100);

assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1)));
if (readIndexType == Type.REPLIED_INDEX) {
// With REPLIED_INDEX the read index only advances after the leader has applied the
// transaction and the reply batch is flushed. WAIT_AND_INCREMENT takes 500 ms in
// the state machine but we only waited 100 ms, so its reply has not been generated
// yet and the follower read may only see the preceding sync INCREMENT (count - 1).
assertReplyAtLeast(count - 1, f0Client.io().sendReadOnly(QUERY, f0));
f1Replies.add(new Reply(count - 1, f1Client.async().sendReadOnly(QUERY, f1)));
} else {
assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1)));
}
Copy link
Copy Markdown
Contributor Author

@ivandika3 ivandika3 Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to fix this failing test, but the issue is that runTestFollowerReadOnlyParallel does not wait for the WAIT_AND_INCREMENT (which delays 500 ms although the test only sleep for 100ms) to return so technically we cannot assert that the count is incremented by WAIT_AND_INCREMENT since the StateMachine#applyTransactions future has not finished yet. I might be wrong.

When I tried to make repliedIndex to be current ServerState appliedIndex instead of the highest index of all the replies, the test pass. However, I think appliedIndex should be higher than repliedIndex so the follower might still need to wait longer than necessary. I considered to use the StateMachine.lastAppliedTermIndex, but we cannot guarantee the correctness or semantics of StateMachine.lastAppliedTermIndex (i.e. is it the highest index where applyTransaction future actually finish or simply the last applyTransaction called). I also found out that ServerState.appliedIndex does not mean that the async StateMachine#applyTransaction futures actually finish (although the sync StateMachine#applyTransactionSerial is guaranteed to be finished).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry that the repliedIndex idea may not work:

We have the invariant

  • repliedIndex <= appliedIndex <= committedIndex

The repliedIndex idea is: When repliedIndex < appliedIndex, it is safe for a follower to return any states >= repliedIndex since appliedIndex is not yet visible to any clients -- The data is not yet stale in this sense.

However, after a follower/leader has returned a read call, its appliedIndex A becomes visible to that client. The subsequence reads must return a state >= A. Below is an example of the problem:

  1. Leader: repliedIndex = 10 < appliedIndex = 20
  2. Follower 1: appliedIndex = 18
  3. Follower 2: appliedIndex = 14
  4. Client first reads from Follower 1.
  5. The same client reads from Follower 2 (or the Leader) <---- stale read.

Let's see if there are some ways to make it work.

}

for (int i = 0; i < n; i++) {
Expand Down
Loading
Loading