diff --git a/Dokumentacija za KiDS projekat.pdf b/Dokumentacija za KiDS projekat.pdf new file mode 100644 index 0000000..487933b Binary files /dev/null and b/Dokumentacija za KiDS projekat.pdf differ diff --git a/config0 b/config0 index 68fec26..d00268f 100644 --- a/config0 +++ b/config0 @@ -6,7 +6,7 @@ "strongLimit": 1000, "jobs": [ { - "name": "triangle", + "name": "upside_triangle", "pointCount": 3, "p": 0.3, "width": 200, @@ -25,6 +25,27 @@ "y": 0 } ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] } ] } \ No newline at end of file diff --git a/config1 b/config1 index 166ded2..750c1d1 100644 --- a/config1 +++ b/config1 @@ -6,7 +6,7 @@ "strongLimit": 1000, "jobs": [ { - "name": "triangle", + "name": "upside_triangle", "pointCount": 3, "p": 0.3, "width": 200, @@ -25,6 +25,27 @@ "y": 0 } ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] } ] } \ No newline at end of file diff --git a/config2 b/config2 index 3a9c371..996d159 100644 --- a/config2 +++ b/config2 @@ -6,7 +6,7 @@ "strongLimit": 1000, "jobs": [ { - "name": "triangle", + "name": "upside_triangle", "pointCount": 3, "p": 0.3, "width": 200, @@ -25,6 +25,27 @@ "y": 0 } ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] } ] } \ No newline at end of file diff --git a/config3 b/config3 index 68fec26..5d84838 100644 --- a/config3 +++ b/config3 @@ -1,12 +1,12 @@ { - "port": 1100, + "port": 1400, "bootstrapIpAddress": "192.168.0.29", "bootstrapPort": 2000, "weakLimit": 500, "strongLimit": 1000, "jobs": [ { - "name": "triangle", + "name": "upside_triangle", "pointCount": 3, "p": 0.3, "width": 200, @@ -25,6 +25,27 @@ "y": 0 } ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] } ] } \ No newline at end of file diff --git a/config4 b/config4 new file mode 100644 index 0000000..7ce810d --- /dev/null +++ b/config4 @@ -0,0 +1,51 @@ +{ + "port": 1500, + "bootstrapIpAddress": "192.168.0.29", + "bootstrapPort": 2000, + "weakLimit": 500, + "strongLimit": 1000, + "jobs": [ + { + "name": "upside_triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 0 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 0 + } + ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] + } + ] +} \ No newline at end of file diff --git a/config5 b/config5 new file mode 100644 index 0000000..915c0fe --- /dev/null +++ b/config5 @@ -0,0 +1,51 @@ +{ + "port": 1600, + "bootstrapIpAddress": "192.168.0.29", + "bootstrapPort": 2000, + "weakLimit": 500, + "strongLimit": 1000, + "jobs": [ + { + "name": "upside_triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 0 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 0 + } + ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] + } + ] +} \ No newline at end of file diff --git a/config6 b/config6 new file mode 100644 index 0000000..7c1f7bf --- /dev/null +++ b/config6 @@ -0,0 +1,51 @@ +{ + "port": 1700, + "bootstrapIpAddress": "192.168.0.29", + "bootstrapPort": 2000, + "weakLimit": 500, + "strongLimit": 1000, + "jobs": [ + { + "name": "upside_triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 0 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 0 + } + ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] + } + ] +} \ No newline at end of file diff --git a/config7 b/config7 new file mode 100644 index 0000000..2e84e3b --- /dev/null +++ b/config7 @@ -0,0 +1,51 @@ +{ + "port": 1800, + "bootstrapIpAddress": "192.168.0.29", + "bootstrapPort": 2000, + "weakLimit": 500, + "strongLimit": 1000, + "jobs": [ + { + "name": "upside_triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 0 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 0 + } + ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] + } + ] +} \ No newline at end of file diff --git a/config8 b/config8 new file mode 100644 index 0000000..dcb73ea --- /dev/null +++ b/config8 @@ -0,0 +1,51 @@ +{ + "port": 1900, + "bootstrapIpAddress": "192.168.0.29", + "bootstrapPort": 2000, + "weakLimit": 500, + "strongLimit": 1000, + "jobs": [ + { + "name": "upside_triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 0 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 0 + } + ] + }, + { + "name": "triangle", + "pointCount": 3, + "p": 0.3, + "width": 200, + "height": 200, + "mainPoints": [ + { + "x": 0, + "y": 199 + }, + { + "x": 99, + "y": 99 + }, + { + "x": 199, + "y": 199 + } + ] + } + ] +} \ No newline at end of file diff --git a/diagram.png b/diagram.png new file mode 100644 index 0000000..c39e72f Binary files /dev/null and b/diagram.png differ diff --git a/komanda.txt b/komanda.txt new file mode 100644 index 0000000..0604545 --- /dev/null +++ b/komanda.txt @@ -0,0 +1,7 @@ +mix run --no-halt -- "empty.exs" config0 +mix run --no-halt -- "empty.exs" config1 +mix run --no-halt -- "empty.exs" config2 +mix run --no-halt -- "empty.exs" config3 +mix run --no-halt -- "empty.exs" config4 +mix run --no-halt -- "empty.exs" config5 +mix run --no-halt -- "empty.exs" config6 \ No newline at end of file diff --git a/lib/aggregation/aggregate.ex b/lib/aggregation/aggregate.ex index e892b94..2397ab7 100644 --- a/lib/aggregation/aggregate.ex +++ b/lib/aggregation/aggregate.ex @@ -58,6 +58,17 @@ defmodule Ekser.Aggregate do :ok end + def register_non_vital() do + Registry.register(Ekser.AggregateReg, :non_vital, nil) + end + + def close_non_vital() do + Registry.dispatch(Ekser.AggregateReg, :non_vital, fn entries -> + for {pid, _} <- entries, + do: GenServer.call(pid, :stop) + end) + end + def is_complete?(responses) do Enum.all?(Map.values(responses), fn value -> value === true end) end @@ -72,7 +83,7 @@ defmodule Ekser.Aggregate do def respond_job(message) do Registry.dispatch(Ekser.AggregateReg, {message.type, message.payload.job_name}, fn entries -> for {pid, _} <- entries, - do: GenServer.call(pid, {:response, message.sender.id, message.payload}) + do: GenServer.call(pid, {:response, message.payload}) end) end end diff --git a/lib/aggregation/child_server.ex b/lib/aggregation/child_server.ex index eecdc2f..70486fc 100644 --- a/lib/aggregation/child_server.ex +++ b/lib/aggregation/child_server.ex @@ -12,13 +12,14 @@ defmodule Ekser.ChildServer do # Server Functions @impl GenServer - def init(args) do + def init(args = [job, _]) do + Registry.register(Ekser.AggregateReg, {Ekser.Message.Entered_Cluster, job.name}, nil) {:ok, args, {:continue, :init}} end @impl GenServer def handle_continue(:init, [job, fractal_id]) do - Registry.register(Registry.AggregateRegistry, {Ekser.Message.Entered_Cluster, job.name}, nil) + Ekser.Aggregate.register_non_vital() next_id = case fractal_id do @@ -26,24 +27,32 @@ defmodule Ekser.ChildServer do _ -> fractal_id <> "0" end - {:noreply, {[], job.count, fractal_id, next_id}} + {:noreply, {[], job.count, fractal_id, next_id, false}} end @impl GenServer - def handle_call({:response, _, payload}, _from, {responses, count, fractal_id, next_id}) do + def handle_call( + {:response, payload}, + _from, + {responses, count, fractal_id, next_id, ready} + ) do new_responses = case Ekser.FractalId.is_child?(fractal_id, payload.fractal_id) do - true -> [payload | responses] - false -> responses + true -> + Ekser.Router.add_cluster_neighbour(payload) + [payload | responses] + + false -> + responses end - case length(new_responses) === count - 1 do + case length(new_responses) === count - 1 and ready do true -> - Ekser.FractalServer.redistribute(responses, next_id) - {:noreply, {[], count, next_id, next_id <> "0"}} + Ekser.FractalServer.redistribute(new_responses, next_id) + {:reply, :ok, {[], count, next_id, next_id <> "0", true}} false -> - {:noreply, {responses, count, fractal_id, next_id}} + {:reply, :ok, {new_responses, count, fractal_id, next_id, ready}} end end @@ -51,4 +60,16 @@ defmodule Ekser.ChildServer do def handle_call(:stop, _from, _) do exit(:shutdown) end + + @impl GenServer + def handle_cast(:clear, {responses, count, fractal_id, next_id, _}) do + case length(responses) === count - 1 do + true -> + Ekser.FractalServer.redistribute(responses, next_id) + {:noreply, {[], count, next_id, next_id <> "0", true}} + + false -> + {:noreply, {responses, count, fractal_id, next_id, true}} + end + end end diff --git a/lib/aggregation/cluster_server.ex b/lib/aggregation/cluster_server.ex index c61c463..dc65912 100644 --- a/lib/aggregation/cluster_server.ex +++ b/lib/aggregation/cluster_server.ex @@ -28,28 +28,16 @@ defmodule Ekser.ClusterServer do Ekser.Aggregate.continue_or_exit(responses) - try_complete(responses) - end - - @impl GenServer - def handle_call({:response, id, _}, _from, responses) do - new_responses = %{responses | id => true} - try_complete(new_responses) - end + Ekser.Aggregate.register_non_vital() - @impl GenServer - def handle_call(:stop, _from, _) do - exit(:shutdown) - end - - defp try_complete(responses) do case Ekser.Aggregate.is_complete?(responses) do - true -> complete() + true -> {:noreply, nil, {:continue, :complete}} false -> {:noreply, responses} end end - defp complete() do + @impl GenServer + def handle_continue(:complete, _) do all_nodes = Ekser.NodeStore.get_nodes([]) {curr, nodes_without_curr} = Map.pop(all_nodes, :curr) @@ -67,4 +55,22 @@ defmodule Ekser.ClusterServer do exit(:shutdown) end + + @impl GenServer + def handle_call({:response, id, _}, _from, responses) do + new_responses = %{responses | id => true} + try_complete(new_responses) + end + + @impl GenServer + def handle_call(:stop, _from, _) do + exit(:shutdown) + end + + defp try_complete(responses) do + case Ekser.Aggregate.is_complete?(responses) do + true -> {:reply, :ok, nil, {:continue, :complete}} + false -> {:reply, :ok, responses} + end + end end diff --git a/lib/aggregation/coordinator_server.ex b/lib/aggregation/coordinator_server.ex index e82b3da..536ee8d 100644 --- a/lib/aggregation/coordinator_server.ex +++ b/lib/aggregation/coordinator_server.ex @@ -38,7 +38,7 @@ defmodule Ekser.CoordinatorServer do false -> Ekser.Result.get_friendly(local_info) end - try_complete(responses, Map.put(initial_results, job.name, []), output) + {:noreply, {responses, Map.put(initial_results, job.name, []), output}} end @impl GenServer @@ -54,36 +54,11 @@ defmodule Ekser.CoordinatorServer do Ekser.Aggregate.continue_or_exit(responses) - try_complete(responses, %{}, output) + {:noreply, {responses, %{}, output}} end @impl GenServer - def handle_call({:response, id, payload}, _from, {responses, results, output}) do - new_results = Ekser.Result.merge_result(results, payload) - new_responses = %{responses | id => true} - try_complete(new_responses, new_results, output) - end - - @impl GenServer - def handle_call({:response, id, payload}, _from, {responses, results, output, job_name}) do - new_results = - case payload.job_name === job_name do - true -> results - false -> Ekser.Result.merge_result(results, payload) - end - - new_responses = %{responses | id => true} - try_complete(new_responses, new_results, output) - end - - defp try_complete(responses, results, output) do - case Ekser.Aggregate.is_complete?(responses) do - true -> complete(results, output) - false -> {:noreply, {responses, results, output}} - end - end - - defp complete(results, output) do + def handle_continue(:complete, {results, output}) do individual_results = results |> Map.pop("") @@ -104,6 +79,8 @@ defmodule Ekser.CoordinatorServer do genesis_node_stream = Stream.cycle(genesis_nodes) + Ekser.Aggregate.close_non_vital() + # 2 cases - curr in zipped genesis, curr in zipped leftover zipped_genesis = Enum.zip(genesis_nodes, individual_results) |> remove_from_genesis(curr) @@ -120,19 +97,55 @@ defmodule Ekser.CoordinatorServer do end) fn curr -> - Enum.map(updated_zipped_genesis, fn {node, result} -> - Ekser.Message.Start_Job_Genesis.new(curr, node, result) + Enum.map(updated_zipped_genesis, fn {receiver, result} -> + Ekser.Message.Start_Job_Genesis.new(curr, receiver, result) end) ++ Enum.map(zipped_leftover, fn {receiver, payload} -> Ekser.Message.Approach_Cluster.new(curr, receiver, payload) - end) ++ message + end) end |> Ekser.Router.send() + :ok = + case message != nil do + true -> + message + |> Ekser.Router.send() + + false -> + :ok + end + IO.puts(output, "Reorganized clusters.") exit(:shutdown) end + @impl GenServer + def handle_call({:response, id, payload}, _from, {responses, results, output}) do + new_results = Ekser.Result.merge_result(results, payload) + new_responses = %{responses | id => true} + try_complete(new_responses, new_results, output) + end + + @impl GenServer + def handle_call({:response, id, payload}, _from, {responses, results, output, job_name}) do + new_results = + case payload.job_name === job_name do + true -> results + false -> Ekser.Result.merge_result(results, payload) + end + + new_responses = %{responses | id => true} + try_complete(new_responses, new_results, output) + end + + defp try_complete(responses, results, output) do + case Ekser.Aggregate.is_complete?(responses) do + true -> {:reply, :ok, {results, output}, {:continue, :complete}} + false -> {:reply, :ok, {responses, results, output}} + end + end + defp remove_from_genesis(zipped_genesis, curr) do case Enum.find(zipped_genesis, fn {node, _} -> node.id === curr.id end) do nil -> @@ -151,10 +164,10 @@ defmodule Ekser.CoordinatorServer do message = case Enum.find(zipped_leftover, fn {node, _} -> node.id === curr.id end) do nil -> - [] + nil {_, genesis} -> - [fn curr -> Ekser.Message.Cluster_Knock.new(curr, genesis) end] + fn curr -> [Ekser.Message.Cluster_Knock.new(curr, genesis)] end end {Enum.reject(zipped_leftover, fn {node, _} -> node.id === curr.id end), message} diff --git a/lib/aggregation/result_server.ex b/lib/aggregation/result_server.ex index 89eda61..0d4b8d3 100644 --- a/lib/aggregation/result_server.ex +++ b/lib/aggregation/result_server.ex @@ -30,35 +30,22 @@ defmodule Ekser.ResultServer do Ekser.Aggregate.continue_or_exit(responses) + Ekser.Aggregate.register_non_vital() + initial_results = case local_info === nil do true -> [] false -> local_info.points end - try_complete(responses, initial_results, output, job.resolution) - end - - @impl GenServer - def handle_call({:response, id, payload}, _from, {responses, results, output, resolution}) do - new_results = results ++ payload.points - new_responses = %{responses | id => true} - try_complete(new_responses, new_results, output, resolution) - end - - @impl GenServer - def handle_call(:stop, _from, _) do - exit(:shutdown) - end - - defp try_complete(responses, results, output, resolution) do case Ekser.Aggregate.is_complete?(responses) do - true -> complete(results, output, resolution) - false -> {:noreply, {responses, results, output, resolution}} + true -> {:noreply, {initial_results, output, job.resolution}, {:continue, :complete}} + false -> {:noreply, {responses, initial_results, output, job.resolution}} end end - defp complete(results, output, resolution) do + @impl GenServer + def handle_continue(:complete, {results, output, resolution}) do file = "result.png" |> Path.expand() @@ -91,6 +78,25 @@ defmodule Ekser.ResultServer do exit(:shutdown) end + @impl GenServer + def handle_call({:response, id, payload}, _from, {responses, results, output, resolution}) do + new_results = results ++ payload.points + new_responses = %{responses | id => true} + try_complete(new_responses, new_results, output, resolution) + end + + @impl GenServer + def handle_call(:stop, _from, _) do + exit(:shutdown) + end + + defp try_complete(responses, results, output, resolution) do + case Ekser.Aggregate.is_complete?(responses) do + true -> {:reply, :ok, {results, output, resolution}, {:continue, :complete}} + false -> {:reply, :ok, {responses, results, output, resolution}} + end + end + defp check_rows(list, acc, png, width) do [{_, y}] = Enum.take(list, 1) diff --git a/lib/aggregation/status.ex b/lib/aggregation/status.ex index 9bb2e02..addbbcb 100644 --- a/lib/aggregation/status.ex +++ b/lib/aggregation/status.ex @@ -5,7 +5,6 @@ defmodule Ekser.Status do @impl Ekser.Serializable def create_from_json(json) when is_map(json) do - IO.inspect(json) job_name = json["jobName"] [fractal_id] = diff --git a/lib/aggregation/status_server.ex b/lib/aggregation/status_server.ex index d087b14..7d5613d 100644 --- a/lib/aggregation/status_server.ex +++ b/lib/aggregation/status_server.ex @@ -29,13 +29,26 @@ defmodule Ekser.StatusServer do Ekser.Aggregate.continue_or_exit(responses) + Ekser.Aggregate.register_non_vital() + initial_results = case local_info === nil do true -> %{} false -> Ekser.Status.get_friendly(local_info) end - try_complete(responses, initial_results, output) + case Ekser.Aggregate.is_complete?(responses) do + true -> {:noreply, {initial_results, output}, {:continue, :complete}} + false -> {:noreply, {responses, initial_results, output}} + end + end + + @impl GenServer + def handle_continue(:complete, {results, output}) do + to_print = Ekser.Status.get_status_string(results) + + IO.write(output, to_print) + exit(:shutdown) end @impl GenServer @@ -52,15 +65,8 @@ defmodule Ekser.StatusServer do defp try_complete(responses, results, output) do case Ekser.Aggregate.is_complete?(responses) do - true -> complete(results, output) - false -> {:noreply, {responses, results, output}} + true -> {:reply, :ok, {results, output}, {:continue, :complete}} + false -> {:reply, :ok, {responses, results, output}} end end - - defp complete(results, output) do - to_print = Ekser.Status.get_status_string(results) - - IO.write(output, to_print) - exit(:shutdown) - end end diff --git a/lib/communication/aggregation_messages.ex b/lib/communication/aggregation_messages.ex index f58231b..a87b42a 100644 --- a/lib/communication/aggregation_messages.ex +++ b/lib/communication/aggregation_messages.ex @@ -15,7 +15,7 @@ defmodule Ekser.Message.Start_Job do @impl Ekser.Message def send_effect(message) do - Ekser.Aggregate.respond(message) + Ekser.FractalServer.start_job(message.payload.points) end end diff --git a/lib/communication/cluster_messages.ex b/lib/communication/cluster_messages.ex index 2db9a5b..f402a72 100644 --- a/lib/communication/cluster_messages.ex +++ b/lib/communication/cluster_messages.ex @@ -16,6 +16,7 @@ defmodule Ekser.Message.Updated_Node do @impl Ekser.Message def send_effect(message) do Ekser.NodeStore.receive_node(message.payload) + :ok end end @@ -37,6 +38,8 @@ defmodule Ekser.Message.Entered_Cluster do @impl Ekser.Message def send_effect(message) do Ekser.NodeStore.receive_node(message.payload) + Ekser.Aggregate.respond_job(message) + :ok end end @@ -54,6 +57,7 @@ defmodule Ekser.Message.Cluster_Connection_Response do @impl Ekser.Message def send_effect(message) do + Ekser.Router.add_cluster_neighbour(message.sender) Ekser.Aggregate.respond(message) end end @@ -66,7 +70,7 @@ defmodule Ekser.Message.Cluster_Connection_Request do nil end - def new(curr, receiver) do + def new(curr, receiver, _) do Ekser.Message.new(__MODULE__, curr, receiver, [], nil) end @@ -101,6 +105,9 @@ defmodule Ekser.Message.Cluster_Welcome do @impl Ekser.Message def send_effect(message) do + job = Ekser.JobStore.get_job_by_name(message.payload["job_name"]) + Ekser.FractalServer.join_cluster(job, message.payload["fractal_id"]) + Ekser.ClusterServer.child_spec([message.payload["job_name"], message.payload["fractal_id"]]) |> Ekser.Aggregate.new() @@ -122,7 +129,7 @@ defmodule Ekser.Message.Cluster_Knock do @impl Ekser.Message def send_effect(message) do - case Ekser.NodeStore.get_next_fractal_id() do + case Ekser.NodeStore.get_next_fractal_id(message.sender) do :error -> :ok @@ -175,6 +182,7 @@ defmodule Ekser.Message.Start_Job_Genesis do @impl Ekser.Message def send_effect(message) do job = Ekser.JobStore.get_job_by_name(message.payload.job_name) + Ekser.Router.wipe_cluster_neighbours() Ekser.FractalServer.join_cluster(job, "0") Ekser.FractalServer.start_job(message.payload.points) diff --git a/lib/communication/message.ex b/lib/communication/message.ex index c32292b..ef9f34a 100644 --- a/lib/communication/message.ex +++ b/lib/communication/message.ex @@ -72,8 +72,11 @@ defmodule Ekser.Message do payload: payload } else - {false, message} -> {:error, message} - {:error, message} -> {:error, message} + {false, message} -> + {:error, message} + + {:error, message} -> + {:error, message} end end diff --git a/lib/communication/network_messages.ex b/lib/communication/network_messages.ex index 084d944..bb54ae5 100644 --- a/lib/communication/network_messages.ex +++ b/lib/communication/network_messages.ex @@ -16,6 +16,7 @@ defmodule Ekser.Message.Entered do @impl Ekser.Message def send_effect(message) do Ekser.NodeStore.enter_network(message.payload) + Ekser.Router.update_last_id(message.payload.id) end end @@ -132,7 +133,12 @@ defmodule Ekser.Message.System_Knock do map = Ekser.NodeStore.introduce_new() jobs = Ekser.JobStore.get_all_jobs() dht = Ekser.DHT.new(map.id, map.nodes, jobs) - {:send, fn curr -> [Ekser.Message.Welcome.new(curr, message.sender, dht)] end} + new_sender = %Ekser.Node{message.sender | id: map.id} + + {:send, + fn curr -> + [Ekser.Message.Welcome.new(curr, new_sender, dht)] + end} end end diff --git a/lib/dht/node.ex b/lib/dht/node.ex index d2a910d..9db5793 100644 --- a/lib/dht/node.ex +++ b/lib/dht/node.ex @@ -48,6 +48,13 @@ defmodule Ekser.Node do end end +defimpl String.Chars, for: Ekser.Node do + def to_string(node) do + string_ip = Ekser.TCP.to_ip(node.ip) + "Node #{node.id} at #{string_ip}:#{node.port}" + end +end + defimpl Jason.Encoder, for: Ekser.Node do def encode(value, opts) do map = %{ diff --git a/lib/dht/node_map.ex b/lib/dht/node_map.ex index 1e34003..a7d6a81 100644 --- a/lib/dht/node_map.ex +++ b/lib/dht/node_map.ex @@ -17,7 +17,9 @@ defmodule Ekser.NodeMap do neighbours = new_nodes |> Map.filter(fn {_, node} -> - Ekser.FractalId.compare_edit_distance(new_nodes.curr.fractal_id, node.fractal_id, 1) === 1 + node.job_name === new_nodes.curr.job_name and + Ekser.FractalId.compare_edit_distance(new_nodes.curr.fractal_id, node.fractal_id, 1) === + 1 end) {neighbours, new_nodes} @@ -32,7 +34,7 @@ defmodule Ekser.NodeMap do |> length() end - def get_next_fractal_id(nodes) do + def get_next_fractal_id(nodes, node) do job = Ekser.JobStore.get_job_by_name(nodes.curr.job_name) case job do @@ -56,7 +58,10 @@ defmodule Ekser.NodeMap do |> Enum.reverse() |> Enum.take(1) - Ekser.FractalId.get_next(top_id, job.count) + fractal_id = Ekser.FractalId.get_next(top_id, job.count) + new_node = %Ekser.Node{node | job_name: job.name, fractal_id: fractal_id} + + {fractal_id, %{nodes | node.id => new_node}} end end diff --git a/lib/dht/node_store.ex b/lib/dht/node_store.ex index 09cd211..09dcb94 100644 --- a/lib/dht/node_store.ex +++ b/lib/dht/node_store.ex @@ -59,8 +59,8 @@ defmodule Ekser.NodeStore do Agent.update(__MODULE__, Ekser.NodeMap, :update_curr_fractal, [job_name, fractal_id]) end - @spec get_next_fractal_id() :: String.t() | :error - def get_next_fractal_id() do - Agent.get(__MODULE__, Ekser.NodeMap, :get_next_fractal_id, []) + @spec get_next_fractal_id(%Ekser.Node{}) :: String.t() | :error + def get_next_fractal_id(node) do + Agent.get_and_update(__MODULE__, Ekser.NodeMap, :get_next_fractal_id, [node]) end end diff --git a/lib/fractal/fractal_id.ex b/lib/fractal/fractal_id.ex index 8501abd..36c2a6d 100644 --- a/lib/fractal/fractal_id.ex +++ b/lib/fractal/fractal_id.ex @@ -1,6 +1,5 @@ defmodule Ekser.FractalId do def get_next(fractal_id, point_count) do - # {base_rep, _} = Integer.parse(fractal_id, point_count) base_rep = String.to_integer(fractal_id, point_count) new_base_rep = base_rep + 1 @@ -19,10 +18,11 @@ defmodule Ekser.FractalId do overflow? = String.length(newer_string) > String.length(fractal_id) + concat = &("0" <> &1) + case overflow? do true -> - dropped = newer_string |> Enum.drop(1) - ["0" | dropped] + newer_string |> String.slice(1..-1//1) |> concat.() false -> String.pad_leading(newer_string, String.length(fractal_id), "0") @@ -36,12 +36,10 @@ defmodule Ekser.FractalId do removed_tail = list_other - |> Enum.reverse() - |> tl() - |> Enum.reverse() + |> Enum.drop(-1) with true <- length(list_id) > 0 do - (fractal_id === "0" and length(list_other) === 0) or + (fractal_id === "0" and length(removed_tail) === 0) or removed_tail === list_id else false -> false @@ -79,10 +77,10 @@ defmodule Ekser.FractalId do defp calculate_edit_distance({char1, char2}, acc, value) do cond do - char1 === char2 and acc === value -> + char1 != char2 and acc === value -> {:halt, false} - char1 === char2 -> + char1 != char2 -> {:cont, acc + 1} true -> diff --git a/lib/fractal/fractal_server.ex b/lib/fractal/fractal_server.ex index ca81603..0fcfe03 100644 --- a/lib/fractal/fractal_server.ex +++ b/lib/fractal/fractal_server.ex @@ -48,7 +48,7 @@ defmodule Ekser.FractalServer do @impl GenServer def init(:ok) do - {:ok, %__MODULE__{}} + {:ok, %__MODULE__{count: 0}} end @impl GenServer @@ -57,12 +57,36 @@ defmodule Ekser.FractalServer do new_state = %__MODULE__{state | job: job, fractal_id: fractal_id, anchors: job.points} + Ekser.ChildServer.child_spec([job, fractal_id]) + |> Ekser.Aggregate.new() + scaled_state = - case new_state.job != nil and new_state.points != nil do - true -> set_up(new_state) - false -> new_state + cond do + new_state.fractal_id != "0" and new_state.points != nil -> + scale_state(new_state) |> set_up_cruncher() + + new_state.points != nil -> + set_up_cruncher(new_state) + + true -> + new_state end + all_nodes = Ekser.NodeStore.get_nodes([]) + {curr, nodes_without_curr} = Map.pop(all_nodes, :curr) + + receivers = + Map.pop(nodes_without_curr, curr.id) + |> elem(1) + |> Map.values() + + fn curr -> + Enum.map(receivers, fn receiver -> + Ekser.Message.Entered_Cluster.new(curr, receiver, curr) + end) + end + |> Ekser.Router.send() + {:reply, :ok, scaled_state} end @@ -73,12 +97,18 @@ defmodule Ekser.FractalServer do @impl GenServer def handle_call({:start, points}, _from, state) when state.points === nil do - new_state = %__MODULE__{state | points: points} + new_state = %__MODULE__{state | points: Enum.reverse(points)} scaled_state = - case new_state.job != nil and new_state.points != nil do - true -> set_up(new_state) - false -> new_state + cond do + new_state.fractal_id != "0" and new_state.job != nil -> + scale_state(new_state) |> set_up_cruncher() + + new_state.job != nil -> + set_up_cruncher(new_state) + + true -> + new_state end {:reply, :ok, scaled_state} @@ -91,8 +121,13 @@ defmodule Ekser.FractalServer do @impl GenServer def handle_call({:redistribute, nodes, new_id}, _from, state) do - # This call has to update DHT/Router with Curr and send an UpdatedNode message because of it - # But this isn't supposed to happen if the new_id is the same as the one in state + scaled_state = %__MODULE__{state | fractal_id: new_id} |> scale_state() |> set_up_cruncher() + result = Ekser.Result.new(state.job.name, state.points) + + fn curr -> + Enum.map(nodes, fn node -> Ekser.Message.Start_Job.new(curr, node, result) end) + end + |> Ekser.Router.send() :ok = case new_id === state.fractal_id do @@ -100,16 +135,24 @@ defmodule Ekser.FractalServer do :ok false -> - Ekser.NodeStore.update_cluster(state.job_name, new_id) + Ekser.NodeStore.update_cluster(state.job.name, new_id) + all_nodes = Ekser.NodeStore.get_nodes([]) + {curr, nodes_without_curr} = Map.pop(all_nodes, :curr) + + receivers = + Map.pop(nodes_without_curr, curr.id) + |> elem(1) + |> Map.values() fn curr -> - Enum.map(nodes, fn node -> Ekser.Message.Start_Job.new(curr, node, state.points) end) + Enum.map(receivers, fn receiver -> + Ekser.Message.Updated_Node.new(curr, receiver, curr) + end) end |> Ekser.Router.send() end - new_state = %__MODULE__{state | fractal_id: new_id} - {:reply, :ok, new_state} + {:reply, :ok, scaled_state} end @impl GenServer @@ -129,10 +172,7 @@ defmodule Ekser.FractalServer do clean_list(state) |> get_proper_list() - case state.cruncher do - nil -> true - _ -> Process.exit(state.cruncher, :shutdown) - end + stop_cruncher(state.cruncher) result = Ekser.Result.new(state.job.name, new_points) @@ -143,18 +183,12 @@ defmodule Ekser.FractalServer do def handle_call(:stop, _from, _) do result = Ekser.Result.new("", []) - {:reply, result, %__MODULE__{}} - end - - @impl GenServer - def handle_call(:status, _from, state) when state.job != nil and state.points != nil do - status = Ekser.Status.new(state.job.name, state.fractal_id, state.count) - {:reply, status, state} + {:reply, result, %__MODULE__{count: 0}} end @impl GenServer def handle_call(:status, _from, state) when state.job != nil do - status = Ekser.Status.new(state.job.name, state.fractal_id, 0) + status = Ekser.Status.new(state.job.name, state.fractal_id, state.count) {:reply, status, state} end @@ -179,9 +213,10 @@ defmodule Ekser.FractalServer do @impl GenServer def handle_info({:DOWN, old_cruncher, _, _, _}, state) do + Process.demonitor(old_cruncher) + case old_cruncher === state.cruncher do true -> - Process.demonitor(old_cruncher) cruncher = start_cruncher(state) new_state = %__MODULE__{state | cruncher: cruncher} {:noreply, new_state} @@ -191,28 +226,32 @@ defmodule Ekser.FractalServer do end end - defp set_up(state) do + defp scale_state(state) do {scaled_anchors, scaled_points} = - case state.fractal_id === "0" do - true -> - {state.anchors, state.points} + Ekser.Point.scale_to_fractal_id( + state.job.ratio, + state.fractal_id, + {state.anchors, state.points} + ) - false -> - Ekser.Point.scale_to_fractal_id( - state.job.ratio, - state.fractal_id, - {state.anchors, Enum.reverse(state.points)} - ) - end + set_state(state, scaled_anchors, scaled_points) + end - new_state = %__MODULE__{ - state - | anchors: scaled_anchors, - points: scaled_points, - count: length(scaled_anchors) + length(scaled_points) - } + defp set_state(state, anchors, points) do + %__MODULE__{state | anchors: anchors, points: points, count: length(anchors) + length(points)} + end - %__MODULE__{new_state | cruncher: start_cruncher(new_state)} + defp set_up_cruncher(state) do + Registry.dispatch( + Ekser.AggregateReg, + {Ekser.Message.Entered_Cluster, state.job.name}, + fn entries -> + for {pid, _} <- entries, + do: GenServer.cast(pid, :clear) + end + ) + + %__MODULE__{state | cruncher: start_cruncher(state)} end defp clean_list(state) do @@ -224,6 +263,8 @@ defmodule Ekser.FractalServer do end defp start_cruncher(state) do + stop_cruncher(state.cruncher) + point = case length(state.points) do 0 -> @@ -242,4 +283,11 @@ defmodule Ekser.FractalServer do Process.monitor(cruncher) cruncher end + + defp stop_cruncher(cruncher) do + case cruncher do + nil -> true + _ -> Process.exit(cruncher, :shutdown) + end + end end diff --git a/lib/input/listener.ex b/lib/input/listener.ex index b9ae184..ff01d90 100644 --- a/lib/input/listener.ex +++ b/lib/input/listener.ex @@ -30,10 +30,14 @@ defmodule Ekser.Listener do defp listen(socket, curr) do {:ok, client} = :gen_tcp.accept(socket) + own_pid = self() + {:ok, pid} = - Task.Supervisor.start_child(Ekser.ReceiverSup, fn -> serve(client, curr, self()) end) + Task.Supervisor.start_child(Ekser.ReceiverSup, fn -> serve(client, curr, own_pid) end) :ok = :gen_tcp.controlling_process(client, pid) + + send(pid, {:ok, nil}) listen(socket, curr) end @@ -43,6 +47,11 @@ defmodule Ekser.Listener do end defp serve(socket, curr, pid) do + receive do + {:ok, _} -> + :ok + end + bytes = socket |> read() @@ -57,6 +66,8 @@ defmodule Ekser.Listener do end defp process(message, curr, pid) do + Logger.info("Got #{message.sender.id}|#{message.receiver.id}|#{message.type}") + case Ekser.Node.same_node?(message.receiver, curr) do true -> execute(message, pid) false -> Ekser.Router.forward(message) diff --git a/lib/network/route_table.ex b/lib/network/route_table.ex index 26ca79b..c147d53 100644 --- a/lib/network/route_table.ex +++ b/lib/network/route_table.ex @@ -8,15 +8,22 @@ defmodule Ekser.RouteTable do :curr, :prev, :next, - cluster_neighbours: [] + cluster_neighbours: [], + last_id: 0 ] - @spec get_next(%__MODULE__{}, %Ekser.Node{}) :: + @spec get_next(%__MODULE__{}, %Ekser.Node{}, integer()) :: {:ok, %Ekser.Node{}} | {:error, String.t()} - def get_next(table, receiver) do + def get_next(table, receiver, sender_id) do + last_id = + case sender_id > table.last_id do + true -> 0 + false -> table.last_id + end + best_node = extract_neighbours(table) - |> Stream.map(fn node -> {node, get_min_distance(node, receiver)} end) + |> Stream.map(fn node -> {node, get_min_distance(node, receiver, last_id)} end) |> Enum.reduce_while(:error, fn element, acc -> least_hoops(element, acc) end) case best_node do @@ -50,8 +57,18 @@ defmodule Ekser.RouteTable do end end - defp get_min_distance(node, element) do - chain_distance = abs(node.id - element.id) + defp get_min_distance(node, element, last_id) do + chain_distance = + case last_id > 0 do + true -> + min( + abs(node.id - element.id), + last_id + 1 - max(node.id, element.id) + min(node.id, element.id) + ) + + false -> + abs(node.id - element.id) + end with true <- node.job_name != "", true <- node.job_name === element.job_name, diff --git a/lib/network/router.ex b/lib/network/router.ex index d02bf98..08c77e5 100644 --- a/lib/network/router.ex +++ b/lib/network/router.ex @@ -1,4 +1,5 @@ defmodule Ekser.Router do + require Logger require Ekser.TCP require Ekser.Node require Ekser.RouteTable @@ -17,6 +18,11 @@ defmodule Ekser.Router do GenServer.call(Ekser.Router, {:curr, node}) end + @spec update_last_id(integer()) :: :ok + def update_last_id(id) do + GenServer.call(Ekser.Router, {:last_id, id}) + end + @spec introduce_new(%Ekser.Node{}) :: :ok def introduce_new(node) do GenServer.call(Ekser.Router, {:introduce, node}) @@ -82,7 +88,7 @@ defmodule Ekser.Router do @impl GenServer def handle_call({:prev, node}, _from, table) do - {:reply, :ok, %Ekser.RouteTable{table | prev: node}} + {:reply, :ok, %Ekser.RouteTable{table | prev: node, last_id: node.id}} end @impl GenServer @@ -96,10 +102,19 @@ defmodule Ekser.Router do {:reply, :ok, %Ekser.RouteTable{table | next: new_node}} end + @impl GenServer + def handle_call({:last_id, id}, _from, table) do + {:reply, :ok, %Ekser.RouteTable{table | last_id: id}} + end + @impl GenServer def handle_call({:bootstrap, closure}, _from, table) do message = closure.(table.curr, table.bootstrap) + Logger.info( + "Sending #{message.sender.id}|#{message.receiver.id}|#{message.type} to bootstrap" + ) + dispatch(message, message.receiver, table.curr.id) {:reply, :ok, table} @@ -107,7 +122,11 @@ defmodule Ekser.Router do @impl GenServer def handle_cast({:forward, message}, table) do - {:ok, route_to} = Ekser.RouteTable.get_next(table, message.receiver) + {:ok, route_to} = Ekser.RouteTable.get_next(table, message.receiver, message.sender.id) + + Logger.info( + "Forwarding #{message.sender.id}|#{message.receiver.id}|#{message.type} to #{route_to.id}" + ) dispatch(message, route_to, table.curr.id) @@ -119,7 +138,12 @@ defmodule Ekser.Router do message_list = closure.(table.curr) for message <- message_list do - {:ok, route_to} = Ekser.RouteTable.get_next(table, message.receiver) + {:ok, route_to} = Ekser.RouteTable.get_next(table, message.receiver, message.sender.id) + + Logger.info( + "Sending #{message.sender.id}|#{message.receiver.id}|#{message.type} to #{route_to.id}" + ) + dispatch(message, route_to, table.curr.id) end diff --git a/log.txt b/log.txt index 3ed2606..d59f284 100644 --- a/log.txt +++ b/log.txt @@ -1,8 +1,3 @@ -07:25:54.531 [error] Task #PID<0.211.0> started from Ekser.ListenerSup terminating -** (MatchError) no match of right hand side value: {:error, :badarg} - (ekser 0.1.0) lib/input/listener.ex:36: Ekser.Listener.listen/2 - (elixir 1.13.4) lib/task/supervised.ex:89: Task.Supervised.invoke_mfa/2 - (stdlib 3.17.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3 -Function: &Ekser.Listener.run/1 - Args: [%Ekser.Node{fractal_id: "", id: -2, ip: {192, 168, 0, 29}, job_name: "", port: 1200}] -07:25:54.773 [error] [__exception__: true, __struct__: Jason.DecodeError, data: "{\"payload\":{\"fractalId\":\"\",\"ipAddress\":\"192.168.0.29\",\"jobName\":\"\",\"nodeId\":2,\"port\":1100},\"receiver\":{\"fractalId\":\"\",\"ipAddress\":\"192.168.0.29\",\"jobName\":\"\",\"nodeId\":1,\"port\":1200},\"routes\":[2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2],\"sender\":{\"fractalId\":\"\",\"ipAddress\":\"192.168.0.29\",\"jobName\":\"\",\"nodeId\":2,\"port\":1100},\"type\":\"ENTERED\"", position: 1460, token: nil] +07:23:29.216 [info] Sending -2|-1|Elixir.Ekser.Message.Hail to bootstrap +07:23:30.562 [info] Got -2|-2|Elixir.Ekser.Message.Contact +07:23:30.562 [info] Sending 0|-1|Elixir.Ekser.Message.Join to bootstrap diff --git a/result.png b/result.png index aaef895..1bf32c7 100644 Binary files a/result.png and b/result.png differ