Skip to content

Commit

Permalink
Peer mngmt #639 (#688)
Browse files Browse the repository at this point in the history
* WIP - linking to connections and determening cause

* WIP - testing notification and kademlia route

* fixing test because of new peer supervisor interface

* remove vscode folder ignore

* code style and format

* adding is outbound to log
  • Loading branch information
Ino Murko authored Dec 4, 2018
1 parent cfe6055 commit 8bbacfa
Show file tree
Hide file tree
Showing 18 changed files with 267 additions and 84 deletions.
1 change: 0 additions & 1 deletion apps/ex_wire/config/dev.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use Mix.Config

config :ex_wire,
network_adapter: {ExWire.Adapter.UDP, NetworkClient},
sync: false,
discovery: false,
private_key:
Expand Down
1 change: 1 addition & 0 deletions apps/ex_wire/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ config :ex_wire,
port: 30_399
],
sync_mock: ExWire.BridgeSyncMock,
kademlia_mock: ExWire.FakeKademlia,
discovery: false,
sync: false,
chain: :ropsten
20 changes: 15 additions & 5 deletions apps/ex_wire/lib/ex_wire.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule ExWire do
@type node_id :: binary()

alias ExWire.Config
alias ExWire.ConnectionObserver
alias ExWire.NodeDiscoverySupervisor
alias ExWire.PeerSupervisor
alias ExWire.Sync
Expand All @@ -27,7 +28,7 @@ defmodule ExWire do

@spec get_children(Keyword.t()) :: list(Supervisor.child_spec())
defp get_children(_params) do
chain = ExWire.Config.chain()
chain = Config.chain()

