Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failing the consumer if publishing without the destination is detected #147

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,13 @@ private final class Sdk(
case clz if classOf[Consumer].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val consumerClass = clz.asInstanceOf[Class[Consumer]]
val consumerDest = consumerDestination(consumerClass)
val consumerSpi =
new ConsumerImpl[Consumer](
componentId,
() => wiredInstance(consumerClass)(sideEffectingComponentInjects(None)),
consumerClass,
consumerDest,
system.classicSystem,
runtimeComponentClients.timerClient,
sdkExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package akka.javasdk.impl.consumer

import java.util.Optional

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal

import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.javasdk.Metadata
Expand All @@ -31,6 +33,7 @@ import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.timer.TimerScheduler
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.ConsumerDestination
import akka.runtime.sdk.spi.SpiConsumer
import akka.runtime.sdk.spi.SpiConsumer.Effect
import akka.runtime.sdk.spi.SpiConsumer.Message
Expand All @@ -48,6 +51,7 @@ private[impl] final class ConsumerImpl[C <: Consumer](
componentId: String,
val factory: () => C,
consumerClass: Class[C],
consumerDestination: Option[ConsumerDestination],
_system: ActorSystem,
timerClient: TimerClient,
sdkExecutionContext: ExecutionContext,
Expand Down Expand Up @@ -97,10 +101,16 @@ private[impl] final class ConsumerImpl[C <: Consumer](
private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = {
effect match {
case ProduceEffect(msg, metadata) =>
Future.successful(
new SpiConsumer.ProduceEffect(
payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
if (consumerDestination.isEmpty) {
val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined."
log.error(baseMsg + " Add @Produce annotation or change the Consumer.Effect outcome.")
Future.successful(new SpiConsumer.ErrorEffect(new SpiConsumer.Error(baseMsg)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initially, I just wanted to log warn/error, but then the consumer will move forward, which is probably not what the user wants when producing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failing/holding off sounds good, this would be a code error so needs a new version of the user service deployed.

} else {
Future.successful(
new SpiConsumer.ProduceEffect(
payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
}
case AsyncEffect(futureEffect) =>
futureEffect
.flatMap { effect => toSpiEffect(message, effect) }
Expand Down
Loading