Skip to content

Commit

Permalink
Separate the logic of applying resources from syncer and make it reus…
Browse files Browse the repository at this point in the history
…able (#400)

* compose.yml: trim whitespace at eol

* add resourceapplier and use it in syncer

* resourceapplier: add test for creating/updating/deleting pods

* di: make resourceapplier configurable

* fix for lint

* refactor test

* fix for lint

* merge createPodsWithFilter with createPods

* docs: update options for syncer

* merge syncer options with resourceapplier options
  • Loading branch information
saza-ku authored Jan 21, 2025
1 parent 1fc02b9 commit bc586b6
Show file tree
Hide file tree
Showing 9 changed files with 980 additions and 285 deletions.
4 changes: 2 additions & 2 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
image: busybox
volumes:
- conf:/config
- ${PWD}/simulator/cmd/scheduler:/host-config:ro
- ${PWD}/simulator/cmd/scheduler:/host-config:ro
command: sh -c "cp -rf /host-config/* /config/"
simulator-scheduler:
image: registry.k8s.io/scheduler-simulator/debuggable-scheduler:v0.4.0
Expand Down Expand Up @@ -68,4 +68,4 @@ networks:
driver: bridge
volumes:
simulator-etcd-data:
conf:
conf:
4 changes: 2 additions & 2 deletions simulator/cmd/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/kube-scheduler-simulator/simulator/config"
"sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server/di"
"sigs.k8s.io/kube-scheduler-simulator/simulator/syncer"
)

const (
Expand Down Expand Up @@ -94,7 +94,7 @@ func startSimulator() error {
return xerrors.Errorf("kubeapi-server is not ready: %w", err)
}

dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, syncer.Options{})
dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceapplier.Options{})
if err != nil {
return xerrors.Errorf("create di container: %w", err)
}
Expand Down
20 changes: 12 additions & 8 deletions simulator/docs/import-cluster-resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,23 @@ It imports the following resources, which the scheduler's default plugins take i
If you need to, you can tweak which resources to import via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):

```go
dic, err := di.NewDIContainer(..., syncer.Options{
dic, err := di.NewDIContainer(..., resourceapplier.Options{
// GVRsToSync is a list of GroupVersionResource that will be synced.
// If GVRsToSync is nil, defaultGVRs are used.
GVRsToSync: []schema.GroupVersionResource{
{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
}
{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
},
// Actually, more options are available...
// AdditionalMutatingFunctions is a list of mutating functions that users add.
AdditionalMutatingFunctions: map[schema.GroupVersionResource]MutatingFunction{...}
// AdditionalFilteringFunctions is a list of filtering functions that users add.
AdditionalFilteringFunctions: map[schema.GroupVersionResource]FilteringFunction{...}
// FilterBeforeCreating is a list of additional filtering functions that are applied before creating resources.
FilterBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
// MutateBeforeCreating is a list of additional mutating functions that are applied before creating resources.
MutateBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
// FilterBeforeUpdating is a list of additional filtering functions that are applied before updating resources.
FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
// MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources.
MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
})
```

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package syncer
package resourceapplier

