Skip to content

Commit 40539ec

Browse files
authored
RATIS-2333. Fix TestInstallSnapshotNotificationWithGrpc failure. (#1289)
1 parent 35615d9 commit 40539ec

File tree

5 files changed

+34
-27
lines changed

5 files changed

+34
-27
lines changed

ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,25 +138,27 @@ static Throwable tryUnwrapThrowable(StatusRuntimeException se) {
138138
static long getCallId(Throwable t) {
139139
if (t instanceof StatusRuntimeException) {
140140
final Metadata trailers = ((StatusRuntimeException)t).getTrailers();
141-
String callId = trailers.get(CALL_ID);
142-
return callId != null ? Integer.parseInt(callId) : -1;
141+
if (trailers != null) {
142+
final String callId = trailers.get(CALL_ID);
143+
return callId != null ? Integer.parseInt(callId) : -1;
144+
}
143145
}
144146
return -1;
145147
}
146148

147149
static boolean isHeartbeat(Throwable t) {
148150
if (t instanceof StatusRuntimeException) {
149151
final Metadata trailers = ((StatusRuntimeException)t).getTrailers();
150-
String isHeartbeat = trailers != null ? trailers.get(HEARTBEAT) : null;
151-
return isHeartbeat != null && Boolean.valueOf(isHeartbeat);
152+
final String isHeartbeat = trailers != null ? trailers.get(HEARTBEAT) : null;
153+
return Boolean.parseBoolean(isHeartbeat);
152154
}
153155
return false;
154156
}
155157

156158
static IOException unwrapIOException(Throwable t) {
157159
final IOException e;
158160
if (t instanceof StatusRuntimeException) {
159-
e = GrpcUtil.unwrapException((StatusRuntimeException) t);
161+
e = unwrapException((StatusRuntimeException) t);
160162
} else {
161163
e = IOUtils.asIOException(t);
162164
}
@@ -172,15 +174,15 @@ static <REPLY, REPLY_PROTO> void asyncCall(
172174
supplier.get().whenComplete((reply, exception) -> {
173175
if (exception != null) {
174176
warning.accept(exception);
175-
responseObserver.onError(GrpcUtil.wrapException(exception));
177+
responseObserver.onError(wrapException(exception));
176178
} else {
177179
responseObserver.onNext(toProto.apply(reply));
178180
responseObserver.onCompleted();
179181
}
180182
});
181183
} catch (Exception e) {
182184
warning.accept(e);
183-
responseObserver.onError(GrpcUtil.wrapException(e));
185+
responseObserver.onError(wrapException(e));
184186
}
185187
}
186188

@@ -189,7 +191,7 @@ static void warn(Logger log, Supplier<String> message, Throwable t) {
189191
}
190192

191193
class StatusRuntimeExceptionMetadataBuilder {
192-
private Metadata trailers = new Metadata();
194+
private final Metadata trailers = new Metadata();
193195

194196
StatusRuntimeExceptionMetadataBuilder(Throwable t) {
195197
trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.ratis.server.RaftServer;
2525
import org.apache.ratis.server.protocol.RaftServerProtocol;
2626
import org.apache.ratis.server.util.ServerStringUtils;
27+
import org.apache.ratis.thirdparty.com.google.protobuf.MessageOrBuilder;
28+
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
2729
import org.apache.ratis.thirdparty.io.grpc.Status;
2830
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
2931
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -67,7 +69,8 @@ CompletableFuture<Void> getFuture() {
6769
}
6870
}
6971

70-
abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> {
72+
abstract class ServerRequestStreamObserver<REQUEST, REPLY extends MessageOrBuilder>
73+
implements StreamObserver<REQUEST> {
7174
private final RaftServer.Op op;
7275
private final Supplier<String> nameSupplier;
7376
private final StreamObserver<REPLY> responseObserver;
@@ -172,7 +175,8 @@ public void onCompleted() {
172175
getId(), op, getPreviousRequestString(), suffix));
173176
requestFuture.get().thenAccept(reply -> {
174177
BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(),
175-
suffix -> LOG.info("{}: Completed {}, lastReply: {} {}", getId(), op, reply, suffix));
178+
suffix -> LOG.info("{}: Completed {}, lastReply: {} {}",
179+
getId(), op, TextFormat.shortDebugString(reply), suffix));
176180
responseObserver.onCompleted();
177181
});
178182
}

ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.ratis.server.protocol.TermIndex;
3737
import org.apache.ratis.server.raftlog.LogProtoUtils;
3838
import org.apache.ratis.server.util.ServerStringUtils;
39+
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
3940
import org.apache.ratis.util.BatchLogger;
4041
import org.apache.ratis.util.CodeInjectionForTesting;
4142
import org.apache.ratis.util.LifeCycle;
@@ -144,7 +145,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
144145
final LogEntryProto proto = request.getLastRaftConfigurationLogEntryProto();
145146
state.truncate(proto.getIndex());
146147
if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
147-
LOG.info("{}: set new configuration {} from snapshot", getMemberId(), proto);
148+
LOG.info("{}: set new configuration {} from snapshot", getMemberId(), TextFormat.shortDebugString(proto));
148149
state.setRaftConf(proto);
149150
state.writeRaftConfiguration(proto);
150151
server.getStateMachine().event().notifyConfigurationChanged(

ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
7979

8080
RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
8181
RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf(1024)); // 1k segment
82+
RaftServerConfigKeys.LeaderElection.setMemberMajorityAdd(prop, true);
8283
}
8384

