ext_proc: closing the gRPC stream ASAP once no more external processing needed #41425
needed Signed-off-by: Yanjun Xiang <[email protected]>
/retest
/assign @yanavlasov @tyxia @stevenzzzz
/wait
} // namespace

bool Filter::noExternalProcessInEncoding() const {
Why not add this method to the EncodingProcessorState?
done
// Close the gRPC stream if the last ProcessingResponse is received.
void Filter::closeGrpcStreamIfLastRespReceived(
    const std::unique_ptr<envoy::service::ext_proc::v3::ProcessingResponse>& response) {
  bool last_response = false;
This should be part of the ProcessorState. After processing a response message, it should return a flag indicating that a terminal state has been reached and the gRPC stream can be closed.
Hmm, the idea is to decide whether to terminate the gRPC stream once the ext_proc filter receives the ProcessingResponse, i.e., in one central place inside Filter::onReceiveMessage(), since Envoy has enough information there to make the decision. Basically, this is determined by 1) whether the end-of-stream has been received, and 2) whether Envoy needs to send more data to the ext_proc server based on the filter configuration. Hooking the logic into ProcessorState would unavoidably spread it across the handling of header responses, body responses (in the different modes), and trailer responses, which would make it error-prone and hard to maintain.
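The two conditions described above can be sketched as a single predicate evaluated in one place. This is a minimal, self-contained illustration and not the PR's code: `DirectionState`, its fields, `noMoreProcessing`, and `canCloseStream` are hypothetical names invented here, and the real filter tracks considerably more state (processing modes, chunk queues, timers).

```cpp
// Hypothetical, simplified view of one direction (decoding or encoding).
struct DirectionState {
  bool send_headers = false;      // headers are configured to go to the server
  bool headers_handled = false;   // the headers response has been processed
  bool send_body = false;         // body is configured to go to the server
  bool send_trailers = false;     // trailers are configured to go to the server
  bool end_stream_seen = false;   // end-of-stream observed in this direction
  bool awaiting_response = false; // a ProcessingResponse is still outstanding
};

// True when this direction will never need the external processor again.
bool noMoreProcessing(const DirectionState& s) {
  if (s.awaiting_response) {
    return false; // the server still owes us a response
  }
  if (s.send_headers && !s.headers_handled) {
    return false; // headers still have to make the round trip
  }
  if (s.end_stream_seen) {
    return true; // condition 1: nothing further can arrive
  }
  // Condition 2: nothing further would be sent by configuration.
  return !s.send_body && !s.send_trailers;
}

// Evaluated once, after a ProcessingResponse has been fully processed.
bool canCloseStream(const DirectionState& decoding, const DirectionState& encoding) {
  return noMoreProcessing(decoding) && noMoreProcessing(encoding);
}
```

With this shape, a response that changes the processing mode is picked up automatically, provided the predicate runs after the response has updated the state.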
Hooah. You are fighting a beast here. :)
At a high level, since this is a behavior change that impacts prod traffic, could you guard this change with a feature flag?
Left some comments. Please consider some common cases such as the following (there are certainly more):
- CONTINUE_AND_REPLACE finishes all events for one direction.
- EOS from trailers implicitly terminates the body.
- Only check for the last response after the response has been "processed"; in error cases, the stream will already be closed.
At a high level, if we have gone this far already, let's also consider waiting for trailers after the half-close has been sent. But that could be in another PR, I assume.
    const envoy::service::ext_proc::v3::ProcessingResponse& response) {
  switch (state.bodyMode()) {
  case ProcessingMode::BUFFERED:
  case ProcessingMode::BUFFERED_PARTIAL:
It would be much appreciated if you could add a method comment describing what this method does, and comment heavily on which modes are not supported yet.
done
    break;
  case ProcessingMode::STREAMED:
    if (!state.chunkQueue().empty()) {
      return state.chunkQueue().queue().front()->end_stream;
Do we need to consider the corner case where the body is done because we have already received trailers here?
There are other corner cases:
If CONTINUE_AND_REPLACE is set, it also means we can safely half-close.
Moved the stream optimization logic to the end of onReceiveMessage(), i.e., after the ProcessingResponse has already been processed. This automatically accounts for CONTINUE_AND_REPLACE, since such a message modifies the filter's processing mode configuration and is therefore included in the check.
do we need to consider the corner case that body is done, because we received trailers already here?
This is a little bit tricky. Can we skip this corner case? If we miss closing the stream in the decoding path, the stream closing added in encodeHeaders() will catch it and clean up anyway.
  case ProcessingMode::FULL_DUPLEX_STREAMED: {
    const envoy::service::ext_proc::v3::BodyResponse* body_response = nullptr;
    if (response.has_request_body()) {
      body_response = &response.request_body();
Nit: there should not be an if-else here.
Instead, use the state's type to get the actual body response field?
I think the corner cases above apply here as well: trailers received, CONTINUE_AND_REPLACE set.
nit, there should not be a if-else here. instead, use the state's type to get the real body response field?
done
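One way to read the nit above is to let the processor state's direction pick the body field, so the caller never probes both fields. The sketch below uses simplified stand-ins rather than the real protobuf and filter types: `BodyResponse`, `ProcessingResponse`, `TrafficDirection`, and `bodyResponseFor` are all assumptions made for illustration, and the PR's actual change may look different.

```cpp
// Simplified stand-ins for the protobuf messages; only what the sketch needs.
struct BodyResponse {
  bool end_of_stream = false;
};

struct ProcessingResponse {
  BodyResponse request_body;
  BodyResponse response_body;
};

// Hypothetical direction tag carried by each processor state.
enum class TrafficDirection { Inbound, Outbound };

// Selects the body field that matches the state's direction, replacing an
// if/else chain on has_request_body() / has_response_body().
const BodyResponse& bodyResponseFor(TrafficDirection direction,
                                    const ProcessingResponse& response) {
  return direction == TrafficDirection::Inbound ? response.request_body
                                                : response.response_body;
}
```

The decoding state would pass the inbound direction and the encoding state the outbound one, so the same helper serves both paths.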
  void closeStreamMaybeGraceful();

  // Close the gRPC stream if the last ProcessingResponse is received.
  void closeGrpcStreamIfLastRespReceived(
nit, you could use "const ProcessingResponse&" here to save some dereferences.
done
  bool last_response = false;

  switch (response->response_case()) {
  case ProcessingResponse::ResponseCase::kRequestHeaders:
If the ext_proc server sends an invalid response, e.g. requestHeaders in reply to a body event, that is a bad state that would already cause the stream to be closed.
Shall we evaluate all the cases after the response is processed?
Moving the stream-closing logic to the end of onReceiveMessage() automatically accounts for this as well, since stream_ becomes nullptr in these conditions.
  switch (response->response_case()) {
  case ProcessingResponse::ResponseCase::kRequestHeaders:
    if ((decoding_state_.hasNoBody() ||
         (decoding_state_.bodyMode() == ProcessingMode::NONE && !decoding_state_.sendTrailers())) &&
Another case: body mode configured, but trailers received.
trailer mode set, but EoS seen in headers or Body already
I added the cases you mentioned to the TBD list in the PR description; please take a look. The goal of this PR is to get the normal-case optimization working; we can then deal with the corner cases in future enhancements. BTW, if some corner cases are missed in the decoding path, the stream-closing logic added in encodeHeaders() will catch them all.
    }
    break;
  case ProcessingResponse::ResponseCase::kRequestBody:
    if (isLastBodyResponse(decoding_state_, *response) && encoding_state_.noExternalProcess()) {
what if trailers are configured?
Here we detect that EoS was received with the body, so there are no trailers in this request. Thus, even if the filter is configured to send trailers, the external processing in this direction is complete.
Waiting for comments to be addressed. /wait
Thanks for the comments! done
Thanks for working on this!
+1 on a runtime guard to protect this change. The processing mode/ext_proc is getting more and more complicated :)
    processing_status = encoding_state_.handleHeadersResponse(response->response_headers());
    break;
  case ProcessingResponse::ResponseCase::kRequestBody:
    if (response->has_request_body()) {
qq: why do we need this response->has_request_body()? Do we receive an empty response when the response case is set to kRequestBody?
That's a good point! done
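As general protobuf background (not something stated in this thread): in generated C++ code, a oneof member's has_ accessor is true exactly when the oneof case selects that member, so inside `case kRequestBody:` the extra check is redundant. The analogy below uses `std::variant` in place of generated code; `Response`, `responseCase`, and `hasRequestBody` are stand-ins invented for the illustration.

```cpp
#include <variant>

struct HeadersResponse {};
struct BodyResponse {};

// A variant holds exactly one alternative at a time, like a protobuf oneof.
using Response = std::variant<std::monostate, HeadersResponse, BodyResponse>;

// Mirrors the generated ResponseCase enum: values follow the alternative order.
enum class ResponseCase { kNotSet = 0, kRequestHeaders = 1, kRequestBody = 2 };

ResponseCase responseCase(const Response& r) {
  return static_cast<ResponseCase>(r.index());
}

// Analogue of has_request_body(): true only when the body alternative is set,
// which is the same condition as responseCase(r) == ResponseCase::kRequestBody.
bool hasRequestBody(const Response& r) {
  return std::holds_alternative<BodyResponse>(r);
}
```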
  // If there is no external processing configured in the encoding path,
  // closing the gRPC stream if it is still open.
  if (encoding_state_.noExternalProcess()) {
    closeStreamMaybeGraceful();
If noExternalProcess, can you simply return FilterHeadersStatus::Continue here and avoid the rest of the handling in this function?
That's a good point. However, if noExternalProcess, I prefer to just close the stream and not change the other logic, e.g. still set the flag below:
if (end_stream) {
encoding_state_.setCompleteBodyAvailable(true);
}
I'd also prefer to keep the trace log when response header sending is SKIP: ENVOY_STREAM_LOG(trace, "encodeHeaders: Skipped header processing", *decoder_callbacks_);
I am curious whether encoding_state_.setCompleteBodyAvailable(true); and all the other logic are still needed when noExternalProcess. Basically there is no callout on the encode path at all, so ext_proc becomes a pass-through filter, no?
But if there are any corner cases you can think of, then yes, I agree that going through the rest of the function is safer.
void Filter::closeGrpcStreamIfLastRespReceived(const ProcessingResponse& response,
                                               const bool is_last_body_resp) {

  if (stream_ == nullptr || !Runtime::runtimeFeatureEnabled(
Checking if (stream_ == nullptr looks a bit strange to me. I assume it means the stream has already been closed?
Shouldn't this case already be handled by another code path?
Yeah, there are some cases in onReceiveMessage() that already close the stream, for example ImmediateResponse handling, or a header response that contains an invalid mutation. In these cases the stream is already closed, so there is no need to go through this logic again. In the other, normal cases, the stream is not closed.
My point is that a closed stream should be handled by closeStream() below, i.e., this check is not needed.
But I understand your point about avoiding going through the rest of the function, which is fair. Maybe consider adding a comment to clarify the check.
done
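The early-return guard discussed in this thread can be illustrated with a small stand-alone model. Everything here is hypothetical: `GrpcStream`, `FilterSketch`, and the injected `feature_enabled` callback stand in for the real stream object, the filter, and the runtime feature check, whose actual flag name is not shown in this conversation.

```cpp
#include <functional>
#include <memory>
#include <utility>

struct GrpcStream {}; // stand-in for the real gRPC stream object

class FilterSketch {
public:
  explicit FilterSketch(std::function<bool()> feature_enabled)
      : stream_(std::make_unique<GrpcStream>()),
        feature_enabled_(std::move(feature_enabled)) {}

  // Models earlier handling that tears the stream down, e.g. an immediate
  // response or an invalid mutation.
  void closeOnError() { stream_.reset(); }

  // Returns true only if this call closed the stream.
  bool closeIfLastResponse(bool last_response) {
    // The stream may already have been closed while the response was being
    // processed; in that case there is nothing left to evaluate. The feature
    // check keeps the new behavior behind a guard.
    if (stream_ == nullptr || !feature_enabled_()) {
      return false;
    }
    if (!last_response) {
      return false;
    }
    stream_.reset();
    return true;
  }

  bool hasStream() const { return stream_ != nullptr; }

private:
  std::unique_ptr<GrpcStream> stream_;
  std::function<bool()> feature_enabled_;
};
```

Placing the null check first means the guard costs nothing on error paths, and the comment documents why the check exists, which is what the reviewer asked for.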
The initial optimization does not apply to a few scenarios listed below, which are TBD:
This addresses a portion of issue #37088, i.e., closing the gRPC stream once no further external processing is needed.
Currently the ext_proc gRPC stream is opened when the first ProcessingRequest is sent to the ext_proc server, and it is closed during ext_proc filter destruction. This wastes resources on both the Envoy and the ext_proc server side. For example, if Envoy is configured to send only request headers, the gRPC stream is left open all the way until the response is processed.
This PR closes the ext_proc gRPC stream once Envoy detects that no more external processing is needed.