Skip to content

Commit

Permalink
feat: update to use gun 2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
keynslug committed Dec 29, 2024
1 parent b79fdd1 commit 05e9c6b
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 113 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{minimum_otp_vsn, "21.0"}.

{deps, [
{gun, {git, "https://github.com/emqx/gun", {tag, "1.3.11"}}},
{gun, "~> 2.1"},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}
]}.
Expand Down
202 changes: 104 additions & 98 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@
-define(IS_POOL(Pool), (not is_tuple(Pool) andalso not is_pid(Pool))).
-define(DEFAULT_MAX_INACTIVE, 10_000).

-define(IS_HEADERS_REQ(REQ),
(tuple_size(REQ) =:= 2 andalso is_list(element(2, REQ)))
).

-define(IS_BODY_REQ(REQ),
(tuple_size(REQ) =:= 3 andalso is_list(element(2, REQ)) andalso
(is_binary(element(3, REQ)) orelse is_list(element(3, REQ))))
).

-record(state, {
pool :: term(),
id :: pos_integer(),
Expand All @@ -91,8 +100,11 @@
enable_pipelining :: boolean() | non_neg_integer(),
gun_opts :: gun:opts(),
gun_state :: down | up,
gun_tunnel :: undefined | gun:stream_ref(),
requests :: map(),
proxy :: undefined | map(),
%% If defined, describes origin server.
%% In this case, host and port point to proxy server.
origin :: undefined | map(),
max_inactive :: pos_integer(),
inactive_check_tref :: reference() | ?undef
}).
Expand Down Expand Up @@ -155,7 +167,7 @@ request(Worker, Method, Request, Timeout, Retry) when is_pid(Worker) ->
infinity -> infinity;
T -> T + 500
end,
try gen_server:call(Worker, ?REQ(Method, Request, ExpireAt), CallTimeout) of
try gen_server:call(Worker, mk_request(Method, Request, ExpireAt), CallTimeout) of
%% gun will reply {gun_down, _Client, _, normal, _KilledStreams, _} message
%% when connection closed by keepalive

Expand Down Expand Up @@ -183,13 +195,43 @@ request(Worker, Method, Request, Timeout, Retry) when is_pid(Worker) ->
{error, {ehttpc_worker_down, Reason}}
end.

mk_request(head = Method, Req, ExpireAt) when ?IS_HEADERS_REQ(Req) ->
?REQ(Method, Req, ExpireAt);
mk_request(head = Method, Path, ExpireAt) ->
mk_request(Method, {Path, []}, ExpireAt);
mk_request(get = Method, Req, ExpireAt) when ?IS_HEADERS_REQ(Req) ->
?REQ(Method, Req, ExpireAt);
mk_request(patch = Method, Req, ExpireAt) when ?IS_BODY_REQ(Req) ->
?REQ(Method, Req, ExpireAt);
mk_request(post = Method, Req, ExpireAt) when ?IS_BODY_REQ(Req) ->
?REQ(Method, Req, ExpireAt);
mk_request(put = Method, Req, ExpireAt) when ?IS_BODY_REQ(Req) ->
?REQ(Method, Req, ExpireAt);
mk_request(delete = Method, Req, ExpireAt) when ?IS_HEADERS_REQ(Req) ->
?REQ(Method, Req, ExpireAt).

%% @doc Send an async request. The callback is evaluated when an error happens or http response is received.
-spec request_async(pid(), method(), request(), timeout(), callback()) -> ok.
request_async(Worker, Method, Request, Timeout, ResultCallback) when is_pid(Worker) ->
ExpireAt = fresh_expire_at(Timeout),
_ = erlang:send(Worker, ?ASYNC_REQ(Method, Request, ExpireAt, ResultCallback)),
_ = erlang:send(Worker, mk_async_request(Method, Request, ExpireAt, ResultCallback)),
ok.

