Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Klues <[email protected]>
  • Loading branch information
klueska committed Jan 17, 2025
1 parent 6b4d5e1 commit 6b434b6
Show file tree
Hide file tree
Showing 13 changed files with 943 additions and 655 deletions.
90 changes: 22 additions & 68 deletions cmd/nvidia-dra-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,95 +19,49 @@ package main
import (
"context"
"fmt"
"sync"
"time"

"k8s.io/client-go/informers"

"github.com/NVIDIA/k8s-dra-driver/pkg/flags"
nvinformers "github.com/NVIDIA/k8s-dra-driver/pkg/nvidia.com/resource/informers/externalversions"
"github.com/NVIDIA/k8s-dra-driver/pkg/workqueue"
)

type ManagerConfig struct {
clientsets flags.ClientSets
nvInformerFactory nvinformers.SharedInformerFactory
coreInformerFactory informers.SharedInformerFactory
workQueue *workqueue.WorkQueue
driverName string
driverNamespace string
clientsets flags.ClientSets
workQueue *workqueue.WorkQueue
}

type OwnerExistsFunc func(ctx context.Context, uid string) (bool, error)

// Controller defines the type to represent the controller.
type Controller struct {
waitGroup sync.WaitGroup

ImexManager *ImexManager
MultiNodeEnvironmentManager *MultiNodeEnvironmentManager
config *Config
}

// StartController starts a Controller.
func StartController(ctx context.Context, config *Config) (*Controller, error) {
if !config.flags.deviceClasses.Has(ImexChannelType) {
return nil, nil
}
// NewController creates a new Controller.
func NewController(config *Config) *Controller {
return &Controller{config: config}
}

// Run runs a Controller.
func (c *Controller) Run(ctx context.Context) error {
workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
nvInformerFactory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, 30*time.Second)
coreInformerFactory := informers.NewSharedInformerFactory(config.clientsets.Core, 30*time.Second)

managerConfig := &ManagerConfig{
clientsets: config.clientsets,
nvInformerFactory: nvInformerFactory,
coreInformerFactory: coreInformerFactory,
workQueue: workQueue,
}

imexManager, err := StartImexManager(ctx, config)
if err != nil {
return nil, fmt.Errorf("error starting IMEX manager: %w", err)
driverName: c.config.driverName,
driverNamespace: c.config.flags.namespace,
clientsets: c.config.clientsets,
workQueue: workQueue,
}

mneManager, err := NewMultiNodeEnvironmentManager(ctx, managerConfig)
if err != nil {
return nil, fmt.Errorf("error starting MultiNodeEnvironment manager: %w", err)
}

c := &Controller{
ImexManager: imexManager,
MultiNodeEnvironmentManager: mneManager,
}

c.waitGroup.Add(3)
go func() {
defer c.waitGroup.Done()
nvInformerFactory.Start(ctx.Done())
}()
go func() {
defer c.waitGroup.Done()
coreInformerFactory.Start(ctx.Done())
}()
go func() {
defer c.waitGroup.Done()
workQueue.Run(ctx)
}()

if err := c.MultiNodeEnvironmentManager.WaitForCacheSync(ctx); err != nil {
return nil, fmt.Errorf("error syncing cache: %w", err)
}

return c, nil
}
mneManager := NewMultiNodeEnvironmentManager(managerConfig)

// Stop stops a running Controller.
func (m *Controller) Stop() error {
if m == nil {
return nil
if err := mneManager.Start(ctx); err != nil {
return fmt.Errorf("error starting MultiNodeEnvironment manager: %w", err)
}

m.waitGroup.Wait()
workQueue.Run(ctx)

if err := m.ImexManager.Stop(); err != nil {
return fmt.Errorf("error stopping IMEX manager: %w", err)
if err := mneManager.Shutdown(ctx); err != nil {
return fmt.Errorf("error shutting down MultiNodeEnvironment manager: %w", err)
}

return nil
Expand Down
Loading

0 comments on commit 6b434b6

Please sign in to comment.