Skip to content

Commit 25aedb7

Browse files
authored
http2: fix a bug where memory will leak if stream is reset before send headers (#41260)
To close #40253. Risk Level: mid. core http2 code change. Testing: unit. Docs Changes: n/a. Release Notes: added. --------- Signed-off-by: WangBaiping <[email protected]> Signed-off-by: code <[email protected]>
1 parent 83b7962 commit 25aedb7

File tree

7 files changed

+90
-7
lines changed

7 files changed

+90
-7
lines changed

changelogs/current.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ minor_behavior_changes:
2424
2525
bug_fixes:
2626
# *Changes expected to improve the state of the world and are unlikely to have negative effects*
27+
- area: http2
28+
change: |
29+
Fixed a bug where Envoy will leak memory if the HTTP2 stream is reset before the request headers are
30+
sent. For example, if one upstream http filter send a local reply after the connection is established but
31+
before the request headers are sent, the memory allocated for the stream will not be released.
2732
- area: lua
2833
change: |
2934
Fix a bug where Lua filters may result in Envoy crashes when setting response body to a

source/common/http/http2/codec_impl.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,8 @@ void ConnectionImpl::StreamImpl::processBufferedData() {
408408
ENVOY_CONN_LOG(debug, "invoking onStreamClose for stream: {} via processBufferedData",
409409
parent_.connection_, stream_id_);
410410
// We only buffer the onStreamClose if we had no errors.
411-
if (Status status = parent_.onStreamClose(this, 0); !status.ok()) {
412-
ENVOY_CONN_LOG(debug, "error invoking onStreamClose: {}", parent_.connection_,
413-
status.message()); // LCOV_EXCL_LINE
414-
}
411+
Status status = parent_.onStreamClose(this, 0);
412+
ASSERT(status.ok());
415413
}
416414
}
417415

