Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 117 additions & 11 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,16 @@ void Filter::deferredCloseStream() {
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher());
}

void Filter::closeStreamMaybeGraceful() {
processing_complete_ = true;
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
// Perform immediate close on the stream otherwise.
closeStream();
}
}

void Filter::onDestroy() {
ENVOY_STREAM_LOG(debug, "onDestroy", *decoder_callbacks_);
// Make doubly-sure we no longer use the stream, as
Expand Down Expand Up @@ -742,12 +752,7 @@ void Filter::onDestroy() {
// Second, perform stream deferred closure.
deferredCloseStream();
} else {
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
// Perform immediate close on the stream otherwise.
closeStream();
}
closeStreamMaybeGraceful();
}
}

Expand Down Expand Up @@ -1237,6 +1242,12 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s
// local reply.
mergePerRouteConfig();

// If there is no external processing configured in the encoding path,
// closing the gRPC stream if it is still open.
if (noExternalProcessInEncoding()) {
closeStreamMaybeGraceful();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if noExternalProcess can you simply return FilterHeadersStatus::Continue here and avoids the rest of handling in this function?

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. However, I prefer if noExternalProcess, just close the stream and do not change the other logic, like still set below flag:
if (end_stream) {
encoding_state_.setCompleteBodyAvailable(true);
}
Also prefer to still have the trace logs if response_header sending is SKIP: ENVOY_STREAM_LOG(trace, "encodeHeaders: Skipped header processing", *decoder_callbacks_);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious if encoding_state_.setCompleteBodyAvailable(true); and all other logics are still needed when noExternalProcess . Basically there is no callout at encode path at all. ext_proc becomes a pass-through filter, no?

But if there are any corner cases you can think of, yes I agree going through rest of function is safer

}

if (encoding_state_.sendHeaders() && config_->observabilityMode()) {
return sendHeadersInObservabilityMode(headers, encoding_state_, end_stream);
}
Expand Down Expand Up @@ -1547,8 +1558,99 @@ ProcessingMode effectiveModeOverride(const ProcessingMode& target_override,
return mode_override;
}

bool isLastBodyResponse(ProcessorState& state,
const envoy::service::ext_proc::v3::ProcessingResponse& response) {
switch (state.bodyMode()) {
case ProcessingMode::BUFFERED:
case ProcessingMode::BUFFERED_PARTIAL:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much appreciate if you could add a method comment talking about what this method is about, and comment heavily on which modes are not supported yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// TODO: - skip stream closing optimization for BUFFERED and BUFFERED_PARTIAL for now.
break;
case ProcessingMode::STREAMED:
if (!state.chunkQueue().empty()) {
return state.chunkQueue().queue().front()->end_stream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider the corner case that body is done, because we received trailers already here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other corner cases:
If CONTINUE_AND_REPLACE is set, it also means we can safely half-close.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the stream optimization logic to the end of the onReceiveMessage, i.e, after the ProcessingResponse is already processed. This will automatically count in CONTINUE_AN_REPLACE, as such message will modify the filter processing modes configuration, thus be counted in the check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider the corner case that body is done, because we received trailers already here?

This is a little bit tricky. Can we skip this corner case? If we missed closing the stream in the decoding path, the stream closing added at the encodeHeaders() will catch it and cleanup any way.

}
break;
case ProcessingMode::FULL_DUPLEX_STREAMED: {
const envoy::service::ext_proc::v3::BodyResponse* body_response = nullptr;
if (response.has_request_body()) {
body_response = &response.request_body();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, there should not be a if-else here.
instead, use the state's type to get the real body response field?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the corner cases above apply here as well: trailers received, CONTINUE_AND_REPLACE set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, there should not be a if-else here. instead, use the state's type to get the real body response field?

done

} else if (response.has_response_body()) {
body_response = &response.response_body();
}
if (body_response != nullptr && body_response->has_response()) {
const auto& common_response = body_response->response();
if (common_response.has_body_mutation() &&
common_response.body_mutation().has_streamed_response()) {
return common_response.body_mutation().streamed_response().end_of_stream();
}
}
break;
}
default:
break;
}
return false;
}

} // namespace

bool Filter::noExternalProcessInEncoding() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add this method to the EncodingProcessorState ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return (!encoding_state_.sendHeaders() && encoding_state_.bodyMode() == ProcessingMode::NONE &&
!encoding_state_.sendTrailers());
}

