Skip to content

Commit

Permalink
fix: async reply for expired requests
Browse files Browse the repository at this point in the history
prior to this fix, callback style async requests are not replied
if a request is expired.
fixed to always evaluate the callback, but do not send reply for
gen_server calls.
  • Loading branch information
zmstone committed Jan 16, 2025
1 parent eafe388 commit 909d307
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 37 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# ehttpc changes

## 0.7.1

- Reply error to async callbacks when request expired.

## 0.7.0

- Switch to `gun` 2.1.0.
Expand Down
103 changes: 66 additions & 37 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
(is_binary(element(3, REQ)) orelse is_list(element(3, REQ))))
).

-define(EXPIRED(Req), {expired, Req}).

-record(state, {
pool :: term(),
id :: pos_integer(),
Expand Down Expand Up @@ -344,7 +346,8 @@ do_handle_info(
% e.g. after the stream has been closed by gun, if we send a cancel stream
% gun will reply with Reason={badstate,"The stream cannot be found."}
State;
{expired, NRequests} ->
{?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NRequests} ->
reply(ReplyTo, {error, expired}),
State#state{requests = NRequests};
{?SENT_REQ(ReplyTo, _, _), NRequests} ->
reply(ReplyTo, {error, Reason}),
Expand Down Expand Up @@ -418,7 +421,8 @@ handle_gun_down(#state{requests = Requests} = State, KilledStreams, Reason) ->
case take_sent_req(StreamRef, Acc) of
error ->
Acc;
{expired, NAcc} ->
{?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NAcc} ->
reply(ReplyTo, {error, expired}),
NAcc;
{?SENT_REQ(ReplyTo, _, _), NAcc} ->
reply(ReplyTo, {error, Reason}),
Expand Down Expand Up @@ -620,7 +624,7 @@ take_sent_req(StreamRef, #{sent := Sent, max_sent_expire := T} = Requests) ->
end,
case is_sent_req_expired(Req, now_()) of
true ->
{expired, Requests#{sent := NewSent, max_sent_expire := NewT}};
{?EXPIRED(Req), Requests#{sent := NewSent, max_sent_expire := NewT}};
false ->
{Req, Requests#{sent := NewSent, max_sent_expire := NewT}}
end
Expand Down Expand Up @@ -944,47 +948,72 @@ handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, Data) ->
%% Received 'gun_data' message from unknown stream
%% this may happen when the async cancel stream is sent too late
State;
{expired, NRequests} ->
%% the call is expired, caller is no longer waiting for a reply
{?EXPIRED(?SENT_REQ(_, _, _) = SentReq), NRequests} ->
ok = cancel_stream(IsFin, Client, StreamRef),
State#state{requests = NRequests};
{?SENT_REQ(ReplyTo, ExpireAt, ?undef), NRequests} ->
%% gun_response, http head

%% assert, no body yet
?undef = Data,
case IsFin of
fin ->
%% only http heads no body
reply(ReplyTo, {ok, StatusCode, Headers}),
State#state{requests = NRequests};
nofin ->
%% start accumulating data
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end;
{?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}), NRequests} ->
%% gun_data, http body
handle_known_gun_reply(
SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data
);
{?SENT_REQ(_, _, _) = SentReq, NRequests} ->
handle_known_gun_reply(
SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data
)
end.

%% assert
?undef = StatusCode,
%% assert
?undef = Headers,
case IsFin of
fin ->
reply(
ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])}
),
State#state{requests = NRequests};
nofin ->
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end
handle_known_gun_reply(
?SENT_REQ(ReplyTo, ExpireAt, ?undef),
NRequests,
State,
StreamRef,
IsFin,
StatusCode,
Headers,
Data
) ->
%% gun_response, http head
%% assert, no body yet
?undef = Data,
case IsFin of
fin ->
%% only http heads no body
reply(ReplyTo, {ok, StatusCode, Headers}),
State#state{requests = NRequests};
nofin ->
%% start accumulating data
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end;
handle_known_gun_reply(
?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}),
NRequests,
State,
StreamRef,
IsFin,
StatusCode,
Headers,
Data
) ->
%% gun_data, http body
%% assert
?undef = StatusCode,
%% assert
?undef = Headers,
case IsFin of
fin ->
reply(
ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])}
),
State#state{requests = NRequests};
nofin ->
Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}),
State#state{requests = put_sent_req(StreamRef, Req, NRequests)}
end.

reply({F, A}, Result) when is_function(F) ->
_ = erlang:apply(F, A ++ [Result]),
ok;
reply(_From, {error, expired}) ->
%% the caller should get {error, timeout} return from request/5
ok;
reply(From, Result) ->
gen_server:reply(From, Result).

Expand Down

0 comments on commit 909d307

Please sign in to comment.