Skip to content
This repository was archived by the owner on Apr 22, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/group/libp2p_group_gossip_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ handle_cast({add_handler, Key, Handler}, State=#state{handlers=Handlers}) ->
{noreply, State#state{handlers=maps:put(Key, Handler, Handlers)}};
handle_cast({request_target, inbound, WorkerPid, _Ref}, State=#state{}) ->
{noreply, stop_inbound_worker(WorkerPid, State)};
handle_cast({clear_target, _Kind, _WorkerPid, Ref}, State=#state{workers = Workers}) ->
lager:debug("clearing target for worker ~p ", [_WorkerPid]),
%% the ref is stable across restarts, so use that as the lookup key
case lookup_worker(Ref, #worker.ref, State) of
Worker=#worker{} ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory the workerpid in here should match the one passed in the message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, Worker.pid() should equal WorkerPid from the message. As per the comment, the ref is used for the lookup as that is consistent across restarts of the worker

NewWorkers = lists:keyreplace(Ref, #worker.ref, Workers,
Worker#worker{target=undefined}),
{noreply, State#state{workers=NewWorkers}};
_ ->
{noreply, State}
end;
handle_cast({request_target, peerbook, WorkerPid, Ref}, State=#state{tid=TID}) ->
LocalAddr = libp2p_swarm:pubkey_bin(TID),
PeerList = case libp2p_swarm:peerbook(TID) of
Expand All @@ -156,6 +167,7 @@ handle_cast({request_target, peerbook, WorkerPid, Ref}, State=#state{tid=TID}) -
WorkerAddrs = [ libp2p_crypto:p2p_to_pubkey_bin(W#worker.target) || W <- State#state.workers, W#worker.target /= undefined, W#worker.kind /= seed ],
try libp2p_peerbook:random(Peerbook, [LocalAddr|WorkerAddrs]) of
{Addr, _} ->
lager:debug("found target ~p, assigning to worker ~p",[Addr, WorkerPid]),
[Addr];
false ->
lager:debug("cannot get target as no peers or already connected to all peers",[]),
Expand Down
5 changes: 5 additions & 0 deletions src/group/libp2p_group_relcast_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ handle_cast({request_target, Index, WorkerPid, _WorkerRef}, State=#state{tid=TID
{keys, State#state.group_keys}]}},
libp2p_group_worker:assign_target(WorkerPid, {Target, ClientSpec}),
{noreply, NewState};
handle_cast({clear_target, _Kind, _WorkerPid, _Ref}, State=#state{}) ->
%% relcast server's target assignment is fixed, the same group worker will always get the same target
%% as such clear target here is deliberately a noop, unlike gossip server
%% clear_target commands originate from the group workers
{noreply, State};
handle_cast({handle_input, _Msg}, State=#state{close_state=closing}) ->
{noreply, State};
handle_cast({handle_input, _Msg}, State=#state{store=Bad}) when Bad == not_started orelse
Expand Down
6 changes: 5 additions & 1 deletion src/group/libp2p_group_server.erl
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
-module(libp2p_group_server).

-export([request_target/4, send_result/3, send_ready/4]).
-export([request_target/4, clear_target/4, send_result/3, send_ready/4]).

-spec request_target(Server::pid(), term(), Worker::pid(), Ref::reference()) -> ok.
request_target(Pid, Kind, WorkerPid, Ref) ->
gen_server:cast(Pid, {request_target, Kind, WorkerPid, Ref}).

-spec clear_target(Server::pid(), term(), Worker::pid(), Ref::reference()) -> ok.
clear_target(Pid, Kind, WorkerPid, Ref) ->
gen_server:cast(Pid, {clear_target, Kind, WorkerPid, Ref}).

-spec send_result(Server::pid(), Ref::term(), Result::any()) -> ok.
send_result(Pid, Ref, Result) ->
gen_server:cast(Pid, {send_result, Ref, Result}).
Expand Down
3 changes: 2 additions & 1 deletion src/group/libp2p_group_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ connecting(info, connect_retry_timeout, Data=#data{tid=TID,
{keep_state, stop_connect_retry_timer(Data#data{connect_pid=Pid})};
true ->
lager:debug("max connect retries exceeded, going back to targeting"),
{next_state, targeting, cancel_connect_retry_timer(Data), ?TRIGGER_TARGETING}
libp2p_group_server:clear_target(Data#data.server, Data#data.kind, self(), Data#data.ref),
{next_state, targeting, cancel_connect_retry_timer(Data#data{target = undefined}), ?TRIGGER_TARGETING}
end;
connecting(EventType, Msg, Data) ->
handle_event(EventType, Msg, Data).
Expand Down