Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
755e111
ext_proc: closing the gRPC stream ASAP once no more external processing
yanjunxiang-google Oct 8, 2025
f6b536b
fix format
yanjunxiang-google Oct 8, 2025
f8eba26
set processing_complete_ to true
yanjunxiang-google Oct 8, 2025
dcf5c6c
adding server close test
yanjunxiang-google Oct 8, 2025
07be305
skip onGrpcClose() if processing_complete_ true
yanjunxiang-google Oct 8, 2025
49fbe96
fix test
yanjunxiang-google Oct 8, 2025
33a1d74
fix format
yanjunxiang-google Oct 8, 2025
35ef112
address comments
yanjunxiang-google Oct 9, 2025
6d4aa09
fix test
yanjunxiang-google Oct 10, 2025
921c40b
moving stream close to the end of onReceiveMessage()
yanjunxiang-google Oct 14, 2025
bf71363
addressing comments
yanjunxiang-google Oct 14, 2025
2e8ab46
adding runtime protection
yanjunxiang-google Oct 14, 2025
b8934d1
fix format
yanjunxiang-google Oct 14, 2025
4f6542b
adding release notes
yanjunxiang-google Oct 15, 2025
8986c15
merge upstream main
yanjunxiang-google Oct 15, 2025
e713075
fix format
yanjunxiang-google Oct 15, 2025
1a55c23
fix format
yanjunxiang-google Oct 15, 2025
826a6cc
address comments
yanjunxiang-google Oct 16, 2025
422ec49
merge upstream main
yanjunxiang-google Oct 21, 2025
713ca86
moving stream closing to the end of encodeHeaders()
yanjunxiang-google Oct 21, 2025
d72caae
verifying stream is closed as expected with CONTINUE_AND_REPLACE
yanjunxiang-google Oct 24, 2025
ac44492
update changelogs and address comments
yanjunxiang-google Oct 24, 2025
ac0233f
address comments
yanjunxiang-google Oct 24, 2025
23a6c0f
fix format
yanjunxiang-google Oct 24, 2025
d40a260
fixing tests when graceful close is enabled
yanjunxiang-google Oct 28, 2025
258aad1
Merge branch 'main' of https://github.com/envoyproxy/envoy into close…
yanjunxiang-google Oct 29, 2025
9a6fa7f
Merge branch 'main' of https://github.com/envoyproxy/envoy into close…
yanjunxiang-google Oct 29, 2025
86ea4b2
Merge branch 'main' of https://github.com/envoyproxy/envoy into close…
yanjunxiang-google Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 112 additions & 11 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,16 @@ void Filter::deferredCloseStream() {
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher());
}

void Filter::closeStreamMaybeGraceful() {
processing_complete_ = true;
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
// Perform immediate close on the stream otherwise.
closeStream();
}
}

void Filter::onDestroy() {
ENVOY_STREAM_LOG(debug, "onDestroy", *decoder_callbacks_);
// Make doubly-sure we no longer use the stream, as
Expand Down Expand Up @@ -742,12 +752,7 @@ void Filter::onDestroy() {
// Second, perform stream deferred closure.
deferredCloseStream();
} else {
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
// Perform immediate close on the stream otherwise.
closeStream();
}
closeStreamMaybeGraceful();
}
}

Expand Down Expand Up @@ -1237,6 +1242,12 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s
// local reply.
mergePerRouteConfig();

// If there is no external processing configured in the encoding path,
// closing the gRPC stream if it is still open.
if (encoding_state_.noExternalProcess()) {
closeStreamMaybeGraceful();
}

if (encoding_state_.sendHeaders() && config_->observabilityMode()) {
return sendHeadersInObservabilityMode(headers, encoding_state_, end_stream);
}
Expand Down Expand Up @@ -1547,8 +1558,94 @@ ProcessingMode effectiveModeOverride(const ProcessingMode& target_override,
return mode_override;
}

