Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add proxy support, drop hot-upgrade #56

Merged
merged 5 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ jobs:
with:
otp-version: ${{ matrix.otp }}
rebar3-version: 3
- name: Ensure style
run: ./check-style.sh
- name: setup tinyproxy
run: sudo ./test/scripts/setup_tinyproxy.sh
- name: Compile
run: rebar3 compile
- name: Run tests
Expand All @@ -37,5 +41,3 @@ jobs:
run: make dialyzer
- name: Ensure version consistency
run: ./check_vsns.escript
- name: Ensure style
run: ./check-style.sh
17 changes: 17 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
# ehttpc changes

## 0.4.15

- Added support for using HTTP proxy (HTTP 1.1 only).
To use it, pass `proxy` in the pool opts.

Ex:

```erlang
%% Point to the proxy host and port
ProxyOpts = #{host => "127.0.0.1", port => 8888}.
ehttpc_sup:start_pool(<<"pool">>, [{host, "target.host.com"}, {port, 80}, {proxy, ProxyOpts}]).

%% To use username and password
ProxyOpts = #{host => "127.0.0.1", port => 8888, username => "proxyuser", password => "secret"}.
ehttpc_sup:start_pool(<<"pool">>, [{host, "target.host.com"}, {port, 80}, {proxy, ProxyOpts}]).
```

## 0.4.14

