Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update views to use new SPI type structure #126

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0-cb36dd2e</kalix-runtime.version>
<kalix-runtime.version>1.3.0-645b7b0</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
20 changes: 15 additions & 5 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ import akka.runtime.sdk.spi.SpiWorkflow
import akka.runtime.sdk.spi.StartContext
import akka.runtime.sdk.spi.TimedActionDescriptor
import akka.runtime.sdk.spi.UserFunctionError
import akka.runtime.sdk.spi.views.SpiViewDescriptor
import akka.runtime.sdk.spi.ViewDescriptor
import akka.runtime.sdk.spi.WorkflowDescriptor
import akka.stream.Materializer
import com.typesafe.config.Config
Expand Down Expand Up @@ -450,7 +450,12 @@ private final class Sdk(
})
}
eventSourcedEntityDescriptors :+=
new EventSourcedEntityDescriptor(componentId, clz.getName, readOnlyCommandNames, instanceFactory)
new EventSourcedEntityDescriptor(
componentId,
clz.getName,
readOnlyCommandNames,
instanceFactory,
keyValue = false)

case clz if classOf[KeyValueEntity[_]].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
Expand All @@ -475,7 +480,12 @@ private final class Sdk(
})
}
keyValueEntityDescriptors :+=
new EventSourcedEntityDescriptor(componentId, clz.getName, readOnlyCommandNames, instanceFactory)
new EventSourcedEntityDescriptor(
componentId,
clz.getName,
readOnlyCommandNames,
instanceFactory,
keyValue = true)

