diff --git a/README.md b/README.md index a084df41..d2d7b344 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ you can refer to the [documentation](./simulator/docs/simulator-server-config.md See the following docs to know more about simulator: - [import-cluster-resources.md](./simulator/docs/import-cluster-resources.md): describes how you can import resources in your cluster to the simulator so that you can simulate scheduling based on your cluster's situation. +- [record-and-replay-cluster-changes.md](./simulator/docs/record-and-replay-cluster-changes.md): describes how you can record and replay the resources changes in the simulator. - [how-it-works.md](simulator/docs/how-it-works.md): describes about how the simulator works. - [kube-apiserver.md](simulator/docs/kube-apiserver.md): describe about kube-apiserver in simulator. (how you can configure and access) - [api.md](simulator/docs/api.md): describes about HTTP server the simulator has. (mainly for the webUI) diff --git a/simulator/cmd/recorder/kubeconfig.yaml b/simulator/cmd/recorder/kubeconfig.yaml new file mode 100644 index 00000000..b9a11780 --- /dev/null +++ b/simulator/cmd/recorder/kubeconfig.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Config + +clusters: + - cluster: + server: http://localhost:3132 + name: simulator + +contexts: + - context: + cluster: simulator + name: simulator + +current-context: simulator diff --git a/simulator/cmd/recorder/recorder.go b/simulator/cmd/recorder/recorder.go new file mode 100644 index 00000000..90fa751d --- /dev/null +++ b/simulator/cmd/recorder/recorder.go @@ -0,0 +1,90 @@ +package main + +import ( + "context" + "flag" + "os" + "os/signal" + "syscall" + "time" + + "golang.org/x/xerrors" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog" + + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" +) + +var ( + recordDir string + kubeConfig string + timeout int +) + +func main() { + if err := startRecorder(); err != nil { + 
klog.Fatalf("failed with error on running recorder: %+v", err)
+	}
+}
+
+func startRecorder() error {
+	if err := parseOptions(); err != nil {
+		return err
+	}
+
+	restCfg, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
+	if err != nil {
+		return xerrors.Errorf("load kubeconfig: %w", err)
+	}
+
+	client := dynamic.NewForConfigOrDie(restCfg)
+
+	recorderOptions := recorder.Options{RecordDir: recordDir}
+	recorder := recorder.New(client, recorderOptions)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	if timeout > 0 {
+		ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
+		defer cancel()
+	}
+
+	if err := recorder.Run(ctx); err != nil {
+		return xerrors.Errorf("run recorder: %w", err)
+	}
+
+	// Block until signal is received
+	quit := make(chan os.Signal, 1)
+	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
+	select {
+	case <-quit:
+	case <-ctx.Done():
+	}
+
+	return nil
+}
+
+func parseOptions() error {
+	var (
+		dirFlag        = flag.String("dir", "", "directory to store the recorded resources")
+		kubeConfigFlag = flag.String("kubeconfig", "", "path to kubeconfig file")
+		timeoutFlag    = flag.Int("timeout", 0, "timeout in seconds for the recorder to run")
+	)
+
+	flag.Parse()
+
+	if *dirFlag == "" {
+		return xerrors.New("dir flag is required")
+	}
+
+	if *kubeConfigFlag == "" {
+		return xerrors.New("kubeconfig flag is required")
+	}
+
+	recordDir = *dirFlag
+	kubeConfig = *kubeConfigFlag
+	timeout = *timeoutFlag
+
+	return nil
+}
diff --git a/simulator/cmd/simulator/simulator.go b/simulator/cmd/simulator/simulator.go
index 3a5305c0..57bee7f2 100644
--- a/simulator/cmd/simulator/simulator.go
+++ b/simulator/cmd/simulator/simulator.go
@@ -20,6 +20,7 @@ import (
 	"k8s.io/klog/v2"
 
 	"sigs.k8s.io/kube-scheduler-simulator/simulator/config"
+	"sigs.k8s.io/kube-scheduler-simulator/simulator/replayer"
 	"sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server" "sigs.k8s.io/kube-scheduler-simulator/simulator/server/di" @@ -94,7 +95,10 @@ func startSimulator() error { return xerrors.Errorf("kubeapi-server is not ready: %w", err) } - dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceapplier.Options{}) + replayerOptions := replayer.Options{RecordDir: cfg.RecordDirPath} + resourceApplierOptions := resourceapplier.Options{} + + dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, cfg.ReplayerEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceApplierOptions, replayerOptions) if err != nil { return xerrors.Errorf("create di container: %w", err) } @@ -110,6 +114,13 @@ func startSimulator() error { } } + // If ReplayEnabled is enabled, the simulator replays the recorded resources. + if cfg.ReplayerEnabled { + if err := dic.ReplayService().Replay(ctx); err != nil { + return xerrors.Errorf("replay resources: %w", err) + } + } + dic.SchedulerService().SetSchedulerConfig(cfg.InitialSchedulerCfg) if cfg.ResourceSyncEnabled { diff --git a/simulator/config.yaml b/simulator/config.yaml index e2d141a9..6d11cb1c 100644 --- a/simulator/config.yaml +++ b/simulator/config.yaml @@ -19,8 +19,8 @@ etcdURL: "http://127.0.0.1:2379" corsAllowedOriginList: - "http://localhost:3000" -# This is for the beta feature "One-shot importing cluster's resources" -# and "Continuous syncing cluster's resources". +# This is for the beta feature "One-shot importing cluster's resources", +# "Continuous syncing cluster's resources" and "Replaying cluster's events". 
