Skip to content

Commit bc880fc

Browse files
committed
Add logic to autogenerate a ResourceClaim from a MultiNodeEnvironment
Signed-off-by: Kevin Klues <[email protected]>
1 parent 482000d commit bc880fc

File tree

3 files changed

+296
-4
lines changed

3 files changed

+296
-4
lines changed

cmd/nvidia-dra-controller/controller.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import (
2222
)
2323

2424
type Controller struct {
25-
ImexManager *ImexManager
25+
ImexManager *ImexManager
26+
MultiNodeEnvironmentManager *MultiNodeEnvironmentManager
2627
}
2728

2829
// StartController starts a Controller.
@@ -36,8 +37,14 @@ func StartController(ctx context.Context, config *Config) (*Controller, error) {
3637
return nil, fmt.Errorf("error starting IMEX manager: %w", err)
3738
}
3839

40+
mneManager, err := StartMultiNodeEnvironmentManager(ctx, config)
41+
if err != nil {
42+
return nil, fmt.Errorf("error starting MultiNodeEnvironment manager: %w", err)
43+
}
44+
3945
m := &Controller{
40-
ImexManager: imexManager,
46+
ImexManager: imexManager,
47+
MultiNodeEnvironmentManager: mneManager,
4148
}
4249

4350
return m, nil
@@ -48,5 +55,10 @@ func (m *Controller) Stop() error {
4855
if m == nil {
4956
return nil
5057
}
51-
return m.ImexManager.Stop()
58+
imErr := m.ImexManager.Stop()
59+
mnErr := m.MultiNodeEnvironmentManager.Stop()
60+
if imErr != nil || mnErr != nil {
61+
return fmt.Errorf("IMEX manager error: %w, MultiNodeEnvironment manager error: %w", imErr, mnErr)
62+
}
63+
return nil
5264
}

cmd/nvidia-dra-controller/mnenv.go

