From fa2c4a2895d57f374d263f07fd2431609947f859 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 13 Jan 2025 09:38:21 +0100 Subject: [PATCH] fix: async reply for expired requests prior to this fix, callback style async requests are not replied if a request is expired. fixed to always evaluate the callback, but do not send reply for gen_server calls. --- src/ehttpc.erl | 105 ++++++++++++++++++++++--------------------------- 1 file changed, 48 insertions(+), 57 deletions(-) diff --git a/src/ehttpc.erl b/src/ehttpc.erl index 7754703..c79b98e 100644 --- a/src/ehttpc.erl +++ b/src/ehttpc.erl @@ -87,6 +87,9 @@ (is_binary(element(3, REQ)) orelse is_list(element(3, REQ)))) ). + +-define(EXPIRED(Req), {expired, Req}). + -record(state, { pool :: term(), id :: pos_integer(), @@ -327,12 +330,12 @@ do_handle_info( {gun_response, Client, StreamRef, IsFin, StatusCode, Headers}, #state{client = Client} = State ) -> - handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, ?undef); + handle_gun_reply(State, StreamRef, IsFin, StatusCode, Headers, ?undef); do_handle_info( {gun_data, Client, StreamRef, IsFin, Data}, #state{client = Client} = State ) -> - handle_gun_reply(State, Client, StreamRef, IsFin, ?undef, ?undef, Data); + handle_gun_reply(State, StreamRef, IsFin, ?undef, ?undef, Data); do_handle_info( {gun_error, Client, StreamRef, Reason}, State = #state{client = Client, requests = Requests} @@ -344,7 +347,8 @@ do_handle_info( % e.g. after the stream has been closed by gun, if we send a cancel stream % gun will reply with Reason={badstate,"The stream cannot be found."} State; - {expired, NRequests} -> + {?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NRequests} -> + reply(ReplyTo, {error, expired}), State#state{requests = NRequests}; {?SENT_REQ(ReplyTo, _, _), NRequests} -> reply(ReplyTo, {error, Reason}), @@ -418,7 +422,8 @@ handle_gun_down(#state{requests = Requests} = State, KilledStreams, Reason) -> case take_sent_req(StreamRef, Acc) of error -> Acc; - {expired, NAcc} -> + {?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NAcc} -> + reply(ReplyTo, {error, expired}), NAcc; {?SENT_REQ(ReplyTo, _, _), NAcc} -> reply(ReplyTo, {error, Reason}), @@ -556,22 +561,6 @@ mk_reqopts(undefined) -> mk_reqopts(TunnelRef) -> #{tunnel => TunnelRef}. -cancel_stream(fin, _Client, _StreamRef) -> - %% nothing to cancel anyway - %% otherwise gun will reply with a gun_error messsage - %% which is then discarded anyway - ok; -cancel_stream(nofin, Client, StreamRef) -> - %% this is just an async message sent to gun - %% the gun stream process does not really cancel - %% anything, but just mark the receiving process (i.e. self()) - %% as inactive, however, there could be messages already - %% delivered to self()'s mailbox - %% or the stream process might send more messages - %% before receiving the cancel message. - _ = gun:cancel(Client, StreamRef), - ok. - timeout(infinity = _ExpireAt) -> infinity; timeout(ExpireAt) -> @@ -620,7 +609,7 @@ take_sent_req(StreamRef, #{sent := Sent, max_sent_expire := T} = Requests) -> end, case is_sent_req_expired(Req, now_()) of true -> - {expired, Requests#{sent := NewSent, max_sent_expire := NewT}}; + {?EXPIRED(Req), Requests#{sent := NewSent, max_sent_expire := NewT}}; false -> {Req, Requests#{sent := NewSent, max_sent_expire := NewT}} end @@ -937,54 +926,56 @@ gun_await_tunnel(Pid, StreamRef, ExpireAt, Timeout, Headers, State0) -> end. %% normal handling of gun_response and gun_data reply -handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, Data) -> +handle_gun_reply(State, StreamRef, IsFin, StatusCode, Headers, Data) -> #state{requests = Requests} = State, case take_sent_req(StreamRef, Requests) of error -> %% Received 'gun_data' message from unknown stream %% this may happen when the async cancel stream is sent too late State; - {expired, NRequests} -> - %% the call is expired, caller is no longer waiting for a reply - ok = cancel_stream(IsFin, Client, StreamRef), - State#state{requests = NRequests}; - {?SENT_REQ(ReplyTo, ExpireAt, ?undef), NRequests} -> - %% gun_response, http head - - %% assert, no body yet - ?undef = Data, - case IsFin of - fin -> - %% only http heads no body - reply(ReplyTo, {ok, StatusCode, Headers}), - State#state{requests = NRequests}; - nofin -> - %% start accumulating data - Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}), - State#state{requests = put_sent_req(StreamRef, Req, NRequests)} - end; - {?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}), NRequests} -> - %% gun_data, http body + {?EXPIRED(?SENT_REQ(_, _, _) = SentReq), NRequests} -> + handle_known_gun_reply(SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data); + {?SENT_REQ(_, _, _) = SentReq, NRequests} -> + handle_known_gun_reply(SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data) + end. - %% assert - ?undef = StatusCode, - %% assert - ?undef = Headers, - case IsFin of - fin -> - reply( - ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])} - ), - State#state{requests = NRequests}; - nofin -> - Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}), - State#state{requests = put_sent_req(StreamRef, Req, NRequests)} - end +handle_known_gun_reply(?SENT_REQ(ReplyTo, ExpireAt, ?undef), NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data) -> + %% gun_response, http head + %% assert, no body yet + ?undef = Data, + case IsFin of + fin -> + %% only http heads no body + reply(ReplyTo, {ok, StatusCode, Headers}), + State#state{requests = NRequests}; + nofin -> + %% start accumulating data + Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}), + State#state{requests = put_sent_req(StreamRef, Req, NRequests)} + end; +handle_known_gun_reply(?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}), NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data) -> + %% gun_data, http body + %% assert + ?undef = StatusCode, + %% assert + ?undef = Headers, + case IsFin of + fin -> + reply( + ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])} + ), + State#state{requests = NRequests}; + nofin -> + Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}), + State#state{requests = put_sent_req(StreamRef, Req, NRequests)} end. reply({F, A}, Result) when is_function(F) -> _ = erlang:apply(F, A ++ [Result]), ok; +reply(_From, {error, expired}) -> + %% the caller should get {error, timeout} return from request/5 + ok; reply(From, Result) -> gen_server:reply(From, Result).