Skip to content

Commit

Permalink
Multiplexer update (#278)
Browse files Browse the repository at this point in the history
* Fixed the Multiplexer.java to properly divide the supply over the different consumers.

Fixed a bug where fragments were being loaded in reversed order.

* Optimized the Multiplexer.java, by only updating the supply of the consumer that updated its demand when possible.
  • Loading branch information
DanteNiewenhuis authored Nov 29, 2024
1 parent 124b40c commit a49a387
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ public record GuestCpuStats(
long lostTime,
double capacity,
double usage,
double demand,
double utilization) {}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public class Guest(
counters.cpuLostTime / 1000L,
counters.cpuCapacity,
counters.cpuSupply,
counters.cpuDemand,
counters.cpuSupply / cpuLimit,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ public object DfltTaskExportColumns {
field = Types.required(FLOAT).named("cpu_limit"),
) { it.cpuLimit }

public val CPU_USAGE: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(FLOAT).named("cpu_usage"),
) { it.cpuUsage }

public val CPU_DEMAND: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(FLOAT).named("cpu_demand"),
) { it.cpuDemand }

public val CPU_TIME_ACTIVE: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(INT64).named("cpu_time_active"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ public interface TaskTableReader : Exportable {
*/
public val cpuLimit: Double

/**
* The CPU given to this task (in MHz).
*/
public val cpuUsage: Double

/**
* The CPU demanded by this task (in MHz).
*/
public val cpuDemand: Double

/**
* The duration (in seconds) that a CPU was active in the task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class TaskTableReaderImpl(
_timestampAbsolute = table.timestampAbsolute

_cpuLimit = table.cpuLimit
_cpuDemand = table.cpuDemand
_cpuUsage = table.cpuUsage
_cpuActiveTime = table.cpuActiveTime
_cpuIdleTime = table.cpuIdleTime
_cpuStealTime = table.cpuStealTime
Expand Down Expand Up @@ -128,6 +130,14 @@ public class TaskTableReaderImpl(
get() = _cpuLimit
private var _cpuLimit = 0.0

override val cpuUsage: Double
get() = _cpuUsage
private var _cpuUsage = 0.0

override val cpuDemand: Double
get() = _cpuDemand
private var _cpuDemand = 0.0

override val cpuActiveTime: Long
get() = _cpuActiveTime - previousCpuActiveTime
private var _cpuActiveTime = 0L
Expand Down Expand Up @@ -181,14 +191,16 @@ public class TaskTableReaderImpl(
_timestampAbsolute = now + startTime

_cpuLimit = cpuStats?.capacity ?: 0.0
_cpuActiveTime = cpuStats?.activeTime ?: 0
_cpuIdleTime = cpuStats?.idleTime ?: 0
_cpuStealTime = cpuStats?.stealTime ?: 0
_cpuLostTime = cpuStats?.lostTime ?: 0
_uptime = sysStats?.uptime?.toMillis() ?: 0
_downtime = sysStats?.downtime?.toMillis() ?: 0
_cpuDemand = cpuStats?.demand ?: 0.0
_cpuUsage = cpuStats?.usage ?: 0.0
_cpuActiveTime = cpuStats?.activeTime ?: _cpuActiveTime
_cpuIdleTime = cpuStats?.idleTime ?: _cpuIdleTime
_cpuStealTime = cpuStats?.stealTime ?: _cpuStealTime
_cpuLostTime = cpuStats?.lostTime ?: _cpuLostTime
_uptime = sysStats?.uptime?.toMillis() ?: _uptime
_downtime = sysStats?.downtime?.toMillis() ?: _downtime
_provisionTime = task.launchedAt
_bootTime = sysStats?.bootTime
_bootTime = sysStats?.bootTime ?: _bootTime
_creationTime = task.createdAt
_finishTime = task.finishedAt

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ class ScenarioIntegrationTest {
assertAll(
{ assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } },
{ assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } },
{ assertEquals(4297000, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(5003000, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(14824, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
Expand Down Expand Up @@ -294,8 +294,8 @@ class ScenarioIntegrationTest {

// Note that these values have been verified beforehand
assertAll(
{ assertEquals(1803918431, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(787181569, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
Expand Down Expand Up @@ -341,8 +341,8 @@ class ScenarioIntegrationTest {
{ assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
{ assertEquals(43101787498, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(3489412502, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
{ assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer

private double currentCpuDemand = 0.0f; // cpu capacity demanded by the mux
private double currentCpuUtilization = 0.0f;
private double currentPowerDemand = 0.0f; // power demanded of the psu
private double currentCpuSupplied = 0.0f; // cpu capacity supplied to the mux

private double currentPowerDemand = 0.0f; // power demanded of the psu
private double currentPowerSupplied = 0.0f; // cpu capacity supplied by the psu

private double maxCapacity;
Expand Down Expand Up @@ -122,7 +123,7 @@ public SimCpu(FlowGraph graph, CpuModel cpuModel, CpuPowerModel powerModel, int
public long onUpdate(long now) {
updateCounters(now);

this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;
this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);

// Calculate Power Demand and send to PSU
double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
Expand All @@ -132,7 +133,7 @@ public long onUpdate(long now) {
}

// Calculate the amount of cpu this can provide
double cpuSupply = this.currentCpuDemand;
double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);

if (cpuSupply != this.currentCpuSupplied) {
this.pushSupply(this.muxEdge, cpuSupply);
Expand Down Expand Up @@ -205,6 +206,8 @@ public void handleDemand(FlowEdge consumerEdge, double newCpuDemand) {
this.currentCpuDemand = newCpuDemand;
this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;

this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);

// Calculate Power Demand and send to PSU
double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);

Expand All @@ -223,7 +226,8 @@ public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) {
this.currentPowerSupplied = newPowerSupply;

// Calculate the amount of cpu this can provide
double cpuSupply = this.currentCpuDemand;
double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);
;

if (cpuSupply != this.currentCpuSupplied) {
this.pushSupply(this.muxEdge, cpuSupply);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer

private double totalDemand; // The total demand of all the consumers
private double totalSupply; // The total supply from the supplier

private boolean overProvisioned = false;
private int currentConsumerIdx = -1;

private double capacity; // What is the max capacity

public Multiplexer(FlowGraph graph) {
Expand All @@ -59,52 +63,63 @@ public double getCapacity() {

public long onUpdate(long now) {

if (this.totalDemand > this.capacity) {
redistributeSupply(this.consumerEdges, this.supplies, this.capacity);
} else {
for (int i = 0; i < this.demands.size(); i++) {
this.supplies.set(i, this.demands.get(i));
return Long.MAX_VALUE;
}

private void distributeSupply() {
// if supply >= demand -> push supplies to all tasks
// TODO: possible optimization -> Only has to be done for the specific consumer that changed demand
if (this.totalSupply >= this.totalDemand) {

// If this came from a state of over provisioning, provide all consumers with their demand
if (this.overProvisioned) {
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx));
}
}
}

double totalSupply = 0;
for (int i = 0; i < this.consumerEdges.size(); i++) {
this.pushSupply(this.consumerEdges.get(i), this.supplies.get(i));
totalSupply += this.supplies.get(i);
if (this.currentConsumerIdx != -1) {
this.pushSupply(
this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx));
this.currentConsumerIdx = -1;
}

this.overProvisioned = false;
}

// Only update supplier if supply has changed
if (this.totalSupply != totalSupply) {
this.totalSupply = totalSupply;
// if supply < demand -> distribute the supply over all consumers
else {
this.overProvisioned = true;
double[] supplies = redistributeSupply(this.demands, this.totalSupply);

pushDemand(this.supplierEdge, this.totalSupply);
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
this.pushSupply(this.consumerEdges.get(idx), supplies[idx]);
}
}

return Long.MAX_VALUE;
}

private static double redistributeSupply(
ArrayList<FlowEdge> consumerEdges, ArrayList<Double> supplies, double capacity) {
final long[] consumers = new long[consumerEdges.size()];
private record Demand(int idx, double value) {}

for (int i = 0; i < consumers.length; i++) {
FlowEdge consumer = consumerEdges.get(i);
private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) {
int inputSize = demands.size();

if (consumer == null) {
break;
}
final double[] supplies = new double[inputSize];
final Demand[] tempDemands = new Demand[inputSize];

consumers[i] = (Double.doubleToRawLongBits(consumer.getDemand()) << 32) | (i & 0xFFFFFFFFL);
for (int i = 0; i < inputSize; i++) {
tempDemands[i] = new Demand(i, demands.get(i));
}
Arrays.sort(consumers);

double availableCapacity = capacity;
int inputSize = consumers.length;
Arrays.sort(tempDemands, (o1, o2) -> {
Double i1 = o1.value;
Double i2 = o2.value;
return i1.compareTo(i2);
});

double availableCapacity = totalSupply; // totalSupply

for (int i = 0; i < inputSize; i++) {
long v = consumers[i];
int slot = (int) v;
double d = Double.longBitsToDouble((int) (v >> 32));
double d = tempDemands[i].value;

if (d == 0.0) {
continue;
Expand All @@ -113,12 +128,13 @@ private static double redistributeSupply(
double availableShare = availableCapacity / (inputSize - i);
double r = Math.min(d, availableShare);

supplies.set(slot, r); // Update the rates
int idx = tempDemands[i].idx;
supplies[idx] = r; // Update the rates
availableCapacity -= r;
}

// Return the used capacity
return capacity - availableCapacity;
return supplies;
}

/**
Expand All @@ -132,17 +148,13 @@ public void addConsumerEdge(FlowEdge consumerEdge) {
this.consumerEdges.add(consumerEdge);
this.demands.add(0.0);
this.supplies.add(0.0);

this.invalidate();
}

@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
this.supplierEdge = supplierEdge;
this.capacity = supplierEdge.getCapacity();
this.totalSupply = 0;

this.invalidate();
}

@Override
Expand All @@ -164,7 +176,9 @@ public void removeConsumerEdge(FlowEdge consumerEdge) {
this.consumerEdges.get(i).setConsumerIndex(i);
}

this.invalidate();
this.currentConsumerIdx = -1;

this.pushDemand(this.supplierEdge, this.totalDemand);
}

@Override
Expand All @@ -178,28 +192,34 @@ public void removeSupplierEdge(FlowEdge supplierEdge) {
public void handleDemand(FlowEdge consumerEdge, double newDemand) {
int idx = consumerEdge.getConsumerIndex();

this.currentConsumerIdx = idx;

if (idx == -1) {
System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer");
return;
}

// Update the total demand (This is cheaper than summing over all demands)
double prevDemand = demands.get(idx);
demands.set(idx, newDemand);

demands.set(idx, newDemand);
this.totalDemand += (newDemand - prevDemand);

if (this.totalDemand <= this.capacity) {

this.totalSupply = this.totalDemand;
this.pushDemand(this.supplierEdge, this.totalSupply);

this.pushSupply(consumerEdge, newDemand);
if (overProvisioned) {
distributeSupply();
}
// TODO: add behaviour if capacity is reached

// Send new totalDemand to CPU
// TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply)
this.pushDemand(this.supplierEdge, this.totalDemand);
}

@Override
public void handleSupply(FlowEdge supplierEdge, double newSupply) {}
public void handleSupply(FlowEdge supplierEdge, double newSupply) {
this.totalSupply = newSupply; // Currently this is from a single supply, might turn into multiple suppliers

this.distributeSupply();
}

@Override
public void pushDemand(FlowEdge supplierEdge, double newDemand) {
Expand Down

0 comments on commit a49a387

Please sign in to comment.