diff --git a/modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/callback/DefaultInternalCmmnJobManager.java b/modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/callback/DefaultInternalCmmnJobManager.java index 85680dd40b9..f52736e6b41 100644 --- a/modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/callback/DefaultInternalCmmnJobManager.java +++ b/modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/callback/DefaultInternalCmmnJobManager.java @@ -54,6 +54,11 @@ protected VariableScope resolveVariableScopeInternal(Job job) { return null; } + @Override + protected String resolveJobLockIdInternal(Job job) { + return job.getScopeId(); + } + @Override protected boolean handleJobInsertInternal(Job job) { // Currently, nothing extra needed (but counting relationships can be added later here). diff --git a/modules/flowable-engine/src/main/java/org/flowable/engine/impl/cfg/DefaultInternalJobManager.java b/modules/flowable-engine/src/main/java/org/flowable/engine/impl/cfg/DefaultInternalJobManager.java index 0033e232670..c69ec2111b8 100644 --- a/modules/flowable-engine/src/main/java/org/flowable/engine/impl/cfg/DefaultInternalJobManager.java +++ b/modules/flowable-engine/src/main/java/org/flowable/engine/impl/cfg/DefaultInternalJobManager.java @@ -69,6 +69,11 @@ protected VariableScope resolveVariableScopeInternal(Job job) { return null; } + @Override + protected String resolveJobLockIdInternal(Job job){ + return job.getProcessInstanceId(); + } + @Override protected boolean handleJobInsertInternal(Job job) { // add link to execution diff --git a/modules/flowable-engine/src/test/java/org/flowable/engine/test/externalworker/ExternalWorkerServiceTaskTest.java b/modules/flowable-engine/src/test/java/org/flowable/engine/test/externalworker/ExternalWorkerServiceTaskTest.java index 810607a92e8..8c5711c4d09 100644 --- a/modules/flowable-engine/src/test/java/org/flowable/engine/test/externalworker/ExternalWorkerServiceTaskTest.java +++ b/modules/flowable-engine/src/test/java/org/flowable/engine/test/externalworker/ExternalWorkerServiceTaskTest.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -937,6 +938,53 @@ void testSimpleWithBoundaryErrorAndVariables() { ); } + @Test + @Deployment + void testSimpleParallel() { + ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder() + .processDefinitionKey("simpleParallelExternalWorker") + .start(); + + List externalWorkerJobs = managementService.createExternalWorkerJobQuery() + .list(); + assertThat(externalWorkerJobs) + .extracting(ExternalWorkerJob::getElementId, ExternalWorkerJob::getJobHandlerConfiguration) + .containsExactlyInAnyOrder( + tuple("externalWorkerTask1", "simple"), + tuple("externalWorkerTask2", "simple") + ); + + List acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder() + .topic("simple", Duration.ofMinutes(30)) + .acquireAndLock(2, "testWorker"); + + //Both external worker tasks have the exclusive flag set + //so only one job can be acquired because they cannot be executed concurrently. + assertThat(acquiredJobs).hasSize(1); + + AcquiredExternalWorkerJob acquiredJob1 = acquiredJobs.get(0); + + managementService.createExternalWorkerCompletionBuilder(acquiredJob1.getId(), "testWorker") + .complete(); + + //Acquire the second external worker job + acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder() + .topic("simple", Duration.ofMinutes(30)) + .acquireAndLock(2, "testWorker"); + + assertThat(acquiredJobs).hasSize(1); + + AcquiredExternalWorkerJob acquiredJob2 = acquiredJobs.get(0); + + managementService.createExternalWorkerCompletionBuilder(acquiredJob2.getId(), "testWorker") + .complete(); + + assertThat(Arrays.asList(acquiredJob1.getElementId(), acquiredJob2.getElementId())) + .containsExactlyInAnyOrder("externalWorkerTask1", "externalWorkerTask2"); + + waitForJobExecutorToProcessAllJobs(5000, 300); + } + @Test void testAcquireWithInvalidArguments() { assertThatThrownBy(() -> managementService.createExternalWorkerJobAcquireBuilder().acquireAndLock(10, "someWorker")) diff --git a/modules/flowable-engine/src/test/resources/org/flowable/engine/test/externalworker/ExternalWorkerServiceTaskTest.testSimpleParallel.bpmn20.xml b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/externalworker/ExternalWorkerServiceTaskTest.testSimpleParallel.bpmn20.xml new file mode 100644 index 00000000000..c87e32b011f --- /dev/null +++ b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/externalworker/ExternalWorkerServiceTaskTest.testSimpleParallel.bpmn20.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/InternalJobManager.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/InternalJobManager.java index a5a8208fabc..c0db6f131b0 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/InternalJobManager.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/InternalJobManager.java @@ -26,6 +26,8 @@ public interface InternalJobManager { void registerScopedInternalJobManager(String scopeType, InternalJobManager internalJobManager); VariableScope resolveVariableScope(Job job); + + String resolveJobLockId(Job job); boolean handleJobInsert(Job job); diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/ScopeAwareInternalJobManager.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/ScopeAwareInternalJobManager.java index 3e2ef105b40..5349767490b 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/ScopeAwareInternalJobManager.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/ScopeAwareInternalJobManager.java @@ -49,6 +49,18 @@ public final VariableScope resolveVariableScope(Job job) { protected abstract VariableScope resolveVariableScopeInternal(Job job); + @Override + public final String resolveJobLockId(Job job) { + InternalJobManager internalJobManager = findInternalJobManager(job); + if (internalJobManager == null) { + return resolveJobLockIdInternal(job); + } + + return internalJobManager.resolveJobLockId(job); + } + + protected abstract String resolveJobLockIdInternal(Job job); + @Override public final boolean handleJobInsert(Job job) { InternalJobManager internalJobManager = findInternalJobManager(job); diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/cmd/AcquireExternalWorkerJobsCmd.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/cmd/AcquireExternalWorkerJobsCmd.java index 5b06d91121b..874da43dc3f 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/cmd/AcquireExternalWorkerJobsCmd.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/cmd/AcquireExternalWorkerJobsCmd.java @@ -15,8 +15,10 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.GregorianCalendar; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.flowable.common.engine.api.FlowableIllegalArgumentException; @@ -73,27 +75,44 @@ public List execute(CommandContext commandContext) { int lockTimeInMillis = (int) builder.getLockDuration().abs().toMillis(); List acquiredJobs = new ArrayList<>(jobs.size()); + Set jobLockIds = new HashSet<>(); for (ExternalWorkerJobEntity job : jobs) { - lockJob(commandContext, job, lockTimeInMillis); - Map variables = null; - if (internalJobManager != null) { - VariableScope variableScope = internalJobManager.resolveVariableScope(job); - if (variableScope != null) { - variables = variableScope.getVariables(); + if (hasUnLockedJobScope(internalJobManager, job, jobLockIds)) { + lockJob(commandContext, job, lockTimeInMillis); + Map variables = null; + if (internalJobManager != null) { + VariableScope variableScope = internalJobManager.resolveVariableScope(job); + if (variableScope != null) { + variables = variableScope.getVariables(); + } + + if (job.isExclusive()) { + internalJobManager.lockJobScope(job); + String jobLockId = internalJobManager.resolveJobLockId(job); + if (jobLockId != null) { + jobLockIds.add(jobLockId); + } + } } - if (job.isExclusive()) { - internalJobManager.lockJobScope(job); - } + acquiredJobs.add(new AcquiredExternalWorkerJobImpl(job, variables)); } - - acquiredJobs.add(new AcquiredExternalWorkerJobImpl(job, variables)); } return acquiredJobs; } + protected boolean hasUnLockedJobScope(InternalJobManager internalJobManager, ExternalWorkerJobEntity job, Set jobLockIds) { + if (internalJobManager != null && job.isExclusive()) { + String jobLockId = internalJobManager.resolveJobLockId(job); + if (jobLockId != null && jobLockIds.contains(jobLockId)) { + return false; + } + } + return true; + } + protected void lockJob(CommandContext commandContext, JobInfoEntity job, int lockTimeInMillis) { GregorianCalendar gregorianCalendar = new GregorianCalendar(); gregorianCalendar.setTime(jobServiceConfiguration.getClock().getCurrentTime()); diff --git a/modules/flowable-job-service/src/test/java/org/flowable/job/service/ScopeAwareInternalJobManagerTest.java b/modules/flowable-job-service/src/test/java/org/flowable/job/service/ScopeAwareInternalJobManagerTest.java index 42072e4e7bf..4aa9d1ffcba 100644 --- a/modules/flowable-job-service/src/test/java/org/flowable/job/service/ScopeAwareInternalJobManagerTest.java +++ b/modules/flowable-job-service/src/test/java/org/flowable/job/service/ScopeAwareInternalJobManagerTest.java @@ -205,6 +205,7 @@ protected T mockCmmnJob(Class jobClass, String mockName) { private static class TestScopeAwareInternalJobManager extends ScopeAwareInternalJobManager { protected Map variableScopeByJob = new HashMap<>(); + protected Map lockIdByJob = new HashMap<>(); protected Map insertJobInternalByJob = new HashMap<>(); protected Job jobDeleteInternal; protected Job lockJobScopeInternal; @@ -221,6 +222,12 @@ protected VariableScope resolveVariableScopeInternal(Job job) { return variableScopeByJob.get(job); } + @Override + protected String resolveJobLockIdInternal(Job job) { + invokedMethods.add("resolveJobLockIdInternal"); + return lockIdByJob.get(job); + } + @Override protected boolean handleJobInsertInternal(Job job) { invokedMethods.add("handleJobInsertInternal");