From 909d307e24e72c140de1dedeed8a5df2be0454f8 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 13 Jan 2025 09:38:21 +0100 Subject: [PATCH] fix: async reply for expired requests prior to this fix, callback style async requests are not replied if a request is expired. fixed to always evaluate the callback, but do not send reply for gen_server calls. --- changelog.md | 4 ++ src/ehttpc.erl | 103 +++++++++++++++++++++++++++++++------------------ 2 files changed, 70 insertions(+), 37 deletions(-) diff --git a/changelog.md b/changelog.md index 2f328ce..24131a3 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,9 @@ # ehttpc changes +## 0.7.1 + +- Reply error to async callbacks when request expired. + ## 0.7.0 - Switch to `gun` 2.1.0. diff --git a/src/ehttpc.erl b/src/ehttpc.erl index 7754703..d227e61 100644 --- a/src/ehttpc.erl +++ b/src/ehttpc.erl @@ -87,6 +87,8 @@ (is_binary(element(3, REQ)) orelse is_list(element(3, REQ)))) ). +-define(EXPIRED(Req), {expired, Req}). + -record(state, { pool :: term(), id :: pos_integer(), @@ -344,7 +346,8 @@ do_handle_info( % e.g. after the stream has been closed by gun, if we send a cancel stream % gun will reply with Reason={badstate,"The stream cannot be found."} State; - {expired, NRequests} -> + {?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NRequests} -> + reply(ReplyTo, {error, expired}), State#state{requests = NRequests}; {?SENT_REQ(ReplyTo, _, _), NRequests} -> reply(ReplyTo, {error, Reason}), @@ -418,7 +421,8 @@ handle_gun_down(#state{requests = Requests} = State, KilledStreams, Reason) -> case take_sent_req(StreamRef, Acc) of error -> Acc; - {expired, NAcc} -> + {?EXPIRED(?SENT_REQ(ReplyTo, _, _)), NAcc} -> + reply(ReplyTo, {error, expired}), NAcc; {?SENT_REQ(ReplyTo, _, _), NAcc} -> reply(ReplyTo, {error, Reason}), @@ -620,7 +624,7 @@ take_sent_req(StreamRef, #{sent := Sent, max_sent_expire := T} = Requests) -> end, case is_sent_req_expired(Req, now_()) of true -> - {expired, Requests#{sent := NewSent, max_sent_expire := NewT}}; + {?EXPIRED(Req), Requests#{sent := NewSent, max_sent_expire := NewT}}; false -> {Req, Requests#{sent := NewSent, max_sent_expire := NewT}} end @@ -944,47 +948,72 @@ handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, Data) -> %% Received 'gun_data' message from unknown stream %% this may happen when the async cancel stream is sent too late State; - {expired, NRequests} -> - %% the call is expired, caller is no longer waiting for a reply + {?EXPIRED(?SENT_REQ(_, _, _) = SentReq), NRequests} -> ok = cancel_stream(IsFin, Client, StreamRef), - State#state{requests = NRequests}; - {?SENT_REQ(ReplyTo, ExpireAt, ?undef), NRequests} -> - %% gun_response, http head - - %% assert, no body yet - ?undef = Data, - case IsFin of - fin -> - %% only http heads no body - reply(ReplyTo, {ok, StatusCode, Headers}), - State#state{requests = NRequests}; - nofin -> - %% start accumulating data - Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}), - State#state{requests = put_sent_req(StreamRef, Req, NRequests)} - end; - {?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}), NRequests} -> - %% gun_data, http body + handle_known_gun_reply( + SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data + ); + {?SENT_REQ(_, _, _) = SentReq, NRequests} -> + handle_known_gun_reply( + SentReq, NRequests, State, StreamRef, IsFin, StatusCode, Headers, Data + ) + end. - %% assert - ?undef = StatusCode, - %% assert - ?undef = Headers, - case IsFin of - fin -> - reply( - ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])} - ), - State#state{requests = NRequests}; - nofin -> - Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}), - State#state{requests = put_sent_req(StreamRef, Req, NRequests)} - end +handle_known_gun_reply( + ?SENT_REQ(ReplyTo, ExpireAt, ?undef), + NRequests, + State, + StreamRef, + IsFin, + StatusCode, + Headers, + Data +) -> + %% gun_response, http head + %% assert, no body yet + ?undef = Data, + case IsFin of + fin -> + %% only http heads no body + reply(ReplyTo, {ok, StatusCode, Headers}), + State#state{requests = NRequests}; + nofin -> + %% start accumulating data + Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode, Headers, []}), + State#state{requests = put_sent_req(StreamRef, Req, NRequests)} + end; +handle_known_gun_reply( + ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, Data0}), + NRequests, + State, + StreamRef, + IsFin, + StatusCode, + Headers, + Data +) -> + %% gun_data, http body + %% assert + ?undef = StatusCode, + %% assert + ?undef = Headers, + case IsFin of + fin -> + reply( + ReplyTo, {ok, StatusCode0, Headers0, iolist_to_binary([Data0, Data])} + ), + State#state{requests = NRequests}; + nofin -> + Req = ?SENT_REQ(ReplyTo, ExpireAt, {StatusCode0, Headers0, [Data0, Data]}), + State#state{requests = put_sent_req(StreamRef, Req, NRequests)} end. reply({F, A}, Result) when is_function(F) -> _ = erlang:apply(F, A ++ [Result]), ok; +reply(_From, {error, expired}) -> + %% the caller should get {error, timeout} return from request/5 + ok; reply(From, Result) -> gen_server:reply(From, Result).