-
Notifications
You must be signed in to change notification settings - Fork 5.1k
ext_proc: closing the gRPC stream ASAP once no more external processing needed #41425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 17 commits
755e111
f6b536b
f8eba26
dcf5c6c
07be305
49fbe96
33a1d74
35ef112
6d4aa09
921c40b
bf71363
2e8ab46
b8934d1
4f6542b
8986c15
e713075
1a55c23
826a6cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -693,6 +693,16 @@ void Filter::deferredCloseStream() { | |
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); | ||
} | ||
|
||
void Filter::closeStreamMaybeGraceful() { | ||
processing_complete_ = true; | ||
if (config_->gracefulGrpcClose()) { | ||
halfCloseAndWaitForRemoteClose(); | ||
} else { | ||
// Perform immediate close on the stream otherwise. | ||
closeStream(); | ||
} | ||
} | ||
|
||
void Filter::onDestroy() { | ||
ENVOY_STREAM_LOG(debug, "onDestroy", *decoder_callbacks_); | ||
// Make doubly-sure we no longer use the stream, as | ||
|
@@ -720,12 +730,7 @@ void Filter::onDestroy() { | |
// Second, perform stream deferred closure. | ||
deferredCloseStream(); | ||
} else { | ||
if (config_->gracefulGrpcClose()) { | ||
halfCloseAndWaitForRemoteClose(); | ||
} else { | ||
// Perform immediate close on the stream otherwise. | ||
closeStream(); | ||
} | ||
closeStreamMaybeGraceful(); | ||
} | ||
} | ||
|
||
|
@@ -1215,6 +1220,12 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s | |
// local reply. | ||
mergePerRouteConfig(); | ||
|
||
// If there is no external processing configured in the encoding path, | ||
// closing the gRPC stream if it is still open. | ||
if (encoding_state_.noExternalProcess()) { | ||
closeStreamMaybeGraceful(); | ||
} | ||
|
||
if (encoding_state_.sendHeaders() && config_->observabilityMode()) { | ||
return sendHeadersInObservabilityMode(headers, encoding_state_, end_stream); | ||
} | ||
|
@@ -1525,8 +1536,96 @@ ProcessingMode effectiveModeOverride(const ProcessingMode& target_override, | |
return mode_override; | ||
} | ||
|
||
// Returns true if this body response is the last message in the current direction (request or | ||
// response path). This means no further body chunks or trailers are expected in this direction. | ||
// For now, such check is only done for STREAMED or FULL_DUPLEX_STREAMED body mode. For any | ||
// other body mode, it always return false. | ||
bool isLastBodyResponse(ProcessorState& state, | ||
const envoy::service::ext_proc::v3::BodyResponse& body_response) { | ||
switch (state.bodyMode()) { | ||
case ProcessingMode::BUFFERED: | ||
case ProcessingMode::BUFFERED_PARTIAL: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. much appreciate if you could add a method comment talking about what this method is about, and comment heavily on which modes are not supported yet. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// TODO: - skip stream closing optimization for BUFFERED and BUFFERED_PARTIAL for now. | ||
break; | ||
case ProcessingMode::STREAMED: | ||
if (!state.chunkQueue().empty()) { | ||
return state.chunkQueue().queue().front()->end_stream; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to consider the corner case that body is done, because we received trailers already here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are other corner cases: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved the stream optimization logic to the end of the onReceiveMessage, i.e, after the ProcessingResponse is already processed. This will automatically count in CONTINUE_AN_REPLACE, as such message will modify the filter processing modes configuration, thus be counted in the check. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is a little bit tricky. Can we skip this corner case? If we missed closing the stream in the decoding path, the stream closing added at the encodeHeaders() will catch it and cleanup any way. |
||
} | ||
break; | ||
case ProcessingMode::FULL_DUPLEX_STREAMED: { | ||
if (body_response.has_response() && body_response.response().has_body_mutation()) { | ||
const auto& body_mutation = body_response.response().body_mutation(); | ||
if (body_mutation.has_streamed_response()) { | ||
return body_mutation.streamed_response().end_of_stream(); | ||
} | ||
} | ||
break; | ||
} | ||
default: | ||
break; | ||
} | ||
return false; | ||
} | ||
|
||
} // namespace | ||
|
||
void Filter::closeGrpcStreamIfLastRespReceived(const ProcessingResponse& response, | ||
const bool is_last_body_resp) { | ||
|
||
if (stream_ == nullptr || !Runtime::runtimeFeatureEnabled( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. checking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, there are some cases in onReceiveMessage() which already close the stream. For example, ImmediateResponse handling, or a header response contains invalid mutation. For these cases the stream is already closed, so no need to go through this logic again. For other normal cases, the stream is not closed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My point is closed stream should be handled by closeStream() below. i.e., this check is not needed. But I understand your point of avoiding going through the rest of the function, which is fair. Maybe consider adding comment to clarify the check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
"envoy.reloadable_features.ext_proc_stream_close_optimization")) { | ||
return; | ||
} | ||
|
||
bool last_response = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is something that should part of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, the idea is to decide whether to terminate the gRPC stream once the ext_proc filter receives the ProcessingResponse, i.e, at the center place inside Filter::onReceiveMessage(), as Envoy has enough information here to decide whether to terminate it then. Basically, this is determined by 1)whether the end-of-stream is received, 2) and whether Envoy needs to send more data to the ext_proc server based on filter configuration. Hooking the logic to ProcessorState will unavoidably spread the logic to each of the handling header response, handling body response(different modes), and handling trailer response, which will make it error prone and hard to maintain. |
||
|
||
switch (response.response_case()) { | ||
case ProcessingResponse::ResponseCase::kRequestHeaders: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the ext_proc sends invalid response, e.g. requestHeaders on Body events, it's a bad state that would cause stream close already. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moving the stream closing logic to the end of onReceiveMessage() automatically count in this as well as stream_ become nullptr in these conditions. |
||
if ((decoding_state_.hasNoBody() || | ||
(decoding_state_.bodyMode() == ProcessingMode::NONE && !decoding_state_.sendTrailers())) && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. other case: body mode confgiured, but trailers received. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. trailer mode set, but EoS seen in headers or Body already There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the cases you mentioned in the TBD list in the PR description, please take a look. The goal of this PR is to get the normal case optimization working, then we can deal with the corner cases in the future enhancement. BTW, if some corner cases are missed in the decoding path, the stream closing logic added in encodeHeaders() will catch them all. |
||
encoding_state_.noExternalProcess()) { | ||
last_response = true; | ||
} | ||
break; | ||
case ProcessingResponse::ResponseCase::kRequestBody: | ||
if (is_last_body_resp && encoding_state_.noExternalProcess()) { | ||
last_response = true; | ||
} | ||
break; | ||
case ProcessingResponse::ResponseCase::kRequestTrailers: | ||
if (encoding_state_.noExternalProcess()) { | ||
last_response = true; | ||
} | ||
break; | ||
case ProcessingResponse::ResponseCase::kResponseHeaders: | ||
if (encoding_state_.hasNoBody() || | ||
(encoding_state_.bodyMode() == ProcessingMode::NONE && !encoding_state_.sendTrailers())) { | ||
last_response = true; | ||
} | ||
break; | ||
case ProcessingResponse::ResponseCase::kResponseBody: | ||
if (is_last_body_resp) { | ||
last_response = true; | ||
} | ||
break; | ||
case ProcessingResponse::ResponseCase::kResponseTrailers: | ||
last_response = true; | ||
break; | ||
case ProcessingResponse::ResponseCase::kImmediateResponse: | ||
// Immediate response currently may close the stream immediately. | ||
// Leave it as it is for now. | ||
break; | ||
default: | ||
break; | ||
} | ||
|
||
if (last_response) { | ||
ENVOY_STREAM_LOG(debug, "Closing gRPC stream after receiving last response", | ||
*decoder_callbacks_); | ||
closeStreamMaybeGraceful(); | ||
} | ||
} | ||
|
||
void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) { | ||
|
||
if (config_->observabilityMode()) { | ||
|
@@ -1594,6 +1693,8 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) { | |
|
||
ENVOY_STREAM_LOG(debug, "Received {} response", *decoder_callbacks_, | ||
responseCaseToString(response->response_case())); | ||
|
||
bool is_last_body_resp = false; | ||
absl::Status processing_status; | ||
switch (response->response_case()) { | ||
case ProcessingResponse::ResponseCase::kRequestHeaders: | ||
|
@@ -1605,10 +1706,16 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) { | |
processing_status = encoding_state_.handleHeadersResponse(response->response_headers()); | ||
break; | ||
case ProcessingResponse::ResponseCase::kRequestBody: | ||
if (response->has_request_body()) { | ||
|
||
is_last_body_resp = isLastBodyResponse(decoding_state_, response->request_body()); | ||
} | ||
setDecoderDynamicMetadata(*response); | ||
processing_status = decoding_state_.handleBodyResponse(response->request_body()); | ||
break; | ||
case ProcessingResponse::ResponseCase::kResponseBody: | ||
if (response->has_response_body()) { | ||
is_last_body_resp = isLastBodyResponse(encoding_state_, response->response_body()); | ||
} | ||
setEncoderDynamicMetadata(*response); | ||
processing_status = encoding_state_.handleBodyResponse(response->response_body()); | ||
break; | ||
|
@@ -1634,11 +1741,7 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) { | |
ENVOY_STREAM_LOG(debug, "Sending immediate response", *decoder_callbacks_); | ||
processing_complete_ = true; | ||
onFinishProcessorCalls(Grpc::Status::Ok); | ||
if (config_->gracefulGrpcClose()) { | ||
halfCloseAndWaitForRemoteClose(); | ||
} else { | ||
closeStream(); | ||
} | ||
closeStreamMaybeGraceful(); | ||
if (on_processing_response_) { | ||
on_processing_response_->afterReceivingImmediateResponse( | ||
response->immediate_response(), absl::OkStatus(), decoder_callbacks_->streamInfo()); | ||
|
@@ -1682,6 +1785,9 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) { | |
stats_.stream_msgs_received_.inc(); | ||
handleErrorResponse(processing_status); | ||
} | ||
|
||
// Close the gRPC stream if no more external processing needed. | ||
closeGrpcStreamIfLastRespReceived(*response, is_last_body_resp); | ||
} | ||
|
||
void Filter::onGrpcError(Grpc::Status::GrpcStatus status, const std::string& message) { | ||
|
@@ -1717,6 +1823,10 @@ void Filter::onGrpcClose() { onGrpcCloseWithStatus(Grpc::Status::Aborted); } | |
void Filter::onGrpcCloseWithStatus(Grpc::Status::GrpcStatus status) { | ||
ENVOY_STREAM_LOG(debug, "Received gRPC stream close", *decoder_callbacks_); | ||
|
||
if (processing_complete_) { | ||
return; | ||
} | ||
|
||
processing_complete_ = true; | ||
stats_.streams_closed_.inc(); | ||
// Successful close. We can ignore the stream for the rest of our request | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if
noExternalProcess
can you simply return FilterHeadersStatus::Continue here and avoids the rest of handling in this function?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point. However, I prefer if noExternalProcess, just close the stream and do not change the other logic, like still set below flag:
if (end_stream) {
encoding_state_.setCompleteBodyAvailable(true);
}
Also prefer to still have the trace logs if response_header sending is SKIP: ENVOY_STREAM_LOG(trace, "encodeHeaders: Skipped header processing", *decoder_callbacks_);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious if
encoding_state_.setCompleteBodyAvailable(true);
and all other logics are still needed whennoExternalProcess
. Basically there is no callout at encode path at all. ext_proc becomes a pass-through filter, no?But if there are any corner cases you can think of, yes I agree going through rest of function is safer