Skip to content

Commit

Permalink
fix: handle adapted java source providers for replay
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Nov 30, 2024
1 parent 34b7bee commit b93367d
Showing 1 changed file with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import akka.projection.internal.GroupedHandlerStrategy
import akka.projection.internal.HandlerStrategy
import akka.projection.internal.InternalProjection
import akka.projection.internal.InternalProjectionState
import akka.projection.internal.JavaToScalaBySliceSourceProviderAdapter
import akka.projection.internal.ManagementState
import akka.projection.internal.OffsetStoredByHandler
import akka.projection.internal.OffsetStrategy
Expand Down Expand Up @@ -218,7 +219,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -329,7 +334,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -478,7 +487,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -631,7 +644,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -713,7 +730,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down

0 comments on commit b93367d

Please sign in to comment.