Skip to content

Commit

Permalink
Added 9 new tests specifically testing the Multiplexer. This assumes …
Browse files Browse the repository at this point in the history
…the Multiplexer is using MaxMinFairness given that this is currently the default and only fairness available in OpenDC. (#280)
  • Loading branch information
DanteNiewenhuis authored Dec 6, 2024
1 parent a49a387 commit b4f694d
Show file tree
Hide file tree
Showing 34 changed files with 610 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import java.util.Random
/**
* An integration test suite for the Scenario experiments.
*/
class ScenarioIntegrationTest {
class ExperimentTest {
/**
* The monitor used to keep track of the metrics.
*/
Expand All @@ -70,6 +70,8 @@ class ScenarioIntegrationTest {
*/
private lateinit var workloadLoader: ComputeWorkloadLoader

private val basePath = "src/test/resources/Experiment"

/**
* Set up the experimental environment.
*/
Expand All @@ -81,7 +83,7 @@ class ScenarioIntegrationTest {
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0)),
)
workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 0L, 0L, 0.0)
workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0)
}

/**
Expand Down Expand Up @@ -137,7 +139,7 @@ class ScenarioIntegrationTest {
val monitor = monitor
val failureModelSpec =
TraceBasedFailureModelSpec(
"src/test/resources/failureTraces/single_failure.parquet",
"$basePath/failureTraces/single_failure.parquet",
repeat = false,
)

Expand Down Expand Up @@ -183,7 +185,7 @@ class ScenarioIntegrationTest {
val workload = createTestWorkload("single_task", 1.0, seed)
val topology = createTopology("single.json")
val monitor = monitor
val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet")
val failureModelSpec = TraceBasedFailureModelSpec("$basePath/failureTraces/11_failures.parquet")

Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
Expand Down Expand Up @@ -223,11 +225,11 @@ class ScenarioIntegrationTest {
fun testSingleTaskCheckpoint() =
runSimulation {
val seed = 1L
workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 1000000L, 1000L, 1.0)
workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 1000000L, 1000L, 1.0)
val workload = createTestWorkload("single_task", 1.0, seed)
val topology = createTopology("single.json")
val monitor = monitor
val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet")
val failureModelSpec = TraceBasedFailureModelSpec("$basePath/failureTraces/11_failures.parquet")

Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
Expand Down Expand Up @@ -341,8 +343,8 @@ class ScenarioIntegrationTest {
{ assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
{ assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(43101787447, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(3489412553, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
{ assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
Expand All @@ -365,7 +367,7 @@ class ScenarioIntegrationTest {
* Obtain the topology factory for the test.
*/
private fun createTopology(name: String): List<ClusterSpec> {
val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/topologies/$name"))
val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/Experiment/topologies/$name"))
return stream.use { clusterTopology(stream) }
}

Expand Down

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"clusters":
[
{
"name": "C01",
"hosts" :
[
{
"name": "H01",
"cpu":
{
"coreCount": 1,
"coreSpeed": 2000
},
"memory": {
"memorySize": 140457600000
}
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"clusters":
[
{
"name": "C01",
"hosts" :
[
{
"name": "H01",
"cpu":
{
"coreCount": 2,
"coreSpeed": 2000
},
"memory": {
"memorySize": 140457600000
}
}
]
}
]
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
private double totalDemand; // The total demand of all the consumers
private double totalSupply; // The total supply from the supplier

private boolean overProvisioned = false;
private boolean overLoaded = false;
private int currentConsumerIdx = -1;

private double capacity; // What is the max capacity
Expand Down Expand Up @@ -68,11 +68,10 @@ public long onUpdate(long now) {

private void distributeSupply() {
// if supply >= demand -> push supplies to all tasks
// TODO: possible optimization -> Only has to be done for the specific consumer that changed demand
if (this.totalSupply >= this.totalDemand) {
if (this.totalSupply > this.totalDemand) {

// If this came from a state of over provisioning, provide all consumers with their demand
if (this.overProvisioned) {
if (this.overLoaded) {
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx));
}
Expand All @@ -84,12 +83,12 @@ private void distributeSupply() {
this.currentConsumerIdx = -1;
}

this.overProvisioned = false;
this.overLoaded = false;
}

// if supply < demand -> distribute the supply over all consumers
else {
this.overProvisioned = true;
this.overLoaded = true;
double[] supplies = redistributeSupply(this.demands, this.totalSupply);

for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
Expand Down Expand Up @@ -178,6 +177,10 @@ public void removeConsumerEdge(FlowEdge consumerEdge) {

this.currentConsumerIdx = -1;

if (this.overLoaded) {
this.distributeSupply();
}

this.pushDemand(this.supplierEdge, this.totalDemand);
}

Expand Down Expand Up @@ -205,7 +208,7 @@ public void handleDemand(FlowEdge consumerEdge, double newDemand) {
demands.set(idx, newDemand);
this.totalDemand += (newDemand - prevDemand);

if (overProvisioned) {
if (overLoaded) {
distributeSupply();
}

Expand All @@ -216,7 +219,7 @@ public void handleDemand(FlowEdge consumerEdge, double newDemand) {

@Override
public void handleSupply(FlowEdge supplierEdge, double newSupply) {
this.totalSupply = newSupply; // Currently this is from a single supply, might turn into multiple suppliers
this.totalSupply = newSupply;

this.distributeSupply();
}
Expand Down

0 comments on commit b4f694d

Please sign in to comment.