case clz if Reflect.isWorkflow(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
Expand Down Expand Up @@ -543,7 +553,7 @@ private final class Sdk(
logger.warn("Unknown component [{}]", clz.getName)
}

private val viewDescriptors: Seq[SpiViewDescriptor] =
private val viewDescriptors: Seq[ViewDescriptor] =
componentClasses
.filter(hasComponentId)
.collect {
Expand Down Expand Up @@ -618,7 +628,7 @@ private final class Sdk(
override val consumersDescriptors: Seq[ConsumerDescriptor] =
Sdk.this.consumerDescriptors

override val viewDescriptors: Seq[SpiViewDescriptor] =
override val viewDescriptors: Seq[ViewDescriptor] =
Sdk.this.viewDescriptors

override val workflowDescriptors: Seq[WorkflowDescriptor] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@ import akka.javasdk.view.TableUpdater
import akka.javasdk.view.UpdateContext
import akka.javasdk.view.View
import akka.javasdk.view.View.QueryStreamEffect
import akka.runtime.sdk.spi
import akka.runtime.sdk.spi.ComponentOptions
import akka.runtime.sdk.spi.ConsumerSource
import akka.runtime.sdk.spi.MethodOptions
import akka.runtime.sdk.spi.views.SpiQueryDescriptor
import akka.runtime.sdk.spi.views.SpiTableDescriptor
import akka.runtime.sdk.spi.views.SpiTableUpdateEffect
import akka.runtime.sdk.spi.views.SpiTableUpdateEnvelope
import akka.runtime.sdk.spi.views.SpiTableUpdateHandler
import akka.runtime.sdk.spi.views.SpiType
import akka.runtime.sdk.spi.views.SpiType.SpiClass
import akka.runtime.sdk.spi.views.SpiType.SpiField
import akka.runtime.sdk.spi.views.SpiType.SpiList
import akka.runtime.sdk.spi.views.SpiViewDescriptor
import akka.runtime.sdk.spi.QueryDescriptor
import akka.runtime.sdk.spi.SpiSchema
import akka.runtime.sdk.spi.TableDescriptor
import akka.runtime.sdk.spi.SpiTableUpdateHandler.SpiTableUpdateEffect
import akka.runtime.sdk.spi.SpiTableUpdateHandler.SpiTableUpdateEnvelope
import akka.runtime.sdk.spi.SpiTableUpdateHandler
import akka.runtime.sdk.spi.SpiSchema.SpiClass
import akka.runtime.sdk.spi.SpiSchema.SpiField
import akka.runtime.sdk.spi.SpiSchema.SpiList
import akka.runtime.sdk.spi.ViewDescriptor
import org.slf4j.LoggerFactory
import org.slf4j.MDC

Expand All @@ -51,7 +52,7 @@ private[impl] object ViewDescriptorFactory {

val TableNamePattern: Regex = """FROM\s+`?([A-Za-z][A-Za-z0-9_]*)""".r

def apply(viewClass: Class[_], serializer: JsonSerializer, userEc: ExecutionContext): SpiViewDescriptor = {
def apply(viewClass: Class[_], serializer: JsonSerializer, userEc: ExecutionContext): ViewDescriptor = {
val componentId = ComponentDescriptorFactory.readComponentIdIdValue(viewClass)

val tableUpdaters =
Expand All @@ -60,7 +61,7 @@ private[impl] object ViewDescriptorFactory {
val allQueryMethods = extractQueryMethods(viewClass)
val allQueryStrings = allQueryMethods.map(_.queryString)

val tables: Seq[SpiTableDescriptor] =
val tables: Seq[TableDescriptor] =
tableUpdaters
.map { tableUpdaterClass =>
// View class type parameter declares table type
Expand Down Expand Up @@ -110,7 +111,7 @@ private[impl] object ViewDescriptorFactory {
throw new IllegalStateException(s"Table updater [${tableUpdaterClass}] is missing a @Consume annotation")
}

new SpiViewDescriptor(
new ViewDescriptor(
componentId,
viewClass.getName,
tables,
Expand All @@ -119,7 +120,7 @@ private[impl] object ViewDescriptorFactory {
componentOptions = new ComponentOptions(None, None))
}

private case class QueryMethod(descriptor: SpiQueryDescriptor, queryString: String)
private case class QueryMethod(descriptor: QueryDescriptor, queryString: String)

private def validQueryMethod(method: Method): Boolean =
method.getAnnotation(classOf[Query]) != null && (method.getReturnType == classOf[
Expand Down Expand Up @@ -169,10 +170,10 @@ private[impl] object ViewDescriptorFactory {
s"Method [${method.getName}] is marked as streaming updates, this requires it to return a ${classOf[
QueryStreamEffect[_]]}")

val inputType: Option[SpiType.QueryInput] =
val inputType: Option[SpiSchema.QueryInput] =
method.getGenericParameterTypes.headOption.map(ViewSchema.apply(_)).map {
case validInput: SpiType.QueryInput => validInput
case other =>
case validInput: SpiSchema.QueryInput => validInput
case other =>
// FIXME let's see if this flies
// For using primitive parameters directly, using their parameter name as placeholder in the query,
// we have to make up a valid message with that as a field
Expand All @@ -182,7 +183,7 @@ private[impl] object ViewDescriptorFactory {
}

val outputType = ViewSchema(actualQueryOutputClass) match {
case output: SpiType.SpiClass =>
case output: SpiClass =>
if (streamingQuery) new SpiList(output)
else output
case _ =>
Expand All @@ -191,7 +192,7 @@ private[impl] object ViewDescriptorFactory {
}

QueryMethod(
new SpiQueryDescriptor(
new QueryDescriptor(
method.getName,
queryStr,
inputType,
Expand All @@ -208,7 +209,7 @@ private[impl] object ViewDescriptorFactory {
tableType: SpiClass,
tableName: String,
serializer: JsonSerializer,
userEc: ExecutionContext): SpiTableDescriptor = {
userEc: ExecutionContext): TableDescriptor = {
val annotation = tableUpdater.getAnnotation(classOf[Consume.FromServiceStream])

val updaterMethods = tableUpdater.getMethods.toIndexedSeq
Expand All @@ -220,7 +221,7 @@ private[impl] object ViewDescriptorFactory {
.filterNot(ComponentDescriptorFactory.hasHandleDeletes)
.filter(ComponentDescriptorFactory.hasUpdateEffectOutput)

new SpiTableDescriptor(
new TableDescriptor(
tableName,
tableType,
new ConsumerSource.ServiceStreamSource(annotation.service(), annotation.id(), annotation.consumerGroup()),
Expand All @@ -246,7 +247,7 @@ private[impl] object ViewDescriptorFactory {
tableType: SpiClass,
tableName: String,
serializer: JsonSerializer,
userEc: ExecutionContext): SpiTableDescriptor = {
userEc: ExecutionContext): TableDescriptor = {

val annotation = tableUpdater.getAnnotation(classOf[Consume.FromEventSourcedEntity])

Expand All @@ -262,7 +263,7 @@ private[impl] object ViewDescriptorFactory {
// FIXME input type validation? (does that happen elsewhere?)
// FIXME method output vs table type validation? (does that happen elsewhere?)

new SpiTableDescriptor(
new TableDescriptor(
tableName,
tableType,
new ConsumerSource.EventSourcedEntitySource(
Expand All @@ -289,7 +290,7 @@ private[impl] object ViewDescriptorFactory {
tableType: SpiClass,
tableName: String,
serializer: JsonSerializer,
userEc: ExecutionContext): SpiTableDescriptor = {
userEc: ExecutionContext): TableDescriptor = {
val annotation = tableUpdater.getAnnotation(classOf[Consume.FromTopic])

val updaterMethods = tableUpdater.getMethods.toIndexedSeq
Expand All @@ -301,7 +302,7 @@ private[impl] object ViewDescriptorFactory {
// FIXME input type validation? (does that happen elsewhere?)
// FIXME method output vs table type validation? (does that happen elsewhere?)

new SpiTableDescriptor(
new TableDescriptor(
tableName,
tableType,
new ConsumerSource.TopicSource(annotation.value(), annotation.consumerGroup()),
Expand All @@ -321,7 +322,7 @@ private[impl] object ViewDescriptorFactory {
tableType: SpiClass,
tableName: String,
serializer: JsonSerializer,
userEc: ExecutionContext): SpiTableDescriptor = {
userEc: ExecutionContext): TableDescriptor = {
val annotation = tableUpdater.getAnnotation(classOf[Consume.FromKeyValueEntity])

val updaterMethods = tableUpdater.getMethods.toIndexedSeq
Expand All @@ -345,7 +346,7 @@ private[impl] object ViewDescriptorFactory {
.filterNot(ComponentDescriptorFactory.hasHandleDeletes)
.filter(ComponentDescriptorFactory.hasUpdateEffectOutput)

new SpiTableDescriptor(
new TableDescriptor(
tableName,
tableType,
new ConsumerSource.KeyValueEntitySource(ComponentDescriptorFactory.readComponentIdIdValue(annotation.value())),
Expand Down Expand Up @@ -462,9 +463,9 @@ private[impl] object ViewDescriptorFactory {
throw ViewException(componentId, "updateState with null state is not allowed.", None)
}
val bytesPayload = serializer.toBytes(newState)
new SpiTableUpdateEffect.UpdateRow(bytesPayload)
case ViewEffectImpl.Delete => SpiTableUpdateEffect.DeleteRow
case ViewEffectImpl.Ignore => SpiTableUpdateEffect.IgnoreUpdate
new spi.SpiTableUpdateHandler.UpdateRow(bytesPayload)
case ViewEffectImpl.Delete => SpiTableUpdateHandler.DeleteRow
case ViewEffectImpl.Ignore => SpiTableUpdateHandler.IgnoreUpdate
}
} finally {
if (addedToMDC) MDC.remove(Telemetry.TRACE_ID)
Expand Down
41 changes: 22 additions & 19 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewSchema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@
package akka.javasdk.impl.view

import akka.annotation.InternalApi
import akka.runtime.sdk.spi.views.SpiType
import akka.runtime.sdk.spi.views.SpiType.SpiBoolean
import akka.runtime.sdk.spi.views.SpiType.SpiByteString
import akka.runtime.sdk.spi.views.SpiType.SpiDouble
import akka.runtime.sdk.spi.views.SpiType.SpiFloat
import akka.runtime.sdk.spi.views.SpiType.SpiInteger
import akka.runtime.sdk.spi.views.SpiType.SpiLong
import akka.runtime.sdk.spi.views.SpiType.SpiNestableType
import akka.runtime.sdk.spi.views.SpiType.SpiString
import akka.runtime.sdk.spi.views.SpiType.SpiTimestamp
import akka.runtime.sdk.spi.SpiSchema.SpiType
import akka.runtime.sdk.spi.SpiSchema.SpiBoolean
import akka.runtime.sdk.spi.SpiSchema.SpiByteString
import akka.runtime.sdk.spi.SpiSchema.SpiClass
import akka.runtime.sdk.spi.SpiSchema.SpiClassRef
import akka.runtime.sdk.spi.SpiSchema.SpiDouble
import akka.runtime.sdk.spi.SpiSchema.SpiEnum
import akka.runtime.sdk.spi.SpiSchema.SpiField
import akka.runtime.sdk.spi.SpiSchema.SpiFloat
import akka.runtime.sdk.spi.SpiSchema.SpiInteger
import akka.runtime.sdk.spi.SpiSchema.SpiList
import akka.runtime.sdk.spi.SpiSchema.SpiLong
import akka.runtime.sdk.spi.SpiSchema.SpiNestableType
import akka.runtime.sdk.spi.SpiSchema.SpiOptional
import akka.runtime.sdk.spi.SpiSchema.SpiString
import akka.runtime.sdk.spi.SpiSchema.SpiTimestamp

import java.lang.reflect.AccessFlag
import java.lang.reflect.ParameterizedType
Expand Down Expand Up @@ -58,7 +64,7 @@ private[view] object ViewSchema {
case c: Class[_] => c
case p: ParameterizedType => p.getRawType.asInstanceOf[Class[_]]
}
if (seenClasses.contains(clazz)) new SpiType.SpiClassRef(clazz.getName)
if (seenClasses.contains(clazz)) new SpiClassRef(clazz.getName)
else
knownConcreteClasses.get(clazz) match {
case Some(found) => found
Expand All @@ -67,23 +73,20 @@ private[view] object ViewSchema {
if (clazz.isArray && clazz.componentType() == classOf[java.lang.Byte]) {
SpiByteString
} else if (clazz.isEnum) {
new SpiType.SpiEnum(clazz.getName)
new SpiEnum(clazz.getName)
} else {
currentType match {
case p: ParameterizedType if clazz == classOf[Optional[_]] =>
new SpiType.SpiOptional(
loop(p.getActualTypeArguments.head, seenClasses).asInstanceOf[SpiNestableType])
new SpiOptional(loop(p.getActualTypeArguments.head, seenClasses).asInstanceOf[SpiNestableType])
case p: ParameterizedType if classOf[java.util.Collection[_]].isAssignableFrom(clazz) =>
new SpiType.SpiList(
loop(p.getActualTypeArguments.head, seenClasses).asInstanceOf[SpiNestableType])
new SpiList(loop(p.getActualTypeArguments.head, seenClasses).asInstanceOf[SpiNestableType])
case _: Class[_] =>
val seenIncludingThis = seenClasses + clazz
new SpiType.SpiClass(
new SpiClass(
clazz.getName,
clazz.getDeclaredFields
.filterNot(f => f.accessFlags().contains(AccessFlag.STATIC))
.map(field =>
new SpiType.SpiField(field.getName, loop(field.getGenericType, seenIncludingThis)))
.map(field => new SpiField(field.getName, loop(field.getGenericType, seenIncludingThis)))
.toSeq)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import akka.javasdk.testmodels.view.ViewTestModels
import akka.runtime.sdk.spi.ConsumerSource
import akka.runtime.sdk.spi.Principal
import akka.runtime.sdk.spi.ServiceNamePattern
import akka.runtime.sdk.spi.views.SpiType.SpiClass
import akka.runtime.sdk.spi.views.SpiType.SpiInteger
import akka.runtime.sdk.spi.views.SpiType.SpiList
import akka.runtime.sdk.spi.views.SpiType.SpiString
import akka.runtime.sdk.spi.views.SpiType.SpiTimestamp
import akka.runtime.sdk.spi.views.SpiViewDescriptor
import akka.runtime.sdk.spi.SpiSchema.SpiClass
import akka.runtime.sdk.spi.SpiSchema.SpiInteger
import akka.runtime.sdk.spi.SpiSchema.SpiList
import akka.runtime.sdk.spi.SpiSchema.SpiString
import akka.runtime.sdk.spi.SpiSchema.SpiTimestamp
import akka.runtime.sdk.spi.ViewDescriptor
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

Expand All @@ -28,7 +28,7 @@ class ViewDescriptorFactorySpec extends AnyWordSpec with Matchers {
import ViewTestModels._
import akka.javasdk.testmodels.subscriptions.PubSubTestModels._

def assertDescriptor[T](test: SpiViewDescriptor => Any)(implicit tag: ClassTag[T]): Unit = {
def assertDescriptor[T](test: ViewDescriptor => Any)(implicit tag: ClassTag[T]): Unit = {
test(ViewDescriptorFactory(tag.runtimeClass, new JsonSerializer, ExecutionContexts.global()))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@
package akka.javasdk.impl.view

import akka.javasdk.testmodels.view.ViewTestModels
import akka.runtime.sdk.spi.views.SpiType.SpiBoolean
import akka.runtime.sdk.spi.views.SpiType.SpiByteString
import akka.runtime.sdk.spi.views.SpiType.SpiClass
import akka.runtime.sdk.spi.views.SpiType.SpiClassRef
import akka.runtime.sdk.spi.views.SpiType.SpiDouble
import akka.runtime.sdk.spi.views.SpiType.SpiEnum
import akka.runtime.sdk.spi.views.SpiType.SpiField
import akka.runtime.sdk.spi.views.SpiType.SpiFloat
import akka.runtime.sdk.spi.views.SpiType.SpiInteger
import akka.runtime.sdk.spi.views.SpiType.SpiList
import akka.runtime.sdk.spi.views.SpiType.SpiLong
import akka.runtime.sdk.spi.views.SpiType.SpiOptional
import akka.runtime.sdk.spi.views.SpiType.SpiString
import akka.runtime.sdk.spi.views.SpiType.SpiTimestamp
import akka.runtime.sdk.spi.SpiSchema.SpiBoolean
import akka.runtime.sdk.spi.SpiSchema.SpiByteString
import akka.runtime.sdk.spi.SpiSchema.SpiClass
import akka.runtime.sdk.spi.SpiSchema.SpiClassRef
import akka.runtime.sdk.spi.SpiSchema.SpiDouble
import akka.runtime.sdk.spi.SpiSchema.SpiEnum
import akka.runtime.sdk.spi.SpiSchema.SpiField
import akka.runtime.sdk.spi.SpiSchema.SpiFloat
import akka.runtime.sdk.spi.SpiSchema.SpiInteger
import akka.runtime.sdk.spi.SpiSchema.SpiList
import akka.runtime.sdk.spi.SpiSchema.SpiLong
import akka.runtime.sdk.spi.SpiSchema.SpiOptional
import akka.runtime.sdk.spi.SpiSchema.SpiString
import akka.runtime.sdk.spi.SpiSchema.SpiTimestamp
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-cb36dd2e")
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-645b7b0")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down
Loading