mk_async_request(head = Method, Req, ExpireAt, RC) when ?IS_HEADERS_REQ(Req) ->
?ASYNC_REQ(Method, Req, ExpireAt, RC);
mk_async_request(head = Method, Path, ExpireAt, RC) ->
mk_async_request(Method, {Path, []}, ExpireAt, RC);
mk_async_request(get = Method, Req, ExpireAt, RC) when ?IS_HEADERS_REQ(Req) ->
?ASYNC_REQ(Method, Req, ExpireAt, RC);
mk_async_request(patch = Method, Req, ExpireAt, RC) when ?IS_BODY_REQ(Req) ->
?ASYNC_REQ(Method, Req, ExpireAt, RC);
mk_async_request(post = Method, Req, ExpireAt, RC) when ?IS_BODY_REQ(Req) ->
?ASYNC_REQ(Method, Req, ExpireAt, RC);
mk_async_request(put = Method, Req, ExpireAt, RC) when ?IS_BODY_REQ(Req) ->
?ASYNC_REQ(Method, Req, ExpireAt, RC);
mk_async_request(delete = Method, Req, ExpireAt, RC) when ?IS_HEADERS_REQ(Req) ->
?ASYNC_REQ(Method, Req, ExpireAt, RC).

workers(Pool) ->
gproc_pool:active_workers(name(Pool)).

