diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 84305210d6d2b..17bf88d386f4f 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -73,6 +73,11 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status"; constexpr absl::string_view BytesSentField = "bytes_sent"; constexpr absl::string_view BytesReceivedField = "bytes_received"; +constexpr absl::string_view ImmediateResponseField = "immediate_response"; +constexpr absl::string_view RequestHeaderContinueAndReplaceField = + "request_header_continue_and_replace"; +constexpr absl::string_view ResponseHeaderContinueAndReplaceField = + "response_header_continue_and_replace"; absl::optional initProcessingMode(const ExtProcPerRoute& config) { if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) { @@ -277,10 +282,11 @@ FilterConfig::FilterConfig(const ExternalProcessor& config, [](Envoy::Event::Dispatcher&) { return std::make_shared(); }); } -void ExtProcLoggingInfo::recordGrpcCall( - std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status, - ProcessorState::CallbackState callback_state, - envoy::config::core::v3::TrafficDirection traffic_direction) { +void ExtProcLoggingInfo::recordGrpcCall(std::chrono::microseconds latency, + Grpc::Status::GrpcStatus call_status, + ProcessorState::CallbackState callback_state, + envoy::config::core::v3::TrafficDirection traffic_direction, + bool continue_and_replace) { ASSERT(callback_state != ProcessorState::CallbackState::Idle); // Record the gRPC call stats for the header. @@ -288,6 +294,9 @@ void ExtProcLoggingInfo::recordGrpcCall( if (grpcCalls(traffic_direction).header_stats_ == nullptr) { grpcCalls(traffic_direction).header_stats_ = std::make_unique(latency, call_status); } + if (continue_and_replace) { + grpcCalls(traffic_direction).continue_and_replace_ = true; + } return; } @@ -391,6 +400,11 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const { static_cast(bytes_sent_)); (*struct_msg->mutable_fields())[BytesReceivedField].set_number_value( static_cast(bytes_received_)); + (*struct_msg->mutable_fields())[ImmediateResponseField].set_bool_value(immediate_response_); + (*struct_msg->mutable_fields())[RequestHeaderContinueAndReplaceField].set_bool_value( + decoding_processor_grpc_calls_.continue_and_replace_); + (*struct_msg->mutable_fields())[ResponseHeaderContinueAndReplaceField].set_bool_value( + encoding_processor_grpc_calls_.continue_and_replace_); return struct_msg; } @@ -495,6 +509,15 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const { if (field_name == BytesReceivedField) { return static_cast(bytes_received_); } + if (field_name == ImmediateResponseField) { + return bool(immediate_response_); + } + if (field_name == RequestHeaderContinueAndReplaceField) { + return bool(decoding_processor_grpc_calls_.continue_and_replace_); + } + if (field_name == ResponseHeaderContinueAndReplaceField) { + return bool(encoding_processor_grpc_calls_.continue_and_replace_); + } return {}; } @@ -1790,6 +1813,9 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) { }; sent_immediate_response_ = true; + if (logging_info_ != nullptr) { + logging_info_->setImmediateResponse(); + } ENVOY_STREAM_LOG(debug, "Sending local reply with status code {}", *decoder_callbacks_, status_code); const auto details = StringUtil::replaceAllEmptySpace(response.details()); diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 84c4547c2b06c..6fa2c50b4aaa4 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -89,13 +89,16 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { std::unique_ptr header_stats_; std::unique_ptr trailer_stats_; std::unique_ptr body_stats_; + bool continue_and_replace_; }; using GrpcCalls = struct GrpcCallStats; void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status, ProcessorState::CallbackState callback_state, - envoy::config::core::v3::TrafficDirection traffic_direction); + envoy::config::core::v3::TrafficDirection traffic_direction, + bool continue_and_replace = false); + void setImmediateResponse() { immediate_response_ = true; } void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; } void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; } void setClusterInfo(absl::optional cluster_info) { @@ -134,8 +137,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { private: GrpcCalls& grpcCalls(envoy::config::core::v3::TrafficDirection traffic_direction); - GrpcCalls decoding_processor_grpc_calls_; - GrpcCalls encoding_processor_grpc_calls_; + GrpcCalls decoding_processor_grpc_calls_{}; + GrpcCalls encoding_processor_grpc_calls_{}; const Envoy::Protobuf::Struct filter_metadata_; // The following stats are populated for ext_proc filters using Envoy gRPC only. // The bytes sent and received are for the entire stream. @@ -144,6 +147,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { Upstream::HostDescriptionConstSharedPtr upstream_host_; // The status details of the underlying HTTP/2 stream. Envoy gRPC only. std::string http_response_code_details_; + // True if an immediate response is sent. + bool immediate_response_{false}; }; class ThreadLocalStreamManager; diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 38e094d0e19d9..fb1bfcd429310 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -38,7 +38,7 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis } void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, - CallbackState next_state) { + CallbackState next_state, bool continue_and_replace) { ENVOY_STREAM_LOG(debug, "Finish external processing call", *filter_callbacks_); filter_.logStreamInfo(); @@ -49,7 +49,8 @@ void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, filter_callbacks_->dispatcher().timeSource().monotonicTime() - call_start_time_.value()); ExtProcLoggingInfo* logging_info = filter_.loggingInfo(); if (logging_info != nullptr) { - logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection()); + logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection(), + continue_and_replace); } call_start_time_ = absl::nullopt; } @@ -165,10 +166,12 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon } clearRouteCache(common_response); - onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response)); if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) { + onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response), true); return handleHeaderContinueAndReplace(response); + } else { + onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response)); } filter_.onProcessHeadersResponse(response, absl::OkStatus(), trafficDirection()); diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 86e12fb55e791..b2f6298e0a6bd 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -149,7 +149,8 @@ class ProcessorState : public Logger::Loggable { void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout, CallbackState callback_state); void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, - CallbackState next_state = CallbackState::Idle); + CallbackState next_state = CallbackState::Idle, + bool continue_and_replace = false); void stopMessageTimer(); bool restartMessageTimer(const uint32_t message_timeout_ms); diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index fd6d0033db046..4fc8126448ae5 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -4752,6 +4752,107 @@ TEST_P(ExtProcIntegrationTest, AccessLogExtProcInCompositeFilter) { EXPECT_THAT(log_content, testing::HasSubstr("response_header_latency_us")); } +// Test the ability of the filter to completely replace a request message with a new +// request message. +TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoContinueAndReplace) { + auto access_log_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename()); + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + + config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { + auto* access_log = cm.add_access_log(); + access_log->set_name("accesslog"); + envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config; + access_log_config.set_path(access_log_path); + auto* json_format = access_log_config.mutable_log_format()->mutable_json_format(); + // Test field extraction for coverage. + (*json_format->mutable_fields())["field_request_header_cr"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_continue_and_replace)%"); + (*json_format->mutable_fields())["field_response_header_cr"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_header_continue_and_replace)%"); + (*json_format->mutable_fields())["field_immeidate_response"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:immediate_response)%"); + access_log->mutable_typed_config()->PackFrom(access_log_config); + }); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBody("Replace this!", absl::nullopt); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) { + headers_resp.mutable_response()->mutable_body_mutation()->set_body("Hello, Server!"); + // This special status tells us to replace the whole request + headers_resp.mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + return true; + }); + handleUpstreamRequest(); + processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); + + // Ensure that we replaced and did not append to the request. + EXPECT_EQ(upstream_request_->body().toString(), "Hello, Server!"); + std::string log_result = waitForAccessLog(access_log_path, 0, true); + auto json_log = Json::Factory::loadFromString(log_result).value(); + auto field_request_header_continue = json_log->getString("field_request_header_cr"); + EXPECT_TRUE(field_request_header_continue.ok()); + EXPECT_EQ(*field_request_header_continue, "1"); + auto field_response_header_continue = json_log->getString("field_response_header_cr"); + EXPECT_TRUE(field_response_header_continue.ok()); + EXPECT_EQ(*field_response_header_continue, "0"); + auto field_immediate_response = json_log->getString("field_immeidate_response"); + EXPECT_EQ(*field_immediate_response, "0"); + + cleanupUpstreamAndDownstream(); +} + +// Test immediate_response behavior with STREAMED request body. Even though the +// headers have been processed, an immediate response on a request body chunk +// should still be seen by the downstream. +TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoImmediateResponse) { + auto access_log_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename()); + + config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { + auto* access_log = cm.add_access_log(); + access_log->set_name("accesslog"); + envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config; + access_log_config.set_path(access_log_path); + auto* json_format = access_log_config.mutable_log_format()->mutable_json_format(); + + // Test field extraction for coverage. + (*json_format->mutable_fields())["field_immeidate_response"].set_string_value( + "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:immediate_response)%"); + + access_log->mutable_typed_config()->PackFrom(access_log_config); + }); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + ConfigOptions config_options; + config_options.add_response_processor = true; + initializeConfig(config_options); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBody("Evil content!", absl::nullopt); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& resp) { + auto* hdr = resp.mutable_response()->mutable_header_mutation()->add_set_headers(); + hdr->mutable_append()->set_value(false); + hdr->mutable_header()->set_key("foo"); + hdr->mutable_header()->set_raw_value("bar"); + return true; + }); + processAndRespondImmediately(*grpc_upstreams_[0], false, [](ImmediateResponse& immediate) { + immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate.set_body("{\"reason\": \"Request too evil\"}"); + immediate.set_details("Failed because I don't like this payload"); + }); + // ext_proc will immediately close side stream in this case, which causes it + // to be reset, since side stream codec had not yet observed server trailers. + EXPECT_TRUE(processor_stream_->waitForReset()); + + verifyDownstreamResponse(*response, 400); + std::string log_result = waitForAccessLog(access_log_path, 0, true); + auto json_log = Json::Factory::loadFromString(log_result).value(); + auto field_immediate_response = json_log->getString("field_immeidate_response"); + EXPECT_EQ(*field_immediate_response, "1"); + cleanupUpstreamAndDownstream(); +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions