Skip to content

Commit 6ea2b84

Browse files
committed
grpcbox_channel can add and remove dynamic endpoints
1 parent 21a41ff commit 6ea2b84

File tree

2 files changed

+85
-12
lines changed

2 files changed

+85
-12
lines changed

src/grpcbox_channel.erl

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
-export([start_link/3,
66
is_ready/1,
77
pick/2,
8+
add_endpoints/2,
9+
remove_endpoints/3,
810
stop/1,
911
stop/2]).
1012
-export([init/1,
@@ -69,6 +71,12 @@ pick(Name, CallType) ->
6971
{error, undefined_channel}
7072
end.
7173

74+
add_endpoints(Name, Endpoints) ->
75+
gen_statem:call(?CHANNEL(Name), {add_endpoints, Endpoints}).
76+
77+
remove_endpoints(Name, Endpoints, Reason) ->
78+
gen_statem:call(?CHANNEL(Name), {remove_endpoints, Endpoints, Reason}).
79+
7280
-spec interceptor(name(), unary | stream) -> grpcbox_client:interceptor() | undefined.
7381
interceptor(Name, CallType) ->
7482
case ets:lookup(?CHANNELS_TAB, {Name, CallType}) of
@@ -98,14 +106,13 @@ init([Name, Endpoints, Options]) ->
98106
pool = Name,
99107
encoding = Encoding,
100108
stats_handler = StatsHandler,
101-
endpoints = Endpoints
109+
endpoints = lists:umerge(Endpoints, [])
102110
},
103-
104111
case maps:get(sync_start, Options, false) of
105112
false ->
106113
{ok, idle, Data, [{next_event, internal, connect}]};
107114
true ->
108-
_ = start_workers(Name, StatsHandler, Encoding, Endpoints),
115+
start_workers(Name, StatsHandler, Encoding, Endpoints),
109116
{ok, connected, Data}
110117
end.
111118

@@ -114,14 +121,33 @@ callback_mode() ->
114121

115122
connected({call, From}, is_ready, _Data) ->
116123
{keep_state_and_data, [{reply, From, true}]};
124+
connected({call, From}, {add_endpoints, Endpoints},
125+
Data=#data{pool=Pool,
126+
stats_handler=StatsHandler,
127+
encoding=Encoding,
128+
endpoints=TotalEndpoints}) ->
129+
NewEndpoints = lists:subtract(Endpoints, TotalEndpoints),
130+
NewTotalEndpoints = lists:umerge(TotalEndpoints, Endpoints),
131+
start_workers(Pool, StatsHandler, Encoding, NewEndpoints),
132+
{keep_state, Data#data{endpoints=NewTotalEndpoints}, [{reply, From, ok}]};
133+
connected({call, From}, {remove_endpoints, Endpoints, Reason},
134+
Data=#data{pool=Pool,
135+
endpoints=TotalEndpoints}) ->
136+
137+
NewEndpoints = sets:to_list(sets:intersection(sets:from_list(Endpoints),
138+
sets:from_list(TotalEndpoints))),
139+
NewTotalEndpoints = lists:subtract(TotalEndpoints, Endpoints),
140+
stop_workers(Pool, NewEndpoints, Reason),
141+
{keep_state, Data#data{endpoints = NewTotalEndpoints}, [{reply, From, ok}]};
117142
connected(EventType, EventContent, Data) ->
118143
handle_event(EventType, EventContent, Data).
119144

120145
idle(internal, connect, Data=#data{pool=Pool,
121-
stats_handler=StatsHandler,
122-
encoding=Encoding,
123-
endpoints=Endpoints}) ->
124-
_ = start_workers(Pool, StatsHandler, Encoding, Endpoints),
146+
stats_handler=StatsHandler,
147+
encoding=Encoding,
148+
endpoints=Endpoints}) ->
149+
150+
start_workers(Pool, StatsHandler, Encoding, Endpoints),
125151
{next_state, connected, Data};
126152
idle({call, From}, is_ready, _Data) ->
127153
{keep_state_and_data, [{reply, From, false}]};
@@ -170,9 +196,16 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->
170196

171197
start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
172198
[begin
173-
gproc_pool:add_worker(Pool, Endpoint),
174-
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, SSLOptions},
175-
Encoding, StatsHandler),
176-
Pid
177-
end || Endpoint={Transport, Host, Port, SSLOptions} <- Endpoints].
199+
gproc_pool:add_worker(Pool, Endpoint),
200+
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint,
201+
Pool, Endpoint, Encoding, StatsHandler),
202+
Pid
203+
end || Endpoint <- Endpoints].
178204

205+
stop_workers(Pool, Endpoints, Reason) ->
206+
[begin
207+
case gproc_pool:whereis_worker(Pool, Endpoint) of
208+
undefined -> ok;
209+
Pid -> grpcbox_subchannel:stop(Pid, Reason)
210+
end
211+
end || Endpoint <- Endpoints].

test/grpcbox_channel_SUITE.erl

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
-module(grpcbox_channel_SUITE).
2+
3+
-export([all/0,
4+
init_per_suite/1,
5+
end_per_suite/1,
6+
add_and_remove_endpoints/1,
7+
pick_worker_strategy/1]).
8+
9+
-include_lib("eunit/include/eunit.hrl").
10+
11+
all() ->
12+
[
13+
add_and_remove_endpoints
14+
].
15+
init_per_suite(_Config) ->
16+
application:set_env(grpcbox, servers, []),
17+
application:ensure_all_started(grpcbox),
18+
grpcbox_channel_sup:start_link(),
19+
grpcbox_channel_sup:start_child(default_channel, [{https, "127.0.0.1", 8080, #{}}], #{}),
20+
grpcbox_channel_sup:start_child(random_channel,
21+
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
22+
#{balancer => random}),
23+
grpcbox_channel_sup:start_child(hash_channel,
24+
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
25+
#{balancer => hash}),
26+
grpcbox_channel_sup:start_child(direct_channel,
27+
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
28+
#{ balancer => direct}),
29+
30+
_Config.
31+
32+
end_per_suite(_Config) ->
33+
application:stop(grpcbox),
34+
ok.
35+
36+
add_and_remove_endpoints(_Config) ->
37+
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}]),
38+
?assertMatch(4, length(gproc_pool:active_workers(default_channel))),
39+
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], normal),
40+
?assertMatch(1, length(gproc_pool:active_workers(default_channel))).

0 commit comments

Comments
 (0)