Skip to content

Commit

Permalink
The right adapter in the right place
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 13, 2023
1 parent 14b8220 commit 9c6dd36
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future

@InternalApi private[projection] object JavaToScalaBySliceSourceProviderAdapter {
def apply[Offset, Envelope](
delegate: javadsl.SourceProvider[Offset, Envelope]): scaladsl.SourceProvider[Offset, Envelope] =
Expand All @@ -36,6 +37,28 @@ import scala.concurrent.Future
}
}

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
delegate: javadsl.SourceProvider[Offset, Envelope])
extends scaladsl.SourceProvider[Offset, Envelope] {

def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = {
// the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source,
// it _should_ not be used for the blocking operation of getting offsets themselves
val ec = akka.dispatch.ExecutionContexts.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava
}
delegate.source(offsetAdapter).toScala.map(_.asScala)(ec)
}

def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)

def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)
}

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,13 @@ import akka.projection.BySlicesSourceProvider
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.stream.javadsl.{ Source => JSource }
import akka.stream.scaladsl.Source

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
delegate: javadsl.SourceProvider[Offset, Envelope])
extends scaladsl.SourceProvider[Offset, Envelope] {

def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = {
// the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source,
// it _should_ not be used for the blocking operation of getting offsets themselves
val ec = akka.dispatch.ExecutionContexts.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava
}
delegate.source(offsetAdapter).toScala.map(_.asScala)(ec)
}

def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)

def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)
}

/**
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
Expand Down

0 comments on commit 9c6dd36

Please sign in to comment.