diff --git a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt index c140adad38..4fde515706 100644 --- a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt +++ b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt @@ -47,14 +47,13 @@ class ProjectBatchLockController( logger.debug("Retrieving all project batch locks") val locks = batchJobProjectLockingManager.getMap() - val lockModels = locks.map { (projectId, lockedJobId) -> - val lockStatus = when (lockedJobId) { - null -> LockStatus.UNINITIALIZED - 0L -> LockStatus.UNLOCKED + val lockModels = locks.entries.map { (projectId, lockedJobIds) -> + val lockStatus = when { + lockedJobIds.isEmpty() -> LockStatus.UNLOCKED else -> LockStatus.LOCKED } - val jobInfo = if (lockedJobId != null && lockedJobId > 0L) { + val jobInfos = lockedJobIds.mapNotNull { lockedJobId -> val jobDto = batchJobService.findJobDto(lockedJobId) if (jobDto == null) { logger.warn("Locked job $lockedJobId in project $projectId not found") @@ -67,15 +66,13 @@ class ProjectBatchLockController( createdAt = jobDto.createdAt ) } - } else { - null } ProjectLockModel( projectId = projectId, - lockedJobId = lockedJobId, + lockedJobIds = lockedJobIds, lockStatus = lockStatus, - jobInfo = jobInfo + jobInfos = jobInfos ) } @@ -113,9 +110,9 @@ class ProjectBatchLockController( */ data class ProjectLockModel( val projectId: Long, - val lockedJobId: Long?, + val lockedJobIds: Set, val lockStatus: LockStatus, - val jobInfo: JobInfo? + val jobInfos: List ) /** @@ -132,13 +129,10 @@ data class JobInfo( * Status of the project lock */ enum class LockStatus { - /** Lock is explicitly cleared (value = 0L) */ + /** Lock is explicitly cleared (value = empty set) */ UNLOCKED, - /** Lock has never been initialized (value = null) */ - UNINITIALIZED, - - /** Lock is held by a specific job (value = jobId) */ + /** Lock is held by one or more jobs (value = set of job IDs) */ LOCKED } diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt index 054b8a1007..d49775bc2c 100644 --- a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt @@ -1,5 +1,6 @@ package io.tolgee.api.v2.controllers.batch +import io.tolgee.batch.BatchJobProjectLockingManager import io.tolgee.fixtures.andIsForbidden import io.tolgee.fixtures.andIsOk import io.tolgee.fixtures.waitFor @@ -14,6 +15,8 @@ import kotlinx.coroutines.ensureActive import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.kotlin.* import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest @@ -30,22 +33,34 @@ class BatchJobManagementControllerCancellationTest : @Autowired lateinit var stuckBatchJobTestUtil: StuckBatchJobTestUtil + @Autowired + lateinit var batchJobProjectLockingManager: BatchJobProjectLockingManager + + @Autowired + lateinit var batchProperties: io.tolgee.configuration.tolgee.BatchProperties + var simulateLongRunningChunkRun = true + var originalProjectConcurrency = 1 @BeforeEach fun setup() { simulateLongRunningChunkRun = true + originalProjectConcurrency = batchProperties.projectConcurrency } @AfterEach fun clean() { simulateLongRunningChunkRun = false + batchProperties.projectConcurrency = originalProjectConcurrency } - @Test + @ParameterizedTest + @ValueSource(ints = [1, 2]) @ProjectJWTAuthTestMethod - fun `cancels a job`() { - batchDumper.finallyDump { + fun `cancels a job`(projectConcurrency: Int) { + batchProperties.projectConcurrency = projectConcurrency + + batchDumper.finallyDumpAll { val keys = testData.addTranslationOperationData(100) saveAndPrepare() @@ -81,6 +96,11 @@ class BatchJobManagementControllerCancellationTest : } val job = util.getSingleJob() + + // Verify the job is locked + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.assert.contains(job.id) + performProjectAuthPut("batch-jobs/${job.id}/cancel") .andIsOk @@ -95,6 +115,10 @@ class BatchJobManagementControllerCancellationTest : .assert.hasSize(1) } } + + // Verify the job lock was released + val lockedJobsAfterCancel = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfterCancel.assert.doesNotContain(job.id) } } @@ -135,4 +159,106 @@ class BatchJobManagementControllerCancellationTest : util.getSingleJob().status.assert.isEqualTo(BatchJobStatus.CANCELLED) } + + @Test + @ProjectJWTAuthTestMethod + fun `cancels one of two running jobs with projectConcurrency=2`() { + val keys = testData.addTranslationOperationData(100) + saveAndPrepare() + + batchProperties.projectConcurrency = 2 + + val keyIds = keys.map { it.id }.toList() + + val firstJobCount = AtomicInteger(0) + val secondJobCount = AtomicInteger(0) + + doAnswer { + val jobId = it.getArgument(0).id + val allJobs = executeInNewTransaction { + batchJobService.getAllByProjectId(testData.project.id) + } + val isFirstJob = allJobs.size > 0 && jobId == allJobs[0].id + + val counter = if (isFirstJob) firstJobCount else secondJobCount + + if (counter.incrementAndGet() > 5) { + while (simulateLongRunningChunkRun) { + // this simulates long-running operation, which checks for active context + val context = it.arguments[2] as CoroutineContext + context.ensureActive() + Thread.sleep(10) + } + } + it.callRealMethod() + }.whenever(machineTranslationChunkProcessor).process(any(), any(), any(), any()) + + // Start first job + performProjectAuthPost( + "start-batch-job/machine-translate", + mapOf( + "keyIds" to keyIds, + "targetLanguageIds" to + listOf( + testData.projectBuilder.getLanguageByTag("cs")!!.self.id, + ), + ), + ).andIsOk + + waitFor(timeout = 5000) { + batchJobConcurrentLauncher.runningJobs.size >= 5 + } + + // Start second job + performProjectAuthPost( + "start-batch-job/machine-translate", + mapOf( + "keyIds" to keyIds, + "targetLanguageIds" to + listOf( + testData.projectBuilder.getLanguageByTag("cs")!!.self.id, + ), + ), + ).andIsOk + + waitFor(timeout = 5000) { + batchJobConcurrentLauncher.runningJobs.size >= 10 + } + + val jobs = executeInNewTransaction { + batchJobService.getAllByProjectId(testData.project.id) + } + jobs.size.assert.isEqualTo(2) + + // Verify both jobs are locked + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.size.assert.isEqualTo(2) + lockedJobs.assert.contains(jobs[0].id) + lockedJobs.assert.contains(jobs[1].id) + + // Cancel the first job + val firstJob = jobs[0] + performProjectAuthPut("batch-jobs/${firstJob.id}/cancel") + .andIsOk + + waitForNotThrowing(pollTime = 100) { + executeInNewTransaction { + batchJobService.getJobDto(firstJob.id).status.assert.isEqualTo(BatchJobStatus.CANCELLED) + } + } + + // Verify the first job lock was released but second job is still locked + val lockedJobsAfterCancel = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfterCancel.assert.doesNotContain(firstJob.id) + lockedJobsAfterCancel.assert.contains(jobs[1].id) + + // Let the second job complete + simulateLongRunningChunkRun = false + + waitForNotThrowing(pollTime = 100, timeout = 10000) { + executeInNewTransaction { + batchJobService.getJobDto(jobs[1].id).status.assert.isEqualTo(BatchJobStatus.SUCCESS) + } + } + } } diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt index 501677e0c0..aba4508966 100644 --- a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt @@ -9,7 +9,10 @@ import io.tolgee.testing.assert import io.tolgee.util.Logging import io.tolgee.util.addMinutes import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.kotlin.any import org.mockito.kotlin.doAnswer import org.mockito.kotlin.whenever @@ -23,10 +26,24 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes @Autowired lateinit var throwingService: ThrowingService + @Autowired + lateinit var batchProperties: io.tolgee.configuration.tolgee.BatchProperties + + @Autowired + lateinit var batchJobProjectLockingManager: io.tolgee.batch.BatchJobProjectLockingManager + + var originalProjectConcurrency = 1 + + @BeforeEach + fun setup() { + originalProjectConcurrency = batchProperties.projectConcurrency + } + @AfterEach fun after() { batchJobConcurrentLauncher.pause = false clearForcedDate() + batchProperties.projectConcurrency = originalProjectConcurrency } @Test @@ -70,10 +87,12 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes } } - @Test + @ParameterizedTest + @ValueSource(ints = [1, 2]) @ProjectJWTAuthTestMethod - fun `returns list of jobs`() { + fun `returns list of jobs`(projectConcurrency: Int) { saveAndPrepare() + batchProperties.projectConcurrency = projectConcurrency val jobIds = ConcurrentHashMap.newKeySet() var wait = true @@ -81,7 +100,7 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes try { doAnswer { val id = it.getArgument(0).id - if (jobIds.size == 2 && !jobIds.contains(id)) { + if (jobIds.size == projectConcurrency && !jobIds.contains(id)) { while (wait) { Thread.sleep(100) } @@ -94,6 +113,7 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes val jobs = (1..3).map { util.runChunkedJob(50) } + // With projectConcurrency=N, we should be able to run N jobs concurrently waitForNotThrowing(pollTime = 1000, timeout = 10000) { val dtos = jobs.map { batchJobService.getJobDto(it.id) } dtos.forEach { @@ -102,10 +122,14 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes "Job ${it.id} status ${it.status} progress: ${state?.values?.sumOf { it.successTargets.size }}", ) } - dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(2) + dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(projectConcurrency) dtos.count { it.status == BatchJobStatus.RUNNING }.assert.isEqualTo(1) } + // Verify that N jobs are locked for the project + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.size.assert.isEqualTo(projectConcurrency) + performProjectAuthGet("batch-jobs?sort=status&sort=id") .andIsOk.andAssertThatJson { node("_embedded.batchJobs") { @@ -131,6 +155,10 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes node("[0].status").isEqualTo("SUCCESS") } } + + // Verify all locks are released after completion + val lockedJobsAfter = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfter.assert.isEmpty() } finally { wait = false } @@ -164,10 +192,12 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes } } - @Test + @ParameterizedTest + @ValueSource(ints = [1, 2]) @ProjectJWTAuthTestMethod - fun `returns list of current jobs`() { + fun `returns list of current jobs`(projectConcurrency: Int) { saveAndPrepare() + batchProperties.projectConcurrency = projectConcurrency var wait = true doAnswer { @@ -185,13 +215,23 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes .andIsOk.andPrettyPrint.andAssertThatJson { node("_embedded.batchJobs") { isArray.hasSize(6) + // With projectConcurrency=N, we should have N RUNNING jobs and (6-N) PENDING node("[0].status").isEqualTo("RUNNING") - node("[1].status").isEqualTo("PENDING") - node("[2].status").isEqualTo("PENDING") + if (projectConcurrency == 2) { + node("[1].status").isEqualTo("RUNNING") + node("[2].status").isEqualTo("PENDING") + } else { + node("[1].status").isEqualTo("PENDING") + node("[2].status").isEqualTo("PENDING") + } } } } + // Verify that N jobs are locked for the project + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.size.assert.isEqualTo(projectConcurrency) + wait = false waitForNotThrowing(pollTime = 1000, timeout = 10000) { @@ -220,6 +260,10 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes .andIsOk.andAssertThatJson { node("_embedded.batchJobs").isAbsent() } + + // Verify all locks are released after completion + val lockedJobsAfter = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfter.assert.isEmpty() } finally { wait = false } diff --git a/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt b/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt index 5228abee9f..7b43a9e90a 100644 --- a/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt +++ b/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt @@ -409,7 +409,7 @@ class BatchJobTestUtil( fun verifyProjectJobLockReleased() { waitFor(pollTime = 200, timeout = 1000) { - batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id) == 0L + batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).isEmpty() } } diff --git a/backend/app/src/test/kotlin/io/tolgee/util/BatchDumper.kt b/backend/app/src/test/kotlin/io/tolgee/util/BatchDumper.kt index c6563cb75e..95081e35b4 100644 --- a/backend/app/src/test/kotlin/io/tolgee/util/BatchDumper.kt +++ b/backend/app/src/test/kotlin/io/tolgee/util/BatchDumper.kt @@ -78,8 +78,20 @@ class BatchDumper( } } + fun finallyDumpAll(fn: () -> T): T { + return try { + fn() + } finally { + getAllJobs().forEach { + this.dump(it.id) + } + } + } + fun getSingleJob(): BatchJob = entityManager.createQuery("""from BatchJob""", BatchJob::class.java).singleResult + fun getAllJobs(): List = entityManager.createQuery("""from BatchJob""", BatchJob::class.java).resultList + private fun dumpQueuedItems( jobId: Long, stringBuilder: StringBuilder, diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt index 39e55f6dff..78c8716b7d 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt @@ -170,7 +170,7 @@ class BatchJobConcurrentLauncher( } /** - * Only single job can run in project at the same time + * There is a project level lock with configurable n concurrent locks allowed. */ if (!batchJobProjectLockingManager.canLockJobForProject(executionItem.jobId)) { logger.debug( diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt index dc1be2b728..6993e37618 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt @@ -2,15 +2,18 @@ package io.tolgee.batch import io.tolgee.batch.data.BatchJobDto import io.tolgee.component.UsingRedisProvider +import io.tolgee.configuration.tolgee.BatchProperties import io.tolgee.util.Logging import io.tolgee.util.logger -import org.redisson.api.RMap import org.redisson.api.RedissonClient +import org.springframework.beans.factory.InitializingBean import org.springframework.context.annotation.Lazy import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap +private const val REDIS_PROJECT_BATCH_JOB_LOCKS_KEY = "project_batch_job_locks" + /** * Only single job can be executed at the same time for one project. * @@ -19,13 +22,14 @@ import java.util.concurrent.ConcurrentMap @Component class BatchJobProjectLockingManager( private val batchJobService: BatchJobService, + private val batchProperties: BatchProperties, @Lazy private val redissonClient: RedissonClient, private val usingRedisProvider: UsingRedisProvider, -) : Logging { +) : Logging, InitializingBean { companion object { private val localProjectLocks by lazy { - ConcurrentHashMap() + ConcurrentHashMap>() } } @@ -51,18 +55,19 @@ class BatchJobProjectLockingManager( jobId: Long, ) { projectId ?: return - getMap().compute(projectId) { _, lockedJobId -> - logger.debug("Unlocking job: $jobId for project $projectId") - if (lockedJobId == jobId) { + getMap().compute(projectId) { _, lockedJobIds -> + val currentJobs = lockedJobIds ?: emptySet() + if (currentJobs.contains(jobId)) { logger.debug("Unlocking job: $jobId for project $projectId") - return@compute 0L + val updatedJobs = currentJobs - jobId + return@compute updatedJobs.ifEmpty { emptySet() } } logger.debug("Job: $jobId for project $projectId is not locked") - return@compute lockedJobId + return@compute currentJobs } } - fun getMap(): ConcurrentMap { + fun getMap(): ConcurrentMap> { if (usingRedisProvider.areWeUsingRedis) { return getRedissonProjectLocks() } @@ -71,65 +76,84 @@ class BatchJobProjectLockingManager( private fun tryLockWithRedisson(batchJobDto: BatchJobDto): Boolean { val projectId = batchJobDto.projectId ?: return true - val computed = - getRedissonProjectLocks().compute(projectId) { _, value -> - computeFnBody(batchJobDto, value) - } - return computed == batchJobDto.id + val computedJobIds = + getRedissonProjectLocks().compute(projectId) { _, lockedJobIds -> + val newLockedJobIds = computeLockedJobIdsForProject(batchJobDto, lockedJobIds ?: emptySet()) + logger.debug( + "While trying to lock on redis {} for project {} new lock value is {}", + batchJobDto.id, + batchJobDto.projectId, + newLockedJobIds + ) + newLockedJobIds + } ?: emptySet() + return computedJobIds.contains(batchJobDto.id) } - fun getLockedForProject(projectId: Long): Long? { + fun getLockedForProject(projectId: Long): Set { if (usingRedisProvider.areWeUsingRedis) { - return getRedissonProjectLocks()[projectId] + return getRedissonProjectLocks()[projectId] ?: emptySet() } - return localProjectLocks[projectId] + return localProjectLocks[projectId] ?: emptySet() } - private fun tryLockLocal(toLock: BatchJobDto): Boolean { - val projectId = toLock.projectId ?: return true - val computed = - localProjectLocks.compute(projectId) { _, value -> - val newLocked = computeFnBody(toLock, value) - logger.debug("While trying to lock ${toLock.id} for project ${toLock.projectId} new lock value is $newLocked") - newLocked - } - return computed == toLock.id + private fun tryLockLocal(batchJobDto: BatchJobDto): Boolean { + val projectId = batchJobDto.projectId ?: return true + val computedJobIds = + localProjectLocks.compute(projectId) { _, lockedJobIds -> + val newLockedJobIds = computeLockedJobIdsForProject(batchJobDto, lockedJobIds ?: emptySet()) + logger.debug( + "While trying to lock locally {} for project {} new lock value is {}", + batchJobDto.id, + batchJobDto.projectId, + newLockedJobIds + ) + newLockedJobIds + } ?: emptySet() + return computedJobIds.contains(batchJobDto.id) } - private fun computeFnBody( + private fun computeLockedJobIdsForProject( toLock: BatchJobDto, - currentValue: Long?, - ): Long { + lockedJobIds: Set, + ): Set { val projectId = toLock.projectId ?: throw IllegalStateException( "Project id is required. " + "Locking for project should not happen for non-project jobs.", ) - // nothing is locked - if (currentValue == 0L) { - logger.debug("Locking job ${toLock.id} for project ${toLock.projectId}, nothing is locked") - return toLock.id - } - // value for the project is not initialized yet - if (currentValue == null) { + // nothing is locked + if (lockedJobIds.isEmpty()) { logger.debug("Getting initial locked state from DB state") // we have to find out from database if there is any running job for the project - val initial = getInitialJobId(projectId) - logger.debug("Initial locked job $initial for project ${toLock.projectId}") - if (initial == null) { - logger.debug("No job found, locking ${toLock.id}") - return toLock.id + val initialJobId = getInitialJobId(projectId) + logger.info("Initial locked job $initialJobId for project ${toLock.projectId}") + if (initialJobId == null) { + logger.debug("No initial job found, locking only ${toLock.id}") + return setOf(toLock.id) } + val newLockedJobIds = mutableSetOf(initialJobId) + if (newLockedJobIds.size < batchProperties.projectConcurrency) { + logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") + newLockedJobIds.add(toLock.id) + } else { + logger.debug( + "Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds" + ) + } + return newLockedJobIds + } - logger.debug("Job found, locking $initial") - return initial + // standard case - there are some jobs locked + if (lockedJobIds.size < batchProperties.projectConcurrency) { + logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $lockedJobIds") + return lockedJobIds + toLock.id } - logger.debug("Job $currentValue is locked for project ${toLock.projectId}") // if we cannot lock, we are returning current value - return currentValue + return lockedJobIds } private fun getInitialJobId(projectId: Long): Long? { @@ -161,11 +185,47 @@ class BatchJobProjectLockingManager( return null } - private fun getRedissonProjectLocks(): RMap { - return redissonClient.getMap("project_batch_job_locks") + private fun getRedissonProjectLocks(): ConcurrentMap> { + return redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) + } + + override fun afterPropertiesSet() { + // This runs first to check if redis has a map of the old format. + // If so, we migrate it to the new format. + if (!usingRedisProvider.areWeUsingRedis) { + logger.debug("Not using Redis, skipping migration check") + return + } + + val redisProjectBatchJobLocks = redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) + val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null } + if (!isOldFormat) { + logger.debug("Redis project locks are in new format, no migration needed") + return + } + + logger.info("Starting migration of project locks from old format (v1) to new format (v2)") + // First, copy all data from Redis to local memory + val localCopy = mutableMapOf>() + redisProjectBatchJobLocks.forEach { (projectId, jobId) -> + val jobSet = when (jobId) { + null, 0L -> emptySet() + else -> setOf(jobId as Long) + } + localCopy[projectId] = jobSet + } + logger.info("Copied ${localCopy.size} project locks from old format to local memory") + + // Write all data back in new format (this will overwrite the old format) + val newMap = getRedissonProjectLocks() + localCopy.forEach { (projectId, jobSet) -> + newMap[projectId] = jobSet + } + + logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format") } fun getLockedJobIds(): Set { - return getMap().values.filterNotNull().toSet() + return getMap().values.flatten().toSet() } } diff --git a/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt b/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt index b98d8a76eb..2fdcf97080 100644 --- a/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt +++ b/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt @@ -11,4 +11,7 @@ class BatchProperties { @DocProperty(description = "How many job chunks are added to the internal queue on each scheduled run") var chunkQueuePopulationSize: Int = 1_000 + + @DocProperty(description = "How many parallel jobs can be run at once per project across all Tolgee instances") + var projectConcurrency: Int = 1 }