Skip to content

Commit

Permalink
fix: deserialize table updater row state
Browse files Browse the repository at this point in the history
  • Loading branch information
aludwiko committed Jan 10, 2025
1 parent 7321a65 commit 06b340e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,27 @@ private[impl] object Reflect {
loop(workflow.getClass).asInstanceOf[Class[S]]
}

def tableUpdaterRowType(tableUpdater: Class[_]): Class[_] = {
@tailrec
def loop(current: Class[_]): Class[_] =
if (current == classOf[AnyRef])
// recursed to root without finding type param
throw new IllegalArgumentException(s"Cannot find table updater class for ${tableUpdater.getClass}")
else {
current.getGenericSuperclass match {
case parameterizedType: ParameterizedType =>
if (parameterizedType.getActualTypeArguments.size == 1)
parameterizedType.getActualTypeArguments.head.asInstanceOf[Class[_]]
else throw new IllegalArgumentException(s"Cannot find table updater class for ${tableUpdater.getClass}")
case noTypeParamsParent: Class[_] =>
// recurse and look at parent
loop(noTypeParamsParent)
}
}

loop(tableUpdater)
}

def allKnownEventSourcedEntityEventType(component: Class[_]): Seq[Class[_]] = {
val eventType = eventSourcedEntityEventType(component)
eventType.getPermittedSubclasses.toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,15 @@ private[impl] object ViewDescriptorFactory {
deleteHandler: Boolean = false)(implicit userEc: ExecutionContext)
extends SpiTableUpdateHandler {

private val tableUpdaterRowClass: Class[_] = Reflect.tableUpdaterRowType(tableUpdaterClass)

private val userLog = LoggerFactory.getLogger(tableUpdaterClass)

private val methodsByInput: Map[Class[_], Method] =
if (deleteHandler) Map.empty
else
methods.map { m =>
// FIXME not entirely sure this is right
// register each possible input
// register each possible input to deserialize correctly an input
val inputType = m.getParameterTypes.head
serializer.registerTypeHints(m.getParameterTypes.head)

Expand All @@ -391,7 +392,8 @@ private[impl] object ViewDescriptorFactory {
}

override def handle(input: SpiTableUpdateEnvelope): Future[SpiTableUpdateEffect] = Future {
val existingState: Option[AnyRef] = input.existingTableRow.map(serializer.fromBytes)
val existingState: Option[AnyRef] =
input.existingTableRow.map(bytes => serializer.fromBytes(tableUpdaterRowClass, bytes).asInstanceOf[AnyRef])
val metadata = MetadataImpl.of(input.metadata)
val addedToMDC = metadata.traceId match {
case Some(traceId) =>
Expand Down

0 comments on commit 06b340e

Please sign in to comment.