- Forcefully recreate `gproc_pool`s during `ehttpc_pool:init/1` to prevent reusing pools in an inconsistent state.
Expand Down
2 changes: 1 addition & 1 deletion src/ehttpc.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, ehttpc, [
{description, "HTTP Client for Erlang/OTP"},
{vsn, "0.4.14"},
zmstone marked this conversation as resolved.
Show resolved Hide resolved
{vsn, "0.4.15"},
{registered, []},
{applications, [
kernel,
Expand Down
42 changes: 27 additions & 15 deletions src/ehttpc.appup.src
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
%% -*- mode: erlang -*-
{"0.4.14",
{"0.4.15",
[
{"0.4.14", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.13", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []}
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.12", [ % upgrade gun, no local beam changes in this version
{"0.4.12", [
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.11", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
Expand Down Expand Up @@ -78,55 +84,61 @@
]}
],
[
{"0.4.14", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.13", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []}
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.12", [ % upgrade gun, no local beam changes in this version
{"0.4.12", [
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.11", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.10", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.9", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.8", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.7", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.6", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
{update, ehttpc, {advanced, [no_proxy]}}
]},
{"0.4.5", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
{update, ehttpc, {advanced, [no_proxy]}}
]},
{<<"0\\.4\\.[0-4]">>, [
{load_module, ehttpc, brutal_purge, soft_purge, []},
{update, ehttpc, {advanced, [no_proxy]}},
{load_module, ehttpc_sup, brutal_purge, soft_purge, []},
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc_pool_sup, brutal_purge, soft_purge, []},
{load_module, ehttpc_worker_sup, brutal_purge, soft_purge, []}
]},
{"0.3.0", [
{load_module, ehttpc, brutal_purge, soft_purge, []},
{update, ehttpc, {advanced, [no_proxy]}},
{load_module, ehttpc_sup, brutal_purge, soft_purge, []},
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc_pool_sup, brutal_purge, soft_purge, []},
{load_module, ehttpc_worker_sup, brutal_purge, soft_purge, []}
]},
{<<"0\\.2\\.[0-1]">>, [
{load_module, ehttpc, brutal_purge, soft_purge, []},
{update, ehttpc, {advanced, [no_proxy]}},
{load_module, ehttpc_sup, brutal_purge, soft_purge, []},
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc_pool_sup, brutal_purge, soft_purge, []},
Expand Down
158 changes: 147 additions & 11 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
enable_pipelining :: boolean() | non_neg_integer(),
gun_opts :: gun:opts(),
gun_state :: down | up,
requests :: map()
requests :: map(),
proxy :: undefined | map()
}).

-type pool_name() :: any().
Expand Down Expand Up @@ -195,9 +196,10 @@ name(Pool) -> {?MODULE, Pool}.
%% gen_server callbacks
%%--------------------------------------------------------------------

init([Pool, Id, Opts]) ->
init([Pool, Id, Opts0]) ->
process_flag(trap_exit, true),
PrioLatest = proplists:get_bool(prioritise_latest, Opts),
PrioLatest = proplists:get_bool(prioritise_latest, Opts0),
#{opts := Opts, proxy := Proxy} = parse_proxy_opts(Opts0),
State = #state{
pool = Pool,
id = Id,
Expand All @@ -213,7 +215,8 @@ init([Pool, Id, Opts]) ->
pending_count => 0,
sent => #{},
prioritise_latest => PrioLatest
}
},
proxy = Proxy
},
true = gproc_pool:connect_worker(ehttpc:name(Pool), {Pool, Id}),
{ok, State}.
Expand Down Expand Up @@ -356,10 +359,37 @@ code_change({down, _Vsn}, State, [no_enable_pipelining]) ->
} = State,
OldRequests = downgrade_requests(Requests),
{ok, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState, OldRequests}};
code_change({down, _Vsn}, #state{requests = Requests} = State, [downgrade_requests]) ->
code_change({down, _Vsn}, State, [downgrade_requests]) ->
%% downgrade to a version which had old format 'requests'
#state{
pool = Pool,
id = ID,
client = Client,
mref = MRef,
host = Host,
port = Port,
enable_pipelining = Pipelining,
gun_opts = GunOpts,
gun_state = GunState,
requests = Requests
} = State,
OldRequests = downgrade_requests(Requests),
{ok, State#state{requests = OldRequests}};
{ok, {state, Pool, ID, Client, MRef, Host, Port, Pipelining, GunOpts, GunState, OldRequests}};
code_change({down, _Vsn}, State, [no_proxy]) ->
%% downgrade to a version before `proxy' was added
#state{
pool = Pool,
id = ID,
client = Client,
mref = MRef,
host = Host,
port = Port,
enable_pipelining = Pipelining,
gun_opts = GunOpts,
gun_state = GunState,
requests = Requests
} = State,
{ok, {state, Pool, ID, Client, MRef, Host, Port, Pipelining, GunOpts, GunState, Requests}};
%% below are upgrade instructions
code_change(_Vsn, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState}, _Extra) ->
%% upgrade from a version before 'requests' field was added
Expand All @@ -373,10 +403,11 @@ code_change(_Vsn, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState}
enable_pipelining = true,
gun_opts = GunOpts,
gun_state = GunState,
requests = upgrade_requests(#{})
requests = upgrade_requests(#{}),
proxy = undefined
}};
code_change(_Vsn, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState, Requests}, _) ->
%% upgrade from a version before 'enable_pipelining' filed was added
%% upgrade from a version before 'enable_pipelining' field was added
{ok, #state{
pool = Pool,
id = ID,
Expand All @@ -389,8 +420,25 @@ code_change(_Vsn, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState,
gun_state = GunState,
requests = upgrade_requests(Requests)
}};
code_change(
_Vsn, {state, Pool, ID, Client, MRef, Host, Port, Pipelining, GunOpts, GunState, Requests}, _
) ->
%% upgrade from a version before `proxy' field was added
{ok, #state{
pool = Pool,
id = ID,
client = Client,
mref = MRef,
host = Host,
port = Port,
enable_pipelining = Pipelining,
gun_opts = GunOpts,
gun_state = GunState,
requests = upgrade_requests(Requests),
proxy = undefined
}};
code_change(_Vsn, State, _) ->
%% upgrade from a version ahving old format 'requests' field
%% upgrade from a version having old format 'requests' field
{ok, upgrade_requests(State)}.