// Close the gRPC stream if the last ProcessingResponse is received.
void Filter::closeGrpcStreamIfLastRespReceived(
const std::unique_ptr<envoy::service::ext_proc::v3::ProcessingResponse>& response) {
bool last_response = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that should part of the ProcessorState. After processing a response message it should return a flag indicating that a terminal state has been reached and gRPC stream can be closed.

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, the idea is to decide whether to terminate the gRPC stream once the ext_proc filter receives the ProcessingResponse, i.e, at the center place inside Filter::onReceiveMessage(), as Envoy has enough information here to decide whether to terminate it then. Basically, this is determined by 1)whether the end-of-stream is received, 2) and whether Envoy needs to send more data to the ext_proc server based on filter configuration. Hooking the logic to ProcessorState will unavoidably spread the logic to each of the handling header response, handling body response(different modes), and handling trailer response, which will make it error prone and hard to maintain.


switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the ext_proc sends invalid response, e.g. requestHeaders on Body events, it's a bad state that would cause stream close already.
Shall we evaluate all the cases after the response is processed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the stream closing logic to the end of onReceiveMessage() automatically count in this as well as stream_ become nullptr in these conditions.

if ((decoding_state_.hasNoBody() ||
(decoding_state_.bodyMode() == ProcessingMode::NONE && !decoding_state_.sendTrailers())) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other case: body mode confgiured, but trailers received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trailer mode set, but EoS seen in headers or Body already

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the cases you mentioned in the TBD list in the PR description, please take a look. The goal of this PR is to get the normal case optimization working, then we can deal with the corner cases in the future enhancement. BTW, if some corner cases are missed in the decoding path, the stream closing logic added in encodeHeaders() will catch them all.

noExternalProcessInEncoding()) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kRequestBody:
if (isLastBodyResponse(decoding_state_, *response) && noExternalProcessInEncoding()) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kRequestTrailers:
if (noExternalProcessInEncoding()) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kResponseHeaders:
if (encoding_state_.hasNoBody() ||
(encoding_state_.bodyMode() == ProcessingMode::NONE && !encoding_state_.sendTrailers())) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kResponseBody:
if (isLastBodyResponse(encoding_state_, *response)) {
last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kResponseTrailers:
last_response = true;
break;
case ProcessingResponse::ResponseCase::kImmediateResponse:
// Immediate response currently may close the stream immediately.
// Leave it as it is for now.
break;
default:
break;
}

if (last_response) {
ENVOY_STREAM_LOG(debug, "Closing gRPC stream after receiving last response",
*decoder_callbacks_);
closeStreamMaybeGraceful();
}
}