+277
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
* Copyright (c) 2025 NVIDIA CORPORATION. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
"time"
24+
25+
resourceapi "k8s.io/api/resource/v1beta1"
26+
"k8s.io/apimachinery/pkg/api/errors"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/client-go/informers"
29+
resourcelisters "k8s.io/client-go/listers/resource/v1beta1"
30+
"k8s.io/client-go/tools/cache"
31+
"k8s.io/klog/v2"
32+
"k8s.io/utils/ptr"
33+
34+
nvapi "github.com/NVIDIA/k8s-dra-driver/api/nvidia.com/resource/gpu/v1alpha1"
35+
"github.com/NVIDIA/k8s-dra-driver/pkg/flags"
36+
nvinformers "github.com/NVIDIA/k8s-dra-driver/pkg/nvidia.com/resource/informers/externalversions"
37+
nvlisters "github.com/NVIDIA/k8s-dra-driver/pkg/nvidia.com/resource/listers/gpu/v1alpha1"
38+
"github.com/NVIDIA/k8s-dra-driver/pkg/workqueue"
39+
)
40+
41+
const (
42+
multiNodeEnvironmentFinalizer = "gpu.nvidia.com/finalizer.multiNodeEnvironment"
43+
imexDeviceClass = "imex.nvidia.com"
44+
)
45+
46+
type WorkItem struct {
47+
Object any
48+
EventType string
49+
}
50+
51+
type MultiNodeEnvironmentManager struct {
52+
clientsets flags.ClientSets
53+
waitGroup sync.WaitGroup
54+
55+
multiNodeEnvironmentInformer cache.SharedIndexInformer
56+
multiNodeEnvironmentLister nvlisters.MultiNodeEnvironmentLister
57+
resourceClaimLister resourcelisters.ResourceClaimLister
58+
}
59+
60+
// StartManager starts a MultiNodeEnvironmentManager.
61+
func StartMultiNodeEnvironmentManager(ctx context.Context, config *Config) (*MultiNodeEnvironmentManager, error) {
62+
queue := workqueue.New(workqueue.DefaultControllerRateLimiter())
63+
64+
nvInformerFactory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, 30*time.Second)
65+
coreInformerFactory := informers.NewSharedInformerFactory(config.clientsets.Core, 30*time.Second)
66+
67+
mneInformer := nvInformerFactory.Gpu().V1alpha1().MultiNodeEnvironments().Informer()
68+
mneLister := nvlisters.NewMultiNodeEnvironmentLister(mneInformer.GetIndexer())
69+
70+
rcInformer := coreInformerFactory.Resource().V1beta1().ResourceClaims().Informer()
71+
rcLister := resourcelisters.NewResourceClaimLister(rcInformer.GetIndexer())
72+
73+
m := &MultiNodeEnvironmentManager{
74+
clientsets: config.clientsets,
75+
multiNodeEnvironmentInformer: mneInformer,
76+
multiNodeEnvironmentLister: mneLister,
77+
resourceClaimLister: rcLister,
78+
}
79+
80+
var err error
81+
err = mneInformer.AddIndexers(cache.Indexers{
82+
"uid": func(obj interface{}) ([]string, error) {
83+
mne, ok := obj.(*nvapi.MultiNodeEnvironment)
84+
if !ok {
85+
return nil, fmt.Errorf("expected a MultiNodeEnvironment but got %T", obj)
86+
}
87+
return []string{string(mne.UID)}, nil
88+
},
89+
})
90+
if err != nil {
91+
return nil, fmt.Errorf("error adding indexer for MultiNodeEnvironment UUIDs: %w", err)
92+
}
93+
94+
_, err = mneInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
95+
AddFunc: func(obj any) { queue.Enqueue(obj, m.onMultiNodeEnvironmentAdd) },
96+
})
97+
if err != nil {
98+
return nil, fmt.Errorf("error adding event handlers for MultiNodeEnvironment informer: %w", err)
99+
}
100+
101+
_, err = rcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
102+
AddFunc: func(obj any) { queue.Enqueue(obj, m.onResourceClaimAddOrUpdate) },
103+
UpdateFunc: func(objOld, objNew any) { queue.Enqueue(objNew, m.onResourceClaimAddOrUpdate) },
104+
})
105+
if err != nil {
106+
return nil, fmt.Errorf("error adding event handlers for ResourceClaim informer: %w", err)
107+
}
108+
109+
m.waitGroup.Add(3)
110+
go func() {
111+
defer m.waitGroup.Done()
112+
nvInformerFactory.Start(ctx.Done())
113+
}()
114+
go func() {
115+
defer m.waitGroup.Done()
116+
coreInformerFactory.Start(ctx.Done())
117+
}()
118+
go func() {
119+
defer m.waitGroup.Done()
120+
queue.Run(ctx.Done())
121+
}()
122+
123+
if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced) {
124+
klog.Warning("Cache sync failed; retrying in 5 seconds")
125+
time.Sleep(5 * time.Second)
126+
if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced) {
127+
return nil, fmt.Errorf("informer cache sync failed twice")
128+
}
129+
}
130+
131+
return m, nil
132+
}
133+
134+
// Stop stops a running MultiNodeEnvironmentManager.
135+
func (m *MultiNodeEnvironmentManager) Stop() error {
136+
if m == nil {
137+
return nil
138+
}
139+
m.waitGroup.Wait()
140+
return nil
141+
}
142+
143+
func (m *MultiNodeEnvironmentManager) onMultiNodeEnvironmentAdd(obj any) error {
144+
mne, ok := obj.(*nvapi.MultiNodeEnvironment)
145+
if !ok {
146+
return fmt.Errorf("failed to cast to MultiNodeEnvironment")
147+
}
148+
149+
klog.Infof("Processing added MultiNodeEnvironment: %s/%s", mne.Namespace, mne.Name)
150+
151+
gvk := nvapi.SchemeGroupVersion.WithKind("MultiNodeEnvironment")
152+
mne.APIVersion = gvk.GroupVersion().String()
153+
mne.Kind = gvk.Kind
154+
155+
ownerReference := metav1.OwnerReference{
156+
APIVersion: mne.APIVersion,
157+
Kind: mne.Kind,
158+
Name: mne.Name,
159+
UID: mne.UID,
160+
Controller: ptr.To(true),
161+
}
162+
163+
if _, err := m.createResourceClaim(mne.Namespace, mne.Spec.ResourceClaimName, ownerReference); err != nil {
164+
return fmt.Errorf("error creating ResourceClaim '%s/%s': %w", mne.Namespace, mne.Spec.ResourceClaimName, err)
165+
}
166+
167+
return nil
168+
}
169+
170+
func (m *MultiNodeEnvironmentManager) onResourceClaimAddOrUpdate(obj any) error {
171+
rc, ok := obj.(*resourceapi.ResourceClaim)
172+
if !ok {
173+
return fmt.Errorf("failed to cast to ResourceClaim")
174+
}
175+
176+
klog.Infof("Processing added or updated ResourceClaim: %s/%s", rc.Namespace, rc.Name)
177+
178+
if len(rc.OwnerReferences) != 1 {
179+
return nil
180+
}
181+
182+
if rc.OwnerReferences[0].Kind != nvapi.MultiNodeEnvironmentKind {
183+
return nil
184+
}
185+
186+
if !cache.WaitForCacheSync(context.Background().Done(), m.multiNodeEnvironmentInformer.HasSynced) {
187+
return fmt.Errorf("cache sync failed for MultiNodeEnvironment")
188+
}
189+
190+
mnes, err := m.multiNodeEnvironmentInformer.GetIndexer().ByIndex("uid", string(rc.OwnerReferences[0].UID))
191+
if err != nil {
192+
return fmt.Errorf("error retrieving MultiNodeInformer OwnerReference by UID from indexer: %w", err)
193+
}
194+
if len(mnes) != 0 {
195+
return nil
196+
}
197+
198+
if err := m.removeResourceClaimFinalizer(rc.Namespace, rc.Name); err != nil {
199+
return fmt.Errorf("error removing finalizer on ResourceClaim '%s/%s': %w", rc.Namespace, rc.Name, err)
200+
}
201+
202+
if err := m.deleteResourceClaim(rc.Namespace, rc.Name); err != nil {
203+
return fmt.Errorf("error deleting ResourceClaim '%s/%s': %w", rc.Namespace, rc.Name, err)
204+
}
205+
206+
return nil
207+
}
208+
209+
func (m *MultiNodeEnvironmentManager) createResourceClaim(namespace, name string, ownerReference metav1.OwnerReference) (*resourceapi.ResourceClaim, error) {
210+
rc, err := m.resourceClaimLister.ResourceClaims(namespace).Get(name)
211+
if err == nil {
212+
if len(rc.OwnerReferences) != 1 && rc.OwnerReferences[0] != ownerReference {
213+
return nil, fmt.Errorf("ResourceClaim '%s/%s' exists without expected OwnerReference: %v", namespace, name, ownerReference)
214+
}
215+
return rc, nil
216+
}
217+
if !errors.IsNotFound(err) {
218+
return nil, fmt.Errorf("error retrieving ResourceClaim: %w", err)
219+
}
220+
221+
resourceClaim := &resourceapi.ResourceClaim{
222+
ObjectMeta: metav1.ObjectMeta{
223+
Name: name,
224+
Namespace: namespace,
225+
OwnerReferences: []metav1.OwnerReference{ownerReference},
226+
Finalizers: []string{multiNodeEnvironmentFinalizer},
227+
},
228+
Spec: resourceapi.ResourceClaimSpec{
229+
Devices: resourceapi.DeviceClaim{
230+
Requests: []resourceapi.DeviceRequest{{
231+
Name: "imex", DeviceClassName: imexDeviceClass,
232+
}},
233+
},
234+
},
235+
}
236+
237+
rc, err = m.clientsets.Core.ResourceV1beta1().ResourceClaims(resourceClaim.Namespace).Create(context.Background(), resourceClaim, metav1.CreateOptions{})
238+
if err != nil {
239+
return nil, fmt.Errorf("error creating ResourceClaim: %w", err)
240+
}
241+
242+
return rc, nil
243+
}
244+
245+
func (m *MultiNodeEnvironmentManager) removeResourceClaimFinalizer(namespace, name string) error {
246+
rc, err := m.resourceClaimLister.ResourceClaims(namespace).Get(name)
247+
if err != nil && errors.IsNotFound(err) {
248+
return nil
249+
}
250+
if err != nil {
251+
return fmt.Errorf("error retrieving ResourceClaim: %w", err)
252+
}
253+
254+
newRC := rc.DeepCopy()
255+
256+
newRC.Finalizers = []string{}
257+
for _, f := range rc.Finalizers {
258+
if f != multiNodeEnvironmentFinalizer {
259+
newRC.Finalizers = append(newRC.Finalizers, f)
260+
}
261+
}
262+
263+
_, err = m.clientsets.Core.ResourceV1beta1().ResourceClaims(namespace).Update(context.Background(), newRC, metav1.UpdateOptions{})
264+
if err != nil {
265+
return fmt.Errorf("error updating ResourceClaim: %w", err)
266+
}
267+
268+
return nil
269+
}
270+
271+
func (m *MultiNodeEnvironmentManager) deleteResourceClaim(namespace, name string) error {
272+
err := m.clientsets.Core.ResourceV1beta1().ResourceClaims(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
273+
if err != nil && !errors.IsNotFound(err) {
274+
return fmt.Errorf("erroring deleting ResourceClaim: %w", err)
275+
}
276+
return nil
277+
}

deployments/helm/k8s-dra-driver/templates/clusterrole.yaml

+4-1
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ metadata:
55
name: {{ include "k8s-dra-driver.fullname" . }}-role
66
namespace: {{ include "k8s-dra-driver.namespace" . }}
77
rules:
8+
- apiGroups: ["gpu.nvidia.com"]
9+
resources: ["multinodeenvironments"]
10+
verbs: ["get", "list", "watch"]
811
- apiGroups: ["resource.k8s.io"]
912
resources: ["resourceclaims"]
10-
verbs: ["get"]
13+
verbs: ["get", "list", "watch", "create", "update", "delete"]
1114
- apiGroups: ["resource.k8s.io"]
1215
resources: ["resourceclaims/status"]
1316
verbs: ["update"]

0 commit comments

Comments
 (0)