@@ -810,6 +808,11 @@ void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) {
810808
void ConnectionImpl::StreamImpl::resetStreamWorker(StreamResetReason reason) {
811809
if (stream_id_ == -1) {
812810
// Handle the case where client streams are reset before headers are created.
811+
// For example, if we send local reply after the stream is created but before
812+
// headers are sent, we will end up here.
813+
ENVOY_CONN_LOG(trace, "Stream {} reset before headers sent.", parent_.connection_, stream_id_);
814+
Status status = parent_.onStreamClose(this, 0);
815+
ASSERT(status.ok());
813816
return;
814817
}
815818
if (codec_callbacks_) {
@@ -1883,6 +1886,7 @@ void ConnectionImpl::Http2Visitor::OnRstStream(Http2StreamId stream_id, Http2Err
18831886
bool ConnectionImpl::Http2Visitor::OnCloseStream(Http2StreamId stream_id,
18841887
Http2ErrorCode error_code) {
18851888
Status status = connection_->onStreamClose(stream_id, static_cast<uint32_t>(error_code));
1889+
ASSERT(status.ok());
18861890
if (stream_close_listener_) {
18871891
ENVOY_CONN_LOG(trace, "Http2Visitor invoking stream close listener for stream {}",
18881892
connection_->connection_, stream_id);

source/common/http/http2/codec_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -800,7 +800,7 @@ class ConnectionImpl : public virtual Connection,
800800
bool raised_goaway_ : 1;
801801
Event::SchedulableCallbackPtr protocol_constraint_violation_callback_;
802802
Random::RandomGenerator& random_;
803-
MonotonicTime last_received_data_time_{};
803+
MonotonicTime last_received_data_time_;
804804
Event::TimerPtr keepalive_send_timer_;
805805
Event::TimerPtr keepalive_timeout_timer_;
806806
std::chrono::milliseconds keepalive_interval_;

test/common/http/http2/codec_impl_test.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ class Http2CodecImplTestFixture {
252252
driveToCompletion();
253253

254254
EXPECT_CALL(server_callbacks_, newStream(_, _))
255+
.Times(AnyNumber())
255256
.WillRepeatedly(Invoke([&](ResponseEncoder& encoder, bool) -> RequestDecoder& {
256257
response_encoder_ = &encoder;
257258
encoder.getStream().addCallbacks(server_stream_callbacks_);
@@ -1009,6 +1010,14 @@ TEST_P(Http2CodecImplTest, RefusedStreamReset) {
10091010
driveToCompletion();
10101011
}
10111012

1013+
TEST_P(Http2CodecImplTest, ResetBeforeHeadersSent) {
1014+
initialize();
1015+
1016+
EXPECT_EQ(1, TestUtility::findGauge(client_stats_store_, "http2.streams_active")->value());
1017+
request_encoder_->getStream().resetStream(StreamResetReason::LocalReset);
1018+
EXPECT_EQ(0, TestUtility::findGauge(client_stats_store_, "http2.streams_active")->value());
1019+
}
1020+
10121021
TEST_P(Http2CodecImplTest, InvalidHeadersFrameMissing) {
10131022
initialize();
10141023
#ifdef ENVOY_ENABLE_UHV

test/integration/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,7 @@ envoy_cc_test(
10231023
"//source/common/http:header_map_lib",
10241024
"//source/extensions/filters/http/buffer:config",
10251025
"//test/integration/filters:encoder_decoder_buffer_filter_lib",
1026+
"//test/integration/filters:local_reply_during_decoding_filter_lib",
10261027
"//test/integration/filters:random_pause_filter_lib",
10271028
"//test/test_common:utility_lib",
10281029
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",

test/integration/filters/local_reply_during_decoding_filter.cc

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,42 @@
1111

1212
namespace Envoy {
1313

14-
class LocalReplyDuringDecode : public Http::PassThroughFilter {
14+
class LocalReplyDuringDecode : public Http::PassThroughFilter, public Http::UpstreamCallbacks {
1515
public:
1616
constexpr static char name[] = "local-reply-during-decode";
1717

18-
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override {
18+
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
19+
decoder_callbacks_ = &callbacks;
20+
if (auto cb = decoder_callbacks_->upstreamCallbacks(); cb) {
21+
cb->addUpstreamCallbacks(*this);
22+
}
23+
}
24+
25+
void onUpstreamConnectionEstablished() override {
26+
if (latched_end_stream_.has_value()) {
27+
const bool end_stream = *latched_end_stream_;
28+
latched_end_stream_.reset();
29+
Http::FilterHeadersStatus status = decodeHeaders(*request_headers_, end_stream);
30+
if (status == Http::FilterHeadersStatus::Continue) {
31+
decoder_callbacks_->continueDecoding();
32+
}
33+
}
34+
}
35+
36+
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers,
37+
bool end_stream) override {
38+
// If this filter is being used as an upstream filter and the upstream connection is not yet
39+
// established, check for the "wait-upstream-connection" header to determine whether to wait
40+
// for the connection to be established before continuing decoding.
41+
if (auto cb = decoder_callbacks_->upstreamCallbacks(); cb && !cb->upstream()) {
42+
auto result = request_headers.get(Http::LowerCaseString("wait-upstream-connection"));
43+
if (!result.empty() && result[0]->value() == "true") {
44+
request_headers_ = &request_headers;
45+
latched_end_stream_ = end_stream;
46+
return Http::FilterHeadersStatus::StopAllIterationAndBuffer;
47+
}
48+
}
49+
1950
auto result = request_headers.get(Http::LowerCaseString("skip-local-reply"));
2051
if (!result.empty() && result[0]->value() == "true") {
2152
header_local_reply_skipped_ = true;
@@ -47,6 +78,8 @@ class LocalReplyDuringDecode : public Http::PassThroughFilter {
4778
}
4879

4980
private:
81+
Http::RequestHeaderMap* request_headers_{};
82+
absl::optional<bool> latched_end_stream_;
5083
bool header_local_reply_skipped_ = false;
5184
bool local_reply_during_data_ = false;
5285
};

test/integration/multiplexed_upstream_integration_test.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,37 @@ TEST_P(MultiplexedUpstreamIntegrationTest, MultipleRequestsLowStreamLimit) {
652652
cleanupUpstreamAndDownstream();
653653
}
654654

655+
TEST_P(MultiplexedUpstreamIntegrationTest, UpstreamFilterSendLocalReply) {
656+
if (upstreamProtocol() != Http::CodecType::HTTP2) {
657+
return;
658+
}
659+
autonomous_upstream_ = true;
660+
envoy::config::core::v3::Http2ProtocolOptions config;
661+
config.mutable_max_concurrent_streams()->set_value(20000);
662+
mergeOptions(config);
663+
config_helper_.prependFilter(fmt::format(R"EOF(
664+
name: local-reply-during-decode
665+
)EOF"),
666+
false);
667+
668+
initialize();
669+
codec_client_ = makeHttpConnection(lookupPort("http"));
670+
671+
// Start sending the request, but ensure no end stream will be sent, so the
672+
// stream will stay in use.
673+
auto response = codec_client_->makeHeaderOnlyRequest(
674+
Http::TestRequestHeaderMapImpl{{":method", "POST"},
675+
{":path", "/test/long/url"},
676+
{":scheme", "http"},
677+
{":authority", "sni.lyft.com"},
678+
{"wait-upstream-connection", "true"}});
679+
// Wait until the response is sent to ensure the SETTINGS frame has been read
680+
// by Envoy.
681+
ASSERT_TRUE(response->waitForEndStream());
682+
683+
EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.http2.streams_active")->value());
684+
}
685+
655686
// Regression test for https://github.com/envoyproxy/envoy/issues/13933
656687
TEST_P(MultiplexedUpstreamIntegrationTest, UpstreamGoaway) {
657688
initialize();

0 commit comments

Comments
 (0)