diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 91aa071b201fa..ac687e6f49a88 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -21,6 +21,11 @@ minor_behavior_changes: ` fails. Removing request-specific details allows grouping by similar failure types. Detailed messages remain available in debug logs. +- area: ext_proc + change: | + Closing the gRPC stream if Envoy detects no more external processing needed. + This behavior can be reverted by setting the runtime guard + ``envoy.reloadable_features.ext_proc_stream_close_optimization`` to ``false``. - area: mobile change: | Use mobile specific network observer registries to propagate network change signals. This behavior can be reverted by diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 4afe4197c3651..344737a659a0a 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -42,6 +42,7 @@ RUNTIME_GUARD(envoy_reloadable_features_enable_cel_regex_precompilation); RUNTIME_GUARD(envoy_reloadable_features_enable_compression_bomb_protection); RUNTIME_GUARD(envoy_reloadable_features_enable_new_query_param_present_match_behavior); RUNTIME_GUARD(envoy_reloadable_features_ext_proc_fail_close_spurious_resp); +RUNTIME_GUARD(envoy_reloadable_features_ext_proc_stream_close_optimization); RUNTIME_GUARD(envoy_reloadable_features_generic_proxy_codec_buffer_limit); RUNTIME_GUARD(envoy_reloadable_features_grpc_side_stream_flow_control); RUNTIME_GUARD(envoy_reloadable_features_http1_balsa_delay_reset); diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 84305210d6d2b..769b96eb6a6ee 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -693,6 +693,16 @@ void Filter::deferredCloseStream() { config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); } +void Filter::closeStreamMaybeGraceful() { + processing_complete_ = true; + if (config_->gracefulGrpcClose()) { + halfCloseAndWaitForRemoteClose(); + } else { + // Perform immediate close on the stream otherwise. + closeStream(); + } +} + void Filter::onDestroy() { ENVOY_STREAM_LOG(debug, "onDestroy", *decoder_callbacks_); // Make doubly-sure we no longer use the stream, as @@ -720,12 +730,7 @@ void Filter::onDestroy() { // Second, perform stream deferred closure. deferredCloseStream(); } else { - if (config_->gracefulGrpcClose()) { - halfCloseAndWaitForRemoteClose(); - } else { - // Perform immediate close on the stream otherwise. - closeStream(); - } + closeStreamMaybeGraceful(); } } @@ -1239,6 +1244,13 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s if (!processing_complete_ && encoding_state_.shouldRemoveContentLength()) { headers.removeContentLength(); } + + // If there is no external processing configured in the encoding path, + // closing the gRPC stream if it is still open. + if (encoding_state_.noExternalProcess()) { + closeStreamMaybeGraceful(); + } + return status; } @@ -1525,8 +1537,97 @@ ProcessingMode effectiveModeOverride(const ProcessingMode& target_override, return mode_override; } +// Returns true if this body response is the last message in the current direction (request or +// response path). This means no further body chunks or trailers are expected in this direction. +// For now, such check is only done for STREAMED or FULL_DUPLEX_STREAMED body mode. For any +// other body mode, it always return false. +bool isLastBodyResponse(ProcessorState& state, + const envoy::service::ext_proc::v3::BodyResponse& body_response) { + switch (state.bodyMode()) { + case ProcessingMode::BUFFERED: + case ProcessingMode::BUFFERED_PARTIAL: + // TODO: - skip stream closing optimization for BUFFERED and BUFFERED_PARTIAL for now. + break; + case ProcessingMode::STREAMED: + if (!state.chunkQueue().empty()) { + return state.chunkQueue().queue().front()->end_stream; + } + break; + case ProcessingMode::FULL_DUPLEX_STREAMED: { + if (body_response.has_response() && body_response.response().has_body_mutation()) { + const auto& body_mutation = body_response.response().body_mutation(); + if (body_mutation.has_streamed_response()) { + return body_mutation.streamed_response().end_of_stream(); + } + } + break; + } + default: + break; + } + return false; +} + } // namespace +void Filter::closeGrpcStreamIfLastRespReceived(const ProcessingResponse& response, + const bool is_last_body_resp) { + // Bail out if the gRPC stream has already been closed. This can happen in scenarios + // like immediate responses or rejected header mutations. + if (stream_ == nullptr || !Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.ext_proc_stream_close_optimization")) { + return; + } + + bool last_response = false; + + switch (response.response_case()) { + case ProcessingResponse::ResponseCase::kRequestHeaders: + if ((decoding_state_.hasNoBody() || + (decoding_state_.bodyMode() == ProcessingMode::NONE && !decoding_state_.sendTrailers())) && + encoding_state_.noExternalProcess()) { + last_response = true; + } + break; + case ProcessingResponse::ResponseCase::kRequestBody: + if (is_last_body_resp && encoding_state_.noExternalProcess()) { + last_response = true; + } + break; + case ProcessingResponse::ResponseCase::kRequestTrailers: + if (encoding_state_.noExternalProcess()) { + last_response = true; + } + break; + case ProcessingResponse::ResponseCase::kResponseHeaders: + if (encoding_state_.hasNoBody() || + (encoding_state_.bodyMode() == ProcessingMode::NONE && !encoding_state_.sendTrailers())) { + last_response = true; + } + break; + case ProcessingResponse::ResponseCase::kResponseBody: + if (is_last_body_resp) { + last_response = true; + } + break; + case ProcessingResponse::ResponseCase::kResponseTrailers: + last_response = true; + break; + case ProcessingResponse::ResponseCase::kImmediateResponse: + // Immediate response currently may close the stream immediately. + // Leave it as it is for now. + break; + default: + break; + } + + if (last_response) { + ENVOY_STREAM_LOG(debug, "Closing gRPC stream after receiving last response", + *decoder_callbacks_); + closeStreamMaybeGraceful(); + } +} + void Filter::onReceiveMessage(std::unique_ptr&& r) { if (config_->observabilityMode()) { @@ -1594,6 +1695,8 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { ENVOY_STREAM_LOG(debug, "Received {} response", *decoder_callbacks_, responseCaseToString(response->response_case())); + + bool is_last_body_resp = false; absl::Status processing_status; switch (response->response_case()) { case ProcessingResponse::ResponseCase::kRequestHeaders: @@ -1605,10 +1708,12 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { processing_status = encoding_state_.handleHeadersResponse(response->response_headers()); break; case ProcessingResponse::ResponseCase::kRequestBody: + is_last_body_resp = isLastBodyResponse(decoding_state_, response->request_body()); setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleBodyResponse(response->request_body()); break; case ProcessingResponse::ResponseCase::kResponseBody: + is_last_body_resp = isLastBodyResponse(encoding_state_, response->response_body()); setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleBodyResponse(response->response_body()); break; @@ -1634,11 +1739,7 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { ENVOY_STREAM_LOG(debug, "Sending immediate response", *decoder_callbacks_); processing_complete_ = true; onFinishProcessorCalls(Grpc::Status::Ok); - if (config_->gracefulGrpcClose()) { - halfCloseAndWaitForRemoteClose(); - } else { - closeStream(); - } + closeStreamMaybeGraceful(); if (on_processing_response_) { on_processing_response_->afterReceivingImmediateResponse( response->immediate_response(), absl::OkStatus(), decoder_callbacks_->streamInfo()); @@ -1682,6 +1783,9 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { stats_.stream_msgs_received_.inc(); handleErrorResponse(processing_status); } + + // Close the gRPC stream if no more external processing needed. + closeGrpcStreamIfLastRespReceived(*response, is_last_body_resp); } void Filter::onGrpcError(Grpc::Status::GrpcStatus status, const std::string& message) { @@ -1717,6 +1821,10 @@ void Filter::onGrpcClose() { onGrpcCloseWithStatus(Grpc::Status::Aborted); } void Filter::onGrpcCloseWithStatus(Grpc::Status::GrpcStatus status) { ENVOY_STREAM_LOG(debug, "Received gRPC stream close", *decoder_callbacks_); + if (processing_complete_) { + return; + } + processing_complete_ = true; stats_.streams_closed_.inc(); // Successful close. We can ignore the stream for the rest of our request diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 84c4547c2b06c..0c880d9d20ba2 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -590,6 +590,15 @@ class Filter : public Logger::Loggable, Extensions::Filters::Common::Expr::BuilderInstanceSharedConstPtr builder, Server::Configuration::CommonFactoryContext& context); + // Gracefully close the gRPC stream based on configuration. + void closeStreamMaybeGraceful(); + + // Closing the gRPC stream if the last ProcessingResponse is received. + // This stream closing optimization only applies to STREAMED or FULL_DUPLEX_STREAMED body modes. + // For other body modes like BUFFERED or BUFFERED_PARTIAL, it is ignored. + void closeGrpcStreamIfLastRespReceived(const ProcessingResponse& response, + const bool is_last_body_resp); + const FilterConfigSharedPtr config_; const ClientBasePtr client_; const ExtProcFilterStats& stats_; diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 86e12fb55e791..48fa47fd87080 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -45,6 +45,7 @@ class ChunkQueue { QueuedChunkPtr pop(Buffer::OwnedImpl& out_data); const QueuedChunk& consolidate(); Buffer::OwnedImpl& receivedData() { return received_data_; } + const std::deque& queue() const { return queue_; } private: std::deque queue_; @@ -100,6 +101,7 @@ class ProcessorState : public Logger::Loggable { bool completeBodyAvailable() const { return complete_body_available_; } void setCompleteBodyAvailable(bool d) { complete_body_available_ = d; } + bool hasNoBody() const { return no_body_; } void setHasNoBody(bool b) { no_body_ = b; } bool bodyReplaced() const { return body_replaced_; } bool bodyReceived() const { return body_received_; } @@ -593,6 +595,12 @@ class EncodingProcessorState : public ProcessorState { return mgr.evaluateResponseAttributes(activation); } + // Check whether external processing is configured in the encoding path. + bool noExternalProcess() const { + return (!send_headers_ && !send_trailers_ && + body_mode_ == envoy::extensions::filters::http::ext_proc::v3::ProcessingMode::NONE); + } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); diff --git a/test/extensions/filters/http/ext_proc/ext_proc_full_duplex_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_full_duplex_integration_test.cc index b90eae0085286..42e7bba606601 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_full_duplex_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_full_duplex_integration_test.cc @@ -447,6 +447,39 @@ TEST_P(ExtProcIntegrationTest, NoneToFullDuplexMoreDataAfterModeOverride) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, ServerWaitforEnvoyHalfCloseThenCloseStream) { + scoped_runtime_.mergeValues({{"envoy.reloadable_features.ext_proc_graceful_grpc_close", "true"}}); + proto_config_.mutable_processing_mode()->set_request_body_mode( + ProcessingMode::FULL_DUPLEX_STREAMED); + proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBody("foo", absl::nullopt); + + processRequestHeadersMessage(*grpc_upstreams_[0], true, + [](const HttpHeaders& headers, HeadersResponse&) { + EXPECT_FALSE(headers.end_of_stream()); + return true; + }); + processRequestBodyMessage( + *grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& resp) { + EXPECT_TRUE(body.end_of_stream()); + EXPECT_EQ(body.body().size(), 3); + auto* streamed_response = + resp.mutable_response()->mutable_body_mutation()->mutable_streamed_response(); + streamed_response->set_body("bar"); + streamed_response->set_end_of_stream(true); + return true; + }); + + // Server closes the stream. + processor_stream_->finishGrpcStream(Grpc::Status::Ok); + + handleUpstreamRequest(); + verifyDownstreamResponse(*response, 200); +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index fd6d0033db046..48db27fae482a 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -445,15 +445,12 @@ TEST_P(ExtProcIntegrationTest, OnlyRequestHeadersServerHalfClosesFirst) { mut1->mutable_header()->set_raw_value("new"); return true; }); - // ext_proc is configured to only send request headers. In this case, server indicates that it is - // not expecting any more messages from ext_proc filter and half-closes the stream. - processor_stream_->finishGrpcStream(Grpc::Status::Ok); - // ext_proc will immediately close side stream in this case, because by default Envoy gRPC client - // will reset the stream if the server half-closes before the client. Note that the ext_proc - // filter has not yet half-closed the sidestream, since it is doing it during its destruction. - // This is expected behavior for gRPC protocol. + // Envoy closes the side stream in this case. EXPECT_TRUE(processor_stream_->waitForReset()); + // ext_proc server indicates that it is not expecting any more messages + // from ext_proc filter and half-closes the stream. + processor_stream_->finishGrpcStream(Grpc::Status::Ok); ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); diff --git a/test/extensions/filters/http/ext_proc/filter_full_duplex_test.cc b/test/extensions/filters/http/ext_proc/filter_full_duplex_test.cc index 7c6d6d33b4c12..32fe0338f1133 100644 --- a/test/extensions/filters/http/ext_proc/filter_full_duplex_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_full_duplex_test.cc @@ -167,13 +167,16 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestNormal) { processResponseBodyHelper(" EEEEEEE ", want_response_body); processResponseBodyHelper(" F ", want_response_body); processResponseBodyHelper(" GGGGGGGGG ", want_response_body); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); processResponseBodyHelper(" HH ", want_response_body, true, true); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); // The two buffers should match. EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); EXPECT_FALSE(encoding_watermarked); EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); filter_->onDestroy(); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); } TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) { @@ -220,14 +223,16 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) { processResponseBodyStreamedAfterTrailer(" AAAAA ", want_response_body); processResponseBodyStreamedAfterTrailer(" BBBB ", want_response_body); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); processResponseTrailers(absl::nullopt, true); - + EXPECT_EQ(1, config_->stats().streams_closed_.value()); // The two buffers should match. EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); EXPECT_FALSE(encoding_watermarked); EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); filter_->onDestroy(); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); } TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithHeaderAndTrailer) { diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 168657d27d0d5..94760f4734052 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -125,6 +125,7 @@ TEST_F(HttpFilterTest, SimplestPost) { {"x-some-other-header", "yes"}}; EXPECT_THAT(header_req.headers(), HeaderProtosEqual(expected)); }); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); Buffer::OwnedImpl req_data("foo"); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, true)); @@ -144,11 +145,13 @@ TEST_F(HttpFilterTest, SimplestPost) { EXPECT_THAT(header_resp.headers(), HeaderProtosEqual(expected_response)); }); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); Buffer::OwnedImpl resp_data("foo"); EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data, false)); Buffer::OwnedImpl empty_data; EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_data, true)); EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); EXPECT_EQ(1, config_->stats().streams_started_.value()); @@ -378,6 +381,7 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) { hdr3->mutable_header()->set_key("x-another-thing"); hdr3->mutable_header()->set_raw_value("2"); stream_callbacks_->onReceiveMessage(std::move(resp1)); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); TestResponseHeaderMapImpl expected_response_headers{ {"content-type", "text/plain"}, {"x-another-thing", "1"}, {"x-another-thing", "2"}}; @@ -1339,29 +1343,38 @@ TEST_F(HttpFilterTest, StreamingSendDataRandomGrpcLatency) { EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, false)); EXPECT_TRUE(last_request_.has_protocol_config()); processRequestBody(absl::nullopt, false, std::chrono::microseconds(50)); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, false)); EXPECT_FALSE(last_request_.has_protocol_config()); processRequestBody(absl::nullopt, false, std::chrono::microseconds(80)); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, false)); EXPECT_FALSE(last_request_.has_protocol_config()); processRequestBody(absl::nullopt, false, std::chrono::microseconds(60)); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, false)); EXPECT_FALSE(last_request_.has_protocol_config()); processRequestBody(absl::nullopt, false, std::chrono::microseconds(30)); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, false)); EXPECT_FALSE(last_request_.has_protocol_config()); processRequestBody(absl::nullopt, false, std::chrono::microseconds(100)); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); response_headers_.addCopy(LowerCaseString(":status"), "200"); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); Buffer::OwnedImpl resp_data("bar"); EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data, false)); EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); filter_->onDestroy(); EXPECT_EQ(1, config_->stats().streams_started_.value()); @@ -1426,6 +1439,7 @@ TEST_F(HttpFilterTest, PostStreamingBodies) { processRequestBody(absl::nullopt); EXPECT_EQ(want_request_body.toString(), got_request_body.toString()); EXPECT_FALSE(decoding_watermarked); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); response_headers_.addCopy(LowerCaseString(":status"), "200"); response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); @@ -1435,6 +1449,7 @@ TEST_F(HttpFilterTest, PostStreamingBodies) { setUpEncodingWatermarking(encoding_watermarked); EXPECT_CALL(encoder_callbacks_, encodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + EXPECT_EQ(0, config_->stats().streams_closed_.value()); EXPECT_FALSE(last_request_.has_protocol_config()); processResponseHeaders(false, absl::nullopt); // Test content-length header is removed in response in streamed mode. @@ -1455,10 +1470,12 @@ TEST_F(HttpFilterTest, PostStreamingBodies) { EXPECT_FALSE(last_request_.has_protocol_config()); processResponseBody(absl::nullopt, false); } + EXPECT_EQ(0, config_->stats().streams_closed_.value()); Buffer::OwnedImpl last_resp_chunk; EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(last_resp_chunk, true)); processResponseBody(absl::nullopt, true); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); // At this point, since we injected the data from each chunk after the "encodeData" // callback, and since we also injected any chunks inserted using "injectEncodedData," @@ -3121,7 +3138,6 @@ TEST_F(HttpFilterTest, FailOnInvalidHeaderMutations) { )EOF"); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - EXPECT_CALL(decoder_callbacks_, continueDecoding()); TestResponseHeaderMapImpl immediate_response_headers; EXPECT_CALL(encoder_callbacks_, @@ -4972,6 +4988,30 @@ TEST_F(HttpFilterTest, DontSaveProcessingResponse) { checkGrpcCallStatsAll(envoy::config::core::v3::TrafficDirection::INBOUND, chunk_number); checkGrpcCallStatsAll(envoy::config::core::v3::TrafficDirection::OUTBOUND, 2 * chunk_number); } + +TEST_F(HttpFilterTest, CloseStreamOnRequestHeaders) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: SEND + response_header_mode: SKIP + request_body_mode: NONE + response_body_mode: NONE + request_trailer_mode: SKIP + response_trailer_mode: SKIP + )EOF"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); + // The next response should be the last, so expect the stream to be closed. + processRequestHeaders(true, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse&) {}); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); + filter_->onDestroy(); +} + } // namespace } // namespace ExternalProcessing } // namespace HttpFilters