diff --git a/akka-projection-durable-state/src/main/scala/akka/projection/state/javadsl/DurableStateSourceProvider.scala b/akka-projection-durable-state/src/main/scala/akka/projection/state/javadsl/DurableStateSourceProvider.scala index 4cb322f53..e8f4c83ed 100644 --- a/akka-projection-durable-state/src/main/scala/akka/projection/state/javadsl/DurableStateSourceProvider.scala +++ b/akka-projection-durable-state/src/main/scala/akka/projection/state/javadsl/DurableStateSourceProvider.scala @@ -36,6 +36,7 @@ import akka.stream.javadsl.Source */ @ApiMayChange object DurableStateSourceProvider { + def changesByTag[A]( system: ActorSystem[_], pluginId: String, @@ -43,7 +44,13 @@ object DurableStateSourceProvider { val durableStateStoreQuery = DurableStateStoreRegistry(system) .getDurableStateStoreFor[DurableStateStoreQuery[A]](classOf[DurableStateStoreQuery[A]], pluginId) + changesByTag(system, durableStateStoreQuery, tag) + } + def changesByTag[A]( + system: ActorSystem[_], + durableStateStoreQuery: DurableStateStoreQuery[A], + tag: String): SourceProvider[Offset, DurableStateChange[A]] = { new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag, system) } @@ -83,7 +90,15 @@ object DurableStateSourceProvider { val durableStateStoreQuery = DurableStateStoreRegistry(system) .getDurableStateStoreFor(classOf[DurableStateStoreBySliceQuery[A]], durableStateStoreQueryPluginId) + changesBySlices(system, durableStateStoreQuery, entityType, minSlice, maxSlice) + } + def changesBySlices[A]( + system: ActorSystem[_], + durableStateStoreQuery: DurableStateStoreBySliceQuery[A], + entityType: String, + minSlice: Int, + maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = { new DurableStateBySlicesSourceProvider(durableStateStoreQuery, entityType, minSlice, maxSlice, system) } diff --git a/akka-projection-durable-state/src/main/scala/akka/projection/state/scaladsl/DurableStateSourceProvider.scala b/akka-projection-durable-state/src/main/scala/akka/projection/state/scaladsl/DurableStateSourceProvider.scala index 771a8ad57..0329dbe3f 100644 --- a/akka-projection-durable-state/src/main/scala/akka/projection/state/scaladsl/DurableStateSourceProvider.scala +++ b/akka-projection-durable-state/src/main/scala/akka/projection/state/scaladsl/DurableStateSourceProvider.scala @@ -37,7 +37,13 @@ object DurableStateSourceProvider { val durableStateStoreQuery = DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[A]](pluginId) + changesByTag(system, durableStateStoreQuery, tag) + } + def changesByTag[A]( + system: ActorSystem[_], + durableStateStoreQuery: DurableStateStoreQuery[A], + tag: String): SourceProvider[Offset, DurableStateChange[A]] = { new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag, system) } @@ -74,7 +80,15 @@ object DurableStateSourceProvider { val durableStateStoreQuery = DurableStateStoreRegistry(system) .durableStateStoreFor[DurableStateStoreBySliceQuery[A]](durableStateStoreQueryPluginId) + changesBySlices(system, durableStateStoreQuery, entityType, minSlice, maxSlice) + } + def changesBySlices[A]( + system: ActorSystem[_], + durableStateStoreQuery: DurableStateStoreBySliceQuery[A], + entityType: String, + minSlice: Int, + maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = { new DurableStateBySlicesSourceProvider(durableStateStoreQuery, entityType, minSlice, maxSlice, system) } diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala index 3938e9015..1f21d3a86 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala @@ -39,10 +39,15 @@ object EventSourcedProvider { system: ActorSystem[_], readJournalPluginId: String, tag: String): SourceProvider[Offset, EventEnvelope[Event]] = { - val eventsByTagQuery = PersistenceQuery(system).getReadJournalFor(classOf[EventsByTagQuery], readJournalPluginId) + eventsByTag(system, eventsByTagQuery, tag) + } + def eventsByTag[Event]( + system: ActorSystem[_], + eventsByTagQuery: EventsByTagQuery, + tag: String): SourceProvider[Offset, EventEnvelope[Event]] = { new EventsByTagSourceProvider(system, eventsByTagQuery, tag) } @@ -78,20 +83,17 @@ object EventSourcedProvider { entityType: String, minSlice: Int, maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = { - val eventsBySlicesQuery = PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId) + eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice) + } - if (!eventsBySlicesQuery.isInstanceOf[EventTimestampQuery]) - throw new IllegalArgumentException( - s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " + - s"[$readJournalPluginId] must implement [${classOf[EventTimestampQuery].getName}]") - - if (!eventsBySlicesQuery.isInstanceOf[LoadEventQuery]) - throw new IllegalArgumentException( - s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " + - s"[$readJournalPluginId] must implement [${classOf[LoadEventQuery].getName}]") - + def eventsBySlices[Event]( + system: ActorSystem[_], + eventsBySlicesQuery: EventsBySliceQuery, + entityType: String, + minSlice: Int, + maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = { new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system) } diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala index baa1760f3..a110a0fd6 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala @@ -32,10 +32,15 @@ object EventSourcedProvider { system: ActorSystem[_], readJournalPluginId: String, tag: String): SourceProvider[Offset, EventEnvelope[Event]] = { - val eventsByTagQuery = PersistenceQuery(system).readJournalFor[EventsByTagQuery](readJournalPluginId) + eventsByTag(system, eventsByTagQuery, tag) + } + def eventsByTag[Event]( + system: ActorSystem[_], + eventsByTagQuery: EventsByTagQuery, + tag: String): SourceProvider[Offset, EventEnvelope[Event]] = { new EventsByTagSourceProvider(eventsByTagQuery, tag, system) } @@ -67,7 +72,15 @@ object EventSourcedProvider { maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = { val eventsBySlicesQuery = PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId) + eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice) + } + def eventsBySlices[Event]( + system: ActorSystem[_], + eventsBySlicesQuery: EventsBySliceQuery, + entityType: String, + minSlice: Int, + maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = { new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system) }