# This variable is used to find Kubeconfig required to access your # cluster for importing resources to scheduler simulator. kubeConfig: "/kubeconfig.yaml" @@ -38,12 +38,24 @@ kubeSchedulerConfigPath: "" # This variable indicates whether the simulator will # import resources from a user cluster specified by kubeConfig. # Note that it only imports the resources once when the simulator is started. -# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. # This is still a beta feature. externalImportEnabled: false # This variable indicates whether the simulator will # keep syncing resources from an user cluster's or not. -# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. # Note, this is still a beta feature. resourceSyncEnabled: false + +# This variable indicates whether the simulator will +# replay events recorded in the file specified by recordDirPath. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. +# Note, this is still a beta feature. +replayEnabled: false + +# The path to a directory where the record files are stored. +recordDirPath: "/record" diff --git a/simulator/config/config.go b/simulator/config/config.go index e80f3b9d..e82f1509 100644 --- a/simulator/config/config.go +++ b/simulator/config/config.go @@ -42,6 +42,10 @@ type Config struct { ResourceImportLabelSelector metav1.LabelSelector // ResourceSyncEnabled indicates whether the simulator will keep syncing resources from a target cluster. 
ResourceSyncEnabled bool + // ReplayerEnabled indicates whether the simulator will replay events recorded in a file. + ReplayerEnabled bool + // RecordDirPath is the path to the file where the simulator records events. + RecordDirPath string // ExternalKubeClientCfg is KubeConfig to get resources from external cluster. // This field should be set when ExternalImportEnabled == true or ResourceSyncEnabled == true. ExternalKubeClientCfg *rest.Config @@ -84,9 +88,11 @@ func NewConfig() (*Config, error) { externalimportenabled := getExternalImportEnabled() resourceSyncEnabled := getResourceSyncEnabled() + replayerEnabled := getReplayerEnabled() + recordDirPath := getRecordDirPath() externalKubeClientCfg := &rest.Config{} - if externalimportenabled && resourceSyncEnabled { - return nil, xerrors.Errorf("externalImportEnabled and resourceSyncEnabled cannot be used simultaneously.") + if hasTwoOrMoreTrue(externalimportenabled, resourceSyncEnabled, replayerEnabled) { + return nil, xerrors.Errorf("externalImportEnabled, resourceSyncEnabled and replayerEnabled cannot be used simultaneously.") } if externalimportenabled || resourceSyncEnabled { externalKubeClientCfg, err = clientcmd.BuildConfigFromFlags("", configYaml.KubeConfig) @@ -110,6 +116,8 @@ func NewConfig() (*Config, error) { ResourceImportLabelSelector: configYaml.ResourceImportLabelSelector, ExternalKubeClientCfg: externalKubeClientCfg, ResourceSyncEnabled: resourceSyncEnabled, + ReplayerEnabled: replayerEnabled, + RecordDirPath: recordDirPath, }, nil } @@ -272,6 +280,28 @@ func getResourceSyncEnabled() bool { return resourceSyncEnabled } +// getReplayerEnabled reads REPLAYER_ENABLED and converts it to bool +// if empty from the config file. +// This function will return `true` if `REPLAYER_ENABLED` is "1". 
+func getReplayerEnabled() bool { + replayerEnabledString := os.Getenv("REPLAYER_ENABLED") + if replayerEnabledString == "" { + replayerEnabledString = strconv.FormatBool(configYaml.ReplayerEnabled) + } + replayerEnabled, _ := strconv.ParseBool(replayerEnabledString) + return replayerEnabled +} + +// getRecordDirPath reads RECORD_DIR_PATH +// if empty from the config file. +func getRecordDirPath() string { + recordDirPath := os.Getenv("RECORD_DIR_PATH") + if recordDirPath == "" { + recordDirPath = configYaml.RecordDirPath + } + return recordDirPath +} + func decodeSchedulerCfg(buf []byte) (*configv1.KubeSchedulerConfiguration, error) { decoder := scheme.Codecs.UniversalDeserializer() obj, _, err := decoder.Decode(buf, nil, nil) @@ -299,3 +329,13 @@ func GetKubeClientConfig() (*rest.Config, error) { } return clientConfig, nil } + +func hasTwoOrMoreTrue(values ...bool) bool { + count := 0 + for _, v := range values { + if v { + count++ + } + } + return count >= 2 +} diff --git a/simulator/config/v1alpha1/types.go b/simulator/config/v1alpha1/types.go index fe010051..442e4ea3 100644 --- a/simulator/config/v1alpha1/types.go +++ b/simulator/config/v1alpha1/types.go @@ -67,6 +67,13 @@ type SimulatorConfiguration struct { // sync resources from an user cluster's or not. ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"` + // This variable indicates whether the simulator will + // replay events recorded in a file or not. + ReplayerEnabled bool `json:"replayEnabled,omitempty"` + + // The path to a directory where the record files are stored. + RecordDirPath string `json:"recordDirPath,omitempty"` + // This variable indicates whether an external scheduler // is used. 
ExternalSchedulerEnabled bool `json:"externalSchedulerEnabled,omitempty"` diff --git a/simulator/docs/import-cluster-resources.md b/simulator/docs/import-cluster-resources.md index 3e61df65..ef9d81cc 100644 --- a/simulator/docs/import-cluster-resources.md +++ b/simulator/docs/import-cluster-resources.md @@ -68,10 +68,10 @@ It imports the following resources, which the scheduler's default plugins take i If you need to, you can tweak which resources to import via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go): ```go -dic, err := di.NewDIContainer(..., resourceapplier.Options{ - // GVRsToSync is a list of GroupVersionResource that will be synced. - // If GVRsToSync is nil, defaultGVRs are used. - GVRsToSync: []schema.GroupVersionResource{ +resourceApplierOptions := resourceapplier.Options{ + // GVRsToApply is a list of GroupVersionResource that will be synced. + // If GVRsToApply is nil, defaultGVRs are used. + GVRsToApply: []schema.GroupVersionResource{ {Group: "your-group", Version: "v1", Resource: "your-custom-resources"}, }, @@ -85,7 +85,7 @@ dic, err := di.NewDIContainer(..., resourceapplier.Options{ FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{}, // MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources. MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{}, -}) +} ``` > [!NOTE] diff --git a/simulator/docs/record-and-replay-cluster-changes.md b/simulator/docs/record-and-replay-cluster-changes.md new file mode 100644 index 00000000..668211a0 --- /dev/null +++ b/simulator/docs/record-and-replay-cluster-changes.md @@ -0,0 +1,91 @@ +# [Beta] Record your real cluster's changes in resources and replay them in the simulator + +You can record resource addition/update/deletion at your real cluster. 
This feature is useful for reproducing issues that occur in your real cluster.
+
+## Record changes
+
+To record changes from your real cluster, you need to follow these steps:
+
+1. Go to `simulator` directory.
+2. Start the recorder by running `go run cmd/recorder/recorder.go --dir /path/to/directory-to-store-recorded-changes --kubeconfig /path/to/kubeconfig`.
+
+> [!NOTE]
+> When a file already exists at the value of `--dir`, it will be overwritten.
+
+> [!NOTE]
+> You can add `--timeout` option to set the timeout for the recorder. The value is in seconds. If not set, the recorder will run until it's stopped.
+
+### Resources to record
+
+It records the changes of the following resources:
+
+- Pods
+- Nodes
+- PersistentVolumes
+- PersistentVolumeClaims
+- StorageClasses
+
+You can tweak which resources to record via the option in [/simulator/cmd/recorder/recorder.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/recorder/recorder.go):
+
+```go
+recorderOptions := recorder.Options{RecordDir: recordDir,
+	// GVRs is a list of GroupVersionResource that will be recorded.
+	// If it's nil, DefaultGVRs are used.
+	GVRs: []schema.GroupVersionResource{
+		{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
+	},
+}
+```
+
+## Replay changes
+
+To replay the recorded changes in the simulator, you need to follow these steps:
+
+1. Set `true` to `replayEnabled`.
+2. Set the path of the directory where the changes are recorded to `recordDirPath`.
+3. Make sure the directory is mounted to the simulator server container.
+
+
+```yaml:config.yaml
+replayEnabled: true
+recordDirPath: "/path/to/directory-to-store-recorded-changes"
+```
+
+```yaml:compose.yml
+volumes:
+  ...
+ - ./path/to/directory-to-store-recorded-changes:/path/to/directory-to-store-recorded-changes +``` + +### Resources to replay + +It replays the changes of the following resources: + +- Pods +- Nodes +- PersistentVolumes +- PersistentVolumeClaims +- StorageClasses + +You can tweak which resources to replay via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go): + +```go +resourceApplierOptions := resourceapplier.Options{ + // GVRsToApply is a list of GroupVersionResource that will be replayed. + // If GVRsToApply is nil, defaultGVRs are used. + GVRsToApply: []schema.GroupVersionResource{ + {Group: "your-group", Version: "v1", Resource: "your-custom-resources"}, + }, + + // Actually, more options are available... + + // FilterBeforeCreating is a list of additional filtering functions that are applied before creating resources. + FilterBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{}, + // MutateBeforeCreating is a list of additional mutating functions that are applied before creating resources. + MutateBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{}, + // FilterBeforeUpdating is a list of additional filtering functions that are applied before updating resources. + FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{}, + // MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources. 
+ MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{}, +} +``` diff --git a/simulator/recorder/recorder.go b/simulator/recorder/recorder.go new file mode 100644 index 00000000..010f44c0 --- /dev/null +++ b/simulator/recorder/recorder.go @@ -0,0 +1,184 @@ +package recorder + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path" + "time" + + "golang.org/x/xerrors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +type Event string + +var ( + Add Event = "Add" + Update Event = "Update" + Delete Event = "Delete" +) + +const defaultBufferSize = 1000 + +type Service struct { + client dynamic.Interface + gvrs []schema.GroupVersionResource + path string + recordCh chan Record + bufferSize int +} + +type Record struct { + Time time.Time `json:"time"` + Event Event `json:"event"` + Resource unstructured.Unstructured `json:"resource"` +} + +var DefaultGVRs = []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "namespaces"}, + {Group: "scheduling.k8s.io", Version: "v1", Resource: "priorityclasses"}, + {Group: "storage.k8s.io", Version: "v1", Resource: "storageclasses"}, + {Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, + {Group: "", Version: "v1", Resource: "nodes"}, + {Group: "", Version: "v1", Resource: "persistentvolumes"}, + {Group: "", Version: "v1", Resource: "pods"}, +} + +type Options struct { + GVRs []schema.GroupVersionResource + RecordDir string + BufferSize *int +} + +func New(client dynamic.Interface, options Options) *Service { + gvrs := DefaultGVRs + if options.GVRs != nil { + gvrs = options.GVRs + } + + bufferSize := defaultBufferSize + if options.BufferSize != nil { + bufferSize = *options.BufferSize + } + + return &Service{ + client: client, + gvrs: gvrs, + 
path: options.RecordDir, + recordCh: make(chan Record, bufferSize), + bufferSize: bufferSize, + } +} + +func (s *Service) Run(ctx context.Context) error { + infFact := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.client, 0, metav1.NamespaceAll, nil) + for _, gvr := range s.gvrs { + inf := infFact.ForResource(gvr).Informer() + _, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { s.RecordEvent(obj, Add) }, + UpdateFunc: func(_, obj interface{}) { s.RecordEvent(obj, Update) }, + DeleteFunc: func(obj interface{}) { s.RecordEvent(obj, Delete) }, + }) + if err != nil { + return xerrors.Errorf("failed to add event handler: %w", err) + } + infFact.Start(ctx.Done()) + infFact.WaitForCacheSync(ctx.Done()) + } + + err := os.MkdirAll(s.path, 0o755) + if err != nil { + return xerrors.Errorf("failed to create record directory: %w", err) + } + + go s.record(ctx) + + return nil +} + +func (s *Service) RecordEvent(obj interface{}, e Event) { + unstructObj, ok := obj.(*unstructured.Unstructured) + if !ok { + klog.Error("Failed to convert runtime.Object to *unstructured.Unstructured") + return + } + + r := Record{ + Event: e, + Time: time.Now(), + Resource: *unstructObj, + } + + s.recordCh <- r +} + +func (s *Service) record(ctx context.Context) { + records := make([]Record, 0, s.bufferSize) + count := 0 + writeRecord := func() { + defer func() { + count++ + records = make([]Record, 0, s.bufferSize) + }() + + filePath := path.Join(s.path, fmt.Sprintf("record-%018d.json", count)) + err := writeToFile(filePath, records) + if err != nil { + klog.Errorf("failed to write records to file: %v", err) + return + } + } + + for { + select { + case r := <-s.recordCh: + records = append(records, r) + if len(records) == s.bufferSize { + writeRecord() + } + + case <-ctx.Done(): + if len(records) > 0 { + writeRecord() + } + return + + default: + if len(records) > 0 { + writeRecord() + } + } + } +} + +func writeToFile(filePath string, records 
[]Record) error { + file, err := os.Create(filePath) + if err != nil { + return xerrors.Errorf("failed to create record file: %w", err) + } + + b, err := json.Marshal(records) + if err != nil { + return xerrors.Errorf("failed to marshal records: %w", err) + } + + _, err = file.Write(b) + if err != nil { + return xerrors.Errorf("failed to write records: %w", err) + } + + err = file.Close() + if err != nil { + return xerrors.Errorf("failed to close file: %w", err) + } + + return nil +} diff --git a/simulator/recorder/recorder_test.go b/simulator/recorder/recorder_test.go new file mode 100644 index 00000000..3403cb6a --- /dev/null +++ b/simulator/recorder/recorder_test.go @@ -0,0 +1,400 @@ +package recorder + +import ( + "context" + "encoding/json" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "golang.org/x/xerrors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + dynamicFake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/restmapper" + appsv1 "k8s.io/kubernetes/pkg/apis/apps/v1" + schedulingv1 "k8s.io/kubernetes/pkg/apis/scheduling/v1" + storagev1 "k8s.io/kubernetes/pkg/apis/storage/v1" +) + +func TestRecorder(t *testing.T) { + t.Parallel() + tests := []struct { + name string + resourceToCreate []unstructured.Unstructured + resourceToUpdate []unstructured.Unstructured + resourceToDelete []unstructured.Unstructured + want []Record + wantErr bool + }{ + { + name: "should record creating pods", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + 
"name": "pod-2", + "namespace": "default", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-2", + "namespace": "default", + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "should record updating a pod", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }, + }, + }, + resourceToUpdate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + "nodeName": "node-1", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }, + }, + }, + { + Event: Update, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + 
"namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + "nodeName": "node-1", + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "should record deleting a pod", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + resourceToDelete: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + { + Event: Delete, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + dir := path.Join(t.TempDir(), strings.ReplaceAll(tt.name, " ", "_")) + defer os.RemoveAll(dir) + + s := runtime.NewScheme() + corev1.AddToScheme(s) + appsv1.AddToScheme(s) + schedulingv1.AddToScheme(s) + storagev1.AddToScheme(s) + client := dynamicFake.NewSimpleDynamicClient(s) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + service := New(client, Options{RecordDir: dir}) + err := service.Run(ctx) + if (err != nil) != tt.wantErr { + t.Errorf("Service.Record() error = %v, wantErr %v", err, tt.wantErr) + return + } + + err = apply(ctx, client, tt.resourceToCreate, tt.resourceToUpdate, 
tt.resourceToDelete) + if err != nil { + t.Fatal(err) + } + + err = assert(ctx, dir, tt.want) + if err != nil { + t.Fatal(err) + } + }) + } +} + +func apply(ctx context.Context, client *dynamicFake.FakeDynamicClient, resourceToCreate []unstructured.Unstructured, resourceToUpdate []unstructured.Unstructured, resourceToDelete []unstructured.Unstructured) error { + for _, resource := range resourceToCreate { + gvr, err := findGVR(resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + _, err = client.Resource(gvr).Namespace(ns).Create(ctx, &resource, metav1.CreateOptions{}) + if err != nil { + return xerrors.Errorf("failed to create a pod: %w", err) + } + } + + for _, resource := range resourceToUpdate { + gvr, err := findGVR(resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + _, err = client.Resource(gvr).Namespace(ns).Update(ctx, &resource, metav1.UpdateOptions{}) + if err != nil { + return xerrors.Errorf("failed to update a pod: %w", err) + } + } + + for _, resource := range resourceToDelete { + gvr, err := findGVR(resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + err = client.Resource(gvr).Namespace(ns).Delete(ctx, resource.GetName(), metav1.DeleteOptions{}) + if err != nil { + return xerrors.Errorf("failed to delete a pod: %w", err) + } + } + + return nil +} + +func assert(ctx context.Context, dirPath string, want []Record) error { + var finalErr error + wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(context.Context) (bool, error) { + files, err := os.ReadDir(dirPath) + if err != nil { + finalErr = xerrors.Errorf("failed to read the record directory: %w", err) + return false, nil + } + + got := []Record{} + for _, file := range files { + if file.IsDir() { + continue + } + + b, err := os.ReadFile(path.Join(dirPath, 
file.Name())) + if err != nil { + finalErr = xerrors.Errorf("failed to read the record file: %w", err) + return false, nil + } + + var records []Record + err = json.Unmarshal(b, &records) + if err != nil { + finalErr = xerrors.Errorf("failed to unmarshal the records: %w", err) + return false, nil + } + + got = append(got, records...) + } + + if len(got) != len(want) { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got, want) + return false, nil + } + + for i := range got { + if got[i].Event != want[i].Event { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got[i].Event, want[i].Event) + return true, finalErr + } + + if diff := cmp.Diff(want[i].Resource, got[i].Resource); diff != "" { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got[i].Resource, want[i].Resource) + return true, finalErr + } + } + + return true, nil + }) + + return finalErr +} + +var ( + resources = []*restmapper.APIGroupResources{ + { + Group: metav1.APIGroup{ + Versions: []metav1.GroupVersionForDiscovery{ + {Version: "v1"}, + }, + }, + VersionedResources: map[string][]metav1.APIResource{ + "v1": { + {Name: "pods", Namespaced: true, Kind: "Pod"}, + }, + }, + }, + } + mapper = restmapper.NewDiscoveryRESTMapper(resources) +) + +func findGVR(obj unstructured.Unstructured) (schema.GroupVersionResource, error) { + gvk := obj.GroupVersionKind() + m, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return schema.GroupVersionResource{}, err + } + + return m.Resource, nil +} diff --git a/simulator/replayer/mock_resourceapplier/resourceapplier.go b/simulator/replayer/mock_resourceapplier/resourceapplier.go new file mode 100644 index 00000000..46f49821 --- /dev/null +++ b/simulator/replayer/mock_resourceapplier/resourceapplier.go @@ -0,0 +1,84 @@ +// Code generated by MockGen. DO NOT EDIT. 
+// Source: simulator/resourceapplier/resourceapplier.go +// +// Generated by this command: +// +// mockgen -source=simulator/resourceapplier/resourceapplier.go -destination=simulator/replayer/mock_resourceapplier/resourceapplier.go +// + +// Package mock_resourceapplier is a generated GoMock package. +package mock_resourceapplier + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" + unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// MockResourceApplier is a mock of ResourceApplier interface. +type MockResourceApplier struct { + ctrl *gomock.Controller + recorder *MockResourceApplierMockRecorder + isgomock struct{} +} + +// MockResourceApplierMockRecorder is the mock recorder for MockResourceApplier. +type MockResourceApplierMockRecorder struct { + mock *MockResourceApplier +} + +// NewMockResourceApplier creates a new mock instance. +func NewMockResourceApplier(ctrl *gomock.Controller) *MockResourceApplier { + mock := &MockResourceApplier{ctrl: ctrl} + mock.recorder = &MockResourceApplierMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockResourceApplier) EXPECT() *MockResourceApplierMockRecorder { + return m.recorder +} + +// Create mocks base method. +func (m *MockResourceApplier) Create(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Create indicates an expected call of Create. +func (mr *MockResourceApplierMockRecorder) Create(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockResourceApplier)(nil).Create), ctx, resource) +} + +// Delete mocks base method. 
+func (m *MockResourceApplier) Delete(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockResourceApplierMockRecorder) Delete(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockResourceApplier)(nil).Delete), ctx, resource) +} + +// Update mocks base method. +func (m *MockResourceApplier) Update(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Update", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Update indicates an expected call of Update. +func (mr *MockResourceApplierMockRecorder) Update(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockResourceApplier)(nil).Update), ctx, resource) +} diff --git a/simulator/replayer/replayer.go b/simulator/replayer/replayer.go new file mode 100644 index 00000000..d79e7b3b --- /dev/null +++ b/simulator/replayer/replayer.go @@ -0,0 +1,101 @@ +package replayer + +import ( + "context" + "encoding/json" + "os" + "path" + + "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog" + + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" +) + +type Service struct { + applier ResourceApplier + recordDir string +} + +type ResourceApplier interface { + Create(ctx context.Context, resource *unstructured.Unstructured) error + Update(ctx context.Context, resource *unstructured.Unstructured) error + Delete(ctx context.Context, resource *unstructured.Unstructured) error +} + +type Options struct { + RecordDir string +} + +func New(applier ResourceApplier, options Options) *Service { + return 
&Service{applier: applier, recordDir: options.RecordDir} +} + +func (s *Service) Replay(ctx context.Context) error { + files, err := os.ReadDir(s.recordDir) + if err != nil { + return xerrors.Errorf("failed to read record directory: %w", err) + } + + for _, file := range files { + if file.IsDir() { + continue + } + + filePath := path.Join(s.recordDir, file.Name()) + if err := s.replayRecordsFromFile(ctx, filePath); err != nil { + return xerrors.Errorf("failed to replay records from file: %w", err) + } + } + + return nil +} + +func (s *Service) replayRecordsFromFile(ctx context.Context, path string) error { + records := []recorder.Record{} + + b, err := os.ReadFile(path) + if err != nil { + return err + } + + if err := json.Unmarshal(b, &records); err != nil { + klog.Warningf("failed to unmarshal records from file %s: %v", path, err) + return nil + } + + for _, record := range records { + if err := s.applyEvent(ctx, record); err != nil { + return xerrors.Errorf("failed to replay event: %w", err) + } + } + + return nil +} + +func (s *Service) applyEvent(ctx context.Context, record recorder.Record) error { + switch record.Event { + case recorder.Add: + if err := s.applier.Create(ctx, &record.Resource); err != nil { + if errors.IsAlreadyExists(err) { + klog.Warningf("resource already exists: %v", err) + } else { + return xerrors.Errorf("failed to create resource: %w", err) + } + } + case recorder.Update: + if err := s.applier.Update(ctx, &record.Resource); err != nil { + return xerrors.Errorf("failed to update resource: %w", err) + } + case recorder.Delete: + if err := s.applier.Delete(ctx, &record.Resource); err != nil { + return xerrors.Errorf("failed to delete resource: %w", err) + } + default: + return xerrors.Errorf("unknown event: %v", record.Event) + } + + return nil +} diff --git a/simulator/replayer/replayer_test.go b/simulator/replayer/replayer_test.go new file mode 100644 index 00000000..e2dcb095 --- /dev/null +++ b/simulator/replayer/replayer_test.go @@ 
-0,0 +1,142 @@ +package replayer + +import ( + "context" + "encoding/json" + "os" + "path" + "strings" + "testing" + + "go.uber.org/mock/gomock" + "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" + "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer/mock_resourceapplier" +) + +func TestService_Replay(t *testing.T) { + t.Parallel() + tests := []struct { + name string + records []recorder.Record + prepareMockFn func(*mock_resourceapplier.MockResourceApplier) + wantErr bool + }{ + { + name: "no error when Create is successful", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + }, + wantErr: false, + }, + { + name: "should return error if Create raise an error", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(xerrors.Errorf("failed to create resource")) + }, + wantErr: true, + }, + { + name: "ignore AlreadyExists error when Create raise an error", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + 
"namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(errors.NewAlreadyExists(schema.GroupResource{}, "resource already exists")) + }, + wantErr: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockApplier := mock_resourceapplier.NewMockResourceApplier(ctrl) + tt.prepareMockFn(mockApplier) + + recordDir := path.Join(os.TempDir(), strings.ReplaceAll(tt.name, " ", "_")) + filePath := path.Join(recordDir, "record.json") + err := os.MkdirAll(recordDir, 0o755) + if err != nil { + t.Fatalf("failed to create record directory: %v", err) + } + tempFile, err := os.Create(filePath) + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + defer os.RemoveAll(recordDir) + + b, err := json.Marshal(tt.records) + if err != nil { + t.Fatalf("failed to marshal records: %v", err) + } + + _, err = tempFile.Write(b) + if err != nil { + t.Fatalf("failed to write records: %v", err) + } + + err = tempFile.Close() + if err != nil { + t.Fatalf("failed to close temp file: %v", err) + } + + service := New(mockApplier, Options{RecordDir: recordDir}) + + err = service.Replay(context.Background()) + if (err != nil) != tt.wantErr { + t.Errorf("Service.Replay() error = %v", err) + } + }) + } +} diff --git a/simulator/resourceapplier/resourceapplier.go b/simulator/resourceapplier/resourceapplier.go index 6c191e36..13dea0ad 100644 --- a/simulator/resourceapplier/resourceapplier.go +++ b/simulator/resourceapplier/resourceapplier.go @@ -26,7 +26,7 @@ type Clients struct { } type Options struct { - GVRsToSync []schema.GroupVersionResource + GVRsToApply []schema.GroupVersionResource FilterBeforeCreating map[schema.GroupVersionResource][]FilteringFunction MutateBeforeCreating map[schema.GroupVersionResource][]MutatingFunction 
FilterBeforeUpdating map[schema.GroupVersionResource][]FilteringFunction @@ -56,7 +56,7 @@ func New(dynamicClient dynamic.Interface, restMapper meta.RESTMapper, options Op filterBeforeUpdating: map[schema.GroupVersionResource][]FilteringFunction{}, mutateBeforeUpdating: map[schema.GroupVersionResource][]MutatingFunction{}, - GVRsToSync: options.GVRsToSync, + GVRsToSync: options.GVRsToApply, } for gvr, fn := range mandatoryFilterForCreating { diff --git a/simulator/server/di/di.go b/simulator/server/di/di.go index e46ebfd4..b09d6ecb 100644 --- a/simulator/server/di/di.go +++ b/simulator/server/di/di.go @@ -13,6 +13,7 @@ import ( configv1 "k8s.io/kube-scheduler/config/v1" "sigs.k8s.io/kube-scheduler-simulator/simulator/oneshotimporter" + "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer" "sigs.k8s.io/kube-scheduler-simulator/simulator/reset" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourcewatcher" @@ -29,6 +30,7 @@ type Container struct { oneshotClusterResourceImporter OneShotClusterResourceImporter resourceSyncer ResourceSyncer resourceWatcherService ResourceWatcherService + replayService ReplayService } // NewDIContainer initializes Container. 
@@ -43,10 +45,12 @@ func NewDIContainer( initialSchedulerCfg *configv1.KubeSchedulerConfiguration, externalImportEnabled bool, resourceSyncEnabled bool, + replayEnabled bool, externalClient clientset.Interface, externalDynamicClient dynamic.Interface, simulatorPort int, resourceapplierOptions resourceapplier.Options, + replayerOptions replayer.Options, ) (*Container, error) { c := &Container{} @@ -68,6 +72,9 @@ func NewDIContainer( c.resourceSyncer = syncer.New(externalDynamicClient, resourceApplierService) } c.resourceWatcherService = resourcewatcher.NewService(client) + if replayEnabled { + c.replayService = replayer.New(resourceApplierService, replayerOptions) + } return c, nil } @@ -98,6 +105,11 @@ func (c *Container) ResourceSyncer() ResourceSyncer { return c.resourceSyncer } +// ReplayService returns ReplayService. +func (c *Container) ReplayService() ReplayService { + return c.replayService +} + // ResourceWatcherService returns ResourceWatcherService. func (c *Container) ResourceWatcherService() ResourceWatcherService { return c.resourceWatcherService diff --git a/simulator/server/di/interface.go b/simulator/server/di/interface.go index 00150362..662c1491 100644 --- a/simulator/server/di/interface.go +++ b/simulator/server/di/interface.go @@ -34,18 +34,32 @@ type ResetService interface { Reset(ctx context.Context) error } -// OneShotClusterResourceImporter represents a service to import resources from an target cluster when starting the simulator. +// OneShotClusterResourceImporter represents a service to import resources from a target cluster when starting the simulator. type OneShotClusterResourceImporter interface { ImportClusterResources(ctx context.Context, labelSelector metav1.LabelSelector) error } -// ResourceSyncer represents a service to constantly sync resources from an target cluster. +// ResourceSyncer represents a service to constantly sync resources from a target cluster. type ResourceSyncer interface { // Run starts the resource syncer. 
 // It should be run until the context is canceled.
 	Run(ctx context.Context) error
 }
 
+// RecorderService represents a service to record events in a target cluster.
+type RecorderService interface {
+	// Run starts the recorder.
+	// It should be run until the context is canceled.
+	Run(ctx context.Context) error
+}
+
+// ReplayService represents a service to replay events recorded in record files.
+type ReplayService interface {
+	// Replay replays the recorded events.
+	// It returns once all recorded events have been applied, or earlier if ctx is canceled.
+	Replay(ctx context.Context) error
+}
+
 // ResourceWatcherService represents service for watch k8s resources.
 type ResourceWatcherService interface {
 	ListWatch(ctx context.Context, stream streamwriter.ResponseStream, lrVersions *resourcewatcher.LastResourceVersions) error