Skip to content

Commit

Permalink
Fix UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Sep 2, 2024
1 parent 90d18dd commit 2fc2b3f
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,41 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
val tempSummary = qualSumInfo.get
val newClusterSummary = tempSummary.clusterSummary.copy(
recommendedClusterInfo = pluginTypeChecker.platform.recommendedClusterInfo)
val newQualSummary = tempSummary.copy(clusterSummary = newClusterSummary)
if (AppAttemptTracker.isOlderAttemptId(app.appId, app.attemptId)) {
// If the attemptId is an older attemptId, skip this attempt.
// This can happen when the user has provided event logs for multiple attempts
progressBar.foreach(_.reportSkippedProcess())
SkippedAppResult.fromAppAttempt(pathStr, app.appId, app.attemptId)
} else {
val newQualSummary = tempSummary.copy(clusterSummary = newClusterSummary)
allApps.put(app.appId, newQualSummary)
progressBar.foreach(_.reportSuccessfulProcess())
val endTime = System.currentTimeMillis()
SuccessAppResult(pathStr, app.appId, app.attemptId,
s"Took ${endTime - startTime}ms to process")
// This is a new attemptId, process the event log
val latestAttempt = AppAttemptTracker.getLatestAttempt(app.appId).getOrElse(1)
if (latestAttempt == 1) {
// This is an event log for the first attempt of the app
// Atomically insert into the allApps map based on the App ID
val insertSuccess = allApps.putIfAbsent(app.appId, qualSumInfo.get)
if (insertSuccess != null) {
// Another event log for the same App ID has already been processed.
// Skip this event log.
progressBar.foreach(_.reportSkippedProcess())
SkippedAppResult(pathStr, s"Event log for App ID '${app.appId}' " +
s"already processed; skipping!")
} else {
// add the recommend cluster info into the summary
allApps.put(app.appId, newQualSummary)
progressBar.foreach(_.reportSuccessfulProcess())
val endTime = System.currentTimeMillis()
SuccessAppResult(pathStr, app.appId, app.attemptId,
s"Took ${endTime - startTime}ms to process")
}
} else {
// add the recommend cluster info into the summary
allApps.put(app.appId, newQualSummary)
progressBar.foreach(_.reportSuccessfulProcess())
val endTime = System.currentTimeMillis()
SuccessAppResult(pathStr, app.appId, app.attemptId,
s"Took ${endTime - startTime}ms to process")
}
}
} else {
progressBar.foreach(_.reportUnkownStatusProcess())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@ object AppAttemptTracker {
})
}

/**
* Get the latest attemptId for the given appId.
*/
def getLatestAttempt(appId: String): Option[Int] = {
Option(attemptIdMap.get(appId))
}

