From 1a13c8f8148e1e94ac633e905fc62bca78a483bf Mon Sep 17 00:00:00 2001 From: Daniel Finke Date: Fri, 6 Sep 2024 03:00:44 +0000 Subject: [PATCH] Fix intermittent chunked response hang When streaming a chunked response, it is possible to cause a TCP receive hang under particular circumstances. If at one point the parser buffer doesn't have the whole chunk, at a later point the buffer ends up empty `<<>>`, and subsequently `hackney_response:stream_body/1` is called, `hackney_response:recv/2` will hang if the expected remaining size exceeds the remainder of the response. That expected size is actually stale, from the earlier point when the parser did not have the whole chunk. This issue slipped in with benoitc/hackney#710. This was identified when using https://github.com/benoitc/couchbeam and sending several chunked requests. If the last non-terminating chunk completed the response JSON object, `hackney_response:skip_body/1` is called to discard the remaining body, but is told to receive a number of bytes equal to the expected remaining size which will frequently exceed the small terminating chunk and trailers. As a result, the recv operation hangs waiting for bytes that will never arrive. Now, the transfer state (`BufSize`/`ExpectedSize`) are reset after each successful chunk. The speed benefit of benoitc/hackney#710 is retained (tested with the same approach as in that PR). - correct some related typespecs --- src/hackney_http.erl | 38 +++++++++++++++++++------------------ src/hackney_response.erl | 2 +- test/hackney_http_tests.erl | 32 +++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 19 deletions(-) diff --git a/src/hackney_http.erl b/src/hackney_http.erl index d37b5f93..2a73a947 100644 --- a/src/hackney_http.erl +++ b/src/hackney_http.erl @@ -400,33 +400,27 @@ parse_body(St) -> -spec transfer_decode(binary(), #hparser{}) - -> {ok, binary(), #hparser{}} | {done, binary()} | {error, atom()}. + -> body_result() | {more, binary()} | {error, atom()}. transfer_decode(Data, St=#hparser{ body_state={stream, TransferDecode, - TransferState, ContentDecode}, + TransferState, ContentDecode}=BodyState, buffer=Buf}) -> case TransferDecode(Data, TransferState) of {ok, Data2, TransferState2} -> content_decode(ContentDecode, Data2, - St#hparser{body_state= {stream, - TransferDecode, - TransferState2, - ContentDecode}}); + St#hparser{body_state=set_transfer_state(TransferState2, BodyState)}); {ok, Data2, Rest, TransferState2} -> content_decode(ContentDecode, Data2, St#hparser{buffer=Rest, - body_state={stream, - TransferDecode, - TransferState2, - ContentDecode}}); + body_state=set_transfer_state(TransferState2, BodyState)}); {chunk_done, Rest} -> parse_trailers(St#hparser{buffer=Rest, state=on_trailers, body_state=done}); {chunk_ok, Chunk, Rest} -> - {ok, Chunk, St#hparser{buffer=Rest}}; + {ok, Chunk, St#hparser{buffer=Rest, body_state=reset_chunked_transfer_state(BodyState)}}; more -> - {more, St#hparser{buffer=Data}, Buf}; + {more, St#hparser{buffer=Data, body_state=reset_chunked_transfer_state(BodyState)}, Buf}; {more, TransferState2} -> - {more, St#hparser{buffer=Data, body_state={stream, TransferDecode, TransferState2, ContentDecode}}, Buf}; + {more, St#hparser{buffer=Data, body_state=set_transfer_state(TransferState2, BodyState)}, Buf}; {done, Rest} -> {done, Rest}; {done, Data2, _Rest} -> @@ -450,14 +444,22 @@ content_decode(ContentDecode, Data, St) -> {error, Reason} -> {error, Reason} end. +set_transfer_state(TransferState, {stream, TransferDecode, _, ContentDecode}) -> + {stream, TransferDecode, TransferState, ContentDecode}. + +%% @doc Reset the transfer state (BufSize, ExpectedSize) of a chunked body +%% state. This must be done after each successful chunk. Otherwise, it is +%% possible to attempt a recv for more bytes than will be delivered by the end +%% of the response. +reset_chunked_transfer_state(BodyState={stream, _, _, _}) -> + set_transfer_state({0, 0}, BodyState). %% @doc Decode a stream of chunks. -spec te_chunked(binary(), any()) - -> more | {ok, binary(), {non_neg_integer(), non_neg_integer()}} - | {ok, binary(), binary(), {non_neg_integer(), non_neg_integer()}} - | {done, non_neg_integer(), binary()} | {error, badarg}. -te_chunked(<<>>, _) -> - done; + -> more + | {more, {non_neg_integer(), non_neg_integer()}} + | {chunk_ok, binary(), binary()} + | {chunk_done, binary()}. te_chunked(Data, _) -> case read_size(Data) of {ok, 0, Rest} -> diff --git a/src/hackney_response.erl b/src/hackney_response.erl index fde11bc0..413f2890 100644 --- a/src/hackney_response.erl +++ b/src/hackney_response.erl @@ -285,7 +285,7 @@ body(Client) -> body(MaxLength, Client) -> read_body(MaxLength, Client, <<>>). --spec skip_body(#client{}) -> {ok, #client{}} | {skip, #client{}} | {error, atom()}. +-spec skip_body(#client{}) -> {skip, #client{}} | {error, atom()}. skip_body(Client) -> case stream_body(Client) of {ok, _, Client2} -> skip_body(Client2); diff --git a/test/hackney_http_tests.erl b/test/hackney_http_tests.erl index 4b7e7315..ad188d39 100644 --- a/test/hackney_http_tests.erl +++ b/test/hackney_http_tests.erl @@ -66,3 +66,35 @@ parse_chunked_response_trailers_test() -> {_, P3} = hackney_http:execute(P2, <<"\r\n">>), {more, P4} = hackney_http:execute(P3, <<"0\r\nFoo: ">>), ?assertEqual({done, <<>>}, hackney_http:execute(P4, <<"Bar\r\n\r\n">>)). + + +%% Test that the transfer state of a chunked body state is properly reset. +%% Verify the fix for an edge case when calling `hackney_response:stream_body/1' +%% after receiving the last non-terminating chunk w/ specific buffer alignment. +reset_chunked_transfer_state_test() -> + P0 = hackney_http:parser([response]), + {_, _, _, _, P1} = hackney_http:execute(P0, <<"HTTP/1.1 200 OK\r\n">>), + {_, _, P2} = hackney_http:execute(P1, <<"Transfer-Encoding: chunked\r\n">>), + {_, P3} = hackney_http:execute(P2, <<"\r\n">>), + + %% Buffer doesn't have whole chunk, transfer state is set to {2, 16} + {more, P4, <<>>} = hackney_http:execute(P3, <<"10\r\naa">>), + + %% Chunk is read, transfer state should be reset to {0, 0} + {ok, <<"aaaaaaaaaaaaaaaa">>, P5} = hackney_http:execute(P4, <<"aaaaaaaaaaaaaa\r\n">>), + + %% Simulate what would happen if `hackney_response:stream_body/1' was called + %% (e.g. from `skip_body/1') + {more, #hparser{buffer = Buffer, + body_state = {stream, _, TransferState, _} + }, <<>>} = hackney_http:execute(P5), + + %% This edge case only cropped up when the buffer was empty at this stage + ?assertEqual(Buffer, <<>>), + + %% If not {0, 0}, the subsequent `Transport:recv/3' call from within + %% `hackney_response:recv/2' would attempt to receive additional bytes that + %% may not arrive and will hang until timeout. For this example, this is + %% because we are at the end of the response (aside from the terminating + %% chunk and an empty trailer). + ?assertEqual({0, 0}, TransferState).