Expand All @@ -202,7 +244,7 @@ name(Pool) -> {?MODULE, Pool}.
init([Pool, Id, Opts0]) ->
process_flag(trap_exit, true),
PrioLatest = proplists:get_bool(prioritise_latest, Opts0),
#{opts := Opts, proxy := Proxy} = parse_proxy_opts(Opts0),
#{opts := Opts, origin := Origin} = parse_proxy_opts(Opts0),
MaxInactive = proplists:get_value(max_inactive, Opts, ?DEFAULT_MAX_INACTIVE),
State = #state{
pool = Pool,
Expand All @@ -220,7 +262,7 @@ init([Pool, Id, Opts0]) ->
max_sent_expire => 0,
prioritise_latest => PrioLatest
},
proxy = Proxy,
origin = Origin,
max_inactive = MaxInactive
},
true = gproc_pool:connect_worker(ehttpc:name(Pool), {Pool, Id}),
Expand Down Expand Up @@ -317,7 +359,7 @@ do_handle_info({gun_up, Client, _}, State = #state{client = Client}) ->
%% we can only hope it to be useful for the next call
State#state{gun_state = up};
do_handle_info(
{gun_down, Client, _, Reason, KilledStreams, _},
{gun_down, Client, _Protocol, Reason, KilledStreams},
State = #state{client = Client}
) ->
Reason =/= normal andalso Reason =/= closed andalso
Expand Down Expand Up @@ -401,7 +443,7 @@ handle_gun_down(#state{requests = Requests} = State, KilledStreams, Reason) ->
State#state{requests = NRequests, gun_state = down}.

open(State = #state{host = Host, port = Port, gun_opts = GunOpts}) ->
case gun:start_link(Host, Port, GunOpts) of
case gun:open(Host, Port, GunOpts) of
{ok, ConnPid} when is_pid(ConnPid) ->
{ok, State#state{client = ConnPid}};
{error, Reason} ->
Expand All @@ -420,7 +462,9 @@ gun_opts(Opts) ->
%% The keepalive mechanism of gun will send "\r\n" for keepalive,
%% which may cause misjudgment by some servers, so we disabled it by default
http_opts => #{keepalive => infinity},
protocols => [http]
protocols => [http],
%% Link with client process directly.
supervise => false
}).

gun_opts([], Acc) ->
Expand All @@ -433,77 +477,39 @@ gun_opts([{retry_timeout, _} | Opts], Acc) ->
gun_opts(Opts, Acc);
gun_opts([{connect_timeout, ConnectTimeout} | Opts], Acc) ->
gun_opts(Opts, Acc#{connect_timeout => ConnectTimeout});
gun_opts([{transport, Transport} | Opts], Acc) ->
gun_opts(Opts, Acc#{transport => Transport});
gun_opts([{transport_opts, TransportOpts} | Opts], Acc) ->
gun_opts(Opts, Acc#{transport_opts => TransportOpts});
gun_opts([{transport, Transport} | Opts0], Acc0) ->
Acc1 = Acc0#{transport => Transport},
case lists:keytake(transport_opts, 1, Opts0) of
{value, {_, TransportOpts}, Opts} when Transport == tcp ->
Acc = Acc1#{tcp_opts => TransportOpts};
{value, {_, TransportOpts}, Opts} when Transport == tls; Transport == ssl ->
Acc = Acc1#{tls_opts => TransportOpts};
false ->
Acc = Acc0,
Opts = Opts0
end,
gun_opts(Opts, Acc);
gun_opts([_ | Opts], Acc) ->
%% ignore by default
gun_opts(Opts, Acc).

do_request(Client, head, {Path, Headers}) ->
RequestRef = gun:head(Client, Path, Headers),
finish_body_call_if_needed(Client, RequestRef, Headers, <<>>),
RequestRef;
do_request(Client, head, Path) ->
do_request(Client, head, {Path, []});
do_request(Client, get, {Path, Headers}) ->
RequestRef = gun:get(Client, Path, Headers),
finish_body_call_if_needed(Client, RequestRef, Headers, <<>>),
RequestRef;
do_request(Client, patch, {Path, Headers, Body}) ->
RequestRef = gun:patch(Client, Path, Headers, Body),
finish_body_call_if_needed(Client, RequestRef, Headers, Body),
RequestRef;
do_request(Client, post, {Path, Headers, Body}) ->
RequestRef = gun:post(Client, Path, Headers, Body),
finish_body_call_if_needed(Client, RequestRef, Headers, Body),
RequestRef;
do_request(Client, put, {Path, Headers, Body}) ->
RequestRef = gun:put(Client, Path, Headers, Body),
finish_body_call_if_needed(Client, RequestRef, Headers, Body),
RequestRef;
do_request(Client, delete, {Path, Headers}) ->
RequestRef = gun:delete(Client, Path, Headers),
finish_body_call_if_needed(Client, RequestRef, Headers, <<>>),
RequestRef.

%% Finish the request only if the headers are set so that gun expect body data
%% to come with calls to gun:data/4. Otherwise, subsequent request will fail
%% with a function clause error.
finish_body_call_if_needed(Client, RequestRef, Headers, Body) ->
case is_finish_body_call_needed(Headers, Body) of
true ->
gun:data(Client, RequestRef, fin, <<>>);
false ->
ok
end.

%% The following function corresponds to request_io_from_headers from
%% src/gun_http.erl https://github.com/ninenines/gun commit id
%% 47ec03dcf0346ad827e5c8aa8c2bf9ac35398afe
%%
%% Gun checks the headers (in the same way as the following function) to detect
%% if more data is expected to be provided after the initial request with
%% gun:data/4.
is_finish_body_call_needed(Headers, <<>>) ->
case lists:keyfind(<<"content-length">>, 1, Headers) of
{_, <<"0">>} ->
false;
{_, _Length} ->
true;
false ->
is_content_type_field_set(Headers)
end;
%% Gun always finish the request if the body parameter (iodata()) is something
%% else than an empty binary. This means that, for example, gun:post(Client,
%% Path, [{<<"content-type">>, <<datat>>}], <<>>) and gun:post(Client, Path,
%% [{<<"content-type">>, <<datat>>}], []) are not equivalent.
is_finish_body_call_needed(_Headers, _NotEmptyBin) ->
false.

is_content_type_field_set(Headers) ->
lists:keymember(<<"content-type">>, 1, Headers).
do_request(Client, head, {Path, Headers}, TunnelRef) ->
gun:head(Client, Path, Headers, mk_reqopts(TunnelRef));
do_request(Client, get, {Path, Headers}, TunnelRef) ->
gun:get(Client, Path, Headers, mk_reqopts(TunnelRef));
do_request(Client, patch, {Path, Headers, Body}, TunnelRef) ->
gun:patch(Client, Path, Headers, Body, mk_reqopts(TunnelRef));
do_request(Client, post, {Path, Headers, Body}, TunnelRef) ->
gun:post(Client, Path, Headers, Body, mk_reqopts(TunnelRef));
do_request(Client, put, {Path, Headers, Body}, TunnelRef) ->
gun:put(Client, Path, Headers, Body, mk_reqopts(TunnelRef));
do_request(Client, delete, {Path, Headers}, TunnelRef) ->
gun:delete(Client, Path, Headers, mk_reqopts(TunnelRef)).

mk_reqopts(undefined) ->
#{};
mk_reqopts(TunnelRef) ->
#{tunnel => TunnelRef}.

cancel_stream(fin, _Client, _StreamRef) ->
%% nothing to cancel anyway
Expand Down Expand Up @@ -793,10 +799,11 @@ shoot(
State = #state{
client = Client,
requests = Requests,
gun_state = up
gun_state = up,
gun_tunnel = TunnelRef
}
) when is_pid(Client) ->
StreamRef = do_request(Client, Method, Request),
StreamRef = do_request(Client, Method, Request, TunnelRef),
?tp(shot, #{from => ReplyTo, req => Request, reqs => Requests}),
%% no need for the payload
Req = ?SENT_REQ(ReplyTo, ExpireAt, ?undef),
Expand All @@ -821,18 +828,18 @@ do_after_gun_up(State0 = #state{client = Client}, ExpireAt, Fun) ->
{reply, {error, Reason}, State#state{client = ?undef}}
end.

%% This is a copy of gun:wait_up/3
%% This is a copy of gun:await_up/3
%% with the '$gen_call' clause added so the calls in the mail box
%% are collected into the queue in time
gun_await_up(Pid, ExpireAt, Timeout, State0) ->
receive
{gun_up, Pid, Protocol} ->
case State0#state.proxy of
case State0#state.origin of
undefined ->
State = State0#state{gun_state = up},
{{ok, Protocol}, State};
#{} = ProxyOpts ->
gun_connect_proxy(Pid, ExpireAt, Timeout, Protocol, ProxyOpts, State0)
#{} = Origin ->
gun_connect_origin(Pid, ExpireAt, Timeout, Origin, State0)
end;
{'EXIT', Pid, {shutdown, Reason}} ->
{{error, Reason}, State0};
Expand All @@ -853,15 +860,18 @@ gun_await_up(Pid, ExpireAt, Timeout, State0) ->
{{error, connect_timeout}, State0}
end.

gun_connect_proxy(Pid, ExpireAt, Timeout, Protocol, ProxyOpts, State0) ->
StreamRef = gun:connect(Pid, ProxyOpts),
gun_await_connect_proxy(Pid, StreamRef, ExpireAt, Timeout, Protocol, ProxyOpts, State0).
gun_connect_origin(Pid, ExpireAt, Timeout, Origin, State0) ->
StreamRef = gun:connect(Pid, Origin),
gun_await_tunnel(Pid, StreamRef, ExpireAt, Timeout, [], State0).

gun_await_connect_proxy(Pid, StreamRef, ExpireAt, Timeout, Protocol, ProxyOpts, State0) ->
gun_await_tunnel(Pid, StreamRef, ExpireAt, Timeout, Headers, State0) ->
receive
{gun_response, Pid, StreamRef, fin, 200, Headers} ->
State = State0#state{gun_state = up},
{gun_tunnel_up, Pid, TunnelRef, Protocol} ->
State = State0#state{gun_state = up, gun_tunnel = TunnelRef},
{{ok, {Protocol, Headers}}, State};
{gun_response, Pid, StreamRef, fin, 200, Headers} ->
NewTimeout = timeout(ExpireAt),
gun_await_tunnel(Pid, StreamRef, ExpireAt, NewTimeout, Headers, State0);
{gun_response, Pid, StreamRef, _Fin, 407, _Headers} ->
{{error, {proxy_error, unauthorized}}, State0};
{gun_response, Pid, StreamRef, _Fin, StatusCode, Headers} ->
Expand All @@ -871,16 +881,12 @@ gun_await_connect_proxy(Pid, StreamRef, ExpireAt, Timeout, Protocol, ProxyOpts,
State = enqueue_req(ResultCallback, Req, State0),
%% keep waiting
NewTimeout = timeout(ExpireAt),
gun_await_connect_proxy(
Pid, StreamRef, ExpireAt, NewTimeout, Protocol, ProxyOpts, State
);
gun_await_tunnel(Pid, StreamRef, ExpireAt, NewTimeout, Headers, State);
?GEN_CALL_REQ(From, Call) ->
State = enqueue_req(From, Call, State0),
%% keep waiting
NewTimeout = timeout(ExpireAt),
gun_await_connect_proxy(
Pid, StreamRef, ExpireAt, NewTimeout, Protocol, ProxyOpts, State
)
gun_await_tunnel(Pid, StreamRef, ExpireAt, NewTimeout, Headers, State)
after Timeout ->
{{error, connect_timeout}, State0}
end.
Expand Down Expand Up @@ -956,18 +962,18 @@ parse_proxy_opts(Opts) ->
%% Target host and port
case proplists:get_value(proxy, Opts, undefined) of
undefined ->
#{opts => Opts, proxy => undefined};
#{opts => Opts, origin => undefined};
#{host := _, port := _} = ProxyOpts0 ->
%% We open connection to proxy, then issue `gun:connect' to target host.
{ProxyOpts, NewOpts} =
{Origin, NewOpts} =
lists:foldl(
fun(Key, {ProxyAcc, GunAcc}) ->
swap(Key, ProxyAcc, GunAcc)
fun(Key, {OriginAcc, GunAcc}) ->
swap(Key, OriginAcc, GunAcc)
end,
{ProxyOpts0, proplists:delete(proxy, Opts)},
[host, port, transport, {tls_opts, transport_opts}]
),
#{opts => NewOpts, proxy => ProxyOpts}
#{opts => NewOpts, origin => Origin}
end.

swap(Key, Map, Proplist) when is_atom(Key) ->
Expand Down
17 changes: 5 additions & 12 deletions test/ehttpc_google_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,11 @@ proxy_test_() ->
Port = ?PORT,
ProxyOpts0 = #{host => "127.0.0.1", port => 8888},
ProxyOpts1 = ProxyOpts0#{username => "user", password => "pass"},
%% host port enable_pipelining prioritise_latest
Opts1_ = pool_opts(Host, Port, true, true),
Opts2_ = pool_opts(Host, Port, true, false),
Opts3_ = pool_opts(Host, Port, false, true),
Opts4_ = pool_opts(Host, Port, false, false),
[Opts1, Opts2, Opts3, Opts4] = [
[{proxy, ProxyOpts0} | O]
|| O <- [Opts1_, Opts2_, Opts3_, Opts4_]
],
[Opts5, Opts6, Opts7, Opts8] = [
[{proxy, ProxyOpts1} | O]
|| O <- [Opts1_, Opts2_, Opts3_, Opts4_]
[Opts1, Opts2, Opts3, Opts4, Opts5, Opts6, Opts7, Opts8] = [
[{proxy, ProxyOpts} | pool_opts(Host, Port, Pipeline, PrioLatest)]
|| ProxyOpts <- [ProxyOpts0, ProxyOpts1],
Pipeline <- [true, false],
PrioLatest <- [true, false]
],
F = fun() -> req_async(?METHOD, N) end,
NoAuthConfPath = filename:absname("test/scripts/tinyproxy.conf"),
Expand Down
4 changes: 2 additions & 2 deletions test/ehttpc_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ server_outage_test_() ->
case Res of
{error, {shutdown, closed}} ->
ok;
{error, {closed, _}} ->
{error, closed} ->
ok;
Other ->
throw({unexpected_result, Other})
Expand Down Expand Up @@ -864,7 +864,7 @@ gun_down_with_reason_normal_is_retried_test() ->
end
|| P <- processes(),
case proc_lib:initial_call(P) of
{gun, proc_lib_hack, _} -> true;
{gun, init, _} -> true;
_ -> false
end
],
Expand Down

0 comments on commit 05e9c6b

Please sign in to comment.