/**
* Check if the attemptId is older than the existing attemptId for the given appId.
*/
def isOlderAttemptId(appId: String, attemptId: Int): Boolean = {
val existingAttemptId = attemptIdMap.getOrDefault(appId, 1)
if (existingAttemptId == 1) {
// If the attemptId is 1, then it is the first attempt and not older.
false
} else {
attemptId < existingAttemptId
getLatestAttempt(appId) match {
case Some(latestAttemptId) => latestAttemptId > 1 && attemptId < latestAttemptId
case None => false
}
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids.tool
import java.io.{File, FilenameFilter, FileNotFoundException}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import com.nvidia.spark.rapids.tool.profiling.ProfileArgs
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter
Expand Down Expand Up @@ -183,6 +184,16 @@ object ToolTestUtils extends Logging {
assert(actualStatusReportCount == expStatusReportCount)
}
}

/**
* Replaces the App ID in the given event log line with a random string.
*/
def generateRandomAppId(eventLogLine: String): String = {
if (eventLogLine.contains("App ID")) {
val randomAppId = Random.alphanumeric.take(16).mkString
eventLogLine.replaceAll("""("App ID":")([^"]*)(")""", s"""$$1$randomAppId$$3""")
} else eventLogLine
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ class AppFilterSuite extends BaseTestSuite {
}

val appsToTest = Array(TestEventLogInfo("ndshours18", msHoursAgo(18), 1),
TestEventLogInfo("ndsweeks2", msWeeksAgo(2), 1),
TestEventLogInfo("ndsmonths4", msMonthsAgo(5), 1),
TestEventLogInfo("ndsdays3", msDaysAgo(3), 1),
TestEventLogInfo("ndsmins34", msMinAgo(34), 1),
TestEventLogInfo("nds86", msDaysAgo(4), 1),
TestEventLogInfo("nds86", msWeeksAgo(2), 2),
TestEventLogInfo("otherapp", msWeeksAgo(2), 1))
TestEventLogInfo("ndsweeks2", msWeeksAgo(2), 2),
TestEventLogInfo("ndsmonths4", msMonthsAgo(5), 3),
TestEventLogInfo("ndsdays3", msDaysAgo(3), 4),
TestEventLogInfo("ndsmins34", msMinAgo(34), 5),
TestEventLogInfo("nds86", msDaysAgo(4), 6),
TestEventLogInfo("nds86", msWeeksAgo(2), 7),
TestEventLogInfo("otherapp", msWeeksAgo(2), 8))

test("app name and start time 20m") {
testTimePeriodAndStart(appsToTest, "20m", "nds", appsToTest.size - 1)
Expand Down Expand Up @@ -225,9 +225,9 @@ class AppFilterSuite extends BaseTestSuite {

private val appsWithFsToTest = Array(
TestEventLogFSAndAppNameInfo("ndshours18", msHoursAgo(18), 1),
TestEventLogFSAndAppNameInfo("ndsweeks2", msWeeksAgo(2), 1),
TestEventLogFSAndAppNameInfo("nds86", msDaysAgo(4), 1),
TestEventLogFSAndAppNameInfo("nds86", msWeeksAgo(2), 2))
TestEventLogFSAndAppNameInfo("ndsweeks2", msWeeksAgo(2), 2),
TestEventLogFSAndAppNameInfo("nds86", msDaysAgo(4), 3),
TestEventLogFSAndAppNameInfo("nds86", msWeeksAgo(2), 4))

test("app name exact and fs 10-newest-filesystem") {
testFileSystemTimeAndStart(appsWithFsToTest, "10-newest-filesystem", "nds86", 2)
Expand Down Expand Up @@ -356,9 +356,9 @@ class AppFilterSuite extends BaseTestSuite {

private val appsFullWithFsToTest = Array(
TestEventLogFSAndAppInfo("app-ndshours18", msHoursAgo(16), "ndshours18", msHoursAgo(18), 1),
TestEventLogFSAndAppInfo("app-ndsweeks2", msWeeksAgo(2), "ndsweeks2", msWeeksAgo(2), 1),
TestEventLogFSAndAppInfo("app-nds86-1", msDaysAgo(3), "nds86", msDaysAgo(4), 1),
TestEventLogFSAndAppInfo("app-nds86-2", msDaysAgo(13), "nds86", msWeeksAgo(2), 2))
TestEventLogFSAndAppInfo("app-ndsweeks2", msWeeksAgo(2), "ndsweeks2", msWeeksAgo(2), 2),
TestEventLogFSAndAppInfo("app-nds86-1", msDaysAgo(3), "nds86", msDaysAgo(4), 3),
TestEventLogFSAndAppInfo("app-nds86-2", msDaysAgo(13), "nds86", msWeeksAgo(2), 4))

test("full app name exact and fs 10-newest-filesystem 6 days") {
testFileSystemTimeAndStartAndAppFull(appsFullWithFsToTest, "10-newest-filesystem",
Expand Down Expand Up @@ -460,37 +460,48 @@ class AppFilterSuite extends BaseTestSuite {

private val appsWithAppNameCriteriaToTest = Array(
TestEventLogFSAndAppInfo("app-ndshours18", msHoursAgo(16), "ndshours18", msHoursAgo(18), 1),
TestEventLogFSAndAppInfo("app-ndsweeks-1", msWeeksAgo(1), "ndsweeks", msWeeksAgo(1), 1),
TestEventLogFSAndAppInfo("app-ndsweeks-2", msWeeksAgo(2), "ndsweeks", msWeeksAgo(2), 2),
TestEventLogFSAndAppInfo("app-nds86-1", msDaysAgo(3), "nds86", msDaysAgo(4), 1),
TestEventLogFSAndAppInfo("app-nds86-2", msDaysAgo(13), "nds86", msWeeksAgo(2), 2),
TestEventLogFSAndAppInfo("app-nds86-3", msDaysAgo(18), "nds86", msWeeksAgo(3), 3))
TestEventLogFSAndAppInfo("app-ndsweeks-1", msWeeksAgo(1), "ndsweeks", msWeeksAgo(1), 2),
TestEventLogFSAndAppInfo("app-ndsweeks-2", msWeeksAgo(2), "ndsweeks", msWeeksAgo(2), 3),
TestEventLogFSAndAppInfo("app-nds86-1", msDaysAgo(3), "nds86", msDaysAgo(4), 4),
TestEventLogFSAndAppInfo("app-nds86-2", msDaysAgo(13), "nds86", msWeeksAgo(2), 5),
TestEventLogFSAndAppInfo("app-nds86-3", msDaysAgo(18), "nds86", msWeeksAgo(3), 6))

test("standalone 1-oldest-per-app-name") {
val expected = Array(("ndshours18", "local-162610430031"), ("ndsweeks", "local-162610430032"),
("nds86", "local-162610430033"))
val expected = Array(
("ndshours18", "local-162610430031"),
("ndsweeks", "local-162610430033"),
("nds86", "local-162610430036"))
testAppFilterCriteriaAndPerAppName(appsWithAppNameCriteriaToTest, "1-oldest-per-app-name",
3, expected)
}

test("standalone 2-newest-per-app-name") {
val expected = Array(("ndshours18", "local-162610430031"), ("ndsweeks", "local-162610430031"),
("ndsweeks", "local-162610430032"), ("nds86", "local-162610430031"),
("nds86", "local-162610430032"))
val expected = Array(
("ndshours18", "local-162610430031"),
("ndsweeks", "local-162610430032"),
("ndsweeks", "local-162610430033"),
("nds86", "local-162610430034"),
("nds86", "local-162610430035"))
testAppFilterCriteriaAndPerAppName(appsWithAppNameCriteriaToTest, "2-newest-per-app-name",
5, expected)
}

test("standalone 2-newest based on app time") {
val expected = Array(("ndshours18", "local-162610430031"), ("nds86", "local-162610430031"))
val expected = Array(
("ndshours18", "local-162610430031"),
("nds86", "local-162610430034"))
testAppFilterCriteriaAndPerAppName(appsWithAppNameCriteriaToTest,
"2-newest", 2, expected)
}

test("standalone 10-oldest based on app time") {
val expected = Array(("nds86", "local-162610430031"), ("nds86", "local-162610430032"),
("nds86", "local-162610430033"), ("ndsweeks", "local-162610430031"),
("ndsweeks", "local-162610430032"), ("ndshours18", "local-162610430031"))
val expected = Array(
("ndshours18", "local-162610430031"),
("ndsweeks", "local-162610430032"),
("ndsweeks", "local-162610430033"),
("nds86", "local-162610430034"),
("nds86", "local-162610430035"),
("nds86", "local-162610430036"))
testAppFilterCriteriaAndPerAppName(appsWithAppNameCriteriaToTest, "10-oldest", 6, expected)
}

Expand Down Expand Up @@ -537,14 +548,14 @@ class AppFilterSuite extends BaseTestSuite {
TestRegexAppNameAndUserName("app-ndshours18", msHoursAgo(16), "ndshours18",
msHoursAgo(18), 1, "user1"),
TestRegexAppNameAndUserName("app-ndsweeks-1", msWeeksAgo(1), "ndsweeks",
msWeeksAgo(1), 1, "user1"),
msWeeksAgo(1), 2, "user1"),
TestRegexAppNameAndUserName("app-ndsweeks-2", msWeeksAgo(2), "ndsweeks",
msWeeksAgo(2), 2, "user2"),
msWeeksAgo(2), 3, "user2"),
TestRegexAppNameAndUserName("app-ndsweeks-3", msWeeksAgo(3), "Ndsweeks",
msWeeksAgo(3), 3, "user3"),
TestRegexAppNameAndUserName("app-nds86-1", msDaysAgo(3), "nds86", msDaysAgo(4), 1, "user1"),
TestRegexAppNameAndUserName("app-nds86-2", msDaysAgo(13), "Nds86", msWeeksAgo(2), 2, "user2"),
TestRegexAppNameAndUserName("app-nds86-3", msDaysAgo(18), "nds86", msWeeksAgo(3), 3, "user3"))
msWeeksAgo(3), 4, "user3"),
TestRegexAppNameAndUserName("app-nds86-1", msDaysAgo(3), "nds86", msDaysAgo(4), 5, "user1"),
TestRegexAppNameAndUserName("app-nds86-2", msDaysAgo(13), "Nds86", msWeeksAgo(2), 6, "user2"),
TestRegexAppNameAndUserName("app-nds86-3", msDaysAgo(18), "nds86", msWeeksAgo(3), 7, "user3"))

test("App Name Regex match with all user name") {
testAppNameRegexAndUserName(appsWithAppNameRegexAndUserNameToTest,
Expand Down Expand Up @@ -625,17 +636,17 @@ class AppFilterSuite extends BaseTestSuite {
TestConjunctionAndDisjunction("app-ndshours18", msHoursAgo(16), "Ndshours18",
msHoursAgo(18), 1, "user1"),
TestConjunctionAndDisjunction("app-Ndsweeks-1", msWeeksAgo(1), "ndsweeks",
msWeeksAgo(1), 1, "user1"),
msWeeksAgo(1), 2, "user1"),
TestConjunctionAndDisjunction("app-ndsweeks-2", msWeeksAgo(2), "Ndsweeks",
msWeeksAgo(2), 2, "user2"),
msWeeksAgo(2), 3, "user2"),
TestConjunctionAndDisjunction("app-ndsweeks-3", msWeeksAgo(3), "ndsweeks",
msWeeksAgo(3), 3, "user3"),
msWeeksAgo(3), 4, "user3"),
TestConjunctionAndDisjunction("app-Nds86-1", msDaysAgo(3), "nds86",
msDaysAgo(4), 1, "user1"),
msDaysAgo(4), 5, "user1"),
TestConjunctionAndDisjunction("app-nds86-2", msDaysAgo(6), "nds86",
msWeeksAgo(1), 2, "user2"),
msWeeksAgo(1), 6, "user2"),
TestConjunctionAndDisjunction("app-nds86-3", msDaysAgo(18), "nds86",
msWeeksAgo(3), 3, "user3"))
msWeeksAgo(3), 7, "user3"))

