@@ -259,9 +259,37 @@ func testMpiJobWaitWorkers(t *testing.T, startSuspended bool) {
 	}
 	s.events.verify(t)
 
-	workerPods, err := getPodsForJob(ctx, s.kClient, mpiJob)
+	// The launcher job should not be created until all workers are ready, even when starting in suspended mode.
+	job, err := getLauncherJobForMPIJob(ctx, s.kClient, mpiJob)
 	if err != nil {
-		t.Fatalf("Cannot get worker pods from job: %v", err)
+		t.Fatalf("Cannot get launcher job from job: %v", err)
+	}
+	if job != nil {
+		t.Fatalf("Launcher job was created before workers")
+	}
+
+	if startSuspended {
+		// Resume the MPIJob so that the test can follow the normal path.
+		mpiJob.Spec.RunPolicy.Suspend = ptr.To(false)
+		mpiJob, err = s.mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Update(ctx, mpiJob, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Error updating MPIJob: %v", err)
+		}
+	}
+
+	var workerPods []corev1.Pod
+	if err = wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+		var err error
+		workerPods, err = getPodsForJob(ctx, s.kClient, mpiJob)
+		if err != nil {
+			return false, err
+		}
+		if len(workerPods) != 2 {
+			return false, nil
+		}
+		return true, nil
+	}); err != nil {
+		t.Errorf("Failed waiting for worker pods to be created: %v", err)
 	}
 
 	err = updatePodsToPhase(ctx, s.kClient, workerPods, corev1.PodRunning)
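
For context, `getLauncherJobForMPIJob` is not defined in this hunk. A minimal sketch of the contract the assertions above depend on — returning a nil Job with a nil error while the launcher does not yet exist — might look like the following; the `-launcher` name suffix, the package name, and the import paths are assumptions for illustration, not taken from this diff:

```go
package integration

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
)

// getLauncherJobForMPIJob fetches the launcher Job for an MPIJob, returning
// (nil, nil) when it has not been created yet so that callers can assert on
// job == nil, as the test above does. Hypothetical sketch: the real helper
// may locate the Job differently (e.g. by label selector).
func getLauncherJobForMPIJob(ctx context.Context, c kubernetes.Interface, mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
	// Assumption: the launcher Job is named "<mpijob-name>-launcher".
	job, err := c.BatchV1().Jobs(mpiJob.Namespace).Get(ctx, mpiJob.Name+"-launcher", metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		// "Not created yet" is an expected state here, not a failure.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return job, nil
}
```

Swallowing only the NotFound error keeps "launcher not created yet" distinguishable from a real API failure, which is what lets the test assert that the launcher stays absent until the workers are ready.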