format_status(Status = #{state := State}) ->
Expand Down Expand Up @@ -776,11 +824,15 @@ do_after_gun_up(State0 = #state{client = Client, mref = MRef}, ExpireAt, Fun) ->
{Res, State} = gun_await_up(Client, ExpireAt, Timeout, MRef, State0),
case Res of
{ok, _} ->
Fun(State#state{gun_state = up});
Fun(State);
{error, connect_timeout} ->
%% the caller can not wait logger
%% but the connection is likely to be useful
{reply, {error, connect_timeout}, State};
{error, {proxy_error, _} = Error} ->
%% We keep the client around because the proxy might still send data as part
%% of the error response.
{reply, {error, Error}, State};
{error, Reason} ->
case is_reference(MRef) of
true ->
Expand All @@ -798,7 +850,13 @@ do_after_gun_up(State0 = #state{client = Client, mref = MRef}, ExpireAt, Fun) ->
gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
receive
{gun_up, Pid, Protocol} ->
{{ok, Protocol}, State0};
case State0#state.proxy of
undefined ->
State = State0#state{gun_state = up},
{{ok, Protocol}, State};
#{} = ProxyOpts ->
gun_connect_proxy(Pid, ExpireAt, Timeout, Protocol, ProxyOpts, State0)
end;
{'DOWN', MRef, process, Pid, {shutdown, Reason}} ->
%% stale code for appup since 0.4.12
{{error, Reason}, State0};
Expand All @@ -824,6 +882,38 @@ gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
{{error, connect_timeout}, State0}
end.

gun_connect_proxy(Pid, ExpireAt, Timeout, Protocol, ProxyOpts, State0) ->
StreamRef = gun:connect(Pid, ProxyOpts),
gun_await_connect_proxy(Pid, StreamRef, ExpireAt, Timeout, Protocol, ProxyOpts, State0).

gun_await_connect_proxy(Pid, StreamRef, ExpireAt, Timeout, Protocol, ProxyOpts, State0) ->
receive
{gun_response, Pid, StreamRef, fin, 200, Headers} ->
State = State0#state{gun_state = up},
{{ok, {Protocol, Headers}}, State};
{gun_response, Pid, StreamRef, _Fin, 407, _Headers} ->
{{error, {proxy_error, unauthorized}}, State0};
{gun_response, Pid, StreamRef, _Fin, StatusCode, Headers} ->
{{error, {proxy_error, {StatusCode, Headers}}}, State0};
?ASYNC_REQ(Method, Request, ExpireAt1, ResultCallback) ->
Req = ?REQ(Method, Request, ExpireAt1),
State = enqueue_req(ResultCallback, Req, State0),
%% keep waiting
NewTimeout = timeout(ExpireAt),
gun_await_connect_proxy(
Pid, StreamRef, ExpireAt, NewTimeout, Protocol, ProxyOpts, State
);
?GEN_CALL_REQ(From, Call) ->
State = enqueue_req(From, Call, State0),
%% keep waiting
NewTimeout = timeout(ExpireAt),
gun_await_connect_proxy(
Pid, StreamRef, ExpireAt, NewTimeout, Protocol, ProxyOpts, State
)
after Timeout ->
{{error, connect_timeout}, State0}
end.

%% normal handling of gun_response and gun_data reply
handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, Data) ->
#state{requests = Requests} = State,
Expand Down Expand Up @@ -891,6 +981,52 @@ fresh_expire_at(infinity = _Timeout) ->
fresh_expire_at(Timeout) when is_integer(Timeout) ->
now_() + Timeout.

parse_proxy_opts(Opts) ->
%% Target host and port
case proplists:get_value(proxy, Opts, undefined) of
undefined ->
#{opts => Opts, proxy => undefined};
#{host := _, port := _} = ProxyOpts0 ->
%% We open connection to proxy, then issue `gun:connect' to target host.
{ProxyOpts, NewOpts} =
lists:foldl(
fun(Key, {ProxyAcc, GunAcc}) ->
swap(Key, ProxyAcc, GunAcc)
end,
{ProxyOpts0, proplists:delete(proxy, Opts)},
[host, port, transport, {tls_opts, transport_opts}]
),
#{opts => NewOpts, proxy => ProxyOpts}
end.

swap(Key, Map, Proplist) when is_atom(Key) ->
swap({Key, Key}, Map, Proplist);
swap({KeyM, KeyP}, Map0, Proplist0) when is_map_key(KeyM, Map0) ->
ValueFromMap = maps:get(KeyM, Map0),
Map = maps:remove(KeyM, Map0),
case take_proplist(KeyP, Proplist0) of
{ValueFromProplist, Proplist} ->
{Map#{KeyM => ValueFromProplist}, [{KeyP, ValueFromMap} | Proplist]};
error ->
{Map, [{KeyP, ValueFromMap} | Proplist0]}
end;
swap({KeyM, KeyP}, Map0, Proplist0) ->
case take_proplist(KeyP, Proplist0) of
{ValueFromProplist, Proplist} ->
{Map0#{KeyM => ValueFromProplist}, Proplist};
error ->
{Map0, Proplist0}
end.

take_proplist(Key, Proplist0) ->
Proplist1 = lists:keydelete(Key, 1, Proplist0),
case lists:keyfind(Key, 1, Proplist0) of
false ->
error;
{Key, ValueFromProplist} ->
{ValueFromProplist, Proplist1}
end.

-ifdef(TEST).

prioritise_latest_test() ->
Expand Down
Loading
Loading