Skip to content

Commit

Permalink
Fix aggregators
Browse files Browse the repository at this point in the history
  • Loading branch information
Androoideka committed Jun 16, 2022
1 parent b379c1c commit c7997a6
Show file tree
Hide file tree
Showing 28 changed files with 373 additions and 175 deletions.
14 changes: 7 additions & 7 deletions config.json → config0
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@
{
"name": "triangle",
"pointCount": 3,
"p": 0.1,
"width": 1920,
"height": 1080,
"p": 0.3,
"width": 200,
"height": 200,
"mainPoints": [
{
"x": 0,
"y": 0
},
{
"x": 1,
"y": 2
"x": 99,
"y": 99
},
{
"x": 1,
"y": 4
"x": 199,
"y": 0
}
]
}
Expand Down
30 changes: 30 additions & 0 deletions config1
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"port": 1200,
"bootstrapIpAddress": "192.168.0.29",
"bootstrapPort": 2000,
"weakLimit": 500,
"strongLimit": 1000,
"jobs": [
{
"name": "triangle",
"pointCount": 3,
"p": 0.3,
"width": 200,
"height": 200,
"mainPoints": [
{
"x": 0,
"y": 0
},
{
"x": 99,
"y": 99
},
{
"x": 199,
"y": 0
}
]
}
]
}
30 changes: 30 additions & 0 deletions config2
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"port": 1300,
"bootstrapIpAddress": "192.168.0.29",
"bootstrapPort": 2000,
"weakLimit": 500,
"strongLimit": 1000,
"jobs": [
{
"name": "triangle",
"pointCount": 3,
"p": 0.3,
"width": 200,
"height": 200,
"mainPoints": [
{
"x": 0,
"y": 0
},
{
"x": 99,
"y": 99
},
{
"x": 199,
"y": 0
}
]
}
]
}
30 changes: 30 additions & 0 deletions config3
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"port": 1100,
"bootstrapIpAddress": "192.168.0.29",
"bootstrapPort": 2000,
"weakLimit": 500,
"strongLimit": 1000,
"jobs": [
{
"name": "triangle",
"pointCount": 3,
"p": 0.3,
"width": 200,
"height": 200,
"mainPoints": [
{
"x": 0,
"y": 0
},
{
"x": 99,
"y": 99
},
{
"x": 199,
"y": 0
}
]
}
]
}
Empty file added empty.exs
Empty file.
36 changes: 19 additions & 17 deletions lib/aggregation/aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,39 @@ defmodule Ekser.Aggregate do
new_map
end

:ok =
case nodes_to_send === %{} do
false ->
:ok
case nodes_to_send === %{} do
false ->
nodes_to_send
|> Map.keys()
|> register_keys(response_module)

true ->
receivers =
nodes_to_send
|> Map.values()
|> register_keys(response_module)
receivers =
nodes_to_send
|> Map.values()

fn curr ->
Enum.map(receivers, fn receiver -> message_module.new(curr, receiver, arg) end)
end
|> Ekser.Router.send()
end
fn curr ->
Enum.map(receivers, fn receiver -> message_module.new(curr, receiver, arg) end)
end
|> Ekser.Router.send()

true ->
:ok
end

responses = Map.new(nodes_without_curr, fn {k, _} -> {k, false} end)

case curr do
nil -> {responses, nil}
_ -> {responses, self_function.()}
_ -> {Map.put(responses, curr.id, true), self_function.()}
end
end

defp register_keys(node_ids, message_module) do
Enum.each(node_ids, fn id ->
Registry.register(Registry.AggregateRegistry, {message_module, id}, nil)
Registry.register(Ekser.AggregateReg, {message_module, id}, nil)
end)

node_ids
:ok
end

def is_complete?(responses) do
Expand Down
2 changes: 1 addition & 1 deletion lib/aggregation/child_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Ekser.ChildServer do

@impl GenServer
def handle_continue(:init, [job, fractal_id]) do
Registry.register(Registry.AggregateRegistry, {Ekser.Message.EnteredCluster, job.name}, nil)
Registry.register(Registry.AggregateRegistry, {Ekser.Message.Entered_Cluster, job.name}, nil)

next_id =
case fractal_id do
Expand Down
16 changes: 11 additions & 5 deletions lib/aggregation/cluster_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Ekser.ClusterServer do

# Client API

def start_link([args]) do
def start_link(args) do
GenServer.start_link(__MODULE__, args)
end

Expand All @@ -20,8 +20,8 @@ defmodule Ekser.ClusterServer do
{responses, _} =
Ekser.NodeStore.get_cluster_neighbours(job_name, fractal_id)
|> Ekser.Aggregate.init(
Ekser.Message.ClusterConnectionRequest,
Ekser.Message.ClusterConnectoinResponse,
Ekser.Message.Cluster_Connection_Request,
Ekser.Message.Cluster_Connection_Response,
fn -> nil end,
nil
)
Expand Down Expand Up @@ -50,12 +50,18 @@ defmodule Ekser.ClusterServer do
end

defp complete() do
all_nodes = Ekser.NodeStore.get_nodes([])

{curr, nodes_without_curr} = Map.pop(all_nodes, :curr)

nodes =
Ekser.NodeStore.get_nodes([])
nodes_without_curr
|> Map.pop(curr.id)
|> elem(1)
|> Map.values()

