-
Notifications
You must be signed in to change notification settings - Fork 438
RATIS-2421. Gracefully cancel stream after complete in GrpcLogAppender #1363
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
54af1e1
03d189d
6726e78
30b53c2
16297f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,7 @@ | |
| import org.apache.ratis.server.util.ServerStringUtils; | ||
| import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; | ||
| import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; | ||
| import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; | ||
| import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; | ||
| import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; | ||
| import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; | ||
|
|
@@ -60,7 +61,10 @@ | |
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| /** | ||
|
|
@@ -341,20 +345,36 @@ private boolean haveTooManyPendingRequests() { | |
| } | ||
|
|
||
| static class StreamObservers { | ||
| private final CallStreamObserver<AppendEntriesRequestProto> appendLog; | ||
| private final CallStreamObserver<AppendEntriesRequestProto> heartbeat; | ||
| private final ClientCallStreamObserver<AppendEntriesRequestProto> appendLog; | ||
| private final ClientCallStreamObserver<AppendEntriesRequestProto> heartbeat; | ||
| private final TimeDuration waitForReady; | ||
| private final TimeDuration completeGracePeriod; | ||
| private volatile boolean running = true; | ||
|
|
||
| private final ScheduledExecutorService closer = | ||
| Executors.newSingleThreadScheduledExecutor(r -> { | ||
| Thread t = new Thread(r, "grpc-log-appender-stream-closer"); | ||
| t.setDaemon(true); | ||
| return t; | ||
| }); | ||
|
|
||
| private final AtomicBoolean completed = new AtomicBoolean(false); | ||
| private final AtomicBoolean cancelled = new AtomicBoolean(false); | ||
|
|
||
| StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat, | ||
| TimeDuration waitTimeMin) { | ||
| this.appendLog = client.appendEntries(handler, false); | ||
| this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): null; | ||
| TimeDuration waitTimeMin, TimeDuration completeGracePeriod) { | ||
| this.appendLog = (ClientCallStreamObserver<AppendEntriesRequestProto>) client.appendEntries(handler, false); | ||
| this.heartbeat = separateHeartbeat? | ||
| (ClientCallStreamObserver<AppendEntriesRequestProto>) client.appendEntries(handler, true): null; | ||
| this.waitForReady = waitTimeMin.isPositive()? waitTimeMin: TimeDuration.ONE_MILLISECOND; | ||
| this.completeGracePeriod = completeGracePeriod.isPositive()? completeGracePeriod : TimeDuration.ONE_SECOND; | ||
| } | ||
|
|
||
| void onNext(AppendEntriesRequestProto proto) | ||
| throws InterruptedIOException { | ||
| if (!running) { | ||
| throw new InterruptedIOException("StreamObservers is stopping/closing"); | ||
| } | ||
| CallStreamObserver<AppendEntriesRequestProto> stream; | ||
| boolean isHeartBeat = heartbeat != null && proto.getEntriesCount() == 0; | ||
| if (isHeartBeat) { | ||
|
|
@@ -366,16 +386,73 @@ void onNext(AppendEntriesRequestProto proto) | |
| while (!stream.isReady() && running) { | ||
| sleep(waitForReady, isHeartBeat); | ||
| } | ||
| stream.onNext(proto); | ||
| try { | ||
| stream.onNext(proto); | ||
| } catch (Exception e) { | ||
| InterruptedIOException ioe = | ||
| new InterruptedIOException("Failed to send request via stream"); | ||
| ioe.initCause(e); | ||
| throw ioe; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Clears the {@code running} flag without completing or cancelling the streams. | ||
| * Once cleared, {@code onNext} rejects further requests with an | ||
| * {@link InterruptedIOException} and its wait-for-ready loop stops waiting. | ||
| */ | ||
| void stop() { | ||
| running = false; | ||
| } | ||
|
|
||
| /** | ||
| * Completes the appendLog stream, and the heartbeat stream when one exists, then arms a | ||
| * one-shot task that runs {@code cancelIfStillNeeded()} after {@code completeGracePeriod} | ||
| * (at least 1 ms). | ||
| * <p> | ||
| * Only the first call does anything: the {@code completed} flag guards both the stream | ||
| * completion and the scheduling, so each stream is completed at most once and repeated | ||
| * calls cannot queue redundant cancel tasks. | ||
| * <p> | ||
| * If the streams were already cancelled, the closer has been shut down and no task is | ||
| * scheduled. A shutdown racing with this call is tolerated rather than propagated. | ||
| */ | ||
| void onCompleted() { | ||
| if (!completed.compareAndSet(false, true)) { | ||
| return; | ||
| } | ||
| completeStreamGracefully(appendLog, "appendLog"); | ||
| Optional.ofNullable(heartbeat) | ||
| .ifPresent(s -> completeStreamGracefully(s, "heartbeat")); | ||
| if (cancelled.get()) { | ||
| /* cancelNow() or cancelIfStillNeeded() already ran and shut the closer down. */ | ||
| return; | ||
| } | ||
| final long delayMs = Math.max(1L, completeGracePeriod.toLong(TimeUnit.MILLISECONDS)); | ||
| try { | ||
| closer.schedule(this::cancelIfStillNeeded, delayMs, TimeUnit.MILLISECONDS); | ||
| } catch (java.util.concurrent.RejectedExecutionException e) { | ||
| /* The closer was shut down between the check above and schedule(): nothing left to cancel. */ | ||
| LOG.debug("Skipped scheduling stream cancellation since the closer is already shut down", e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Cancels the appendLog stream, and the heartbeat stream when one exists, right away. | ||
| * The {@code cancelled} flag ensures the cancellation is performed at most once across | ||
| * this method and {@code cancelIfStillNeeded()}; later calls return without doing anything. | ||
| * On the first call it also clears {@code running} and shuts the closer executor down. | ||
| * | ||
| * @param reason description passed to each stream's {@code cancel} | ||
| * @param cause  throwable passed to each stream's {@code cancel}; may be null | ||
| */ | ||
| void cancelNow(String reason, Throwable cause) { | ||
| if (!cancelled.compareAndSet(false, true)) { | ||
| return; | ||
| } | ||
| running = false; | ||
| cancelStream(appendLog, "appendLog", reason, cause); | ||
| if (heartbeat != null) { | ||
| cancelStream(heartbeat, "heartbeat", reason, cause); | ||
| } | ||
| shutdownCloser(); | ||
| } | ||
|
|
||
| private void cancelIfStillNeeded() { | ||
| if (cancelled.compareAndSet(false, true)) { | ||
| cancelStream(appendLog, "appendLog", "Stream completion timeout", null); | ||
| Optional.ofNullable(heartbeat) | ||
| .ifPresent(s -> cancelStream(s, "heartbeat", "Stream completion timeout", null)); | ||
| } | ||
| shutdownCloser(); | ||
| } | ||
|
|
||
| /** | ||
| * Calls {@code onCompleted()} on the given stream as a best-effort operation: | ||
| * any exception is logged at WARN level and not rethrown. | ||
| * | ||
| * @param observer the request stream to complete | ||
| * @param label    stream name used in the log message | ||
| */ | ||
| private void completeStreamGracefully( | ||
| ClientCallStreamObserver<AppendEntriesRequestProto> observer, String label) { | ||
| try { | ||
| observer.onCompleted(); | ||
| } catch (Exception failure) { | ||
| LOG.warn("Failed to call onCompleted on {}", label, failure); | ||
| } | ||
| } | ||
|
|
||
| private void cancelStream( | ||
| ClientCallStreamObserver<AppendEntriesRequestProto> stream, | ||
|
Contributor
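There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We are doing an explicit cast to `ClientCallStreamObserver` here. What if `client.appendEntries(...)` returns a different `StreamObserver` implementation? In such a case the cast can throw an error.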
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since this is the client side of the call, the returned observer should always be a ClientCallStreamObserver. |
||
| String name, | ||
| String reason, | ||
| Throwable cause) { | ||
| try { | ||
| stream.cancel(reason, cause); | ||
| } catch (Exception e) { | ||
| LOG.warn("Failed to cancel {}", name, e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Calls {@code shutdown()} on the closer executor. Called from both {@code cancelNow} | ||
| * and {@code cancelIfStillNeeded}, so it may run more than once. | ||
| */ | ||
| private void shutdownCloser() { | ||
| closer.shutdown(); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -404,7 +481,8 @@ private void appendLog(boolean heartbeat) throws IOException { | |
| increaseNextIndex(pending); | ||
| if (appendLogRequestObserver == null) { | ||
| appendLogRequestObserver = new StreamObservers( | ||
| getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin()); | ||
| getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin(), | ||
| getCompleteGracePeriod()); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This schedules `cancelIfStillNeeded()` outside the `completed.compareAndSet(...)` block, so repeated `onCompleted()` calls can queue redundant cancel tasks. Can this be avoided?
Also, I think that if it is invoked again after `shutdownCloser()`, it may throw an exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cancel and shutdown parts can be called multiple times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But if it is already completed, aren't subsequent calls queuing cancel tasks that are no longer required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once onCompleted() is invoked, the scheduler should be used and then shut down; any later requests should then be rejected.