Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
755e111
ext_proc: closing the gRPC stream ASAP once no more external processing
yanjunxiang-google Oct 8, 2025
f6b536b
fix format
yanjunxiang-google Oct 8, 2025
f8eba26
set processing_complete_ to true
yanjunxiang-google Oct 8, 2025
dcf5c6c
adding server close test
yanjunxiang-google Oct 8, 2025
07be305
skip onGrpcClose() if processing_complete_ true
yanjunxiang-google Oct 8, 2025
49fbe96
fix test
yanjunxiang-google Oct 8, 2025
33a1d74
fix format
yanjunxiang-google Oct 8, 2025
35ef112
address comments
yanjunxiang-google Oct 9, 2025
6d4aa09
fix test
yanjunxiang-google Oct 10, 2025
921c40b
moving stream close to the end of onReceiveMessage()
yanjunxiang-google Oct 14, 2025
bf71363
addressing comments
yanjunxiang-google Oct 14, 2025
2e8ab46
adding runtime protection
yanjunxiang-google Oct 14, 2025
b8934d1
fix format
yanjunxiang-google Oct 14, 2025
4f6542b
adding release notes
yanjunxiang-google Oct 15, 2025
8986c15
merge upstream main
yanjunxiang-google Oct 15, 2025
e713075
fix format
yanjunxiang-google Oct 15, 2025
1a55c23
fix format
yanjunxiang-google Oct 15, 2025
826a6cc
address comments
yanjunxiang-google Oct 16, 2025
422ec49
merge upstream main
yanjunxiang-google Oct 21, 2025
713ca86
moving stream closing to the end of encodeHeaders()
yanjunxiang-google Oct 21, 2025
d72caae
verifying stream is closed as expected with CONTINUE_AND_REPLACE
yanjunxiang-google Oct 24, 2025
ac44492
update changelogs and address comments
yanjunxiang-google Oct 24, 2025
ac0233f
address comments
yanjunxiang-google Oct 24, 2025
23a6c0f
fix format
yanjunxiang-google Oct 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ minor_behavior_changes:
<envoy_v3_api_msg_service.ext_proc.v3.HeaderMutation>` fails. Removing
request-specific details allows grouping by similar failure types. Detailed
messages remain available in debug logs.
- area: ext_proc
change: |
Closing the gRPC stream if Envoy detects no more external processing needed.
This doesn't apply to BUFFERED and BUFFERED_PARTIAL mode and a few corner cases for now.
For those cases, the stream will be closed during the filter destruction.
This behavior can be reverted by setting the runtime guard
``envoy.reloadable_features.ext_proc_stream_close_optimization`` to ``false``.
- area: ext_authz
change: |
Check the response header count is < the configured limits before applying mutations and don't
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ RUNTIME_GUARD(envoy_reloadable_features_enable_cel_regex_precompilation);
RUNTIME_GUARD(envoy_reloadable_features_enable_compression_bomb_protection);
RUNTIME_GUARD(envoy_reloadable_features_enable_new_query_param_present_match_behavior);
RUNTIME_GUARD(envoy_reloadable_features_ext_proc_fail_close_spurious_resp);
RUNTIME_GUARD(envoy_reloadable_features_ext_proc_stream_close_optimization);
RUNTIME_GUARD(envoy_reloadable_features_generic_proxy_codec_buffer_limit);
RUNTIME_GUARD(envoy_reloadable_features_grpc_side_stream_flow_control);
RUNTIME_GUARD(envoy_reloadable_features_http1_balsa_delay_reset);
Expand Down
130 changes: 119 additions & 11 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,16 @@ void Filter::deferredCloseStream() {
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher());
}

void Filter::closeStreamMaybeGraceful() {
processing_complete_ = true;
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
// Perform immediate close on the stream otherwise.
closeStream();
}
}

void Filter::onDestroy() {
ENVOY_STREAM_LOG(debug, "onDestroy", *decoder_callbacks_);
// Make doubly-sure we no longer use the stream, as
Expand Down Expand Up @@ -720,12 +730,7 @@ void Filter::onDestroy() {
// Second, perform stream deferred closure.
deferredCloseStream();
} else {
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
// Perform immediate close on the stream otherwise.
closeStream();
}
closeStreamMaybeGraceful();
}
}

Expand Down Expand Up @@ -1239,6 +1244,13 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s
if (!processing_complete_ && encoding_state_.shouldRemoveContentLength()) {
headers.removeContentLength();
}

// If there is no external processing configured in the encoding path,
// closing the gRPC stream if it is still open.
if (encoding_state_.noExternalProcess()) {
closeStreamMaybeGraceful();
}

return status;
}

Expand Down Expand Up @@ -1525,8 +1537,97 @@ ProcessingMode effectiveModeOverride(const ProcessingMode& target_override,
return mode_override;
}

// Returns true if this body response is the last message in the current direction (request or
// response path). This means no further body chunks or trailers are expected in this direction.
// For now, such check is only done for STREAMED or FULL_DUPLEX_STREAMED body mode. For any
// other body mode, it always return false.
bool isLastBodyResponse(ProcessorState& state,
const envoy::service::ext_proc::v3::BodyResponse& body_response) {
switch (state.bodyMode()) {
case ProcessingMode::BUFFERED:
case ProcessingMode::BUFFERED_PARTIAL:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much appreciate if you could add a method comment talking about what this method is about, and comment heavily on which modes are not supported yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, could you pls also update in the change log?

// TODO: - skip stream closing optimization for BUFFERED and BUFFERED_PARTIAL for now.
return false;
case ProcessingMode::STREAMED:
if (!state.chunkQueue().empty()) {
return state.chunkQueue().queue().front()->end_stream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider the corner case that body is done, because we received trailers already here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other corner cases:
If CONTINUE_AND_REPLACE is set, it also means we can safely half-close.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the stream optimization logic to the end of the onReceiveMessage, i.e, after the ProcessingResponse is already processed. This will automatically count in CONTINUE_AN_REPLACE, as such message will modify the filter processing modes configuration, thus be counted in the check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider the corner case that body is done, because we received trailers already here?

This is a little bit tricky. Can we skip this corner case? If we missed closing the stream in the decoding path, the stream closing added at the encodeHeaders() will catch it and cleanup any way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the end goal is to close/finish the whole side stream before destructing the filter (before mainstream tears down).
It's okay to skip, as we can fall back to the usual closure for now, but pls comment here which cases are not considered yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you do the evaluation after the response is "processed". then no need to check on state.chunkQueue().queue().front()->end_stream().
queue empty + state.completeBodyAvailable() works the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queue empty + state.completeBodyAvailable() is ambiguous here. It may mean two different cases:

  1. The body chunk is the last body with EoS = true;
  2. or The body chunk is the last body with EoS = false, and trailer is received.

For case 1), it is equivalent to state.chunkQueue().queue().front()->end_stream(), which we can for sure close the stream. However, for case 2), it needs cross check the trailer processing mode to determine whether we can close the stream. We can do this in a follow up PR.

}
return false;
case ProcessingMode::FULL_DUPLEX_STREAMED: {
if (body_response.has_response() && body_response.response().has_body_mutation()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if it's my bad taste, but we can ignore all these has_XXX IMHO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do not check these has_xxx, and if they are not set, the code will rely on the default behavior of these messages, which is error-prone.

const auto& body_mutation = body_response.response().body_mutation();
if (body_mutation.has_streamed_response()) {
return body_mutation.streamed_response().end_of_stream();
}
}
return false;
}
default:
break;
}
return false;
}

} // namespace

void Filter::closeGrpcStreamIfLastRespReceived(const ProcessingResponse& response,
const bool is_last_body_resp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully follow why is_last_body_resp is an input parameter, could it be inferred using the response?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question!

For FULL_DUPLEX_STREAMED mode, it can be inferred from the response.

For STREAMED mode, the response does not contain EOS. In this case, the EOS info is stored in the Envoy chunk_queue element. However, as we are doing the stream_closing after the response message already processed, i.e, the chunk_queue element is already popped by then. So, for STREAMED mode, we have to detect whether this is_last_body_resp before we process the response message. Then passing the is_last_body_resp to the stream_closing logic.

// Bail out if the gRPC stream has already been closed. This can happen in scenarios
// like immediate responses or rejected header mutations.
if (stream_ == nullptr || !Runtime::runtimeFeatureEnabled(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking if (stream_ == nullptr looks a bit strange to me. I assume it means the stream has already been closed?
Should this case already been handled by other code path?

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there are some cases in onReceiveMessage() which already close the stream. For example, ImmediateResponse handling, or a header response contains invalid mutation. For these cases the stream is already closed, so no need to go through this logic again. For other normal cases, the stream is not closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is closed stream should be handled by closeStream() below. i.e., this check is not needed.

But I understand your point of avoiding going through the rest of the function, which is fair. Maybe consider adding comment to clarify the check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"envoy.reloadable_features.ext_proc_stream_close_optimization")) {
return;
}

bool last_response = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that should part of the ProcessorState. After processing a response message it should return a flag indicating that a terminal state has been reached and gRPC stream can be closed.

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, the idea is to decide whether to terminate the gRPC stream once the ext_proc filter receives the ProcessingResponse, i.e, at the center place inside Filter::onReceiveMessage(), as Envoy has enough information here to decide whether to terminate it then. Basically, this is determined by 1)whether the end-of-stream is received, 2) and whether Envoy needs to send more data to the ext_proc server based on filter configuration. Hooking the logic to ProcessorState will unavoidably spread the logic to each of the handling header response, handling body response(different modes), and handling trailer response, which will make it error prone and hard to maintain.


switch (response.response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the ext_proc sends invalid response, e.g. requestHeaders on Body events, it's a bad state that would cause stream close already.
Shall we evaluate all the cases after the response is processed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the stream closing logic to the end of onReceiveMessage() automatically count in this as well as stream_ become nullptr in these conditions.

if ((decoding_state_.hasNoBody() ||
(decoding_state_.bodyMode() == ProcessingMode::NONE && !decoding_state_.sendTrailers())) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other case: body mode confgiured, but trailers received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trailer mode set, but EoS seen in headers or Body already

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the cases you mentioned in the TBD list in the PR description, please take a look. The goal of this PR is to get the normal case optimization working, then we can deal with the corner cases in the future enhancement. BTW, if some corner cases are missed in the decoding path, the stream closing logic added in encodeHeaders() will catch them all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the TODO comments in code. those are not "corner cases" IMHO.
Since you started this, I hope you could finish it :)

encoding_state_.noExternalProcess()) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kRequestBody:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this didn't consider request trailers: if EoS is set by decodeTrailers, last_response can be inferred by trailers mode and the queue state for streamed mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do this in future PR. Some of the cases are listed here: #41425 (comment).

IMHO, in this PR, we just close the side stream for the cases we are 100% sure. For corner cases, they will be taken care of by the fall back mechanism which is filter onDestroy(). We don't have to be perfect in this PR. We can iterate and enhance. From another angle, it's probably bad to accidently close a stream we should not close. So, let's do this step-by-step.

if (is_last_body_resp && encoding_state_.noExternalProcess()) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kRequestTrailers:
if (encoding_state_.noExternalProcess()) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kResponseHeaders:
if (encoding_state_.hasNoBody() ||
(encoding_state_.bodyMode() == ProcessingMode::NONE && !encoding_state_.sendTrailers())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if body_replaced_ is true, there is no more message to send as well.

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic considered body_replaced_ already as CONTINUE_AND_REPLACE processing will also end up with

body_mode_ = ProcessingMode::NONE;
send_trailers_ = false;

So, the logic can capture this case.

last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kResponseBody:
if (is_last_body_resp) {
last_response = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not in this CL, comment on the case that's discussed on the request path

}
break;
case ProcessingResponse::ResponseCase::kResponseTrailers:
last_response = true;
break;
case ProcessingResponse::ResponseCase::kImmediateResponse:
// Immediate response currently may close the stream immediately.
// Leave it as it is for now.
break;
default:
break;
}

if (last_response) {
ENVOY_STREAM_LOG(debug, "Closing gRPC stream after receiving last response",
*decoder_callbacks_);
closeStreamMaybeGraceful();
}
}

void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {

if (config_->observabilityMode()) {
Expand Down Expand Up @@ -1594,6 +1695,8 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {

ENVOY_STREAM_LOG(debug, "Received {} response", *decoder_callbacks_,
responseCaseToString(response->response_case()));

bool is_last_body_resp = false;
absl::Status processing_status;
switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
Expand All @@ -1605,10 +1708,12 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
processing_status = encoding_state_.handleHeadersResponse(response->response_headers());
break;
case ProcessingResponse::ResponseCase::kRequestBody:
is_last_body_resp = isLastBodyResponse(decoding_state_, response->request_body());
setDecoderDynamicMetadata(*response);
processing_status = decoding_state_.handleBodyResponse(response->request_body());
break;
case ProcessingResponse::ResponseCase::kResponseBody:
is_last_body_resp = isLastBodyResponse(encoding_state_, response->response_body());
setEncoderDynamicMetadata(*response);
processing_status = encoding_state_.handleBodyResponse(response->response_body());
break;
Expand All @@ -1634,11 +1739,7 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
ENVOY_STREAM_LOG(debug, "Sending immediate response", *decoder_callbacks_);
processing_complete_ = true;
onFinishProcessorCalls(Grpc::Status::Ok);
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
closeStream();
}
closeStreamMaybeGraceful();
if (on_processing_response_) {
on_processing_response_->afterReceivingImmediateResponse(
response->immediate_response(), absl::OkStatus(), decoder_callbacks_->streamInfo());
Expand Down Expand Up @@ -1682,6 +1783,9 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
stats_.stream_msgs_received_.inc();
handleErrorResponse(processing_status);
}

// Close the gRPC stream if no more external processing needed.
closeGrpcStreamIfLastRespReceived(*response, is_last_body_resp);
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status, const std::string& message) {
Expand Down Expand Up @@ -1717,6 +1821,10 @@ void Filter::onGrpcClose() { onGrpcCloseWithStatus(Grpc::Status::Aborted); }
void Filter::onGrpcCloseWithStatus(Grpc::Status::GrpcStatus status) {
ENVOY_STREAM_LOG(debug, "Received gRPC stream close", *decoder_callbacks_);

if (processing_complete_) {
return;
}

processing_complete_ = true;
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
Expand Down
9 changes: 9 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,15 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
Extensions::Filters::Common::Expr::BuilderInstanceSharedConstPtr builder,
Server::Configuration::CommonFactoryContext& context);

// Gracefully close the gRPC stream based on configuration.
void closeStreamMaybeGraceful();

// Closing the gRPC stream if the last ProcessingResponse is received.
// This stream closing optimization only applies to STREAMED or FULL_DUPLEX_STREAMED body modes.
// For other body modes like BUFFERED or BUFFERED_PARTIAL, it is ignored.
void closeGrpcStreamIfLastRespReceived(const ProcessingResponse& response,
const bool is_last_body_resp);

const FilterConfigSharedPtr config_;
const ClientBasePtr client_;
const ExtProcFilterStats& stats_;
Expand Down
8 changes: 8 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ChunkQueue {
QueuedChunkPtr pop(Buffer::OwnedImpl& out_data);
const QueuedChunk& consolidate();
Buffer::OwnedImpl& receivedData() { return received_data_; }
const std::deque<QueuedChunkPtr>& queue() const { return queue_; }

private:
std::deque<QueuedChunkPtr> queue_;
Expand Down Expand Up @@ -100,6 +101,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {

bool completeBodyAvailable() const { return complete_body_available_; }
void setCompleteBodyAvailable(bool d) { complete_body_available_ = d; }
bool hasNoBody() const { return no_body_; }
void setHasNoBody(bool b) { no_body_ = b; }
bool bodyReplaced() const { return body_replaced_; }
bool bodyReceived() const { return body_received_; }
Expand Down Expand Up @@ -593,6 +595,12 @@ class EncodingProcessorState : public ProcessorState {
return mgr.evaluateResponseAttributes(activation);
}

// Check whether external processing is configured in the encoding path.
bool noExternalProcess() const {
return (!send_headers_ && !send_trailers_ &&
body_mode_ == envoy::extensions::filters::http::ext_proc::v3::ProcessingMode::NONE);
}

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,39 @@ TEST_P(ExtProcIntegrationTest, NoneToFullDuplexMoreDataAfterModeOverride) {
verifyDownstreamResponse(*response, 200);
}

TEST_P(ExtProcIntegrationTest, ServerWaitforEnvoyHalfCloseThenCloseStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a parameterized test that use enumerate through the response types and possible "shortcut switches" (i.e. eos-in-mxn-mode, CONTINUE_AND_REPLACE in header response etc) and check whether an early half-close is sent?
I know this is a lot of cases, but with this set up, we will have confidence in the changes we are making in this PR and upcoming PRs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the changes in filter_test.cc and filter_full_duplex_test.cc. The config_->stats().streams_closed_ counters are specially checked to make sure we are now closing the streams ASAP in those cases.

scoped_runtime_.mergeValues({{"envoy.reloadable_features.ext_proc_graceful_grpc_close", "true"}});
proto_config_.mutable_processing_mode()->set_request_body_mode(
ProcessingMode::FULL_DUPLEX_STREAMED);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequestWithBody("foo", absl::nullopt);

processRequestHeadersMessage(*grpc_upstreams_[0], true,
[](const HttpHeaders& headers, HeadersResponse&) {
EXPECT_FALSE(headers.end_of_stream());
return true;
});
processRequestBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& resp) {
EXPECT_TRUE(body.end_of_stream());
EXPECT_EQ(body.body().size(), 3);
auto* streamed_response =
resp.mutable_response()->mutable_body_mutation()->mutable_streamed_response();
streamed_response->set_body("bar");
streamed_response->set_end_of_stream(true);
return true;
});

// Server closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,15 +445,12 @@ TEST_P(ExtProcIntegrationTest, OnlyRequestHeadersServerHalfClosesFirst) {
mut1->mutable_header()->set_raw_value("new");
return true;
});
// ext_proc is configured to only send request headers. In this case, server indicates that it is
// not expecting any more messages from ext_proc filter and half-closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

// ext_proc will immediately close side stream in this case, because by default Envoy gRPC client
// will reset the stream if the server half-closes before the client. Note that the ext_proc
// filter has not yet half-closed the sidestream, since it is doing it during its destruction.
// This is expected behavior for gRPC protocol.
// Envoy closes the side stream in this case.
EXPECT_TRUE(processor_stream_->waitForReset());
// ext_proc server indicates that it is not expecting any more messages
// from ext_proc filter and half-closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,16 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestNormal) {
processResponseBodyHelper(" EEEEEEE ", want_response_body);
processResponseBodyHelper(" F ", want_response_body);
processResponseBodyHelper(" GGGGGGGGG ", want_response_body);
EXPECT_EQ(0, config_->stats().streams_closed_.value());
processResponseBodyHelper(" HH ", want_response_body, true, true);
EXPECT_EQ(1, config_->stats().streams_closed_.value());

// The two buffers should match.
EXPECT_EQ(want_response_body.toString(), got_response_body.toString());
EXPECT_FALSE(encoding_watermarked);
EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0);
filter_->onDestroy();
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) {
Expand Down Expand Up @@ -220,14 +223,16 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) {

processResponseBodyStreamedAfterTrailer(" AAAAA ", want_response_body);
processResponseBodyStreamedAfterTrailer(" BBBB ", want_response_body);
EXPECT_EQ(0, config_->stats().streams_closed_.value());
processResponseTrailers(absl::nullopt, true);

EXPECT_EQ(1, config_->stats().streams_closed_.value());
// The two buffers should match.
EXPECT_EQ(want_response_body.toString(), got_response_body.toString());
EXPECT_FALSE(encoding_watermarked);

EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0);
filter_->onDestroy();
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithHeaderAndTrailer) {
Expand Down
Loading
Loading