Skip to content

Commit

Permalink
fixing impl
Browse files Browse the repository at this point in the history
  • Loading branch information
aludwiko committed Jan 14, 2025
1 parent 10ff9a0 commit 6941a96
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal

import akka.Done
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.javasdk.Metadata
Expand Down Expand Up @@ -100,6 +101,11 @@ private[impl] final class ConsumerImpl[C <: Consumer](

private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = {
effect match {
case ProduceEffect(msg: Done, metadata) =>
Future.successful(
new SpiConsumer.ProduceEffect(
payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
case ProduceEffect(msg, metadata) =>
if (consumerDestination.isEmpty) {
val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined."
Expand Down

0 comments on commit 6941a96

Please sign in to comment.