From 1f7f5252ae54dcc5ae8881a54513a99416a0d2db Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Thu, 23 Jan 2025 12:36:00 +0000 Subject: [PATCH] Add ComputeDomainStatus and set it as its deployment pods come online. Signed-off-by: Kevin Klues --- .../resource/v1beta1/computedomain.go | 27 ++++++- .../resource/v1beta1/zz_generated.deepcopy.go | 42 +++++++++++ cmd/nvidia-dra-imex-controller/deployment.go | 2 +- .../deploymentpods.go | 72 ++++++++++++++++--- .../resource.nvidia.com_computedomains.yaml | 26 +++++++ .../resource.nvidia.com_computedomains.yaml | 26 +++++++ .../templates/clusterrole.yaml | 3 + .../typed/resource/v1beta1/computedomain.go | 17 +++++ .../v1beta1/fake/fake_computedomain.go | 12 ++++ 9 files changed, 216 insertions(+), 11 deletions(-) diff --git a/api/nvidia.com/resource/v1beta1/computedomain.go b/api/nvidia.com/resource/v1beta1/computedomain.go index c245d9b0..e5efea6e 100644 --- a/api/nvidia.com/resource/v1beta1/computedomain.go +++ b/api/nvidia.com/resource/v1beta1/computedomain.go @@ -24,13 +24,15 @@ import ( // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +k8s:openapi-gen=true // +kubebuilder:resource:scope=Namespaced +// +kubebuilder:subresource:status // ComputeDomain prepares a set of nodes to run a multi-node workload in. type ComputeDomain struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec ComputeDomainSpec `json:"spec,omitempty"` + Spec ComputeDomainSpec `json:"spec,omitempty"` + Status ComputeDomainStatus `json:"status,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -51,3 +53,26 @@ type ComputeDomainSpec struct { ResourceClaimName string `json:"resourceClaimName,omitempty"` DeviceClassName string `json:"deviceClassName,omitempty"` } + +// ComputeDomainStatus provides the status for a ComputeDomain. 
+type ComputeDomainStatus struct { + // +listType=map + // +listMapKey=name + Nodes []*ComputeDomainNode `json:"nodes,omitempty"` +} + +// ComputeDomainNode provides information about each node added to a ComputeDomain. +type ComputeDomainNode struct { + Name string `json:"name"` + IPAddress string `json:"ipAddress"` + CliqueID string `json:"cliqueID"` +} + +// GetNodesAsMap returns the list of nodes in a ComputeDomainStatus as a map. +func (s *ComputeDomainStatus) GetNodesAsMap() map[string]*ComputeDomainNode { + m := make(map[string]*ComputeDomainNode) + for _, node := range s.Nodes { + m[node.Name] = node + } + return m +} diff --git a/api/nvidia.com/resource/v1beta1/zz_generated.deepcopy.go b/api/nvidia.com/resource/v1beta1/zz_generated.deepcopy.go index 8da6d4ac..e0363f81 100644 --- a/api/nvidia.com/resource/v1beta1/zz_generated.deepcopy.go +++ b/api/nvidia.com/resource/v1beta1/zz_generated.deepcopy.go @@ -30,6 +30,7 @@ func (in *ComputeDomain) DeepCopyInto(out *ComputeDomain) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComputeDomain. @@ -82,6 +83,21 @@ func (in *ComputeDomainList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ComputeDomainNode) DeepCopyInto(out *ComputeDomainNode) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComputeDomainNode. +func (in *ComputeDomainNode) DeepCopy() *ComputeDomainNode { + if in == nil { + return nil + } + out := new(ComputeDomainNode) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ComputeDomainSpec) DeepCopyInto(out *ComputeDomainSpec) { *out = *in @@ -97,6 +113,32 @@ func (in *ComputeDomainSpec) DeepCopy() *ComputeDomainSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ComputeDomainStatus) DeepCopyInto(out *ComputeDomainStatus) { + *out = *in + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make([]*ComputeDomainNode, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(ComputeDomainNode) + **out = **in + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComputeDomainStatus. +func (in *ComputeDomainStatus) DeepCopy() *ComputeDomainStatus { + if in == nil { + return nil + } + out := new(ComputeDomainStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *GpuConfig) DeepCopyInto(out *GpuConfig) { *out = *in diff --git a/cmd/nvidia-dra-imex-controller/deployment.go b/cmd/nvidia-dra-imex-controller/deployment.go index b6b164f4..6c3d1870 100644 --- a/cmd/nvidia-dra-imex-controller/deployment.go +++ b/cmd/nvidia-dra-imex-controller/deployment.go @@ -308,7 +308,7 @@ func (m *DeploymentManager) addPodManager(ctx context.Context, labelSelector *me return nil } - podManager := NewDeploymentPodManager(m.config, m.imexChannelManager, labelSelector, numPods) + podManager := NewDeploymentPodManager(m.config, m.imexChannelManager, labelSelector, numPods, m.getComputeDomain) if err := podManager.Start(ctx); err != nil { return fmt.Errorf("error creating Pod manager: %w", err) diff --git a/cmd/nvidia-dra-imex-controller/deploymentpods.go b/cmd/nvidia-dra-imex-controller/deploymentpods.go index f85894cf..fb4a8605 100644 --- a/cmd/nvidia-dra-imex-controller/deploymentpods.go +++ b/cmd/nvidia-dra-imex-controller/deploymentpods.go @@ -29,6 +29,12 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + + nvapi "github.com/NVIDIA/k8s-dra-driver/api/nvidia.com/resource/v1beta1" +) + +const ( + CliqueIDLabelKey = "nvidia.com/gpu.clique" ) type DeploymentPodManager struct { @@ -40,14 +46,16 @@ type DeploymentPodManager struct { informer cache.SharedInformer lister corev1listers.PodLister - nodeSelector corev1.NodeSelector + getComputeDomain GetComputeDomainFunc + computeDomainNodes []*nvapi.ComputeDomainNode computeDomainLabel string numPods int + nodeSelector corev1.NodeSelector imexChannelManager *ImexChannelManager } -func NewDeploymentPodManager(config *ManagerConfig, imexChannelManager *ImexChannelManager, labelSelector *metav1.LabelSelector, numPods int) *DeploymentPodManager { +func NewDeploymentPodManager(config *ManagerConfig, imexChannelManager *ImexChannelManager, labelSelector *metav1.LabelSelector, numPods int, getComputeDomain GetComputeDomainFunc) 
*DeploymentPodManager { factory := informers.NewSharedInformerFactoryWithOptions( config.clientsets.Core, informerResyncPeriod, @@ -79,9 +87,10 @@ func NewDeploymentPodManager(config *ManagerConfig, imexChannelManager *ImexChan factory: factory, informer: informer, lister: lister, - nodeSelector: nodeSelector, + getComputeDomain: getComputeDomain, computeDomainLabel: labelSelector.MatchLabels[computeDomainLabelKey], numPods: numPods, + nodeSelector: nodeSelector, imexChannelManager: imexChannelManager, } @@ -150,23 +159,68 @@ func (m *DeploymentPodManager) onPodAddOrUpdate(ctx context.Context, obj any) er klog.Infof("Processing added or updated Pod: %s/%s", p.Namespace, p.Name) + cd, err := m.getComputeDomain(p.Labels[computeDomainLabelKey]) + if err != nil { + return fmt.Errorf("error getting ComputeDomain: %w", err) + } + if cd == nil { + return nil + } + if p.Spec.NodeName == "" { return fmt.Errorf("pod not yet scheduled: %s/%s", p.Namespace, p.Name) } - hostnameLabels := m.nodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values - if !slices.Contains(hostnameLabels, p.Spec.NodeName) { - hostnameLabels = append(hostnameLabels, p.Spec.NodeName) + var nodeNames []string + for _, node := range m.computeDomainNodes { + nodeNames = append(nodeNames, node.Name) } - m.nodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values = hostnameLabels - if len(hostnameLabels) != m.numPods { - return fmt.Errorf("node selector not yet complete") + if !slices.Contains(nodeNames, p.Spec.NodeName) { + node, err := m.GetComputeDomainNode(ctx, p.Spec.NodeName) + if err != nil { + return fmt.Errorf("error getting ComputeDomainNode: %w", err) + } + nodeNames = append(nodeNames, node.Name) + m.computeDomainNodes = append(m.computeDomainNodes, node) } + if len(nodeNames) != m.numPods { + return fmt.Errorf("not all pods scheduled yet") + } + + cd.Status.Nodes = m.computeDomainNodes + if _, err = 
m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(cd.Namespace).UpdateStatus(ctx, cd, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("error updating nodes in ComputeDomain status: %w", err) + } + + m.nodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values = nodeNames if err := m.imexChannelManager.CreateOrUpdatePool(m.computeDomainLabel, &m.nodeSelector); err != nil { return fmt.Errorf("failed to create or update IMEX channel pool: %w", err) } return nil } + +func (m *DeploymentPodManager) GetComputeDomainNode(ctx context.Context, nodeName string) (*nvapi.ComputeDomainNode, error) { + node, err := m.config.clientsets.Core.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("error getting Node '%s': %w", nodeName, err) + } + + var ipAddress string + for _, addr := range node.Status.Addresses { + if addr.Type == corev1.NodeInternalIP { + ipAddress = addr.Address + break + } + } + + n := &nvapi.ComputeDomainNode{ + Name: nodeName, + IPAddress: ipAddress, + CliqueID: node.Labels[CliqueIDLabelKey], + } + + return n, nil +} diff --git a/deployments/helm/k8s-dra-gpu-driver/crds/resource.nvidia.com_computedomains.yaml b/deployments/helm/k8s-dra-gpu-driver/crds/resource.nvidia.com_computedomains.yaml index 8b6be75f..7be1f0f9 100644 --- a/deployments/helm/k8s-dra-gpu-driver/crds/resource.nvidia.com_computedomains.yaml +++ b/deployments/helm/k8s-dra-gpu-driver/crds/resource.nvidia.com_computedomains.yaml @@ -53,6 +53,32 @@ spec: - message: Exactly one of 'resourceClaimName' or 'deviceClassName' must be set. rule: '(has(self.resourceClaimName) ? !has(self.deviceClassName) : has(self.deviceClassName))' + status: + description: ComputeDomainStatus provides the status for a ComputeDomain. + properties: + nodes: + items: + description: ComputeDomainNode provides information about each node + added to a ComputeDomain. 
+ properties: + cliqueID: + type: string + ipAddress: + type: string + name: + type: string + required: + - cliqueID + - ipAddress + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + type: object type: object served: true storage: true + subresources: + status: {} diff --git a/deployments/helm/k8s-dra-imex-driver/crds/resource.nvidia.com_computedomains.yaml b/deployments/helm/k8s-dra-imex-driver/crds/resource.nvidia.com_computedomains.yaml index 8b6be75f..7be1f0f9 100644 --- a/deployments/helm/k8s-dra-imex-driver/crds/resource.nvidia.com_computedomains.yaml +++ b/deployments/helm/k8s-dra-imex-driver/crds/resource.nvidia.com_computedomains.yaml @@ -53,6 +53,32 @@ spec: - message: Exactly one of 'resourceClaimName' or 'deviceClassName' must be set. rule: '(has(self.resourceClaimName) ? !has(self.deviceClassName) : has(self.deviceClassName))' + status: + description: ComputeDomainStatus provides the status for a ComputeDomain. + properties: + nodes: + items: + description: ComputeDomainNode provides information about each node + added to a ComputeDomain. 
+ properties: + cliqueID: + type: string + ipAddress: + type: string + name: + type: string + required: + - cliqueID + - ipAddress + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + type: object type: object served: true storage: true + subresources: + status: {} diff --git a/deployments/helm/k8s-dra-imex-driver/templates/clusterrole.yaml b/deployments/helm/k8s-dra-imex-driver/templates/clusterrole.yaml index a7698eea..dedf6595 100644 --- a/deployments/helm/k8s-dra-imex-driver/templates/clusterrole.yaml +++ b/deployments/helm/k8s-dra-imex-driver/templates/clusterrole.yaml @@ -8,6 +8,9 @@ rules: - apiGroups: ["resource.nvidia.com"] resources: ["computedomains"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] +- apiGroups: ["resource.nvidia.com"] + resources: ["computedomains/status"] + verbs: ["get", "update", "patch"] - apiGroups: ["resource.k8s.io"] resources: ["resourceclaims"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] diff --git a/pkg/nvidia.com/clientset/versioned/typed/resource/v1beta1/computedomain.go b/pkg/nvidia.com/clientset/versioned/typed/resource/v1beta1/computedomain.go index c1d2ae5d..b4bef1b2 100644 --- a/pkg/nvidia.com/clientset/versioned/typed/resource/v1beta1/computedomain.go +++ b/pkg/nvidia.com/clientset/versioned/typed/resource/v1beta1/computedomain.go @@ -40,6 +40,7 @@ type ComputeDomainsGetter interface { type ComputeDomainInterface interface { Create(ctx context.Context, computeDomain *v1beta1.ComputeDomain, opts v1.CreateOptions) (*v1beta1.ComputeDomain, error) Update(ctx context.Context, computeDomain *v1beta1.ComputeDomain, opts v1.UpdateOptions) (*v1beta1.ComputeDomain, error) + UpdateStatus(ctx context.Context, computeDomain *v1beta1.ComputeDomain, opts v1.UpdateOptions) (*v1beta1.ComputeDomain, error) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error DeleteCollection(ctx
context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error Get(ctx context.Context, name string, opts v1.GetOptions) (*v1beta1.ComputeDomain, error) @@ -135,6 +136,22 @@ func (c *computeDomains) Update(ctx context.Context, computeDomain *v1beta1.Comp return } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *computeDomains) UpdateStatus(ctx context.Context, computeDomain *v1beta1.ComputeDomain, opts v1.UpdateOptions) (result *v1beta1.ComputeDomain, err error) { + result = &v1beta1.ComputeDomain{} + err = c.client.Put(). + Namespace(c.ns). + Resource("computedomains"). + Name(computeDomain.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(computeDomain). + Do(ctx). + Into(result) + return +} + // Delete takes name of the computeDomain and deletes it. Returns an error if one occurs. func (c *computeDomains) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { return c.client.Delete(). diff --git a/pkg/nvidia.com/clientset/versioned/typed/resource/v1beta1/fake/fake_computedomain.go b/pkg/nvidia.com/clientset/versioned/typed/resource/v1beta1/fake/fake_computedomain.go index 96554b0a..ed52019a 100644 --- a/pkg/nvidia.com/clientset/versioned/typed/resource/v1beta1/fake/fake_computedomain.go +++ b/pkg/nvidia.com/clientset/versioned/typed/resource/v1beta1/fake/fake_computedomain.go @@ -101,6 +101,18 @@ func (c *FakeComputeDomains) Update(ctx context.Context, computeDomain *v1beta1. return obj.(*v1beta1.ComputeDomain), err } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeComputeDomains) UpdateStatus(ctx context.Context, computeDomain *v1beta1.ComputeDomain, opts v1.UpdateOptions) (*v1beta1.ComputeDomain, error) { + obj, err := c.Fake. 
+ Invokes(testing.NewUpdateSubresourceAction(computedomainsResource, "status", c.ns, computeDomain), &v1beta1.ComputeDomain{}) + + if obj == nil { + return nil, err + } + return obj.(*v1beta1.ComputeDomain), err +} + // Delete takes name of the computeDomain and deletes it. Returns an error if one occurs. func (c *FakeComputeDomains) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { _, err := c.Fake.