Skip to content

Commit 457924e

Browse files
committed
grpcbox_channel can add and remove dynamic endpoints
1 parent ebc25bd commit 457924e

File tree

2 files changed

+82
-9
lines changed

2 files changed

+82
-9
lines changed

src/grpcbox_channel.erl

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
is_ready/1,
77
pick/2,
88
pick/3,
9+
add_endpoints/2,
10+
remove_endpoints/3,
911
stop/1,
1012
stop/2]).
1113
-export([init/1,
@@ -83,6 +85,12 @@ pick_worker(Name, undefined) ->
8385
pick_worker(Name, Key) ->
8486
gproc_pool:pick_worker(Name, Key).
8587

88+
add_endpoints(Name, Endpoints) ->
89+
gen_statem:call(?CHANNEL(Name), {add_endpoints, Endpoints}).
90+
91+
remove_endpoints(Name, Endpoints, Reason) ->
92+
gen_statem:call(?CHANNEL(Name), {remove_endpoints, Endpoints, Reason}).
93+
8694
-spec interceptor(name(), unary | stream) -> grpcbox_client:interceptor() | undefined.
8795
interceptor(Name, CallType) ->
8896
case ets:lookup(?CHANNELS_TAB, {Name, CallType}) of
@@ -112,14 +120,13 @@ init([Name, Endpoints, Options]) ->
112120
pool = Name,
113121
encoding = Encoding,
114122
stats_handler = StatsHandler,
115-
endpoints = Endpoints
123+
endpoints = lists:usort(Endpoints)
116124
},
117-
118125
case maps:get(sync_start, Options, false) of
119126
false ->
120127
{ok, idle, Data, [{next_event, internal, connect}]};
121128
true ->
122-
_ = start_workers(Name, StatsHandler, Encoding, Endpoints),
129+
start_workers(Name, StatsHandler, Encoding, Endpoints),
123130
{ok, connected, Data}
124131
end.
125132

@@ -128,14 +135,32 @@ callback_mode() ->
128135

129136
connected({call, From}, is_ready, _Data) ->
130137
{keep_state_and_data, [{reply, From, true}]};
138+
connected({call, From}, {add_endpoints, Endpoints},
139+
Data=#data{pool=Pool,
140+
stats_handler=StatsHandler,
141+
encoding=Encoding,
142+
endpoints=TotalEndpoints}) ->
143+
NewEndpoints = lists:subtract(Endpoints, TotalEndpoints),
144+
NewTotalEndpoints = lists:umerge(TotalEndpoints, Endpoints),
145+
start_workers(Pool, StatsHandler, Encoding, NewEndpoints),
146+
{keep_state, Data#data{endpoints=NewTotalEndpoints}, [{reply, From, ok}]};
147+
connected({call, From}, {remove_endpoints, Endpoints, Reason},
148+
Data=#data{pool=Pool, endpoints=TotalEndpoints}) ->
149+
150+
NewEndpoints = sets:to_list(sets:intersection(sets:from_list(Endpoints),
151+
sets:from_list(TotalEndpoints))),
152+
NewTotalEndpoints = lists:subtract(TotalEndpoints, Endpoints),
153+
stop_workers(Pool, NewEndpoints, Reason),
154+
{keep_state, Data#data{endpoints = NewTotalEndpoints}, [{reply, From, ok}]};
131155
connected(EventType, EventContent, Data) ->
132156
handle_event(EventType, EventContent, Data).
133157

134158
idle(internal, connect, Data=#data{pool=Pool,
135159
stats_handler=StatsHandler,
136160
encoding=Encoding,
137161
endpoints=Endpoints}) ->
138-
_ = start_workers(Pool, StatsHandler, Encoding, Endpoints),
162+
163+
start_workers(Pool, StatsHandler, Encoding, Endpoints),
139164
{next_state, connected, Data};
140165
idle({call, From}, is_ready, _Data) ->
141166
{keep_state_and_data, [{reply, From, false}]};
@@ -184,8 +209,16 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->
184209

185210
start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
186211
[begin
187-
gproc_pool:add_worker(Pool, Endpoint),
188-
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, SSLOptions},
189-
Encoding, StatsHandler),
190-
Pid
191-
end || Endpoint={Transport, Host, Port, SSLOptions} <- Endpoints].
212+
gproc_pool:add_worker(Pool, Endpoint),
213+
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint,
214+
Pool, Endpoint, Encoding, StatsHandler),
215+
Pid
216+
end || Endpoint <- Endpoints].
217+
218+
stop_workers(Pool, Endpoints, Reason) ->
219+
[begin
220+
case gproc_pool:whereis_worker(Pool, Endpoint) of
221+
undefined -> ok;
222+
Pid -> grpcbox_subchannel:stop(Pid, Reason)
223+
end
224+
end || Endpoint <- Endpoints].

test/grpcbox_channel_SUITE.erl

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
-module(grpcbox_channel_SUITE).
2+
3+
-export([all/0,
4+
init_per_suite/1,
5+
end_per_suite/1,
6+
add_and_remove_endpoints/1,
7+
pick_worker_strategy/1]).
8+
9+
-include_lib("eunit/include/eunit.hrl").
10+
11+
all() ->
12+
[
13+
add_and_remove_endpoints
14+
].
15+
init_per_suite(_Config) ->
16+
application:set_env(grpcbox, servers, []),
17+
application:ensure_all_started(grpcbox),
18+
grpcbox_channel_sup:start_link(),
19+
grpcbox_channel_sup:start_child(default_channel, [{https, "127.0.0.1", 8080, #{}}], #{}),
20+
grpcbox_channel_sup:start_child(random_channel,
21+
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
22+
#{balancer => random}),
23+
grpcbox_channel_sup:start_child(hash_channel,
24+
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
25+
#{balancer => hash}),
26+
grpcbox_channel_sup:start_child(direct_channel,
27+
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
28+
#{ balancer => direct}),
29+
30+
_Config.
31+
32+
end_per_suite(_Config) ->
33+
application:stop(grpcbox),
34+
ok.
35+
36+
add_and_remove_endpoints(_Config) ->
37+
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}]),
38+
?assertMatch(4, length(gproc_pool:active_workers(default_channel))),
39+
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], normal),
40+
?assertMatch(1, length(gproc_pool:active_workers(default_channel))).

0 commit comments

Comments
 (0)