void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {

if (config_->observabilityMode()) {
Expand Down Expand Up @@ -1616,6 +1718,10 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {

ENVOY_STREAM_LOG(debug, "Received {} response", *decoder_callbacks_,
responseCaseToString(response->response_case()));

// Close the gRPC stream if this is the last response message.
closeGrpcStreamIfLastRespReceived(response);

absl::Status processing_status;
switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
Expand Down Expand Up @@ -1656,11 +1762,7 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
ENVOY_STREAM_LOG(debug, "Sending immediate response", *decoder_callbacks_);
processing_complete_ = true;
onFinishProcessorCalls(Grpc::Status::Ok);
if (config_->gracefulGrpcClose()) {
halfCloseAndWaitForRemoteClose();
} else {
closeStream();
}
closeStreamMaybeGraceful();
if (on_processing_response_) {
on_processing_response_->afterReceivingImmediateResponse(
response->immediate_response(), absl::OkStatus(), decoder_callbacks_->streamInfo());
Expand Down Expand Up @@ -1739,6 +1841,10 @@ void Filter::onGrpcClose() { onGrpcCloseWithStatus(Grpc::Status::Aborted); }
void Filter::onGrpcCloseWithStatus(Grpc::Status::GrpcStatus status) {
ENVOY_STREAM_LOG(debug, "Received gRPC stream close", *decoder_callbacks_);

if (processing_complete_) {
return;
}

processing_complete_ = true;
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,16 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
Extensions::Filters::Common::Expr::BuilderInstanceSharedConstPtr builder,
Server::Configuration::CommonFactoryContext& context);

// Gracefully close the gRPC stream based on configuration.
void closeStreamMaybeGraceful();

// Check whether external processing is configured in the encoding path.
bool noExternalProcessInEncoding() const;

// Close the gRPC stream if the last ProcessingResponse is received.
void closeGrpcStreamIfLastRespReceived(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, you could use "const ProcessingResponse&" here to save some dereferences.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

const std::unique_ptr<envoy::service::ext_proc::v3::ProcessingResponse>& response);

const FilterConfigSharedPtr config_;
const ClientBasePtr client_;
const ExtProcFilterStats& stats_;
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ChunkQueue {
QueuedChunkPtr pop(Buffer::OwnedImpl& out_data);
const QueuedChunk& consolidate();
Buffer::OwnedImpl& receivedData() { return received_data_; }
const std::deque<QueuedChunkPtr>& queue() const { return queue_; }

private:
std::deque<QueuedChunkPtr> queue_;
Expand Down Expand Up @@ -100,6 +101,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {

bool completeBodyAvailable() const { return complete_body_available_; }
void setCompleteBodyAvailable(bool d) { complete_body_available_ = d; }
bool hasNoBody() const { return no_body_; }
void setHasNoBody(bool b) { no_body_ = b; }
bool bodyReplaced() const { return body_replaced_; }
bool bodyReceived() const { return body_received_; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,40 @@ TEST_P(ExtProcIntegrationTest, NoneToFullDuplexMoreDataAfterModeOverride) {
verifyDownstreamResponse(*response, 200);
}

TEST_P(ExtProcIntegrationTest, ServerWaitforEnvoyHalfCloseThenCloseStream) {
proto_config_.mutable_processing_mode()->set_request_body_mode(
ProcessingMode::FULL_DUPLEX_STREAMED);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequestWithBody("foo", absl::nullopt);

processRequestHeadersMessage(*grpc_upstreams_[0], true,
[](const HttpHeaders& headers, HeadersResponse&) {
EXPECT_FALSE(headers.end_of_stream());
return true;
});
processRequestBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& resp) {
EXPECT_TRUE(body.end_of_stream());
EXPECT_EQ(body.body().size(), 3);
auto* streamed_response =
resp.mutable_response()->mutable_body_mutation()->mutable_streamed_response();
streamed_response->set_body("bar");
streamed_response->set_end_of_stream(true);
return true;
});

// Envoy half closes the stream.
EXPECT_TRUE(processor_stream_->waitForReset());
// Server closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,15 +445,12 @@ TEST_P(ExtProcIntegrationTest, OnlyRequestHeadersServerHalfClosesFirst) {
mut1->mutable_header()->set_raw_value("new");
return true;
});
// ext_proc is configured to only send request headers. In this case, server indicates that it is
// not expecting any more messages from ext_proc filter and half-closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

// ext_proc will immediately close side stream in this case, because by default Envoy gRPC client
// will reset the stream if the server half-closes before the client. Note that the ext_proc
// filter has not yet half-closed the sidestream, since it is doing it during its destruction.
// This is expected behavior for gRPC protocol.
// Envoy closes the side stream in this case.
EXPECT_TRUE(processor_stream_->waitForReset());
// ext_proc server indicates that it is not expecting any more messages
// from ext_proc filter and half-closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,16 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestNormal) {
processResponseBodyHelper(" EEEEEEE ", want_response_body);
processResponseBodyHelper(" F ", want_response_body);
processResponseBodyHelper(" GGGGGGGGG ", want_response_body);
EXPECT_EQ(0, config_->stats().streams_closed_.value());
processResponseBodyHelper(" HH ", want_response_body, true, true);
EXPECT_EQ(1, config_->stats().streams_closed_.value());

// The two buffers should match.
EXPECT_EQ(want_response_body.toString(), got_response_body.toString());
EXPECT_FALSE(encoding_watermarked);
EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0);
filter_->onDestroy();
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) {
Expand Down Expand Up @@ -220,14 +223,16 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) {

processResponseBodyStreamedAfterTrailer(" AAAAA ", want_response_body);
processResponseBodyStreamedAfterTrailer(" BBBB ", want_response_body);
EXPECT_EQ(0, config_->stats().streams_closed_.value());
processResponseTrailers(absl::nullopt, true);

EXPECT_EQ(1, config_->stats().streams_closed_.value());
// The two buffers should match.
EXPECT_EQ(want_response_body.toString(), got_response_body.toString());
EXPECT_FALSE(encoding_watermarked);

EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0);
filter_->onDestroy();
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithHeaderAndTrailer) {
Expand Down
Loading