diff --git a/apps/blockchain/lib/blockchain/block.ex b/apps/blockchain/lib/blockchain/block.ex index eca38aebf..2dc4dc510 100644 --- a/apps/blockchain/lib/blockchain/block.ex +++ b/apps/blockchain/lib/blockchain/block.ex @@ -286,7 +286,7 @@ defmodule Blockchain.Block do iex> trie = MerklePatriciaTree.Trie.new(MerklePatriciaTree.Test.random_ets_db()) iex> {updated_block, _new_trie} = Blockchain.Block.put_receipt(%Blockchain.Block{}, 6, %Blockchain.Transaction.Receipt{state: <<1, 2, 3>>, cumulative_gas: 10, bloom_filter: <<2, 3, 4>>, logs: []}, trie) iex> {updated_block, _new_trie} = Blockchain.Block.put_receipt(updated_block, 7, %Blockchain.Transaction.Receipt{state: <<4, 5, 6>>, cumulative_gas: 11, bloom_filter: <<5, 6, 7>>, logs: []}, trie) - iex> Blockchain.Block.get_receipt(updated_block, 6, trie.db) + iex> Blockchain.Block.x(updated_block, 6, trie.db) %Blockchain.Transaction.Receipt{state: <<1, 2, 3>>, cumulative_gas: 10, bloom_filter: <<2, 3, 4>>, logs: []} iex> trie = MerklePatriciaTree.Trie.new(MerklePatriciaTree.Test.random_ets_db()) diff --git a/apps/cli/lib/mix/tasks/mana.ex b/apps/cli/lib/mix/tasks/mana.ex index 65033d8d8..b97691be5 100644 --- a/apps/cli/lib/mix/tasks/mana.ex +++ b/apps/cli/lib/mix/tasks/mana.ex @@ -36,6 +36,7 @@ defmodule Mix.Tasks.Mana do sync: sync, bootnodes: bootnodes, warp: warp, + fast: fast, debug: debug }} -> :ok = Logger.warn("Starting mana chain #{Atom.to_string(chain_name)}...") @@ -48,13 +49,14 @@ defmodule Mix.Tasks.Mana do sync: sync, discovery: discovery, bootnodes: bootnodes, - warp: warp + warp: warp, + fast: fast ) {:ok, _} = Application.ensure_all_started(:ex_wire) - # No Halt - Process.sleep(:infinity) + # No Halt + # Process.sleep(:infinity) {:error, error} -> _ = Logger.error("Error: #{error}") diff --git a/apps/cli/lib/parser/mana_parser.ex b/apps/cli/lib/parser/mana_parser.ex index 44f37f254..65ad61d75 100644 --- a/apps/cli/lib/parser/mana_parser.ex +++ b/apps/cli/lib/parser/mana_parser.ex @@ -8,6 +8,7 @@ defmodule CLI.Parser.ManaParser do @default_no_sync false @default_bootnodes "from_chain" @default_warp false + @default_fast false @default_debug false @doc """ @@ -19,6 +20,7 @@ defmodule CLI.Parser.ManaParser do * `--no-sync` - Perform syncing (default: false) * `--bootnodes` - Comma separated list of bootnodes (default: from_chain) * `--warp` - Perform warp sync (default: false) + * `--fast` - Perform fast sync (default: false) * `--debug` - Add remote debugging (default: false) ## Examples @@ -30,16 +32,18 @@ defmodule CLI.Parser.ManaParser do sync: false, bootnodes: :from_chain, warp: false, + fast: false, debug: false }} - iex> CLI.Parser.ManaParser.mana_args(["--chain", "foundation", "--bootnodes", "enode://google.com,enode://apple.com", "--warp", "--debug"]) + iex> CLI.Parser.ManaParser.mana_args(["--chain", "foundation", "--bootnodes", "enode://google.com,enode://apple.com", "--warp", "--fast", "--debug"]) {:ok, %{ chain_name: :foundation, discovery: true, sync: true, bootnodes: ["enode://google.com", "enode://apple.com"], warp: true, + fast: true, debug: true }} @@ -50,6 +54,7 @@ defmodule CLI.Parser.ManaParser do sync: true, bootnodes: :from_chain, warp: false, + fast: false, debug: false }} @@ -64,6 +69,7 @@ defmodule CLI.Parser.ManaParser do sync: boolean(), bootnodes: :from_chain | list(String.t()), warp: boolean(), + fast: boolean(), debug: boolean() }} | {:error, String.t()} @@ -76,6 +82,7 @@ defmodule CLI.Parser.ManaParser do no_sync: :boolean, bootnodes: :string, warp: :boolean, + fast: :boolean, debug: :boolean ] ) @@ -85,6 +92,7 @@ defmodule CLI.Parser.ManaParser do {:ok, sync} <- get_sync(kw_args), {:ok, bootnodes} <- get_bootnodes(kw_args), {:ok, warp} <- get_warp(kw_args), + {:ok, fast} <- get_fast(kw_args), {:ok, debug} <- get_debug(kw_args) do {:ok, %{ @@ -93,6 +101,7 @@ defmodule CLI.Parser.ManaParser do sync: sync, bootnodes: bootnodes, warp: warp, + fast: fast, debug: debug }} end @@ -158,6 +167,15 @@ defmodule CLI.Parser.ManaParser do {:ok, given_warp} end + @spec get_fast(fast: boolean()) :: {:ok, boolean()} | {:error, String.t()} + defp get_fast(kw_args) do + given_fast = + kw_args + |> Keyword.get(:fast, @default_fast) + + {:ok, given_fast} + end + @spec get_debug(debug: boolean()) :: {:ok, boolean()} | {:error, String.t()} defp get_debug(kw_args) do given_debug = diff --git a/apps/ex_wire/lib/ex_wire.ex b/apps/ex_wire/lib/ex_wire.ex index 9e5517b49..41e0a2bd9 100644 --- a/apps/ex_wire/lib/ex_wire.ex +++ b/apps/ex_wire/lib/ex_wire.ex @@ -13,8 +13,8 @@ defmodule ExWire do alias ExWire.NodeDiscoverySupervisor alias ExWire.PeerSupervisor alias ExWire.Sync + alias ExWire.Sync.{BlockState, WarpState} alias ExWire.Sync.WarpProcessor.PowProcessor - alias ExWire.Sync.WarpState alias ExWire.TCPListeningSupervisor alias MerklePatriciaTree.{CachingTrie, DB.RocksDB, Trie} @@ -39,6 +39,8 @@ defmodule ExWire do |> Trie.new() |> CachingTrie.new() + block_queue = BlockState.load_block_queue(db) + warp_queue = if warp do WarpState.load_warp_queue(db) @@ -64,8 +66,11 @@ defmodule ExWire do # Peer supervisor maintains a pool of outbound peers child_spec({PeerSupervisor, start_nodes}, []), + # Processes blocks + {ExWire.Sync.BlockProcessor, {trie}}, + # Sync coordinates asking peers for new blocks - child_spec({Sync, {trie, chain, warp, warp_queue}}, []) + child_spec({Sync, {trie, chain, block_queue, warp, warp_queue}}, []) ] else [] diff --git a/apps/ex_wire/lib/ex_wire/config.ex b/apps/ex_wire/lib/ex_wire/config.ex index e6e727d35..6498d7d6d 100644 --- a/apps/ex_wire/lib/ex_wire/config.ex +++ b/apps/ex_wire/lib/ex_wire/config.ex @@ -28,6 +28,7 @@ defmodule ExWire.Config do | :public_ip | :sync | :warp + | :fast @doc """ Allows application to configure ExWire before it starts. @@ -156,6 +157,11 @@ defmodule ExWire.Config do get_env(given_params, :warp) end + @spec fast?(Keyword.t()) :: boolean() + def fast?(given_params \\ []) do + get_env(given_params, :fast) + end + @spec bootnodes(Keyword.t()) :: [String.t()] def bootnodes(given_params \\ []) do if conf_ip = System.get_env("BOOTNODES") do diff --git a/apps/ex_wire/lib/ex_wire/dev_p2p/session.ex b/apps/ex_wire/lib/ex_wire/dev_p2p/session.ex index 9fa719010..bccd4c300 100644 --- a/apps/ex_wire/lib/ex_wire/dev_p2p/session.ex +++ b/apps/ex_wire/lib/ex_wire/dev_p2p/session.ex @@ -65,7 +65,7 @@ defmodule ExWire.DEVp2p.Session do """ @spec disconnect(t) :: t def disconnect(session = %__MODULE__{}) do - %{session | hello_sent: nil, hello_received: nil, packet_id_map: PacketIdMap.default_map()} + %{session | hello_sent: nil, hello_received: nil, packet_id_map: nil} end @doc """ @@ -100,6 +100,6 @@ defmodule ExWire.DEVp2p.Session do """ @spec compatible_capabilities?(t) :: boolean() def compatible_capabilities?(%__MODULE__{packet_id_map: packet_id_map}) do - packet_id_map != PacketIdMap.default_map() + Map.has_key?(packet_id_map.ids_to_modules, 0x10) end end diff --git a/apps/ex_wire/lib/ex_wire/framing/frame.ex b/apps/ex_wire/lib/ex_wire/framing/frame.ex index 7b85939d1..3c3fc5552 100644 --- a/apps/ex_wire/lib/ex_wire/framing/frame.ex +++ b/apps/ex_wire/lib/ex_wire/framing/frame.ex @@ -16,6 +16,7 @@ defmodule ExWire.Framing.Frame do @type frame :: binary() @padding_size 16 + @min_frame_size 16 + 16 @spec frame(integer(), ExRLP.t(), Secrets.t()) :: {frame, Secrets.t()} def frame( @@ -97,7 +98,11 @@ defmodule ExWire.Framing.Frame do end @spec unframe(binary(), Secrets.t()) :: - {:ok, integer(), binary(), binary(), Secrets.t()} | {:error, String.t()} + {:ok, integer(), binary(), binary(), Secrets.t()} + | {:error, + :insufficient_data + | :failed_to_match_header_ingress_mac + | :failed_to_match_frame_ingress_mac} def unframe( frame, frame_secrets = %Secrets{ @@ -107,80 +112,84 @@ defmodule ExWire.Framing.Frame do mac_secret: mac_secret } ) do - << - # is header always 128 bits? - header_enc::binary-size(16), - header_mac::binary-size(16), - frame_rest::binary() - >> = frame - - # verify header mac - ingress_mac = update_mac(ingress_mac, mac_encoder, mac_secret, header_enc) - expected_header_mac = ingress_mac |> MAC.final() |> Binary.take(16) - - if expected_header_mac != header_mac do - :ok = - Logger.error( - "Failed to match ingress header mac, expected: #{inspect(expected_header_mac)}, got #{ - inspect(header_mac) - }" - ) - - {:error, "Failed to match header ingress mac"} + if byte_size(frame) < @min_frame_size do + {:error, :insufficient_data} else - {decoder_stream, header} = AES.stream_decrypt(header_enc, decoder_stream) - << - frame_size::integer-size(24), - _header_data_and_padding::binary() - >> = header - - # TODO: We should read the header? But, it's unused by all clients. - # header_rlp = header_data_and_padding |> ExRLP.decode - # protocol_id = Enum.at(header_rlp, 0) |> ExRLP.decode - - frame_padding_bytes = padding_size(frame_size, @padding_size) - - if byte_size(frame_rest) < frame_size + frame_padding_bytes + 16 do - {:error, "Insufficent data"} + # is header always 128 bits? + header_enc::binary-size(16), + header_mac::binary-size(16), + frame_rest::binary() + >> = frame + + # verify header mac + ingress_mac = update_mac(ingress_mac, mac_encoder, mac_secret, header_enc) + expected_header_mac = ingress_mac |> MAC.final() |> Binary.take(16) + + if expected_header_mac != header_mac do + :ok = + Logger.error( + "Failed to match ingress header mac, expected: #{inspect(expected_header_mac)}, got #{ + inspect(header_mac) + }" + ) + + {:error, :failed_to_match_header_ingress_mac} else - # let's go and ignore the entire header data.... + {decoder_stream, header} = AES.stream_decrypt(header_enc, decoder_stream) + << - frame_enc::binary-size(frame_size), - frame_padding::binary-size(frame_padding_bytes), - frame_mac::binary-size(16), - frame_rest::binary() - >> = frame_rest + frame_size::integer-size(24), + _header_data_and_padding::binary() + >> = header - frame_enc_with_padding = frame_enc <> frame_padding + # TODO: We should read the header? But, it's unused by all clients. + # header_rlp = header_data_and_padding |> ExRLP.decode + # protocol_id = Enum.at(header_rlp, 0) |> ExRLP.decode - ingress_mac = MAC.update(ingress_mac, frame_enc_with_padding) - ingress_mac = update_mac(ingress_mac, mac_encoder, mac_secret, nil) - expected_frame_mac = ingress_mac |> MAC.final() |> Binary.take(16) + frame_padding_bytes = padding_size(frame_size, @padding_size) - if expected_frame_mac != frame_mac do - {:error, "Failed to match frame ingress mac"} + if byte_size(frame_rest) < frame_size + frame_padding_bytes + 16 do + {:error, :insufficient_data} else - {decoder_stream, frame_with_padding} = - AES.stream_decrypt(frame_enc_with_padding, decoder_stream) - - << - frame::binary-size(frame_size), - _frame_padding::binary() - >> = frame_with_padding - + # let's go and ignore the entire header data.... << - packet_type_rlp::binary-size(1), - packet_data_rlp::binary() - >> = frame - - { - :ok, - packet_type_rlp |> ExRLP.decode() |> :binary.decode_unsigned(), - packet_data_rlp |> ExRLP.decode(), - frame_rest, - %{frame_secrets | ingress_mac: ingress_mac, decoder_stream: decoder_stream} - } + frame_enc::binary-size(frame_size), + frame_padding::binary-size(frame_padding_bytes), + frame_mac::binary-size(16), + frame_rest::binary() + >> = frame_rest + + frame_enc_with_padding = frame_enc <> frame_padding + + ingress_mac = MAC.update(ingress_mac, frame_enc_with_padding) + ingress_mac = update_mac(ingress_mac, mac_encoder, mac_secret, nil) + expected_frame_mac = ingress_mac |> MAC.final() |> Binary.take(16) + + if expected_frame_mac != frame_mac do + {:error, :failed_to_match_frame_ingress_mac} + else + {decoder_stream, frame_with_padding} = + AES.stream_decrypt(frame_enc_with_padding, decoder_stream) + + << + frame::binary-size(frame_size), + _frame_padding::binary() + >> = frame_with_padding + + << + packet_type_rlp::binary-size(1), + packet_data_rlp::binary() + >> = frame + + { + :ok, + packet_type_rlp |> ExRLP.decode() |> :binary.decode_unsigned(), + packet_data_rlp |> ExRLP.decode(), + frame_rest, + %{frame_secrets | ingress_mac: ingress_mac, decoder_stream: decoder_stream} + } + end end end end diff --git a/apps/ex_wire/lib/ex_wire/p2p/manager.ex b/apps/ex_wire/lib/ex_wire/p2p/manager.ex index 4e2948e2a..c24c81342 100644 --- a/apps/ex_wire/lib/ex_wire/p2p/manager.ex +++ b/apps/ex_wire/lib/ex_wire/p2p/manager.ex @@ -133,13 +133,15 @@ defmodule ExWire.P2P.Manager do # TOOD: How does this work exactly? Is this for multiple frames? handle_packet_data(frame_rest, conn_after_handle) - {:error, "Insufficent data"} -> + {:error, :insufficient_data} -> %{conn | queued_data: total_data} {:error, reason} -> _ = Logger.error( - "[Network] [#{peer}] Failed to read incoming packet from #{peer.host_name} `#{reason}`)" + "[Network] [#{peer}] Failed to read incoming packet from #{peer.host_name} `#{ + to_string(reason) + }`)" ) %{conn | last_error: reason} diff --git a/apps/ex_wire/lib/ex_wire/p2p/server.ex b/apps/ex_wire/lib/ex_wire/p2p/server.ex index 10f9922df..88c4580ac 100644 --- a/apps/ex_wire/lib/ex_wire/p2p/server.ex +++ b/apps/ex_wire/lib/ex_wire/p2p/server.ex @@ -74,6 +74,11 @@ defmodule ExWire.P2P.Server do }) end + @spec get_state(pid()) :: Connection.t() + def get_state(pid) do + GenServer.call(pid, :get_state, :infinity) + end + @doc """ Client function for sending a packet over to a peer. """ @@ -132,6 +137,10 @@ defmodule ExWire.P2P.Server do {:ok, state} end + def handle_call(:get_state, _from, state) do + {:reply, state, state} + end + def handle_call(:get_peer, _from, state = %{peer: peer}) do {:reply, peer, state} end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex index e5dcaaf79..bae4c1e43 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex @@ -16,7 +16,35 @@ defmodule ExWire.Packet.Capability.Eth do Eth.BlockHeaders, Eth.GetBlockBodies, Eth.BlockBodies, - Eth.NewBlock + Eth.NewBlock, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved + ], + 63 => [ + Eth.Status, + Eth.NewBlockHashes, + Eth.Transactions, + Eth.GetBlockHeaders, + Eth.BlockHeaders, + Eth.GetBlockBodies, + Eth.BlockBodies, + Eth.NewBlock, + Eth.GetNodeData, + Eth.NodeData, + Eth.GetReceipts, + Eth.Receipts, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved ] } diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_node_data.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_node_data.ex new file mode 100644 index 000000000..018023b2e --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_node_data.ex @@ -0,0 +1,120 @@ +defmodule ExWire.Packet.Capability.Eth.GetNodeData do + @moduledoc """ + TODO + + ``` + **GetNodeData** [`+0x0d`, `hash_0`: `B_32`, `hash_1`: `B_32`, `...`] + Require peer to return a NodeData message. Hint that useful values in it are those which correspond to given hashes. + ``` + """ + + alias ExWire.Bridge.Sync + alias ExWire.Packet + alias ExWire.Packet.Capability.Eth.NodeData + alias MerklePatriciaTree.TrieStorage + require Logger + + @behaviour Packet + + @sync Application.get_env(:ex_wire, :sync_mock, Sync) + @max_hashes_supported 100 + + @type t :: %__MODULE__{ + hashes: list(EVM.hash()) + } + + defstruct hashes: [] + + @doc """ + Returns the relative message id offset for this message. + + This will help determine what its message ID is relative to other Packets in the same Capability. + """ + @impl true + @spec message_id_offset() :: 0x0D + def message_id_offset do + 0x0D + end + + @doc """ + Given a GetNodeData packet, serializes for transport over Eth Wire Protocol. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.GetNodeData{hashes: [<<1::256>>, <<2::256>>]} + ...> |> ExWire.Packet.Capability.Eth.GetNodeData.serialize() + [<<1::256>>, <<2::256>>] + """ + @impl true + @spec serialize(t()) :: ExRLP.t() + def serialize(packet = %__MODULE__{}) do + packet.hashes + end + + @doc """ + Given an RLP-encoded GetNodeData packet from Eth Wire Protocol, + decodes into a GetNodeData struct. + + ## Examples + + iex> ExWire.Packet.Capability.Eth.GetNodeData.deserialize([<<1::256>>, <<2::256>>]) + %ExWire.Packet.Capability.Eth.GetNodeData{hashes: [<<1::256>>, <<2::256>>]} + """ + @impl true + @spec deserialize(ExRLP.t()) :: t() + def deserialize(rlp) do + hashes = rlp + + %__MODULE__{ + hashes: hashes + } + end + + @doc """ + Handles a GetNodeData message. We should send the node data for the given + keys if we have that data. + """ + @impl true + @spec handle(t()) :: ExWire.Packet.handle_response() + def handle(packet = %__MODULE__{}) do + values = + case @sync.get_current_trie() do + {:ok, trie} -> + get_node_data( + trie, + Enum.take(packet.hashes, @max_hashes_supported) + ) + + {:error, error} -> + :ok = + Logger.warn(fn -> + "#{__MODULE__} Error calling Sync.get_current_trie(): #{error}. Returning empty values." + end) + + [] + end + + {:send, %NodeData{values: values}} + end + + @spec get_node_data(Trie.t(), list(EVM.hash())) :: list(binary()) + defp get_node_data(trie, hashes) do + do_get_node_data(trie, hashes, []) + end + + @spec do_get_node_data(Trie.t(), list(EVM.hash()), list(binary())) :: list(binary()) + defp do_get_node_data(_trie, [], acc_values), do: Enum.reverse(acc_values) + + defp do_get_node_data(trie, [hash | rest_hashes], acc_values) do + new_acc = + case TrieStorage.get_raw_key(trie, hash) do + :not_found -> + acc_values + + {:ok, value} -> + [value | acc_values] + end + + do_get_node_data(trie, rest_hashes, new_acc) + end +end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_receipts.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_receipts.ex new file mode 100644 index 000000000..62f2148b6 --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_receipts.ex @@ -0,0 +1,128 @@ +defmodule ExWire.Packet.Capability.Eth.GetReceipts do + @moduledoc """ + TODO + + ``` + **GetReceipts** [`+0x0d`, `hash_0`: `B_32`, `hash_1`: `B_32`, `...`] + Require peer to return a `Receipts` message. Hint that useful values in it + are those which correspond to blocks of the given hashes. + ``` + """ + require Logger + + alias Blockchain.Transaction.Receipt + alias ExWire.Bridge.Sync + alias ExWire.Packet + alias ExWire.Packet.Capability.Eth.Receipts + alias MerklePatriciaTree.TrieStorage + + @behaviour Packet + + @sync Application.get_env(:ex_wire, :sync_mock, Sync) + @max_hashes_supported 100 + + @type t :: %__MODULE__{ + hashes: list(EVM.hash()) + } + + defstruct hashes: [] + + @doc """ + Returns the relative message id offset for this message. + + This will help determine what its message ID is relative to other Packets in the same Capability. + """ + @impl true + @spec message_id_offset() :: 0x0D + def message_id_offset do + 0x0D + end + + @doc """ + Given a GetReceipts packet, serializes for transport over Eth Wire Protocol. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.GetReceipts{hashes: [<<1::256>>, <<2::256>>]} + ...> |> ExWire.Packet.Capability.Eth.GetReceipts.serialize() + [<<1::256>>, <<2::256>>] + """ + @impl true + @spec serialize(t()) :: ExRLP.t() + def serialize(packet = %__MODULE__{}) do + packet.hashes + end + + @doc """ + Given an RLP-encoded GetReceipts packet from Eth Wire Protocol, + decodes into a GetReceipts struct. + + ## Examples + + iex> ExWire.Packet.Capability.Eth.GetReceipts.deserialize([<<1::256>>, <<2::256>>]) + %ExWire.Packet.Capability.Eth.GetReceipts{hashes: [<<1::256>>, <<2::256>>]} + """ + @impl true + @spec deserialize(ExRLP.t()) :: t() + def deserialize(rlp) do + hashes = rlp + + %__MODULE__{ + hashes: hashes + } + end + + @doc """ + Handles a GetReceipts message. We should send the node data for the given + keys if we have that data. + """ + @impl true + @spec handle(t()) :: ExWire.Packet.handle_response() + def handle(packet = %__MODULE__{}) do + receipts = + case @sync.get_current_trie() do + {:ok, trie} -> + get_receipts( + trie, + Enum.take(packet.hashes, @max_hashes_supported) + ) + + {:error, error} -> + :ok = + Logger.warn(fn -> + "#{__MODULE__} Error calling Sync.get_current_trie(): #{error}. Returning empty receipts." + end) + + [] + end + + {:send, %Receipts{receipts: receipts}} + end + + @spec get_receipts(Trie.t(), list(EVM.hash())) :: list(Receipt.t()) + defp get_receipts(trie, hashes) do + do_get_receipts(trie, hashes, []) + end + + @spec do_get_receipts(Trie.t(), list(EVM.hash()), list(Receipt.t())) :: list(Receipt.t()) + defp do_get_receipts(_trie, [], acc_receipts), do: Enum.reverse(acc_receipts) + + defp do_get_receipts(trie, [hash | rest_hashes], acc_receipts) do + # TODO: Get receipts correctly or whatever. + new_acc = + case TrieStorage.get_raw_key(trie, hash) do + :not_found -> + acc_receipts + + {:ok, receipt_rlp_bin} -> + receipt = + receipt_rlp_bin + |> ExRLP.decode() + |> Receipt.deserialize() + + [receipt | acc_receipts] + end + + do_get_receipts(trie, rest_hashes, new_acc) + end +end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/new_block.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/new_block.ex index 6e580dbb9..6fa636812 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth/new_block.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/new_block.ex @@ -15,14 +15,13 @@ defmodule ExWire.Packet.Capability.Eth.NewBlock do alias Block.Header alias Blockchain.Block - alias ExWire.Packet.Capability.Eth.Transactions + alias Blockchain.Transaction @behaviour ExWire.Packet - # TODO: fill in Transactions typespec when that packet is figured out @type t :: %__MODULE__{ header: Header.t(), - transactions: [any()], + transactions: [Transaction.t()], uncles: [Block.t()], total_difficulty: integer() | nil } @@ -53,8 +52,8 @@ defmodule ExWire.Packet.Capability.Eth.NewBlock do [ [ Header.serialize(packet.header), - Transactions.serialize(%Transactions{transactions: packet.transactions}), - packet.uncles |> Block.serialize() |> Enum.to_list() + Enum.map(packet.transactions, &Transaction.serialize/1), + Enum.map(packet.uncles, &Header.serialize/1) ], packet.total_difficulty ] @@ -70,16 +69,16 @@ defmodule ExWire.Packet.Capability.Eth.NewBlock do [ [ header, - transactions, - uncles + transaction_rlp, + uncles_rlp ], total_difficulty ] = rlp %__MODULE__{ header: Header.deserialize(header), - transactions: Transactions.deserialize(transactions).transactions, - uncles: uncles |> Block.deserialize() |> Enum.to_list(), + transactions: Enum.map(transaction_rlp, &Transaction.deserialize/1), + uncles: Enum.map(uncles_rlp, &Header.deserialize/1), total_difficulty: total_difficulty |> :binary.decode_unsigned() } end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/node_data.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/node_data.ex new file mode 100644 index 000000000..8492fd95d --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/node_data.ex @@ -0,0 +1,83 @@ +defmodule ExWire.Packet.Capability.Eth.NodeData do + @moduledoc """ + TODO + + ``` + **NodeData** [`+0x0e`, `value_0`: `B`, `value_1`: `B`, `...`] + Provide a set of values which correspond to previously asked node data hashes + from GetNodeData. Does not need to contain all; best effort is fine. If it + contains none, then has no information for previous GetNodeData hashes. + ``` + """ + + alias ExWire.Packet + require Logger + + @behaviour Packet + + @type t :: %__MODULE__{ + values: list(binary()) + } + + defstruct values: [] + + @doc """ + Returns the relative message id offset for this message. + + This will help determine what its message ID is relative to other Packets in the same Capability. + """ + @impl true + @spec message_id_offset() :: 0x0E + def message_id_offset do + 0x0E + end + + @doc """ + Given a NodeData packet, serializes for transport over Eth Wire Protocol. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.NodeData{values: [<<1::256>>, <<2::256>>]} + ...> |> ExWire.Packet.Capability.Eth.NodeData.serialize() + [<<1::256>>, <<2::256>>] + """ + @impl true + @spec serialize(t()) :: ExRLP.t() + def serialize(packet = %__MODULE__{}) do + packet.values + end + + @doc """ + Given an RLP-encoded NodeData packet from Eth Wire Protocol, + decodes into a NodeData struct. + + ## Examples + + iex> ExWire.Packet.Capability.Eth.NodeData.deserialize([<<1::256>>, <<2::256>>]) + %ExWire.Packet.Capability.Eth.NodeData{values: [<<1::256>>, <<2::256>>]} + """ + @impl true + @spec deserialize(ExRLP.t()) :: t() + def deserialize(rlp) do + values = rlp + + %__MODULE__{ + values: values + } + end + + @doc """ + Handles a NodeData message. We do not respond. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.NodeData{values: [<<1::256>>, <<2::256>>]} + ...> |> ExWire.Packet.Capability.Eth.NodeData.handle() + :ok + """ + @impl true + @spec handle(t()) :: ExWire.Packet.handle_response() + def handle(_packet = %__MODULE__{}) do + :ok + end +end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex new file mode 100644 index 000000000..ac1a37ffd --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex @@ -0,0 +1,87 @@ +defmodule ExWire.Packet.Capability.Eth.Receipts do + @moduledoc """ + TODO + + ``` + **Receipts** [`+0x10`, [`receipt_0`, `receipt_1`], ...] + Provide a set of receipts which correspond to previously asked in + `GetReceipts`. + ``` + """ + require Logger + + alias Blockchain.Transaction.Receipt + alias ExWire.Packet + + @behaviour Packet + + @type t :: %__MODULE__{ + receipts: list(Receipt.t()) + } + + defstruct receipts: [] + + @doc """ + Returns the relative message id offset for this message. + + This will help determine what its message ID is relative to other Packets in the same Capability. + """ + @impl true + @spec message_id_offset() :: 0x10 + def message_id_offset do + 0x10 + end + + @doc """ + Given a Receipts packet, serializes for transport over Eth Wire Protocol. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.Receipts{receipts: [ + ...> %Blockchain.Transaction.Receipt{state: <<1,2,3>>, cumulative_gas: 5, bloom_filter: <<2,3,4>>, logs: []} + ...> ]} + ...> |> ExWire.Packet.Capability.Eth.Receipts.serialize() + [[<<1, 2, 3>>, 5, <<2, 3, 4>>, []]] + """ + @impl true + @spec serialize(t()) :: ExRLP.t() + def serialize(packet = %__MODULE__{}) do + Enum.map(packet.receipts, &Receipt.serialize/1) + end + + @doc """ + Given an RLP-encoded Receipts packet from Eth Wire Protocol, + decodes into a Receipts struct. + + ## Examples + + iex> ExWire.Packet.Capability.Eth.Receipts.deserialize([[<<1, 2, 3>>, 5, <<2, 3, 4>>, []]]) + %ExWire.Packet.Capability.Eth.Receipts{receipts: [ + %Blockchain.Transaction.Receipt{state: <<1,2,3>>, cumulative_gas: 5, bloom_filter: <<2,3,4>>, logs: []} + ]} + """ + @impl true + @spec deserialize(ExRLP.t()) :: t() + def deserialize(rlp) do + receipts_rlp = rlp + + %__MODULE__{ + receipts: Enum.map(receipts_rlp, &Receipt.deserialize/1) + } + end + + @doc """ + Handles a Receipts message. We do not respond. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.Receipts{receipts: []} + ...> |> ExWire.Packet.Capability.Eth.Receipts.handle() + :ok + """ + @impl true + @spec handle(t()) :: ExWire.Packet.handle_response() + def handle(_packet = %__MODULE__{}) do + :ok + end +end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/mana.ex b/apps/ex_wire/lib/ex_wire/packet/capability/mana.ex index 4c968d44b..b9089a3d2 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/mana.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/mana.ex @@ -1,9 +1,11 @@ defmodule ExWire.Packet.Capability.Mana do alias ExWire.Packet.Capability alias ExWire.Packet.Capability.Eth + alias ExWire.Packet.Capability.Par @our_capabilities_map %{ - Eth.get_name() => Eth + Eth.get_name() => Eth, + Par.get_name() => Par } @our_capabilities @our_capabilities_map diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/par.ex b/apps/ex_wire/lib/ex_wire/packet/capability/par.ex index 8cd0da994..c505fa8b9 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/par.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/par.ex @@ -1,6 +1,7 @@ defmodule ExWire.Packet.Capability.Par do alias ExWire.Config alias ExWire.Packet.Capability + alias ExWire.Packet.Capability.Eth alias ExWire.Packet.Capability.Par @behaviour Capability @@ -10,10 +11,26 @@ defmodule ExWire.Packet.Capability.Par do @version_to_packet_types %{ 1 => [ Par.WarpStatus, + Eth.NewBlockHashes, + Eth.Transactions, + Eth.GetBlockHeaders, + Eth.BlockHeaders, + Eth.GetBlockBodies, + Eth.BlockBodies, + Eth.NewBlock, Par.GetSnapshotManifest, Par.SnapshotManifest, Par.GetSnapshotData, - Par.SnapshotData + Par.SnapshotData, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved ] } diff --git a/apps/ex_wire/lib/ex_wire/packet/packet_id_map.ex b/apps/ex_wire/lib/ex_wire/packet/packet_id_map.ex index fdea5c22f..40bde637a 100644 --- a/apps/ex_wire/lib/ex_wire/packet/packet_id_map.ex +++ b/apps/ex_wire/lib/ex_wire/packet/packet_id_map.ex @@ -48,7 +48,7 @@ defmodule ExWire.Packet.PacketIdMap do |> Enum.filter(fn cap -> Capability.are_we_capable?(cap, Mana.get_our_capabilities_map()) end) - |> Enum.sort(fn {name1, _v1}, {name2, _v2} -> name1 < name2 end) + |> Enum.sort(fn %Capability{name: name1}, %Capability{name: name2} -> name1 < name2 end) |> Enum.map(fn cap -> Capability.get_packets_for_capability(cap, Mana.get_our_capabilities_map()) end) @@ -116,8 +116,13 @@ defmodule ExWire.Packet.PacketIdMap do defp build_capability_ids_to_modules_map(capability_packet_types, {base_id, starting_map}) do capability_packet_types |> Enum.reduce({base_id, starting_map}, fn packet_type, {next_base_id, updated_map} -> - packet_id = base_id + apply(packet_type, :message_id_offset, []) - {Kernel.max(next_base_id, packet_id + 1), Map.put(updated_map, packet_id, packet_type)} + if packet_type == :reserved do + {next_base_id + 1, updated_map} + else + packet_id = base_id + apply(packet_type, :message_id_offset, []) + + {Kernel.max(next_base_id, packet_id + 1), Map.put(updated_map, packet_id, packet_type)} + end end) end end diff --git a/apps/ex_wire/lib/ex_wire/peer_supervisor.ex b/apps/ex_wire/lib/ex_wire/peer_supervisor.ex index c5e661fbd..f32bc12f3 100644 --- a/apps/ex_wire/lib/ex_wire/peer_supervisor.ex +++ b/apps/ex_wire/lib/ex_wire/peer_supervisor.ex @@ -92,28 +92,36 @@ defmodule ExWire.PeerSupervisor do @spec do_find_children(node_selector()) :: list(any()) defp do_find_children(:all) do - DynamicSupervisor.which_children(@name) + compatible_children(@name) end defp do_find_children(:last) do @name - |> DynamicSupervisor.which_children() + |> compatible_children() |> List.last() |> List.wrap() end defp do_find_children(:random) do @name - |> DynamicSupervisor.which_children() + |> compatible_children() |> Enum.shuffle() |> Enum.take(1) |> List.wrap() end + defp compatible_children(name) do + name + |> DynamicSupervisor.which_children() + |> Enum.filter(fn {_id, child, _type, _modules} -> + Server.get_state(child).session != nil + end) + end + @spec connected_peer_count() :: non_neg_integer() def connected_peer_count() do @name - |> DynamicSupervisor.which_children() + |> compatible_children() |> Enum.count() end diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index 3d884c817..77cff8346 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -15,21 +15,33 @@ defmodule ExWire.Struct.BlockQueue do 3. We may be waiting on a parent block as we received the child first. We add these blocks to a backlog map keyed by the parent hash. """ + require Logger + alias Block.Header alias ExWire.Struct.Block, as: BlockStruct - alias Blockchain.{Block, Blocktree, Chain} + alias ExWire.Struct.Peer + alias Blockchain.{Block, Blocktree} alias MerklePatriciaTree.Trie - require Logger + alias ExWire.Packet.Capability.Eth.{ + BlockBodies, + BlockHeaders, + NodeData, + Receipts + } # These will be used to help us determine if a block is empty @empty_trie MerklePatriciaTree.Trie.empty_trie_root_hash() @empty_hash [] |> ExRLP.encode() |> ExthCrypto.Hash.Keccak.kec() defstruct queue: %{}, - backlog: %{}, - do_validation: true, - block_numbers: MapSet.new() + block_numbers: MapSet.new(), + needed_block_hashes: [], + max_header_request: nil, + header_requests: MapSet.new(), + block_requests: MapSet.new(), + block_tree: Blocktree.new_tree(), + processing_blocks: MapSet.new() @type block_item :: %{ commitments: list(binary()), @@ -41,206 +53,20 @@ defmodule ExWire.Struct.BlockQueue do EVM.hash() => block_item } + @type request :: {:header, integer()} | {:block, list(EVM.hash())} + @type t :: %__MODULE__{ queue: %{integer() => block_map}, - backlog: %{EVM.hash() => list(Block.t())}, - do_validation: boolean(), - block_numbers: MapSet.t() + block_numbers: MapSet.t(integer()), + needed_block_hashes: list(EVM.hash()), + max_header_request: nil | integer(), + header_requests: MapSet.t(integer()), + block_requests: MapSet.t(EVM.hash()), + block_tree: Blocktree.t(), + processing_blocks: %{EVM.hash() => Block.t()} } - @doc """ - Adds a given header received by a peer to a block queue. Returns whether or - not we should request the block body. - - Note: we will process it if the block is empty (i.e. has neither transactions - nor ommers). - """ - @spec add_header( - t, - Blocktree.t(), - Header.t(), - EVM.hash(), - binary(), - Chain.t(), - Trie.t() - ) :: {t, Blocktree.t(), Trie.t(), boolean()} - def add_header( - block_queue = %__MODULE__{queue: queue}, - block_tree, - header, - header_hash, - remote_id, - chain, - trie - ) do - block_map = Map.get(queue, header.number, %{}) - - {block_map, should_request_body} = - case Map.get(block_map, header_hash) do - nil -> - # may already be ready, already. - is_empty = is_block_empty?(header) - - block_map = - Map.put(block_map, header_hash, %{ - commitments: MapSet.new([remote_id]), - block: %Block{header: header}, - ready: is_empty - }) - - {block_map, not is_empty} - - block_item -> - {Map.put(block_map, header_hash, %{ - block_item - | commitments: MapSet.put(block_item.commitments, remote_id) - }), false} - end - - updated_block_queue = %{ - block_queue - | queue: Map.put(queue, header.number, block_map), - block_numbers: MapSet.put(block_queue.block_numbers, header.number) - } - - {new_block_queue, new_block_tree, new_trie} = - process_block_queue(updated_block_queue, block_tree, chain, trie) - - {new_block_queue, new_block_tree, new_trie, should_request_body} - end - - @doc """ - Adds a given block struct received by a peer to a block queue. - - Since we don't really know which block this belongs to, we're going to just - need to look at every block and try and guess. - - To guess, we'll compute the transactions root and ommers hash, and then try - and find a header that matches it. For empty blocks (ones with no transactions - and no ommers, there may be several matches. Otherwise, each block body should - pretty much be unique). - """ - @spec add_block_struct( - t(), - Blocktree.t(), - BlockStruct.t(), - Chain.t(), - Trie.t() - ) :: {t(), Blocktree.t(), Trie.t()} - def add_block_struct( - block_queue = %__MODULE__{queue: queue}, - block_tree, - block_struct, - chain, - trie - ) do - transactions_root = get_transactions_root(block_struct.transactions_rlp) - ommers_hash = get_ommers_hash(block_struct.ommers_rlp) - - updated_queue = - Enum.reduce(queue, queue, fn {number, block_map}, queue -> - updated_block_map = - Enum.reduce(block_map, block_map, fn {hash, block_item}, block_map -> - if block_item.block.header.transactions_root == transactions_root and - block_item.block.header.ommers_hash == ommers_hash do - # This is now ready! (though, it may not still have enough commitments) - block = %{ - block_item.block - | transactions: block_struct.transactions, - ommers: block_struct.ommers - } - - Map.put(block_map, hash, %{block_item | block: block, ready: true}) - else - block_map - end - end) - - Map.put(queue, number, updated_block_map) - end) - - updated_block_queue = %{block_queue | queue: updated_queue} - - process_block_queue(updated_block_queue, block_tree, chain, trie) - end - - @doc """ - Processes a the block queue, adding any blocks which are complete and pass - the number of confirmations to the block tree. These blocks are then removed - from the queue. Note: they may end up in the backlog, nonetheless, if we are - waiting still for the parent block. - """ - @spec process_block_queue(t(), Blocktree.t(), Chain.t(), Trie.t()) :: - {t(), Blocktree.t(), Trie.t()} - def process_block_queue( - block_queue = %__MODULE__{}, - block_tree, - chain, - trie - ) do - # First get ready to process blocks - {remaining_block_queue, blocks} = get_complete_blocks(block_queue) - - # Then recursively process them - do_process_blocks(blocks, remaining_block_queue, block_tree, chain, trie) - end - - @spec do_process_blocks(list(Block.t()), t(), Blocktree.t(), Chain.t(), Trie.t()) :: - {t(), Blocktree.t(), Trie.t()} - - defp do_process_blocks([], block_queue, block_tree, _chain, trie), - do: {block_queue, block_tree, trie} - - defp do_process_blocks([block | rest], block_queue, block_tree, chain, trie) do - {new_block_tree, new_trie, new_backlog, extra_blocks} = - case Blocktree.verify_and_add_block( - block_tree, - chain, - block, - trie, - block_queue.do_validation - ) do - {:invalid, [:non_genesis_block_requires_parent]} -> - # Note: this is probably too slow since we see a lot of blocks without - # parents and, I think, we're running the full validity check. - - # :ok = Logger.debug("[Block Queue] Failed to verify block due to missing parent") - - updated_backlog = - Map.update( - block_queue.backlog, - block.header.parent_hash, - [block], - fn blocks -> [block | blocks] end - ) - - {block_tree, trie, updated_backlog, []} - - {:invalid, reasons} -> - :ok = - Logger.debug(fn -> - "[Block Queue] Failed to verify block due to #{inspect(reasons)}" - end) - - {block_tree, trie, block_queue.backlog, []} - - {:ok, {new_block_tree, new_trie, block_hash}} -> - :ok = - Logger.debug(fn -> - "[Block Queue] Verified block #{block.header.number} (0x#{ - Base.encode16(block_hash, case: :lower) - }) and added to new block tree" - end) - - {backlogged_blocks, new_backlog} = Map.pop(block_queue.backlog, block_hash, []) - - {new_block_tree, new_trie, new_backlog, backlogged_blocks} - end - - new_block_queue = %{block_queue | backlog: new_backlog} - - do_process_blocks(extra_blocks ++ rest, new_block_queue, new_block_tree, chain, new_trie) - end + @headers_per_request 15 @doc """ Returns the set of blocks which are complete in the block queue, returning a @@ -321,29 +147,39 @@ defmodule ExWire.Struct.BlockQueue do } """ @spec get_complete_blocks(t) :: {t, [Block.t()]} - def get_complete_blocks(block_queue = %__MODULE__{queue: queue}) do - {queue, blocks} = - Enum.reduce(queue, {queue, []}, fn {number, block_map}, {queue, blocks} -> - {final_block_map, new_blocks} = - Enum.reduce(block_map, {block_map, []}, fn {hash, block_item}, {block_map, blocks} -> + def get_complete_blocks( + block_queue = %__MODULE__{queue: queue, processing_blocks: processing_blocks} + ) do + {next_queue, next_processing_blocks, blocks} = + Enum.reduce(queue, {queue, processing_blocks, []}, fn {number, block_map}, + {curr_queue, curr_processing_blocks, + blocks} -> + {final_block_map, new_blocks, next_processing_blocks} = + Enum.reduce(block_map, {block_map, [], curr_processing_blocks}, fn {hash, block_item}, + {block_map, blocks, + inner_curr_processing_blocks} -> if block_item.ready and MapSet.size(block_item.commitments) >= ExWire.Config.commitment_count() do - {Map.delete(block_map, hash), [block_item.block | blocks]} + {Map.delete(block_map, hash), [block_item.block | blocks], + Map.put(inner_curr_processing_blocks, hash, block_item.block)} else - {block_map, blocks} + {block_map, blocks, inner_curr_processing_blocks} end end) total_blocks = blocks ++ new_blocks - if final_block_map == %{} do - {Map.delete(queue, number), total_blocks} - else - {Map.put(queue, number, final_block_map), total_blocks} - end + next_queue = + if final_block_map == %{} do + Map.delete(curr_queue, number) + else + Map.put(curr_queue, number, final_block_map) + end + + {next_queue, next_processing_blocks, total_blocks} end) - {%{block_queue | queue: queue}, blocks} + {%{block_queue | queue: next_queue, processing_blocks: next_processing_blocks}, blocks} end @doc """ @@ -378,6 +214,261 @@ defmodule ExWire.Struct.BlockQueue do header.transactions_root == @empty_trie and header.ommers_hash == @empty_hash end + @doc """ + Determines the next block we don't yet have in our blocktree and + dispatches a request to all connected peers for that block and the + next `n` blocks after it. + """ + @spec get_requests(BlockQueue.t()) :: list(request()) + def get_requests(block_queue) do + requests = [] + + # TODO: Consider this conditional logic + {next_block_queue, requests} = + if MapSet.size(block_queue.header_requests) > 5 || + Enum.count(block_queue.needed_block_hashes) > 5 do + {block_queue, requests} + else + highest_request = + if is_nil(block_queue.max_header_request) do + if is_nil(block_queue.block_tree.best_block) do + 0 + else + 0 + end + else + 0 + end + + { + %{ + block_queue + | header_requests: + MapSet.union( + block_queue.header_requests, + MapSet.new(highest_request..(highest_request + @headers_per_request)) + ), + max_header_request: highest_request + @headers_per_request + }, + [{:headers, highest_request, @headers_per_request} | requests] + } + end + + {next_block_queue_2, requests} = + if Enum.count(next_block_queue.needed_block_hashes) == 0 do + {next_block_queue, requests} + else + { + %{ + next_block_queue + | needed_block_hashes: [], + block_requests: + MapSet.union( + next_block_queue.block_requests, + MapSet.new(next_block_queue.needed_block_hashes) + ) + }, + [{:bodies, next_block_queue.needed_block_hashes} | requests] + } + end + + { + next_block_queue_2, + Enum.reverse(requests) + } + end + + @doc """ + Adds new block headers to the block queue. + """ + @spec new_block_headers(t(), BlockHeaders.t(), Peer.t()) :: t() + def new_block_headers( + block_queue = %__MODULE__{}, + %BlockHeaders{headers: headers}, + peer + ) do + Enum.reduce(headers, block_queue, fn header, curr_block_queue -> + header_hash = Header.hash(header) + bq = add_header(curr_block_queue, header, header_hash, peer.remote_id) + IO.inspect(["Queue Size", Enum.count(bq.queue)]) + bq + end) + end + + @doc """ + Adds new block bodies to the block queue. + """ + @spec new_block_bodies(t(), BlockBodies.t()) :: t() + def new_block_bodies( + block_queue = %__MODULE__{}, + %BlockBodies{blocks: blocks} + ) do + Enum.reduce(blocks, block_queue, fn block_body, curr_block_queue -> + add_block_struct(curr_block_queue, block_body) + end) + end + + @doc """ + Adds new node data to the block queue. + """ + @spec new_node_data(t(), NodeData.t()) :: t() + def new_node_data( + block_queue = %__MODULE__{}, + %NodeData{values: values} + ) do + :ok = + Exth.trace(fn -> + "#{__MODULE__} Got and ignoring #{Enum.count(values)} node data value(s)." + end) + + block_queue + end + + @doc """ + Adds new receipts to the block queue. + """ + @spec new_receipts(t(), Receipts.t()) :: t() + def new_receipts( + block_queue = %__MODULE__{}, + %Receipts{receipts: receipts} + ) do + :ok = + Exth.trace(fn -> "#{__MODULE__} Got and ignoring #{Enum.count(receipts)} receipt(s)." end) + + block_queue + end + + # Adds a given header received by a peer to a block queue. Returns whether or + # not we should request the block body. + + # Note: we will process it if the block is empty (i.e. has neither transactions + # nor ommers). + @spec add_header(t(), Header.t(), EVM.hash(), binary()) :: t() + def add_header( + block_queue = %__MODULE__{ + queue: queue, + needed_block_hashes: needed_block_hashes + }, + header, + header_hash, + remote_id + ) do + block_map = Map.get(queue, header.number, %{}) + + {next_block_map, next_needed_block_hashes} = + case Map.get(block_map, header_hash) do + nil -> + # may already be ready, already. + is_empty = is_block_empty?(header) + + next_block_map_inner = + Map.put(block_map, header_hash, %{ + commitments: MapSet.new([remote_id]), + block: %Block{header: header}, + ready: is_empty + }) + + next_needed_block_hashes = + if is_empty do + needed_block_hashes + else + [header_hash | needed_block_hashes] + end + + {next_block_map_inner, next_needed_block_hashes} + + block_item -> + {Map.put(block_map, header_hash, %{ + block_item + | commitments: MapSet.put(block_item.commitments, remote_id) + }), needed_block_hashes} + end + + %{ + block_queue + | queue: Map.put(queue, header.number, next_block_map |> IO.inspect()), + block_numbers: MapSet.put(block_queue.block_numbers, header.number), + needed_block_hashes: next_needed_block_hashes, + header_requests: MapSet.delete(block_queue.header_requests, header.number) + } + end + + @doc """ + Adds a given block struct received by a peer to a block queue. + + Since we don't really know which block this belongs to, we're going to just + need to look at every block and try and guess. + + To guess, we'll compute the transactions root and ommers hash, and then try + and find a header that matches it. For empty blocks (ones with no transactions + and no ommers, there may be several matches. Otherwise, each block body should + pretty much be unique). + """ + @spec add_block_struct( + t(), + BlockStruct.t() + ) :: t() + def add_block_struct( + block_queue = %__MODULE__{ + queue: queue + }, + block_struct + ) do + transactions_root = get_transactions_root(block_struct.transactions_rlp) + ommers_hash = get_ommers_hash(block_struct.ommers_rlp) + Exth.inspect(queue, "queue") + + updated_queue = + Enum.reduce(queue, queue, fn {number, block_map}, curr_queue -> + updated_block_map = + Enum.reduce(block_map, block_map, fn {hash, block_item}, curr_block_map -> + if block_item.block.header.transactions_root == transactions_root and + block_item.block.header.ommers_hash == ommers_hash do + IO.inspect("yes") + # This is now ready! (though, it may not still have enough commitments) + block = %{ + block_item.block + | transactions: block_struct.transactions, + ommers: block_struct.ommers + } + + Map.put(curr_block_map, hash, %{block_item | block: block, ready: true}) + else + IO.inspect("no") + curr_block_map + end + end) + + Map.put(curr_queue, number, updated_block_map) + end) + + %{block_queue | queue: updated_queue |> Exth.inspect("new queue")} + end + + @spec processed_blocks(t(), list(EVM.hash()), Block.t()) :: t() + def processed_blocks( + block_queue = %{ + block_tree: block_tree, + processing_blocks: processing_blocks + }, + block_hashes, + best_block + ) do + next_processing_blocks = + Enum.reduce(block_hashes, processing_blocks, fn block_hash, curr_processing_blocks -> + Map.delete(curr_processing_blocks, block_hash) + end) + + next_block_tree = + if best_block do + Blocktree.update_best_block(block_tree, best_block) + else + block_tree + end + + %{block_queue | processing_blocks: next_processing_blocks, block_tree: next_block_tree} + end + # Tries to get the transaction root by encoding the transaction trie @spec get_transactions_root([ExRLP.t()]) :: MerklePatriciaTree.Trie.root_hash() defp get_transactions_root(transactions_rlp) do diff --git a/apps/ex_wire/lib/ex_wire/sync.ex b/apps/ex_wire/lib/ex_wire/sync.ex index 69f4d3bff..5a1923255 100644 --- a/apps/ex_wire/lib/ex_wire/sync.ex +++ b/apps/ex_wire/lib/ex_wire/sync.ex @@ -15,20 +15,21 @@ defmodule ExWire.Sync do require Logger - alias Block.Header - alias Blockchain.{Block, Blocktree, Blocktree.State, Chain} + alias Blockchain.{Blocktree, Chain} alias Exth.Time alias ExWire.Packet alias ExWire.PeerSupervisor alias ExWire.Struct.{BlockQueue, Peer, WarpQueue} - alias ExWire.Sync.{WarpProcessor, WarpState} - alias MerklePatriciaTree.{DB, Trie, TrieStorage} + alias ExWire.Sync.{BlockProcessor, BlockState, WarpProcessor, WarpState} + alias MerklePatriciaTree.{Trie, TrieStorage} alias ExWire.Packet.Capability.Eth.{ BlockBodies, BlockHeaders, GetBlockBodies, - GetBlockHeaders + GetBlockHeaders, + NodeData, + Receipts } alias ExWire.Packet.Capability.Par.{ @@ -40,8 +41,6 @@ defmodule ExWire.Sync do alias ExWire.Packet.Capability.Par.SnapshotData.{BlockChunk, StateChunk} - @save_block_interval 100 - @blocks_per_request 100 @startup_delay 10_000 @retry_delay 5_000 @request_limit 5 @@ -57,6 +56,7 @@ defmodule ExWire.Sync do starting_block_number: non_neg_integer() | nil, highest_block_number: non_neg_integer() | nil, warp: boolean(), + block_processor: GenServer.server(), warp_processor: GenServer.server() } @@ -68,12 +68,17 @@ defmodule ExWire.Sync do @doc """ Starts a sync process for a given chain. """ - @spec start_link({Trie.t(), Chain.t(), boolean(), WarpQueue.t() | nil}, Keyword.t()) :: - GenServer.on_start() - def start_link({trie, chain, warp, warp_queue}, opts \\ []) do + @spec start_link( + {Trie.t(), Chain.t(), BlockQueue.t(), boolean(), WarpQueue.t() | nil, boolean()}, + Keyword.t() + ) :: GenServer.on_start() + def start_link({trie, chain, block_queue, warp, warp_queue}, opts \\ []) do + block_processor = Keyword.get(opts, :block_processor, BlockProcessor) warp_processor = Keyword.get(opts, :warp_processor, WarpProcessor) - GenServer.start_link(__MODULE__, {trie, chain, warp, warp_queue, warp_processor}, + GenServer.start_link( + __MODULE__, + {trie, chain, block_queue, block_processor, warp, warp_queue, warp_processor}, name: Keyword.get(opts, :name, __MODULE__) ) end @@ -87,22 +92,19 @@ defmodule ExWire.Sync do We should handle this case more gracefully. """ @impl true - def init({trie, chain, warp, warp_queue, warp_processor}) do - block_tree = load_sync_state(TrieStorage.permanent_db(trie)) - block_queue = %BlockQueue{} - - {:ok, {block, _caching_trie}} = Blocktree.get_best_block(block_tree, chain, trie) + def init({trie, chain, block_queue, block_processor, warp, warp_queue, warp_processor}) do + {:ok, {block, _caching_trie}} = Blocktree.get_best_block(block_queue.block_tree, chain, trie) state = %{ chain: chain, block_queue: block_queue, warp_queue: warp_queue, - block_tree: block_tree, trie: trie, last_requested_block: nil, starting_block_number: block.header.number, highest_block_number: block.header.number, warp: warp, + block_processor: block_processor, warp_processor: warp_processor } @@ -118,9 +120,10 @@ defmodule ExWire.Sync do state end else - request_next_block(@startup_delay) - - state + %{ + state + | block_queue: dispatch_new_block_queue_requests(block_queue) + } end {:ok, next_state} @@ -131,6 +134,19 @@ defmodule ExWire.Sync do end @impl true + def handle_cast( + {:processed_blocks, processed_blocks, best_block}, + state = %{block_queue: block_queue} + ) do + next_state = + block_queue + |> BlockQueue.processed_blocks(processed_blocks, best_block) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) + + {:noreply, next_state} + end + def handle_cast( {:processed_block_chunk, chunk_hash, processed_blocks, block}, state = %{warp_queue: warp_queue} @@ -164,11 +180,52 @@ defmodule ExWire.Sync do @impl true def handle_info( :request_next_block, - state = %{block_queue: block_queue, block_tree: block_tree} + state = %{block_queue: block_queue} ) do - new_state = handle_request_next_block(block_queue, block_tree, state) + next_block_queue = dispatch_new_block_queue_requests(block_queue) - {:noreply, new_state} + {:noreply, + %{ + state + | block_queue: next_block_queue + }} + end + + def handle_info({:get_header_hashes, header_hashes}, state) do + if send_with_retry( + %GetBlockBodies{ + hashes: header_hashes + }, + :random, + {:get_header_hashes, header_hashes} + ) do + :ok = + Logger.debug(fn -> + "[Sync] Requested #{Enum.count(header_hashes)} block bodies on retry" + end) + end + + {:noreply, state} + end + + def handle_info({:get_block_headers, block_number, count}, state) do + if send_with_retry( + %GetBlockHeaders{ + block_identifier: block_number, + max_headers: count, + skip: 0, + reverse: false + }, + :random, + {:get_block_headers, block_number, count} + ) do + :ok = + Logger.debug(fn -> + "[Sync] Requested block headers #{block_number}..#{block_number + count} on retry" + end) + end + + {:noreply, state} end def handle_info(:request_manifest, state) do @@ -191,14 +248,19 @@ defmodule ExWire.Sync do {:noreply, handle_block_bodies(block_bodies, state)} end + def handle_info({:packet, %NodeData{} = node_data, _peer}, state) do + {:noreply, handle_node_data(node_data, state)} + end + + def handle_info({:packet, %Receipts{} = receipts, _peer}, state) do + {:noreply, handle_receipts(receipts, state)} + end + def handle_info({:packet, %SnapshotManifest{} = snapshot_manifest, peer}, state) do {:noreply, handle_snapshot_manifest(snapshot_manifest, peer, state)} end - def handle_info( - {:packet, %SnapshotData{} = snapshot_data, peer}, - state - ) do + def handle_info({:packet, %SnapshotData{} = snapshot_data, peer}, state) do {:noreply, handle_snapshot_data(snapshot_data, peer, state)} end @@ -245,32 +307,6 @@ defmodule ExWire.Sync do state end - @doc """ - Dispatches a packet of `GetBlockHeaders` to a peer for the next block - number that we don't have in our block queue or state tree. - """ - @spec handle_request_next_block(BlockQueue.t(), Blocktree.t(), state()) :: state() - def handle_request_next_block(block_queue, block_tree, state) do - next_block_to_request = get_next_block_to_request(block_queue, block_tree) - - if send_with_retry( - %GetBlockHeaders{ - block_identifier: next_block_to_request, - max_headers: @blocks_per_request, - skip: 0, - reverse: false - }, - :random, - :request_next_block - ) do - :ok = Logger.debug(fn -> "[Sync] Requested block #{next_block_to_request}" end) - - Map.put(state, :last_requested_block, next_block_to_request + @blocks_per_request) - else - state - end - end - @doc """ When we receive a new snapshot manifest, we add it to our warp queue. We may have new blocks to fetch, so we ask the warp queue for more blocks to @@ -298,22 +334,6 @@ defmodule ExWire.Sync do next_state end - @spec dispatch_new_warp_queue_requests(WarpQueue.t(), integer(), integer()) :: WarpQueue.t() - defp dispatch_new_warp_queue_requests( - warp_queue, - request_limit \\ @request_limit, - queue_limit \\ @queue_limit - ) do - {new_warp_queue, hashes_to_request} = - WarpQueue.get_hashes_to_request(warp_queue, request_limit, queue_limit) - - for hash <- hashes_to_request do - request_chunk(hash) - end - - new_warp_queue - end - @doc """ When we receive a SnapshotData, let's try to add the received block to the warp queue. We may decide to request new blocks at this time. @@ -374,62 +394,14 @@ defmodule ExWire.Sync do peer, state = %{ block_queue: block_queue, - block_tree: block_tree, - chain: chain, - trie: trie, - highest_block_number: highest_block_number + block_processor: block_processor, + chain: chain } ) do - {next_highest_block_number, next_block_queue, next_block_tree, next_trie, header_hashes} = - Enum.reduce( - block_headers.headers, - {highest_block_number, block_queue, block_tree, trie, []}, - fn header, {highest_block_number, block_queue, block_tree, trie, header_hashes} -> - header_hash = header |> Header.hash() - - {next_block_queue, next_block_tree, next_trie, should_request_block} = - BlockQueue.add_header( - block_queue, - block_tree, - header, - header_hash, - peer.remote_id, - chain, - trie - ) - - next_header_hashes = - if should_request_block do - :ok = Logger.debug(fn -> "[Sync] Requesting block body #{header.number}" end) - - [header_hash | header_hashes] - else - header_hashes - end - - next_highest_block_number = Kernel.max(highest_block_number, header.number) - - {next_highest_block_number, next_block_queue, next_block_tree, next_trie, - next_header_hashes} - end - ) - - :ok = - PeerSupervisor.send_packet( - %GetBlockBodies{ - hashes: header_hashes - }, - :random - ) - - next_maybe_saved_trie = maybe_save(block_tree, next_block_tree, next_trie) - :ok = maybe_request_next_block(next_block_queue) - - state - |> Map.put(:block_queue, next_block_queue) - |> Map.put(:block_tree, next_block_tree) - |> Map.put(:trie, next_maybe_saved_trie) - |> Map.put(:highest_block_number, next_highest_block_number) + BlockQueue.new_block_headers(block_queue, block_headers, peer) + |> process_completed_blocks(block_processor, chain) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) end @doc """ @@ -445,78 +417,133 @@ defmodule ExWire.Sync do block_bodies, state = %{ block_queue: block_queue, - block_tree: block_tree, - chain: chain, - trie: trie + block_processor: block_processor, + chain: chain } ) do - {next_block_queue, next_block_tree, next_trie} = - Enum.reduce(block_bodies.blocks, {block_queue, block_tree, trie}, fn block_body, - {block_queue, - block_tree, trie} -> - BlockQueue.add_block_struct(block_queue, block_tree, block_body, chain, trie) - end) - - next_maybe_saved_trie = maybe_save(block_tree, next_block_tree, next_trie) - :ok = maybe_request_next_block(next_block_queue) + BlockQueue.new_block_bodies(block_queue, block_bodies) + |> process_completed_blocks(block_processor, chain) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) + end - state - |> Map.put(:block_queue, next_block_queue) - |> Map.put(:block_tree, next_block_tree) - |> Map.put(:trie, next_maybe_saved_trie) - end - - # Determines the next block we don't yet have in our blocktree and - # dispatches a request to all connected peers for that block and the - # next `n` blocks after it. - @spec get_next_block_to_request(BlockQueue.t(), Blocktree.t()) :: integer() - defp get_next_block_to_request(block_queue, block_tree) do - # This is the best we know about - next_number = - case block_tree.best_block do - nil -> 0 - %Block{header: %Header{number: number}} -> number + 1 - end + @doc """ + We are not currently doing anything with PV63 node data + """ + @spec handle_node_data(NodeData.t(), state()) :: state() + def handle_node_data( + node_data, + state = %{ + block_queue: block_queue, + block_processor: block_processor, + chain: chain + } + ) do + BlockQueue.new_node_data(block_queue, node_data) + |> process_completed_blocks(block_processor, chain) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) + end - # But we may have it queued up already in the block queue, let's - # start from the first we *don't* know about. It's possible there's - # holes in block queue, so it's not `max(best_block.number, max(keys(queue)))`, - # though it could be... - next_number - |> Stream.iterate(fn n -> n + 1 end) - |> Stream.reject(fn n -> MapSet.member?(block_queue.block_numbers, n) end) - |> Enum.at(0) + @doc """ + We are not currently doing anything with PV63 receipts + """ + @spec handle_receipts(Receipts.t(), state()) :: state() + def handle_receipts( + receipts, + state = %{ + block_queue: block_queue, + block_processor: block_processor, + chain: chain + } + ) do + BlockQueue.new_receipts(block_queue, receipts) + |> process_completed_blocks(block_processor, chain) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) end - @spec maybe_save(Blocktree.t(), Blocktree.t(), Trie.t()) :: Trie.t() - defp maybe_save(block_tree, next_block_tree, trie) do - if block_tree != next_block_tree do - block_number = next_block_tree.best_block.header.number + @spec dispatch_new_warp_queue_requests(WarpQueue.t(), integer(), integer()) :: WarpQueue.t() + defp dispatch_new_warp_queue_requests( + warp_queue, + request_limit \\ @request_limit, + queue_limit \\ @queue_limit + ) do + {new_warp_queue, hashes_to_request} = + WarpQueue.get_hashes_to_request(warp_queue, request_limit, queue_limit) - if rem(block_number, @save_block_interval) == 0 do - save_sync_state(next_block_tree, trie) - else - trie - end - else - trie + for hash <- hashes_to_request do + Process.send_after(self(), {:request_chunk, hash}, 0) end - end - @spec request_chunk(EVM.hash()) :: reference() - defp request_chunk(chunk_hash) do - Process.send_after(self(), {:request_chunk, chunk_hash}, 0) + new_warp_queue end - @spec maybe_request_next_block(BlockQueue.t()) :: :ok - defp maybe_request_next_block(block_queue) do - # Let's pull a new block if we have none left - _ = - if block_queue.queue == %{} do - request_next_block() + @spec process_completed_blocks(BlockQueue.t(), GenServer.server(), Chain.t()) :: BlockQueue.t() + defp process_completed_blocks(block_queue, block_processor, chain) do + {next_block_queue, blocks} = BlockQueue.get_complete_blocks(block_queue) + + BlockProcessor.process_completed_blocks(block_processor, blocks, chain) + + next_block_queue + end + + # Dispatches new requests for headers or block data based on what is + # required in the block queue. + @spec dispatch_new_block_queue_requests(BlockQueue.t()) :: BlockQueue.t() + defp dispatch_new_block_queue_requests(block_queue) do + {next_block_queue, requests} = BlockQueue.get_requests(block_queue) + + for request <- requests do + case request do + {:headers, block_number, count} -> + :ok = + Logger.debug(fn -> + "[Sync] Requesting new block headers from #{block_number}..#{block_number + count}." + end) + + unless send_with_retry( + %GetBlockHeaders{ + block_identifier: block_number, + max_headers: count, + skip: 0, + reverse: false + }, + :random, + {:get_block_headers, block_number, count} + ) do + :ok = Logger.debug(fn -> "[Sync] Failed to request block headers" end) + end + + {:bodies, header_hashes} -> + :ok = + Logger.debug(fn -> + "[Sync] Requesting new block bodies for #{Enum.count(header_hashes)} header hash(es)." + end) + + unless send_with_retry( + %GetBlockBodies{ + hashes: header_hashes + }, + :random, + {:get_header_hashes, header_hashes} + ) do + :ok = Logger.debug(fn -> "[Sync] Failed to request header hashes" end) + end end + end - :ok + next_block_queue + end + + @spec save_and_check_block_state(BlockQueue.t(), state(), boolean()) :: state() + def save_and_check_block_state(block_queue, state = %{trie: trie}, save \\ true) do + if save do + :ok = BlockState.save_block_queue(TrieStorage.permanent_db(trie), block_queue) + end + + # TODO: What is check block state for here? + state end @spec save_and_check_warp_complete(WarpQueue.t(), state(), boolean()) :: state() @@ -541,7 +568,8 @@ defmodule ExWire.Sync do Logger.info("[Warp] Warp Completed in #{Time.elapsed(warp_queue.warp_start, :second)}") # Save our process - saved_tried = save_sync_state(warp_queue.block_tree, trie) + # TODO... + # saved_trie = save_sync_state(warp_queue.block_tree, trie) # Request a normal sync to start request_next_block() @@ -552,30 +580,12 @@ defmodule ExWire.Sync do %{ state | warp_queue: warp_queue, - trie: saved_tried, + trie: trie, warp: false } end end - # Loads sync state from our backing database - @spec load_sync_state(DB.db()) :: Blocktree.t() - defp load_sync_state(db) do - State.load_tree(db) - end - - # Save sync state from our backing database. - @spec save_sync_state(Blocktree.t(), Trie.t()) :: Trie.t() - defp save_sync_state(blocktree, trie) do - committed_trie = TrieStorage.commit!(trie) - - committed_trie - |> TrieStorage.permanent_db() - |> State.save_tree(blocktree) - - committed_trie - end - @spec send_with_retry( Packet.packet(), PeerSupervisor.node_selector(), diff --git a/apps/ex_wire/lib/ex_wire/sync/block_processor.ex b/apps/ex_wire/lib/ex_wire/sync/block_processor.ex new file mode 100644 index 000000000..c9fe8565f --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/sync/block_processor.ex @@ -0,0 +1,187 @@ +defmodule ExWire.Sync.BlockProcessor do + @moduledoc """ + + """ + use GenServer + + require Logger + + alias Blockchain.{Block, Blocktree, Chain} + alias Exth.Time + alias ExWire.Sync.BlockProcessor.StandardProcessor + alias MerklePatriciaTree.{Trie, TrieStorage} + + @callback process_blocks( + list(Block.t()), + Blocktree.t(), + backlog(), + Chain.t(), + Trie.t(), + boolean() + ) :: {list(EVM.hash()), Blocktree.t(), backlog(), Trie.t()} + + @type backlog :: %{EVM.hash() => list(Block.t())} + + @type state :: %{ + sup: pid(), + block_processing_task: Task.t(), + queue_blocks_messages: list(Block.t()), + backlog: backlog(), + trie: Trie.t() + } + + @name __MODULE__ + + @doc """ + Initializes a new BlockProcessor server. + """ + @spec start_link({Trie.t()}, Keyword.t()) :: GenServer.on_start() + def start_link({trie}, opts \\ []) do + GenServer.start_link( + __MODULE__, + [trie: trie], + name: Keyword.get(opts, :name, @name) + ) + end + + @doc """ + Initializes gen server with options from `start_link`. + """ + @impl true + def init(trie: trie) do + {:ok, sup} = Task.Supervisor.start_link() + + {:ok, + %{ + sup: sup, + block_processing_task: nil, + queue_blocks_messages: [], + backlog: %{}, + trie: trie + }} + end + + # When a task completes, we try to pull a new task from the queue. + @spec handle_task_complete(term(), term(), state()) :: state() + defp handle_task_complete( + ref, + status, + state = %{ + sup: sup, + trie: trie, + backlog: backlog, + block_processing_task: %Task{ref: task_ref}, + queue_blocks_messages: queue_blocks_messages + } + ) + when ref == task_ref do + case status do + {:ok, {:ok, next_backlog, next_trie}} -> + %{state | backlog: next_backlog, trie: next_trie} + + :down -> + {next_task, next_queue} = + case queue_blocks_messages do + [{:process_completed_blocks, pid, blocks, chain} | next_queue] -> + task = run_task(sup, blocks, chain, pid, backlog, trie) + + {task, next_queue} + + [] -> + {nil, []} + end + + %{state | block_processing_task: next_task, queue_blocks_messages: next_queue} + end + end + + @impl true + # Called when a task completes successfully with the return + # value of that task. + def handle_info({ref, msg}, state) do + {:noreply, handle_task_complete(ref, {:ok, msg}, state)} + end + + # Called when a task completes informing the supervisor that a child + # has terminated normally. + def handle_info({:DOWN, ref, :process, _pid, :normal}, state) do + {:noreply, handle_task_complete(ref, :down, state)} + end + + @impl true + def handle_cast( + {:process_completed_blocks, _pid, [], _chain}, + state + ) do + {:noreply, state} + end + + def handle_cast( + blocks_message = {:process_completed_blocks, pid, blocks, chain}, + state = %{ + sup: sup, + trie: trie, + backlog: backlog, + block_processing_task: task, + queue_blocks_messages: queue_blocks_messages + } + ) do + {next_task, next_queue_blocks_messages} = + if is_nil(task) do + {run_task(sup, blocks, chain, pid, backlog, trie), queue_blocks_messages} + else + {task, [blocks_message | queue_blocks_messages]} + end + |> Exth.inspect("Process completed blocks") + + {:noreply, + %{ + state + | block_processing_task: next_task, + queue_blocks_messages: next_queue_blocks_messages + }} + end + + @spec run_task(pid(), list(Block.t()), Chain.t(), pid(), backlog(), Trie.t()) :: Task.t() + defp run_task(sup, blocks, chain, pid, backlog, trie) do + Task.Supervisor.async(sup, fn -> + start = Time.time_start() + + :ok = + Logger.debug(fn -> + "[BlockProcessor] Starting to process #{Enum.count(blocks)} block(s)." + end) + + {processed_blocks, next_block_tree, next_backlog, next_trie} = + StandardProcessor.process_blocks( + blocks, + Blocktree.new_tree(), + backlog, + chain, + trie, + false + ) + + trie_elapsed = + Time.elapsed(fn -> + TrieStorage.commit!(next_trie) + end) + + :ok = + Logger.debug(fn -> + "[BlockProcessor] Processed #{Enum.count(processed_blocks)} block(s) in #{ + Time.elapsed(start) + } (trie commit time: #{trie_elapsed})." + end) + + :ok = GenServer.cast(pid, {:processed_blocks, processed_blocks, next_block_tree.best_block}) + + {:ok, next_backlog, next_trie} + end) + end + + @spec process_completed_blocks(pid(), list(Block.t()), Chain.t()) :: :ok + def process_completed_blocks(pid, blocks, chain) do + GenServer.cast(pid, {:process_completed_blocks, self(), blocks, chain}) + end +end diff --git a/apps/ex_wire/lib/ex_wire/sync/block_processor/standard_processor.ex b/apps/ex_wire/lib/ex_wire/sync/block_processor/standard_processor.ex new file mode 100644 index 000000000..6847e88fa --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/sync/block_processor/standard_processor.ex @@ -0,0 +1,122 @@ +defmodule ExWire.Sync.BlockProcessor.StandardProcessor do + @moduledoc """ + + """ + require Logger + + alias Blockchain.{Block, Blocktree, Chain} + alias ExWire.Sync.BlockProcessor + alias MerklePatriciaTree.Trie + + @behaviour BlockProcessor + + @doc """ + Processes a the block queue, adding any blocks which are complete and pass + the number of confirmations to the block tree. These blocks are then removed + from the queue. Note: they may end up in the backlog, nonetheless, if we are + waiting still for the parent block. + """ + @spec process_blocks( + list(Block.t()), + Blocktree.t(), + BlockProcessor.backlog(), + Chain.t(), + Trie.t(), + boolean() + ) :: {list(EVM.hash()), Blocktree.t(), BlockProcessor.backlog(), Trie.t()} + def process_blocks( + blocks, + block_tree, + backlog, + chain, + trie, + do_validation + ) do + do_process_blocks(blocks, [], block_tree, backlog, chain, trie, do_validation) + end + + @spec do_process_blocks( + list(Block.t()), + list(EVM.hash()), + Blocktree.t(), + BlockProcessor.backlog(), + Chain.t(), + Trie.t(), + boolean() + ) :: {list(EVM.hash()), Blocktree.t(), BlockProcessor.backlog(), Trie.t()} + defp do_process_blocks([], processed_blocks, block_tree, backlog, _chain, trie, _do_validation), + do: {processed_blocks, block_tree, backlog, trie} + + defp do_process_blocks( + [block | rest], + processed_blocks, + block_tree, + backlog, + chain, + trie, + do_validation + ) do + {processed_block_hash, new_block_tree, new_trie, new_backlog, extra_blocks} = + case Blocktree.verify_and_add_block( + block_tree, + chain, + block, + trie, + do_validation + ) do + {:invalid, [:non_genesis_block_requires_parent]} -> + # Note: this is probably too slow since we see a lot of blocks without + # parents and, I think, we're running the full validity check. + + :ok = + Logger.debug(fn -> "[Block Queue] Failed to verify block due to missing parent" end) + + updated_backlog = + Map.update( + backlog, + block.header.parent_hash, + [block], + fn blocks -> [block | blocks] end + ) + + {nil, block_tree, trie, updated_backlog, []} + + {:invalid, reasons} -> + :ok = + Logger.debug(fn -> + "[Block Queue] Failed to verify block ##{block.header.number} due to #{ + inspect(reasons) + }" + end) + + {nil, block_tree, trie, backlog, []} + + {:ok, {new_block_tree, new_trie, block_hash}} -> + # Weird that we can't verify block 0.... + + :ok = + Logger.debug(fn -> + "[Block Queue] Verified block ##{block.header.number} (0x#{ + Base.encode16(block_hash, case: :lower) + })" + end) + + {backlogged_blocks, new_backlog} = Map.pop(backlog, block_hash, []) + + {block_hash, new_block_tree, new_trie, new_backlog, backlogged_blocks} + end + + do_process_blocks( + extra_blocks ++ rest, + if(processed_block_hash, + do: [processed_block_hash | processed_blocks], + else: processed_blocks + ), + new_block_tree, + new_backlog, + chain, + new_trie, + do_validation + ) + end +end diff --git a/apps/ex_wire/lib/ex_wire/sync/block_state.ex b/apps/ex_wire/lib/ex_wire/sync/block_state.ex new file mode 100644 index 000000000..35df744b9 --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/sync/block_state.ex @@ -0,0 +1,46 @@ +defmodule ExWire.Sync.BlockState do + @moduledoc """ + This module exposes functions to store and load the current state of + a block sync in the database. + """ + require Logger + + alias ExWire.Struct.BlockQueue + alias MerklePatriciaTree.DB + + @key "current_block_queue_9" + + @doc """ + Loads the current block queue from database. + """ + @spec load_block_queue(DB.db()) :: WarpQueue.t() + def load_block_queue(db) do + case DB.get(db, @key) do + {:ok, current_block_queue} -> + :erlang.binary_to_term(current_block_queue) + + :not_found -> + %BlockQueue{} + end + end + + @doc """ + Stores the current block queue into the database. + """ + @spec save_block_queue(DB.db(), BlockQueue.t()) :: :ok + def save_block_queue(db, block_queue) do + :ok = Logger.debug(fn -> "Saving block queue..." end) + + DB.put!( + db, + @key, + :erlang.term_to_binary(%{ + block_queue + | header_requests: MapSet.new(), + block_requests: MapSet.new() + }) + ) + + :ok + end +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/get_node_data_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/get_node_data_test.exs new file mode 100644 index 000000000..dd81b8c3c --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/get_node_data_test.exs @@ -0,0 +1,25 @@ +defmodule ExWire.Packet.Capability.Eth.GetNodeDataTest do + use ExUnit.Case, async: true + doctest ExWire.Packet.Capability.Eth.GetNodeData + + describe "handle/1" do + test "responds to request" do + ExWire.BridgeSyncMock.start_link(%{}) + + MerklePatriciaTree.Test.random_ets_db() + |> MerklePatriciaTree.Trie.new() + |> MerklePatriciaTree.TrieStorage.put_raw_key!(<<2::256>>, "mana") + |> ExWire.BridgeSyncMock.set_current_trie() + + handle_response = + %ExWire.Packet.Capability.Eth.GetNodeData{hashes: [<<1::256>>, <<2::256>>]} + |> ExWire.Packet.Capability.Eth.GetNodeData.handle() + + assert handle_response == + {:send, + %ExWire.Packet.Capability.Eth.NodeData{ + values: ["mana"] + }} + end + end +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/get_receipts_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/get_receipts_test.exs new file mode 100644 index 000000000..f27000025 --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/get_receipts_test.exs @@ -0,0 +1,39 @@ +defmodule ExWire.Packet.Capability.Eth.GetReceiptsTest do + use ExUnit.Case, async: true + doctest ExWire.Packet.Capability.Eth.GetReceipts + + alias Blockchain.Transaction.Receipt + + describe "handle/1" do + test "respond to request" do + ExWire.BridgeSyncMock.start_link(%{}) + + receipt = %Receipt{ + state: <<1, 2, 3>>, + cumulative_gas: 5, + bloom_filter: <<2, 3, 4>>, + logs: [] + } + + receipt_rlp_bin = + receipt + |> Receipt.serialize() + |> ExRLP.encode() + + MerklePatriciaTree.Test.random_ets_db() + |> MerklePatriciaTree.Trie.new() + |> MerklePatriciaTree.TrieStorage.put_raw_key!(<<2::256>>, receipt_rlp_bin) + |> ExWire.BridgeSyncMock.set_current_trie() + + handle_response = + %ExWire.Packet.Capability.Eth.GetReceipts{hashes: [<<1::256>>, <<2::256>>]} + |> ExWire.Packet.Capability.Eth.GetReceipts.handle() + + assert handle_response == + {:send, + %ExWire.Packet.Capability.Eth.Receipts{ + receipts: [receipt] + }} + end + end +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/new_block_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/new_block_test.exs new file mode 100644 index 000000000..d7d44839b --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/new_block_test.exs @@ -0,0 +1,5 @@ +defmodule ExWire.Packet.Capability.Eth.NewBlockTest do + use ExUnit.Case, async: true + alias ExWire.Packet.Capability.Eth.NewBlock + doctest NewBlock +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/node_data_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/node_data_test.exs new file mode 100644 index 000000000..71f0b3bf6 --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/node_data_test.exs @@ -0,0 +1,4 @@ +defmodule ExWire.Packet.Capability.Eth.NodeDataTest do + use ExUnit.Case, async: true + doctest ExWire.Packet.Capability.Eth.NodeData +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/receipts_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/receipts_test.exs new file mode 100644 index 000000000..63100d8d4 --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/receipts_test.exs @@ -0,0 +1,4 @@ +defmodule ExWire.Packet.Capability.Eth.ReceiptsTest do + use ExUnit.Case, async: true + doctest ExWire.Packet.Capability.Eth.Receipts +end