Skip to content

Commit

Permalink
Added sampleFraction and submissionTime to the workloadSpec (#295)
Browse files Browse the repository at this point in the history
* Added sampleFraction and submissionTime to the workloadSpec

* Removed commented code
  • Loading branch information
DanteNiewenhuis authored Jan 23, 2025
1 parent 9403af1 commit bb945c2
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ import kotlin.math.roundToLong
*/
public class ComputeWorkloadLoader(
private val pathToFile: File,
private val checkpointInterval: Long,
private val checkpointDuration: Long,
private val checkpointIntervalScaling: Double,
) : WorkloadLoader() {
private val subMissionTime: String? = null,
private val checkpointInterval: Long = 0L,
private val checkpointDuration: Long = 0L,
private val checkpointIntervalScaling: Double = 1.0,
) : WorkloadLoader(subMissionTime) {
/**
* The logger for this instance.
*/
Expand Down Expand Up @@ -160,25 +161,11 @@ public class ComputeWorkloadLoader(
* Load the trace at the specified [pathToFile].
*/
override fun load(): List<Task> {
val ref =
cache.compute(pathToFile) { key, oldVal ->
val inst = oldVal?.get()
if (inst == null) {
// val path = baseDir.resolve(key)
val trace = Trace.open(pathToFile, "opendc-vm")
val fragments = parseFragments(trace)
val vms = parseMeta(trace, fragments)

logger.info { "Loading trace $key at $pathToFile" }

val trace = Trace.open(pathToFile, "opendc-vm")
val fragments = parseFragments(trace)
val vms = parseMeta(trace, fragments)

SoftReference(vms)
} else {
oldVal
}
}

return checkNotNull(ref?.get()) { "Memory pressure" }
return vms
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public data class Task(
val cpuCapacity: Double,
val memCapacity: Long,
val totalLoad: Double,
val submissionTime: Instant,
var submissionTime: Instant,
val duration: Long,
val trace: TraceWorkload,
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,56 @@

package org.opendc.compute.workload
import mu.KotlinLogging
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset

public abstract class WorkloadLoader {
public abstract class WorkloadLoader(private val submissionTime: String? = null) {
private val logger = KotlinLogging.logger {}

public fun reScheduleTasks(workload: List<Task>) {
if (submissionTime == null) {
return
}

val workloadSubmissionTime = workload.minOf({ it.submissionTime }).toEpochMilli()
val submissionTimeLong = LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli()

val timeShift = submissionTimeLong - workloadSubmissionTime

for (task in workload) {
task.submissionTime = Instant.ofEpochMilli(task.submissionTime.toEpochMilli() + timeShift)
}
}

public abstract fun load(): List<Task>

/**
* Load the workload at sample tasks until a fraction of the workload is loaded
*/
public fun sampleByLoad(fraction: Double): List<Task> {
val vms = this.load()
val workload = this.load()

reScheduleTasks(workload)

if (fraction >= 1.0) {
return workload
}

if (fraction <= 0.0) {
throw Error("The fraction of tasks to load cannot be 0.0 or lower")
}

val res = mutableListOf<Task>()

val totalLoad = vms.sumOf { it.totalLoad }
val totalLoad = workload.sumOf { it.totalLoad }
var currentLoad = 0.0

for (entry in vms) {
val shuffledWorkload = workload.shuffled()
for (entry in shuffledWorkload) {
val entryLoad = entry.totalLoad

// TODO: ask Sacheen
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
}
Expand All @@ -48,7 +80,7 @@ public abstract class WorkloadLoader {
res += entry
}

logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }

return res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,20 @@ import java.io.File
*
* @property pathToFile
* @property type
* @property sampleFraction
* @property submissionTime
*/
@Serializable
public data class WorkloadSpec(
val pathToFile: String,
val type: WorkloadTypes,
val sampleFraction: Double = 1.0,
val submissionTime: String? = null,
) {
public val name: String = File(pathToFile).nameWithoutExtension

init {
require(sampleFraction > 0) { "The fraction of the tasks can not be 0.0 or lower" }
require(File(pathToFile).exists()) { "The provided path to the workload: $pathToFile does not exist " }
}
}
Expand All @@ -65,6 +70,7 @@ public enum class WorkloadTypes {
public fun getWorkloadLoader(
type: WorkloadTypes,
pathToFile: File,
submissionTime: String?,
checkpointInterval: Long,
checkpointDuration: Long,
checkpointIntervalScaling: Double,
Expand All @@ -73,6 +79,7 @@ public fun getWorkloadLoader(
WorkloadTypes.ComputeWorkload ->
ComputeWorkloadLoader(
pathToFile,
submissionTime,
checkpointInterval,
checkpointDuration,
checkpointIntervalScaling,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,6 @@ public suspend fun ComputeService.replay(

// Wait until the task is terminated
taskWatcher.wait()

// Stop the task after reaching the end-time of the virtual machine
// task.delete()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,16 @@ public fun runScenario(
val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L
val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0

// val workloadLoader =
// ComputeWorkloadLoader(
// File(scenario.workloadSpec.pathToFile),
// checkpointInterval,
// checkpointDuration,
// checkpointIntervalScaling,
// )
// val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed))

val workloadLoader =
getWorkloadLoader(
scenario.workloadSpec.type,
File(scenario.workloadSpec.pathToFile),
scenario.workloadSpec.submissionTime,
checkpointInterval,
checkpointDuration,
checkpointIntervalScaling,
)
val workload = workloadLoader.load()
val workload = workloadLoader.sampleByLoad(scenario.workloadSpec.sampleFraction)

val startTimeLong = workload.minOf { it.submissionTime }.toEpochMilli()
val startTime = Duration.ofMillis(startTimeLong)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class OpenDCRunner(
/**
* Helper class to load the workloads.
*/
private val workloadLoader = ComputeWorkloadLoader(tracePath, 0L, 0L, 0.0)
private val workloadLoader = ComputeWorkloadLoader(tracePath)

/**
* The [ForkJoinPool] that is used to execute the simulation jobs.
Expand Down

0 comments on commit bb945c2

Please sign in to comment.