Skip to content

Commit

Permalink
fix: async reply for expired requests
Browse files Browse the repository at this point in the history
prior to this fix, callback style async requests are not replied
if a request is expired.
fixed to always evaluate the callback, but do not send reply for
gen_server calls.
  • Loading branch information
zmstone committed Jan 13, 2025
1 parent eafe388 commit fa2c4a2
Showing 1 changed file with 48 additions and 57 deletions.
105 changes: 48 additions & 57 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@
(is_binary(element(3, REQ)) orelse is_list(element(3, REQ))))
).


-define(EXPIRED(Req), {expired, Req}).

-record(state, {
pool :: term(),
id :: pos_integer(),
Expand Down Expand Up @@ -327,12 +330,12 @@ do_handle_info(
{gun_response, Client, StreamRef, IsFin, StatusCode, Headers},
#state{client = Client} = State
) ->
handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, ?undef);
handle_gun_reply(State, StreamRef, IsFin, StatusCode, Headers, ?undef);
do_handle_info(
{gun_data, Client, StreamRef, IsFin, Data},
#state{client = Client} = State
) ->
handle_gun_reply(State, Client, StreamRef, IsFin, ?undef, ?undef, Data);
handle_gun_reply(State, StreamRef, IsFin, ?undef, ?undef, Data);
do_handle_info(
{gun_error, Client, StreamRef, Reason},
State = #state{client = Client, requests = Requests}
Expand All @@ -344,7 +347,8 @@ do_handle_info(
% e.g. after the stream has been closed by gun, if we send a cancel stream
% gun will reply with Reason={badstate,"The stream cannot be found."}
State;
{expired, NRequests} ->
{?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NRequests} ->
reply(ReplyTo, {error, expired}),
State#state{requests = NRequests};
{?SENT_REQ(ReplyTo, _, _), NRequests} ->
reply(ReplyTo, {error, Reason}),
Expand Down Expand Up @@ -418,7 +422,8 @@ handle_gun_down(#state{requests = Requests} = State, KilledStreams, Reason) ->
case take_sent_req(StreamRef, Acc) of
error ->
Acc;
{expired, NAcc} ->
{?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NAcc} ->
reply(ReplyTo, {error, expired}),
NAcc;
{?SENT_REQ(ReplyTo, _, _), NAcc} ->
reply(ReplyTo, {error, Reason}),
Expand Down Expand Up @@ -556,22 +561,6 @@ mk_reqopts(undefined) ->
mk_reqopts(TunnelRef) ->
#{tunnel => TunnelRef}.

cancel_stream(fin, _Client, _StreamRef) ->
%% nothing to cancel anyway
%% otherwise gun will reply with a gun_error messsage
%% which is then discarded anyway
ok;
cancel_stream(nofin, Client, StreamRef) ->
%% this is just an async message sent to gun
%% the gun stream process does not really cancel
%% anything, but just mark the receiving process (i.e. self())
%% as inactive, however, there could be messages already
%% delivered to self()'s mailbox
%% or the stream process might send more messages
%% before receiving the cancel message.
_ = gun:cancel(Client, StreamRef),
ok.

timeout(infinity = _ExpireAt) ->
infinity;
timeout(ExpireAt) ->
Expand Down Expand Up @@ -620,7 +609,7 @@ take_sent_req(StreamRef, #{sent := Sent, max_sent_expire := T} = Requests) ->
end,
case is_sent_req_expired(Req, now_()) of
true ->
{expired, Requests#{sent := NewSent, max_sent_expire := NewT}};
{?EXPIRED(Req), Requests#{sent := NewSent, max_sent_expire := NewT}};
false ->
{Req, Requests#{sent := NewSent, max_sent_expire := NewT}}
end
Expand Down Expand Up @@ -937,54 +926,56 @@ gun_await_tunnel(Pid, StreamRef, ExpireAt, Timeout, Headers, State0) ->
end.

%% normal handling of gun_response and gun_data reply
handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, Data) ->
handle_gun_reply(State, StreamRef, IsFin, StatusCode, Headers, Data) ->
#state{requests = Requests} = State,
case take_sent_req(StreamRef, Requests) of
error ->
%% Received 'gun_data' message from unknown stream
%% this may happen when the async cancel stream is sent too late
State;
{expired, NRequests} ->
%% the call is expired, caller is no longer waiting for a reply
ok = cancel_stream(IsFin, Client, StreamRef),
State#state{requests = NRequests};
{?SENT_REQ(ReplyTo, ExpireAt, ?undef), NRequests} ->
%% gun_response, http head

%% assert, no body yet
?undef = Data,
case IsFin of
fin ->
%% only http heads no body
reply(ReplyTo, {ok, StatusCode, Headers}),
State#state{requests = NRequests};
nofin ->
%% start accumulating data
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end;
{?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}), NRequests} ->
%% gun_data, http body
{?EXPIRED(?SENT_REQ(_, _, _) = SentReq), NRequests} ->
handle_known_gun_reply(SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data);
{?SENT_REQ(_, _, _) = SentReq, NRequests} ->
handle_known_gun_reply(SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data)
end.

%% assert
?undef = StatusCode,
%% assert
?undef = Headers,
case IsFin of
fin ->
reply(
ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])}
),
State#state{requests = NRequests};
nofin ->
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end
handle_known_gun_reply(?SENT_REQ(ReplyTo, ExpireAt, ?undef), NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data) ->
%% gun_response, http head
%% assert, no body yet
?undef = Data,
case IsFin of
fin ->
%% only http heads no body
reply(ReplyTo, {ok, StatusCode, Headers}),
State#state{requests = NRequests};
nofin ->
%% start accumulating data
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end;
handle_known_gun_reply(?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}), NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data) ->
%% gun_data, http body
%% assert
?undef = StatusCode,
%% assert
?undef = Headers,
case IsFin of
fin ->
reply(
ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])}
),
State#state{requests = NRequests};
nofin ->
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end.

reply({F, A}, Result) when is_function(F) ->
_ = erlang:apply(F, A ++ [Result]),
ok;
reply(_From, {error, expired}) ->
%% the caller should get {error, timeout} return from request/5
ok;
reply(From, Result) ->
gen_server:reply(From, Result).

Expand Down

0 comments on commit fa2c4a2

Please sign in to comment.