Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la
constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status";
constexpr absl::string_view BytesSentField = "bytes_sent";
constexpr absl::string_view BytesReceivedField = "bytes_received";
constexpr absl::string_view ImmediateResonseField = "immediate_response";
constexpr absl::string_view RequestHeaderContinueAndReplaceField =
"request_header_continue_and_replace";
constexpr absl::string_view ResponseHeaderContinueAndReplaceField =
"response_header_continue_and_replace";

absl::optional<ProcessingMode> initProcessingMode(const ExtProcPerRoute& config) {
if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) {
Expand Down Expand Up @@ -277,17 +282,21 @@ FilterConfig::FilterConfig(const ExternalProcessor& config,
[](Envoy::Event::Dispatcher&) { return std::make_shared<ThreadLocalStreamManager>(); });
}

void ExtProcLoggingInfo::recordGrpcCall(
std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction) {
void ExtProcLoggingInfo::recordGrpcCall(std::chrono::microseconds latency,
Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction,
bool continue_and_replace) {
ASSERT(callback_state != ProcessorState::CallbackState::Idle);

// Record the gRPC call stats for the header.
if (callback_state == ProcessorState::CallbackState::HeadersCallback) {
if (grpcCalls(traffic_direction).header_stats_ == nullptr) {
grpcCalls(traffic_direction).header_stats_ = std::make_unique<GrpcCall>(latency, call_status);
}
if (continue_and_replace) {
grpcCalls(traffic_direction).continue_and_replace_ = true;
}
return;
}

Expand Down Expand Up @@ -391,6 +400,11 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const {
static_cast<double>(bytes_sent_));
(*struct_msg->mutable_fields())[BytesReceivedField].set_number_value(
static_cast<double>(bytes_received_));
(*struct_msg->mutable_fields())[ImmediateResonseField].set_bool_value(immediate_response_);
(*struct_msg->mutable_fields())[RequestHeaderContinueAndReplaceField].set_bool_value(
decoding_processor_grpc_calls_.continue_and_replace_);
(*struct_msg->mutable_fields())[ResponseHeaderContinueAndReplaceField].set_bool_value(
encoding_processor_grpc_calls_.continue_and_replace_);
return struct_msg;
}

Expand Down Expand Up @@ -495,6 +509,15 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const {
if (field_name == BytesReceivedField) {
return static_cast<int64_t>(bytes_received_);
}
if (field_name == ImmediateResonseField) {
return bool(immediate_response_);
}
if (field_name == RequestHeaderContinueAndReplaceField) {
return bool(decoding_processor_grpc_calls_.continue_and_replace_);
}
if (field_name == ResponseHeaderContinueAndReplaceField) {
return bool(encoding_processor_grpc_calls_.continue_and_replace_);
}
return {};
}

Expand Down Expand Up @@ -1790,6 +1813,9 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) {
};

sent_immediate_response_ = true;
if (logging_info_ != nullptr) {
logging_info_->setImmediateResponse();
}
ENVOY_STREAM_LOG(debug, "Sending local reply with status code {}", *decoder_callbacks_,
status_code);
const auto details = StringUtil::replaceAllEmptySpace(response.details());
Expand Down
11 changes: 8 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,16 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
std::unique_ptr<GrpcCall> header_stats_;
std::unique_ptr<GrpcCall> trailer_stats_;
std::unique_ptr<GrpcCallBody> body_stats_;
bool continue_and_replace_;
};

using GrpcCalls = struct GrpcCallStats;

void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction);
envoy::config::core::v3::TrafficDirection traffic_direction,
bool continue_and_replace = false);
void setImmediateResponse() { immediate_response_ = true; };
void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; }
void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; }
void setClusterInfo(absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info) {
Expand Down Expand Up @@ -134,8 +137,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {

private:
GrpcCalls& grpcCalls(envoy::config::core::v3::TrafficDirection traffic_direction);
GrpcCalls decoding_processor_grpc_calls_;
GrpcCalls encoding_processor_grpc_calls_;
GrpcCalls decoding_processor_grpc_calls_{};
GrpcCalls encoding_processor_grpc_calls_{};
const Envoy::Protobuf::Struct filter_metadata_;
// The following stats are populated for ext_proc filters using Envoy gRPC only.
// The bytes sent and received are for the entire stream.
Expand All @@ -144,6 +147,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
Upstream::HostDescriptionConstSharedPtr upstream_host_;
// The status details of the underlying HTTP/2 stream. Envoy gRPC only.
std::string http_response_code_details_;
// True if an immediate response is sent
bool immediate_response_{false};
};

class ThreadLocalStreamManager;
Expand Down
9 changes: 6 additions & 3 deletions source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis
}

void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state) {
CallbackState next_state, bool continue_and_replace) {
ENVOY_STREAM_LOG(debug, "Finish external processing call", *filter_callbacks_);
filter_.logStreamInfo();

Expand All @@ -49,7 +49,8 @@ void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
filter_callbacks_->dispatcher().timeSource().monotonicTime() - call_start_time_.value());
ExtProcLoggingInfo* logging_info = filter_.loggingInfo();
if (logging_info != nullptr) {
logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection());
logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection(),
continue_and_replace);
}
call_start_time_ = absl::nullopt;
}
Expand Down Expand Up @@ -165,10 +166,12 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon
}

