diff --git a/apps/ex_wire/config/dev.exs b/apps/ex_wire/config/dev.exs index 660027702..8de81d009 100644 --- a/apps/ex_wire/config/dev.exs +++ b/apps/ex_wire/config/dev.exs @@ -1,7 +1,6 @@ use Mix.Config config :ex_wire, - network_adapter: {ExWire.Adapter.UDP, NetworkClient}, sync: false, discovery: false, private_key: diff --git a/apps/ex_wire/config/test.exs b/apps/ex_wire/config/test.exs index ee5f55f71..f47698b89 100644 --- a/apps/ex_wire/config/test.exs +++ b/apps/ex_wire/config/test.exs @@ -10,6 +10,7 @@ config :ex_wire, port: 30_399 ], sync_mock: ExWire.BridgeSyncMock, + kademlia_mock: ExWire.FakeKademlia, discovery: false, sync: false, chain: :ropsten diff --git a/apps/ex_wire/lib/ex_wire.ex b/apps/ex_wire/lib/ex_wire.ex index 9e5517b49..1bf437af0 100644 --- a/apps/ex_wire/lib/ex_wire.ex +++ b/apps/ex_wire/lib/ex_wire.ex @@ -10,6 +10,7 @@ defmodule ExWire do @type node_id :: binary() alias ExWire.Config + alias ExWire.ConnectionObserver alias ExWire.NodeDiscoverySupervisor alias ExWire.PeerSupervisor alias ExWire.Sync @@ -27,7 +28,7 @@ defmodule ExWire do @spec get_children(Keyword.t()) :: list(Supervisor.child_spec()) defp get_children(_params) do - chain = ExWire.Config.chain() + chain = Config.chain() perform_discovery = Config.perform_discovery?() warp = Config.warp?() @@ -62,7 +63,7 @@ defmodule ExWire do warp_processors ++ [ # Peer supervisor maintains a pool of outbound peers - child_spec({PeerSupervisor, start_nodes}, []), + {PeerSupervisor, [start_nodes: start_nodes, connection_observer: ConnectionObserver]}, # Sync coordinates asking peers for new blocks child_spec({Sync, {trie, chain, warp, warp_queue}}, []) @@ -74,14 +75,23 @@ defmodule ExWire do node_discovery = if perform_discovery do # Discovery tries to find new peers - [child_spec({NodeDiscoverySupervisor, []}, [])] + [ + child_spec( + {NodeDiscoverySupervisor, [connection_observer: ConnectionObserver]}, + [] + ) + ] else [] end # Listener accepts and hands off new inbound TCP connections - tcp_listening = [child_spec({TCPListeningSupervisor, :ok}, [])] + tcp_listening = [ + child_spec({TCPListeningSupervisor, [connection_observer: ConnectionObserver]}, []) + ] + + observerer = [child_spec({ConnectionObserver, :ok}, [])] - sync_children ++ node_discovery ++ tcp_listening + List.flatten([observerer, sync_children, node_discovery, tcp_listening]) end end diff --git a/apps/ex_wire/lib/ex_wire/connection_observer.ex b/apps/ex_wire/lib/ex_wire/connection_observer.ex new file mode 100644 index 000000000..d2112408d --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/connection_observer.ex @@ -0,0 +1,79 @@ +defmodule ExWire.ConnectionObserver do + @moduledoc """ + Observing the inbound and outbound connections and gets notified if a connection gets dropped. + It also stores all Peers (both inbound and outbound) for further use. + """ + use GenServer + alias ExWire.Config + alias ExWire.Kademlia + alias ExWire.P2P.Connection + alias ExWire.PeerSupervisor + alias ExWire.Struct.Peer + require Logger + + @type t :: %__MODULE__{ + outbound_peers: MapSet.t(Peer.t()), + inbound_peers: MapSet.t(Peer.t()) + } + defstruct outbound_peers: MapSet.new(), inbound_peers: MapSet.new() + + @kademlia Application.get_env(:ex_wire, :kademlia_mock, Kademlia) + + def notify(:discovery_round) do + GenServer.cast(__MODULE__, :kademlia_discovery_round) + end + + @doc """ + Starts the observer process. + """ + @spec start_link(:ok) :: GenServer.on_start() + def start_link(:ok) do + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + def init(_) do + Process.flag(:trap_exit, true) + {:ok, %__MODULE__{}} + end + + # getting exits of connections from us to them + def handle_info( + {:EXIT, _pid, {_, %Connection{is_outbound: true, peer: peer}}}, + state + ) do + :ok = start_new_outbound_connections() + {:noreply, %{state | outbound_peers: MapSet.put(state.outbound_peers, peer)}} + end + + # getting exits of connections from them to us + def handle_info( + {:EXIT, _pid, {_, %Connection{is_outbound: false, peer: peer}}}, + state + ) do + {:noreply, %{state | inbound_peers: MapSet.put(state.inbound_peers, peer)}} + end + + # Kademlia server process notifies us of their discovery round. + # We start new connections to peers from the discovery results. + def handle_cast(:kademlia_discovery_round, state) do + :ok = start_new_outbound_connections() + + {:noreply, state} + end + + @spec start_new_outbound_connections() :: :ok + defp start_new_outbound_connections() do + this_round_nodes = @kademlia.get_peers() + + _ = + if Config.perform_sync?() do + for node <- this_round_nodes do + node + |> Peer.from_node() + |> PeerSupervisor.new_peer(__MODULE__) + end + end + + :ok + end +end diff --git a/apps/ex_wire/lib/ex_wire/kademlia.ex b/apps/ex_wire/lib/ex_wire/kademlia.ex index 4e46be0e1..05194afef 100644 --- a/apps/ex_wire/lib/ex_wire/kademlia.ex +++ b/apps/ex_wire/lib/ex_wire/kademlia.ex @@ -67,4 +67,12 @@ defmodule ExWire.Kademlia do def handle_neighbours(server \\ Server.name(), neighbours) do GenServer.cast(server, {:handle_neighbours, neighbours}) end + + @doc """ + Gets new peers from Kademlia that we can connect to + """ + @spec get_peers(GenServer.server()) :: :ok + def get_peers(server \\ Server.name()) do + GenServer.call(server, :get_peers) + end end diff --git a/apps/ex_wire/lib/ex_wire/kademlia/discovery.ex b/apps/ex_wire/lib/ex_wire/kademlia/discovery.ex index 26b568b68..8bd6bf78d 100644 --- a/apps/ex_wire/lib/ex_wire/kademlia/discovery.ex +++ b/apps/ex_wire/lib/ex_wire/kademlia/discovery.ex @@ -2,10 +2,9 @@ defmodule ExWire.Kademlia.Discovery do @moduledoc """ Module that handles node discovery logic. """ - alias ExWire.{Config, Network, PeerSupervisor} + alias ExWire.Network alias ExWire.Kademlia.{Node, RoutingTable} alias ExWire.Message.FindNeighbours - alias ExWire.Struct.Peer @doc """ Starts discovery round. @@ -16,15 +15,6 @@ defmodule ExWire.Kademlia.Discovery do this_round_nodes = RoutingTable.discovery_nodes(table) - _ = - if Config.perform_sync?() do - for node <- this_round_nodes do - node - |> Peer.from_node() - |> PeerSupervisor.new_peer() - end - end - Enum.each(this_round_nodes, fn node -> find_neighbours(table, node) end) diff --git a/apps/ex_wire/lib/ex_wire/kademlia/server.ex b/apps/ex_wire/lib/ex_wire/kademlia/server.ex index 91f3516c8..51e028a88 100644 --- a/apps/ex_wire/lib/ex_wire/kademlia/server.ex +++ b/apps/ex_wire/lib/ex_wire/kademlia/server.ex @@ -7,10 +7,11 @@ defmodule ExWire.Kademlia.Server do alias ExWire.Kademlia.{Discovery, Node, RoutingTable} @type state :: %{ - routing_table: RoutingTable.t(), - ignore_pongs: boolean() + routing_table: RoutingTable.t() | nil, + ignore_pongs: boolean() | nil, + connection_observer: module() | nil } - + defstruct [:routing_table, :ignore_pongs, :connection_observer] @max_discovery_rounds 7 # 5s @@ -29,18 +30,23 @@ defmodule ExWire.Kademlia.Server do network_client_name = Keyword.fetch!(params, :network_client_name) current_node = Keyword.fetch!(params, :current_node) nodes = Keyword.get(params, :nodes, []) + connection_observer = Keyword.get(params, :connection_observer) - GenServer.start_link(__MODULE__, {current_node, network_client_name, nodes}, name: name) + GenServer.start_link( + __MODULE__, + {current_node, network_client_name, nodes, connection_observer}, + name: name + ) end @impl true - def init({current_node = %Node{}, network_client_name, nodes}) do + def init({current_node = %Node{}, network_client_name, nodes, connection_observer}) do routing_table = RoutingTable.new(current_node, network_client_name) _ = schedule_discovery_round(0, nodes) schedule_pongs_cleanup() - {:ok, %{routing_table: routing_table}} + {:ok, %__MODULE__{routing_table: routing_table, connection_observer: connection_observer}} end @impl true @@ -101,12 +107,29 @@ defmodule ExWire.Kademlia.Server do {:reply, neighbours, state} end + def handle_call( + :get_peers, + _from, + state + ) do + round = state.routing_table.discovery_round + + if round > 0 do + {:reply, RoutingTable.discovery_nodes(state.routing_table), state} + else + {:reply, [], state} + end + end + @impl true - def handle_info({:discovery_round, nodes}, state = %{routing_table: routing_table}) do + def handle_info( + {:discovery_round, nodes}, + state = %{routing_table: routing_table, connection_observer: connection_observer} + ) do updated_table = Discovery.start(routing_table, nodes) _ = schedule_discovery_round(updated_table.discovery_round) - + :ok = connection_observer.notify(:discovery_round) {:noreply, %{state | routing_table: updated_table}} end diff --git a/apps/ex_wire/lib/ex_wire/node_discovery_supervisor.ex b/apps/ex_wire/lib/ex_wire/node_discovery_supervisor.ex index b29aa1103..cec16f2f7 100644 --- a/apps/ex_wire/lib/ex_wire/node_discovery_supervisor.ex +++ b/apps/ex_wire/lib/ex_wire/node_discovery_supervisor.ex @@ -7,9 +7,10 @@ defmodule ExWire.NodeDiscoverySupervisor do """ use Supervisor - alias ExWire.{Config, Network} + alias ExWire.Config alias ExWire.Kademlia.Node alias ExWire.Kademlia.Server, as: KademliaServer + alias ExWire.Network alias ExWire.Struct.Endpoint def start_link(params \\ []) do @@ -19,8 +20,8 @@ defmodule ExWire.NodeDiscoverySupervisor do end def init(params) do - {udp_module, udp_process_name} = ExWire.Config.udp_network_adapter(params) - port = ExWire.Config.listen_port(params) + {udp_module, udp_process_name} = Config.udp_network_adapter(params) + port = Config.listen_port(params) bootnodes = params @@ -37,7 +38,8 @@ defmodule ExWire.NodeDiscoverySupervisor do [ current_node: current_node(params), network_client_name: udp_process_name, - nodes: bootnodes + nodes: bootnodes, + connection_observer: Keyword.get(params, :connection_observer) ] ] } diff --git a/apps/ex_wire/lib/ex_wire/p2p/connection.ex b/apps/ex_wire/lib/ex_wire/p2p/connection.ex index a7501c605..60abeb0ca 100644 --- a/apps/ex_wire/lib/ex_wire/p2p/connection.ex +++ b/apps/ex_wire/lib/ex_wire/p2p/connection.ex @@ -21,7 +21,8 @@ defmodule ExWire.P2P.Connection do subscribers: [any()], sent_message_count: integer(), datas: [binary()], - last_error: any() | nil + last_error: any() | nil, + is_outbound: boolean() } defstruct peer: nil, @@ -33,5 +34,6 @@ defmodule ExWire.P2P.Connection do subscribers: [], sent_message_count: 0, datas: [], - last_error: nil + last_error: nil, + is_outbound: false end diff --git a/apps/ex_wire/lib/ex_wire/p2p/server.ex b/apps/ex_wire/lib/ex_wire/p2p/server.ex index 10f9922df..1ac717e3c 100644 --- a/apps/ex_wire/lib/ex_wire/p2p/server.ex +++ b/apps/ex_wire/lib/ex_wire/p2p/server.ex @@ -16,13 +16,15 @@ defmodule ExWire.P2P.Server do require Logger - alias ExWire.{Packet, TCP} - alias ExWire.P2P.{Connection, Manager} + alias ExWire.P2P.Connection + alias ExWire.P2P.Manager + alias ExWire.Packet alias ExWire.Struct.Peer + alias ExWire.TCP @type state :: Connection.t() - @type subscription() :: {module(), atom(), list()} | {:server, pid()} + @type subscription() :: {module(), atom(), list()} | {:server, Process.dest()} @doc """ Child spec definition to be used by a supervisor when wanting to supervise an @@ -46,10 +48,10 @@ defmodule ExWire.P2P.Server do We spawn a temporary child process for each outbound connection. """ - def child_spec({:outbound, peer, subscribers}) do + def child_spec({:outbound, peer, subscribers, connection_observer}) do %{ id: ExWire.P2P.Outbound, - start: {__MODULE__, :start_link, [:outbound, peer, subscribers]}, + start: {__MODULE__, :start_link, [:outbound, peer, subscribers, connection_observer]}, restart: :temporary } end @@ -57,20 +59,22 @@ defmodule ExWire.P2P.Server do @doc """ Starts an outbound or inbound peer to peer connection. """ - @spec start_link(:outbound, Peer.t(), list(subscription())) :: GenServer.on_start() - def start_link(:outbound, peer, subscribers) do + @spec start_link(:outbound, Peer.t(), list(subscription()), module()) :: GenServer.on_start() + def start_link(:outbound, peer, subscribers, connection_observer) do GenServer.start_link(__MODULE__, %{ is_outbound: true, peer: peer, - subscribers: subscribers + subscribers: subscribers, + connection_observer: connection_observer }) end - @spec start_link(:inbound, TCP.socket()) :: GenServer.on_start() - def start_link(:inbound, socket) do + @spec start_link(:inbound, TCP.socket(), module()) :: GenServer.on_start() + def start_link(:inbound, socket, connection_observer) do GenServer.start_link(__MODULE__, %{ is_outbound: false, - socket: socket + socket: socket, + connection_observer: connection_observer }) end @@ -117,18 +121,19 @@ defmodule ExWire.P2P.Server do Initialize by opening up a `gen_tcp` connection to given host and port. """ @spec init(map()) :: {:ok, state()} - def init(opts = %{is_outbound: true, peer: peer}) do + def init(opts = %{is_outbound: true, peer: peer, connection_observer: connection_observer}) do Process.send_after(self(), {:connect, opts}, 0) - - {:ok, %Connection{peer: peer}} + true = link(connection_observer) + {:ok, %Connection{peer: peer, is_outbound: true}} end - def init(opts = %{is_outbound: false, socket: socket}) do + def init(opts = %{is_outbound: false, socket: socket, connection_observer: connection_observer}) do state = socket |> Manager.new_inbound_connection() |> Map.put(:subscribers, Map.get(opts, :subscribers, [])) + true = link(connection_observer) {:ok, state} end @@ -171,9 +176,9 @@ defmodule ExWire.P2P.Server do Function triggered when tcp closes the connection """ def handle_info({:tcp_closed, _socket}, state) do - {:ok, new_state} = handle_socket_close(state) + :ok = handle_socket_close(state) - {:noreply, new_state} + {:stop, :normal, state} end @doc """ @@ -194,6 +199,11 @@ defmodule ExWire.P2P.Server do {:noreply, new_state} end + # links should get reason and state + def terminate(reason, state) do + exit({reason, state}) + end + # Allows a client to subscribe to incoming packets. Subscribers must be in the form # of `{module, function, args}`, in which case we'll call `module.function(packet, ...args)`, # or `{:server, server_pid}` for a GenServer, in which case we'll send a message @@ -219,15 +229,12 @@ defmodule ExWire.P2P.Server do {:ok, new_state} end - @spec handle_socket_close(state()) :: {:ok, state()} + @spec handle_socket_close(state()) :: :ok defp handle_socket_close(state) do peer = Map.get(state, :peer, :unknown) + is_outbound = Map.get(state, :is_outbound) - :ok = Logger.warn(fn -> "[Network] [#{peer}] Peer closed connection" end) - - Process.exit(self(), :normal) - - {:ok, state} + Logger.warn(fn -> "[Network] [#{peer} is_outbound: #{is_outbound}] Peer closed connection" end) end @spec handle_send(Packet.packet(), state()) :: {:ok, state()} @@ -244,4 +251,11 @@ defmodule ExWire.P2P.Server do {:ok, new_state} end + + @spec link(module()) :: true + defp link(connection_observer), + do: + connection_observer + |> Process.whereis() + |> Process.link() end diff --git a/apps/ex_wire/lib/ex_wire/peer_supervisor.ex b/apps/ex_wire/lib/ex_wire/peer_supervisor.ex index c5e661fbd..ae948b431 100644 --- a/apps/ex_wire/lib/ex_wire/peer_supervisor.ex +++ b/apps/ex_wire/lib/ex_wire/peer_supervisor.ex @@ -11,6 +11,7 @@ defmodule ExWire.PeerSupervisor do alias ExWire.P2P.Server alias ExWire.Packet alias ExWire.Struct.Peer + alias ExWire.Sync @type node_selector :: :all | :random | :last @@ -18,9 +19,10 @@ defmodule ExWire.PeerSupervisor do @max_peers 10 - @spec start_link(list(String.t())) :: Supervisor.on_start() - def start_link(nodes) do - DynamicSupervisor.start_link(__MODULE__, nodes, name: @name) + @spec start_link(start_nodes: list(String.t()), connection_observer: module()) :: + Supervisor.on_start() + def start_link([nodes, connection_observer]) do + DynamicSupervisor.start_link(__MODULE__, [nodes, connection_observer], name: @name) end @doc """ @@ -29,8 +31,8 @@ defmodule ExWire.PeerSupervisor do This function should be called when we want connect to a new peer, this may come from start-up or via discovery (e.g. a find neighbors response). """ - @spec new_peer(Peer.t()) :: any() - def new_peer(peer) do + @spec new_peer(Peer.t(), module()) :: any() + def new_peer(peer, connection_observer) do peer_count = connected_peer_count() if peer_count < @max_peers do @@ -39,7 +41,7 @@ defmodule ExWire.PeerSupervisor do "[PeerSup] Connecting to peer #{peer} (#{peer_count} of #{@max_peers} peers)" end) - spec = {Server, {:outbound, peer, [{:server, ExWire.Sync}]}} + spec = {Server, {:outbound, peer, [server: Sync], connection_observer}} {:ok, _pid} = DynamicSupervisor.start_child(@name, spec) else @@ -98,16 +100,13 @@ defmodule ExWire.PeerSupervisor do defp do_find_children(:last) do @name |> DynamicSupervisor.which_children() - |> List.last() - |> List.wrap() + |> generate(:last) end defp do_find_children(:random) do @name |> DynamicSupervisor.which_children() - |> Enum.shuffle() - |> Enum.take(1) - |> List.wrap() + |> generate(:random) end @spec connected_peer_count() :: non_neg_integer() @@ -118,16 +117,20 @@ defmodule ExWire.PeerSupervisor do end @impl true - def init(peer_enode_urls) do + def init(start_nodes: peer_enode_urls, connection_observer: connection_observer) do _ = Task.start_link(fn -> for peer_enode_url <- peer_enode_urls do - {:ok, peer} = ExWire.Struct.Peer.from_uri(peer_enode_url) + {:ok, peer} = Peer.from_uri(peer_enode_url) - new_peer(peer) + new_peer(peer, connection_observer) end end) {:ok, _} = DynamicSupervisor.init(strategy: :one_for_one) end + + defp generate([], _), do: [] + defp generate(children, :random), do: [Enum.random(children)] + defp generate(children, :last), do: [List.last(children)] end diff --git a/apps/ex_wire/lib/ex_wire/tcp/inbound_connections_supervisor.ex b/apps/ex_wire/lib/ex_wire/tcp/inbound_connections_supervisor.ex index 39259d70f..edb406170 100644 --- a/apps/ex_wire/lib/ex_wire/tcp/inbound_connections_supervisor.ex +++ b/apps/ex_wire/lib/ex_wire/tcp/inbound_connections_supervisor.ex @@ -2,16 +2,17 @@ defmodule ExWire.TCP.InboundConnectionsSupervisor do @moduledoc """ Dynamic supervisor in charge of handling inbound tcp connections. - TODO: Why doesn't this just use peer supervisor? """ use DynamicSupervisor + alias ExWire.P2P.Server @doc """ Starts a new supervised process to handle an inbound tcp connection. """ - def new_connection_handler(socket) do - DynamicSupervisor.start_child(__MODULE__, {ExWire.P2P.Server, {:inbound, socket}}) + @spec new_connection_handler(:gen_tcp.socket(), module()) :: DynamicSupervisor.on_start_child() + def new_connection_handler(socket, connection_observer) do + DynamicSupervisor.start_child(__MODULE__, {Server, {:inbound, socket, connection_observer}}) end def start_link(args) do diff --git a/apps/ex_wire/lib/ex_wire/tcp/listener.ex b/apps/ex_wire/lib/ex_wire/tcp/listener.ex index ebaa4734f..b4f12b5f4 100644 --- a/apps/ex_wire/lib/ex_wire/tcp/listener.ex +++ b/apps/ex_wire/lib/ex_wire/tcp/listener.ex @@ -10,22 +10,24 @@ defmodule ExWire.TCP.Listener do alias ExWire.TCP @type state :: %{ - listen_socket: TCP.socket() + listen_socket: TCP.socket(), + connection_observer: module() } def start_link(args) do port = Keyword.fetch!(args, :port) name = Keyword.fetch!(args, :name) + connection_observer = Keyword.fetch!(args, :connection_observer) - GenServer.start_link(__MODULE__, port, name: name) + GenServer.start_link(__MODULE__, [port, connection_observer], name: name) end - def init(port) do + def init([port, connection_observer]) do {:ok, listen_socket} = TCP.listen(port) :ok = accept_tcp_connection() - {:ok, %{listen_socket: listen_socket}} + {:ok, %{listen_socket: listen_socket, connection_observer: connection_observer}} end @doc """ @@ -33,9 +35,14 @@ defmodule ExWire.TCP.Listener do that will henceforth handle that tcp connection. """ @spec handle_cast(atom(), state()) :: {:noreply, state()} - def handle_cast(:accept_tcp_connection, state = %{listen_socket: listen_socket}) do + def handle_cast( + :accept_tcp_connection, + state = %{listen_socket: listen_socket, connection_observer: connection_observer} + ) do {:ok, socket} = TCP.accept_connection(listen_socket) - {:ok, pid} = TCP.InboundConnectionsSupervisor.new_connection_handler(socket) + + {:ok, pid} = + TCP.InboundConnectionsSupervisor.new_connection_handler(socket, connection_observer) :ok = hand_over_control_of_socket(socket, pid) :ok = accept_tcp_connection() diff --git a/apps/ex_wire/lib/ex_wire/tcp_listening_supervisor.ex b/apps/ex_wire/lib/ex_wire/tcp_listening_supervisor.ex index 4861e3c6a..3347fafb8 100644 --- a/apps/ex_wire/lib/ex_wire/tcp_listening_supervisor.ex +++ b/apps/ex_wire/lib/ex_wire/tcp_listening_supervisor.ex @@ -7,22 +7,24 @@ defmodule ExWire.TCPListeningSupervisor do use Supervisor require Logger - alias ExWire.Config + alias ExWire.TCP.InboundConnectionsSupervisor + alias ExWire.TCP.Listener - def start_link(_) do - Supervisor.start_link(__MODULE__, [], name: __MODULE__) + @spec start_link(connection_observer: module()) :: Supervisor.on_start() + def start_link(param = [connection_observer: _connection_observer]) do + Supervisor.start_link(__MODULE__, param, name: __MODULE__) end @impl true - def init(_) do + def init(connection_observer: connection_observer) do :ok = Logger.info(fn -> "Public node URL: #{Config.public_node_url()}" end) port = ExWire.Config.listen_port() children = [ - {ExWire.TCP.InboundConnectionsSupervisor, []}, - {ExWire.TCP.Listener, [port: port, name: ExWire.TCP.Listener]} + {InboundConnectionsSupervisor, []}, + {Listener, [port: port, name: Listener, connection_observer: connection_observer]} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/apps/ex_wire/test/ex_wire/connection_observer_test.exs b/apps/ex_wire/test/ex_wire/connection_observer_test.exs new file mode 100644 index 000000000..917ac7cda --- /dev/null +++ b/apps/ex_wire/test/ex_wire/connection_observer_test.exs @@ -0,0 +1,18 @@ +defmodule ExWire.ConnectionObserverTest do + use ExUnit.Case, async: true + alias ExWire.ConnectionObserver + alias ExWire.FakeKademlia + + test "if we are notified of discovery round messages" do + pid = Process.whereis(ConnectionObserver) + {:ok, kademlia_pid} = FakeKademlia.start_link() + :ok = GenServer.call(kademlia_pid, :setup_get_peers_call) + :erlang.trace(pid, true, [:receive]) + ConnectionObserver.notify(:discovery_round) + assert_receive {:trace, ^pid, :receive, {_, :kademlia_discovery_round}} + + receive do + :get_peers_call -> :ok + end + end +end diff --git a/apps/ex_wire/test/ex_wire/sync_test.exs b/apps/ex_wire/test/ex_wire/sync_test.exs index 70b2ad20d..1e5803fdb 100644 --- a/apps/ex_wire/test/ex_wire/sync_test.exs +++ b/apps/ex_wire/test/ex_wire/sync_test.exs @@ -60,7 +60,8 @@ defmodule ExWire.SyncTest do WarpQueue.new() |> WarpQueue.new_manifest(manifest) - {:ok, _peer_supervisor} = ExWire.PeerSupervisor.start_link([]) + {:ok, _peer_supervisor} = + ExWire.PeerSupervisor.start_link(start_nodes: [], connection_observer: nil) {:ok, _warp_processor} = WarpProcessor.start_link({1, trie, @empty_trie, PowProcessor}, name: :test_warp_processor) diff --git a/apps/ex_wire/lib/ex_wire/tcp_listening_supervisor_test.exs b/apps/ex_wire/test/ex_wire/tcp_listening_supervisor_test.exs similarity index 100% rename from apps/ex_wire/lib/ex_wire/tcp_listening_supervisor_test.exs rename to apps/ex_wire/test/ex_wire/tcp_listening_supervisor_test.exs diff --git a/apps/ex_wire/test/support/ex_wire/fake_kademlia.ex b/apps/ex_wire/test/support/ex_wire/fake_kademlia.ex new file mode 100644 index 000000000..7644c1af3 --- /dev/null +++ b/apps/ex_wire/test/support/ex_wire/fake_kademlia.ex @@ -0,0 +1,23 @@ +defmodule ExWire.FakeKademlia do + @moduledoc """ + Kademlia interface mock. + """ + use GenServer + + def get_peers() do + _ = GenServer.call(__MODULE__, :get_peers_call) + [] + end + + def start_link(), do: GenServer.start_link(__MODULE__, [], name: __MODULE__) + def init(_), do: {:ok, %{}} + + def handle_call(:setup_get_peers_call, {reporter, _ref}, _state) do + {:reply, :ok, %{setup_get_peers_call: reporter}} + end + + def handle_call(:get_peers_call, _, %{setup_get_peers_call: reporter}) do + _ = send(reporter, :get_peers_call) + {:reply, :ok, %{}} + end +end