Skip to content

Commit

Permalink
chore: Verbose logs after retrying the projection a number of times
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Nov 15, 2024
1 parent aa00f8c commit 319a879
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ package akka.projection.internal
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.jdk.DurationConverters._

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.projection.HandlerRecoveryStrategy
import akka.projection.Projection
import akka.stream.Attributes.LogLevels
import akka.stream.RestartSettings
import com.typesafe.config.Config

Expand Down Expand Up @@ -50,7 +50,12 @@ private[projection] object ProjectionSettings {
val maxRestarts = restartBackoffConfig.getInt("max-restarts")
if (maxRestarts >= 0) RestartSettings(minBackoff, maxBackoff, randomFactor)
else RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff)
}
}.withLogSettings(
RestartSettings.LogSettings.defaultSettings
.withLogLevel(LogLevels.Warning)
// Once we have retried many times, it could still be a transient failure but is
// more likely to be a permanent problem, so increase verbosity/include full stack trace
.withVerboseLogsAfter(5))

new ProjectionSettings(
restartSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,7 @@ final class GrpcReadJournal private (
.invoke(streamIn)
.recover {
case ex: akka.grpc.GrpcServiceException if ex.status.getCode == Status.Code.UNAVAILABLE =>
// this means we couldn't connect, will be retried, relatively common, so make it less noisy,
// Users still want to be able to figure out non-transient errors, so log with full exception details at debug
val port = clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString)
if (log.isDebugEnabled)
log.debug(s"Connection to ${clientSettings.serviceName}:$port for stream id $streamId failed or lost", ex)
throw new ConnectionException(clientSettings.serviceName, port, streamId)

case th: Throwable =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ package akka.projection.grpc.internal

import akka.annotation.InternalApi

import scala.util.control.NoStackTrace

/**
* INTERNAL API
*/
@InternalApi
private[akka] final class ConnectionException(host: String, port: String, streamId: String)
extends RuntimeException(s"Connection to $host:$port for stream id $streamId failed or lost, will be retried")
with NoStackTrace
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object Dependencies {
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3

object Versions {
val Akka = sys.props.getOrElse("build.akka.version", "2.10.0")
val Akka = sys.props.getOrElse("build.akka.version", "2.10.0+18-e14c0ccd+20241115-1522-SNAPSHOT")
val AkkaVersionInDocs = VersionNumber(Akka).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }

val Alpakka = "9.0.0"
Expand Down

0 comments on commit 319a879

Please sign in to comment.