diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index f503e4deb..f40052871 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -73,7 +73,7 @@ jobs: with: jvm: temurin:1.11 - - name: Compile all code with fatal warnings for Java 11, Scala 2.12 and Scala 2.13 + - name: Compile all code with fatal warnings for Java 11, Scala 2.12, Scala 2.13 and Scala 3.1 run: sbt "clean ; +IntegrationTest/compile" check-docs: diff --git a/akka-projection-core-test/src/test/scala/akka/projection/internal/HandlerRecoveryImplSpec.scala b/akka-projection-core-test/src/test/scala/akka/projection/internal/HandlerRecoveryImplSpec.scala index 89ec2bb7d..3c1a6d632 100644 --- a/akka-projection-core-test/src/test/scala/akka/projection/internal/HandlerRecoveryImplSpec.scala +++ b/akka-projection-core-test/src/test/scala/akka/projection/internal/HandlerRecoveryImplSpec.scala @@ -45,7 +45,7 @@ class HandlerRecoveryImplSpec extends ScalaTestWithActorTestKit with AnyWordSpec import HandlerRecoveryImplSpec._ import TestStatusObserver._ - private val logger = Logging(system.toClassic, getClass) + private val logger = Logging(system.toClassic, getClass.asInstanceOf[Class[Any]]) private val failOnOffset: Long = 3 private val env3 = Envelope(offset = failOnOffset, "c") private val projectionId = ProjectionId("test", "1") diff --git a/akka-projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/InternalProjectionStateMetricsSpec.scala b/akka-projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/InternalProjectionStateMetricsSpec.scala index fd7dc45cd..83c2ae340 100644 --- a/akka-projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/InternalProjectionStateMetricsSpec.scala +++ b/akka-projection-core-test/src/test/scala/akka/projection/internal/metrics/tools/InternalProjectionStateMetricsSpec.scala @@ -5,13 +5,9 @@ package akka.projection.internal.metrics.tools import java.util.UUID - import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future } import scala.concurrent.duration._ - import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -128,7 +124,7 @@ object InternalProjectionStateMetricsSpec { implicit system: ActorSystem[_], projectionId: ProjectionId) { - private implicit val exCtx = system.executionContext + private implicit val exCtx: ExecutionContextExecutor = system.executionContext val entityId = UUID.randomUUID().toString private val projectionSettings = ProjectionSettings(system) @@ -192,7 +188,7 @@ object InternalProjectionStateMetricsSpec { handlerStrategy, statusObserver, settings) { - override def logger: LoggingAdapter = Logging(system.classicSystem, this.getClass) + override def logger: LoggingAdapter = Logging(system.classicSystem, this.getClass.asInstanceOf[Class[Any]]) override implicit def executionContext: ExecutionContext = system.executionContext diff --git a/akka-projection-core/src/main/scala/akka/projection/ProjectionBehavior.scala b/akka-projection-core/src/main/scala/akka/projection/ProjectionBehavior.scala index dffbbce0b..74e102691 100644 --- a/akka-projection-core/src/main/scala/akka/projection/ProjectionBehavior.scala +++ b/akka-projection-core/src/main/scala/akka/projection/ProjectionBehavior.scala @@ -83,7 +83,7 @@ object ProjectionBehavior { val running = projection.run()(ctx.system) if (running.isInstanceOf[RunningProjectionManagement[_]]) ProjectionManagement(ctx.system).register(projection.projectionId, ctx.self) - new ProjectionBehavior(ctx, projection, stashBuffer).started(running) + new ProjectionBehavior[Any, Envelope](ctx, projection, stashBuffer).started(running) } } } diff --git a/akka-projection-core/src/main/scala/akka/projection/ProjectionId.scala b/akka-projection-core/src/main/scala/akka/projection/ProjectionId.scala index b2993a6e0..5c9020784 100644 --- a/akka-projection-core/src/main/scala/akka/projection/ProjectionId.scala +++ b/akka-projection-core/src/main/scala/akka/projection/ProjectionId.scala @@ -82,7 +82,7 @@ object ProjectionId { * @return an [[java.util.Set]] of [[ProjectionId]]s */ def of(name: String, keys: java.util.Set[String]): java.util.Set[ProjectionId] = - keys.asScala.map { key: String => new ProjectionId(name, key) }.asJava + keys.asScala.map { key => new ProjectionId(name, key) }.asJava } final class ProjectionId private (val name: String, val key: String) { diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/InternalProjectionState.scala b/akka-projection-core/src/main/scala/akka/projection/internal/InternalProjectionState.scala index 036565e45..6e498d7c5 100644 --- a/akka-projection-core/src/main/scala/akka/projection/internal/InternalProjectionState.scala +++ b/akka-projection-core/src/main/scala/akka/projection/internal/InternalProjectionState.scala @@ -184,7 +184,7 @@ private[projection] abstract class InternalProjectionState[Offset, Envelope]( handlerRecovery .applyRecovery(first.envelope, first.offset, last.offset, abort.future, measured) .map { _ => - last.copy(groupSize = envelopes.length) + last.withGroupSize(envelopes.length) } } @@ -260,7 +260,7 @@ private[projection] abstract class InternalProjectionState[Offset, Envelope]( } sourceProvider match { - case _: MergeableOffsetSourceProvider[Offset, Envelope] => + case _: MergeableOffsetSourceProvider[_, _] => val batches = envelopesAndOffsets .flatMap { case context @ ProjectionContextImpl(offset: MergeableOffset[_] @unchecked, _, _, _) => diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionContextImpl.scala b/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionContextImpl.scala index 868a8f785..cc09e3299 100644 --- a/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionContextImpl.scala +++ b/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionContextImpl.scala @@ -17,7 +17,13 @@ import akka.projection.ProjectionContext envelope: Envelope, externalContext: AnyRef, groupSize: Int) - extends ProjectionContext + extends ProjectionContext { + + /** + * scala3 makes `copy` private + */ + def withGroupSize(groupSize: Int) = copy(groupSize = groupSize) +} /** * INTERNAL API diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcReadJournalProvider.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcReadJournalProvider.scala index f1f181261..526b5cfa2 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcReadJournalProvider.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcReadJournalProvider.scala @@ -15,10 +15,14 @@ import com.typesafe.config.Config */ final class GrpcReadJournalProvider(system: ExtendedActorSystem, config: Config, cfgPath: String) extends ReadJournalProvider { - override val scaladslReadJournal: scaladsl.GrpcReadJournal = + + private lazy val scaladslReadJournalInstance: scaladsl.GrpcReadJournal = new scaladsl.GrpcReadJournal(system, config, cfgPath) - override val javadslReadJournal: javadsl.GrpcReadJournal = - new javadsl.GrpcReadJournal( - new scaladsl.GrpcReadJournal(system, config, cfgPath, ProtoAnySerialization.Prefer.Java)) + override def scaladslReadJournal(): scaladsl.GrpcReadJournal = scaladslReadJournalInstance + + private lazy val javadslReadJournalInstance = new javadsl.GrpcReadJournal( + new scaladsl.GrpcReadJournal(system, config, cfgPath, ProtoAnySerialization.Prefer.Java)) + + override def javadslReadJournal(): javadsl.GrpcReadJournal = javadslReadJournalInstance } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index d8b0a278f..5043044ac 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -152,7 +152,7 @@ final class GrpcReadJournal private ( def this(system: ExtendedActorSystem, config: Config, cfgPath: String) = this(system, config, cfgPath, ProtoAnySerialization.Prefer.Scala) - private implicit val typedSystem = system.toTyped + private implicit val typedSystem: ClassicActorSystemProvider = system.toTyped private val persistenceExt = Persistence(system) private val client = EventProducerServiceClient(clientSettings) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala index c267fdf6c..613152117 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala @@ -68,7 +68,7 @@ import scalapb.options.Scalapb * should be preferred. */ sealed trait Prefer - final object Prefer { + object Prefer { case object Java extends Prefer case object Scala extends Prefer } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala index 007efd83f..80fd05d95 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala @@ -85,7 +85,7 @@ object Replication { settings.configureEntity .apply( Entity.of( - settings.entityTypeKey, { entityContext: EntityContext[Command] => + settings.entityTypeKey, { (entityContext: EntityContext[Command]) => val replicationId = ReplicationId(entityContext.getEntityTypeKey.name, entityContext.getEntityId, settings.selfReplicaId) replicatedBehaviorFactory.apply( diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala index 39fe2fbfe..10a284e8c 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala @@ -87,7 +87,7 @@ object ReplicationSettings { .getConfigList("replicas") .asScala .toSet - .map { config: Config => + .map { (config: Config) => val replicaId = config.getString("replica-id") val clientConfig = config.getConfig("grpc.client").withFallback(grpcClientFallBack) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala index e3895d20e..708a12094 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala @@ -102,7 +102,7 @@ object ReplicationSettings { .getConfigList("replicas") .asScala .toSet - .map { config: Config => + .map { (config: Config) => val replicaId = config.getString("replica-id") val clientConfig = config.getConfig("grpc.client").withFallback(grpcClientFallBack) diff --git a/akka-projection-testkit/src/main/scala/akka/projection/testkit/internal/TestProjectionImpl.scala b/akka-projection-testkit/src/main/scala/akka/projection/testkit/internal/TestProjectionImpl.scala index 6bc7b82c1..a3fb69d47 100644 --- a/akka-projection-testkit/src/main/scala/akka/projection/testkit/internal/TestProjectionImpl.scala +++ b/akka-projection-testkit/src/main/scala/akka/projection/testkit/internal/TestProjectionImpl.scala @@ -157,7 +157,7 @@ private[projection] class TestInternalProjectionState[Offset, Envelope]( startOffset.foreach(offset => offsetStore.saveOffset(projectionId, offset)) - override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass) + override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass.asInstanceOf[Class[Any]]) override def readPaused(): Future[Boolean] = offsetStore.readManagementState(projectionId).map(_.exists(_.paused)) diff --git a/build.sbt b/build.sbt index 2a2422a2a..c4e943c68 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ lazy val core = Compile / packageBin / packageOptions += Package.ManifestAttributes( "Automatic-Module-Name" -> "akka.projection.core")) .settings(Protobuf.settings) + .settings(Scala3.settings) lazy val coreTest = Project(id = "akka-projection-core-test", base = file("akka-projection-core-test")) @@ -35,6 +36,7 @@ lazy val testkit = .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.testKit) + .settings(Scala3.settings) .dependsOn(core) // provides offset storage backed by a JDBC table @@ -80,6 +82,7 @@ lazy val eventsourced = .settings(Dependencies.eventsourced) .dependsOn(core) .dependsOn(testkit % Test) + .settings(Scala3.settings) // provides offset storage backed by Kafka managed offset commits lazy val kafka = @@ -99,6 +102,7 @@ lazy val `durable-state` = .settings(Dependencies.state) .dependsOn(core) .dependsOn(testkit % Test) + .settings(Scala3.settings) lazy val grpc = Project(id = "akka-projection-grpc", base = file("akka-projection-grpc")) @@ -107,6 +111,7 @@ lazy val grpc = .dependsOn(eventsourced) .enablePlugins(AkkaGrpcPlugin) .settings(akkaGrpcCodeGeneratorSettings += "server_power_apis", IntegrationTest / fork := true) + .settings(Scala3.settings) lazy val grpcTests = Project(id = "akka-projection-grpc-tests", base = file("akka-projection-grpc-tests")) diff --git a/project/Common.scala b/project/Common.scala index 6b2578a19..d288eeba0 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -45,7 +45,7 @@ object Common extends AutoPlugin { override lazy val projectSettings = Seq( projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value), crossVersion := CrossVersion.binary, - crossScalaVersions := Dependencies.ScalaVersions, + crossScalaVersions := Dependencies.Scala2Versions, scalaVersion := Dependencies.Scala213, javacOptions ++= List("-Xlint:unchecked", "-Xlint:deprecation"), Compile / doc / scalacOptions := scalacOptions.value ++ Seq( @@ -58,10 +58,14 @@ object Common extends AutoPlugin { "-doc-source-url", { val branch = if (isSnapshot.value) "main" else s"v${version.value}" s"https://github.com/akka/akka-projection/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}" - }, - "-skip-packages", - "akka.pattern" // for some reason Scaladoc creates this - ), + }) + ++ { + if (scalaBinaryVersion.value.startsWith("3")) { + Seq("-skip-packages:akka.pattern") // different usage in scala3 + } else { + Seq("-skip-packages", "akka.pattern") // for some reason Scaladoc creates this + } + }, scalafmtOnCompile := System.getenv("CI") != "true", autoAPIMappings := true, apiURL := Some(url(s"https://doc.akka.io/api/akka-projection/${projectInfoVersion.value}")), diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5bc226afb..d4d4f244b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,7 +7,9 @@ object Dependencies { val Scala213 = "2.13.10" val Scala212 = "2.12.17" - val ScalaVersions = Seq(Scala213, Scala212) + val Scala3 = "3.1.3" + val Scala2Versions = Seq(Scala213, Scala212) + val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 val AkkaVersionInDocs = "2.8" val AlpakkaVersionInDocs = "5.0" diff --git a/project/Scala3.scala b/project/Scala3.scala new file mode 100644 index 000000000..161fe48bd --- /dev/null +++ b/project/Scala3.scala @@ -0,0 +1,8 @@ +import akka.projections.Dependencies +import sbt.Keys.crossScalaVersions + +object Scala3 { + + val settings = Seq(crossScalaVersions := Dependencies.ScalaVersions) + +}