Skip to content

Commit

Permalink
chore: Snapshots and StateSerializer type
Browse files Browse the repository at this point in the history
* test that verifies snapshots
* pass snapshotEvery in SpiSettings
* use entityStateType when deserializing state (snapshot)
  • Loading branch information
patriknw committed Dec 2, 2024
1 parent 2c30b64 commit 8df58eb
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 35 deletions.
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0-2909c94-1-37f8654f-SNAPSHOT</kalix-runtime.version>
<kalix-runtime.version>1.3.0-2909c94-1-e56551b7-SNAPSHOT</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package akkajavasdk;

import akka.javasdk.http.StrictResponse;
import akka.javasdk.testkit.TestKit;
import akka.javasdk.testkit.TestKitSupport;
import akkajavasdk.components.eventsourcedentities.counter.Counter;
import akkajavasdk.components.eventsourcedentities.counter.CounterEntity;
import akka.javasdk.client.EventSourcedEntityClient;
import akkajavasdk.components.eventsourcedentities.hierarchy.AbstractTextConsumer;
import akkajavasdk.components.eventsourcedentities.hierarchy.TextEsEntity;
import com.typesafe.config.ConfigFactory;
import org.awaitility.Awaitility;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.Assertions;
Expand All @@ -30,6 +32,13 @@
@ExtendWith(Junit5LogCapturing.class)
public class EventSourcedEntityTest extends TestKitSupport {

@Override
protected TestKit.Settings testKitSettings() {
return TestKit.Settings.DEFAULT.withAdditionalConfig(ConfigFactory.parseString("""
akka.javasdk.event-sourced-entity.snapshot-every = 10
"""));
}

@Test
public void verifyCounterEventSourcedWiring() throws InterruptedException {

Expand Down Expand Up @@ -144,7 +153,7 @@ public void verifyCounterEventSourcedAfterRestart() {
@Test
public void verifyCounterEventSourcedAfterRestartFromSnapshot() {

// snapshotting with kalix.event-sourced-entity.snapshot-every = 10
// snapshotting with akka.javasdk.event-sourced-entity.snapshot-every = 10
var counterId = "restartFromSnapshot";
var client = componentClient.forEventSourcedEntity(counterId);

Expand Down
1 change: 0 additions & 1 deletion akka-javasdk-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
kalix.event-sourced-entity.snapshot-every = 10
# Using a different port to not conflict with parallel tests
akka.javasdk.testkit.http-port = 39391
3 changes: 2 additions & 1 deletion akka-javasdk-tests/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<logger name="akka" level="WARN"/>
<logger name="akka.runtime" level="DEBUG"/>
<logger name="kalix.runtime" level="DEBUG"/>
<logger name="akka.javasdk" level="DEBUG"/>
<logger name="kalix.runtime.views" level="INFO"/>
<logger name="akka.http" level="WARN"/>
Expand All @@ -23,6 +24,6 @@

<root level="DEBUG">
<appender-ref ref="CapturingAppender"/>
<!-- <appender-ref ref="STDOUT"/>-->
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,8 @@ private[javasdk] class JsonMessageCodec extends MessageCodec {
value
}

def decodeMessage[T](expectedType: Class[T], bytes: akka.util.ByteString): T = {
// FIXME could we avoid the copy?
JsonSupport.parseBytes(bytes.toArrayUnsafe(), expectedType)
def decodeMessage[T](expectedType: Class[T], pb: ScalaPbAny): T = {
JsonSupport.decodeJson(expectedType, pb)
}

private[akka] def removeVersion(typeName: String) = {
Expand Down
8 changes: 5 additions & 3 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends
@nowarn("msg=deprecated") //TODO remove deprecation once we remove the old constructor
override def getSettings: SpiSettings = {
val applicationConf = applicationConfig

val eventSourcedEntitySnapshotEvery = applicationConfig.getInt("akka.javasdk.event-sourced-entity.snapshot-every")

val devModeSettings =
if (applicationConf.getBoolean("akka.javasdk.dev-mode.enabled"))
Some(
Expand All @@ -132,7 +135,7 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends
else
None

new SpiSettings(devModeSettings)
new SpiSettings(eventSourcedEntitySnapshotEvery, devModeSettings)
}

private def extractBrokerConfig(eventingConf: Config): SpiEventingSupportSettings = {
Expand Down Expand Up @@ -374,8 +377,7 @@ private final class Sdk(
wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) {
// remember to update component type API doc and docs if changing the set of injectables
case p if p == classOf[EventSourcedEntityContext] => context
},
sdkSettings.snapshotEvery)
})
}
new EventSourcedEntityDescriptor(componentId, instanceFactory)
}
Expand Down
2 changes: 0 additions & 2 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ private[impl] object Settings {

def apply(sdkConfig: Config): Settings = {
Settings(
snapshotEvery = sdkConfig.getInt("event-sourced-entity.snapshot-every"),
cleanupDeletedEventSourcedEntityAfter = sdkConfig.getDuration("event-sourced-entity.cleanup-deleted-after"),
cleanupDeletedKeyValueEntityAfter = sdkConfig.getDuration("key-value-entity.cleanup-deleted-after"),
devModeSettings = Option.when(sdkConfig.getBoolean("dev-mode.enabled"))(
Expand All @@ -35,7 +34,6 @@ private[impl] object Settings {
*/
@InternalApi
private[impl] final case class Settings(
snapshotEvery: Int,
cleanupDeletedEventSourcedEntityAfter: Duration,
cleanupDeletedKeyValueEntityAfter: Duration,
devModeSettings: Option[DevModeSettings])
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private[impl] final class EventSourcedEntitiesImpl(
if (service.snapshotEvery < 0)
log.warn("Snapshotting disabled for entity [{}], this is not recommended.", service.componentId)
// FIXME overlay configuration provided by _system
(name, if (service.snapshotEvery == 0) service.withSnapshotEvery(configuration.snapshotEvery) else service)
(name, if (service.snapshotEvery == 0) service else service)
}.toMap

private val instrumentations: Map[String, TraceInstrumentation] = services.values.map { s =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,13 @@ import com.google.protobuf.any.{ Any => ScalaPbAny }
import io.grpc.Status
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import org.slf4j.LoggerFactory
import org.slf4j.MDC

/**
* INTERNAL API
*/
@InternalApi
private[impl] object EventSourcedEntityImpl {
private val log = LoggerFactory.getLogger(this.getClass)

private class CommandContextImpl(
override val entityId: String,
Expand Down Expand Up @@ -89,14 +87,10 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
componentClass: Class[_],
entityId: String,
messageCodec: JsonMessageCodec,
factory: EventSourcedEntityContext => ES,
snapshotEvery: Int)
factory: EventSourcedEntityContext => ES)
extends SpiEventSourcedEntity {
import EventSourcedEntityImpl._

if (snapshotEvery < 0)
log.warn("Snapshotting disabled for entity [{}], this is not recommended.", componentId)

// FIXME
// private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory)

Expand Down Expand Up @@ -130,7 +124,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
// FIXME smuggling 0 arity method called from component client through here
ScalaPbAny.defaultInstance.withTypeUrl(AnySupport.JsonTypeUrlPrefix).withValue(ByteString.empty())))
val metadata: Metadata =
MetadataImpl.of(Nil) // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil))
MetadataImpl.Empty // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil))
val cmdContext =
new CommandContextImpl(
entityId,
Expand Down Expand Up @@ -168,13 +162,11 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
var updatedState = state
commandEffect.primaryEffect match {
case EmitEvents(events, deleteEntity) =>
var shouldSnapshot = false
events.foreach { event =>
updatedState = entityHandleEvent(updatedState, event.asInstanceOf[AnyRef], entityId, currentSequence)
if (updatedState == null)
throw new IllegalArgumentException("Event handler must not return null as the updated state.")
currentSequence += 1
shouldSnapshot = shouldSnapshot || (snapshotEvery > 0 && currentSequence % snapshotEvery == 0)
}

val (reply, error) = replyOrError(updatedState)
Expand All @@ -183,14 +175,6 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
Future.successful(
new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply = None, error, None))
} else {
// snapshotting final state since that is the "atomic" write
// emptyState can be null but null snapshot should not be stored, but that can't even
// happen since event handler is not allowed to return null as newState
// FIXME
// val snapshot =
// if (shouldSnapshot) Option(updatedState)
// else None

val delete =
if (deleteEntity) Some(configuration.cleanupDeletedEventSourcedEntityAfter)
else None
Expand Down Expand Up @@ -279,7 +263,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
override def toProto(obj: Deserialized): ScalaPbAny =
ScalaPbAny.fromJavaProto(messageCodec.encodeJava(obj))

override def fromProto(pb: ScalaPbAny): Deserialized =
messageCodec.decodeMessage(pb).asInstanceOf[Deserialized]
def fromProto(pb: ScalaPbAny): Deserialized =
messageCodec.decodeMessage(router.entityStateType, pb)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE
// similar to workflow, we preemptively register the events type to the message codec
Reflect.allKnownEventTypes[S, E, ES](entity).foreach(messageCodec.registerTypeHints)

val entityStateType: Class[S] = Reflect.eventSourcedEntityStateType(entity.getClass).asInstanceOf[Class[S]]

private def commandHandlerLookup(commandName: String) =
commandHandlers.getOrElse(
commandName,
Expand Down Expand Up @@ -90,7 +92,6 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE
}

private def _setCurrentState(state: S): Unit = {
val entityStateType: Class[S] = Reflect.eventSourcedEntityStateType(this.entity.getClass).asInstanceOf[Class[S]]

// the state: S received can either be of the entity "state" type (if coming from emptyState/memory)
// or PB Any type (if coming from the runtime)
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94-1-37f8654f-SNAPSHOT")
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94-1-e56551b7-SNAPSHOT")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down

0 comments on commit 8df58eb

Please sign in to comment.