perform_discovery = Config.perform_discovery?()
warp = Config.warp?()
Expand Down Expand Up @@ -62,7 +63,7 @@ defmodule ExWire do
warp_processors ++
[
# Peer supervisor maintains a pool of outbound peers
child_spec({PeerSupervisor, start_nodes}, []),
{PeerSupervisor, [start_nodes: start_nodes, connection_observer: ConnectionObserver]},

# Sync coordinates asking peers for new blocks
child_spec({Sync, {trie, chain, warp, warp_queue}}, [])
Expand All @@ -74,14 +75,23 @@ defmodule ExWire do
node_discovery =
if perform_discovery do
# Discovery tries to find new peers
[child_spec({NodeDiscoverySupervisor, []}, [])]
[
child_spec(
{NodeDiscoverySupervisor, [connection_observer: ConnectionObserver]},
[]
)
]
else
[]
end

# Listener accepts and hands off new inbound TCP connections
tcp_listening = [child_spec({TCPListeningSupervisor, :ok}, [])]
tcp_listening = [
child_spec({TCPListeningSupervisor, [connection_observer: ConnectionObserver]}, [])
]

observerer = [child_spec({ConnectionObserver, :ok}, [])]

sync_children ++ node_discovery ++ tcp_listening
List.flatten([observerer, sync_children, node_discovery, tcp_listening])
end
end
79 changes: 79 additions & 0 deletions apps/ex_wire/lib/ex_wire/connection_observer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule ExWire.ConnectionObserver do
@moduledoc """
Observing the inbound and outbound connections and gets notified if a connection gets dropped.
It also stores all Peers (both inbound and outbound) for further use.
"""
use GenServer
alias ExWire.Config
alias ExWire.Kademlia
alias ExWire.P2P.Connection
alias ExWire.PeerSupervisor
alias ExWire.Struct.Peer
require Logger

@type t :: %__MODULE__{
outbound_peers: MapSet.t(Peer.t()),
inbound_peers: MapSet.t(Peer.t())
}
defstruct outbound_peers: MapSet.new(), inbound_peers: MapSet.new()

@kademlia Application.get_env(:ex_wire, :kademlia_mock, Kademlia)

def notify(:discovery_round) do
GenServer.cast(__MODULE__, :kademlia_discovery_round)
end

@doc """
Starts the observer process.
"""
@spec start_link(:ok) :: GenServer.on_start()
def start_link(:ok) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

def init(_) do
Process.flag(:trap_exit, true)
{:ok, %__MODULE__{}}
end

# getting exits of connections from us to them
def handle_info(
{:EXIT, _pid, {_, %Connection{is_outbound: true, peer: peer}}},
state
) do
:ok = start_new_outbound_connections()
{:noreply, %{state | outbound_peers: MapSet.put(state.outbound_peers, peer)}}
end

# getting exits of connections from them to us
def handle_info(
{:EXIT, _pid, {_, %Connection{is_outbound: false, peer: peer}}},
state
) do
{:noreply, %{state | inbound_peers: MapSet.put(state.inbound_peers, peer)}}
end

# Kademlia server process notifies us of their discovery round.
# We start new connections to peers from the discovery results.
def handle_cast(:kademlia_discovery_round, state) do
:ok = start_new_outbound_connections()

{:noreply, state}
end

@spec start_new_outbound_connections() :: :ok
defp start_new_outbound_connections() do
this_round_nodes = @kademlia.get_peers()

_ =
if Config.perform_sync?() do
for node <- this_round_nodes do
node
|> Peer.from_node()
|> PeerSupervisor.new_peer(__MODULE__)
end
end

:ok
end
end
8 changes: 8 additions & 0 deletions apps/ex_wire/lib/ex_wire/kademlia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,12 @@ defmodule ExWire.Kademlia do
def handle_neighbours(server \\ Server.name(), neighbours) do
GenServer.cast(server, {:handle_neighbours, neighbours})
end

@doc """
Gets new peers from Kademlia that we can connect to
"""
@spec get_peers(GenServer.server()) :: :ok
def get_peers(server \\ Server.name()) do
GenServer.call(server, :get_peers)
end
end
12 changes: 1 addition & 11 deletions apps/ex_wire/lib/ex_wire/kademlia/discovery.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ defmodule ExWire.Kademlia.Discovery do
@moduledoc """
Module that handles node discovery logic.
"""
alias ExWire.{Config, Network, PeerSupervisor}
alias ExWire.Network
alias ExWire.Kademlia.{Node, RoutingTable}
alias ExWire.Message.FindNeighbours
alias ExWire.Struct.Peer

@doc """
Starts discovery round.
Expand All @@ -16,15 +15,6 @@ defmodule ExWire.Kademlia.Discovery do

this_round_nodes = RoutingTable.discovery_nodes(table)

_ =
if Config.perform_sync?() do
for node <- this_round_nodes do
node
|> Peer.from_node()
|> PeerSupervisor.new_peer()
end
end

Enum.each(this_round_nodes, fn node ->
find_neighbours(table, node)
end)
Expand Down
39 changes: 31 additions & 8 deletions apps/ex_wire/lib/ex_wire/kademlia/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ defmodule ExWire.Kademlia.Server do
alias ExWire.Kademlia.{Discovery, Node, RoutingTable}

@type state :: %{
routing_table: RoutingTable.t(),
ignore_pongs: boolean()
routing_table: RoutingTable.t() | nil,
ignore_pongs: boolean() | nil,
connection_observer: module() | nil
}

defstruct [:routing_table, :ignore_pongs, :connection_observer]
@max_discovery_rounds 7

# 5s
Expand All @@ -29,18 +30,23 @@ defmodule ExWire.Kademlia.Server do
network_client_name = Keyword.fetch!(params, :network_client_name)
current_node = Keyword.fetch!(params, :current_node)
nodes = Keyword.get(params, :nodes, [])
connection_observer = Keyword.get(params, :connection_observer)

GenServer.start_link(__MODULE__, {current_node, network_client_name, nodes}, name: name)
GenServer.start_link(
__MODULE__,
{current_node, network_client_name, nodes, connection_observer},
name: name
)
end

@impl true
def init({current_node = %Node{}, network_client_name, nodes}) do
def init({current_node = %Node{}, network_client_name, nodes, connection_observer}) do
routing_table = RoutingTable.new(current_node, network_client_name)

_ = schedule_discovery_round(0, nodes)
schedule_pongs_cleanup()

{:ok, %{routing_table: routing_table}}
{:ok, %__MODULE__{routing_table: routing_table, connection_observer: connection_observer}}
end

@impl true
Expand Down Expand Up @@ -101,12 +107,29 @@ defmodule ExWire.Kademlia.Server do
{:reply, neighbours, state}
end

def handle_call(
:get_peers,
_from,
state
) do
round = state.routing_table.discovery_round

if round > 0 do
{:reply, RoutingTable.discovery_nodes(state.routing_table), state}
else
{:reply, [], state}
end
end

@impl true
def handle_info({:discovery_round, nodes}, state = %{routing_table: routing_table}) do
def handle_info(
{:discovery_round, nodes},
state = %{routing_table: routing_table, connection_observer: connection_observer}
) do
updated_table = Discovery.start(routing_table, nodes)

_ = schedule_discovery_round(updated_table.discovery_round)

:ok = connection_observer.notify(:discovery_round)
{:noreply, %{state | routing_table: updated_table}}
end

Expand Down
10 changes: 6 additions & 4 deletions apps/ex_wire/lib/ex_wire/node_discovery_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ defmodule ExWire.NodeDiscoverySupervisor do
"""
use Supervisor

alias ExWire.{Config, Network}
alias ExWire.Config
alias ExWire.Kademlia.Node
alias ExWire.Kademlia.Server, as: KademliaServer
alias ExWire.Network
alias ExWire.Struct.Endpoint

def start_link(params \\ []) do
Expand All @@ -19,8 +20,8 @@ defmodule ExWire.NodeDiscoverySupervisor do
end

def init(params) do
{udp_module, udp_process_name} = ExWire.Config.udp_network_adapter(params)
port = ExWire.Config.listen_port(params)
{udp_module, udp_process_name} = Config.udp_network_adapter(params)
port = Config.listen_port(params)

bootnodes =
params
Expand All @@ -37,7 +38,8 @@ defmodule ExWire.NodeDiscoverySupervisor do
[
current_node: current_node(params),
network_client_name: udp_process_name,
nodes: bootnodes
nodes: bootnodes,
connection_observer: Keyword.get(params, :connection_observer)
]
]
}
Expand Down
6 changes: 4 additions & 2 deletions apps/ex_wire/lib/ex_wire/p2p/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ defmodule ExWire.P2P.Connection do
subscribers: [any()],
sent_message_count: integer(),
datas: [binary()],
last_error: any() | nil
last_error: any() | nil,
is_outbound: boolean()
}

defstruct peer: nil,
Expand All @@ -33,5 +34,6 @@ defmodule ExWire.P2P.Connection do
subscribers: [],
sent_message_count: 0,
datas: [],
last_error: nil
last_error: nil,
is_outbound: false
end
Loading

0 comments on commit 8bbacfa

Please sign in to comment.