clearRouteCache(common_response);
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response));

if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) {
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response), true);
return handleHeaderContinueAndReplace(response);
} else {
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response));
}

filter_.onProcessHeadersResponse(response, absl::OkStatus(), trafficDirection());
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout,
CallbackState callback_state);
void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state = CallbackState::Idle);
CallbackState next_state = CallbackState::Idle,
bool continue_and_replace = false);
void stopMessageTimer();
bool restartMessageTimer(const uint32_t message_timeout_ms);

Expand Down
101 changes: 101 additions & 0 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4752,6 +4752,107 @@ TEST_P(ExtProcIntegrationTest, AccessLogExtProcInCompositeFilter) {
EXPECT_THAT(log_content, testing::HasSubstr("response_header_latency_us"));
}

// Test the ability of the filter to completely replace a request message with a new
// request message.
TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoContinueAndReplace) {
auto access_log_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename());
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND);

config_helper_.addConfigModifier([&](HttpConnectionManager& cm) {
auto* access_log = cm.add_access_log();
access_log->set_name("accesslog");
envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config;
access_log_config.set_path(access_log_path);
auto* json_format = access_log_config.mutable_log_format()->mutable_json_format();
// Test field extraction for coverage.
(*json_format->mutable_fields())["field_request_header_cr"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_continue_and_replace)%");
(*json_format->mutable_fields())["field_response_header_cr"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_header_continue_and_replace)%");
(*json_format->mutable_fields())["field_immeidate_response"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:immediate_response)%");
access_log->mutable_typed_config()->PackFrom(access_log_config);
});
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequestWithBody("Replace this!", absl::nullopt);
processRequestHeadersMessage(
*grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) {
headers_resp.mutable_response()->mutable_body_mutation()->set_body("Hello, Server!");
// This special status tells us to replace the whole request
headers_resp.mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE);
return true;
});
handleUpstreamRequest();
processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt);
verifyDownstreamResponse(*response, 200);

// Ensure that we replaced and did not append to the request.
EXPECT_EQ(upstream_request_->body().toString(), "Hello, Server!");
std::string log_result = waitForAccessLog(access_log_path, 0, true);
auto json_log = Json::Factory::loadFromString(log_result).value();
auto field_request_header_continue = json_log->getString("field_request_header_cr");
EXPECT_TRUE(field_request_header_continue.ok());
EXPECT_EQ(*field_request_header_continue, "1");
auto field_response_header_continue = json_log->getString("field_response_header_cr");
EXPECT_TRUE(field_response_header_continue.ok());
EXPECT_EQ(*field_response_header_continue, "0");
auto field_immediate_response = json_log->getString("field_immeidate_response");
EXPECT_EQ(*field_immediate_response, "0");

cleanupUpstreamAndDownstream();
}

// Test immediate_response behavior with STREAMED request body. Even though the
// headers have been processed, an immediate response on a request body chunk
// should still be seen by the downstream.
TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoImmediateResponse) {
auto access_log_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename());

config_helper_.addConfigModifier([&](HttpConnectionManager& cm) {
auto* access_log = cm.add_access_log();
access_log->set_name("accesslog");
envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config;
access_log_config.set_path(access_log_path);
auto* json_format = access_log_config.mutable_log_format()->mutable_json_format();

// Test field extraction for coverage.
(*json_format->mutable_fields())["field_immeidate_response"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:immediate_response)%");

access_log->mutable_typed_config()->PackFrom(access_log_config);
});
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED);
ConfigOptions config_options;
config_options.add_response_processor = true;
initializeConfig(config_options);
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequestWithBody("Evil content!", absl::nullopt);
processRequestHeadersMessage(
*grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& resp) {
auto* hdr = resp.mutable_response()->mutable_header_mutation()->add_set_headers();
hdr->mutable_append()->set_value(false);
hdr->mutable_header()->set_key("foo");
hdr->mutable_header()->set_raw_value("bar");
return true;
});
processAndRespondImmediately(*grpc_upstreams_[0], false, [](ImmediateResponse& immediate) {
immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest);
immediate.set_body("{\"reason\": \"Request too evil\"}");
immediate.set_details("Failed because I don't like this payload");
});
// ext_proc will immediately close side stream in this case, which causes it
// to be reset, since side stream codec had not yet observed server trailers.
EXPECT_TRUE(processor_stream_->waitForReset());

verifyDownstreamResponse(*response, 400);
std::string log_result = waitForAccessLog(access_log_path, 0, true);
auto json_log = Json::Factory::loadFromString(log_result).value();
auto field_immediate_response = json_log->getString("field_immeidate_response");
EXPECT_EQ(*field_immediate_response, "1");
cleanupUpstreamAndDownstream();
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down