diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/action/ActionsImpl.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/action/ActionsImpl.scala index a62b3c0c6d..b66b34c639 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/action/ActionsImpl.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/action/ActionsImpl.scala @@ -22,7 +22,7 @@ import kalix.javasdk.impl.telemetry.{ ActionCategory, Instrumentation, Telemetry import kalix.protocol.action.{ ActionCommand, ActionResponse, Actions } import kalix.protocol.component import kalix.protocol.component.{ Failure, MetadataEntry } -import org.slf4j.{ Logger, LoggerFactory } +import org.slf4j.{ Logger, LoggerFactory, MDC } import java.util.Optional import scala.compat.java8.OptionConverters.RichOptionForJava8 @@ -176,7 +176,7 @@ private[javasdk] final class ActionsImpl(_system: ActorSystem, services: Map[Str services.get(in.serviceName) match { case Some(service) => val span = telemetries(service.serviceName).buildSpan(service, in) - + span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) val fut = try { val context = createContext(in, service.messageCodec, span.map(_.getSpanContext), service.serviceName) @@ -190,9 +190,13 @@ private[javasdk] final class ActionsImpl(_system: ActorSystem, services: Map[Str case NonFatal(ex) => // command handler threw an "unexpected" error Future.successful(handleUnexpectedException(service, in, ex)) + } finally { + MDC.remove(Telemetry.TRACE_ID) } fut.andThen { case _ => - span.foreach(_.end()) + span.foreach { s => + s.end() + } } case None => Future.successful( diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala index 043adc6215..3ec43e7d95 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala @@ -34,7 +34,7 @@ import kalix.protocol.event_sourced_entity.EventSourcedStreamOut.Message.{ Failu import kalix.protocol.event_sourced_entity.EventSourcedStreamOut.Message.{ Reply => OutReply } import kalix.protocol.event_sourced_entity.EventSourcedStreamOut.Message.{ SnapshotReply => OutSnapshotReply } import kalix.protocol.event_sourced_entity._ -import org.slf4j.LoggerFactory +import org.slf4j.{ LoggerFactory, MDC } import scala.util.control.NonFatal @@ -170,6 +170,7 @@ final class EventSourcedEntitiesImpl( if (thisEntityId != command.entityId) throw ProtocolException(command, "Receiving entity is not the intended recipient of command") val span = instrumentations(service.serviceName).buildSpan(service, command) + span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) try { val cmd = service.messageCodec.decodeMessage( @@ -234,7 +235,12 @@ final class EventSourcedEntitiesImpl( serializedSnapshot, delete)))) } - } finally { span.foreach(_.end()) } + } finally { + span.foreach { s => + MDC.remove(Telemetry.TRACE_ID) + s.end() + } + } case ((sequence, _), InSnapshotRequest(request)) => val reply = EventSourcedSnapshotReply(request.requestId, Some(service.messageCodec.encodeScala(router._stateOrEmpty()))) diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala index 7839a14f13..cc1a144861 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala @@ -33,6 +33,7 @@ object Telemetry extends ExtensionId[Telemetry] { val TRACE_PARENT_KEY: String = TraceInstrumentation.TRACE_PARENT_KEY val TRACE_STATE_KEY: String = TraceInstrumentation.TRACE_STATE_KEY + val TRACE_ID: String = "trace_id" override def createExtension(system: ExtendedActorSystem): Telemetry = new Telemetry(system) } diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImpl.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImpl.scala index 115e9de9fe..c94d992b40 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImpl.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImpl.scala @@ -6,38 +6,31 @@ package kalix.javasdk.impl.valueentity import akka.NotUsed import akka.actor.ActorSystem -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Flow, Source } import io.grpc.Status import kalix.javasdk.KalixRunner.Configuration import kalix.javasdk.impl.ErrorHandling.BadRequestException -import kalix.javasdk.impl.telemetry.Instrumentation -import kalix.javasdk.impl.telemetry.Telemetry -import kalix.javasdk.impl.telemetry.ValueEntityCategory +import kalix.javasdk.impl.telemetry.{ Instrumentation, Telemetry, ValueEntityCategory } import kalix.protocol.component.Failure -import org.slf4j.LoggerFactory +import org.slf4j.{ LoggerFactory, MDC } import scala.util.control.NonFatal // FIXME these don't seem to be 'public API', more internals? import com.google.protobuf.Descriptors import kalix.javasdk.Metadata -import kalix.javasdk.impl.ValueEntityFactory import kalix.javasdk.impl._ -import kalix.javasdk.impl.effect.EffectSupport -import kalix.javasdk.impl.effect.ErrorReplyImpl -import kalix.javasdk.impl.effect.MessageReplyImpl -import kalix.javasdk.impl.valueentity.ValueEntityEffectImpl.DeleteEntity -import kalix.javasdk.impl.valueentity.ValueEntityEffectImpl.UpdateState +import kalix.javasdk.impl.effect.{ EffectSupport, ErrorReplyImpl, MessageReplyImpl } +import kalix.javasdk.impl.valueentity.ValueEntityEffectImpl.{ DeleteEntity, UpdateState } import kalix.javasdk.impl.valueentity.ValueEntityRouter.CommandResult import kalix.javasdk.valueentity._ -import kalix.protocol.value_entity.ValueEntityAction.Action.Delete -import kalix.protocol.value_entity.ValueEntityAction.Action.Update -import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{ Command => InCommand } -import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{ Empty => InEmpty } -import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{ Init => InInit } -import kalix.protocol.value_entity.ValueEntityStreamOut.Message.{ Failure => OutFailure } -import kalix.protocol.value_entity.ValueEntityStreamOut.Message.{ Reply => OutReply } +import kalix.protocol.value_entity.ValueEntityAction.Action.{ Delete, Update } +import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{ + Command => InCommand, + Empty => InEmpty, + Init => InInit +} +import kalix.protocol.value_entity.ValueEntityStreamOut.Message.{ Failure => OutFailure, Reply => OutReply } import kalix.protocol.value_entity._ final class ValueEntityService( @@ -149,7 +142,7 @@ final class ValueEntitiesImpl( if (log.isTraceEnabled) log.trace("Metadata entries [{}].", metadata.entries) val span = instrumentations(service.serviceName).buildSpan(service, command) - + span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) try { val cmd = service.messageCodec.decodeMessage( @@ -203,7 +196,10 @@ final class ValueEntitiesImpl( action))) } } finally { - span.foreach(_.end()) + span.foreach { s => + MDC.remove(Telemetry.TRACE_ID) + s.end() + } } case InInit(_) => diff --git a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/eventsourcedentity/TestEventSourced.scala b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/eventsourcedentity/TestEventSourced.scala index 67e376b133..2787fa996d 100644 --- a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/eventsourcedentity/TestEventSourced.scala +++ b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/eventsourcedentity/TestEventSourced.scala @@ -10,17 +10,22 @@ import akka.testkit.SocketUtil import kalix.javasdk.{ Kalix, KalixRunner } import com.typesafe.config.{ Config, ConfigFactory } import kalix.javasdk.eventsourcedentity.EventSourcedEntityProvider +import kalix.javasdk.impl.telemetry.TraceInstrumentation import kalix.javasdk.impl.ProxyInfoHolder object TestEventSourced { - def service(entityProvider: EventSourcedEntityProvider[_, _, _]): TestEventSourcedService = - new TestEventSourcedService(entityProvider) + def service( + entityProvider: EventSourcedEntityProvider[_, _, _], + extraConfig: Option[Config] = None): TestEventSourcedService = + new TestEventSourcedService(entityProvider, extraConfig) } -class TestEventSourcedService(entityProvider: EventSourcedEntityProvider[_, _, _]) { +class TestEventSourcedService(entityProvider: EventSourcedEntityProvider[_, _, _], extraConfig: Option[Config] = None) { val port: Int = SocketUtil.temporaryLocalPort() - val config: Config = ConfigFactory.load(ConfigFactory.parseString(s""" + val config: Config = ConfigFactory.load( + ConfigFactory + .parseString(s""" kalix { user-function-port = $port system.akka { @@ -28,18 +33,26 @@ class TestEventSourcedService(entityProvider: EventSourcedEntityProvider[_, _, _ coordinated-shutdown.exit-jvm = off } } - """)) + """).withFallback(extraConfig.getOrElse(ConfigFactory.empty))) val runner: KalixRunner = new Kalix() .register(entityProvider) .createRunner(config) runner.run() - //setting tracing as disabled, emulating that is discovered from the proxy. - ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint("") + //setting tracing as it was sent by the proxy and discovered by the user function + if (config.hasPath(TraceInstrumentation.TRACING_ENDPOINT)) { + ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint( + config.getString(TraceInstrumentation.TRACING_ENDPOINT)) + } else { + ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint(""); + } def expectLogError[T](message: String)(block: => T): T = LoggingTestKit.error(message).expect(block)(runner.system.toTyped) + def expectLogMdc[T](mdc: Map[String, String])(block: => T): T = + LoggingTestKit.empty.withMdc(mdc).expect(block)(runner.system.toTyped) + def terminate(): Unit = runner.terminate() } diff --git a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/valueentity/TestValueEntity.scala b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/valueentity/TestValueEntity.scala index e479322216..6993f77657 100644 --- a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/valueentity/TestValueEntity.scala +++ b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/valueentity/TestValueEntity.scala @@ -12,16 +12,20 @@ import kalix.javasdk.valueentity.ValueEntityProvider import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import kalix.javasdk.impl.ProxyInfoHolder +import kalix.javasdk.impl.telemetry.TraceInstrumentation +import org.slf4j.event.Level object TestValueEntity { - def service(entityProvider: ValueEntityProvider[_, _]): TestValueService = - new TestValueService(entityProvider) + def service(entityProvider: ValueEntityProvider[_, _], extraConfig: Option[Config] = None): TestValueService = + new TestValueService(entityProvider, extraConfig) } -class TestValueService(entityProvider: ValueEntityProvider[_, _]) { +class TestValueService(entityProvider: ValueEntityProvider[_, _], extraConfig: Option[Config] = None) { val port: Int = SocketUtil.temporaryLocalPort() - val config: Config = ConfigFactory.load(ConfigFactory.parseString(s""" + val config: Config = ConfigFactory.load( + ConfigFactory + .parseString(s""" kalix { user-function-port = $port system.akka { @@ -29,19 +33,31 @@ class TestValueService(entityProvider: ValueEntityProvider[_, _]) { coordinated-shutdown.exit-jvm = off } } - """)) + """).withFallback(extraConfig.getOrElse(ConfigFactory.empty))) val runner: KalixRunner = new Kalix() .register(entityProvider) .createRunner(config) runner.run() - //setting tracing as disabled, emulating that is discovered from the proxy. - ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint("") + //setting tracing as it was sent by the proxy and discovered by the user function + if (config.hasPath(TraceInstrumentation.TRACING_ENDPOINT)) { + ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint( + config.getString(TraceInstrumentation.TRACING_ENDPOINT)) + } else { + ProxyInfoHolder(runner.system).overrideTracingCollectorEndpoint(""); + } def expectLogError[T](message: String)(block: => T): T = { LoggingTestKit.error(message).expect(block)(runner.system.toTyped) } + def expectLogInfoRegEx[T](regEx: String)(block: => T): T = { + LoggingTestKit.empty.withLogLevel(Level.INFO).withMessageRegex(regEx).expect(block)(runner.system.toTyped) + } + + def expectLogMdc[T](mdc: Map[String, String])(block: => T): T = + LoggingTestKit.empty.withMdc(mdc).expect(block)(runner.system.toTyped) + def terminate(): Unit = runner.terminate() } diff --git a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImplSpec.scala b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImplSpec.scala index 6631ac8e45..09b7145fd5 100644 --- a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImplSpec.scala +++ b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImplSpec.scala @@ -4,9 +4,9 @@ package kalix.javasdk.impl.valueentity +import com.typesafe.config.Config import io.grpc.Status.Code.INVALID_ARGUMENT -import kalix.javasdk.valueentity.CartEntity -import kalix.javasdk.valueentity.CartEntityProvider +import kalix.javasdk.valueentity.{ CartEntity, CartEntityProvider } import kalix.testkit.TestProtocol import kalix.testkit.valueentity.ValueEntityMessages import org.scalatest.BeforeAndAfterAll @@ -19,7 +19,7 @@ class ValueEntitiesImplSpec extends AnyWordSpec with Matchers with BeforeAndAfte import ShoppingCart.Protocol._ import ValueEntityMessages._ - private val service: TestValueService = ShoppingCart.testService + private val service: TestValueService = ShoppingCart.testService() private val protocol: TestProtocol = TestProtocol(service.port) override def afterAll(): Unit = { @@ -176,10 +176,11 @@ object ValueEntitiesImplSpec { val Name: String = ShoppingCartApi.getDescriptor.findServiceByName("ShoppingCartService").getFullName - def testService: TestValueService = + def testService(extraConfig: Option[Config] = None): TestValueService = TestValueEntity.service( CartEntityProvider - .of(new CartEntity(_))) + .of(new CartEntity(_)), + extraConfig) case class Item(id: String, name: String, quantity: Int) diff --git a/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/eventsourcedentity/EventSourcedMessages.scala b/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/eventsourcedentity/EventSourcedMessages.scala index de829e6407..42c8da271a 100644 --- a/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/eventsourcedentity/EventSourcedMessages.scala +++ b/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/eventsourcedentity/EventSourcedMessages.scala @@ -94,6 +94,14 @@ object EventSourcedMessages extends EntityMessages { def command(id: Long, entityId: String, name: String, payload: ScalaPbMessage): InMessage = command(id, entityId, name, messagePayload(payload)) + def command( + id: Long, + entityId: String, + name: String, + payload: ScalaPbMessage, + metadata: Option[Metadata]): InMessage = + InMessage.Command(Command(entityId, id, name, messagePayload(payload), metadata)) + def command(id: Long, entityId: String, name: String, payload: Option[ScalaPbAny]): InMessage = InMessage.Command(Command(entityId, id, name, payload)) diff --git a/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/valueentity/ValueEntityMessages.scala b/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/valueentity/ValueEntityMessages.scala index 76a0db97f4..46362da5e1 100644 --- a/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/valueentity/ValueEntityMessages.scala +++ b/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/valueentity/ValueEntityMessages.scala @@ -78,8 +78,21 @@ object ValueEntityMessages extends EntityMessages { def command(id: Long, entityId: String, name: String, payload: ScalaPbMessage): InMessage = command(id, entityId, name, messagePayload(payload)) - def command(id: Long, entityId: String, name: String, payload: Option[ScalaPbAny]): InMessage = - InMessage.Command(Command(entityId, id, name, payload)) + def command( + id: Long, + entityId: String, + name: String, + payload: ScalaPbMessage, + metadata: Option[Metadata]): InMessage = + command(id, entityId, name, messagePayload(payload), metadata) + + def command( + id: Long, + entityId: String, + name: String, + payload: Option[ScalaPbAny], + metadata: Option[Metadata] = None): InMessage = + InMessage.Command(Command(entityId, id, name, payload, metadata)) def reply(id: Long, payload: JavaPbMessage): OutMessage = reply(id, messagePayload(payload), None) diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/action/TestTracingAction.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/action/TestTracingAction.java index 7d672ad927..e7b98a4ea7 100644 --- a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/action/TestTracingAction.java +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/action/TestTracingAction.java @@ -5,12 +5,17 @@ package kalix.javasdk.action; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; public class TestTracingAction extends Action { + Logger logger = LoggerFactory.getLogger(TestTracingAction.class); + @GetMapping("/tracing/traceparent") public Effect endpoint() { + logger.info("registering a logging event"); return effects().reply( actionContext().metadata().traceContext().traceParent().orElse("not-found")); } diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestEventSourcedEntity.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestEventSourcedEntity.java index f3a7772055..b081b09598 100644 --- a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestEventSourcedEntity.java +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestEventSourcedEntity.java @@ -7,6 +7,8 @@ import kalix.javasdk.annotations.EventHandler; import kalix.javasdk.annotations.Id; import kalix.javasdk.annotations.TypeId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -15,6 +17,8 @@ @RequestMapping("/es") public class TestEventSourcedEntity extends EventSourcedEntity { + Logger logger = LoggerFactory.getLogger(TestEventSourcedEntity.class); + @Override public TestESState emptyState() { return new TestESState("", 0, false, ""); @@ -22,6 +26,7 @@ public TestESState emptyState() { @GetMapping public Effect get() { + logger.info("registering a logging event"); return effects().reply(currentState()); } diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntity.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntity.java index eb23b2fead..654ede853d 100644 --- a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntity.java +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntity.java @@ -6,6 +6,8 @@ import kalix.javasdk.annotations.Id; import kalix.javasdk.annotations.TypeId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -14,6 +16,8 @@ @RequestMapping("/ve") public class TestValueEntity extends ValueEntity { + Logger log = LoggerFactory.getLogger(TestValueEntity.class); + @Override public TestVEState1 emptyState() { return new TestVEState1("empty", 1); @@ -21,6 +25,7 @@ public TestVEState1 emptyState() { @GetMapping public Effect get() { + log.info("registering a logging event"); return effects().reply(currentState()); } diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionsImplSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionsImplSpec.scala index a8e27d3834..4bd42ced05 100644 --- a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionsImplSpec.scala +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionsImplSpec.scala @@ -4,7 +4,7 @@ package kalix.javasdk.impl -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.{ LoggingTestKit, ScalaTestWithActorTestKit } import akka.actor.typed.scaladsl.adapter._ import com.google.protobuf.any.Any.toJavaProto import com.google.protobuf.any.{ Any => ScalaPbAny } @@ -20,6 +20,7 @@ import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent3 import kalix.javasdk.eventsourcedentity.TestESEvent.Event4 import kalix.javasdk.impl.action.ActionService import kalix.javasdk.impl.action.ActionsImpl +import kalix.javasdk.impl.telemetry.Telemetry import kalix.protocol.action.ActionCommand import kalix.protocol.action.ActionResponse import kalix.protocol.action.Actions @@ -32,6 +33,9 @@ import org.scalatest.OptionValues import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +import scala.concurrent.{ ExecutionContext, Future } class ActionsImplSpec extends ScalaTestWithActorTestKit @@ -96,7 +100,7 @@ class ActionsImplSpec } } - "inject traces correctly into metadata" in { + "inject traces correctly into metadata and keeps trace_id in MDC" in { val jsonMessageCodec = new JsonMessageCodec() val actionProvider = ReflectiveActionProvider.of( classOf[TestTracingAction], @@ -113,15 +117,26 @@ class ActionsImplSpec .getFullName) val traceParent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" - val md = Metadata(Seq(MetadataEntry("traceparent", MetadataEntry.Value.StringValue(traceParent)))) - val reply1 = service.handleUnary(ActionCommand(serviceName, "Endpoint", Some(cmd1), Some(md))).futureValue + val metadata = Metadata(Seq(MetadataEntry("traceparent", MetadataEntry.Value.StringValue(traceParent)))) + val expectedMDC = Map(Telemetry.TRACE_ID -> "0af7651916cd43dd8448eb211c80319c") + val reply1 = + LoggingTestKit.empty.withMdc(expectedMDC).expect { + service.handleUnary(ActionCommand(serviceName, "Endpoint", Some(cmd1), Some(metadata))).futureValue + } inside(reply1.response) { case ActionResponse.Response.Reply(Reply(Some(payload), _, _)) => val tp = decodeJson(classOf[String], toJavaProto(payload)) tp should not be "not-found" tp should include("0af7651916cd43dd8448eb211c80319c") // trace id should be propagated (tp should not).include("b7ad6b7169203331") // new span id should be generated } + + val log = LoggerFactory.getLogger(classOf[ActionsImplSpec]) + LoggingTestKit.empty.withMdc(Map.empty).expect { + Future { + log.info("checking the MDC is empty") + }(ExecutionContext.parasitic) //parasitic to checking that in the same thread there's no MDC any more + } } } diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/EvenSourcedEntitiesImplSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/EvenSourcedEntitiesImplSpec.scala index 48272b1809..947d200799 100644 --- a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/EvenSourcedEntitiesImplSpec.scala +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/EvenSourcedEntitiesImplSpec.scala @@ -4,22 +4,26 @@ package kalix.javasdk.impl +import akka.actor.testkit.typed.scaladsl.LoggingTestKit +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps import com.google.protobuf.ByteString._ import com.google.protobuf.any.{ Any => ScalaPbAny } +import com.typesafe.config.ConfigFactory import kalix.javasdk.JsonSupport import kalix.javasdk.eventsourced.ReflectiveEventSourcedEntityProvider -import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent1 -import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent2 -import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent3 -import kalix.javasdk.eventsourcedentity.TestESEvent +import kalix.javasdk.eventsourcedentity.OldTestESEvent.{ OldEvent1, OldEvent2, OldEvent3 } import kalix.javasdk.eventsourcedentity.TestESEvent.Event4 -import kalix.javasdk.eventsourcedentity.TestESState -import kalix.javasdk.eventsourcedentity.TestEventSourcedEntity +import kalix.javasdk.eventsourcedentity.{ TestESEvent, TestESState, TestEventSourcedEntity } import kalix.javasdk.impl.eventsourcedentity.TestEventSourcedService +import kalix.javasdk.impl.telemetry.{ Telemetry, TraceInstrumentation } +import kalix.protocol.component.{ Metadata, MetadataEntry } import kalix.testkit.TestProtocol import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import org.slf4j.LoggerFactory + +import scala.concurrent.{ ExecutionContext, Future } class EvenSourcedEntitiesImplSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { @@ -55,6 +59,38 @@ class EvenSourcedEntitiesImplSpec extends AnyWordSpec with Matchers with BeforeA } } + "inject traces correctly into metadata and keeps trace_id in MDC" in { + val entityId = "1" + val service = new TestEventSourcedService( + ReflectiveEventSourcedEntityProvider + .of[TestESState, TestESEvent, TestEventSourcedEntity]( + classOf[TestEventSourcedEntity], + new JsonMessageCodec(), + _ => new TestEventSourcedEntity()), + Some(ConfigFactory.parseString(s"${TraceInstrumentation.TRACING_ENDPOINT}=\"http://fakeurl:1234\""))) + val protocol = TestProtocol(service.port) + val entity = protocol.eventSourced.connect() + + entity.send(init(classOf[TestEventSourcedEntity].getName, entityId)) + + val traceParent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" + val metadata = Metadata(Seq(MetadataEntry("traceparent", MetadataEntry.Value.StringValue(traceParent)))) + + val expectedMDC = Map(Telemetry.TRACE_ID -> "4bf92f3577b34da6a3ce929d0e0e4736") + service.expectLogMdc(expectedMDC) { + entity.send(command(1, entityId, "Get", emptySyntheticRequest("Get"), Option(metadata))) + } + + val log = LoggerFactory.getLogger(classOf[ActionsImplSpec]) + LoggingTestKit.empty + .withMdc(Map.empty) + .expect { + Future { + log.info("checking the MDC is empty") + }(ExecutionContext.parasitic) //parasitic to check that in the same thread MDC is cleared + }(service.runner.system.toTyped) + } + private def emptySyntheticRequest(methodName: String) = { ScalaPbAny(s"type.googleapis.com/kalix.javasdk.eventsourcedentity.${methodName}KalixSyntheticRequest", EMPTY) } diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ValueEntitiesImplSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ValueEntitiesImplSpec.scala index b05494fbb5..da6f14f86b 100644 --- a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ValueEntitiesImplSpec.scala +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ValueEntitiesImplSpec.scala @@ -4,20 +4,23 @@ package kalix.javasdk.impl +import akka.actor.testkit.typed.scaladsl.LoggingTestKit +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps import com.google.protobuf.ByteString._ import com.google.protobuf.any.{ Any => ScalaPbAny } +import com.typesafe.config.ConfigFactory +import kalix.javasdk.impl.telemetry.{ Telemetry, TraceInstrumentation } import kalix.javasdk.impl.valueentity.TestValueService -import kalix.javasdk.valueentity.ReflectiveValueEntityProvider -import kalix.javasdk.valueentity.TestVEState0 -import kalix.javasdk.valueentity.TestVEState1 -import kalix.javasdk.valueentity.TestVEState2 -import kalix.javasdk.valueentity.TestValueEntity -import kalix.javasdk.valueentity.TestValueEntityMigration +import kalix.javasdk.valueentity._ +import kalix.protocol.component.{ Metadata, MetadataEntry } import kalix.testkit.TestProtocol import kalix.testkit.valueentity.ValueEntityMessages import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import org.slf4j.LoggerFactory + +import scala.concurrent.{ ExecutionContext, Future } class ValueEntitiesImplSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { import ValueEntityMessages._ @@ -72,6 +75,41 @@ class ValueEntitiesImplSpec extends AnyWordSpec with Matchers with BeforeAndAfte protocol.terminate() service.terminate() } + + "Add the trace_id to the MDC" in { + val entityId = "1" + val jsonMessageCodec = new JsonMessageCodec() + + val service: TestValueService = new TestValueService( + ReflectiveValueEntityProvider + .of[TestVEState1, TestValueEntity](classOf[TestValueEntity], jsonMessageCodec, _ => new TestValueEntity()), + Some(ConfigFactory.parseString(s"${TraceInstrumentation.TRACING_ENDPOINT}=\"http://fakeurl:1234\""))) + val protocol: TestProtocol = TestProtocol(service.port) + val entity = protocol.valueEntity.connect() + //old state + entity.send( + init( + classOf[TestValueEntity].getName, + entityId, + jsonMessageCodec.encodeJava(new TestVEState0("some-state", 1)))) + + val traceParent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" + val metadata = Metadata(Seq(MetadataEntry("traceparent", MetadataEntry.Value.StringValue(traceParent)))) + + val expectedMDC = Map(Telemetry.TRACE_ID -> "4bf92f3577b34da6a3ce929d0e0e4736") + service.expectLogMdc(expectedMDC) { + entity.send(command(1, entityId, "Get", emptySyntheticRequest("Get"), Option(metadata))) + } + + val log = LoggerFactory.getLogger(classOf[ActionsImplSpec]) + LoggingTestKit.empty + .withMdc(Map.empty) + .expect { + Future { + log.info("checking the MDC is empty") + }(ExecutionContext.parasitic) //parasitic to check that in the same thread MDC is cleared + }(service.runner.system.toTyped) + } } private def emptySyntheticRequest(methodName: String) = {