Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log slow RPC requests #8349

Merged
merged 18 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions app/Startup.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import org.apache.pekko.actor.{ActorSystem, Props}
import cleanup.CleanUpService
import com.scalableminds.util.time.Instant
import com.typesafe.scalalogging.LazyLogging
import controllers.InitialDataService
import files.TempFileService
Expand Down Expand Up @@ -35,7 +36,7 @@ class Startup @Inject()(actorSystem: ActorSystem,
slackNotificationService: SlackNotificationService)(implicit ec: ExecutionContext)
extends LazyLogging {

private val beforeStartup = System.currentTimeMillis()
private val beforeStartup = Instant.now

logger.info(s"Executing Startup: Start actors, register cleanup services and stop hooks...")

Expand Down Expand Up @@ -89,10 +90,10 @@ class Startup @Inject()(actorSystem: ActorSystem,
}

initialDataService.insert.futureBox.map {
case Full(_) => logger.info(s"Webknossos startup took ${System.currentTimeMillis() - beforeStartup} ms.")
case Full(_) => Instant.logSince(beforeStartup, "Webknossos startup", logger)
case Failure(msg, _, _) =>
logger.info("No initial data inserted: " + msg)
logger.info(s"Webknossos startup took ${System.currentTimeMillis() - beforeStartup} ms.")
Instant.logSince(beforeStartup, "Webknossos startup", logger)
case _ => ()
}

Expand Down
4 changes: 2 additions & 2 deletions app/controllers/DatasetController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ class DatasetController @Inject()(userService: UserService,
request.body.mag,
request.body.additionalCoordinates
) ?~> "segmentAnything.getData.failed"
_ = logger.info(s"Data loading for SAM took ${Instant.since(beforeDataLoading)}")
_ = Instant.logSince(beforeDataLoading, "Data loading for SAM", logger)
_ = logger.debug(
s"Sending ${data.length} bytes to SAM server, element class is ${dataLayer.elementClass}, range: $intensityMin-$intensityMax...")
_ <- bool2Fox(
Expand All @@ -492,7 +492,7 @@ class DatasetController @Inject()(userService: UserService,
intensityMin,
intensityMax
) ?~> "segmentAnything.getMask.failed"
_ = logger.info(s"Fetching SAM masks from torchserve took ${Instant.since(beforeMask)}")
_ = Instant.logSince(beforeMask, "Fetching SAM masks from torchserve", logger)
_ = logger.debug(s"Received ${mask.length} bytes of mask from SAM server, forwarding to front-end...")
} yield Ok(mask)
}
Expand Down
5 changes: 2 additions & 3 deletions app/controllers/OrganizationController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,12 @@ class OrganizationController @Inject()(
needsAcceptance = conf.WebKnossos.TermsOfService.enabled &&
organization.lastTermsOfServiceAcceptanceVersion < conf.WebKnossos.TermsOfService.version
acceptanceDeadline = conf.WebKnossos.TermsOfService.acceptanceDeadline
deadlinePassed = acceptanceDeadline.toEpochMilli < System.currentTimeMillis()
} yield
Ok(
Json.obj(
"acceptanceNeeded" -> needsAcceptance,
"acceptanceDeadline" -> acceptanceDeadline.toEpochMilli,
"acceptanceDeadlinePassed" -> deadlinePassed
"acceptanceDeadline" -> acceptanceDeadline,
"acceptanceDeadlinePassed" -> acceptanceDeadline.isPast
))
}

Expand Down
9 changes: 5 additions & 4 deletions app/files/TempFileService.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package files

import cleanup.CleanUpService
import com.scalableminds.util.time.Instant

