Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Re-introduce trace spans for timed actions and consumers #113

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ private final class Sdk(
val timedActionClass = clz.asInstanceOf[Class[TimedAction]]
val timedActionSpi =
new TimedActionImpl[TimedAction](
componentId,
() => wiredInstance(timedActionClass)(sideEffectingComponentInjects(None)),
timedActionClass,
system.classicSystem,
Expand All @@ -510,6 +511,7 @@ private final class Sdk(
val consumerClass = clz.asInstanceOf[Class[Consumer]]
val timedActionSpi =
new ConsumerImpl[Consumer](
componentId,
() => wiredInstance(consumerClass)(sideEffectingComponentInjects(None)),
consumerClass,
system.classicSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
package akka.javasdk.impl.consumer

import java.util.Optional

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal

import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.javasdk.Metadata
Expand All @@ -19,14 +17,17 @@ import akka.javasdk.consumer.MessageContext
import akka.javasdk.consumer.MessageEnvelope
import akka.javasdk.impl.AbstractContext
import akka.javasdk.impl.ComponentDescriptor
import akka.javasdk.impl.ComponentType
import akka.javasdk.impl.ErrorHandling
import akka.javasdk.impl.MetadataImpl
import akka.javasdk.impl.consumer.ConsumerEffectImpl.AsyncEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.IgnoreEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.ProduceEffect
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.telemetry.ConsumerCategory
import akka.javasdk.impl.telemetry.SpanTracingImpl
import akka.javasdk.impl.telemetry.Telemetry
import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.timer.TimerScheduler
import akka.runtime.sdk.spi.BytesPayload
Expand All @@ -44,6 +45,7 @@ import org.slf4j.MDC
/** EndMarker */
@InternalApi
private[impl] final class ConsumerImpl[C <: Consumer](
componentId: String,
val factory: () => C,
consumerClass: Class[C],
_system: ActorSystem,
Expand All @@ -59,6 +61,7 @@ private[impl] final class ConsumerImpl[C <: Consumer](

private implicit val executionContext: ExecutionContext = sdkExecutionContext
implicit val system: ActorSystem = _system
private val traceInstrumentation = new TraceInstrumentation(componentId, ConsumerCategory, tracerFactory)

private def createRouter(): ReflectiveConsumerRouter[C] =
new ReflectiveConsumerRouter[C](
Expand All @@ -68,15 +71,20 @@ private[impl] final class ConsumerImpl[C <: Consumer](
ignoreUnknown)

override def handleMessage(message: Message): Future[Effect] = {
val span: Option[Span] = None //FIXME add intrumentation
val metadata = MetadataImpl.of(message.metadata)

// FIXME would be good if we could record the chosen method in the span
val span: Option[Span] =
traceInstrumentation.buildSpan(ComponentType.Consumer, componentId, metadata.subjectScala, message.metadata)
val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata)

span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
val fut =
try {
val messageContext = createMessageContext(message, span)
val messageContext = new MessageContextImpl(updatedMetadata, timerClient, tracerFactory, span)
val payload: BytesPayload = message.payload.getOrElse(throw new IllegalArgumentException("No message payload"))
val effect = createRouter()
.handleCommand(MessageEnvelope.of(payload, messageContext.metadata()), messageContext)
.handleCommand(MessageEnvelope.of(payload, messageContext.metadata), messageContext)
toSpiEffect(message, effect)
} catch {
case NonFatal(ex) =>
Expand All @@ -90,12 +98,6 @@ private[impl] final class ConsumerImpl[C <: Consumer](
}
}

private def createMessageContext(message: Message, span: Option[Span]): MessageContext = {
val metadata = MetadataImpl.of(message.metadata)
val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata)
new MessageContextImpl(updatedMetadata, timerClient, tracerFactory, span)
}

private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = {
effect match {
case ProduceEffect(msg, metadata) =>
Expand All @@ -119,7 +121,8 @@ private[impl] final class ConsumerImpl[C <: Consumer](
private def handleUnexpectedException(message: Message, ex: Throwable): Effect = {
ErrorHandling.withCorrelationId { correlationId =>
log.error(
s"Failure during handling message [${message.name}] from Consumer component [${consumerClass.getSimpleName}].",
s"Failure during handling message of type [${message.payload.fold("none")(
_.contentType)}] from Consumer component [${consumerClass.getSimpleName}].",
ex)
protocolFailure(correlationId)
}
Expand All @@ -145,7 +148,7 @@ private[impl] final class MessageContextImpl(
override val metadata: Metadata,
timerClient: TimerClient,
tracerFactory: () => Tracer,
span: Option[Span])
val span: Option[Span])
extends AbstractContext
with MessageContext {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
command: SpiEntity.Command): Future[SpiEventSourcedEntity.Effect] = {

val span: Option[Span] =
traceInstrumentation.buildSpan(ComponentType.EventSourcedEntity, componentId, entityId, command)
traceInstrumentation.buildEntityCommandSpan(ComponentType.EventSourcedEntity, componentId, entityId, command)
span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
// smuggling 0 arity method called from component client through here
val cmdPayload = command.payload.getOrElse(BytesPayload.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]](
command: SpiEntity.Command): Future[SpiEventSourcedEntity.Effect] = {

val span: Option[Span] =
traceInstrumentation.buildSpan(ComponentType.KeyValueEntity, componentId, entityId, command)
traceInstrumentation.buildEntityCommandSpan(ComponentType.KeyValueEntity, componentId, entityId, command)
span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
// smuggling 0 arity method called from component client through here
val cmdPayload = command.payload.getOrElse(BytesPayload.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ case object ConsumerCategory extends ComponentCategory {
* INTERNAL API
*/
@InternalApi
case object ActionCategory extends ComponentCategory {
def name = "Action"
case object TimedActionCategory extends ComponentCategory {
def name = "TimedAction"
}

/**
Expand Down Expand Up @@ -137,20 +137,31 @@ private[akka] final class TraceInstrumentation(
private val enabled = tracer != OpenTelemetry.noop().getTracer(InstrumentationScopeName)

/**
* Creates a span if it finds a trace parent in the command's metadata
* Creates a span if tracing enabled and it finds a trace parent in the command's metadata
*/
def buildSpan(
def buildEntityCommandSpan(
componentType: String,
componentId: String,
entityId: String,
command: SpiEntity.Command): Option[Span] =
if (enabled) internalBuildSpan(componentType, componentId, command.name, command.metadata, Some(entityId))
if (enabled) internalBuildSpan(componentType, componentId, Some(command.name), command.metadata, Some(entityId))
else None

/**
* Creates a span if tracing enabled and if it finds a trace parent in the command's metadata
*/
def buildSpan(
componentType: String,
componentId: String,
subjectId: Option[String],
spiMetadata: SpiMetadata): Option[Span] =
if (enabled) internalBuildSpan(componentType, componentId, None, spiMetadata, subjectId)
else None

private def internalBuildSpan(
componentType: String,
componentId: String,
commandName: String,
commandName: Option[String],
commandMetadata: SpiMetadata,
subjectId: Option[String]): Option[Span] = {
// only if there is a trace parent in the metadata
Expand All @@ -159,7 +170,7 @@ private[akka] final class TraceInstrumentation(
val parentContext = propagator.getTextMapPropagator
.extract(OtelContext.current(), traceParentMetadataEntry, metadataEntryTraceParentGetter)

val spanName = s"$traceNamePrefix.${removeSyntheticName(commandName)}"
val spanName = s"$traceNamePrefix${commandName.fold("")("." + _)}"
var spanBuilder =
tracer
.spanBuilder(spanName)
Expand All @@ -172,9 +183,4 @@ private[akka] final class TraceInstrumentation(
}
}

private def removeSyntheticName(maybeSyntheticName: String): String =
maybeSyntheticName
.replace("KalixSyntheticMethodOnES", "")
.replace("KalixSyntheticMethodOnTopic", "")
.replace("KalixSyntheticMethodOnStream", "")
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ package akka.javasdk.impl.timedaction
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal

import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.javasdk.Metadata
import akka.javasdk.Tracing
import akka.javasdk.impl.AbstractContext
import akka.javasdk.impl.ComponentDescriptor
import akka.javasdk.impl.ComponentType
import akka.javasdk.impl.ErrorHandling
import akka.javasdk.impl.MetadataImpl
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.telemetry.SpanTracingImpl
import akka.javasdk.impl.telemetry.Telemetry
import akka.javasdk.impl.telemetry.TimedActionCategory
import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.javasdk.impl.timedaction.TimedActionEffectImpl.AsyncEffect
import akka.javasdk.impl.timedaction.TimedActionEffectImpl.ErrorEffect
import akka.javasdk.impl.timedaction.TimedActionEffectImpl.ReplyEffect
Expand Down Expand Up @@ -72,6 +74,7 @@ object TimedActionImpl {
/** EndMarker */
@InternalApi
private[impl] final class TimedActionImpl[TA <: TimedAction](
componentId: String,
val factory: () => TA,
timedActionClass: Class[TA],
_system: ActorSystem,
Expand All @@ -87,20 +90,27 @@ private[impl] final class TimedActionImpl[TA <: TimedAction](

private implicit val executionContext: ExecutionContext = sdkExecutionContext
implicit val system: ActorSystem = _system
private val traceInstrumentation = new TraceInstrumentation(componentId, TimedActionCategory, tracerFactory)

private def createRouter(): ReflectiveTimedActionRouter[TA] =
new ReflectiveTimedActionRouter[TA](factory(), componentDescriptor.commandHandlers, jsonSerializer)

override def handleCommand(command: Command): Future[Effect] = {
val span: Option[Span] = None //FIXME add intrumentation
val metadata = MetadataImpl.of(command.metadata)

// FIXME would be good if we could record the chosen method in the span
val span: Option[Span] =
traceInstrumentation.buildSpan(ComponentType.TimedAction, componentId, metadata.subjectScala, command.metadata)

span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
val fut =
try {
val commandContext = createCommandContext(command, span)
val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata)
val commandContext = new CommandContextImpl(updatedMetadata, timerClient, tracerFactory, span)

val payload: BytesPayload = command.payload.getOrElse(throw new IllegalArgumentException("No command payload"))
val effect = createRouter()
.handleCommand(command.name, CommandEnvelope.of(payload, commandContext.metadata()), commandContext)
.handleCommand(command.name, CommandEnvelope.of(payload, commandContext.metadata), commandContext)
toSpiEffect(command, effect)
} catch {
case NonFatal(ex) =>
Expand All @@ -114,12 +124,6 @@ private[impl] final class TimedActionImpl[TA <: TimedAction](
}
}

private def createCommandContext(command: Command, span: Option[Span]): CommandContext = {
val metadata = MetadataImpl.of(command.metadata)
val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata)
new CommandContextImpl(updatedMetadata, timerClient, tracerFactory, span)
}

private def toSpiEffect(command: Command, effect: TimedAction.Effect): Future[Effect] = {
effect match {
case ReplyEffect(_) => //FIXME remove meta, not used in the reply
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ private[impl] object ViewDescriptorFactory {
}

override def handle(input: SpiTableUpdateEnvelope): Future[SpiTableUpdateEffect] = Future {
// FIXME tracing span?
val existingState: Option[AnyRef] = input.existingTableRow.map(serializer.fromBytes)
val metadata = MetadataImpl.of(input.metadata)
val addedToMDC = metadata.traceId match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class TimedActionImplSpec

def create(componentDescriptor: ComponentDescriptor): TimedActionImpl[TestTimedAction] = {
new TimedActionImpl(
"dummy-id",
() => new TestTimedAction,
classOf[TestTimedAction],
classicSystem,
Expand Down
Loading