From 7704807844ab575d89e5b142bbc0831197522e44 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 6 Dec 2023 10:40:14 +0100 Subject: [PATCH 1/2] fix(ekka_locker): allow stop locker process with pid --- src/ekka_locker.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ekka_locker.erl b/src/ekka_locker.erl index 8684315..4d5ce34 100644 --- a/src/ekka_locker.erl +++ b/src/ekka_locker.erl @@ -112,7 +112,7 @@ start_link(Name, LeaseTime) -> stop() -> stop(?SERVER). --spec(stop(atom()) -> ok). +-spec(stop(pid() | atom()) -> ok). stop(Name) -> gen_server:call(Name, stop). From c1c4656c5a541035ecb3f61f6ed13b98553bcd7a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 6 Dec 2023 10:22:58 +0100 Subject: [PATCH 2/2] refactor: delete mcast clustering strategy --- README.md | 49 ------- data/app.mcast.config | 10 -- etc/ekka.config.example | 9 -- scripts/cluster.mcast.sh | 4 - src/ekka_cluster_mcast.erl | 231 ------------------------------ src/ekka_cluster_sup.erl | 9 -- test/ekka_autocluster_SUITE.erl | 42 ------ test/ekka_cluster_mcast_SUITE.erl | 83 ----------- 8 files changed, 437 deletions(-) delete mode 100644 data/app.mcast.config delete mode 100644 scripts/cluster.mcast.sh delete mode 100644 src/ekka_cluster_mcast.erl delete mode 100644 test/ekka_cluster_mcast_SUITE.erl diff --git a/README.md b/README.md index b57a1e6..cd7979c 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,6 @@ Strategy | Description -----------|-------------------------------------- manual | Join cluster manually static | Static node list -mcast | IP Multicast dns | DNS A Records etcd | etcd k8s | Kubernetes @@ -49,54 +48,6 @@ Erlang config: ]}}, ``` -### Cluster using IP Multicast - -Cuttlefish style config: - -``` -cluster.discovery = mcast - -## IP Multicast Address. -## -## Value: IP Address -cluster.mcast.addr = 239.192.0.1 - -## Multicast Ports. -## -## Value: Port List -cluster.mcast.ports = 4369,4370 - -## Multicast Iface. -## -## Value: Iface Address -## -## Default: 0.0.0.0 -cluster.mcast.iface = 0.0.0.0 - -## Multicast Ttl. -## -## Value: 0-255 -cluster.mcast.ttl = 255 - -## Multicast loop. -## -## Value: on | off -cluster.mcast.loop = on -``` - -Erlang config: - -``` -{cluster_discovery, - {mcast, [ - {addr, {239,192,0,1}}, - {ports, [4369,4370]}, - {iface, {0,0,0,0}}, - {ttl, 255}, - {loop, true} - ]}}, -``` - ### Cluster using DNS A records Cuttlefish style config: diff --git a/data/app.mcast.config b/data/app.mcast.config deleted file mode 100644 index a97e846..0000000 --- a/data/app.mcast.config +++ /dev/null @@ -1,10 +0,0 @@ -[{ekka, [ - {cluster_autoheal,true}, - {cluster_name,ekka}, - {cluster_discovery, {mcast, [ - {addr,{239,192,0,1}}, - {ports,[4369,4370,4371,4372,4373]}, - {iface,{0,0,0,0}}, - {ttl, 255}, - {loop,true}]}} -]}]. diff --git a/etc/ekka.config.example b/etc/ekka.config.example index e4991b1..979778c 100644 --- a/etc/ekka.config.example +++ b/etc/ekka.config.example @@ -8,15 +8,6 @@ %% {seeds, ['ekka1@127.0.0.1', 'ekka2@127.0.0.1']} %% ]}}, -%% Clustering via IP Multicast -%% {cluster_discovery, {mcast, [ -%% {addr, {239,192,0,1}}, -%% {ports, [4369,4370]}, -%% {iface, {0,0,0,0}}, -%% {ttl, 255}, -%% {loop, true} -%% ]}}, - %% Clustering via DNS A Record %% {cluster_discovery, {dns, [ %% {name, "localhost"}, diff --git a/scripts/cluster.mcast.sh b/scripts/cluster.mcast.sh deleted file mode 100644 index 344cb0b..0000000 --- a/scripts/cluster.mcast.sh +++ /dev/null @@ -1,4 +0,0 @@ - -erl -pa ebin/ -pa deps/*/ebin -name ekka1@127.0.0.1 -setcookie cookie -config data/app.mcast -s ekka -s ekka autocluster - -erl -pa ebin/ -pa deps/*/ebin -name ekka2@127.0.0.1 -setcookie cookie -config data/app.mcast -s ekka -s ekka autocluster diff --git a/src/ekka_cluster_mcast.erl b/src/ekka_cluster_mcast.erl deleted file mode 100644 index 690e0e5..0000000 --- a/src/ekka_cluster_mcast.erl +++ /dev/null @@ -1,231 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(ekka_cluster_mcast). - --behaviour(gen_server). - --behaviour(ekka_cluster_strategy). - -%% Cluster strategy callbacks --export([ discover/1 - , lock/1 - , unlock/1 - , register/1 - , unregister/1 - ]). - --export([ ensure_started/1 - , start_link/1 - , stop/0 - ]). - -%% For tests --export([get_sock/0]). - -%% gen_server Callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --import(proplists, [get_value/2, get_value/3]). - --type(option() :: {addr, inet:ip_address()} - | {ports, list(inet:port_number())} - | {iface, inet:ip_address()} - | {ttl, pos_integer()} - | {loop, boolean()} - | {senbuf, pos_integer()} - | {recbuf, pos_integer()} - | {buffer, pos_integer()}). - --record(state, { - sock :: inet:socket(), - addr :: inet:ip_address(), - ports :: list(inet:port_number()), - cookie :: integer(), - seen :: list(node()) - }). - --define(SERVER, ?MODULE). - --define(LOG(Level, Format, Args), - logger:Level("Ekka(Mcast): " ++ Format, Args)). - -%%-------------------------------------------------------------------- -%% ekka_cluster_strategy callbacks -%%-------------------------------------------------------------------- - -discover(Options) -> - Server = case whereis(?SERVER) of - Pid when is_pid(Pid) -> Pid; - undefined -> ensure_started(Options) - end, - gen_server:call(Server, discover, 60000). - -lock(_Options) -> - ignore. - -unlock(_Options) -> - ignore. - -register(_Options) -> - ignore. - -unregister(_Options) -> - ignore. - -%%-------------------------------------------------------------------- -%% Start/stop mcast server -%%-------------------------------------------------------------------- - -ensure_started(Options) -> - case ekka_cluster_sup:start_child(?SERVER, [Options]) of - {ok, Pid} -> Pid; - {error, {already_started, Pid}} -> Pid - end. - --spec(start_link(list(option())) -> {ok, pid()} | {error, term()}). -start_link(Options) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, Options, []). - --spec(stop() -> ok). -stop() -> gen_server:stop(?SERVER). - --spec(get_sock() -> inet:socket()). -get_sock() -> - gen_server:call(?SERVER, get_sock). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init(Options) -> - Addr = get_value(addr, Options), - Ports = get_value(ports, Options), - Loop = get_value(loop, Options, true), - TTL = get_value(ttl, Options, 1), - Iface = get_value(iface, Options, {0,0,0,0}), - case udp_open(Ports, [{multicast_if, Iface}, - {multicast_ttl, TTL}, - {multicast_loop, Loop}, - {add_membership, {Addr, Iface}}]) of - {ok, Sock} -> - Cookie = erlang:phash2(erlang:get_cookie()), - {ok, #state{sock = Sock, addr = Addr, ports = Ports, - cookie = Cookie, seen = []}}; - {error, Error} -> {stop, Error} - end. - -handle_call(discover, From, State = #state{sock = Sock, addr = Addr, - ports = Ports, cookie = Cookie}) -> - lists:foreach(fun(Port) -> - udp_send(Sock, Addr, Port, ping(Cookie)) - end, Ports), - erlang:send_after(3000, self(), {reply, discover, From}), - {noreply, State}; - -handle_call(get_sock, _From, State = #state{sock = Sock}) -> - {reply, Sock, State}; - -handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), - {reply, ignore, State}. - -handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info({reply, discover, From}, State = #state{seen = Seen}) -> - gen_server:reply(From, {ok, [node() | Seen]}), - {noreply, State#state{seen = []}, hibernate}; - -handle_info({udp, Sock, _Ip, InPort, Data}, - State = #state{sock = Sock, addr = Addr, - cookie = Cookie, seen = Seen}) -> - %%io:format("~s recv handshake from ~p: ~p~n", - %% [node(), {Ip, InPort}, binary_to_term(Data)]), - Cluster = ekka:env(cluster_name, ekka), - {noreply, try binary_to_term(Data) of - {ping, Node, _Cluster, _Cookie} when Node =:= node() -> - State; - {ping, Node, Cluster, Cookie} -> - udp_send(Sock, Addr, InPort, pong(Cookie)), - State#state{seen = lists:usort([Node | Seen])}; - {pong, Node, _Cluster, _Cookie} when Node =:= node() -> - State; - {pong, Node, Cluster, Cookie} -> - State#state{seen = lists:usort([Node | Seen])}; - Handshake = {_Type, _Node, _Cluster, _Cookie} -> - ?LOG(error, "Bad handshake: ~p", [Handshake]), - State; - Term -> ?LOG(error, "Bad term: ~p", [Term]), - State - catch - error:badarg -> - ?LOG(error, "Corrupt data: ~p", [Data]), - State - end, hibernate}; - -handle_info({udp_passive, Sock}, State = #state{sock = Sock}) -> - inet:setopts(Sock, [{active, 10}]), - {noreply, State}; - -handle_info({udp_closed, Sock}, State = #state{sock = Sock}) -> - {stop, udp_closed, State}; - -handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #state{sock = Sock}) -> - gen_udp:close(Sock). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -ping(Cookie) -> handshake(ping, Cookie). - -pong(Cookie) -> handshake(pong, Cookie). - -handshake(Type, Cookie) -> - {Type, node(), ekka:env(cluster_name, undefined), Cookie}. - -udp_open([], _Options) -> - {error, eaddrinuse}; - -udp_open([Port|Ports], Options) -> - case gen_udp:open(Port, [binary, {active, 10}, {reuseaddr, true} | Options]) of - {ok, Sock} -> - {ok, Sock}; - {error, eaddrinuse} -> - ?LOG(warning, "Multicast Adddress inuse: ~p", [Port]), - udp_open(Ports, Options); - {error, Reason} -> - {error, Reason} - end. - -udp_send(Sock, Addr, Port, Term) -> - gen_udp:send(Sock, Addr, Port, term_to_binary(Term)). - diff --git a/src/ekka_cluster_sup.erl b/src/ekka_cluster_sup.erl index 91eec73..a9891af 100644 --- a/src/ekka_cluster_sup.erl +++ b/src/ekka_cluster_sup.erl @@ -53,15 +53,6 @@ stop_child(M) -> init([]) -> Childs = case ekka:env(cluster_discovery) of - {ok, {mcast, Options}} -> - Mcast = #{id => ekka_cluster_mcast, - start => {ekka_cluster_mcast, start_link, [Options]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [ekka_cluster_mcast] - }, - [Mcast]; {ok, {etcd, Options}} -> case proplists:get_value(version, Options, v3) of v3 -> diff --git a/test/ekka_autocluster_SUITE.erl b/test/ekka_autocluster_SUITE.erl index 623b9a7..2cd95f5 100644 --- a/test/ekka_autocluster_SUITE.erl +++ b/test/ekka_autocluster_SUITE.erl @@ -41,13 +41,6 @@ {suffix, ""} ]). --define(MCAST_OPTIONS, [{addr, {239,192,0,1}}, - {ports, [5000,5001,5002]}, - {iface, {0,0,0,0}}, - {ttl, 255}, - {loop, true} - ]). - all() -> ekka_ct:all(?MODULE). %%-------------------------------------------------------------------- @@ -318,41 +311,6 @@ t_autocluster_retry_when_missing_nodes(Config) -> assert_cluster_nodes_equal(AllNodes, ThisNode), ok. -%%-------------------------------------------------------------------- -%% Autocluster via 'mcast' strategy - -t_autocluster_via_mcast(_Config) -> - ok = reboot_ekka_with_mcast_env(), - ok = timer:sleep(1000), - N1 = ekka_ct:start_slave(ekka, n1), - try - ok = ekka_ct:wait_running(N1), - ok = set_app_env(N1, {mcast, ?MCAST_OPTIONS}), - rpc:call(N1, ekka, autocluster, []), - ok = wait_for_node(N1), - ?assertMatch([_|_], rpc:call(N1, ekka_autocluster, core_node_discovery_callback, [])), - ok = ekka:force_leave(N1) - after - ok = ekka_ct:stop_slave(N1) - end. - -reboot_ekka_with_mcast_env() -> - ok = ekka:stop(), - ok = set_app_env(node(), {mcast, ?MCAST_OPTIONS}), - ok = ekka:start(). - -t_autocluster_mcast_lock_failure(_Config) -> - ok = reboot_ekka_with_mcast_env(), - ok = timer:sleep(1000), - N1 = ekka_ct:start_slave(ekka, n1), - try - ok = ekka_ct:wait_running(N1), - ok = set_app_env(N1, {mcast, ?MCAST_OPTIONS}), - assert_unlocked(ekka_cluster_mcast, N1) - after - ok = ekka_ct:stop_slave(N1) - end. - %%-------------------------------------------------------------------- %% Core node discovery callback diff --git a/test/ekka_cluster_mcast_SUITE.erl b/test/ekka_cluster_mcast_SUITE.erl deleted file mode 100644 index 0d8f82e..0000000 --- a/test/ekka_cluster_mcast_SUITE.erl +++ /dev/null @@ -1,83 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019, 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(ekka_cluster_mcast_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - --define(OPTIONS, [{addr, {239,192,0,1}}, - {ports, [4369,4370]}, - {iface, {0,0,0,0}}, - {ttl, 255}, - {loop, true} - ]). - -all() -> ekka_ct:all(?MODULE). - -%%-------------------------------------------------------------------- -%% CT callbacks -%%-------------------------------------------------------------------- - -init_per_testcase(t_discover, Config) -> - ok = meck:new(gen_udp, [unstick, non_strict, passthrough, no_history]), - ok = meck:expect(gen_udp, send, fun(_, _, _, _) -> ok end), - ok = meck:new(ekka_cluster_sup, [non_strict, passthrough, no_history]), - ok = meck:expect(ekka_cluster_sup, start_child, - fun(_, _) -> - ekka_cluster_mcast:start_link(?OPTIONS) - end), - Config; -init_per_testcase(_TestCase, Config) -> - Config. - -end_per_testcase(t_discover, Config) -> - ok = meck:unload(gen_udp), - ok = meck:unload(ekka_cluster_sup), - Config; -end_per_testcase(_TestCase, Config) -> - Config. - -%%-------------------------------------------------------------------- -%% Test cases -%%-------------------------------------------------------------------- - -t_discover(_) -> - %% Simulate a UDP packet. - Cookie = erlang:phash2(erlang:get_cookie()), - Pong = {pong, 'node1@192.168.10.10', ekka, Cookie}, - Pid = ekka_cluster_mcast:ensure_started(?OPTIONS), - ?assert(is_process_alive(Pid)), - Sock = ekka_cluster_mcast:get_sock(), - Datagram = {udp, Sock, {127,0,0,1}, 5000, term_to_binary(Pong)}, - Pid ! Datagram, - Node = node(), - {ok, [Node, 'node1@192.168.10.10']} = ekka_cluster_strategy:discover(ekka_cluster_mcast, ?OPTIONS), - ok = ekka_cluster_mcast:stop(). - -t_lock(_) -> - ignore = ekka_cluster_strategy:lock(ekka_cluster_mcast, []). - -t_unlock(_) -> - ignore = ekka_cluster_strategy:unlock(ekka_cluster_mcast, []). - -t_register(_) -> - ignore = ekka_cluster_strategy:register(ekka_cluster_mcast, []). - -t_unregister(_) -> - ignore = ekka_cluster_strategy:unregister(ekka_cluster_mcast, []).