-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: failing the consumer if publishing without the destination is detected #147
Conversation
if (consumerDestination.isEmpty) { | ||
val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined." | ||
log.error(baseMsg + " Add @Produce annotation or change the Consumer.Effect outcome.") | ||
Future.successful(new SpiConsumer.ErrorEffect(new SpiConsumer.Error(baseMsg))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initially, I just wanted to log warn/error, but then the consumer will move forward, which is probably not what the user wants when producing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Failing/holding off sounds good, this would be a code error so needs a new version of the user service deployed.
@@ -96,11 +101,22 @@ private[impl] final class ConsumerImpl[C <: Consumer]( | |||
|
|||
private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = { | |||
effect match { | |||
case ProduceEffect(msg, metadata) => | |||
case ProduceEffect(msg: Done, metadata) => | |||
Future.successful( | |||
new SpiConsumer.ProduceEffect( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we are missing one more spi effect abstraction, except ProduceEffect, ErrorEffect, and IgnoreEffect.
Sth like: ConsumedEffect
or ProcessedEffect
to signal that we successfully consumed the message, but we don't want to publish anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, as in produce for some messages but not all? Would possibly need some more changes in eventing/message publishing in the runtime than just being about the consumer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, as in produce for some messages but not all?
- this is done with ProduceEffect + IgnoreEffect, but when I just consume sth without producing, I still have to return either ProduceEffect(Done) or IgnoreEffect, which semantically looks strange. It is hidden from the user-facing API, but visible in the SPI API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
continuation of the discussion: https://github.com/lightbend/kalix-runtime/issues/3331
if (consumerDestination.isEmpty) { | ||
val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined." | ||
log.error(baseMsg + " Add @Produce annotation or change the Consumer.Effect outcome.") | ||
Future.successful(new SpiConsumer.ErrorEffect(new SpiConsumer.Error(baseMsg))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Failing/holding off sounds good, this would be a code error so needs a new version of the user service deployed.
No description provided.