
Implement MPI Plugin for OpenMPI #2493

Merged
merged 9 commits into from
Mar 13, 2025

Conversation

tenzen-y
Member

@tenzen-y tenzen-y commented Mar 10, 2025

What this PR does / why we need it:

I have done the following things to support OpenMPI:

  • Added MPI policy propagation and a resource construction mechanism to the MPI plugin.
  • Added a mechanism to obtain the runtime template (.spec.template) from runtime.Info in each plugin.
  • Added a PodNetworkPlugin to the KF Pipeline Framework to identify Pod endpoints like test-job-trainer-node-0-0.test-job.
  • Introduced a PodSet internal data structure in the runtime package to represent arbitrary types of Jobs (Initializer, Launcher) as opposed to Trainer; see the sketch after this list. However, it is used only by the MPI plugin for now. Other plugins keep using the previous Trainer data structure until it is removed completely (Migrate Trainer to PodSet and RuntimePolicy in runtime package (InternalAPI) #2495)
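For illustration, here is a minimal sketch of that PodSet idea. The field set is simplified from the snippets quoted in the review threads below (Name, CountForNonTrainer, and the endpoint iterator); the actual definition in the runtime package may differ.

package runtime

import "iter"

// PodSet is a simplified sketch of the internal structure introduced in this PR.
// The fields mirror the snippets quoted in the review threads below.
type PodSet struct {
	// Name of the role, e.g. "trainer-node", "launcher", or an initializer role.
	Name string
	// CountForNonTrainer mirrors batch/v1 Job parallelism for non-Trainer roles.
	// For the Trainer role it is nil, and the count comes from MLPolicy.NumNodes.
	CountForNonTrainer *int32
	// Endpoints lazily yields Pod endpoints such as
	// "<trainJob>-<replicatedJob>-<jobIdx>-<podIdx>.<subdomain>".
	Endpoints iter.Seq[string]
}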

Furthermore, I quickly verified the OpenMPI workload behavior with the following TrainJob and manifests/base/runtimes/mpi_distributed.yaml.

apiVersion: trainer.kubeflow.org/v1alpha1
kind: TrainJob
metadata:
  name: mpirun
  namespace: default
spec:
  runtimeRef:
    name: mpi-distributed
$ kubectl get pod
NAME                            READY   STATUS    RESTARTS     AGE
mpirun-launcher-0-0-wvmpl       1/1     Running   1 (1s ago)   2s
mpirun-trainer-node-0-0-52jxr   1/1     Running   0            1s
mpirun-trainer-node-0-1-czdb9   1/1     Running   0            1s
$
$ kubectl logs mpirun-launcher-0-0-wvmpl 
Warning: Permanently added '[mpirun-trainer-node-0-0.mpirun]:2222' (ECDSA) to the list of known hosts.
Warning: Permanently added '[mpirun-trainer-node-0-1.mpirun]:2222' (ECDSA) to the list of known hosts.
Workers: 2
Rank 0 on host mpirun-trainer-node-0-0
Rank 1 on host mpirun-trainer-node-0-1
pi is approximately 3.1410376000000002

Additionally, we will work on the following items in follow-up PRs:

Which issue(s) this PR fixes (optional, in Fixes #<issue number>, #<issue number>, ... format, will close the issue(s) when PR gets merged):
Fixes #

Checklist:

  • Docs included if any changes are user facing

@tenzen-y tenzen-y force-pushed the implement-mpi-plugin branch 5 times, most recently from 261cded to 590f258 Compare March 10, 2025 13:20
@tenzen-y tenzen-y marked this pull request as ready for review March 10, 2025 15:21
@tenzen-y
Member Author

/hold
I have implemented OpenMPI workload support. Please take another look. Thanks.
@kubeflow/wg-training-leads @astefanutti

@tenzen-y tenzen-y force-pushed the implement-mpi-plugin branch 5 times, most recently from 43cbefe to 8860550 Compare March 11, 2025 11:14
@tenzen-y tenzen-y force-pushed the implement-mpi-plugin branch from 8860550 to 8282e74 Compare March 11, 2025 11:17
@@ -103,26 +108,74 @@ func (r *TrainingRuntime) buildObjects(
runtime.WithAnnotations(propagationAnnotations),
runtime.WithMLPolicy(mlPolicy),
runtime.WithPodGroupPolicy(podGroupPolicy),
runtime.WithTemplateSpecObjApply[jobsetv1alpha2ac.JobSetSpecApplyConfiguration](&jobsetv1alpha2.JobSet{
Contributor

I wonder if loading the TrainingRuntime as apply configuration directly would simplify things and remove the need for converting things like Volume to VolumeApplyConfiguration.

Member Author

Yeah, that is a good point. Actually, I was thinking about it a bit.
However, we will add another CRD as runtime like SingleRoleTrainingRuntime based on batch/v1 Job.

Member Author
@tenzen-y tenzen-y Mar 11, 2025

Ah, that means we can pass .spec.template (JobSetTemplateSpec) as an ApplyConfiguration to WithTemplateSpecObjApply, right?

Member Author
@tenzen-y tenzen-y Mar 11, 2025

In that case, we can remove the parser from the WithTemplateSpecObjApply function. That makes sense. Thanks.

Member

However, we will add another CRD as runtime like SingleRoleTrainingRuntime based on batch/v1 Job.

I am not sure if that is needed, until we really hear use-cases when JobSet won't work for users.

Member Author

However, we will add another CRD as runtime like SingleRoleTrainingRuntime based on batch/v1 Job.

I am not sure if that is needed, until we really hear use-cases when JobSet won't work for users.

As I mentioned above, the unstructured parse mechanism will be removed. The SingleRoleTrainingRuntime was just an example.

Member Author

Done.

Member
@andreyvelich andreyvelich left a comment

Thank you for this great work @tenzen-y!
I left my initial comments, will take a look again in a few hours.

@@ -7,39 +7,51 @@ metadata:
trainer.kubeflow.org/phase: pre-training
spec:
mlPolicy:
numNodes: 1
numNodes: 2
Member

Should we keep 1 Node by default ?

Suggested change
numNodes: 2
numNodes: 1

Member Author

Yeah, sure.

Member Author

Done.

mpi:
numProcPerNode: 1
mpiImplementation: OpenMPI
sshAuthMountPath: /root/.ssh
sshAuthMountPath: /home/mpiuser/.ssh
Member

Should we keep the default path ?

Suggested change
sshAuthMountPath: /home/mpiuser/.ssh
sshAuthMountPath: /root/.ssh

Member Author

Currently, we support only non-root users. Root user support requires additional enhancement.

Member

In that case, do we need to change the default mount path for now to /home/mpiuser/.ssh ?
Also, does it mean that having /home/mpiuser directory is a requirement to use MPI runtime with Kubeflow Trainer?

Member Author
@tenzen-y tenzen-y Mar 11, 2025

/home/mpiuser

They can use an arbitrary USER instead of mpiuser. The mpiuser is specific to the pi container image.

- name: trainer-node
dependsOn:
Member

Why did you remove dependsOn in the trainer, where the trainer should start after the launcher is ready?

Member Author

This causes a deadlock because the launcher keeps crashing and restarting until the trainer-node endpoints are healthy (≠ PodReady).

Member

I see, do we see any problems when trainer node Job starts before the launcher ?
I thought one of the goals for StartupPolicy/DependsOn API was the MPI use-case.

Member Author

I see, do we see any problems when trainer node Job starts before the launcher ?

This is implemented as an optional parameter (WaitForWorkersReady) for MPIJob v2beta1:

https://github.com/kubeflow/mpi-operator/blob/7f94988ab1d27fb46c69994e538543ef0e115589/pkg/apis/kubeflow/v2beta1/types.go#L194-L197

The Gang Scheduling problem is the reason why we use AtStartup as a default.

Member

I see, so with PodGroupPolicy we don't really need it.
In the current implementation, won't launcher fail to run mpirun if some of the workers are not Ready ?
Since by default we don't use PodGroupPolicy and Gang Scheduling.

Member Author

In the current implementation, won't launcher fail to run mpirun if some of the workers are not Ready ?
Since by default we don't use PodGroupPolicy and Gang Scheduling.

That is expected behavior. The launcher keeps restarting until the Node (=Pod) endpoints are ready.

OpenMPIEnvKeepFQDNHostNames string = "OMPI_MCA_orte_keep_fqdn_hostnames"

// OpenMPIEnvDefaultSlots is the OpenMPI default number of slots env key.
OpenMPIEnvDefaultSlots string = "OMPI_MCA_orte_set_default_slots"
Member
@andreyvelich andreyvelich Mar 11, 2025

Do we need this env variable for OpenMPI if the host file always sets the appropriate number of slots using NumProcPerNode value ?

Member Author

This env variable is defined as part of the OpenMPI implementation, and OpenMPI is not only about mpirun.
So, it would be better to provide the environment variable to align with the OpenMPI specification.

Member

Sounds good.
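To make the intent concrete, here is a minimal sketch of how these env vars could be attached to the launcher container via apply configurations. It is illustrative only: launcherOpenMPIEnv is a hypothetical helper, and corev1ac is the k8s.io/client-go/applyconfigurations/core/v1 package already used elsewhere in this PR.

package mpi

import (
	"strconv"

	corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
)

// launcherOpenMPIEnv (hypothetical helper) builds the OpenMPI MCA env vars
// discussed above for the launcher container.
func launcherOpenMPIEnv(numProcPerNode int32) []*corev1ac.EnvVarApplyConfiguration {
	return []*corev1ac.EnvVarApplyConfiguration{
		// Keep FQDN hostnames so hostfile entries resolve within the JobSet subdomain.
		corev1ac.EnvVar().WithName("OMPI_MCA_orte_keep_fqdn_hostnames").WithValue("true"),
		// Default slot count, mirroring numProcPerNode from the MPI policy.
		corev1ac.EnvVar().WithName("OMPI_MCA_orte_set_default_slots").
			WithValue(strconv.Itoa(int(numProcPerNode))),
	}
}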

Name string
// If Name is trainer-node, CountForNonTrainer is null.
// For Trainer, PodSet Count should be stored in Info.RuntimePolicy.MLPolicy.NumNodes.
CountForNonTrainer *int32
Member
@andreyvelich andreyvelich Mar 11, 2025

Why do we need it if we are NOT planning to support more than 1 Job replica for other Jobs (e.g. Initializer, Launcher)?

Member Author

We actually support it here. The key point is that we have 3 types of replication: numNodes, batchv1.Job.parallelism, and JobSet.replicatedJob.Replicas.

This CountForNonTrainer corresponds to batchv1.Job.parallelism.

Member

Do we want to support batchv1.Job.parallelism != 1 for non Trainer Node jobs ?

Member Author

They can specify arbitrary roles except for trainer-node, initializer, and launcher. And it would be better to just propagate the parallelism parameters to JobSet.

Member

I see, makes sense.
I am wondering whether we should consolidate NumNodes into the PodCount as well?
Should we just override this value in the MLPolicy plugins based on trainJob.spec.numNodes and trainingRuntime.spec.mlPolicy.numNodes?

Member Author

I am wondering whether we should consolidate NumNodes into the PodCount as well?
Should we just override this value in the MLPolicy plugins based on trainJob.spec.numNodes and trainingRuntime.spec.mlPolicy.numNodes?

Yeah, I was thinking the same thing. However, if we want to do it, we need to change the Info.RuntimePolicy data structure, and it will affect all plugin implementations. So, let me revisit whether we should do it after #2495.

If we make a lot of data structure changes in a single scope, that causes bugs, IMO.

Member

Sounds great! It would be nice to make a note about it in #2495.

Member Author

Yeah sure

Member Author

Done.

After the above migration, we want to consider dropping NumNodes from Info.RuntimePolicy.MLPolicy and then fully relying on the PodSet data structure for the Trainer as well.


var (
errorTemplateSpecPathNotFound = errors.New("template spec path not found")

defaultPodSetsSyncer = func(*Info) {}
Member

How is this syncer used?

Member Author

Here is one of the sync flows:

  1. Register the callback function:
    runtime.WithPodSetSyncer(syncPodSets),
  2. Obtain the syncer:
    func (i *Info) SyncPodSetsToTemplateSpec() {
  3. Sync the PodSets to the TemplateSpecApplyConfiguration:
    info.SyncPodSetsToTemplateSpec()
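As an illustration of this callback pattern, here is a self-contained sketch. The types are simplified stand-ins for the real runtime package (which also carries the apply configuration); only the names quoted above (WithPodSetSyncer, SyncPodSetsToTemplateSpec, defaultPodSetsSyncer) are taken from the PR.

package runtime

// PodSet is simplified here; see the PodSet sketch in the PR description.
type PodSet struct {
	Name string
}

// TemplateSpec holds the PodSets that plugins mutate; in the real code it also
// holds the JobSetSpecApplyConfiguration that is eventually applied.
type TemplateSpec struct {
	PodSets []PodSet
}

// Info is the state passed through the Pipeline Framework plugins.
type Info struct {
	TemplateSpec  TemplateSpec
	podSetsSyncer func(*Info)
}

type InfoOption func(*Info)

// defaultPodSetsSyncer mirrors the no-op default quoted above.
var defaultPodSetsSyncer = func(*Info) {}

// WithPodSetSyncer registers the callback that copies PodSet mutations back
// into the template spec apply configuration.
func WithPodSetSyncer(sync func(*Info)) InfoOption {
	return func(i *Info) { i.podSetsSyncer = sync }
}

// SyncPodSetsToTemplateSpec invokes the registered syncer so that plugin
// changes on PodSets are reflected in the object that will be applied.
func (i *Info) SyncPodSetsToTemplateSpec() {
	if i.podSetsSyncer == nil {
		i.podSetsSyncer = defaultPodSetsSyncer
	}
	i.podSetsSyncer(i)
}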

@@ -112,6 +116,15 @@ func (f *Framework) RunCustomValidationPlugins(oldObj, newObj *trainer.TrainJob)
return aggregatedWarnings, aggregatedErrors
}

func (f *Framework) RunPodNetworkPlugins(info *runtime.Info, trainJob *trainer.TrainJob) error {
Member

Do we need a separate plugin for the Pod Network?
I can't imagine a use-case where this might be different.

Member

I am curious if it is only applied for MPI, should we couple it with MPI plugin for now ?

for e := range ps.Endpoints {
hostFile.WriteString(fmt.Sprintf("%s slots=%d\n", e, slots))
}

In the future, if we see a need for other use-cases when we want to generate endpoints for every Pod, we can refactor it.

Member Author

I am curious if it is only applied for MPI, should we couple it with MPI plugin for now ?

The endpoint pattern and construction mechanism deeply depend on JobSet functionality (subdomain, etc.) and the JobSet replication parameters. So, the MPI plugin cannot construct the endpoint list.

Member

You are right, so we might need to update the Pipeline Framework to introduce this plugin: https://github.com/kubeflow/trainer/tree/master/docs/proposals/2170-kubeflow-training-v2#pipeline-framework.

I am curious: if in the future we are going to see more dependencies between components (e.g. MPI <-> JobSet), do we need to introduce more plugins into the Pipeline Framework?

I guess, the main goal is to have the correct Info object before we run the ComponentBuilder() plugin.

Member Author

You are right, so we might need to update the Pipeline Framework to introduce this plugin: https://github.com/kubeflow/trainer/tree/master/docs/proposals/2170-kubeflow-training-v2#pipeline-framework.

This will be done as a part of #2497

I am curious: if in the future we are going to see more dependencies between components (e.g. MPI <-> JobSet), do we need to introduce more plugins into the Pipeline Framework?

In that case, we can prepare a more comprehensive interface, something like JobInfrastructure, and then add multiple functions such as Network, Volume, and so on. However, for now, a single Network interface should be enough.

rJobReplicas := 1
info.TemplateSpec.PodSets[rJobIdx].Endpoints = func(yield func(string) bool) {
for podIdx := range ptr.Deref(podCount, 1) {
endpoint := fmt.Sprintf("%s-%s-%d-%d.%s", trainJob.Name, *rJob.Name, rJobReplicas-1, podIdx, subDomain)
Member

For the MPI use-case, do we need separate endpoints for Launcher and Trainer jobs ?

Member Author

We need to record all Node (=Pod) endpoints in the hostfile.

Member

Should we exclude the Initializer Job from the PodNetwork ?

Member Author

Should we exclude the Initializer Job from the PodNetwork ?

It would be better to avoid distinguishing between the Initializer and other roles when constructing the Info object since, from outside the JobSet plugin's POV, it is not clear why Info.PodSet.Endpoints would be missing for some roles.

Member

Yeah, I think that makes a lot of sense! And we only iterate over Launcher + Trainer endpoints in the MPI plugin.

Member Author

Yes.
Additionally, PodSet.Endpoints is an iterator, so we can reduce the cost of calculating the endpoints in the case of large Jobs.
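A minimal, standalone sketch of that iterator pattern (Go 1.23+). The function and package names here are illustrative, not the trainer's actual code; the endpoint format mirrors the snippet quoted above.

package main

import (
	"fmt"
	"iter"
)

// endpointsFor lazily yields Pod endpoints of the form
// "<trainJob>-<replicatedJob>-<jobIdx>-<podIdx>.<subdomain>" without
// materializing the whole list up front.
func endpointsFor(trainJob, rJob, subDomain string, podCount int32) iter.Seq[string] {
	return func(yield func(string) bool) {
		for podIdx := int32(0); podIdx < podCount; podIdx++ {
			if !yield(fmt.Sprintf("%s-%s-0-%d.%s", trainJob, rJob, podIdx, subDomain)) {
				return
			}
		}
	}
}

func main() {
	// Consumers simply range over the sequence; endpoints are built on demand.
	for e := range endpointsFor("mpirun", "trainer-node", "mpirun", 2) {
		fmt.Println(e) // mpirun-trainer-node-0-0.mpirun, then mpirun-trainer-node-0-1.mpirun
	}
}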

cmSource.WithItems(cmItems...)
vol.WithConfigMap(cmSource)
}
// TODO: Add other volume sources
Member

Wouldn't it be tricky for us to support all volumes in the Apply configuration script ?

Member Author

Could you clarify "tricky for us"?

Member

Why do we need to support more volumes in this API?
We don't really call this outside of the MPI context, where we need to create Secret + ConfigMap volumes.
Or am I missing something?

Member Author

Actually, this was needed to parse the PodSpec under the JobSetSpec.

However, based on the #2493 (comment) discussion, we will remove the object parse mechanism, so we can remove this Volume function completely.

Thanks.

Member

I see, so do we want to address it in this PR or in the followup changes ?

Member Author

I see, so do we want to address it in this PR or in the followup changes ?

I am currently working on it in this PR. I will ping you after I have finalized the migration.

Member Author

Done.

Comment on lines +141 to +144
corev1ac.KeyToPath().
WithKey(constants.MPIHostfileName).
WithPath(constants.MPIHostfileName).
WithMode(0444),
Member

If filename is equal to the ConfigMap data key, do we need to set Path ?

Member Author

IIUC, we can omit it in that case. But this approach is more declarative, isn't it?

@@ -58,7 +58,7 @@ func (p *PlainML) EnforceMLPolicy(info *runtime.Info, trainJob *trainer.TrainJob

Member

Are we planning to change other plugins in the followup PRs ?
E.g. insert NumNodes to the info.RuntimePolicy.MLPolicy.NumNodes ?

Member Author

As we discussed previously, I prioritized this MPI enhancement over stabilizing the plugin mechanism.
So, the whole refactor will be performed in future PRs as part of #2495.

Member

Thanks!

if err != nil {
return nil, fmt.Errorf("failed to build Secret with SSH auth keys. Error: %v", err)
}
var objects []any
Member

Why do you want to create this variable just to append the secret?

Member Author

I might not be catching your point. Could you clarify? This is because the Secret is created only when it does not already exist.

Member

Oh, makes sense due to this comment: #2493 (comment)


@@ -87,29 +93,58 @@ func (b *Builder) Initializer(trainJob *trainer.TrainJob) *Builder {
}

// Launcher updates JobSet values for the launcher Job.
func (b *Builder) Launcher(info *runtime.Info, trainJob *trainer.TrainJob) *Builder {
func (b *Builder) Launcher() *Builder {
Member

Why do we need Launcher builder in JobSet plugins if we just assign the "1" to the Job Replicas ?
Would it be simpler to just validate in webhook that launcher ReplicatedJob has replicas: 1?

Member Author

Yeah, that's right. We should add webhook validators verifying that replicatedJobs[*].replicas is 1 for all reserved roles: Trainer, Initializer, and Launcher. However, I want to avoid conflicts with @akshaychitneni's implementation, and it sounds beyond the scope of this PR since we need to validate all reserved roles rather than just the Launcher.

So, I would like to do this as a follow-up.

Member Author

I opened a follow-up issue: #2502

// Init the JobSet apply configuration from the runtime template spec
jobSetBuilder := NewBuilder(jobsetv1alpha2ac.JobSet(trainJob.Name, trainJob.Namespace).
WithLabels(maps.Clone(info.Labels)).
WithAnnotations(maps.Clone(info.Annotations)).
WithSpec(jobSetTemplateSpec))
WithSpec(jobSetSpec))
Member

Should we apply jobSetSpec to the JobSet spec after we run the JobSet builder ?
Since plugins always contain the final values for the JobSet even if the TrainJob overrides them.
For example, if the webhook is disabled and a user accidentally configures PET_NNODES envs in the TrainJob, we have to override it according to the Torch plugin.

Member Author

Yeah, that sounds reasonable. Ideally, we want to propagate information in the order Info.TemplateSpec.PodSet -> TrainJob -> TrainingRuntime. In this case, the input TrainingRuntime will be stored in Info.TemplateSpec.ObjApply as immutable, and each plugin updates only Info.TemplateSpec.PodSet.

But the problem here is Info.Trainer. So, I would like to do more refactoring in a follow-up PR.

Member

Sounds good.

// TODO: Add tests for all Interfaces.
// REF: https://github.com/kubeflow/trainer/issues/2468

func TestJobSet(t *testing.T) {
Member Author

Duplicated tests will be removed as part of #2468 in a future PR.

WithVolumeMounts(
corev1ac.VolumeMount().
WithName(jobsetplgconsts.VolumeNameInitializer).
WithMountPath("/workspace/dataset"),
Member

We might need to store /workspace/dataset and /workspace/model in the constants since they are the default paths for the dataset and model when our initializer is used.

Member Author

Sure, let me try it.

Member Author

Done.

@andreyvelich
Member

cc MPI Operator folks in case you want to check out this PR
/cc @kannon92 @vsoch @kuizhiqing @alculquicondor @roteme-runai @mchmarny @mlsorensen @Syulin7


@andreyvelich: GitHub didn't allow me to request PR reviews from the following users: mchmarny, mlsorensen, kannon92, vsoch, roteme-runai.

Note that only kubeflow members and repo collaborators can review this PR, and authors cannot review their own PRs.

In response to this:

cc MPI Operator folks in case you want to check out this PR
/cc @kannon92 @vsoch @kuizhiqing @alculquicondor @roteme-runai @mchmarny @mlsorensen @Syulin7

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

containers:
- name: launcher
image: busybox
image: mpioperator/mpi-pi:openmpi
Contributor

Are these images published by kubeflow?

Contributor

Does kubeflow have an official repository for this? Otherwise this hits Docker Hub, and you may get rate limited if you bring this into the CI.

Member

That is a good point; actually, even with an official repository, we will hit pull rate limits for non-auth requests, more info here: kubeflow/manifests#3010
We are working to migrate our images to the GHCR: #2491
I think we should do the same for MPI Operator images.
cc @kubeflow/wg-training-leads

Member Author

I will open an issue for that in the mpi-operator repository.


@vsoch

vsoch commented Mar 12, 2025

So I understand - you are going to do this for every MPI implementation? Is that sustainable for developers?

if err != nil {
return nil, err
}
templateSpec, ok, err := unstructured.NestedFieldCopy(u, fields...)
Contributor

Maybe change the variable name so it's generic?

Member Author

Oh, good catch

Member Author

Done.

return nil, err
}
if !ok {
return nil, fmt.Errorf("%w: '.%s'", errorTemplateSpecPathNotFound, strings.Join(fields, "."))
Contributor

Maybe the specific error should be wrapping the generic error from the caller?

Member Author

Yeah, that's true... Let me fix that

Member Author

Done.

@@ -98,31 +105,86 @@ func (r *TrainingRuntime) buildObjects(
// The JobSetTemplateSpec annotations are overridden by the TrainJob Annotations (.spec.annotations).
propagationAnnotations[k] = v
}
jobSetSpecApply, err := apply.FromTypedObjWithFields[jobsetv1alpha2ac.JobSetSpecApplyConfiguration](&jobsetv1alpha2.JobSet{
Contributor
@astefanutti astefanutti Mar 12, 2025

Ideally, to be strictly correct, the TrainingRuntime would be fetched as unstructured in NewObjects to avoid pollution from zero/nil fields of the typed struct. It is converted to unstructured anyway in FromTypedObjWithFields.

Member Author

Yes, distinguishing zero/null is important for SSA.
So, ideally, we want to get unstructured objects in

err := r.client.Get(ctx, client.ObjectKey{Namespace: trainJob.Namespace, Name: trainJob.Spec.RuntimeRef.Name}, &trainingRuntime)
if err != nil {
return nil, fmt.Errorf("%w: %w", errorNotFoundSpecifiedTrainingRuntime, err)
}
return r.buildObjects(ctx, trainJob, trainingRuntime.Spec.Template, trainingRuntime.Spec.MLPolicy, trainingRuntime.Spec.PodGroupPolicy)
.

However, it would bring MPI-unrelated changes into this PR. So, let me open an issue and do it as a follow-up, thanks.

Member Author

Opened: #2515
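A rough sketch of what that follow-up could look like with an unstructured Get via the controller-runtime client. This is an assumption about the eventual shape of #2515, not code from this PR; the helper name is hypothetical, and the GroupVersionKind literal simply reuses the group/version from the TrainJob manifest above.

package runtime

import (
	"context"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// getTrainingRuntimeUnstructured (hypothetical helper) fetches the
// TrainingRuntime as unstructured so that only the fields actually set on the
// cluster object are carried into the SSA apply configuration, avoiding
// pollution from zero/nil values of the typed struct.
func getTrainingRuntimeUnstructured(ctx context.Context, c client.Client, key client.ObjectKey) (*unstructured.Unstructured, error) {
	u := &unstructured.Unstructured{}
	u.SetGroupVersionKind(schema.GroupVersionKind{
		Group:   "trainer.kubeflow.org",
		Version: "v1alpha1",
		Kind:    "TrainingRuntime",
	})
	if err := c.Get(ctx, key, u); err != nil {
		return nil, err
	}
	return u, nil
}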

@andreyvelich
Member

So I understand - you are going to do this for every MPI implementation? Is that sustainable for developers?

I think, this should be driven by user requirements and ML use-cases.
Initially, we want to support only OpenMPI since we can build DeepSpeed and MLX runtimes with it:

@tenzen-y
Member Author

I think, this should be driven by user requirements and ML use-cases.
Initially, we want to support only OpenMPI since we can build DeepSpeed and MLX runtimes with it:

Initially, yes. But eventually, I want to support MPICH and IntelMPI as well.

@tenzen-y tenzen-y force-pushed the implement-mpi-plugin branch from 3c9179e to 76dff03 Compare March 12, 2025 19:20
@tenzen-y
Member Author

@andreyvelich @astefanutti I addressed all your comments, PTAL thanks.

PodSetEndpointsCmpOpts = cmp.Transformer("Seq", func(a iter.Seq[string]) []string { return slices.Collect(a) })
)

func SecretDataComparer(a, b map[string][]byte) bool {
Member

Can we make this util func more explicit ?

Suggested change
func SecretDataComparer(a, b map[string][]byte) bool {
func MPISecretDataComparer(a, b map[string][]byte) bool {

Member Author

Sure.

Member Author

Done.
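For context on the PodSetEndpointsCmpOpts transformer quoted in this hunk, here is a minimal standalone sketch of the technique: go-cmp cannot compare function values, so the transformer collects each iter.Seq[string] into a []string before diffing. The podSet type and test name are illustrative, not the repository's actual test code.

package util_test

import (
	"iter"
	"slices"
	"testing"

	"github.com/google/go-cmp/cmp"
)

// podSet is a stand-in with an iterator-typed field, like the runtime PodSet.
type podSet struct {
	Name      string
	Endpoints iter.Seq[string]
}

func TestEndpointsCmp(t *testing.T) {
	// Convert every iter.Seq[string] to a []string before comparison.
	seqCmp := cmp.Transformer("Seq", func(s iter.Seq[string]) []string { return slices.Collect(s) })

	want := podSet{Name: "trainer-node", Endpoints: slices.Values([]string{"a-0-0.sub", "a-0-1.sub"})}
	got := podSet{Name: "trainer-node", Endpoints: slices.Values([]string{"a-0-0.sub", "a-0-1.sub"})}

	if diff := cmp.Diff(want, got, seqCmp); diff != "" {
		t.Errorf("unexpected PodSet (-want +got):\n%s", diff)
	}
}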


func (j *JobSetWrapper) LauncherReplica() *JobSetWrapper {
for i, rJob := range j.Spec.ReplicatedJobs {
if rJob.Name == constants.JobTrainerNode {
Member

Do we really need this check ?
E.g. JobSet wrapper always has Trainer Node Job:

Name: constants.JobTrainerNode,

Member Author

This is needed for an array right-shift operation.
ReplicatedJobs must have the Jobs in the following order:

  • Initializer
  • Launcher
  • Trainer

Member

@tenzen-y If we don't really use the DependsOn API for the Launcher <-> Trainer relationship, do we need to preserve the order of ReplicatedJobs (Launcher set before Trainer Node) in our testing?

Member Author
@tenzen-y tenzen-y Mar 13, 2025

A slice should guarantee the order. So, we want to keep using the same order everywhere.

},
VolumeMounts: []corev1.VolumeMount{{
Name: jobsetplgconsts.VolumeNameInitializer,
MountPath: "constants.ModelMountPath",
Member

Suggested change
MountPath: "constants.ModelMountPath",
MountPath: constants.ModelMountPath,

Member Author

Done.

},
{
Name: jobsetplgconsts.VolumeNameInitializer,
MountPath: "constants.ModelMountPath",
Member

Suggested change
MountPath: "constants.ModelMountPath",
MountPath: constants.ModelMountPath,

Member Author

Done.

return j
}

func (j *JobSetWrapper) Env(rJobName, containerName string, envs ...corev1.EnvVar) *JobSetWrapper {
Member

Please can you add a TODO statement in the code noting that these functions should be refactored in favour of this Env() API:

Member Author

Sure.

Member Author

Done.

},
VolumeMounts: []corev1.VolumeMount{{
Name: jobsetplgconsts.VolumeNameInitializer,
MountPath: "constants.ModelMountPath",
Member

I think, there are a few places where you accidentally replaced this value.

Suggested change
MountPath: "constants.ModelMountPath",
MountPath: constants.ModelMountPath,

Member Author

Oh, good catch...

Member Author

Done.

@@ -33,7 +34,7 @@ import (

trainer "github.com/kubeflow/trainer/pkg/apis/trainer/v1alpha1"
"github.com/kubeflow/trainer/pkg/constants"
jobsetplugin "github.com/kubeflow/trainer/pkg/runtime/framework/plugins/jobset"
jobsetplgconsts "github.com/kubeflow/trainer/pkg/runtime/framework/plugins/jobset/constants"
testingutil "github.com/kubeflow/trainer/pkg/util/testing"
"github.com/kubeflow/trainer/test/integration/framework"
"github.com/kubeflow/trainer/test/util"
Member

Should we combine ginkgo.When("Reconciling TrainJob") and ginkgo.Describe("TrainJob controller") under a single Ginkgo context?
I don't think we use the trainjob_controller_test.go file for integration tests outside of the TrainJob controller context.
I just think this part can be easily moved under:

ginkgo.Describe("TrainJob controller") 

Member Author

We do not want to do that. Ideally, we should decouple the framework tests into a dedicated When (currently, we have all cases in a single When).

Member

I see, yeah that might make more sense to refactor it in the future.

Member
@andreyvelich andreyvelich left a comment

I think we are ready to merge this.
Thanks again for this tremendous work in such a short period of time, @tenzen-y!
Feel free to unhold.

/lgtm
/approve
/hold


[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: andreyvelich

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@tenzen-y
Member Author

Thank you all! Special thanks to the leading reviewers, @astefanutti and @andreyvelich!

/hold cancel

@google-oss-prow google-oss-prow bot merged commit f64bdf2 into kubeflow:master Mar 13, 2025
14 checks passed
@tenzen-y tenzen-y deleted the implement-mpi-plugin branch March 13, 2025 05:04
tenzen-y added a commit to tenzen-y/trainer that referenced this pull request Mar 13, 2025
Signed-off-by: Yuki Iwai <[email protected]>

Use numNodes=1 as default mpi_distributed ClusterTrainingRuntime

Signed-off-by: Yuki Iwai <[email protected]>

Implement MPI Plugin for OpenMPI

Signed-off-by: Yuki Iwai <[email protected]>
mahdikhashan pushed a commit to mahdikhashan/trainer that referenced this pull request Mar 16, 2025
* Implement MPI Plugin for OpenMPI

Signed-off-by: Yuki Iwai <[email protected]>

* Directly pass the JobSetApplyConfiguration to RuntimeInfo

Signed-off-by: Yuki Iwai <[email protected]>

* Make repeated string as constants

Signed-off-by: Yuki Iwai <[email protected]>

* Use numNodes=1 as default mpi_distributed ClusterTrainingRuntime

Signed-off-by: Yuki Iwai <[email protected]>

* Remove unused errors

Signed-off-by: Yuki Iwai <[email protected]>

* Rename runLauncherAsWorker with runLauncherAsNode

Signed-off-by: Yuki Iwai <[email protected]>

* Fix unintended constants usage for ModelMountPath

Signed-off-by: Yuki Iwai <[email protected]>

* Rename SecretDataComparer with MPISecretDataComparer

Signed-off-by: Yuki Iwai <[email protected]>

* Add TODO statement to deprecated env wrappers.

Signed-off-by: Yuki Iwai <[email protected]>

---------

Signed-off-by: Yuki Iwai <[email protected]>