bool isLastBodyResponse(ProcessorState& state,
const envoy::service::ext_proc::v3::ProcessingResponse& response) {
switch (state.bodyMode()) {
case ProcessingMode::BUFFERED:
case ProcessingMode::BUFFERED_PARTIAL:
// TODO: - skip stream closing optimization for BUFFERED and BUFFERED_PARTIAL for now.
break;
case ProcessingMode::STREAMED:
if (!state.chunkQueue().empty()) {
return state.chunkQueue().queue().front()->end_stream;
}
break;
case ProcessingMode::FULL_DUPLEX_STREAMED: {
const envoy::service::ext_proc::v3::BodyResponse* body_response = nullptr;
if (response.has_request_body()) {
body_response = &response.request_body();
} else if (response.has_response_body()) {
body_response = &response.response_body();
}
if (body_response != nullptr && body_response->has_response()) {
const auto& common_response = body_response->response();
if (common_response.has_body_mutation() &&
common_response.body_mutation().has_streamed_response()) {
return common_response.body_mutation().streamed_response().end_of_stream();
}
}
break;
}
default:
break;
}
return false;
}

} // namespace

// Close the gRPC stream if the last ProcessingResponse is received.
void Filter::closeGrpcStreamIfLastRespReceived(
const std::unique_ptr<envoy::service::ext_proc::v3::ProcessingResponse>& response) {
bool last_response = false;

switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
if ((decoding_state_.hasNoBody() ||
(decoding_state_.bodyMode() == ProcessingMode::NONE && !decoding_state_.sendTrailers())) &&
encoding_state_.noExternalProcess()) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kRequestBody:
if (isLastBodyResponse(decoding_state_, *response) && encoding_state_.noExternalProcess()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if trailers are configured?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we detect EoS is received with body, so no trailers in this request. Thus even filter is configured to send trailers, the external processing in this direction is completed.

last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kRequestTrailers:
if (encoding_state_.noExternalProcess()) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kResponseHeaders:
if (encoding_state_.hasNoBody() ||
(encoding_state_.bodyMode() == ProcessingMode::NONE && !encoding_state_.sendTrailers())) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kResponseBody:
if (isLastBodyResponse(encoding_state_, *response)) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kResponseTrailers:
last_response = true;
break;
case ProcessingResponse::ResponseCase::kImmediateResponse:
// Immediate response currently may close the stream immediately.
// Leave it as it is for now.
break;
default:
break;
}

if (last_response) {
ENVOY_STREAM_LOG(debug, "Closing gRPC stream after receiving last response",
*decoder_callbacks_);
closeStreamMaybeGraceful();
}
}