import java.nio.file.{Files, Path, Paths}
import com.scalableminds.util.tools.Fox
Expand All @@ -22,7 +23,7 @@ class TempFileService @Inject()(cleanUpService: CleanUpService)(implicit ec: Exe

private val tmpDir: Path = Paths.get(System.getProperty("java.io.tmpdir")).resolve("webknossosTempFiles")

private val activeTempFiles = scala.collection.mutable.Set[(Path, Long)]()
private val activeTempFiles = scala.collection.mutable.Set[(Path, Instant)]()

cleanUpService.register("Clean up expired temporary files", 1 hour)(cleanUpExpiredFiles())

Expand All @@ -34,12 +35,12 @@ class TempFileService @Inject()(cleanUpService: CleanUpService)(implicit ec: Exe
val path = tmpDir.resolve(f"$prefix-${Random.alphanumeric.take(15).mkString("")}")
logger.info(f"Creating temp file at $path")
Files.createFile(path)
activeTempFiles.add((path, System.currentTimeMillis() + lifeTime.toMillis))
activeTempFiles.add((path, Instant.now + lifeTime))
path
}

def cleanUpExpiredFiles(): Fox[Unit] = {
val now = System.currentTimeMillis()
private def cleanUpExpiredFiles(): Fox[Unit] = {
val now = Instant.now
activeTempFiles.foreach {
case (path, expiryTime) =>
if (expiryTime < now) {
Expand Down
10 changes: 6 additions & 4 deletions app/models/analytics/AnalyticsService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package models.analytics

import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext}
import com.scalableminds.util.objectid.ObjectId
import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.Fox
import com.scalableminds.util.tools.Fox.{bool2Fox, box2Fox}
import com.scalableminds.webknossos.datastore.rpc.RPC
Expand Down Expand Up @@ -100,13 +101,14 @@ class AnalyticsSessionService @Inject()(wkConf: WkConf) extends LazyLogging {
private lazy val pause: FiniteDuration = wkConf.BackendAnalytics.sessionPause

// format: userId → (lastRefreshTimestamp, sessionId)
private lazy val sessionIdStore: scala.collection.mutable.Map[ObjectId, (Long, Long)] = scala.collection.mutable.Map()
private lazy val sessionIdStore: scala.collection.mutable.Map[ObjectId, (Instant, Long)] =
scala.collection.mutable.Map()

def refreshAndGetSessionId(multiUserId: ObjectId): Long = {
val now: Long = System.currentTimeMillis()
val now = Instant.now
sessionIdStore.synchronized {
val valueOld = sessionIdStore.getOrElse(multiUserId, (-1L, -1L))
val idToSet = if (valueOld._1 + pause.toMillis < now) now else valueOld._2
val valueOld = sessionIdStore.getOrElse(multiUserId, (Instant.zero, -1L))
val idToSet = if (valueOld._1 + pause < now) now.epochMillis else valueOld._2
sessionIdStore.put(multiUserId, (now, idToSet))
idToSet
}
Expand Down
2 changes: 0 additions & 2 deletions app/models/annotation/AnnotationStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class AnnotationStore @Inject()(

private val cacheTimeout = 60 minutes

case class StoredResult(result: Fox[Annotation], timestamp: Long = System.currentTimeMillis)

def requestAnnotation(id: AnnotationIdentifier, user: Option[User])(implicit ctx: DBAccessContext): Fox[Annotation] =
requestFromCache(id).getOrElse(requestFromHandler(id, user)).futureBox.recover {
case e =>
Expand Down
13 changes: 6 additions & 7 deletions app/models/job/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
import com.scalableminds.webknossos.schema.Tables._
import com.typesafe.scalalogging.LazyLogging
import models.dataset.DataStoreDAO
import models.job.JobCommand.JobCommand
import play.api.inject.ApplicationLifecycle
import play.api.libs.json.{JsObject, Json}
Expand All @@ -29,7 +28,7 @@ case class Worker(_id: ObjectId,
maxParallelHighPriorityJobs: Int,
maxParallelLowPriorityJobs: Int,
supportedJobCommands: Set[JobCommand],
lastHeartBeat: Long = 0,
lastHeartBeat: Instant = Instant.zero,
created: Instant = Instant.now,
isDeleted: Boolean = false)

Expand All @@ -55,7 +54,7 @@ class WorkerDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
r.maxparallelhighpriorityjobs,
r.maxparallellowpriorityjobs,
supportedJobCommands.toSet,
r.lastheartbeat.getTime,
Instant.fromSql(r.lastheartbeat),
Instant.fromSql(r.created),
r.isdeleted
)
Expand All @@ -80,10 +79,10 @@ class WorkerDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
}
}

class WorkerService @Inject()(conf: WkConf, dataStoreDAO: DataStoreDAO, workerDAO: WorkerDAO) {
class WorkerService @Inject()(conf: WkConf) {

def lastHeartBeatIsRecent(worker: Worker): Boolean =
System.currentTimeMillis() - worker.lastHeartBeat < conf.Jobs.workerLivenessTimeout.toMillis
Instant.since(worker.lastHeartBeat) < conf.Jobs.workerLivenessTimeout

def publicWrites(worker: Worker): JsObject =
Json.obj(
Expand Down Expand Up @@ -136,14 +135,14 @@ class WorkerLivenessService @Inject()(workerService: WorkerService,

private def reportAsDead(worker: Worker): Unit = {
val msg =
s"Worker ${worker.name} (${worker._id}) is not reporting. Last heartbeat was at ${formatDate(worker.lastHeartBeat)}"
s"Worker ${worker.name} (${worker._id}) is not reporting. Last heartbeat was at ${worker.lastHeartBeat}"
slackNotificationService.warn("Worker missing", msg)
logger.warn(msg)
}

private def reportAsResurrected(worker: Worker): Unit = {
val msg =
s"Worker ${worker.name} (${worker._id}) is reporting again. Last heartbeat was at ${formatDate(worker.lastHeartBeat)}"
s"Worker ${worker.name} (${worker._id}) is reporting again. Last heartbeat was at ${worker.lastHeartBeat}"
slackNotificationService.success("Worker return", msg)
logger.info(msg)
}
Expand Down
2 changes: 1 addition & 1 deletion app/models/voxelytics/LokiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im
for {
timestampString <- tryo((entry \ "@timestamp").as[String]).toFox
timestamp <- if (timestampString.endsWith("Z"))
Instant.fromString(timestampString)
Instant.fromString(timestampString).toFox
else
Instant.fromLocalTimeString(timestampString)
values <- tryo(
Expand Down
2 changes: 1 addition & 1 deletion app/utils/WkConf.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package utils

import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.ConfigReader
import com.typesafe.scalalogging.LazyLogging
import play.api.Configuration

import java.time.Instant
import javax.inject.Inject
import scala.concurrent.duration._

Expand Down
52 changes: 52 additions & 0 deletions test/backend/FormatterTestSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package backend

import com.scalableminds.util.mvc.Formatter
import org.scalatestplus.play.PlaySpec

import scala.concurrent.duration.FiniteDuration

class FormatterTestSuite extends PlaySpec with Formatter {
"formatDuration" should {
"format human readable dates" in {
assert(formatDuration(FiniteDuration(0, "ms")) == "0ms")
assert(formatDuration(FiniteDuration(10, "ms")) == "10ms")
assert(formatDuration(FiniteDuration(999, "ms")) == "999ms")
assert(formatDuration(FiniteDuration(1000, "ms")) == "1s")
assert(formatDuration(FiniteDuration(1004, "ms")) == "1s")
assert(formatDuration(FiniteDuration(1005, "ms")) == "1.01s")
assert(formatDuration(FiniteDuration(1014, "ms")) == "1.01s")
assert(formatDuration(FiniteDuration(1015, "ms")) == "1.02s")
assert(formatDuration(FiniteDuration(4000, "ms")) == "4s")
assert(formatDuration(FiniteDuration(4100, "ms")) == "4.1s")
assert(formatDuration(FiniteDuration(4110, "ms")) == "4.11s")
assert(formatDuration(FiniteDuration(4114, "ms")) == "4.11s")
assert(formatDuration(FiniteDuration(4115, "ms")) == "4.12s")
assert(formatDuration(FiniteDuration(59994, "ms")) == "59.99s")
assert(formatDuration(FiniteDuration(59995, "ms")) == "1m")
assert(formatDuration(FiniteDuration(60000, "ms")) == "1m")
assert(formatDuration(FiniteDuration(60499, "ms")) == "1m")
assert(formatDuration(FiniteDuration(60500, "ms")) == "1m 1s")
assert(formatDuration(FiniteDuration(61111, "ms")) == "1m 1s")
assert(formatDuration(FiniteDuration(120000, "ms")) == "2m")
assert(formatDuration(FiniteDuration(3600000 - 501, "ms")) == "59m 59s")
assert(formatDuration(FiniteDuration(3600000 - 500, "ms")) == "1h")
assert(formatDuration(FiniteDuration(3600000 - 1, "ms")) == "1h")
assert(formatDuration(FiniteDuration(3600000, "ms")) == "1h")
assert(formatDuration(FiniteDuration(3600000 + 60000, "ms")) == "1h 1m")
assert(formatDuration(FiniteDuration(3600000 + 60010, "ms")) == "1h 1m")
assert(formatDuration(FiniteDuration(3600000 + 60500, "ms")) == "1h 1m 1s")
assert(formatDuration(FiniteDuration(24 * 3600000 - 1, "ms")) == "1 day")
assert(formatDuration(FiniteDuration(24 * 3600000 - 500, "ms")) == "1 day")
assert(formatDuration(FiniteDuration(24 * 3600000 - 501, "ms")) == "23h 59m 59s")
assert(formatDuration(FiniteDuration(24 * 3600000 + 501, "ms")) == "1 day")
assert(formatDuration(FiniteDuration(24 * 3600000 + 59000 + 500, "ms")) == "1 day 1m")
assert(formatDuration(FiniteDuration(24 * 3600000 + 60000, "ms")) == "1 day 1m")
assert(formatDuration(FiniteDuration(25 * 3600000 + 60000 + 1000, "ms")) == "1 day 1h 1m")
assert(formatDuration(FiniteDuration(49 * 3600000 + 60000 + 500, "ms")) == "2 days 1h 1m")
assert(formatDuration(FiniteDuration(24 * 24 * 3600000 + 60000 + 500, "ms")) == "24 days 1m")
assert(formatDuration(FiniteDuration(-50, "ms")) == "-50ms")
assert(formatDuration(FiniteDuration(-5000, "ms")) == "-5s")
assert(formatDuration(FiniteDuration(-1 * (49 * 3600000 + 60000 + 500), "ms")) == "-2 days 1h 1m")
}
}
}
9 changes: 2 additions & 7 deletions test/backend/InstantTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,8 @@ class InstantTestSuite extends PlaySpec {

"Instant" should {
"be parsed from strings in different formats" in {
assert(
Instant.fromString("1707389459123")(global).await(handleFoxJustification).contains(Instant(1707389459123L)))
assert(
Instant
.fromString("2024-02-08T10:50:59.123Z")(global)
.await(handleFoxJustification)
.contains(Instant(1707389459123L)))
assert(Instant.fromString("1707389459123").contains(Instant(1707389459123L)))
assert(Instant.fromString("2024-02-08T10:50:59.123Z").contains(Instant(1707389459123L)))
}
"be parsed from json in different formats" in {
assert(JsonHelper.parseAndValidateJson[Instant]("1707389459123").contains(Instant(1707389459123L)))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalableminds.util.mvc

import com.google.protobuf.CodedInputStream
import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.{BoxImplicits, Fox, FoxImplicits}
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common._
Expand Down Expand Up @@ -46,7 +47,7 @@ trait BoxToResultHelpers extends I18nSupport with Formatter with RemoteOriginHel
private def formatChain(chain: Box[Failure], includeTime: Boolean = true)(
implicit messages: MessagesProvider): String = chain match {
case Full(failure) =>
val serverTimeMsg = if (includeTime) "[Server Time " + formatDate(System.currentTimeMillis()) + "] " else ""
val serverTimeMsg = if (includeTime) s"[Server Time ${Instant.now}] " else ""
serverTimeMsg + " <~ " + formatFailure(failure) + formatChain(failure.chain, includeTime = false)
case _ => ""
}
Expand Down
Loading