diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala index e8220e081..c3eec0dd2 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala @@ -160,6 +160,27 @@ private[impl] object Reflect { loop(workflow.getClass).asInstanceOf[Class[S]] } + def tableUpdaterRowType(tableUpdater: Class[_]): Class[_] = { + @tailrec + def loop(current: Class[_]): Class[_] = + if (current == classOf[AnyRef]) + // recursed to root without finding type param + throw new IllegalArgumentException(s"Cannot find table updater class for ${tableUpdater.getClass}") + else { + current.getGenericSuperclass match { + case parameterizedType: ParameterizedType => + if (parameterizedType.getActualTypeArguments.size == 1) + parameterizedType.getActualTypeArguments.head.asInstanceOf[Class[_]] + else throw new IllegalArgumentException(s"Cannot find table updater class for ${tableUpdater.getClass}") + case noTypeParamsParent: Class[_] => + // recurse and look at parent + loop(noTypeParamsParent) + } + } + + loop(tableUpdater) + } + def allKnownEventSourcedEntityEventType(component: Class[_]): Seq[Class[_]] = { val eventType = eventSourcedEntityEventType(component) eventType.getPermittedSubclasses.toSeq diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewDescriptorFactory.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewDescriptorFactory.scala index 60f445005..43d246e2d 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewDescriptorFactory.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewDescriptorFactory.scala @@ -65,16 +65,7 @@ private[impl] object ViewDescriptorFactory { tableUpdaters .map { tableUpdaterClass => // View class type parameter declares table type - val tableRowClass: Class[_] = - tableUpdaterClass.getGenericSuperclass - .asInstanceOf[ParameterizedType] - .getActualTypeArguments - .head match { - case clazz: Class[_] => clazz - case other => - throw new IllegalArgumentException( - s"Expected [$tableUpdaterClass] to extends TableUpdater[] for a concrete table row type but cannot figure out type parameter because type argument is unexpected [$other] ") - } + val tableRowClass: Class[_] = Reflect.tableUpdaterRowType(tableUpdaterClass) val tableName: String = { if (tableUpdaters.size > 1) { @@ -371,14 +362,15 @@ private[impl] object ViewDescriptorFactory { deleteHandler: Boolean = false)(implicit userEc: ExecutionContext) extends SpiTableUpdateHandler { + private val tableUpdaterRowClass: Class[_] = Reflect.tableUpdaterRowType(tableUpdaterClass) + private val userLog = LoggerFactory.getLogger(tableUpdaterClass) private val methodsByInput: Map[Class[_], Method] = if (deleteHandler) Map.empty else methods.map { m => - // FIXME not entirely sure this is right - // register each possible input + // register each possible input to deserialize correctly an input val inputType = m.getParameterTypes.head serializer.registerTypeHints(m.getParameterTypes.head) @@ -391,7 +383,8 @@ private[impl] object ViewDescriptorFactory { } override def handle(input: SpiTableUpdateEnvelope): Future[SpiTableUpdateEffect] = Future { - val existingState: Option[AnyRef] = input.existingTableRow.map(serializer.fromBytes) + val existingState: Option[AnyRef] = + input.existingTableRow.map(bytes => serializer.fromBytes(tableUpdaterRowClass, bytes).asInstanceOf[AnyRef]) val metadata = MetadataImpl.of(input.metadata) val addedToMDC = metadata.traceId match { case Some(traceId) =>