From 24f4ffa5da0bb77c32d7d27bc309ab7d4719536f Mon Sep 17 00:00:00 2001 From: Melissa Ginaldi Date: Thu, 16 Oct 2025 16:00:02 +0000 Subject: [PATCH 1/5] Add bits to log ImmediateResoonse and ContinueAndReplace occurrences in FilterState Signed-off-by: Melissa Ginaldi --- .../filters/http/ext_proc/ext_proc.cc | 13 ++- .../filters/http/ext_proc/ext_proc.h | 9 +- .../filters/http/ext_proc/processor_state.cc | 9 +- .../filters/http/ext_proc/processor_state.h | 2 +- .../ext_proc/ext_proc_integration_test.cc | 104 ++++++++++++++++++ 5 files changed, 129 insertions(+), 8 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 84305210d6d2b..aaa38ce51241a 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -280,7 +280,7 @@ FilterConfig::FilterConfig(const ExternalProcessor& config, void ExtProcLoggingInfo::recordGrpcCall( std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status, ProcessorState::CallbackState callback_state, - envoy::config::core::v3::TrafficDirection traffic_direction) { + envoy::config::core::v3::TrafficDirection traffic_direction, bool continue_and_replace) { ASSERT(callback_state != ProcessorState::CallbackState::Idle); // Record the gRPC call stats for the header. @@ -288,6 +288,9 @@ void ExtProcLoggingInfo::recordGrpcCall( if (grpcCalls(traffic_direction).header_stats_ == nullptr) { grpcCalls(traffic_direction).header_stats_ = std::make_unique(latency, call_status); } + if (continue_and_replace){ + grpcCalls(traffic_direction).continue_and_replace_ = true; + } return; } @@ -318,6 +321,10 @@ void ExtProcLoggingInfo::recordGrpcCall( } } +void ExtProcLoggingInfo::recordImmediateResponse(envoy::config::core::v3::TrafficDirection traffic_direction){ + grpcCalls(traffic_direction).immediate_response_ = true; +} + ExtProcLoggingInfo::GrpcCalls& ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_direction) { ASSERT(traffic_direction != envoy::config::core::v3::TrafficDirection::UNSPECIFIED); @@ -1790,6 +1797,10 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) { }; sent_immediate_response_ = true; + if (logging_info_ != nullptr){ + logging_info_->recordImmediateResponse(encoding_state_.trafficDirection()); + logging_info_->recordImmediateResponse(decoding_state_.trafficDirection()); + } ENVOY_STREAM_LOG(debug, "Sending local reply with status code {}", *decoder_callbacks_, status_code); const auto details = StringUtil::replaceAllEmptySpace(response.details()); diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 84c4547c2b06c..58990ad05ea92 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -89,13 +89,16 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { std::unique_ptr header_stats_; std::unique_ptr trailer_stats_; std::unique_ptr body_stats_; + bool immediate_response_; + bool continue_and_replace_; }; using GrpcCalls = struct GrpcCallStats; void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status, ProcessorState::CallbackState callback_state, - envoy::config::core::v3::TrafficDirection traffic_direction); + envoy::config::core::v3::TrafficDirection traffic_direction, bool continue_and_replace = false); + void recordImmediateResponse(envoy::config::core::v3::TrafficDirection traffic_direction); void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; } void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; } void setClusterInfo(absl::optional cluster_info) { @@ -134,8 +137,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { private: GrpcCalls& grpcCalls(envoy::config::core::v3::TrafficDirection traffic_direction); - GrpcCalls decoding_processor_grpc_calls_; - GrpcCalls encoding_processor_grpc_calls_; + GrpcCalls decoding_processor_grpc_calls_{}; + GrpcCalls encoding_processor_grpc_calls_{}; const Envoy::Protobuf::Struct filter_metadata_; // The following stats are populated for ext_proc filters using Envoy gRPC only. // The bytes sent and received are for the entire stream. diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 38e094d0e19d9..9864d6d957e77 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -38,7 +38,7 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis } void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, - CallbackState next_state) { + CallbackState next_state, bool continue_and_replace) { ENVOY_STREAM_LOG(debug, "Finish external processing call", *filter_callbacks_); filter_.logStreamInfo(); @@ -49,7 +49,7 @@ void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, filter_callbacks_->dispatcher().timeSource().monotonicTime() - call_start_time_.value()); ExtProcLoggingInfo* logging_info = filter_.loggingInfo(); if (logging_info != nullptr) { - logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection()); + logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection(), continue_and_replace); } call_start_time_ = absl::nullopt; } @@ -165,10 +165,13 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon } clearRouteCache(common_response); - onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response)); + // onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response)); if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) { + onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response), true); return handleHeaderContinueAndReplace(response); + } else { + onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response)); } filter_.onProcessHeadersResponse(response, absl::OkStatus(), trafficDirection()); diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 86e12fb55e791..612cc61b77fb6 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -149,7 +149,7 @@ class ProcessorState : public Logger::Loggable { void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout, CallbackState callback_state); void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, - CallbackState next_state = CallbackState::Idle); + CallbackState next_state = CallbackState::Idle, bool continue_and_replace = false); void stopMessageTimer(); bool restartMessageTimer(const uint32_t message_timeout_ms); diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index fd6d0033db046..593e05c0a65aa 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -4752,6 +4752,110 @@ TEST_P(ExtProcIntegrationTest, AccessLogExtProcInCompositeFilter) { EXPECT_THAT(log_content, testing::HasSubstr("response_header_latency_us")); } +// Test the ability of the filter to completely replace a request message with a new +// request message. +TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoContinueAndReplace) { + auto access_log_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename()); + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + + config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { + auto* access_log = cm.add_access_log(); + access_log->set_name("accesslog"); + envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config; + access_log_config.set_path(access_log_path); + auto* json_format = access_log_config.mutable_log_format()->mutable_json_format(); + + // Test field extraction for coverage. + (*json_format->mutable_fields())["field_request_header_effect"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_processing_effect)%"); + + access_log->mutable_typed_config()->PackFrom(access_log_config); + }); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBody("Replace this!", absl::nullopt); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) { + headers_resp.mutable_response()->mutable_body_mutation()->set_body("Hello, Server!"); + // This special status tells us to replace the whole request + headers_resp.mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + return true; + }); + handleUpstreamRequest(); + processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); + + // Ensure that we replaced and did not append to the request. + EXPECT_EQ(upstream_request_->body().toString(), "Hello, Server!"); + std::string log_result = waitForAccessLog(access_log_path, 0, true); + LOG(INFO) << log_result; + // auto json_log = Json::Factory::loadFromString(log_result).value(); + // // 0: NONE, 1: CONTENT_MODIFIED, 2: IMMEDIATE_RESPONSE, 3: CONTINUE_AND_REPLACE + // auto field_request_header_effect = json_log->getString("field_request_header_effect"); + // EXPECT_TRUE(field_request_header_effect.ok()); + // EXPECT_EQ(*field_request_header_effect, "3"); + + cleanupUpstreamAndDownstream(); +} + +// Test immediate_response behavior with STREAMED request body. Even though the +// headers have been processed, an immediate response on a request body chunk +// should still be seen by the downstream. +// TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoImmediateResponse) { +// auto access_log_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename()); + +// config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { +// auto* access_log = cm.add_access_log(); +// access_log->set_name("accesslog"); +// envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config; +// access_log_config.set_path(access_log_path); +// auto* json_format = access_log_config.mutable_log_format()->mutable_json_format(); + +// // Test field extraction for coverage. +// (*json_format->mutable_fields())["field_request_header_effect"].set_string_value( +// "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_processing_effect)%"); +// (*json_format->mutable_fields())["field_request_body_effect"].set_string_value( +// "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_body_processing_effect)%"); + +// access_log->mutable_typed_config()->PackFrom(access_log_config); +// }); +// proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); +// ConfigOptions config_options; +// config_options.add_response_processor = true; +// initializeConfig(config_options); +// HttpIntegrationTest::initialize(); +// auto response = sendDownstreamRequestWithBody("Evil content!", absl::nullopt); +// processRequestHeadersMessage( +// *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& resp) { +// auto* hdr = resp.mutable_response()->mutable_header_mutation()->add_set_headers(); +// hdr->mutable_append()->set_value(false); +// hdr->mutable_header()->set_key("foo"); +// hdr->mutable_header()->set_raw_value("bar"); +// return true; +// }); +// processAndRespondImmediately(*grpc_upstreams_[0], false, [](ImmediateResponse& immediate) { +// immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); +// immediate.set_body("{\"reason\": \"Request too evil\"}"); +// immediate.set_details("Failed because I don't like this payload"); +// }); +// // ext_proc will immediately close side stream in this case, which causes it +// // to be reset, since side stream codec had not yet observed server trailers. +// EXPECT_TRUE(processor_stream_->waitForReset()); + +// verifyDownstreamResponse(*response, 400); +// std::string log_result = waitForAccessLog(access_log_path, 0, true); +// auto json_log = Json::Factory::loadFromString(log_result).value(); +// // 0: NONE, 1: CONTENT_MODIFIED, 2: IMMEDIATE_RESPONSE, 3: CONTINUE_AND_REPLACE +// auto field_request_header_effect = json_log->getString("field_request_header_effect"); +// EXPECT_TRUE(field_request_header_effect.ok()); +// EXPECT_EQ(*field_request_header_effect, "1"); + +// auto field_request_body_effect = json_log->getString("field_request_body_effect"); +// EXPECT_TRUE(field_request_body_effect.ok()); +// EXPECT_EQ(*field_request_body_effect, "2"); +// cleanupUpstreamAndDownstream(); +// } + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions From 99fe9805d29748f56423195798ff92a7b6c5a088 Mon Sep 17 00:00:00 2001 From: Melissa Ginaldi Date: Fri, 17 Oct 2025 17:53:36 +0000 Subject: [PATCH 2/5] Refactor data logging and write unit tests Signed-off-by: Melissa Ginaldi --- .../filters/http/ext_proc/ext_proc.cc | 25 +++- .../filters/http/ext_proc/ext_proc.h | 5 +- .../ext_proc/ext_proc_integration_test.cc | 125 +++++++++--------- 3 files changed, 83 insertions(+), 72 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index aaa38ce51241a..3f36a15062b84 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -73,6 +73,9 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status"; constexpr absl::string_view BytesSentField = "bytes_sent"; constexpr absl::string_view BytesReceivedField = "bytes_received"; +constexpr absl::string_view ImmediateResonseField = "immediate_response"; +constexpr absl::string_view RequestHeaderContinueAndReplaceField = "request_header_continue_and_replace"; +constexpr absl::string_view ResponseHeaderContinueAndReplaceField = "response_header_continue_and_replace"; absl::optional initProcessingMode(const ExtProcPerRoute& config) { if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) { @@ -321,10 +324,6 @@ void ExtProcLoggingInfo::recordGrpcCall( } } -void ExtProcLoggingInfo::recordImmediateResponse(envoy::config::core::v3::TrafficDirection traffic_direction){ - grpcCalls(traffic_direction).immediate_response_ = true; -} - ExtProcLoggingInfo::GrpcCalls& ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_direction) { ASSERT(traffic_direction != envoy::config::core::v3::TrafficDirection::UNSPECIFIED); @@ -398,6 +397,12 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const { static_cast(bytes_sent_)); (*struct_msg->mutable_fields())[BytesReceivedField].set_number_value( static_cast(bytes_received_)); + (*struct_msg->mutable_fields())[ImmediateResonseField].set_bool_value( + immediate_response_); + (*struct_msg->mutable_fields())[RequestHeaderContinueAndReplaceField].set_bool_value( + decoding_processor_grpc_calls_.continue_and_replace_); + (*struct_msg->mutable_fields())[ResponseHeaderContinueAndReplaceField].set_bool_value( + encoding_processor_grpc_calls_.continue_and_replace_); return struct_msg; } @@ -502,6 +507,15 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const { if (field_name == BytesReceivedField) { return static_cast(bytes_received_); } + if (field_name == ImmediateResonseField){ + return bool(immediate_response_); + } + if (field_name == RequestHeaderContinueAndReplaceField){ + return bool(decoding_processor_grpc_calls_.continue_and_replace_); + } + if (field_name == ResponseHeaderContinueAndReplaceField){ + return bool(encoding_processor_grpc_calls_.continue_and_replace_); + } return {}; } @@ -1798,8 +1812,7 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) { sent_immediate_response_ = true; if (logging_info_ != nullptr){ - logging_info_->recordImmediateResponse(encoding_state_.trafficDirection()); - logging_info_->recordImmediateResponse(decoding_state_.trafficDirection()); + logging_info_->setImmediateResponse(); } ENVOY_STREAM_LOG(debug, "Sending local reply with status code {}", *decoder_callbacks_, status_code); diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 58990ad05ea92..488d6240d7c41 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -89,7 +89,6 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { std::unique_ptr header_stats_; std::unique_ptr trailer_stats_; std::unique_ptr body_stats_; - bool immediate_response_; bool continue_and_replace_; }; @@ -98,7 +97,7 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status, ProcessorState::CallbackState callback_state, envoy::config::core::v3::TrafficDirection traffic_direction, bool continue_and_replace = false); - void recordImmediateResponse(envoy::config::core::v3::TrafficDirection traffic_direction); + void setImmediateResponse(){immediate_response_ = true;}; void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; } void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; } void setClusterInfo(absl::optional cluster_info) { @@ -147,6 +146,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { Upstream::HostDescriptionConstSharedPtr upstream_host_; // The status details of the underlying HTTP/2 stream. Envoy gRPC only. std::string http_response_code_details_; + // True if an immediate response is sent + bool immediate_response_{false}; }; class ThreadLocalStreamManager; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 593e05c0a65aa..4fc8126448ae5 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -4764,11 +4764,13 @@ TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoContinueAndReplace) { envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config; access_log_config.set_path(access_log_path); auto* json_format = access_log_config.mutable_log_format()->mutable_json_format(); - // Test field extraction for coverage. - (*json_format->mutable_fields())["field_request_header_effect"].set_string_value( - "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_processing_effect)%"); - + (*json_format->mutable_fields())["field_request_header_cr"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_continue_and_replace)%"); + (*json_format->mutable_fields())["field_response_header_cr"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_header_continue_and_replace)%"); + (*json_format->mutable_fields())["field_immeidate_response"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:immediate_response)%"); access_log->mutable_typed_config()->PackFrom(access_log_config); }); initializeConfig(); @@ -4788,12 +4790,15 @@ TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoContinueAndReplace) { // Ensure that we replaced and did not append to the request. EXPECT_EQ(upstream_request_->body().toString(), "Hello, Server!"); std::string log_result = waitForAccessLog(access_log_path, 0, true); - LOG(INFO) << log_result; - // auto json_log = Json::Factory::loadFromString(log_result).value(); - // // 0: NONE, 1: CONTENT_MODIFIED, 2: IMMEDIATE_RESPONSE, 3: CONTINUE_AND_REPLACE - // auto field_request_header_effect = json_log->getString("field_request_header_effect"); - // EXPECT_TRUE(field_request_header_effect.ok()); - // EXPECT_EQ(*field_request_header_effect, "3"); + auto json_log = Json::Factory::loadFromString(log_result).value(); + auto field_request_header_continue = json_log->getString("field_request_header_cr"); + EXPECT_TRUE(field_request_header_continue.ok()); + EXPECT_EQ(*field_request_header_continue, "1"); + auto field_response_header_continue = json_log->getString("field_response_header_cr"); + EXPECT_TRUE(field_response_header_continue.ok()); + EXPECT_EQ(*field_response_header_continue, "0"); + auto field_immediate_response = json_log->getString("field_immeidate_response"); + EXPECT_EQ(*field_immediate_response, "0"); cleanupUpstreamAndDownstream(); } @@ -4801,60 +4806,52 @@ TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoContinueAndReplace) { // Test immediate_response behavior with STREAMED request body. Even though the // headers have been processed, an immediate response on a request body chunk // should still be seen by the downstream. -// TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoImmediateResponse) { -// auto access_log_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename()); - -// config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { -// auto* access_log = cm.add_access_log(); -// access_log->set_name("accesslog"); -// envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config; -// access_log_config.set_path(access_log_path); -// auto* json_format = access_log_config.mutable_log_format()->mutable_json_format(); - -// // Test field extraction for coverage. -// (*json_format->mutable_fields())["field_request_header_effect"].set_string_value( -// "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_processing_effect)%"); -// (*json_format->mutable_fields())["field_request_body_effect"].set_string_value( -// "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_body_processing_effect)%"); - -// access_log->mutable_typed_config()->PackFrom(access_log_config); -// }); -// proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); -// ConfigOptions config_options; -// config_options.add_response_processor = true; -// initializeConfig(config_options); -// HttpIntegrationTest::initialize(); -// auto response = sendDownstreamRequestWithBody("Evil content!", absl::nullopt); -// processRequestHeadersMessage( -// *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& resp) { -// auto* hdr = resp.mutable_response()->mutable_header_mutation()->add_set_headers(); -// hdr->mutable_append()->set_value(false); -// hdr->mutable_header()->set_key("foo"); -// hdr->mutable_header()->set_raw_value("bar"); -// return true; -// }); -// processAndRespondImmediately(*grpc_upstreams_[0], false, [](ImmediateResponse& immediate) { -// immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); -// immediate.set_body("{\"reason\": \"Request too evil\"}"); -// immediate.set_details("Failed because I don't like this payload"); -// }); -// // ext_proc will immediately close side stream in this case, which causes it -// // to be reset, since side stream codec had not yet observed server trailers. -// EXPECT_TRUE(processor_stream_->waitForReset()); - -// verifyDownstreamResponse(*response, 400); -// std::string log_result = waitForAccessLog(access_log_path, 0, true); -// auto json_log = Json::Factory::loadFromString(log_result).value(); -// // 0: NONE, 1: CONTENT_MODIFIED, 2: IMMEDIATE_RESPONSE, 3: CONTINUE_AND_REPLACE -// auto field_request_header_effect = json_log->getString("field_request_header_effect"); -// EXPECT_TRUE(field_request_header_effect.ok()); -// EXPECT_EQ(*field_request_header_effect, "1"); - -// auto field_request_body_effect = json_log->getString("field_request_body_effect"); -// EXPECT_TRUE(field_request_body_effect.ok()); -// EXPECT_EQ(*field_request_body_effect, "2"); -// cleanupUpstreamAndDownstream(); -// } +TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoImmediateResponse) { + auto access_log_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename()); + + config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { + auto* access_log = cm.add_access_log(); + access_log->set_name("accesslog"); + envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config; + access_log_config.set_path(access_log_path); + auto* json_format = access_log_config.mutable_log_format()->mutable_json_format(); + + // Test field extraction for coverage. + (*json_format->mutable_fields())["field_immeidate_response"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:immediate_response)%"); + + access_log->mutable_typed_config()->PackFrom(access_log_config); + }); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + ConfigOptions config_options; + config_options.add_response_processor = true; + initializeConfig(config_options); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBody("Evil content!", absl::nullopt); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& resp) { + auto* hdr = resp.mutable_response()->mutable_header_mutation()->add_set_headers(); + hdr->mutable_append()->set_value(false); + hdr->mutable_header()->set_key("foo"); + hdr->mutable_header()->set_raw_value("bar"); + return true; + }); + processAndRespondImmediately(*grpc_upstreams_[0], false, [](ImmediateResponse& immediate) { + immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate.set_body("{\"reason\": \"Request too evil\"}"); + immediate.set_details("Failed because I don't like this payload"); + }); + // ext_proc will immediately close side stream in this case, which causes it + // to be reset, since side stream codec had not yet observed server trailers. + EXPECT_TRUE(processor_stream_->waitForReset()); + + verifyDownstreamResponse(*response, 400); + std::string log_result = waitForAccessLog(access_log_path, 0, true); + auto json_log = Json::Factory::loadFromString(log_result).value(); + auto field_immediate_response = json_log->getString("field_immeidate_response"); + EXPECT_EQ(*field_immediate_response, "1"); + cleanupUpstreamAndDownstream(); +} } // namespace ExternalProcessing } // namespace HttpFilters From 771a10f3d548eb81b59d3d62f3b2610995997be0 Mon Sep 17 00:00:00 2001 From: Melissa Ginaldi Date: Fri, 17 Oct 2025 17:56:29 +0000 Subject: [PATCH 3/5] Clang format corrections Signed-off-by: Melissa Ginaldi --- .../filters/http/ext_proc/ext_proc.cc | 28 ++++++++++--------- .../filters/http/ext_proc/ext_proc.h | 5 ++-- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 3f36a15062b84..271baa6bc29e0 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -74,8 +74,10 @@ constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_c constexpr absl::string_view BytesSentField = "bytes_sent"; constexpr absl::string_view BytesReceivedField = "bytes_received"; constexpr absl::string_view ImmediateResonseField = "immediate_response"; -constexpr absl::string_view RequestHeaderContinueAndReplaceField = "request_header_continue_and_replace"; -constexpr absl::string_view ResponseHeaderContinueAndReplaceField = "response_header_continue_and_replace"; +constexpr absl::string_view RequestHeaderContinueAndReplaceField = + "request_header_continue_and_replace"; +constexpr absl::string_view ResponseHeaderContinueAndReplaceField = + "response_header_continue_and_replace"; absl::optional initProcessingMode(const ExtProcPerRoute& config) { if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) { @@ -280,10 +282,11 @@ FilterConfig::FilterConfig(const ExternalProcessor& config, [](Envoy::Event::Dispatcher&) { return std::make_shared(); }); } -void ExtProcLoggingInfo::recordGrpcCall( - std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status, - ProcessorState::CallbackState callback_state, - envoy::config::core::v3::TrafficDirection traffic_direction, bool continue_and_replace) { +void ExtProcLoggingInfo::recordGrpcCall(std::chrono::microseconds latency, + Grpc::Status::GrpcStatus call_status, + ProcessorState::CallbackState callback_state, + envoy::config::core::v3::TrafficDirection traffic_direction, + bool continue_and_replace) { ASSERT(callback_state != ProcessorState::CallbackState::Idle); // Record the gRPC call stats for the header. @@ -291,7 +294,7 @@ void ExtProcLoggingInfo::recordGrpcCall( if (grpcCalls(traffic_direction).header_stats_ == nullptr) { grpcCalls(traffic_direction).header_stats_ = std::make_unique(latency, call_status); } - if (continue_and_replace){ + if (continue_and_replace) { grpcCalls(traffic_direction).continue_and_replace_ = true; } return; @@ -397,8 +400,7 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const { static_cast(bytes_sent_)); (*struct_msg->mutable_fields())[BytesReceivedField].set_number_value( static_cast(bytes_received_)); - (*struct_msg->mutable_fields())[ImmediateResonseField].set_bool_value( - immediate_response_); + (*struct_msg->mutable_fields())[ImmediateResonseField].set_bool_value(immediate_response_); (*struct_msg->mutable_fields())[RequestHeaderContinueAndReplaceField].set_bool_value( decoding_processor_grpc_calls_.continue_and_replace_); (*struct_msg->mutable_fields())[ResponseHeaderContinueAndReplaceField].set_bool_value( @@ -507,13 +509,13 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const { if (field_name == BytesReceivedField) { return static_cast(bytes_received_); } - if (field_name == ImmediateResonseField){ + if (field_name == ImmediateResonseField) { return bool(immediate_response_); } - if (field_name == RequestHeaderContinueAndReplaceField){ + if (field_name == RequestHeaderContinueAndReplaceField) { return bool(decoding_processor_grpc_calls_.continue_and_replace_); } - if (field_name == ResponseHeaderContinueAndReplaceField){ + if (field_name == ResponseHeaderContinueAndReplaceField) { return bool(encoding_processor_grpc_calls_.continue_and_replace_); } return {}; @@ -1811,7 +1813,7 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) { }; sent_immediate_response_ = true; - if (logging_info_ != nullptr){ + if (logging_info_ != nullptr) { logging_info_->setImmediateResponse(); } ENVOY_STREAM_LOG(debug, "Sending local reply with status code {}", *decoder_callbacks_, diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 488d6240d7c41..8287f71a8bdd2 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -96,8 +96,9 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status, ProcessorState::CallbackState callback_state, - envoy::config::core::v3::TrafficDirection traffic_direction, bool continue_and_replace = false); - void setImmediateResponse(){immediate_response_ = true;}; + envoy::config::core::v3::TrafficDirection traffic_direction, + bool continue_and_replace = false); + void setImmediateResponse() { immediate_response_ = true; }; void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; } void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; } void setClusterInfo(absl::optional cluster_info) { From 169a01640235b2190884053a507d3390b5130b29 Mon Sep 17 00:00:00 2001 From: Melissa Ginaldi Date: Fri, 17 Oct 2025 17:59:40 +0000 Subject: [PATCH 4/5] Clang format and remove deugging comment Signed-off-by: Melissa Ginaldi --- source/extensions/filters/http/ext_proc/processor_state.cc | 4 ++-- source/extensions/filters/http/ext_proc/processor_state.h | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 9864d6d957e77..fb1bfcd429310 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -49,7 +49,8 @@ void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, filter_callbacks_->dispatcher().timeSource().monotonicTime() - call_start_time_.value()); ExtProcLoggingInfo* logging_info = filter_.loggingInfo(); if (logging_info != nullptr) { - logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection(), continue_and_replace); + logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection(), + continue_and_replace); } call_start_time_ = absl::nullopt; } @@ -165,7 +166,6 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon } clearRouteCache(common_response); - // onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response)); if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) { onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response), true); diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 612cc61b77fb6..b2f6298e0a6bd 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -149,7 +149,8 @@ class ProcessorState : public Logger::Loggable { void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout, CallbackState callback_state); void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, - CallbackState next_state = CallbackState::Idle, bool continue_and_replace = false); + CallbackState next_state = CallbackState::Idle, + bool continue_and_replace = false); void stopMessageTimer(); bool restartMessageTimer(const uint32_t message_timeout_ms); From 90801e8007da277e63a315a52467d8dd1c13d6b6 Mon Sep 17 00:00:00 2001 From: Melissa Ginaldi Date: Fri, 24 Oct 2025 20:11:43 +0000 Subject: [PATCH 5/5] Fix small typos Signed-off-by: Melissa Ginaldi --- source/extensions/filters/http/ext_proc/ext_proc.cc | 6 +++--- source/extensions/filters/http/ext_proc/ext_proc.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 271baa6bc29e0..17bf88d386f4f 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -73,7 +73,7 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status"; constexpr absl::string_view BytesSentField = "bytes_sent"; constexpr absl::string_view BytesReceivedField = "bytes_received"; -constexpr absl::string_view ImmediateResonseField = "immediate_response"; +constexpr absl::string_view ImmediateResponseField = "immediate_response"; constexpr absl::string_view RequestHeaderContinueAndReplaceField = "request_header_continue_and_replace"; constexpr absl::string_view ResponseHeaderContinueAndReplaceField = @@ -400,7 +400,7 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const { static_cast(bytes_sent_)); (*struct_msg->mutable_fields())[BytesReceivedField].set_number_value( static_cast(bytes_received_)); - (*struct_msg->mutable_fields())[ImmediateResonseField].set_bool_value(immediate_response_); + (*struct_msg->mutable_fields())[ImmediateResponseField].set_bool_value(immediate_response_); (*struct_msg->mutable_fields())[RequestHeaderContinueAndReplaceField].set_bool_value( decoding_processor_grpc_calls_.continue_and_replace_); (*struct_msg->mutable_fields())[ResponseHeaderContinueAndReplaceField].set_bool_value( @@ -509,7 +509,7 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const { if (field_name == BytesReceivedField) { return static_cast(bytes_received_); } - if (field_name == ImmediateResonseField) { + if (field_name == ImmediateResponseField) { return bool(immediate_response_); } if (field_name == RequestHeaderContinueAndReplaceField) { diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 8287f71a8bdd2..6fa2c50b4aaa4 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -98,7 +98,7 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { ProcessorState::CallbackState callback_state, envoy::config::core::v3::TrafficDirection traffic_direction, bool continue_and_replace = false); - void setImmediateResponse() { immediate_response_ = true; }; + void setImmediateResponse() { immediate_response_ = true; } void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; } void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; } void setClusterInfo(absl::optional cluster_info) { @@ -147,7 +147,7 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { Upstream::HostDescriptionConstSharedPtr upstream_host_; // The status details of the underlying HTTP/2 stream. Envoy gRPC only. std::string http_response_code_details_; - // True if an immediate response is sent + // True if an immediate response is sent. bool immediate_response_{false}; };