fn curr ->
Enum.map(nodes, fn node -> Ekser.Message.EnteredCluster.new(curr, node, curr) end)
Enum.map(nodes, fn node -> Ekser.Message.Entered_Cluster.new(curr, node, curr) end)
end
|> Ekser.Router.send()

Expand Down
83 changes: 60 additions & 23 deletions lib/aggregation/coordinator_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@ defmodule Ekser.CoordinatorServer do

@impl GenServer
def handle_continue(:init, [:start, output, job]) do
arg =
case Ekser.JobStore.receive_job(job) do
:unchanged -> nil
:ok -> job
end
Ekser.JobStore.receive_job(job)

{responses, local_info} =
Ekser.NodeStore.get_nodes([])
|> Ekser.Aggregate.init(
Ekser.Message.StopShareJob,
Ekser.Message.StoppedJobInfo,
Ekser.Message.Stop_Share_Job,
Ekser.Message.Stopped_Job_Info,
fn -> Ekser.FractalServer.stop() end,
arg
job
)

Ekser.Aggregate.continue_or_exit(responses)
Expand All @@ -42,16 +38,16 @@ defmodule Ekser.CoordinatorServer do
false -> Ekser.Result.get_friendly(local_info)
end

try_complete(responses, initial_results, output)
try_complete(responses, Map.put(initial_results, job.name, []), output)
end

@impl GenServer
def handle_continue(:init, [:stop, output, _]) do
{responses, _} =
Ekser.NodeStore.get_nodes([])
|> Ekser.Aggregate.init(
Ekser.Message.StopShareJob,
Ekser.Message.StoppedJobInfo,
Ekser.Message.Stop_Share_Job,
Ekser.Message.Stopped_Job_Info,
fn -> Ekser.FractalServer.stop() end,
nil
)
Expand Down Expand Up @@ -88,38 +84,79 @@ defmodule Ekser.CoordinatorServer do
end

defp complete(results, output) do
job_names = Map.keys(results)
individual_results =
results
|> Map.pop("")
|> elem(1)
|> Map.to_list()
|> Enum.map(fn {job_name, points} -> Ekser.Result.new(job_name, points) end)

all_nodes = Ekser.NodeStore.get_nodes([])

{curr, nodes_without_curr} = Map.pop(all_nodes, :curr)

nodes =
Ekser.NodeStore.get_nodes([])
nodes_without_curr
|> Map.values()
|> Enum.sort_by(fn node -> node.id end)

{genesis_nodes, leftover_nodes} = Enum.split(nodes, length(job_names))
{genesis_nodes, leftover_nodes} = Enum.split(nodes, length(individual_results))

genesis_node_stream = Stream.cycle(genesis_nodes)

zipped_genesis = Enum.zip(genesis_nodes, job_names)
zipped_leftover = Enum.zip(leftover_nodes, genesis_node_stream)
# 2 cases - curr in zipped genesis, curr in zipped leftover

zipped_genesis = Enum.zip(genesis_nodes, individual_results) |> remove_from_genesis(curr)

{zipped_leftover, message} =
Enum.zip(leftover_nodes, genesis_node_stream) |> remove_from_leftover(curr)

# For DHT
updated_zipped_genesis =
Enum.map(zipped_genesis, fn {node, job_name} ->
new_node = %Ekser.Node{node | job_name: job_name, fractal_id: "0"}
Enum.map(zipped_genesis, fn {node, result} ->
new_node = %Ekser.Node{node | job_name: result.job_name, fractal_id: "0"}
Ekser.NodeStore.receive_node(new_node)
{new_node, job_name}
{new_node, result}
end)

fn curr ->
Enum.map(updated_zipped_genesis, fn {node, job_name} ->
Ekser.Message.StartJobGenesis.new(curr, node, job_name)
Enum.map(updated_zipped_genesis, fn {node, result} ->
Ekser.Message.Start_Job_Genesis.new(curr, node, result)
end) ++
Enum.map(zipped_leftover, fn {receiver, payload} ->
Ekser.Message.ApproachCluster.new(curr, receiver, payload)
end)
Ekser.Message.Approach_Cluster.new(curr, receiver, payload)
end) ++ message
end
|> Ekser.Router.send()

IO.puts(output, "Reorganized clusters.")
exit(:shutdown)
end

defp remove_from_genesis(zipped_genesis, curr) do
case Enum.find(zipped_genesis, fn {node, _} -> node.id === curr.id end) do
nil ->
:ok

{_, result} ->
job = Ekser.JobStore.get_job_by_name(result.job_name)
Ekser.FractalServer.join_cluster(job, "0")
Ekser.FractalServer.start_job(result.points)
end

Enum.reject(zipped_genesis, fn {node, _} -> node.id === curr.id end)
end

defp remove_from_leftover(zipped_leftover, curr) do
message =
case Enum.find(zipped_leftover, fn {node, _} -> node.id === curr.id end) do
nil ->
[]

{_, genesis} ->
[fn curr -> Ekser.Message.Cluster_Knock.new(curr, genesis) end]
end

{Enum.reject(zipped_leftover, fn {node, _} -> node.id === curr.id end), message}
end
end
Loading

0 comments on commit c7997a6

Please sign in to comment.