Skip to content

Commit

Permalink
feat: adding tracing id to UF logs (#2160)
Browse files Browse the repository at this point in the history
* feat:  adding tracing id to UF logs
  • Loading branch information
franciscolopezsancho authored Jul 1, 2024
1 parent 0811ef9 commit 3769379
Show file tree
Hide file tree
Showing 15 changed files with 225 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kalix.javasdk.impl.telemetry.{ ActionCategory, Instrumentation, Telemetry
import kalix.protocol.action.{ ActionCommand, ActionResponse, Actions }
import kalix.protocol.component
import kalix.protocol.component.{ Failure, MetadataEntry }
import org.slf4j.{ Logger, LoggerFactory }
import org.slf4j.{ Logger, LoggerFactory, MDC }

import java.util.Optional
import scala.compat.java8.OptionConverters.RichOptionForJava8
Expand Down Expand Up @@ -176,7 +176,7 @@ private[javasdk] final class ActionsImpl(_system: ActorSystem, services: Map[Str
services.get(in.serviceName) match {
case Some(service) =>
val span = telemetries(service.serviceName).buildSpan(service, in)

span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
val fut =
try {
val context = createContext(in, service.messageCodec, span.map(_.getSpanContext), service.serviceName)
Expand All @@ -190,9 +190,13 @@ private[javasdk] final class ActionsImpl(_system: ActorSystem, services: Map[Str
case NonFatal(ex) =>
// command handler threw an "unexpected" error
Future.successful(handleUnexpectedException(service, in, ex))
} finally {
MDC.remove(Telemetry.TRACE_ID)
}
fut.andThen { case _ =>
span.foreach(_.end())
span.foreach { s =>
s.end()
}
}
case None =>
Future.successful(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import kalix.protocol.event_sourced_entity.EventSourcedStreamOut.Message.{ Failu
import kalix.protocol.event_sourced_entity.EventSourcedStreamOut.Message.{ Reply => OutReply }
import kalix.protocol.event_sourced_entity.EventSourcedStreamOut.Message.{ SnapshotReply => OutSnapshotReply }
import kalix.protocol.event_sourced_entity._
import org.slf4j.LoggerFactory
import org.slf4j.{ LoggerFactory, MDC }

import scala.util.control.NonFatal

Expand Down Expand Up @@ -170,6 +170,7 @@ final class EventSourcedEntitiesImpl(
if (thisEntityId != command.entityId)
throw ProtocolException(command, "Receiving entity is not the intended recipient of command")
val span = instrumentations(service.serviceName).buildSpan(service, command)
span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
try {
val cmd =
service.messageCodec.decodeMessage(
Expand Down Expand Up @@ -234,7 +235,12 @@ final class EventSourcedEntitiesImpl(
serializedSnapshot,
delete))))
}
} finally { span.foreach(_.end()) }
} finally {
span.foreach { s =>
MDC.remove(Telemetry.TRACE_ID)
s.end()
}
}
case ((sequence, _), InSnapshotRequest(request)) =>
val reply =
EventSourcedSnapshotReply(request.requestId, Some(service.messageCodec.encodeScala(router._stateOrEmpty())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ object Telemetry extends ExtensionId[Telemetry] {

val TRACE_PARENT_KEY: String = TraceInstrumentation.TRACE_PARENT_KEY
val TRACE_STATE_KEY: String = TraceInstrumentation.TRACE_STATE_KEY
val TRACE_ID: String = "trace_id"
override def createExtension(system: ExtendedActorSystem): Telemetry =
new Telemetry(system)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,31 @@ package kalix.javasdk.impl.valueentity

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Flow, Source }
import io.grpc.Status
import kalix.javasdk.KalixRunner.Configuration
import kalix.javasdk.impl.ErrorHandling.BadRequestException
import kalix.javasdk.impl.telemetry.Instrumentation
import kalix.javasdk.impl.telemetry.Telemetry
import kalix.javasdk.impl.telemetry.ValueEntityCategory
import kalix.javasdk.impl.telemetry.{ Instrumentation, Telemetry, ValueEntityCategory }
import kalix.protocol.component.Failure
import org.slf4j.LoggerFactory
import org.slf4j.{ LoggerFactory, MDC }

import scala.util.control.NonFatal

// FIXME these don't seem to be 'public API', more internals?
import com.google.protobuf.Descriptors
import kalix.javasdk.Metadata
import kalix.javasdk.impl.ValueEntityFactory
import kalix.javasdk.impl._
import kalix.javasdk.impl.effect.EffectSupport
import kalix.javasdk.impl.effect.ErrorReplyImpl
import kalix.javasdk.impl.effect.MessageReplyImpl
import kalix.javasdk.impl.valueentity.ValueEntityEffectImpl.DeleteEntity
import kalix.javasdk.impl.valueentity.ValueEntityEffectImpl.UpdateState
import kalix.javasdk.impl.effect.{ EffectSupport, ErrorReplyImpl, MessageReplyImpl }
import kalix.javasdk.impl.valueentity.ValueEntityEffectImpl.{ DeleteEntity, UpdateState }
import kalix.javasdk.impl.valueentity.ValueEntityRouter.CommandResult
import kalix.javasdk.valueentity._
import kalix.protocol.value_entity.ValueEntityAction.Action.Delete
import kalix.protocol.value_entity.ValueEntityAction.Action.Update
import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{ Command => InCommand }
import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{ Empty => InEmpty }
import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{ Init => InInit }
import kalix.protocol.value_entity.ValueEntityStreamOut.Message.{ Failure => OutFailure }
import kalix.protocol.value_entity.ValueEntityStreamOut.Message.{ Reply => OutReply }
import kalix.protocol.value_entity.ValueEntityAction.Action.{ Delete, Update }
import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{
Command => InCommand,
Empty => InEmpty,
Init => InInit
}
import kalix.protocol.value_entity.ValueEntityStreamOut.Message.{ Failure => OutFailure, Reply => OutReply }
import kalix.protocol.value_entity._

final class ValueEntityService(
Expand Down Expand Up @@ -149,7 +142,7 @@ final class ValueEntitiesImpl(

if (log.isTraceEnabled) log.trace("Metadata entries [{}].", metadata.entries)
val span = instrumentations(service.serviceName).buildSpan(service, command)

span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
try {
val cmd =
service.messageCodec.decodeMessage(
Expand Down Expand Up @@ -203,7 +196,10 @@ final class ValueEntitiesImpl(
action)))
}
} finally {
span.foreach(_.end())
span.foreach { s =>
MDC.remove(Telemetry.TRACE_ID)
s.end()
}
}

case InInit(_) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,49 @@ import akka.testkit.SocketUtil
import kalix.javasdk.{ Kalix, KalixRunner }
import com.typesafe.config.{ Config, ConfigFactory }
import kalix.javasdk.eventsourcedentity.EventSourcedEntityProvider
import kalix.javasdk.impl.telemetry.TraceInstrumentation
import kalix.javasdk.impl.ProxyInfoHolder

object TestEventSourced {
def service(entityProvider: EventSourcedEntityProvider[_, _, _]): TestEventSourcedService =
new TestEventSourcedService(entityProvider)
def service(
entityProvider: EventSourcedEntityProvider[_, _, _],
extraConfig: Option[Config] = None): TestEventSourcedService =
new TestEventSourcedService(entityProvider, extraConfig)
}

class TestEventSourcedService(entityProvider: EventSourcedEntityProvider[_, _, _]) {
class TestEventSourcedService(entityProvider: EventSourcedEntityProvider[_, _, _], extraConfig: Option[Config] = None) {
val port: Int = SocketUtil.temporaryLocalPort()

val config: Config = ConfigFactory.load(ConfigFactory.parseString(s"""
val config: Config = ConfigFactory.load(
ConfigFactory
.parseString(s"""
kalix {
user-function-port = $port
system.akka {
loglevel = DEBUG
coordinated-shutdown.exit-jvm = off
}
}
"""))
""").withFallback(extraConfig.getOrElse(ConfigFactory.empty)))

val runner: KalixRunner = new Kalix()
.register(entityProvider)
.createRunner(config)

runner.run()
//setting tracing as disabled, emulating that is discovered from the proxy.
ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint("")
//setting tracing as it was sent by the proxy and discovered by the user function
if (config.hasPath(TraceInstrumentation.TRACING_ENDPOINT)) {
ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint(
config.getString(TraceInstrumentation.TRACING_ENDPOINT))
} else {
ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint("");
}

def expectLogError[T](message: String)(block: => T): T =
LoggingTestKit.error(message).expect(block)(runner.system.toTyped)

def expectLogMdc[T](mdc: Map[String, String])(block: => T): T =
LoggingTestKit.empty.withMdc(mdc).expect(block)(runner.system.toTyped)

def terminate(): Unit = runner.terminate()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,52 @@ import kalix.javasdk.valueentity.ValueEntityProvider
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import kalix.javasdk.impl.ProxyInfoHolder
import kalix.javasdk.impl.telemetry.TraceInstrumentation
import org.slf4j.event.Level

object TestValueEntity {
def service(entityProvider: ValueEntityProvider[_, _]): TestValueService =
new TestValueService(entityProvider)
def service(entityProvider: ValueEntityProvider[_, _], extraConfig: Option[Config] = None): TestValueService =
new TestValueService(entityProvider, extraConfig)
}

class TestValueService(entityProvider: ValueEntityProvider[_, _]) {
class TestValueService(entityProvider: ValueEntityProvider[_, _], extraConfig: Option[Config] = None) {
val port: Int = SocketUtil.temporaryLocalPort()

val config: Config = ConfigFactory.load(ConfigFactory.parseString(s"""
val config: Config = ConfigFactory.load(
ConfigFactory
.parseString(s"""
kalix {
user-function-port = $port
system.akka {
loglevel = DEBUG
coordinated-shutdown.exit-jvm = off
}
}
"""))
""").withFallback(extraConfig.getOrElse(ConfigFactory.empty)))

val runner: KalixRunner = new Kalix()
.register(entityProvider)
.createRunner(config)

runner.run()
//setting tracing as disabled, emulating that is discovered from the proxy.
ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint("")
//setting tracing as it was sent by the proxy and discovered by the user function
if (config.hasPath(TraceInstrumentation.TRACING_ENDPOINT)) {
ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint(
config.getString(TraceInstrumentation.TRACING_ENDPOINT))
} else {
ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint("");
}

def expectLogError[T](message: String)(block: => T): T = {
LoggingTestKit.error(message).expect(block)(runner.system.toTyped)
}

def expectLogInfoRegEx[T](regEx: String)(block: => T): T = {
LoggingTestKit.empty.withLogLevel(Level.INFO).withMessageRegex(regEx).expect(block)(runner.system.toTyped)
}

def expectLogMdc[T](mdc: Map[String, String])(block: => T): T =
LoggingTestKit.empty.withMdc(mdc).expect(block)(runner.system.toTyped)

def terminate(): Unit = runner.terminate()
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package kalix.javasdk.impl.valueentity

import com.typesafe.config.Config
import io.grpc.Status.Code.INVALID_ARGUMENT
import kalix.javasdk.valueentity.CartEntity
import kalix.javasdk.valueentity.CartEntityProvider
import kalix.javasdk.valueentity.{ CartEntity, CartEntityProvider }
import kalix.testkit.TestProtocol
import kalix.testkit.valueentity.ValueEntityMessages
import org.scalatest.BeforeAndAfterAll
Expand All @@ -19,7 +19,7 @@ class ValueEntitiesImplSpec extends AnyWordSpec with Matchers with BeforeAndAfte
import ShoppingCart.Protocol._
import ValueEntityMessages._

private val service: TestValueService = ShoppingCart.testService
private val service: TestValueService = ShoppingCart.testService()
private val protocol: TestProtocol = TestProtocol(service.port)

override def afterAll(): Unit = {
Expand Down Expand Up @@ -176,10 +176,11 @@ object ValueEntitiesImplSpec {

val Name: String = ShoppingCartApi.getDescriptor.findServiceByName("ShoppingCartService").getFullName

def testService: TestValueService =
def testService(extraConfig: Option[Config] = None): TestValueService =
TestValueEntity.service(
CartEntityProvider
.of(new CartEntity(_)))
.of(new CartEntity(_)),
extraConfig)

case class Item(id: String, name: String, quantity: Int)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ object EventSourcedMessages extends EntityMessages {
def command(id: Long, entityId: String, name: String, payload: ScalaPbMessage): InMessage =
command(id, entityId, name, messagePayload(payload))

def command(
id: Long,
entityId: String,
name: String,
payload: ScalaPbMessage,
metadata: Option[Metadata]): InMessage =
InMessage.Command(Command(entityId, id, name, messagePayload(payload), metadata))

def command(id: Long, entityId: String, name: String, payload: Option[ScalaPbAny]): InMessage =
InMessage.Command(Command(entityId, id, name, payload))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,21 @@ object ValueEntityMessages extends EntityMessages {
def command(id: Long, entityId: String, name: String, payload: ScalaPbMessage): InMessage =
command(id, entityId, name, messagePayload(payload))

def command(id: Long, entityId: String, name: String, payload: Option[ScalaPbAny]): InMessage =
InMessage.Command(Command(entityId, id, name, payload))
def command(
id: Long,
entityId: String,
name: String,
payload: ScalaPbMessage,
metadata: Option[Metadata]): InMessage =
command(id, entityId, name, messagePayload(payload), metadata)

def command(
id: Long,
entityId: String,
name: String,
payload: Option[ScalaPbAny],
metadata: Option[Metadata] = None): InMessage =
InMessage.Command(Command(entityId, id, name, payload, metadata))

def reply(id: Long, payload: JavaPbMessage): OutMessage =
reply(id, messagePayload(payload), None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
package kalix.javasdk.action;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;

public class TestTracingAction extends Action {

Logger logger = LoggerFactory.getLogger(TestTracingAction.class);

@GetMapping("/tracing/traceparent")
public Effect<String> endpoint() {
logger.info("registering a logging event");
return effects().reply(
actionContext().metadata().traceContext().traceParent().orElse("not-found"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import kalix.javasdk.annotations.EventHandler;
import kalix.javasdk.annotations.Id;
import kalix.javasdk.annotations.TypeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;

Expand All @@ -15,13 +17,16 @@
@RequestMapping("/es")
public class TestEventSourcedEntity extends EventSourcedEntity<TestESState, TestESEvent> {

Logger logger = LoggerFactory.getLogger(TestEventSourcedEntity.class);

@Override
public TestESState emptyState() {
return new TestESState("", 0, false, "");
}

@GetMapping
public Effect<TestESState> get() {
logger.info("registering a logging event");
return effects().reply(currentState());
}

Expand Down
Loading

0 comments on commit 3769379

Please sign in to comment.