2626import org .apache .flink .runtime .clusterframework .types .AllocationID ;
2727import org .apache .flink .runtime .clusterframework .types .ResourceID ;
2828import org .apache .flink .runtime .clusterframework .types .SlotID ;
29+ import org .apache .flink .runtime .entrypoint .ClusterInformation ;
2930import org .apache .flink .runtime .entrypoint .WorkingDirectory ;
3031import org .apache .flink .runtime .highavailability .TestingHighAvailabilityServices ;
32+ import org .apache .flink .runtime .instance .InstanceID ;
3133import org .apache .flink .runtime .jobmaster .utils .TestingJobMasterGateway ;
3234import org .apache .flink .runtime .jobmaster .utils .TestingJobMasterGatewayBuilder ;
3335import org .apache .flink .runtime .leaderretrieval .SettableLeaderRetrievalService ;
3436import org .apache .flink .runtime .messages .Acknowledge ;
37+ import org .apache .flink .runtime .resourcemanager .TaskExecutorRegistration ;
3538import org .apache .flink .runtime .resourcemanager .utils .TestingResourceManagerGateway ;
3639import org .apache .flink .runtime .rpc .TestingRpcService ;
3740import org .apache .flink .runtime .rpc .TestingRpcServiceExtension ;
@@ -63,8 +66,19 @@ class TaskExecutorRecoveryTest {
6366 new EachCallbackWrapper <>(rpcServiceExtension );
6467
6568 @ Test
66- void testRecoveredTaskExecutorWillRestoreAllocationState (@ TempDir File tempDir )
67- throws Exception {
69+ void testRecoveredTaskExecutorWillRestoreAllocationStateWithFixedSlotRequest (
70+ @ TempDir File tempDir ) throws Exception {
71+ testRecoveredTaskExecutorWillRestoreAllocationState (tempDir , false );
72+ }
73+
74+ @ Test
75+ void testRecoveredTaskExecutorWillRestoreAllocationStateWithDynamicSlotRequest (
76+ @ TempDir File tempDir ) throws Exception {
77+ testRecoveredTaskExecutorWillRestoreAllocationState (tempDir , true );
78+ }
79+
80+ private void testRecoveredTaskExecutorWillRestoreAllocationState (
81+ File tempDir , boolean useDynamicRequest ) throws Exception {
6882 final ResourceID resourceId = ResourceID .generate ();
6983
7084 final Configuration configuration = new Configuration ();
@@ -82,6 +96,20 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
8296 return CompletableFuture .completedFuture (Acknowledge .get ());
8397 });
8498
99+ final ArrayBlockingQueue <TaskExecutorRegistration > taskExecutorRegistrations =
100+ new ArrayBlockingQueue <>(2 );
101+
102+ testingResourceManagerGateway .setRegisterTaskExecutorFunction (
103+ taskExecutorRegistration -> {
104+ taskExecutorRegistrations .offer (taskExecutorRegistration );
105+ return CompletableFuture .completedFuture (
106+ new TaskExecutorRegistrationSuccess (
107+ new InstanceID (),
108+ taskExecutorRegistration .getResourceId (),
109+ new ClusterInformation ("localhost" , 1234 ),
110+ null ));
111+ });
112+
85113 final TestingRpcService rpcService = rpcServiceExtension .getTestingRpcService ();
86114 rpcService .registerGateway (
87115 testingResourceManagerGateway .getAddress (), testingResourceManagerGateway );
@@ -118,8 +146,14 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
118146
119147 assertThat (slotReport .getNumSlotStatus (), is (2 ));
120148
149+ final TaskExecutorRegistration taskExecutorRegistration = taskExecutorRegistrations .take ();
150+ assertThat (taskExecutorRegistration .getNumberSlots (), is (2 ));
151+
121152 final SlotStatus slotStatus = slotReport .iterator ().next ();
122- final SlotID allocatedSlotID = slotStatus .getSlotID ();
153+ final SlotID allocatedSlotID =
154+ useDynamicRequest
155+ ? SlotID .getDynamicSlotID (slotStatus .getSlotID ().getResourceID ())
156+ : slotStatus .getSlotID ();
123157
124158 final AllocationID allocationId = new AllocationID ();
125159 taskExecutorGateway
@@ -160,16 +194,26 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
160194 recoveredTaskExecutor .start ();
161195
162196 final TaskExecutorSlotReport recoveredSlotReport = queue .take ();
163-
197+ final int expectedNumberOfSlots = useDynamicRequest ? 3 : 2 ;
198+ assertThat (
199+ recoveredSlotReport .getSlotReport ().getNumSlotStatus (), is (expectedNumberOfSlots ));
164200 for (SlotStatus status : recoveredSlotReport .getSlotReport ()) {
165- if (status .getSlotID ().equals (allocatedSlotID )) {
201+ boolean isAllocatedSlot =
202+ useDynamicRequest
203+ ? status .getSlotID ().getSlotNumber () == 2
204+ : status .getSlotID ().equals (allocatedSlotID );
205+ if (isAllocatedSlot ) {
166206 assertThat (status .getJobID (), is (jobId ));
167207 assertThat (status .getAllocationID (), is (allocationId ));
168208 } else {
169209 assertThat (status .getJobID (), is (nullValue ()));
170210 }
171211 }
172212
213+ final TaskExecutorRegistration recoveredTaskExecutorRegistration =
214+ taskExecutorRegistrations .take ();
215+ assertThat (recoveredTaskExecutorRegistration .getNumberSlots (), is (2 ));
216+
173217 final Collection <SlotOffer > take = offeredSlots .take ();
174218
175219 assertThat (take , hasSize (1 ));
0 commit comments