8485
private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
@@ -239,9 +240,8 @@ private void testAddNewFollowers(CLUSTER cluster, int numRequests) throws Except
239240
final boolean set = LEADER_SNAPSHOT_INFO_REF.compareAndSet(null, leaderSnapshotInfo);
240241
Assertions.assertTrue(set);
241242

242-
// add two more peers
243-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
244-
true);
243+
// Add new peer(s)
244+
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true, true);
245245
// trigger setConfiguration
246246
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
247247
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
@@ -389,9 +389,9 @@ private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Except
389389
follower.getRaftLog().getNextIndex());
390390
}
391391

392-
// Add two more peers who will need snapshots from the leader.
393-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
394-
true);
392+
// Add new peer(s) who will need snapshots from the leader.
393+
final int numNewPeers = 1;
394+
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(numNewPeers, true, true);
395395
// trigger setConfiguration
396396
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
397397
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
@@ -412,7 +412,7 @@ private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Except
412412
}
413413

414414
// Make sure each new peer got one snapshot notification.
415-
Assertions.assertEquals(2, numSnapshotRequests.get());
415+
Assertions.assertEquals(numNewPeers, numSnapshotRequests.get());
416416

417417
} finally {
418418
cluster.shutdown();
@@ -556,9 +556,9 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio
556556
final boolean set = LEADER_SNAPSHOT_INFO_REF.compareAndSet(null, leaderSnapshotInfo);
557557
Assertions.assertTrue(set);
558558

559-
// add two more peers
560-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
561-
true);
559+
// Add new peer(s)
560+
final int numNewPeers = 1;
561+
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(numNewPeers, true, true);
562562
// trigger setConfiguration
563563
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
564564
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
@@ -573,7 +573,7 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio
573573
}
574574

575575
// Make sure each new peer got at least one snapshot notification.
576-
Assertions.assertTrue(2 <= numSnapshotRequests.get());
576+
Assertions.assertTrue(numNewPeers <= numSnapshotRequests.get());
577577
} finally {
578578
cluster.shutdown();
579579
}

ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,16 +96,16 @@ public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception
9696
assertLogContent(leader, true);
9797
}
9898

99-
public static void assertLogContent(RaftServer.Division server, boolean isLeader) throws Exception {
99+
public static void checkMetadataEntry(RaftServer.Division server) throws Exception {
100100
final RaftLog log = server.getRaftLog();
101101
final long lastIndex = log.getLastEntryTermIndex().getIndex();
102102
final LogEntryProto e = log.get(lastIndex);
103103
Assertions.assertTrue(e.hasMetadataEntry());
104+
Assertions.assertEquals(log.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex());
105+
}
104106

105-
JavaUtils.attemptRepeatedly(() -> {
106-
Assertions.assertEquals(log.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex());
107-
return null;
108-
}, 50, BaseTest.HUNDRED_MILLIS, "CheckMetadataEntry", LOG);
107+
public static void assertLogContent(RaftServer.Division server, boolean isLeader) throws Exception {
108+
JavaUtils.attempt(() -> checkMetadataEntry(server), 50, HUNDRED_MILLIS, "checkMetadataEntry", LOG);
109109

110110
SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(server);
111111
if (isLeader) {

0 commit comments

Comments
 (0)