Skip to content

Commit

Permalink
set status.OK on applications that didn't fail
Browse files Browse the repository at this point in the history
Issue: #12

This requires some more testing.
  • Loading branch information
barend-xebia committed Dec 10, 2024
1 parent 60da5de commit e9e631c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.xebia.data.spot

import com.xebia.data.spot.TelemetrySparkListener.ApplicationSpan
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.{Span, StatusCode}
import io.opentelemetry.context.{Context, Scope}
Expand Down Expand Up @@ -54,7 +55,7 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit

override def spotConfig: Map[String, String] = sparkConf.getAll.toMap

private var applicationSpan: Option[PendingSpan] = None
private var applicationSpan: Option[ApplicationSpan] = None
private val jobSpans = mutable.Map[Int, PendingSpan]()
private val stageIdToJobId = mutable.Map[Int, Int]()

Expand All @@ -77,12 +78,16 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit
val span = sb.startSpan()
val scope = span.makeCurrent()
val context = span.storeInContext(rootContext._1)
applicationSpan = Some((span, context, scope))
applicationSpan = Some(ApplicationSpan(span, context, scope))
}

override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
applicationSpan
.map { case (span, _, scope) =>
.map { case ApplicationSpan(span, _, scope, status) =>
status match {
case StatusCode.UNSET => span.setStatus(StatusCode.OK)
case _ => span.setStatus(status)
}
span.end()
scope.close()
}
Expand All @@ -94,7 +99,7 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit
}

override def onJobStart(event: SparkListenerJobStart): Unit = {
applicationSpan.foreach { case (_, parentContext, _) =>
applicationSpan.foreach { case ApplicationSpan(_, parentContext, _, _) =>
val span = tracer
.spanBuilder("job-%05d".format(event.jobId))
.setParent(parentContext)
Expand All @@ -113,6 +118,8 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit
case JobSucceeded => span.setStatus(StatusCode.OK)
case jobFailed: Any =>
// The JobFailed(e) case class is private[spark], therefore we can't use span.recordException(e).
// TODO test with a Spark Job that succeeds after a task retry, what does that look like?
applicationSpan = applicationSpan.map(_.failed())
span.setStatus(StatusCode.ERROR, jobFailed.toString)
}
span.setAttribute(atts.jobTime, Long.box(event.time))
Expand Down Expand Up @@ -143,4 +150,7 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit
object TelemetrySparkListener {
type PendingContext = (Context, Scope)
type PendingSpan = (Span, Context, Scope)
case class ApplicationSpan(span: Span, context: Context, scope: Scope, status: StatusCode = StatusCode.UNSET) {
def failed(): ApplicationSpan = this.copy(status = StatusCode.ERROR)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.xebia.data.spot

import com.xebia.data.spot.TestingSdkProvider.testingSdk
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.sdk.OpenTelemetrySdk
Expand All @@ -33,7 +34,8 @@ import org.apache.spark.scheduler.{
SparkListenerApplicationEnd,
SparkListenerApplicationStart,
SparkListenerJobEnd,
SparkListenerJobStart
SparkListenerJobStart,
SparkPrivatesAccessor
}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.flatspec.AnyFlatSpecLike
Expand Down Expand Up @@ -71,13 +73,33 @@ class TelemetrySparkListenerTest extends AnyFlatSpecLike with BeforeAndAfterEach
.hasAttribute(TelemetrySpanAttributes.appName, "testapp")
.hasAttribute(TelemetrySpanAttributes.appAttemptId, "1")
.hasAttribute(TelemetrySpanAttributes.sparkUser, "User")
.hasStatus(StatusData.ok())

assertThat(jobSpan).isSampled.hasEnded
.hasParent(appSpan)
.hasName("job-00001")
.hasStatus(StatusData.ok())
}

it should "set a negative status on the ApplicationSpan if a Job fails" in new TestTelemetrySparkListener() {
tsl.onApplicationStart(SparkListenerApplicationStart("testapp", Some("ta123"), 100L, "User", Some("1"), None, None))
advanceTimeBy(Duration.ofMillis(200))
tsl.onJobStart(SparkListenerJobStart(1, 200L, Seq.empty, null))
advanceTimeBy(Duration.ofMillis(5000))
tsl.onJobEnd(SparkListenerJobEnd(1, 5000, SparkPrivatesAccessor.jobFailed(new IllegalStateException("test"))))
advanceTimeBy(Duration.ofMillis(100))
tsl.onApplicationEnd(SparkListenerApplicationEnd(5200L))

val spans = getFinishedSpanItems
spans should have length (2)
val appSpan = spans.get(1)
val jobSpan = spans.get(0)
assertThat(appSpan).hasEnded.hasStatus(StatusData.error())
assertThat(jobSpan).hasEnded.hasStatus(
StatusData.create(StatusCode.ERROR, "JobFailed(java.lang.IllegalStateException: test)")
)
}

it should "get traceId from config if provided" in new TestTelemetrySparkListener(
"spark.com.xebia.data.spot.traceparent" -> "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2024 Xebia Netherlands
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler

object SparkPrivatesAccessor {
def jobFailed(e: Exception): JobResult = JobFailed(e)
}

0 comments on commit e9e631c

Please sign in to comment.