Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failing the consumer if publishing without the destination is detected #147

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,13 @@ private final class Sdk(
case clz if classOf[Consumer].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val consumerClass = clz.asInstanceOf[Class[Consumer]]
val consumerDest = consumerDestination(consumerClass)
val consumerSpi =
new ConsumerImpl[Consumer](
componentId,
() => wiredInstance(consumerClass)(sideEffectingComponentInjects(None)),
consumerClass,
consumerDest,
system.classicSystem,
runtimeComponentClients.timerClient,
sdkExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
package akka.javasdk.impl.consumer

import java.util.Optional

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal

import akka.Done
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.javasdk.Metadata
Expand All @@ -31,6 +34,7 @@ import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.timer.TimerScheduler
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.ConsumerDestination
import akka.runtime.sdk.spi.SpiConsumer
import akka.runtime.sdk.spi.SpiConsumer.Effect
import akka.runtime.sdk.spi.SpiConsumer.Message
Expand All @@ -48,6 +52,7 @@ private[impl] final class ConsumerImpl[C <: Consumer](
componentId: String,
val factory: () => C,
consumerClass: Class[C],
consumerDestination: Option[ConsumerDestination],
_system: ActorSystem,
timerClient: TimerClient,
sdkExecutionContext: ExecutionContext,
Expand Down Expand Up @@ -96,11 +101,22 @@ private[impl] final class ConsumerImpl[C <: Consumer](

private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = {
effect match {
case ProduceEffect(msg, metadata) =>
case ProduceEffect(msg: Done, metadata) =>
Future.successful(
new SpiConsumer.ProduceEffect(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we are missing one more spi effect abstraction, except ProduceEffect, ErrorEffect, and IgnoreEffect.
Sth like: ConsumedEffect or ProcessedEffect to signal that we successfully consumed the message, but we don't want to publish anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, as in produce for some messages but not all? Would possibly need some more changes in eventing/message publishing in the runtime than just being about the consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, as in produce for some messages but not all? - this is done with ProduceEffect + IgnoreEffect, but when I just consume sth without producing, I still have to return either ProduceEffect(Done) or IgnoreEffect, which semantically looks strange. It is hidden from the user-facing API, but visible in the SPI API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
case ProduceEffect(msg, metadata) =>
if (consumerDestination.isEmpty) {
val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined."
log.error(baseMsg + " Add @Produce annotation or change the Consumer.Effect outcome.")
Future.successful(new SpiConsumer.ErrorEffect(new SpiConsumer.Error(baseMsg)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initially, I just wanted to log warn/error, but then the consumer will move forward, which is probably not what the user wants when producing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failing/holding off sounds good, this would be a code error so needs a new version of the user service deployed.

} else {
Future.successful(
new SpiConsumer.ProduceEffect(
payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
}
case AsyncEffect(futureEffect) =>
futureEffect
.flatMap { effect => toSpiEffect(message, effect) }
Expand Down
Loading