@@ -47,14 +47,13 @@ class ProjectBatchLockController(
logger.debug("Retrieving all project batch locks")

val locks = batchJobProjectLockingManager.getMap()
val lockModels = locks.map { (projectId, lockedJobId) ->
val lockStatus = when (lockedJobId) {
null -> LockStatus.UNINITIALIZED
0L -> LockStatus.UNLOCKED
val lockModels = locks.entries.map { (projectId, lockedJobIds) ->
val lockStatus = when {
lockedJobIds.isEmpty() -> LockStatus.UNLOCKED
else -> LockStatus.LOCKED
}

val jobInfo = if (lockedJobId != null && lockedJobId > 0L) {
val jobInfos = lockedJobIds.mapNotNull { lockedJobId ->
val jobDto = batchJobService.findJobDto(lockedJobId)
if (jobDto == null) {
logger.warn("Locked job $lockedJobId in project $projectId not found")
@@ -67,15 +66,13 @@ class ProjectBatchLockController(
createdAt = jobDto.createdAt
)
}
} else {
null
}

ProjectLockModel(
projectId = projectId,
lockedJobId = lockedJobId,
lockedJobIds = lockedJobIds,
lockStatus = lockStatus,
jobInfo = jobInfo
jobInfos = jobInfos
)
}

@@ -113,9 +110,9 @@ class ProjectBatchLockController(
*/
data class ProjectLockModel(
val projectId: Long,
val lockedJobId: Long?,
val lockedJobIds: Set<Long>,
val lockStatus: LockStatus,
val jobInfo: JobInfo?
val jobInfos: List<JobInfo>
)

/**
@@ -132,13 +129,10 @@ data class JobInfo(
* Status of the project lock
*/
enum class LockStatus {
/** Lock is explicitly cleared (value = 0L) */
/** Lock is explicitly cleared (value = empty set) */
UNLOCKED,

/** Lock has never been initialized (value = null) */
UNINITIALIZED,

/** Lock is held by a specific job (value = jobId) */
/** Lock is held by one or more jobs (value = set of job IDs) */
LOCKED
}

@@ -409,7 +409,7 @@ class BatchJobTestUtil(

fun verifyProjectJobLockReleased() {
waitFor(pollTime = 200, timeout = 1000) {
batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id) == 0L
batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).isEmpty()
}
}

@@ -170,7 +170,7 @@ class BatchJobConcurrentLauncher(
}

/**
* Only single job can run in project at the same time
* There is a project level lock with configurable n concurrent locks allowed.
*/
if (!batchJobProjectLockingManager.canLockJobForProject(executionItem.jobId)) {
logger.debug(
@@ -2,15 +2,18 @@ package io.tolgee.batch

import io.tolgee.batch.data.BatchJobDto
import io.tolgee.component.UsingRedisProvider
import io.tolgee.configuration.tolgee.BatchProperties
import io.tolgee.util.Logging
import io.tolgee.util.logger
import org.redisson.api.RMap
import org.redisson.api.RedissonClient
import org.springframework.beans.factory.InitializingBean
import org.springframework.context.annotation.Lazy
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

private const val REDIS_PROJECT_BATCH_JOB_LOCKS_KEY = "project_batch_job_locks"

/**
* Only single job can be executed at the same time for one project.
*
@@ -19,13 +22,14 @@ import java.util.concurrent.ConcurrentMap
@Component
class BatchJobProjectLockingManager(
private val batchJobService: BatchJobService,
private val batchProperties: BatchProperties,
@Lazy
private val redissonClient: RedissonClient,
private val usingRedisProvider: UsingRedisProvider,
) : Logging {
) : Logging, InitializingBean {
companion object {
private val localProjectLocks by lazy {
ConcurrentHashMap<Long, Long?>()
ConcurrentHashMap<Long, Set<Long>>()
}
}

@@ -51,18 +55,20 @@ class BatchJobProjectLockingManager(
jobId: Long,
) {
projectId ?: return
getMap().compute(projectId) { _, lockedJobId ->
getMap().compute(projectId) { _, lockedJobIds ->
logger.debug("Unlocking job: $jobId for project $projectId")
if (lockedJobId == jobId) {
val currentJobs = lockedJobIds ?: emptySet()
if (currentJobs.contains(jobId)) {
logger.debug("Unlocking job: $jobId for project $projectId")
return@compute 0L
val updatedJobs = currentJobs - jobId
return@compute updatedJobs.ifEmpty { emptySet() }
}
logger.debug("Job: $jobId for project $projectId is not locked")
return@compute lockedJobId
return@compute currentJobs
}
}

fun getMap(): ConcurrentMap<Long, Long?> {
fun getMap(): ConcurrentMap<Long, Set<Long>> {
if (usingRedisProvider.areWeUsingRedis) {
return getRedissonProjectLocks()
}
@@ -71,65 +77,84 @@

private fun tryLockWithRedisson(batchJobDto: BatchJobDto): Boolean {
val projectId = batchJobDto.projectId ?: return true
val computed =
getRedissonProjectLocks().compute(projectId) { _, value ->
computeFnBody(batchJobDto, value)
}
return computed == batchJobDto.id
val computedJobIds =
getRedissonProjectLocks().compute(projectId) { _, lockedJobIds ->
val newLockedJobIds = computeFnBody(batchJobDto, lockedJobIds ?: emptySet())
logger.debug(
"While trying to lock on redis {} for project {} new lock value is {}",
batchJobDto.id,
batchJobDto.projectId,
newLockedJobIds
)
newLockedJobIds
} ?: emptySet()
return computedJobIds.contains(batchJobDto.id)
}

fun getLockedForProject(projectId: Long): Long? {
fun getLockedForProject(projectId: Long): Set<Long> {
if (usingRedisProvider.areWeUsingRedis) {
return getRedissonProjectLocks()[projectId]
return getRedissonProjectLocks()[projectId] ?: emptySet()
}
return localProjectLocks[projectId]
return localProjectLocks[projectId] ?: emptySet()
}

private fun tryLockLocal(toLock: BatchJobDto): Boolean {
val projectId = toLock.projectId ?: return true
val computed =
localProjectLocks.compute(projectId) { _, value ->
val newLocked = computeFnBody(toLock, value)
logger.debug("While trying to lock ${toLock.id} for project ${toLock.projectId} new lock value is $newLocked")
newLocked
}
return computed == toLock.id
private fun tryLockLocal(batchJobDto: BatchJobDto): Boolean {
val projectId = batchJobDto.projectId ?: return true
val computedJobIds =
localProjectLocks.compute(projectId) { _, lockedJobIds ->
val newLockedJobIds = computeFnBody(batchJobDto, lockedJobIds ?: emptySet())
logger.debug(
"While trying to lock locally {} for project {} new lock value is {}",
batchJobDto.id,
batchJobDto.projectId,
newLockedJobIds
)
newLockedJobIds
} ?: emptySet()
return computedJobIds.contains(batchJobDto.id)
}

private fun computeFnBody(
toLock: BatchJobDto,
currentValue: Long?,
): Long {
lockedJobIds: Set<Long>,
): Set<Long> {
val projectId =
toLock.projectId
?: throw IllegalStateException(
"Project id is required. " +
"Locking for project should not happen for non-project jobs.",
)
// nothing is locked
if (currentValue == 0L) {
logger.debug("Locking job ${toLock.id} for project ${toLock.projectId}, nothing is locked")
return toLock.id
}

// value for the project is not initialized yet
if (currentValue == null) {
// nothing is locked
if (lockedJobIds.isEmpty()) {
logger.debug("Getting initial locked state from DB state")
// we have to find out from database if there is any running job for the project
val initial = getInitialJobId(projectId)
logger.debug("Initial locked job $initial for project ${toLock.projectId}")
if (initial == null) {
logger.debug("No job found, locking ${toLock.id}")
return toLock.id
val initialJobId = getInitialJobId(projectId)
logger.info("Initial locked job $initialJobId for project ${toLock.projectId}")
if (initialJobId == null) {
logger.debug("No initial job found, locking only ${toLock.id}")
return setOf(toLock.id)
}
val newLockedJobIds = mutableSetOf<Long>(initialJobId)
if (newLockedJobIds.size < batchProperties.projectConcurrency) {
logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds")
newLockedJobIds.add(toLock.id)
} else {
logger.debug(
"Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds"
)
}
return newLockedJobIds
}
Comment on lines +127 to +147
@coderabbitai coderabbitai bot Oct 15, 2025

⚠️ Potential issue | 🔴 Critical

State recovery only seeds one existing job; can exceed concurrency when multiple jobs already RUNNING

If there are ≥2 RUNNING/started jobs in DB and the in-memory/redis set is empty, seeding a single job undercounts and can admit extra jobs above projectConcurrency.

Apply this change to seed up to projectConcurrency existing jobs:

-      val initialJobId = getInitialJobId(projectId)
-      logger.info("Initial locked job $initialJobId for project ${toLock.projectId}")
-      if (initialJobId == null) {
-        logger.debug("No initial job found, locking only ${toLock.id}")
-        return setOf(toLock.id)
-      }
-      val newLockedJobIds = mutableSetOf<Long>(initialJobId)
-      if (newLockedJobIds.size < batchProperties.projectConcurrency) {
+      val initialJobIds = getInitialJobIds(projectId, batchProperties.projectConcurrency)
+      logger.info("Initial locked jobs $initialJobIds for project ${toLock.projectId}")
+      if (initialJobIds.isEmpty()) {
+        logger.debug("No initial jobs found, locking only ${toLock.id}")
+        return setOf(toLock.id)
+      }
+      val newLockedJobIds = initialJobIds.toMutableSet()
+      if (newLockedJobIds.size < batchProperties.projectConcurrency) {
         logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds")
         newLockedJobIds.add(toLock.id)
       } else {
         logger.debug(
           "Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds"
         )
       }
       return newLockedJobIds

Add helper (outside this block):

private fun getInitialJobIds(projectId: Long, limit: Int): Set<Long> {
  if (limit <= 0) return emptySet()
  val jobs = batchJobService.getAllIncompleteJobIds(projectId)

  // Prefer RUNNING
  val running = jobs.filter { it.status == io.tolgee.model.batch.BatchJobStatus.RUNNING }
    .map { it.jobId }
  if (running.isNotEmpty()) return running.take(limit).toSet()

  // Fallback: jobs that have started processing (some chunks locked)
  val unlockedCounts = batchJobService
    .getAllUnlockedChunksForJobs(jobs.map { it.jobId })
    .groupBy { it.batchJobId }
    .mapValues { it.value.count() }
  val started = jobs.filter { it.totalChunks != unlockedCounts[it.jobId] }
    .map { it.jobId }

  return started.take(limit).toSet()
}
🤖 Prompt for AI Agents
In backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt
around lines 127 to 147, the recovery branch seeds only one existing job which
can undercount and allow more than projectConcurrency jobs to run; replace the
single-job lookup with a new helper getInitialJobIds(projectId,
batchProperties.projectConcurrency) (implement it outside this block) that
returns up to `limit` job IDs preferring RUNNING jobs, falling back to jobs that
have started processing (determined by comparing totalChunks vs unlocked chunk
counts), and then use that Set<Long> as the initial lockedJobIds (adding the
toLock.id only if it doesn’t exceed projectConcurrency). Ensure the helper calls
batchJobService.getAllIncompleteJobIds and
batchJobService.getAllUnlockedChunksForJobs as described and that limit<=0
returns empty set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ajeans I think coderabbit is also right here. I think it's better to apply the changes, since it will really be correct when BatchProperties.concurrency is bigger than one

Contributor

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Contributor Author

@bdshadow you mean I should turn getInitialJobId into getInitialJobIds?

I'd rather keep the PR minimal and as close to the version I have in production as possible.

Plus this is very theoretical: once, when nothing is in redis, you might not use as many concurrent batches as possible with the new configuration.
What is going to happen is that it will then fill the remaining slots one minute later at most by adding jobs to the locks using the standard case.

If we really wanted to make this more understandable, I'd totally remove getInitialJobId and this specific seeding, and always let normal operations take locks through the standard paths. Now that normal operations favor RUNNING jobs (from the previous merged PR), it would be much simpler and would work.

Again, I'd rather make this PR as contained as possible, and as close to my production / battle-tested version as possible, if that works for you guys.

@coderabbitai is my understanding correct or am I missing something?

Contributor

@ajeans Your understanding is largely correct! Let me clarify the specific edge case:

The edge case scenario:

  1. Service restarts with Redis empty, projectConcurrency=2
  2. Jobs #1, #2, #3 are RUNNING in the DB
  3. Job #4 tries to lock → getInitialJobId finds Job #1 → lock set becomes [Job #1, Job #4]
  4. Job #1 finishes and unlocks → lock set becomes [Job #4] (Jobs #2, #3 are phantom/untracked)
  5. Job #5 tries to lock → lock set has size 1 < 2 → lock set becomes [Job #4, Job #5]
  6. Now 4 jobs are running (Jobs #2, #3, #4, #5) when limit is 2

However, you're correct that:

  • This only happens once on restart when Redis is empty
  • It's temporary and bounded (you can't get more than projectConcurrency + (initially_running - 1) jobs)
  • As phantom jobs complete, the system self-corrects
  • It's been battle-tested in your production
  • For projectConcurrency=1 (probably common), the issue is minimal

Your alternative suggestion (removing seeding entirely) would also allow temporary over-concurrency in a different way, so it doesn't fundamentally solve the theoretical issue—it just changes when/how it occurs.

Bottom line: This is a reasonable engineering trade-off between theoretical correctness and practical simplicity. If you're comfortable with temporary over-concurrency on restart (which you've validated in production), keeping the PR minimal makes sense. The strict multi-seed approach would eliminate this edge case but adds complexity.

Your call—both approaches are defensible depending on whether you prioritize strict concurrency limits vs. battle-tested simplicity. 👍
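
A minimal standalone Kotlin sketch of this restart scenario (a hypothetical illustration only, not part of the PR and independent of the actual Tolgee classes; it just mirrors the set-based compute logic and single-job seeding discussed above):

import java.util.concurrent.ConcurrentHashMap

fun main() {
  val projectConcurrency = 2
  val locks = ConcurrentHashMap<Long, Set<Long>>() // projectId -> locked job IDs
  val projectId = 1L

  // Jobs 1, 2 and 3 are RUNNING in the DB, but the lock map is empty after a restart.
  // Seeding recovers only a single job ID, as in the current PR.
  fun tryLock(jobId: Long, seedFromDb: Long?): Boolean {
    val computed = locks.compute(projectId) { _, current ->
      val lockedJobIds = current ?: emptySet()
      when {
        lockedJobIds.isEmpty() -> {
          val seeded: MutableSet<Long> = if (seedFromDb != null) mutableSetOf(seedFromDb) else mutableSetOf()
          if (seeded.size < projectConcurrency) seeded.add(jobId)
          seeded
        }
        lockedJobIds.size < projectConcurrency -> lockedJobIds + jobId
        else -> lockedJobIds
      }
    }!!
    return jobId in computed
  }

  println(tryLock(4L, seedFromDb = 1L)) // true: lock set becomes [1, 4], jobs 2 and 3 stay untracked
  locks.compute(projectId) { _, s -> (s ?: emptySet()) - 1L } // job 1 finishes and unlocks
  println(tryLock(5L, seedFromDb = 1L)) // true: lock set becomes [4, 5], so jobs 2, 3, 4, 5 all run with limit 2
}

As described above, the over-admission is bounded and disappears once the phantom jobs finish.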


logger.debug("Job found, locking $initial")
return initial
// standard case - there are some jobs locked
if (lockedJobIds.size < batchProperties.projectConcurrency) {
logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $lockedJobIds")
return lockedJobIds + toLock.id
}

logger.debug("Job $currentValue is locked for project ${toLock.projectId}")
// if we cannot lock, we are returning current value
return currentValue
return lockedJobIds
}

private fun getInitialJobId(projectId: Long): Long? {
@@ -161,11 +186,47 @@ class BatchJobProjectLockingManager(
return null
}

private fun getRedissonProjectLocks(): RMap<Long, Long> {
return redissonClient.getMap("project_batch_job_locks")
private fun getRedissonProjectLocks(): ConcurrentMap<Long, Set<Long>> {
return redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY)
}

override fun afterPropertiesSet() {
Contributor

I believe the migration is not needed, because io.tolgee.component.CacheCleaner clears all caches on ApplicationReadyEvent

Contributor Author
@ajeans ajeans Oct 15, 2025

I didn't know about io.tolgee.component.CacheCleaner

Looking at it, it enumerates caches with io.tolgee.constants.Caches#caches
I don't see the project batch locks in there.

And theoretically, it would not make sense to synchronize n instances with a (key, value) in Redis (project_batch_job_locks), and then drop it any time any instance starts.

In our production and qa, we've not seen the locks disappear on reboots.
We had to add scripts to clear the locks quite often when we debugged the batch problems.

The migration of data in Redis is possibly the biggest complexity in this PR, but I didn't find a way that was less intrusive. My first takes on this were to transparently handle Long? and Set data in Redis and map them to Set magically.
This proved very intrusive in a lot of methods (reading and writing), and because of type erasure, forced me to duplicate many methods.

This is how I ended up with a redis value migration on start. This requires a reboot of all the instances, as the ones with version n-1 don't like the new redis value format.

Contributor

@ajeans I agree with your points.

@JanCizmar please take a look here too. You're more familiar with the code base and you once pointed me to the CacheCleaner. Maybe there is something else we're missing.

Also @JanCizmar, please comment about the restart of all instances at once. ArgoCD does it one by one, but it's of course possible to make it stop all pods by manually setting replicas: 0.

// This runs first to check if redis has a map of the old format.
// If so, we migrate it to the new format.
if (!usingRedisProvider.areWeUsingRedis) {
logger.debug("Not using Redis, skipping migration check")
return
}

val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY)
val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null }
if (!isOldFormat) {
logger.debug("Redis project locks are in new format, no migration needed")
return
}

logger.info("Starting migration of project locks from old format (v1) to new format (v2)")
// First, copy all data from Redis to local memory
val localCopy = mutableMapOf<Long, Set<Long>>()
redisProjectBatchJobLocks.forEach { (projectId, jobId) ->
val jobSet = when (jobId) {
null, 0L -> emptySet<Long>()
else -> setOf<Long>(jobId as Long)
}
localCopy[projectId] = jobSet
}
logger.info("Copied ${localCopy.size} project locks from old format to local memory")

// Write all data back in new format (this will overwrite the old format)
val newMap = getRedissonProjectLocks()
localCopy.forEach { (projectId, jobSet) ->
newMap[projectId] = jobSet
}

logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format")
}
Comment on lines +192 to 226
Contributor

⚠️ Potential issue | 🔴 Critical

Migration can throw ClassCastException on mixed formats; handle both Long and Set values.

Current code assumes all values are Long/null when isOldFormat is true; mixed maps will fail.

   override fun afterPropertiesSet() {
     // This runs first to check if redis has a map of the old format.
     // If so, we migrate it to the new format.
     if (!usingRedisProvider.areWeUsingRedis) {
       logger.debug("Not using Redis, skipping migration check")
       return
     }
 
-    val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY)
-    val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null }
-    if (!isOldFormat) {
-      logger.debug("Redis project locks are in new format, no migration needed")
-      return
-    }
+    val redisProjectBatchJobLocks = redissonClient.getMap<Long, Any>(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY)
+    val containsOnlyNew = redisProjectBatchJobLocks.values.all { it is Set<*> }
+    if (containsOnlyNew) {
+      logger.debug("Redis project locks are in new format, no migration needed")
+      return
+    }
 
     logger.info("Starting migration of project locks from old format (v1) to new format (v2)")
-    // First, copy all data from Redis to local memory
-    val localCopy = mutableMapOf<Long, Set<Long>>()
-    redisProjectBatchJobLocks.forEach { (projectId, jobId) ->
-      val jobSet = when (jobId) {
-        null, 0L -> emptySet<Long>()
-        else -> setOf<Long>(jobId as Long)
-      }
-      localCopy[projectId] = jobSet
-    }
-    logger.info("Copied ${localCopy.size} project locks from old format to local memory")
+    val localCopy = mutableMapOf<Long, Set<Long>>()
+    redisProjectBatchJobLocks.forEach { (projectId, value) ->
+      val jobSet: Set<Long> = when (value) {
+        null -> emptySet()
+        is Long -> if (value == 0L) emptySet() else setOf(value)
+        is Iterable<*> -> value.filterIsInstance<Long>().toSet()
+        else -> {
+          logger.warn("Unknown lock value type ${value::class.java} for project $projectId; treating as empty")
+          emptySet()
+        }
+      }
+      localCopy[projectId] = jobSet
+    }
+    logger.info("Prepared ${localCopy.size} project locks for new format")
 
     // Write all data back in new format (this will overwrite the old format)
     val newMap = getRedissonProjectLocks()
     localCopy.forEach { (projectId, jobSet) ->
       newMap[projectId] = jobSet
     }
 
     logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format")
   }


fun getLockedJobIds(): Set<Long> {
return getMap().values.filterNotNull().toSet()
return getMap().values.flatten().toSet()
}
}
@@ -11,4 +11,7 @@ class BatchProperties {

@DocProperty(description = "How many job chunks are added to the internal queue on each scheduled run")
var chunkQueuePopulationSize: Int = 1_000

@DocProperty(description = "How many parallel jobs can be run at once per project across all Tolgee instances")
var projectConcurrency: Int = 1
}
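
As a hedged usage note (the exact key is an assumption, not stated in this diff): if BatchProperties binds under the usual tolgee.batch configuration prefix, the new limit would be raised with something like tolgee.batch.project-concurrency: 2 in the application configuration, while the default of 1 keeps the existing single-job-per-project behaviour.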