test("Test disjunction all filters") {
testConjunctionAndDisjunction(appsNameConjunctionAndDisjunctionToTest,
Expand Down Expand Up @@ -846,12 +857,14 @@ class AppFilterSuite extends BaseTestSuite {
TrampolineUtil.withTempDir { tmpEventLogDir =>

val fileNames = apps.map { app =>
val userPattern = "user(\\d+)".r
val userId = userPattern.findFirstMatchIn(app.userName).get.group(1).toInt
val elogFile = Paths.get(tmpEventLogDir.getAbsolutePath, app.fileName)
// scalastyle:off line.size.limit
val supText =
s"""{"Event":"SparkListenerLogStart","Spark Version":"3.1.1"}
|{"Event":"SparkListenerApplicationStart","App Name":"${app.appName}", "App ID":"local-16261043003${app.uniqueId}","Timestamp":${app.appTime}, "User":"${app.userName}"}
|{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-8-openjdk-amd64/jre"},"Spark Properties":{"spark.driver.host":"10.10.19.1${app.uniqueId}","spark.app.name":"${app.appName}","spark.driver.port":"4349${app.uniqueId}","spark.eventLog.enabled":"true","spark.master":"spark://5.6.7.8:707${app.uniqueId + 4}","spark.redaction.regex":"*********(redacted)","spark.eventLog.dir":"file:///tmp/spark-events-${app.uniqueId}","spark.sql.maven.additionalRemoteRepositories":"https://maven-central.storage-download.googleapis.com/maven2/","spark.sql.hive.metastore.sharedPrefixes":"com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver","spark.shuffle.io.maxRetries":"${app.uniqueId}","spark.shuffle.registration.maxAttempts":"${app.uniqueId + 1}"},"Hadoop Properties":{"hadoop.service.shutdown.timeout":"30s"},"System Properties":{"java.io.tmpdir":"/tmp"},"Classpath Entries":{"/home/user1/runspace/spark311/spark-3.1.1-bin-hadoop3.2/jars/hive-exec-2.3.7-core.jar":"System Classpath"}}""".stripMargin
|{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-8-openjdk-amd64/jre"},"Spark Properties":{"spark.driver.host":"10.10.19.1$userId","spark.app.name":"${app.appName}","spark.driver.port":"4349$userId","spark.eventLog.enabled":"true","spark.master":"spark://5.6.7.8:707${userId + 4}","spark.redaction.regex":"*********(redacted)","spark.eventLog.dir":"file:///tmp/spark-events-$userId","spark.sql.maven.additionalRemoteRepositories":"https://maven-central.storage-download.googleapis.com/maven2/","spark.sql.hive.metastore.sharedPrefixes":"com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver","spark.shuffle.io.maxRetries":"$userId","spark.shuffle.registration.maxAttempts":"${userId + 1}"},"Hadoop Properties":{"hadoop.service.shutdown.timeout":"30s"},"System Properties":{"java.io.tmpdir":"/tmp"},"Classpath Entries":{"/home/user1/runspace/spark311/spark-3.1.1-bin-hadoop3.2/jars/hive-exec-2.3.7-core.jar":"System Classpath"}}""".stripMargin
// scalastyle:on line.size.limit
Files.write(elogFile, supText.getBytes(StandardCharsets.UTF_8))
new File(elogFile.toString).setLastModified(app.fsTime)
Expand All @@ -871,7 +884,7 @@ class AppFilterSuite extends BaseTestSuite {
}
}
}

test("Test filtering eventlog with missing start event") {
TrampolineUtil.withTempDir { outpath =>
TrampolineUtil.withTempDir { tmpEventLogDir =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,13 @@ class QualificationSuite extends BaseTestSuite {
val tailLines = allEventLines.takeRight(5)
val selectedLines: List[String] = allEventLines.dropRight(5)
selectedLines.foreach { line =>
pwList.foreach(pw => pw.println(line))
if (line.contains("App ID")) {
pwList.foreach { pw =>
pw.println(ToolTestUtils.generateRandomAppId(line))
}
} else {
pwList.foreach(_.println(line))
}
}
for (i <- 0 to tailLines.length - 1) {
if (i == 0) {
Expand Down Expand Up @@ -1779,6 +1785,27 @@ class QualificationSuite extends BaseTestSuite {
ToolTestUtils.compareStatusReport(sparkSession, expectedStatusCount, statusResultFile)
}
}

test("process multiple event logs with same app ID and skip one of them") {
TrampolineUtil.withTempDir { outPath =>
val baseArgs = Array("--output-directory",
outPath.getAbsolutePath,
s"$logDir/eventlog_same_app_id_1.zstd",
s"$logDir/eventlog_same_app_id_2.zstd")
val appArgs = new QualificationArgs(baseArgs)
val (exitCode, result) = QualificationMain.mainInternal(appArgs)
assert(exitCode == 0 && result.size == 1,
"Qualification tool returned unexpected results.")

val statusResultFile = s"$outPath/${QualOutputWriter.LOGFILE_NAME}/" +
s"${QualOutputWriter.LOGFILE_NAME}_status.csv"

// Only one of the event logs should be processed and the other should be skipped.
// Status counts: 1 SUCCESS, 0 FAILURE, 1 SKIPPED, 0 UNKNOWN
val expectedStatusCount = StatusReportCounts(1, 0, 1, 0)
ToolTestUtils.compareStatusReport(sparkSession, expectedStatusCount, statusResultFile)
}
}
}

class ToolTestListener extends SparkListener {
Expand Down

0 comments on commit 2fc2b3f

Please sign in to comment.