diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 8f9a7cf366bef..b3f9404e3eead 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -525,7 +525,7 @@ public CompletableFuture sendSlotReport( WorkerResourceSpec workerResourceSpec = WorkerResourceSpec.fromTotalResourceProfile( workerTypeWorkerRegistration.getTotalResourceProfile(), - slotReport.getNumSlotStatus()); + workerTypeWorkerRegistration.getNumberSlots()); onWorkerRegistered(workerTypeWorkerRegistration.getWorker(), workerResourceSpec); } else if (registrationResult == SlotManager.RegistrationResult.REJECTED) { closeTaskManagerConnection( @@ -1083,7 +1083,8 @@ private RegistrationResponse registerTaskExecutorInternal( taskExecutorRegistration.getMemoryConfiguration(), taskExecutorRegistration.getTotalResourceProfile(), taskExecutorRegistration.getDefaultSlotResourceProfile(), - taskExecutorRegistration.getNodeId()); + taskExecutorRegistration.getNodeId(), + taskExecutorRegistration.getNumberSlots()); log.info( "Registering TaskManager with ResourceID {} ({}) at ResourceManager", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java index b3b9c51a3f5fe..6f8530b6a13ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java @@ -64,6 +64,9 @@ public class TaskExecutorRegistration implements Serializable { */ private final String nodeId; + /** Number of slots in static slot allocation. */ + private final int numberSlots; + public TaskExecutorRegistration( final String taskExecutorAddress, final ResourceID resourceId, @@ -73,7 +76,8 @@ public TaskExecutorRegistration( final TaskExecutorMemoryConfiguration memoryConfiguration, final ResourceProfile defaultSlotResourceProfile, final ResourceProfile totalResourceProfile, - final String nodeId) { + final String nodeId, + final int numberSlots) { this.taskExecutorAddress = checkNotNull(taskExecutorAddress); this.resourceId = checkNotNull(resourceId); this.dataPort = dataPort; @@ -83,6 +87,7 @@ public TaskExecutorRegistration( this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile); this.totalResourceProfile = checkNotNull(totalResourceProfile); this.nodeId = checkNotNull(nodeId); + this.numberSlots = numberSlots; } public String getTaskExecutorAddress() { @@ -120,4 +125,8 @@ public ResourceProfile getTotalResourceProfile() { public String getNodeId() { return nodeId; } + + public int getNumberSlots() { + return numberSlots; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java index 79f402e4cd197..660fb33103da7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java @@ -45,6 +45,8 @@ public class WorkerRegistration private final String nodeId; + private final int numberSlots; + public WorkerRegistration( TaskExecutorGateway taskExecutorGateway, WorkerType worker, @@ -54,7 +56,8 @@ public WorkerRegistration( TaskExecutorMemoryConfiguration memoryConfiguration, ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile, - String nodeId) { + String nodeId, + int numberSlots) { super(worker.getResourceID(), taskExecutorGateway); @@ -66,6 +69,7 @@ public WorkerRegistration( this.totalResourceProfile = Preconditions.checkNotNull(totalResourceProfile); this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile); this.nodeId = Preconditions.checkNotNull(nodeId); + this.numberSlots = numberSlots; } public WorkerType getWorker() { @@ -99,4 +103,8 @@ public ResourceProfile getTotalResourceProfile() { public String getNodeId() { return nodeId; } + + public int getNumberSlots() { + return numberSlots; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 17b2adea2e822..88f24d7df0db1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1568,7 +1568,8 @@ private void connectToResourceManager() { memoryConfiguration, taskManagerConfiguration.getDefaultSlotResourceProfile(), taskManagerConfiguration.getTotalResourceProfile(), - unresolvedTaskManagerLocation.getNodeId()); + unresolvedTaskManagerLocation.getNodeId(), + taskManagerConfiguration.getNumberSlots()); resourceManagerConnection = new TaskExecutorToResourceManagerConnection( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java index 44ca1cded16aa..e581a037db0a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java @@ -187,7 +187,8 @@ static void registerTaskExecutor( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), ResourceProfile.ZERO, ResourceProfile.ZERO, - taskExecutorAddress); + taskExecutorAddress, + 1); final CompletableFuture registrationFuture = resourceManagerGateway.registerTaskExecutor( taskExecutorRegistration, TestingUtils.TIMEOUT); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 9e4add90f96ef..761f94b08bcaa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -222,7 +222,8 @@ void testDelayedRegisterTaskExecutor() throws Exception { 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), DEFAULT_SLOT_PROFILE, DEFAULT_SLOT_PROFILE, - taskExecutorGateway.getAddress()); + taskExecutorGateway.getAddress(), + 1); CompletableFuture firstFuture = rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout); @@ -287,7 +288,8 @@ void testDisconnectTaskExecutor() throws Exception { 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), DEFAULT_SLOT_PROFILE, DEFAULT_SLOT_PROFILE.multiply(numberSlots), - taskExecutorGateway.getAddress()); + taskExecutorGateway.getAddress(), + numberSlots); final RegistrationResponse registrationResponse = rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get(); assertThat(registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class); @@ -364,7 +366,8 @@ private CompletableFuture registerTaskExecutor( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), DEFAULT_SLOT_PROFILE, DEFAULT_SLOT_PROFILE, - taskExecutorAddress), + taskExecutorAddress, + 1), TIMEOUT); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 976f8b6502809..935096c6e63f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -254,7 +254,8 @@ private void registerTaskExecutor( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), ResourceProfile.ZERO, ResourceProfile.ZERO, - taskExecutorAddress); + taskExecutorAddress, + 1); final CompletableFuture registrationFuture = resourceManagerGateway.registerTaskExecutor( taskExecutorRegistration, TestingUtils.TIMEOUT); @@ -767,7 +768,8 @@ private void registerTaskExecutorAndSlot( 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), ResourceProfile.fromResources(1, 1024), ResourceProfile.fromResources(1, 1024).multiply(slotCount), - taskExecutorGateway.getAddress()); + taskExecutorGateway.getAddress(), + slotCount); RegistrationResponse registrationResult = resourceManagerGateway .registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java index 639684b30d9ae..e39de9da1ad53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java @@ -1245,7 +1245,8 @@ CompletableFuture registerTaskExecutor( TESTING_CONFIG, ResourceProfile.ZERO, ResourceProfile.ZERO, - resourceID.toString()); + resourceID.toString(), + 1); return resourceManager .getSelfGateway(ResourceManagerGateway.class) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java index 00ff1df132612..25f2970c3658c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java @@ -26,12 +26,15 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.entrypoint.WorkingDirectory; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.TestingRpcServiceExtension; @@ -63,8 +66,19 @@ class TaskExecutorRecoveryTest { new EachCallbackWrapper<>(rpcServiceExtension); @Test - void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) - throws Exception { + void testRecoveredTaskExecutorWillRestoreAllocationStateWithFixedSlotRequest( + @TempDir File tempDir) throws Exception { + testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, false); + } + + @Test + void testRecoveredTaskExecutorWillRestoreAllocationStateWithDynamicSlotRequest( + @TempDir File tempDir) throws Exception { + testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, true); + } + + private void testRecoveredTaskExecutorWillRestoreAllocationState( + File tempDir, boolean useDynamicRequest) throws Exception { final ResourceID resourceId = ResourceID.generate(); final Configuration configuration = new Configuration(); @@ -82,6 +96,20 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) return CompletableFuture.completedFuture(Acknowledge.get()); }); + final ArrayBlockingQueue taskExecutorRegistrations = + new ArrayBlockingQueue<>(2); + + testingResourceManagerGateway.setRegisterTaskExecutorFunction( + taskExecutorRegistration -> { + taskExecutorRegistrations.offer(taskExecutorRegistration); + return CompletableFuture.completedFuture( + new TaskExecutorRegistrationSuccess( + new InstanceID(), + taskExecutorRegistration.getResourceId(), + new ClusterInformation("localhost", 1234), + null)); + }); + final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService(); rpcService.registerGateway( testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); @@ -118,8 +146,14 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) assertThat(slotReport.getNumSlotStatus(), is(2)); + final TaskExecutorRegistration taskExecutorRegistration = taskExecutorRegistrations.take(); + assertThat(taskExecutorRegistration.getNumberSlots(), is(2)); + final SlotStatus slotStatus = slotReport.iterator().next(); - final SlotID allocatedSlotID = slotStatus.getSlotID(); + final SlotID allocatedSlotID = + useDynamicRequest + ? SlotID.getDynamicSlotID(slotStatus.getSlotID().getResourceID()) + : slotStatus.getSlotID(); final AllocationID allocationId = new AllocationID(); taskExecutorGateway @@ -160,9 +194,15 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) recoveredTaskExecutor.start(); final TaskExecutorSlotReport recoveredSlotReport = queue.take(); - + final int expectedNumberOfSlots = useDynamicRequest ? 3 : 2; + assertThat( + recoveredSlotReport.getSlotReport().getNumSlotStatus(), is(expectedNumberOfSlots)); for (SlotStatus status : recoveredSlotReport.getSlotReport()) { - if (status.getSlotID().equals(allocatedSlotID)) { + boolean isAllocatedSlot = + useDynamicRequest + ? status.getSlotID().getSlotNumber() == 2 + : status.getSlotID().equals(allocatedSlotID); + if (isAllocatedSlot) { assertThat(status.getJobID(), is(jobId)); assertThat(status.getAllocationID(), is(allocationId)); } else { @@ -170,6 +210,10 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) } } + final TaskExecutorRegistration recoveredTaskExecutorRegistration = + taskExecutorRegistrations.take(); + assertThat(recoveredTaskExecutorRegistration.getNumberSlots(), is(2)); + final Collection take = offeredSlots.take(); assertThat(take, hasSize(1)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java index 6d19c1d6c2675..4fd0545328f7a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java @@ -140,7 +140,8 @@ void testResourceManagerRegistrationIsRejected() { TASK_MANAGER_MEMORY_CONFIGURATION, ResourceProfile.ZERO, ResourceProfile.ZERO, - TASK_MANAGER_NODE_ID); + TASK_MANAGER_NODE_ID, + 1); return new TaskExecutorToResourceManagerConnection( LOGGER, rpcService,