void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {

if (config_->observabilityMode()) {
Expand Down Expand Up @@ -1616,6 +1713,10 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {

ENVOY_STREAM_LOG(debug, "Received {} response", *decoder_callbacks_,
responseCaseToString(response->response_case()));

// Close the gRPC stream if this is the last response message.
closeGrpcStreamIfLastRespReceived(response);

absl::Status processing_status;
switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
Expand Down Expand Up @@ -1656,11 +1757,7 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
ENVOY_STREAM_LOG(debug, "Sending immediate response", *decoder_callbacks_);
processing_complete_ = true;
onFinishProcessorCalls(Grpc::Status::Ok);
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
closeStream();
}
closeStreamMaybeGraceful();
if (on_processing_response_) {
on_processing_response_->afterReceivingImmediateResponse(
response->immediate_response(), absl::OkStatus(), decoder_callbacks_->streamInfo());
Expand Down Expand Up @@ -1739,6 +1836,10 @@ void Filter::onGrpcClose() { onGrpcCloseWithStatus(Grpc::Status::Aborted); }
void Filter::onGrpcCloseWithStatus(Grpc::Status::GrpcStatus status) {
ENVOY_STREAM_LOG(debug, "Received gRPC stream close", *decoder_callbacks_);

if (processing_complete_) {
return;
}

processing_complete_ = true;
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
Expand Down
7 changes: 7 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,13 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
Extensions::Filters::Common::Expr::BuilderInstanceSharedConstPtr builder,
Server::Configuration::CommonFactoryContext& context);

// Gracefully close the gRPC stream based on configuration.
void closeStreamMaybeGraceful();

// Close the gRPC stream if the last ProcessingResponse is received.
void closeGrpcStreamIfLastRespReceived(
const std::unique_ptr<envoy::service::ext_proc::v3::ProcessingResponse>& response);

const FilterConfigSharedPtr config_;
const ClientBasePtr client_;
const ExtProcFilterStats& stats_;
Expand Down
8 changes: 8 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ChunkQueue {
QueuedChunkPtr pop(Buffer::OwnedImpl& out_data);
const QueuedChunk& consolidate();
Buffer::OwnedImpl& receivedData() { return received_data_; }
const std::deque<QueuedChunkPtr>& queue() const { return queue_; }

private:
std::deque<QueuedChunkPtr> queue_;
Expand Down Expand Up @@ -100,6 +101,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {

bool completeBodyAvailable() const { return complete_body_available_; }
void setCompleteBodyAvailable(bool d) { complete_body_available_ = d; }
bool hasNoBody() const { return no_body_; }
void setHasNoBody(bool b) { no_body_ = b; }
bool bodyReplaced() const { return body_replaced_; }
bool bodyReceived() const { return body_received_; }
Expand Down Expand Up @@ -593,6 +595,12 @@ class EncodingProcessorState : public ProcessorState {
return mgr.evaluateResponseAttributes(activation);
}

// Check whether external processing is configured in the encoding path.
bool noExternalProcess() const {
return (!send_headers_ && !send_trailers_ &&
body_mode_ == envoy::extensions::filters::http::ext_proc::v3::ProcessingMode::NONE);
}

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,40 @@ TEST_P(ExtProcIntegrationTest, NoneToFullDuplexMoreDataAfterModeOverride) {
verifyDownstreamResponse(*response, 200);
}

TEST_P(ExtProcIntegrationTest, ServerWaitforEnvoyHalfCloseThenCloseStream) {
proto_config_.mutable_processing_mode()->set_request_body_mode(
ProcessingMode::FULL_DUPLEX_STREAMED);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequestWithBody("foo", absl::nullopt);

processRequestHeadersMessage(*grpc_upstreams_[0], true,
[](const HttpHeaders& headers, HeadersResponse&) {
EXPECT_FALSE(headers.end_of_stream());
return true;
});
processRequestBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& resp) {
EXPECT_TRUE(body.end_of_stream());
EXPECT_EQ(body.body().size(), 3);
auto* streamed_response =
resp.mutable_response()->mutable_body_mutation()->mutable_streamed_response();
streamed_response->set_body("bar");
streamed_response->set_end_of_stream(true);
return true;
});

// Envoy half closes the stream.
EXPECT_TRUE(processor_stream_->waitForReset());
// Server closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,15 +445,12 @@ TEST_P(ExtProcIntegrationTest, OnlyRequestHeadersServerHalfClosesFirst) {
mut1->mutable_header()->set_raw_value("new");
return true;
});
// ext_proc is configured to only send request headers. In this case, server indicates that it is
// not expecting any more messages from ext_proc filter and half-closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

// ext_proc will immediately close side stream in this case, because by default Envoy gRPC client
// will reset the stream if the server half-closes before the client. Note that the ext_proc
// filter has not yet half-closed the sidestream, since it is doing it during its destruction.
// This is expected behavior for gRPC protocol.
// Envoy closes the side stream in this case.
EXPECT_TRUE(processor_stream_->waitForReset());
// ext_proc server indicates that it is not expecting any more messages
// from ext_proc filter and half-closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,16 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestNormal) {
processResponseBodyHelper(" EEEEEEE ", want_response_body);
processResponseBodyHelper(" F ", want_response_body);
processResponseBodyHelper(" GGGGGGGGG ", want_response_body);
EXPECT_EQ(0, config_->stats().streams_closed_.value());
processResponseBodyHelper(" HH ", want_response_body, true, true);
EXPECT_EQ(1, config_->stats().streams_closed_.value());

// The two buffers should match.
EXPECT_EQ(want_response_body.toString(), got_response_body.toString());
EXPECT_FALSE(encoding_watermarked);
EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0);
filter_->onDestroy();
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) {
Expand Down Expand Up @@ -220,14 +223,16 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) {

processResponseBodyStreamedAfterTrailer(" AAAAA ", want_response_body);
processResponseBodyStreamedAfterTrailer(" BBBB ", want_response_body);
EXPECT_EQ(0, config_->stats().streams_closed_.value());
processResponseTrailers(absl::nullopt, true);

EXPECT_EQ(1, config_->stats().streams_closed_.value());
// The two buffers should match.
EXPECT_EQ(want_response_body.toString(), got_response_body.toString());
EXPECT_FALSE(encoding_watermarked);

EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0);
filter_->onDestroy();
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithHeaderAndTrailer) {
Expand Down
Loading