import (
"context"
Expand All @@ -11,49 +11,31 @@ import (
"k8s.io/klog/v2"
)

// DefaultGVRs is a list of GroupVersionResource that we sync by default (configurable with Options),
// which is a suitable resource set for the vanilla scheduler.
//
// Note that this order matters - When first importing resources, we want to sync namespaces first, then priorityclasses, storageclasses...
var DefaultGVRs = []schema.GroupVersionResource{
{Group: "", Version: "v1", Resource: "namespaces"},
{Group: "scheduling.k8s.io", Version: "v1", Resource: "priorityclasses"},
{Group: "storage.k8s.io", Version: "v1", Resource: "storageclasses"},
{Group: "", Version: "v1", Resource: "persistentvolumeclaims"},
{Group: "", Version: "v1", Resource: "nodes"},
{Group: "", Version: "v1", Resource: "persistentvolumes"},
{Group: "", Version: "v1", Resource: "pods"},
}

// Event is a type of events that occur in the source cluster.
type Event int

const (
Add Event = iota
Update
)
// mandatoryFilterForCreating is FilteringFunctions that we must register for creating.
// We don't allow users to opt out them.
var mandatoryFilterForCreating = map[schema.GroupVersionResource]FilteringFunction{}

// mandatoryMutatingFunctions is MutatingFunctions that we must register.
// mandatoryMutateForCreating is MutatingFunctions that we must register for creating.
// We don't allow users to opt out them.
var mandatoryMutatingFunctions = map[schema.GroupVersionResource]MutatingFunction{
var mandatoryMutateForCreating = map[schema.GroupVersionResource]MutatingFunction{
{Group: "", Version: "v1", Resource: "persistentvolumes"}: mutatePV,
{Group: "", Version: "v1", Resource: "pods"}: mutatePods,
}

// mandatoryFilteringFunctions is FilteringFunctions that we must register.
// mandatoryFilterForUpdating is FilteringFunctions that we must register.
// We don't allow users to opt out them.
var mandatoryFilteringFunctions = map[schema.GroupVersionResource]FilteringFunction{
{Group: "", Version: "v1", Resource: "pods"}: filterPods,
var mandatoryFilterForUpdating = map[schema.GroupVersionResource]FilteringFunction{
{Group: "", Version: "v1", Resource: "pods"}: filterPodsForUpdating,
}

// FilteringFunction is a function that filters a resource.
// If it returns false, the resource will not be imported.
type FilteringFunction func(ctx context.Context, resource *unstructured.Unstructured, clients *Clients, event Event) (bool, error)

// MutatingFunction is a function that mutates a resource before importing it.
type MutatingFunction func(ctx context.Context, resource *unstructured.Unstructured, clients *Clients, event Event) (*unstructured.Unstructured, error)
// mandatoryMutateForUpdating is MutatingFunctions that we must register for updating.
// We don't allow users to opt out them.
var mandatoryMutateForUpdating = map[schema.GroupVersionResource]MutatingFunction{
{Group: "", Version: "v1", Resource: "persistentvolumes"}: mutatePV,
{Group: "", Version: "v1", Resource: "pods"}: mutatePods,
}

func mutatePV(ctx context.Context, resource *unstructured.Unstructured, clients *Clients, _ Event) (*unstructured.Unstructured, error) {
func mutatePV(ctx context.Context, resource *unstructured.Unstructured, clients *Clients) (*unstructured.Unstructured, error) {
var pv v1.PersistentVolume
err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &pv)
if err != nil {
Expand All @@ -64,7 +46,7 @@ func mutatePV(ctx context.Context, resource *unstructured.Unstructured, clients
// PersistentVolumeClaims's UID is changed in a destination cluster when importing from a source cluster,
// and thus we need to update the PVC UID in the PersistentVolume.
// Get PVC of pv.Spec.ClaimRef.Name.
pvc, err := clients.SrcDynamicClient.Resource(schema.GroupVersionResource{
pvc, err := clients.DynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "persistentvolumeclaims",
Expand All @@ -80,7 +62,7 @@ func mutatePV(ctx context.Context, resource *unstructured.Unstructured, clients
return &unstructured.Unstructured{Object: modifiedUnstructed}, err
}

func mutatePods(_ context.Context, resource *unstructured.Unstructured, _ *Clients, _ Event) (*unstructured.Unstructured, error) {
func mutatePods(_ context.Context, resource *unstructured.Unstructured, _ *Clients) (*unstructured.Unstructured, error) {
var pod v1.Pod
err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &pod)
if err != nil {
Expand All @@ -100,12 +82,7 @@ func mutatePods(_ context.Context, resource *unstructured.Unstructured, _ *Clien

// filterPods checks if a pod is already scheduled when it's updated.
// We only want to update pods that are not yet scheduled.
func filterPods(_ context.Context, resource *unstructured.Unstructured, _ *Clients, event Event) (bool, error) {
if event == Add {
// We always add a Pod, regardless it's scheduled or not.
return true, nil
}

func filterPodsForUpdating(_ context.Context, resource *unstructured.Unstructured, _ *Clients) (bool, error) {
var pod v1.Pod
err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &pod)
if err != nil {
Expand Down
Loading

0 comments on commit bc586b6

Please sign in to comment.