Skip to content

Commit

Permalink
fix: failing the consumer if publishing without the destination is de…
Browse files Browse the repository at this point in the history
…tected (#147)

* fix: failing the consumer if publishing without the destination is detected

* fixing impl
  • Loading branch information
aludwiko authored Jan 14, 2025
1 parent d71aace commit 19cf516
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
2 changes: 2 additions & 0 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,13 @@ private final class Sdk(
case clz if classOf[Consumer].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val consumerClass = clz.asInstanceOf[Class[Consumer]]
val consumerDest = consumerDestination(consumerClass)
val consumerSpi =
new ConsumerImpl[Consumer](
componentId,
() => wiredInstance(consumerClass)(sideEffectingComponentInjects(None)),
consumerClass,
consumerDest,
system.classicSystem,
runtimeComponentClients.timerClient,
sdkExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
package akka.javasdk.impl.consumer

import java.util.Optional

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal

import akka.Done
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.javasdk.Metadata
Expand All @@ -31,6 +34,7 @@ import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.timer.TimerScheduler
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.ConsumerDestination
import akka.runtime.sdk.spi.SpiConsumer
import akka.runtime.sdk.spi.SpiConsumer.Effect
import akka.runtime.sdk.spi.SpiConsumer.Message
Expand All @@ -48,6 +52,7 @@ private[impl] final class ConsumerImpl[C <: Consumer](
componentId: String,
val factory: () => C,
consumerClass: Class[C],
consumerDestination: Option[ConsumerDestination],
_system: ActorSystem,
timerClient: TimerClient,
sdkExecutionContext: ExecutionContext,
Expand Down Expand Up @@ -96,11 +101,22 @@ private[impl] final class ConsumerImpl[C <: Consumer](

private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = {
effect match {
case ProduceEffect(msg, metadata) =>
case ProduceEffect(msg: Done, metadata) =>
Future.successful(
new SpiConsumer.ProduceEffect(
payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
case ProduceEffect(msg, metadata) =>
if (consumerDestination.isEmpty) {
val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined."
log.error(baseMsg + " Add @Produce annotation or change the Consumer.Effect outcome.")
Future.successful(new SpiConsumer.ErrorEffect(new SpiConsumer.Error(baseMsg)))
} else {
Future.successful(
new SpiConsumer.ProduceEffect(
payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
}
case AsyncEffect(futureEffect) =>
futureEffect
.flatMap { effect => toSpiEffect(message, effect) }
Expand Down

0 comments on commit 19cf516

Please sign in to comment.