diff --git a/changelog.md b/changelog.md
index c801e77..16b2e68 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,5 +1,13 @@
 # ehttpc changes
 
+## 0.6.0
+
+- Changed the log format to be more structured; `host` and `port` are now included in the log data fields.
+- Added the `max_inactive` duration option (default: `10_000` milliseconds).
+  This is used to detect zombie connections, especially when pipelining is set > 1.
+  With `{max_inactive, 10_000}` added to the `start_pool` options,
+  the worker will force a reconnect to the HTTP server once it detects that the last sent request expired more than 10 seconds ago.
+
 ## 0.5.0
 
 - Dropped hot-upgrade support.
diff --git a/src/ehttpc.erl b/src/ehttpc.erl
index 6e3ea0e..71ec84a 100644
--- a/src/ehttpc.erl
+++ b/src/ehttpc.erl
@@ -15,6 +15,7 @@
 %%--------------------------------------------------------------------
 
 -module(ehttpc).
+-feature(maybe_expr, enable).
 
 -behaviour(gen_server).
 
@@ -67,7 +68,6 @@
 -include_lib("eunit/include/eunit.hrl").
 -endif.
 
--define(LOG(Level, Format, Args), logger:Level("ehttpc: " ++ Format, Args)).
 -define(REQ(Method, Req, ExpireAt), {Method, Req, ExpireAt}).
 -define(PEND_REQ(ReplyTo, Req), {ReplyTo, Req}).
 -define(SENT_REQ(ReplyTo, ExpireAt, Acc), {ReplyTo, ExpireAt, Acc}).
@@ -77,6 +77,7 @@
 -define(GEN_CALL_REQ(From, Call), {'$gen_call', From, ?REQ(_, _, _) = Call}).
 -define(undef, undefined).
 -define(IS_POOL(Pool), (not is_tuple(Pool) andalso not is_pid(Pool))).
+-define(DEFAULT_MAX_INACTIVE, 10_000).
 
 -record(state, {
     pool :: term(),
@@ -90,7 +91,9 @@
     gun_opts :: gun:opts(),
     gun_state :: down | up,
     requests :: map(),
-    proxy :: undefined | map()
+    proxy :: undefined | map(),
+    max_inactive :: pos_integer(),
+    inactive_check_tref :: reference() | ?undef
 }).
 
 -type pool_name() :: any().
@@ -199,6 +202,7 @@ init([Pool, Id, Opts0]) ->
     process_flag(trap_exit, true),
     PrioLatest = proplists:get_bool(prioritise_latest, Opts0),
     #{opts := Opts, proxy := Proxy} = parse_proxy_opts(Opts0),
+    MaxInactive = proplists:get_value(max_inactive, Opts, ?DEFAULT_MAX_INACTIVE),
     State = #state{
         pool = Pool,
         id = Id,
@@ -213,12 +217,14 @@ init([Pool, Id, Opts0]) ->
             pending => queue:new(),
             pending_count => 0,
             sent => #{},
+            max_sent_expire => 0,
             prioritise_latest => PrioLatest
         },
-        proxy = Proxy
+        proxy = Proxy,
+        max_inactive = MaxInactive
     },
     true = gproc_pool:connect_worker(ehttpc:name(Pool), {Pool, Id}),
-    {ok, State}.
+    {ok, start_check_inactive_timer(State)}.
 
 handle_call({health_check, _}, _From, State = #state{gun_state = up}) ->
     {reply, ok, State};
@@ -267,11 +273,18 @@ handle_info({suspend, Time}, State) ->
     %% only for testing
     timer:sleep(Time),
     {noreply, State};
+handle_info(check_inactive, State0) ->
+    State = maybe_shoot(State0),
+    {noreply, start_check_inactive_timer(State)};
 handle_info(Info, State0) ->
     State1 = do_handle_info(Info, upgrade_requests(State0)),
     State = maybe_shoot(State1),
     {noreply, State}.
 
+start_check_inactive_timer(#state{inactive_check_tref = Tref, max_inactive = T} = State) ->
+    is_reference(Tref) andalso erlang:cancel_timer(Tref),
+    State#state{inactive_check_tref = erlang:send_after(T, self(), check_inactive)}.
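For context, a minimal usage sketch of the new option (not part of this diff). It assumes pools are started through `ehttpc_sup:start_pool/2`; the pool name `my_pool`, the companion option values, the request, and the `{ok, _}` return shape are illustrative assumptions rather than documented behaviour.

```erlang
%% Sketch only (not part of the diff): start a pool with the new option.
%% `my_pool` and the option values are placeholders; `ehttpc_sup:start_pool/2`
%% and the `{ok, _}` return shape are assumed from typical ehttpc usage.
{ok, _} = ehttpc_sup:start_pool(my_pool, [
    {host, "example.com"},
    {port, 80},
    {pool_size, 4},
    %% allow up to 5 in-flight requests per connection
    {enable_pipelining, 5},
    %% force a reconnect when the newest in-flight request expired
    %% more than 10 seconds ago and still got no response
    {max_inactive, 10_000}
]),
%% requests themselves are unchanged; 5-second call timeout here
Result = ehttpc:request(my_pool, get, {<<"/status">>, []}, 5_000).
```

With `enable_pipelining` greater than 1, a stuck connection can hold several requests hostage at once, which is the situation `max_inactive` is meant to catch.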
+
 do_handle_info(
     {gun_response, Client, StreamRef, IsFin, StatusCode, Headers},
     #state{client = Client} = State
 ) ->
@@ -308,7 +321,7 @@ do_handle_info(
     State = #state{client = Client}
 ) ->
     Reason =/= normal andalso Reason =/= closed andalso
-        ?LOG(warning, "Received 'gun_down' message with reason: ~p", [Reason]),
+        log(warning, #{msg => "http_connection_down", reason => Reason}, State),
     NewState = handle_gun_down(State, KilledStreams, Reason),
     NewState;
 do_handle_info(
@@ -320,7 +333,14 @@ do_handle_info(
 do_handle_info({'EXIT', Client, Reason}, State = #state{client = Client}) ->
     handle_client_down(State, Reason);
 do_handle_info(Info, State) ->
-    ?LOG(warning, "~p unexpected_info: ~p, client: ~p", [?MODULE, Info, State#state.client]),
+    log(
+        warning,
+        #{
+            msg => "ehttpc_unexpected_info",
+            info => Info
+        },
+        State
+    ),
     State.
 
 terminate(_Reason, #state{pool = Pool, id = Id, client = Client}) ->
@@ -532,22 +552,49 @@ upgrade_requests(Map) when is_map(Map) ->
         pending => queue:new(),
         pending_count => 0,
         sent => Map,
-        prioritise_latest => false
+        prioritise_latest => false,
+        max_sent_expire => 0
     }.
 
-put_sent_req(StreamRef, Req, #{sent := Sent} = Requests) ->
-    Requests#{sent := maps:put(StreamRef, Req, Sent)}.
+put_sent_req(
+    StreamRef,
+    Req,
+    #{
+        sent := Sent,
+        max_sent_expire := T
+    } = Requests
+) ->
+    ?SENT_REQ(_, Expire, _) = Req,
+    Requests#{
+        sent := maps:put(StreamRef, Req, Sent),
+        max_sent_expire := max_expire(T, Expire)
+    }.
+
+%% if a request has an infinity timeout, ignore it
+max_expire(T, infinity) -> T;
+max_expire(T1, T2) when is_integer(T2) -> max(T1, T2).
 
-take_sent_req(StreamRef, #{sent := Sent} = Requests) ->
+take_sent_req(StreamRef, #{sent := Sent, max_sent_expire := T} = Requests) ->
     case maps:take(StreamRef, Sent) of
         error ->
             error;
         {Req, NewSent} ->
+            %% We assume all calls use the same timeout value,
+            %% so there is no need to scan the map to find a new max.
+            %% Even if calls use different timeouts,
+            %% the impact of a wrong max is minimal: delayed detection of a zombie connection.
+            NewT =
+                case map_size(NewSent) of
+                    0 ->
+                        0;
+                    _ ->
+                        T
+                end,
             case is_sent_req_expired(Req, now_()) of
                 true ->
-                    {expired, Requests#{sent := NewSent}};
+                    {expired, Requests#{sent := NewSent, max_sent_expire := NewT}};
                 false ->
-                    {Req, Requests#{sent := NewSent}}
+                    {Req, Requests#{sent := NewSent, max_sent_expire := NewT}}
             end
     end.
 
@@ -579,14 +626,7 @@ reply_error_for_sent_reqs(#{sent := Sent} = R, Reason) ->
         end,
         maps:to_list(Sent)
     ),
-    R#{sent := #{}}.
-
-%% allow 100 async requests maximum when enable_pipelining is 'true'
-%% allow only 1 async request when enable_pipelining is 'false'
-%% otherwise stop shooting at the number limited by enable_pipelining
-should_cool_down(true, Sent) -> Sent >= 100;
-should_cool_down(false, Sent) -> Sent > 0;
-should_cool_down(N, Sent) when is_integer(N) -> Sent >= N.
+    R#{sent => #{}, max_sent_expire => 0}.
 
 %% Continue droping expired requests, to avoid the state RAM usage
 %% explosion if http client can not keep up.
@@ -633,20 +673,100 @@ enqueue_req(ReplyTo, Req, #state{requests = Requests0} = State) ->
     State#state{requests = drop_expired(Requests)}.
 
 %% call gun to shoot the request out
-maybe_shoot(#state{enable_pipelining = EP, requests = Requests0, client = Client} = State0) ->
-    #{sent := Sent} = Requests0,
+maybe_shoot(
+    #state{
+        requests =
+            #{
+                sent := Sent,
+                max_sent_expire := MaxExpire
+            } = Requests0,
+        client = Client,
+        max_inactive = MaxInactive,
+        enable_pipelining = PipelineLimit
+    } = State0
+) ->
     State = State0#state{requests = drop_expired(Requests0)},
-    %% If the gun http client is down
-    ClientDown = is_pid(Client) andalso (not is_process_alive(Client)),
-    %% Or when too many has been sent already
-    case ClientDown orelse should_cool_down(EP, maps:size(Sent)) of
-        true ->
+    SentCount = map_size(Sent),
+    case check_gun(Client, PipelineLimit, SentCount, MaxExpire, MaxInactive) of
+        continue ->
+            do_shoot(State);
+        pause ->
             %% Then we should cool down, and let the gun responses
             %% or 'EXIT' message to trigger the flow again
-            ?tp(cool_down, #{enable_pipelining => EP}),
+            ?tp(cool_down, #{enable_pipelining => State#state.enable_pipelining}),
             State;
+        reconnect ->
+            %% assert
+            true = (MaxExpire > 0),
+            %% the connection has been inactive for too long
+            log(
+                error,
+                #{
+                    msg => "force_reconnecting_zombie_http_connection",
+                    last_request_expire => calendar:system_time_to_rfc3339(MaxExpire, [
+                        {unit, millisecond}
+                    ]),
+                    inactive_duration_threshold => MaxInactive,
+                    inflight_requests => SentCount,
+                    connection_pid => Client
+                },
+                State
+            ),
+            ?tp(reconnect, #{sent => SentCount}),
+            _ = exit(Client, kill),
+            State
+    end.
+
+check_gun(ClientPid, PipelineLimit, SentCount, MaxExpireTs, MaxInactiveDuration) ->
+    maybe
+        ok ?= check_gun_pid(ClientPid),
+        ok ?= check_gun_jammed(SentCount, MaxExpireTs, MaxInactiveDuration),
+        check_gun_limit(PipelineLimit, SentCount)
+    end.
+
+check_gun_pid(Pid) when not is_pid(Pid) ->
+    %% not started yet, go straight to client initialization
+    continue;
+check_gun_pid(Pid) ->
+    case is_process_alive(Pid) of
+        true ->
+            %% ok to send
+            ok;
         false ->
-            do_shoot(State)
+            %% was initialized but is now restarting,
+            %% should not send, but wait for the EXIT message
+            %% to trigger a reconnect
+            pause
+    end.
+
+%% If there are sent requests and the latest expire timestamp is older than max_inactive,
+%% the connection is considered to be in a zombie state, hence requires a reconnect.
+check_gun_jammed(_SentCount, 0, _MaxInactiveDuration) ->
+    %% no expire time has been recorded yet
+    ok;
+check_gun_jammed(_SentCount, MaxExpireTs, MaxInactiveDuration) ->
+    case (now_() - MaxExpireTs) > MaxInactiveDuration of
+        true ->
+            reconnect;
+        false ->
+            ok
+    end.
+
+%% allow 100 async requests maximum when enable_pipelining is 'true'
+%% allow only 1 async request when enable_pipelining is 'false'
+%% otherwise stop shooting at the number limited by enable_pipelining
+check_gun_limit(_EnablePipeline = true, SentCount) ->
+    %% backward compatible
+    check_gun_limit(100, SentCount);
+check_gun_limit(_EnablePipeline = false, SentCount) ->
+    %% backward compatible
+    check_gun_limit(1, SentCount);
+check_gun_limit(PipelineLimit, SentCount) ->
+    case SentCount < PipelineLimit of
+        true ->
+            continue;
+        false ->
+            pause
     end.
 
 do_shoot(#state{requests = #{pending_count := 0}} = State) ->
@@ -914,6 +1034,9 @@ take_proplist(Key, Proplist0) ->
             {ValueFromProplist, Proplist1}
     end.
 
+log(Level, Data, #state{host = Host, port = Port}) ->
+    logger:log(Level, Data#{host => Host, port => Port}).
+
 -ifdef(TEST).
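To make the bookkeeping concrete: `max_sent_expire` carries the largest expire-at timestamp among in-flight requests (0 when there are none), and the worker decides to reconnect when even that newest deadline lies more than `max_inactive` milliseconds in the past. Below is a small self-contained sketch of that decision rule, not code from the module; every name in it is hypothetical, and it deliberately leaves out the gun-pid liveness check.

```erlang
-module(zombie_check_sketch).
-export([classify/4]).

%% Illustration only, not taken from ehttpc: the continue/pause/reconnect
%% decision described above, minus the client-pid liveness check.
%%   NowMs           - current timestamp in milliseconds
%%   MaxSentExpireMs - largest expire-at among in-flight requests, 0 if none
%%   MaxInactiveMs   - the `max_inactive` option
%%   SentCount       - number of in-flight (sent, unanswered) requests
%%   Limit           - effective pipelining limit (1, 100, or the integer option)
classify(NowMs, MaxSentExpireMs, MaxInactiveMs, {SentCount, Limit}) ->
    Jammed =
        MaxSentExpireMs > 0 andalso
            (NowMs - MaxSentExpireMs) > MaxInactiveMs,
    if
        %% even the newest deadline is long past: zombie, kill and reconnect
        Jammed -> reconnect;
        %% pipeline window is full: wait for responses before sending more
        SentCount >= Limit -> pause;
        %% room left in the window: send the next pending request
        true -> continue
    end.
```

For example, with `{max_inactive, 10_000}`, a worker whose newest in-flight request expired 12 seconds ago classifies as `reconnect` even if the pipeline window still has room.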
 
 prioritise_latest_test() ->
diff --git a/test/ehttpc_tests.erl b/test/ehttpc_tests.erl
index fd9dc90..1c3292f 100644
--- a/test/ehttpc_tests.erl
+++ b/test/ehttpc_tests.erl
@@ -335,7 +335,7 @@ health_check_abnormal_test_() ->
             name => ?FUNCTION_NAME,
             delay => 0
         },
-        pool_opts(Unreachable, Port, true),
+        [{connect_timeout, 100} | pool_opts(Unreachable, Port, true)],
         begin
             Worker = ehttpc_pool:pick_worker(?POOL),
             ?assertEqual(
@@ -466,7 +466,7 @@ cool_down_after_5_reqs_test() ->
         oneoff => false
     },
     PoolOpts = pool_opts("127.0.0.1", Port, _Pipelining = 5, _PrioritiseLatest = false),
-    Reqs = [{"/", [], iolist_to_binary(["test-put-", integer_to_list(I)])} || I <- lists:seq(1, 6)],
+    Reqs = [["test-put-", integer_to_list(I)] || I <- lists:seq(1, 6)],
     ?WITH(
         ServerOpts,
         PoolOpts,
@@ -487,6 +487,90 @@ cool_down_after_5_reqs_test() ->
         end
     ).
 
+zombie_detect_inflight_not_full_test() ->
+    Port = ?PORT,
+    ServerOpts = #{
+        port => Port,
+        name => ?FUNCTION_NAME,
+        %% no response during this test
+        delay => 30_000,
+        oneoff => false
+    },
+    Pipelining = 5,
+    PoolOpts0 = pool_opts("127.0.0.1", Port, Pipelining, _PrioritiseLatest = false),
+    PoolOpts = [{max_inactive, 1_000} | PoolOpts0],
+    Reqs = [["test-put-", integer_to_list(I)] || I <- lists:seq(1, Pipelining - 1)],
+    ?WITH(
+        ServerOpts,
+        PoolOpts,
+        begin
+            lists:foreach(
+                fun(Req) ->
+                    spawn_link(fun() -> ehttpc:request(?POOL, put, {<<"/">>, [], Req}, 100) end)
+                end,
+                Reqs
+            ),
+            Pid = ehttpc_pool:pick_worker(?POOL),
+            {ok, _} = ?block_until(#{?snk_kind := reconnect}, 2500, infinity),
+            %% let the EXIT signal reach the ehttpc process
+            {ok, _} = ?block_until(#{?snk_kind := handle_client_down}, 1000, infinity),
+            #{requests := Requests} = ehttpc:get_state(Pid, normal),
+            #{sent := Sent, pending_count := PendingCount, max_sent_expire := MaxTs} = Requests,
+            ?assertEqual(0, maps:size(Sent)),
+            ?assertEqual(0, PendingCount),
+            ?assertEqual(0, MaxTs),
+            ok
+        end
+    ).
+
+zombie_detect_inflight_full_test() ->
+    Port = ?PORT,
+    ServerOpts = #{
+        port => Port,
+        name => ?FUNCTION_NAME,
+        %% no response during this test
+        delay => 30_000,
+        oneoff => false
+    },
+    Pipelining = 5,
+    PoolOpts0 = pool_opts("127.0.0.1", Port, Pipelining, _PrioritiseLatest = false),
+    PoolOpts = [{max_inactive, 1_000} | PoolOpts0],
+    Reqs = [["test-put-", integer_to_list(I)] || I <- lists:seq(1, Pipelining)],
+    ?WITH(
+        ServerOpts,
+        PoolOpts,
+        begin
+            %% fill up the inflight window
+            lists:foreach(
+                fun(Req) ->
+                    spawn_link(fun() -> ehttpc:request(?POOL, put, {<<"/">>, [], Req}, 100) end)
+                end,
+                Reqs
+            ),
+            Pid = ehttpc_pool:pick_worker(?POOL),
+            Tester = self(),
+            Callback = {fun(Reason) -> Tester ! {reason, Reason} end, []},
+            %% the next request should stay in the pending queue
+            ehttpc:request_async(Pid, put, {<<"/">>, [], "foo"}, 100, Callback),
+            %% wait for reconnect
+            {ok, _} = ?block_until(#{?snk_kind := reconnect}, 2_500, infinity),
+            %% now the pending request is processed, but it should eventually time out again,
+            %% detected by the zombie check timer
+            receive
+                {reason, Reason} ->
+                    ?assertEqual({error, killed}, Reason)
+            after 2_500 ->
+                error(timeout)
+            end,
+            #{requests := Requests} = ehttpc:get_state(Pid, normal),
+            #{sent := Sent, pending_count := PendingCount, max_sent_expire := MaxTs} = Requests,
+            ?assertEqual(0, maps:size(Sent)),
+            ?assertEqual(0, PendingCount),
+            ?assertEqual(0, MaxTs),
+            ok
+        end
+    ).
+
 head_request_test() ->
     Port = ?PORT,
     Host = "127.0.0.1",
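For completeness, a caller-side sketch of what these tests exercise (illustration only, not part of the test suite). It reuses `ehttpc_pool:pick_worker/1` and the `{Fun, ExtraArgs}` callback shape shown in the tests above; the pool name, the payload, and the exact shape of the success result are assumptions.

```erlang
%% Illustration only: an async request whose callback can observe the
%% {error, killed} result produced when the worker kills a zombie connection.
%% `my_pool` and the payload are placeholders; the success tuple shape is an
%% assumption, only the {error, killed} case is taken from the tests above.
Worker = ehttpc_pool:pick_worker(my_pool),
Caller = self(),
Callback = {fun(Result) -> Caller ! {http_result, Result} end, []},
_ = ehttpc:request_async(Worker, put, {<<"/">>, [], <<"payload">>}, 5_000, Callback),
receive
    {http_result, {ok, _Status, _Headers, _Body}} ->
        ok;
    {http_result, {error, killed}} ->
        %% connection was force-closed by the inactivity check; safe to retry
        retry;
    {http_result, Other} ->
        {error, Other}
after 10_000 ->
    {error, no_callback}
end.
```

The `{error, killed}` branch corresponds to the forced reconnect asserted in `zombie_detect_inflight_full_test/0`.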