From 9c6dd361e8b5e3915898c88d3d179b7628386768 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 13 Dec 2023 10:09:08 +0100 Subject: [PATCH] The right adapter in the right place --- .../JavaToScalaSourceProviderAdapter.scala | 23 ++++++++++++++++++ .../ScalaToJavaSourceProviderAdapter.scala | 24 ------------------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/JavaToScalaSourceProviderAdapter.scala b/akka-projection-core/src/main/scala/akka/projection/internal/JavaToScalaSourceProviderAdapter.scala index 15bf8df48..4220130a2 100644 --- a/akka-projection-core/src/main/scala/akka/projection/internal/JavaToScalaSourceProviderAdapter.scala +++ b/akka-projection-core/src/main/scala/akka/projection/internal/JavaToScalaSourceProviderAdapter.scala @@ -22,6 +22,7 @@ import java.util.function.Supplier import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ import scala.concurrent.Future + @InternalApi private[projection] object JavaToScalaBySliceSourceProviderAdapter { def apply[Offset, Envelope]( delegate: javadsl.SourceProvider[Offset, Envelope]): scaladsl.SourceProvider[Offset, Envelope] = @@ -36,6 +37,28 @@ import scala.concurrent.Future } } +/** + * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider + */ +private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope]( + delegate: javadsl.SourceProvider[Offset, Envelope]) + extends scaladsl.SourceProvider[Offset, Envelope] { + + def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = { + // the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source, + // it _should_ not be used for the blocking operation of getting offsets themselves + val ec = akka.dispatch.ExecutionContexts.parasitic + val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] { + override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava + } + delegate.source(offsetAdapter).toScala.map(_.asScala)(ec) + } + + def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope) + + def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope) +} + /** * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider */ diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/ScalaToJavaSourceProviderAdapter.scala b/akka-projection-core/src/main/scala/akka/projection/internal/ScalaToJavaSourceProviderAdapter.scala index 0f4c27591..c20c4df6d 100644 --- a/akka-projection-core/src/main/scala/akka/projection/internal/ScalaToJavaSourceProviderAdapter.scala +++ b/akka-projection-core/src/main/scala/akka/projection/internal/ScalaToJavaSourceProviderAdapter.scala @@ -16,7 +16,6 @@ import akka.projection.BySlicesSourceProvider import akka.projection.javadsl import akka.projection.scaladsl import akka.stream.javadsl.{ Source => JSource } -import akka.stream.scaladsl.Source import java.time.Instant import java.util.Optional @@ -24,29 +23,6 @@ import java.util.concurrent.CompletionStage import java.util.function.Supplier import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ -import scala.concurrent.Future - -/** - * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider - */ -private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope]( - delegate: javadsl.SourceProvider[Offset, Envelope]) - extends scaladsl.SourceProvider[Offset, Envelope] { - - def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = { - // the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source, - // it _should_ not be used for the blocking operation of getting offsets themselves - val ec = akka.dispatch.ExecutionContexts.parasitic - val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] { - override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava - } - delegate.source(offsetAdapter).toScala.map(_.asScala)(ec) - } - - def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope) - - def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope) -} /** * INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider