diff --git a/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala b/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala
index b0dd2fdda..ee3c9a532 100644
--- a/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala
+++ b/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala
@@ -23,3 +23,11 @@ import org.apache.spark.scheduler.SparkListenerEvent
sealed trait AuronEvent extends SparkListenerEvent {}
case class AuronBuildInfoEvent(info: mutable.LinkedHashMap[String, String]) extends AuronEvent {}
+
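+/**
+ * Posted once per SQL execution after Auron's plan transition has run.
+ * `fallbackNodeToReason` keys are a zero-padded operator id followed by the
+ * node name (illustrative example: "002 Project" -> "unsupported expression"),
+ * so the UI can split id and name with substring(0, 3) / substring(4).
+ */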
+case class AuronPlanFallbackEvent(
+ executionId: Long,
+ numAuronNodes: Int,
+ numFallbackNodes: Int,
+ physicalPlanDescription: String,
+ fallbackNodeToReason: Map[String, String])
+ extends AuronEvent {}
diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
index c237557ff..41b26eab7 100644
--- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
+++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
@@ -16,12 +16,18 @@
*/
package org.apache.spark.sql.execution.ui
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
import javax.servlet.http.HttpServletRequest
-import scala.xml.{Node, NodeSeq}
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.mutable
+import scala.xml.{Node, NodeSeq, Unparsed}
import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
+import org.apache.spark.util.Utils
private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage("") with Logging {
@@ -29,6 +35,38 @@ private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage(
override def render(request: HttpServletRequest): Seq[Node] = {
val buildInfo = sqlStore.buildInfo()
+ val data = sqlStore.executionsList()
+
+ val content = {
+ val _content = mutable.ListBuffer[Node]()
+
+ val auronPageTable =
+ executionsTable(request, "auron", data)
+
+      _content ++=
+        <h4 id="auron">
+          Auron Query Executions
+        </h4> ++
+        <div>
+          {auronPageTable}
+        </div>
+
+ _content
+ }
+    content ++=
+      <script>
+        function clickDetail(details) {{
+          details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
+        }}
+      </script>
val infos =
UIUtils.listingTable(propertyHeader, propertyRow, buildInfo.info, fixedWidth = true)
val summary: NodeSeq =
@@ -48,7 +86,7 @@ private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage(
- UIUtils.headerSparkPage(request, "Auron", summary, parent)
+ UIUtils.headerSparkPage(request, "Auron", summary ++ content, parent)
}
private def propertyHeader = Seq("Name", "Value")
@@ -61,4 +99,297 @@ private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage(
+ private def executionsTable(
+ request: HttpServletRequest,
+ executionTag: String,
+ executionData: Seq[AuronSQLExecutionUIData]): Seq[Node] = {
+
+ val executionPage =
+ Option(request.getParameter(s"$executionTag.page")).map(_.toInt).getOrElse(1)
+
+ val tableHeaderId = executionTag
+
+ try {
+ new AuronExecutionPagedTable(
+ request,
+ parent,
+ executionData,
+ tableHeaderId,
+ executionTag,
+ UIUtils.prependBaseUri(request, parent.basePath),
+ "auron").table(executionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+        <div class="alert alert-error">
+          <p>Error while rendering execution table:</p>
+          <pre>
+            {Utils.exceptionString(e)}
+          </pre>
+        </div>
+ }
+ }
+}
+
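+/**
+ * Paged, sortable table of Auron executions, modelled on Spark's PagedTable:
+ * the sort column, sort direction and page size are carried in the
+ * "<executionTag>.sort", ".desc" and ".pageSize" request parameters.
+ */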
+private[ui] class AuronExecutionPagedTable(
+ request: HttpServletRequest,
+ parent: AuronSQLTab,
+ data: Seq[AuronSQLExecutionUIData],
+ tableHeaderId: String,
+ executionTag: String,
+ basePath: String,
+ subPath: String)
+ extends PagedTable[AuronExecutionTableRowData] {
+
+ private val (sortColumn, desc, pageSize) = getAuronTableParameters(request, executionTag, "ID")
+
+ private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+ override val dataSource = new AuronExecutionDataSource(data, pageSize, sortColumn, desc)
+
+ private val parameterPath =
+ s"$basePath/$subPath/?${getAuronParameterOtherTable(request, executionTag)}"
+
+ override def tableId: String = s"$executionTag-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$executionTag.sort=$encodedSortColumn" +
+ s"&$executionTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$tableHeaderId"
+ }
+
+ /**
+ * Returns parameters of other tables in the page.
+ */
+ def getAuronParameterOtherTable(request: HttpServletRequest, tableTag: String): String = {
+ request.getParameterMap.asScala
+ .filterNot(_._1.startsWith(tableTag))
+ .map(parameter => parameter._1 + "=" + parameter._2(0))
+ .mkString("&")
+ }
+
+ /**
+ * Returns parameter of this table.
+ */
+ def getAuronTableParameters(
+ request: HttpServletRequest,
+ tableTag: String,
+ defaultSortColumn: String): (String, Boolean, Int) = {
+ val parameterSortColumn = request.getParameter(s"$tableTag.sort")
+ val parameterSortDesc = request.getParameter(s"$tableTag.desc")
+ val parameterPageSize = request.getParameter(s"$tableTag.pageSize")
+ val sortColumn = Option(parameterSortColumn)
+ .map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }
+ .getOrElse(defaultSortColumn)
+ val desc =
+ Option(parameterSortDesc).map(_.toBoolean).getOrElse(sortColumn == defaultSortColumn)
+ val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100)
+
+ (sortColumn, desc, pageSize)
+ }
+
+ override def pageSizeFormField: String = s"$executionTag.pageSize"
+
+ override def pageNumberFormField: String = s"$executionTag.page"
+
+ override def goButtonFormPath: String =
+ s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"
+
+ // Information for each header: title, sortable, tooltip
+ private val headerInfo: Seq[(String, Boolean, Option[String])] = {
+ Seq(
+ ("ID", true, None),
+ ("Description", true, None),
+ ("Num Auron Nodes", true, None),
+ ("Num Fallback Nodes", true, None))
+ }
+
+ override def headers: Seq[Node] = {
+ isAuronSortColumnValid(headerInfo, sortColumn)
+
+ headerAuronRow(
+ headerInfo,
+ desc,
+ pageSize,
+ sortColumn,
+ parameterPath,
+ executionTag,
+ tableHeaderId)
+ }
+
+ def headerAuronRow(
+ headerInfo: Seq[(String, Boolean, Option[String])],
+ desc: Boolean,
+ pageSize: Int,
+ sortColumn: String,
+ parameterPath: String,
+ tableTag: String,
+ headerId: String): Seq[Node] = {
+ val row: Seq[Node] = {
+ headerInfo.map { case (header, sortable, tooltip) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$tableTag.desc=${!desc}" +
+ s"&$tableTag.pageSize=$pageSize" +
+ s"#$headerId")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+            <th>
+              <a href={headerLink}>
+                <span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
+                  {header} {Unparsed(arrow)}
+                </span>
+              </a>
+            </th>
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$tableTag.pageSize=$pageSize" +
+ s"#$headerId")
+              <th>
+                <a href={headerLink}>
+                  <span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
+                    {header}
+                  </span>
+                </a>
+              </th>
+ } else {
+              <th>
+                <span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
+                  {header}
+                </span>
+              </th>
+ }
+ }
+ }
+ }
+
+    <thead>
+      <tr>
+        {row}
+      </tr>
+    </thead>
+ }
+
+ def isAuronSortColumnValid(
+ headerInfo: Seq[(String, Boolean, Option[String])],
+ sortColumn: String): Unit = {
+ if (!headerInfo.filter(_._2).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+ }
+ }
+
+ override def row(executionTableRow: AuronExecutionTableRowData): Seq[Node] = {
+ val executionUIData = executionTableRow.executionUIData
+    <tr>
+      <td>
+        {executionUIData.executionId.toString}
+      </td>
+      <td>
+        {descriptionCell(executionUIData)}
+      </td>
+      <td>
+        {executionUIData.numAuronNodes.toString}
+      </td>
+      <td>
+        {executionUIData.numFallbackNodes.toString}
+      </td>
+    </tr>
+ }
+
+ private def descriptionCell(execution: AuronSQLExecutionUIData): Seq[Node] = {
+ val details = if (execution.description != null && execution.description.nonEmpty) {
+ val concat = new PlanStringConcat()
+ concat.append("== Fallback Summary ==\n")
+ val fallbackSummary = execution.fallbackNodeToReason
+ .map { case (name, reason) =>
+ val id = name.substring(0, 3)
+ val nodeName = name.substring(4)
+ s"(${id.toInt}) $nodeName: $reason"
+ }
+ .mkString("\n")
+ concat.append(fallbackSummary)
+ if (execution.fallbackNodeToReason.isEmpty) {
+ concat.append("No fallback nodes")
+ }
+ concat.append("\n\n")
+ concat.append(execution.fallbackDescription)
+
+      <span onclick="clickDetail(this)" class="expand-details">
+        +details
+      </span> ++
+        <div class="stage-details collapsed">
+          <pre>{concat.toString}</pre>
+        </div>
+ } else {
+ Nil
+ }
+
+ val desc = if (execution.description != null && execution.description.nonEmpty) {
+      <a href={executionURL(execution.executionId)} class="description-input">
+        {execution.description}
+      </a>
+ } else {
+      <a href={executionURL(execution.executionId)}>{execution.executionId}</a>
+ }
+
+    <div>{desc}{details}</div>
+ }
+
+ private def executionURL(executionID: Long): String =
+ s"${UIUtils.prependBaseUri(request, parent.basePath)}/SQL/execution/?id=$executionID"
+}
+
+private[ui] class AuronExecutionTableRowData(val executionUIData: AuronSQLExecutionUIData)
+
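+/** Backing data source for the paged table; rows are sorted once per request. */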
+private[ui] class AuronExecutionDataSource(
+ executionData: Seq[AuronSQLExecutionUIData],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean)
+ extends PagedDataSource[AuronExecutionTableRowData](pageSize) {
+
+ // Convert ExecutionData to ExecutionTableRowData which contains the final contents to show
+ // in the table so that we can avoid creating duplicate contents during sorting the data
+ private val data = executionData.map(executionRow).sorted(ordering(sortColumn, desc))
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[AuronExecutionTableRowData] =
+ data.slice(from, to)
+
+ private def executionRow(
+ executionUIData: AuronSQLExecutionUIData): AuronExecutionTableRowData = {
+ new AuronExecutionTableRowData(executionUIData)
+ }
+
+ /** Return Ordering according to sortColumn and desc. */
+ private def ordering(
+ sortColumn: String,
+ desc: Boolean): Ordering[AuronExecutionTableRowData] = {
+ val ordering: Ordering[AuronExecutionTableRowData] = sortColumn match {
+ case "ID" => Ordering.by(_.executionUIData.executionId)
+ case "Description" => Ordering.by(_.executionUIData.fallbackDescription)
+ case "Num Auron Nodes" => Ordering.by(_.executionUIData.numAuronNodes)
+ case "Num Fallback Nodes" => Ordering.by(_.executionUIData.numFallbackNodes)
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
}
diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronEventUtils.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronEventUtils.scala
index e4a359ceb..5fdc669e1 100644
--- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronEventUtils.scala
+++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronEventUtils.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.auron.spark.ui.AuronEvent
object AuronEventUtils {
+
def post(sc: SparkContext, event: AuronEvent): Unit = {
sc.listenerBus.post(event)
}
diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala
index 0da16d4fd..b817ad903 100644
--- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala
+++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala
@@ -16,17 +16,27 @@
*/
package org.apache.spark.sql.execution.ui
+import scala.collection.mutable
+
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
-import org.apache.auron.spark.ui.AuronBuildInfoEvent
+import org.apache.auron.spark.ui.{AuronBuildInfoEvent, AuronPlanFallbackEvent}
class AuronSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore)
extends SparkListener
with Logging {
+ private val executionIdToDescription = new mutable.HashMap[Long, String]
+ private val executionIdToFallbackEvent = new mutable.HashMap[Long, AuronPlanFallbackEvent]
+
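+  // Evict the oldest Auron execution entries once more than
+  // spark.sql.ui.retainedExecutions of them are kept in the store.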
+  kvstore.addTrigger(classOf[AuronSQLExecutionUIData], conf.get[Int](UI_RETAINED_EXECUTIONS)) {
+ count => cleanupExecutions(count)
+ }
+
def getAuronBuildInfo(): Long = {
kvstore.count(classOf[AuronBuildInfoUIData])
}
@@ -36,11 +46,61 @@ class AuronSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore)
kvstore.write(uiData)
}
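+
+  // An AuronPlanFallbackEvent can arrive before the matching
+  // SparkListenerSQLExecutionStart; in that case the event is buffered until
+  // the execution's description becomes available.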
+ private def onAuronPlanFallback(event: AuronPlanFallbackEvent): Unit = {
+ val description = executionIdToDescription.get(event.executionId)
+ if (description.isDefined) {
+ val uiData = new AuronSQLExecutionUIData(
+ event.executionId,
+ description.get,
+ event.numAuronNodes,
+ event.numFallbackNodes,
+ event.physicalPlanDescription,
+ event.fallbackNodeToReason.toSeq.sortBy(_._1))
+ kvstore.write(uiData)
+ } else {
+ executionIdToFallbackEvent.put(event.executionId, event.copy())
+ }
+ }
+
+ private def onSQLExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+ val fallbackEvent = executionIdToFallbackEvent.get(event.executionId)
+ if (fallbackEvent.isDefined) {
+ val uiData = new AuronSQLExecutionUIData(
+ fallbackEvent.get.executionId,
+ event.description,
+ fallbackEvent.get.numAuronNodes,
+ fallbackEvent.get.numFallbackNodes,
+ fallbackEvent.get.physicalPlanDescription,
+ fallbackEvent.get.fallbackNodeToReason.toSeq.sortBy(_._1))
+ kvstore.write(uiData)
+ executionIdToFallbackEvent.remove(event.executionId)
+ }
+ executionIdToDescription.put(event.executionId, event.description)
+ }
+
+  private def onSQLExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+ executionIdToDescription.remove(event.executionId)
+ executionIdToFallbackEvent.remove(event.executionId)
+ }
+
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case e: SparkListenerSQLExecutionStart => onSQLExecutionStart(e)
+    case e: SparkListenerSQLExecutionEnd => onSQLExecutionEnd(e)
case e: AuronBuildInfoEvent => onAuronBuildInfo(e)
+ case e: AuronPlanFallbackEvent => onAuronPlanFallback(e)
case _ => // Ignore
}
+ private def cleanupExecutions(count: Long): Unit = {
+ val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view = kvstore.view(classOf[AuronSQLExecutionUIData]).first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ toDelete.foreach(e => kvstore.delete(e.getClass(), e.executionId))
+ }
}
object AuronSQLAppStatusListener {
def register(sc: SparkContext): Unit = {
diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala
index 3fc4beb69..fd4721505 100644
--- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala
+++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala
@@ -16,17 +16,54 @@
*/
package org.apache.spark.sql.execution.ui
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
import com.fasterxml.jackson.annotation.JsonIgnore
-import org.apache.spark.util.kvstore.{KVIndex, KVStore}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
class AuronSQLAppStatusStore(store: KVStore) {
+ private def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
+ Utils.tryWithResource(view.closeableIterator())(iter => iter.asScala.toList)
+ }
+
+ def executionsList(): Seq[AuronSQLExecutionUIData] = {
+ viewToSeq(store.view(classOf[AuronSQLExecutionUIData]))
+ }
+
def buildInfo(): AuronBuildInfoUIData = {
val kClass = classOf[AuronBuildInfoUIData]
store.read(kClass, kClass.getName)
}
+
+ def executionsList(offset: Int, length: Int): Seq[AuronSQLExecutionUIData] = {
+ viewToSeq(store.view(classOf[AuronSQLExecutionUIData]).skip(offset).max(length))
+ }
+
+ def execution(executionId: Long): Option[AuronSQLExecutionUIData] = {
+ try {
+ Some(store.read(classOf[AuronSQLExecutionUIData], executionId))
+ } catch {
+ case _: NoSuchElementException => None
+ }
+ }
+
+ def executionsCount(): Long = {
+ store.count(classOf[AuronSQLExecutionUIData])
+ }
}
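+
+/** Per-execution Auron UI data persisted in the KVStore, keyed by executionId. */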
+@KVIndex("executionId")
+class AuronSQLExecutionUIData(
+ @KVIndexParam val executionId: Long,
+ val description: String,
+ val numAuronNodes: Int,
+ val numFallbackNodes: Int,
+ val fallbackDescription: String,
+ val fallbackNodeToReason: Seq[(String, String)]) {}
+
class AuronBuildInfoUIData(val info: Seq[(String, String)]) {
@JsonIgnore
@KVIndex
diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala
index 833ea7193..95d68158a 100644
--- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/BuildinfoInSparkUISuite.scala
@@ -16,8 +16,16 @@
*/
package org.apache.spark.sql.auron
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.ExplainUtils
+import org.apache.spark.sql.execution.auron.plan.NativeParquetScanExec
import org.apache.spark.sql.execution.ui.AuronSQLAppStatusListener
+import org.apache.auron.spark.ui.AuronPlanFallbackEvent
+
class BuildinfoInSparkUISuite
extends org.apache.spark.sql.QueryTest
with BuildInfoAuronSQLSuite
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala
new file mode 100644
index 000000000..c01222c6b
--- /dev/null
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron
+
+import java.util.Collections.newSetFromMap
+
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, BitSet}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.auron.AuronConvertStrategy.neverConvertReasonTag
+import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.{BaseSubqueryExec, InputAdapter, ReusedSubqueryExec, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.ExplainUtils.getOpId
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+
+import org.apache.auron.sparkver
+
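+/**
+ * Plan walker derived from Spark's ExplainUtils: assigns operator ids, prints
+ * the plan (including subqueries and adaptively optimized-out exchanges), and
+ * additionally counts Auron native nodes and records why the remaining nodes
+ * fell back to vanilla Spark.
+ */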
+object AuronExplainUtils {
+ private def generateOperatorIDs(
+ plan: QueryPlan[_],
+ startOperatorID: Int,
+ visited: java.util.Set[QueryPlan[_]],
+ reusedExchanges: ArrayBuffer[ReusedExchangeExec],
+ addReusedExchanges: Boolean): Int = {
+ var currentOperationID = startOperatorID
+ if (plan.isInstanceOf[BaseSubqueryExec]) {
+ return currentOperationID
+ }
+
+ def setOpId(plan: QueryPlan[_]): Unit = if (!visited.contains(plan)) {
+ plan match {
+ case r: ReusedExchangeExec if addReusedExchanges =>
+ reusedExchanges.append(r)
+ case _ =>
+ }
+ visited.add(plan)
+ currentOperationID += 1
+ plan.setTagValue(TreeNodeTag[Int]("operatorId"), currentOperationID)
+ }
+
+ plan.foreachUp {
+ case _: WholeStageCodegenExec =>
+ case _: InputAdapter =>
+ case p: AdaptiveSparkPlanExec =>
+ currentOperationID = generateOperatorIDs(
+ p.executedPlan,
+ currentOperationID,
+ visited,
+ reusedExchanges,
+ addReusedExchanges)
+ setOpId(p)
+ case p: QueryStageExec =>
+ currentOperationID = generateOperatorIDs(
+ p.plan,
+ currentOperationID,
+ visited,
+ reusedExchanges,
+ addReusedExchanges)
+ setOpId(p)
+ case other: QueryPlan[_] =>
+ setOpId(other)
+ currentOperationID = other.innerChildren.foldLeft(currentOperationID) { (curId, plan) =>
+ generateOperatorIDs(plan, curId, visited, reusedExchanges, addReusedExchanges)
+ }
+ }
+ currentOperationID
+ }
+
+ private def getSubqueries(
+ plan: => QueryPlan[_],
+ subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
+ plan.foreach {
+ case a: AdaptiveSparkPlanExec =>
+ getSubqueries(a.executedPlan, subqueries)
+ case q: QueryStageExec =>
+ getSubqueries(q.plan, subqueries)
+ case p: SparkPlan =>
+ p.expressions.foreach(_.collect { case e: PlanExpression[_] =>
+ e.plan match {
+ case s: BaseSubqueryExec =>
+ subqueries += ((p, e, s))
+ getSubqueries(s, subqueries)
+ case _ =>
+ }
+ })
+ }
+ }
+
+ private def processPlanSkippingSubqueries[T <: QueryPlan[T]](
+ plan: T,
+ append: String => Unit,
+ collectedOperators: BitSet): Unit = {
+ try {
+
+ QueryPlan.append(plan, append, verbose = false, addSuffix = false, printOperatorId = true)
+
+ append("\n")
+ } catch {
+ case e: AnalysisException => append(e.toString)
+ }
+ }
+
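+  // Counts NativeSupports operators as Auron nodes and records a fallback
+  // reason for every other SparkPlan node that carries the never-convert tag.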
+ private def collectFallbackNodes(plan: QueryPlan[_]): (Int, Map[String, String]) = {
+ var numAuronNodes = 0
+ val fallbackNodeToReason = new mutable.HashMap[String, String]
+
+ def collect(tmp: QueryPlan[_]): Unit = {
+ tmp.foreachUp {
+ case p: ExecutedCommandExec =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ case p: AdaptiveSparkPlanExec =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ collect(p.executedPlan)
+ case p: QueryStageExec =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ collect(p.plan)
+ case p: NativeSupports =>
+ numAuronNodes += 1
+ p.innerChildren.foreach(collect)
+ case p: SparkPlan =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ p.innerChildren.foreach(collect)
+ case _ =>
+ }
+ }
+
+ collect(plan)
+ (numAuronNodes, fallbackNodeToReason.toMap)
+ }
+
+ def handleVanillaSparkPlan(
+ p: SparkPlan,
+ fallbackNodeToReason: mutable.HashMap[String, String]): Unit = {
+ if (p.getTagValue(neverConvertReasonTag).isDefined) {
+ addFallbackNodeWithReason(p, p.getTagValue(neverConvertReasonTag).get, fallbackNodeToReason)
+ }
+ }
+
+ def addFallbackNodeWithReason(
+ p: SparkPlan,
+ reason: String,
+ fallbackNodeToReason: mutable.HashMap[String, String]): Unit = {
+ p.getTagValue(TreeNodeTag[Int]("operatorId")).foreach { opId =>
+      // e.g. "002 Project": the zero-padded id prefix lets the UI split the id
+      // and node name apart with substring(0, 3) / substring(4)
+ val formattedNodeName = f"$opId%03d ${p.nodeName}"
+ fallbackNodeToReason.put(formattedNodeName, reason)
+ }
+ }
+
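+  /**
+   * Renders the plan through `append` and returns the number of Auron native
+   * nodes plus the fallback-reason map, aggregated over the main plan and all
+   * non-reused subqueries.
+   */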
+ def processPlan[T <: QueryPlan[T]](
+ plan: T,
+ append: String => Unit,
+ collectFallbackFunc: Option[QueryPlan[_] => (Int, Map[String, String])] = None)
+ : (Int, Map[String, String]) = synchronized {
+ try {
+ val operators = newSetFromMap[QueryPlan[_]](new java.util.IdentityHashMap())
+ val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]
+
+ var currentOperatorID = 0
+ currentOperatorID =
+ generateOperatorIDs(plan, currentOperatorID, operators, reusedExchanges, true)
+
+ val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
+ getSubqueries(plan, subqueries)
+
+ currentOperatorID = subqueries.foldLeft(currentOperatorID) { (curId, plan) =>
+ generateOperatorIDs(plan._3.child, curId, operators, reusedExchanges, true)
+ }
+
+ val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
+ reusedExchanges.foreach { reused =>
+ val child = reused.child
+ if (!operators.contains(child)) {
+ optimizedOutExchanges.append(child)
+ currentOperatorID =
+ generateOperatorIDs(child, currentOperatorID, operators, reusedExchanges, false)
+ }
+ }
+
+ val collectedOperators = mutable.BitSet.empty
+ processPlanSkippingSubqueries(plan, append, collectedOperators)
+
+ var i = 0
+ for (sub <- subqueries) {
+ if (i == 0) {
+ append("\n===== Subqueries =====\n\n")
+ }
+ i = i + 1
+ append(
+ s"Subquery:$i Hosting operator id = " +
+ s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")
+
+ if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
+ processPlanSkippingSubqueries(sub._3.child, append, collectedOperators)
+ }
+ append("\n")
+ }
+
+ i = 0
+ optimizedOutExchanges.foreach { exchange =>
+ if (i == 0) {
+ append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
+ }
+ i = i + 1
+ append(s"Subplan:$i\n")
+ processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators)
+ append("\n")
+ }
+
+ (subqueries.filter(!_._3.isInstanceOf[ReusedSubqueryExec]).map(_._3.child) :+ plan)
+ .map { plan =>
+ if (collectFallbackFunc.isEmpty) {
+ collectFallbackNodes(plan)
+ } else {
+ collectFallbackFunc.get.apply(plan)
+ }
+ }
+ .reduce((a, b) => (a._1 + b._1, a._2 ++ b._2))
+ } finally {
+ removeTags(plan)
+ }
+ }
+
+ @sparkver("3.1/ 3.2 / 3.3/ 3.4/ 3.5")
+ private def removeTags(plan: QueryPlan[_]): Unit = {
+ def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+ p.unsetTagValue(TreeNodeTag[Int]("operatorId"))
+ children.foreach(removeTags)
+ }
+
+ plan.foreach {
+ case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan))
+ case p: QueryStageExec => remove(p, Seq(p.plan))
+ case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
+ }
+ }
+
+ @sparkver("3.0")
+ private def removeTags(plan: QueryPlan[_]): Unit = {
+ def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+ p.unsetTagValue(TreeNodeTag[Int]("operatorId"))
+ children.foreach(removeTags)
+ }
+
+ plan.foreach {
+ case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan, p.initialPlan))
+ case p: QueryStageExec => remove(p, Seq(p.plan))
+ case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
+ }
+ }
+}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
index 2e46a88f0..5253f8308 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
@@ -22,11 +22,13 @@ import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.ColumnarRule
-import org.apache.spark.sql.execution.LocalTableScanExec
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
+import org.apache.spark.sql.execution.{ColumnarRule, LocalTableScanExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.ui.AuronEventUtils
import org.apache.spark.sql.internal.SQLConf
+import org.apache.auron.spark.ui.AuronPlanFallbackEvent
+
class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with Logging {
Shims.get.initExtension()
@@ -94,4 +96,33 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu
}
}
}
+
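+  // Once columnar transitions have produced the final plan, summarize Auron
+  // vs. fallback nodes and post an AuronPlanFallbackEvent to the listener bus
+  // (only when the Auron UI is enabled and an execution id is present).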
+ override def postColumnarTransitions: Rule[SparkPlan] = {
+ new Rule[SparkPlan] {
+ override def apply(sparkPlan: SparkPlan): SparkPlan = {
+ if (SparkEnv.get.conf.get(AuronConf.UI_ENABLED.key, "true").equals("true")) {
+ val sc = sparkSession.sparkContext
+ val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ if (executionId == null) {
+ logDebug(s"Unknown execution id for plan: $sparkPlan")
+ return sparkPlan
+ }
+ val concat = new PlanStringConcat()
+ concat.append("== Physical Plan ==\n")
+
+ val (numAuronNodes, fallbackNodeToReason) =
+ AuronExplainUtils.processPlan(sparkPlan, concat.append)
+
+ val event = AuronPlanFallbackEvent(
+ executionId.toLong,
+ numAuronNodes,
+ fallbackNodeToReason.size,
+ concat.toString(),
+ fallbackNodeToReason)
+ AuronEventUtils.post(sc, event)
+ }
+ sparkPlan
+ }
+ }
+ }
}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala
index b2ffa53a0..d43f7d17d 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala
@@ -69,7 +69,8 @@ abstract class NativeParquetInsertIntoHiveTableBase(
.filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
.toSeq
:+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time"))
- :+ ("bytes_written", SQLMetrics
+ :+ ("bytes_written",
+ SQLMetrics
.createSizeMetric(sparkContext, "Native.bytes_written")): _*)
def check(): Unit = {