Skip to content

Commit

Permalink
Query plugin paramater in source providers (#684)
Browse files Browse the repository at this point in the history
* power user API when the query plugin instance has custom
  configuration (or programatic setup)
  • Loading branch information
patriknw authored Sep 9, 2022
1 parent a8953b1 commit f404ec5
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,21 @@ import akka.stream.javadsl.Source
*/
@ApiMayChange
object DurableStateSourceProvider {

def changesByTag[A](
system: ActorSystem[_],
pluginId: String,
tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
val durableStateStoreQuery =
DurableStateStoreRegistry(system)
.getDurableStateStoreFor[DurableStateStoreQuery[A]](classOf[DurableStateStoreQuery[A]], pluginId)
changesByTag(system, durableStateStoreQuery, tag)
}

def changesByTag[A](
system: ActorSystem[_],
durableStateStoreQuery: DurableStateStoreQuery[A],
tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag, system)
}

Expand Down Expand Up @@ -83,7 +90,15 @@ object DurableStateSourceProvider {
val durableStateStoreQuery =
DurableStateStoreRegistry(system)
.getDurableStateStoreFor(classOf[DurableStateStoreBySliceQuery[A]], durableStateStoreQueryPluginId)
changesBySlices(system, durableStateStoreQuery, entityType, minSlice, maxSlice)
}

def changesBySlices[A](
system: ActorSystem[_],
durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
new DurableStateBySlicesSourceProvider(durableStateStoreQuery, entityType, minSlice, maxSlice, system)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ object DurableStateSourceProvider {

val durableStateStoreQuery =
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[A]](pluginId)
changesByTag(system, durableStateStoreQuery, tag)
}

def changesByTag[A](
system: ActorSystem[_],
durableStateStoreQuery: DurableStateStoreQuery[A],
tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag, system)
}

Expand Down Expand Up @@ -74,7 +80,15 @@ object DurableStateSourceProvider {
val durableStateStoreQuery =
DurableStateStoreRegistry(system)
.durableStateStoreFor[DurableStateStoreBySliceQuery[A]](durableStateStoreQueryPluginId)
changesBySlices(system, durableStateStoreQuery, entityType, minSlice, maxSlice)
}

def changesBySlices[A](
system: ActorSystem[_],
durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
new DurableStateBySlicesSourceProvider(durableStateStoreQuery, entityType, minSlice, maxSlice, system)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ object EventSourcedProvider {
system: ActorSystem[_],
readJournalPluginId: String,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {

val eventsByTagQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsByTagQuery], readJournalPluginId)
eventsByTag(system, eventsByTagQuery, tag)
}

def eventsByTag[Event](
system: ActorSystem[_],
eventsByTagQuery: EventsByTagQuery,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
new EventsByTagSourceProvider(system, eventsByTagQuery, tag)
}

Expand Down Expand Up @@ -78,20 +83,17 @@ object EventSourcedProvider {
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {

val eventsBySlicesQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
}

if (!eventsBySlicesQuery.isInstanceOf[EventTimestampQuery])
throw new IllegalArgumentException(
s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " +
s"[$readJournalPluginId] must implement [${classOf[EventTimestampQuery].getName}]")

if (!eventsBySlicesQuery.isInstanceOf[LoadEventQuery])
throw new IllegalArgumentException(
s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " +
s"[$readJournalPluginId] must implement [${classOf[LoadEventQuery].getName}]")

def eventsBySlices[Event](
system: ActorSystem[_],
eventsBySlicesQuery: EventsBySliceQuery,
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {
new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ object EventSourcedProvider {
system: ActorSystem[_],
readJournalPluginId: String,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {

val eventsByTagQuery =
PersistenceQuery(system).readJournalFor[EventsByTagQuery](readJournalPluginId)
eventsByTag(system, eventsByTagQuery, tag)
}

def eventsByTag[Event](
system: ActorSystem[_],
eventsByTagQuery: EventsByTagQuery,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
new EventsByTagSourceProvider(eventsByTagQuery, tag, system)
}

Expand Down Expand Up @@ -67,7 +72,15 @@ object EventSourcedProvider {
maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {
val eventsBySlicesQuery =
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId)
eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
}

def eventsBySlices[Event](
system: ActorSystem[_],
eventsBySlicesQuery: EventsBySliceQuery,
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {
new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
}

Expand Down

0 comments on commit f404ec5

Please sign in to comment.