Skip to content

Commit

Permalink
Updated the FlowDistributor (#285)
Browse files Browse the repository at this point in the history
* Updated the FlowDistributor to work in more cases and be more performant.

* Removed old FlowDistributor
  • Loading branch information
DanteNiewenhuis authored Jan 7, 2025
1 parent c425a03 commit f71e07f
Show file tree
Hide file tree
Showing 19 changed files with 338 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class SimHost(
private val graph: FlowGraph,
private val machineModel: MachineModel,
private val cpuPowerModel: CpuPowerModel,
private val powerMux: FlowDistributor,
private val powerDistributor: FlowDistributor,
) : AutoCloseable {
/**
* The event listeners registered with this host.
Expand Down Expand Up @@ -131,7 +131,7 @@ public class SimHost(
SimMachine(
this.graph,
this.machineModel,
this.powerMux,
this.powerDistributor,
this.cpuPowerModel,
) { cause ->
hostState = if (cause != null) HostState.ERROR else HostState.DOWN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public class HostsProvisioningStep internal constructor(
service.addPowerSource(simPowerSource)
simPowerSources.add(simPowerSource)

val powerMux = FlowDistributor(graph)
graph.addEdge(powerMux, simPowerSource)
val powerDistributor = FlowDistributor(graph)
graph.addEdge(powerDistributor, simPowerSource)

// Create hosts; they are connected to the powerDistributor when SimMachine is created
for (hostSpec in cluster.hostSpecs) {
Expand All @@ -78,7 +78,7 @@ public class HostsProvisioningStep internal constructor(
graph,
hostSpec.model,
hostSpec.cpuPowerModel,
powerMux,
powerDistributor,
)

require(simHosts.add(simHost)) { "Host with uid ${hostSpec.uid} already exists" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ class ExperimentTest {

// Note that these values have been verified beforehand
assertAll(
{ assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(1803918435, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(787181565, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
Expand Down Expand Up @@ -343,11 +343,11 @@ class ExperimentTest {
{ assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
{ assertEquals(43101787447, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(3489412553, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
{ assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
{ assertEquals(6.914184592181973E9, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ class FlowDistributorTest {
{ assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } },
)
}

Expand Down Expand Up @@ -202,8 +200,6 @@ class FlowDistributorTest {
{ assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}

Expand Down Expand Up @@ -239,8 +235,6 @@ class FlowDistributorTest {
{ assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}

Expand Down Expand Up @@ -276,19 +270,52 @@ class FlowDistributorTest {
{ assertEquals(1000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(1000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}

/**
 * FlowDistributor test 5: A single task transitioning from overload to a perfect fit.
 * The task's first fragment demands more CPU (4000.0) than the host can supply (2000.0),
 * while the second fragment demands exactly the available capacity.
 * During the overloaded fragment the supplied CPU is expected to be capped at 2000.0;
 * afterwards demand and supply should match exactly.
 * We verify the cpu demand and usage reported by both the host and the task for each fragment.
 */
@Test
fun testFlowDistributor5() {
val overloadedThenFitting =
createTestTask(
name = "0",
fragments =
arrayListOf(
TraceFragment(10 * 60 * 1000, 4000.0, 1),
TraceFragment(10 * 60 * 1000, 2000.0, 1),
),
)
val workload: ArrayList<Task> = arrayListOf(overloadedThenFitting)

monitor = runTest(createTopology("single_1_2000.json"), workload)

assertAll(
// First fragment: demand exceeds capacity, supply is capped.
{ assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
// Second fragment: demand equals capacity, supply matches demand.
{ assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
)
}

/**
* FlowDistributor test 6: Two tasks, same time, both fit
* In this test, two tasks are scheduled, and they fit together on the host. The tasks start and finish at the same time.
* This test shows how the FlowDistributor handles two tasks that can fit and no redistribution is required.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
fun testFlowDistributor5() {
fun testFlowDistributor6() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
Expand Down Expand Up @@ -325,19 +352,17 @@ class FlowDistributorTest {
{ assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}

/**
* FlowDistributor test 6: Two task, same time, can not fit
* FlowDistributor test 7: Two tasks, same time, cannot fit
* In this test, two tasks are scheduled, and they cannot both fit. The tasks start and finish at the same time.
* This test shows how the FlowDistributor handles two tasks that both do not fit and redistribution is required.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
fun testFlowDistributor6() {
fun testFlowDistributor7() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
Expand Down Expand Up @@ -374,18 +399,16 @@ class FlowDistributorTest {
{ assertEquals(11000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}

/**
* FlowDistributor test 7: Two task, both fit, second task is delayed
* FlowDistributor test 8: Two tasks, both fit, second task is delayed
* In this test, two tasks are scheduled, and the second task is delayed.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
fun testFlowDistributor7() {
fun testFlowDistributor8() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
Expand Down Expand Up @@ -431,21 +454,19 @@ class FlowDistributorTest {
{ assertEquals(3000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } },
{ assertEquals(2000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}

/**
* FlowDistributor test 8: Two task, both fit on their own but not together, second task is delayed
* FlowDistributor test 9: Two tasks, each fits on its own but not together, second task is delayed
* In this test, two tasks are scheduled, and the second task is delayed.
* When the second task comes in, the host is overloaded.
* This test shows how the FlowDistributor can handle redistribution when a new task comes in.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
fun testFlowDistributor8() {
fun testFlowDistributor9() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
Expand Down Expand Up @@ -488,20 +509,18 @@ class FlowDistributorTest {
{ assertEquals(3000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
{ assertEquals(3000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}

/**
* FlowDistributor test 9: Two task, one changes demand, causing overload
* FlowDistributor test 10: Two tasks, one changes its demand, causing overload
* In this test, two tasks are scheduled, and they can both fit.
* However, task 0 increases its demand, which overloads the FlowDistributor.
* This test shows how the FlowDistributor handles the transition from fitting to overloading when multiple tasks are running.
* We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
*/
@Test
fun testFlowDistributor9() {
fun testFlowDistributor10() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
Expand Down Expand Up @@ -551,11 +570,83 @@ class FlowDistributorTest {
{ assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } },
{ assertEquals(4000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
{ assertEquals(0.0, monitor.hostCpuDemands.last()) { "The host should have 0.0 demand when finished" } },
{ assertEquals(0.0, monitor.hostCpuSupplied.last()) { "The host should have 0.0 usage when finished" } },
)
}

/**
 * FlowDistributor test 11: 5000 hosts. This tests the performance of the distributor.
 * In this test, a single task with 10 identical fragments (20 minutes each, demanding 3000.0 on 1 core)
 * is run on a topology containing 5000 hosts.
 * No assertions are made on the output; the test passes if the simulation completes.
 */
@Test
fun testFlowDistributor11() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
name = "0",
fragments =
arrayListOf<TraceFragment>().apply {
repeat(10) { this.add(TraceFragment(20 * 60 * 1000, 3000.0, 1)) }
},
),
)
// Topology with 5000 single-core hosts (see single_5000_2000.json: count = 5000).
val topology = createTopology("single_5000_2000.json")

monitor = runTest(topology, workload)
}

/**
 * FlowDistributor test 12: 1000 fragments. This tests the performance of the distributor.
 * In this test, a single task with 1000 identical fragments (10 minutes each, demanding 2000.0 on 1 core)
 * is run on a single-host topology.
 * No assertions are made on the output; the test passes if the simulation completes.
 */
@Test
fun testFlowDistributor12() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
name = "0",
fragments =
arrayListOf<TraceFragment>().apply {
repeat(1000) { this.add(TraceFragment(10 * 60 * 1000, 2000.0, 1)) }
},
),
)
val topology = createTopology("single_1_2000.json")

monitor = runTest(topology, workload)
}

/**
 * FlowDistributor test 13: 1000 tasks. This tests the performance of the distributor.
 * In this test, 1000 single-fragment tasks (10 minutes each, demanding 2000.0 on 1 core)
 * are run on a single-host topology.
 * No assertions are made on the output; the test passes if the simulation completes.
 */
@Test
fun testFlowDistributor13() {
val workload: ArrayList<Task> =
arrayListOf<Task>().apply {
repeat(1000) {
this.add(
createTestTask(
// NOTE(review): all 1000 tasks share the name "0" — presumably task names need
// not be unique for this performance test, but confirm this is intentional.
name = "0",
fragments =
arrayListOf(TraceFragment(10 * 60 * 1000, 2000.0, 1)),
),
)
}
}
val topology = createTopology("single_1_2000.json")

monitor = runTest(topology, workload)
}

/**
* Obtain the topology factory for the test.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"clusters":
[
{
"name": "C01",
"hosts" :
[
{
"name": "H01",
"cpu":
{
"coreCount": 1,
"coreSpeed": 2000
},
"memory": {
"memorySize": 140457600000
},
"count": 5000
}
]
}
]
}
Loading

0 comments on commit f71e07f

Please sign in to comment.