diff --git a/simulator/cmd/simulator/simulator.go b/simulator/cmd/simulator/simulator.go index 3a5305c05..f948decfb 100644 --- a/simulator/cmd/simulator/simulator.go +++ b/simulator/cmd/simulator/simulator.go @@ -20,6 +20,8 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/kube-scheduler-simulator/simulator/config" + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" + "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier" "sigs.k8s.io/kube-scheduler-simulator/simulator/server" "sigs.k8s.io/kube-scheduler-simulator/simulator/server/di" @@ -58,7 +60,7 @@ func startSimulator() error { importClusterResourceClient := &clientset.Clientset{} var importClusterDynamicClient dynamic.Interface - if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled { + if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled || cfg.RecorderEnabled { importClusterResourceClient, err = clientset.NewForConfig(cfg.ExternalKubeClientCfg) if err != nil { return xerrors.Errorf("creates a new Clientset for the ExternalKubeClientCfg: %w", err) @@ -94,7 +96,11 @@ func startSimulator() error { return xerrors.Errorf("kubeapi-server is not ready: %w", err) } - dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceapplier.Options{}) + recorderOptions := recorder.Options{Path: cfg.RecordFilePath} + replayerOptions := replayer.Options{Path: cfg.RecordFilePath} + resourceApplierOptions := resourceapplier.Options{} + + dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, cfg.RecorderEnabled, cfg.ReplayerEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceApplierOptions, recorderOptions, replayerOptions) if err != nil { return xerrors.Errorf("create di container: %w", err) } @@ -110,6 +116,13 @@ func startSimulator() error { } } + // If ReplayEnabled is enabled, the simulator replays the recorded resources. + if cfg.ReplayerEnabled { + if err := dic.ReplayService().Replay(ctx); err != nil { + return xerrors.Errorf("replay resources: %w", err) + } + } + dic.SchedulerService().SetSchedulerConfig(cfg.InitialSchedulerCfg) if cfg.ResourceSyncEnabled { @@ -119,6 +132,13 @@ func startSimulator() error { } } + if cfg.RecorderEnabled { + // Start the recorder to record events in the target cluster. + if err = dic.RecorderService().Run(ctx); err != nil { + return xerrors.Errorf("start recording: %w", err) + } + } + // start simulator server s := server.NewSimulatorServer(cfg, dic) shutdownFn, err := s.Start(cfg.Port) diff --git a/simulator/config.yaml b/simulator/config.yaml index e2d141a9e..7ed80002e 100644 --- a/simulator/config.yaml +++ b/simulator/config.yaml @@ -19,8 +19,8 @@ etcdURL: "http://127.0.0.1:2379" corsAllowedOriginList: - "http://localhost:3000" -# This is for the beta feature "One-shot importing cluster's resources" -# and "Continuous syncing cluster's resources". +# This is for the beta feature "One-shot importing cluster's resources", +# "Continuous syncing cluster's resources" and "Recording cluster's events". # This variable is used to find Kubeconfig required to access your # cluster for importing resources to scheduler simulator. 
kubeConfig: "/kubeconfig.yaml" @@ -38,12 +38,32 @@ kubeSchedulerConfigPath: "" # This variable indicates whether the simulator will # import resources from a user cluster specified by kubeConfig. # Note that it only imports the resources once when the simulator is started. -# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. # This is still a beta feature. externalImportEnabled: false # This variable indicates whether the simulator will # keep syncing resources from an user cluster's or not. -# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. # Note, this is still a beta feature. resourceSyncEnabled: false + +# This variable indicates whether the simulator will +# record events from a user cluster specified by kubeConfig. +# You cannot make both of recorderEnabled and replayEnabled true because those features would be conflicted. +# Note, this is still a beta feature. +recorderEnabled: false + +# This variable indicates whether the simulator will +# replay events recorded in the file specified by recordFilePath. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. +# You cannot make both of recorderEnabled and replayEnabled true because those features would be conflicted. +# Note, this is still a beta feature. +replayEnabled: true + +# The path to a file which the simulator will record +# events to or replay events from. +recordFilePath: "/record/record.json" diff --git a/simulator/config/config.go b/simulator/config/config.go index e80f3b9df..2d074200e 100644 --- a/simulator/config/config.go +++ b/simulator/config/config.go @@ -42,6 +42,12 @@ type Config struct { ResourceImportLabelSelector metav1.LabelSelector // ResourceSyncEnabled indicates whether the simulator will keep syncing resources from a target cluster. ResourceSyncEnabled bool + // RecorderEnabled indicates whether the simulator will record events from a target cluster. + RecorderEnabled bool + // ReplayerEnabled indicates whether the simulator will replay events recorded in a file. + ReplayerEnabled bool + // RecordFilePath is the path to the file where the simulator records events. + RecordFilePath string // ExternalKubeClientCfg is KubeConfig to get resources from external cluster. // This field should be set when ExternalImportEnabled == true or ResourceSyncEnabled == true. 
ExternalKubeClientCfg *rest.Config @@ -84,11 +90,17 @@ func NewConfig() (*Config, error) { externalimportenabled := getExternalImportEnabled() resourceSyncEnabled := getResourceSyncEnabled() + recorderEnabled := getRecorderEnabled() + replayerEnabled := getReplayerEnabled() + recordFilePath := getRecordFilePath() externalKubeClientCfg := &rest.Config{} - if externalimportenabled && resourceSyncEnabled { - return nil, xerrors.Errorf("externalImportEnabled and resourceSyncEnabled cannot be used simultaneously.") + if hasTwoOrMoreTrue(externalimportenabled, resourceSyncEnabled, replayerEnabled) { + return nil, xerrors.Errorf("externalImportEnabled, resourceSyncEnabled and replayerEnabled cannot be used simultaneously.") } - if externalimportenabled || resourceSyncEnabled { + if replayerEnabled && recorderEnabled { + return nil, xerrors.Errorf("recorderEnabled and replayerEnabled cannot be used simultaneously.") + } + if externalimportenabled || resourceSyncEnabled || recorderEnabled { externalKubeClientCfg, err = clientcmd.BuildConfigFromFlags("", configYaml.KubeConfig) if err != nil { return nil, xerrors.Errorf("get kube clientconfig: %w", err) @@ -110,6 +122,9 @@ func NewConfig() (*Config, error) { ResourceImportLabelSelector: configYaml.ResourceImportLabelSelector, ExternalKubeClientCfg: externalKubeClientCfg, ResourceSyncEnabled: resourceSyncEnabled, + RecorderEnabled: recorderEnabled, + ReplayerEnabled: replayerEnabled, + RecordFilePath: recordFilePath, }, nil } @@ -272,6 +287,40 @@ func getResourceSyncEnabled() bool { return resourceSyncEnabled } +// getRecorderEnabled reads RECORDER_ENABLED and converts it to bool +// if empty from the config file. +// This function will return `true` if `RECORDER_ENABLED` is "1". +func getRecorderEnabled() bool { + recorderEnabledString := os.Getenv("RECORDER_ENABLED") + if recorderEnabledString == "" { + recorderEnabledString = strconv.FormatBool(configYaml.RecorderEnabled) + } + recorderEnabled, _ := strconv.ParseBool(recorderEnabledString) + return recorderEnabled +} + +// getReplayerEnabled reads REPLAYER_ENABLED and converts it to bool +// if empty from the config file. +// This function will return `true` if `REPLAYER_ENABLED` is "1". +func getReplayerEnabled() bool { + replayerEnabledString := os.Getenv("REPLAYER_ENABLED") + if replayerEnabledString == "" { + replayerEnabledString = strconv.FormatBool(configYaml.ReplayerEnabled) + } + replayerEnabled, _ := strconv.ParseBool(replayerEnabledString) + return replayerEnabled +} + +// getRecordFilePath reads RECORD_FILE_PATH +// if empty from the config file. +func getRecordFilePath() string { + recordFilePath := os.Getenv("RECORD_FILE_PATH") + if recordFilePath == "" { + recordFilePath = configYaml.RecordFilePath + } + return recordFilePath +} + func decodeSchedulerCfg(buf []byte) (*configv1.KubeSchedulerConfiguration, error) { decoder := scheme.Codecs.UniversalDeserializer() obj, _, err := decoder.Decode(buf, nil, nil) @@ -299,3 +348,13 @@ func GetKubeClientConfig() (*rest.Config, error) { } return clientConfig, nil } + +func hasTwoOrMoreTrue(values ...bool) bool { + count := 0 + for _, v := range values { + if v { + count++ + } + } + return count >= 2 +} diff --git a/simulator/config/v1alpha1/types.go b/simulator/config/v1alpha1/types.go index fe0100510..55756d111 100644 --- a/simulator/config/v1alpha1/types.go +++ b/simulator/config/v1alpha1/types.go @@ -67,6 +67,18 @@ type SimulatorConfiguration struct { // sync resources from an user cluster's or not. 
 	ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"`
 
+	// This variable indicates whether the simulator will
+	// record events from a user cluster or not.
+	RecorderEnabled bool `json:"recorderEnabled,omitempty"`
+
+	// This variable indicates whether the simulator will
+	// replay events recorded in a file or not.
+	ReplayerEnabled bool `json:"replayEnabled,omitempty"`
+
+	// The path to the file that the simulator will record
+	// events to or replay events from.
+	RecordFilePath string `json:"recordFilePath,omitempty"`
+
 	// This variable indicates whether an external scheduler
 	// is used.
 	ExternalSchedulerEnabled bool `json:"externalSchedulerEnabled,omitempty"`
diff --git a/simulator/docs/import-cluster-resources.md b/simulator/docs/import-cluster-resources.md
index 3e61df651..ef9d81cc3 100644
--- a/simulator/docs/import-cluster-resources.md
+++ b/simulator/docs/import-cluster-resources.md
@@ -68,10 +68,10 @@ It imports the following resources, which the scheduler's default plugins take i
 If you need to, you can tweak which resources to import via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):
 
 ```go
-dic, err := di.NewDIContainer(..., resourceapplier.Options{
-	// GVRsToSync is a list of GroupVersionResource that will be synced.
-	// If GVRsToSync is nil, defaultGVRs are used.
-	GVRsToSync: []schema.GroupVersionResource{
+resourceApplierOptions := resourceapplier.Options{
+	// GVRsToApply is a list of GroupVersionResource that will be synced.
+	// If GVRsToApply is nil, defaultGVRs are used.
+	GVRsToApply: []schema.GroupVersionResource{
 		{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
 	},
 
@@ -85,7 +85,7 @@ dic, err := di.NewDIContainer(..., resourceapplier.Options{
 	FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
 	// MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources.
 	MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
-})
+}
 ```
 
 > [!NOTE]
diff --git a/simulator/docs/record-and-replay-cluster-events.md b/simulator/docs/record-and-replay-cluster-events.md
new file mode 100644
index 000000000..ff0356b63
--- /dev/null
+++ b/simulator/docs/record-and-replay-cluster-events.md
@@ -0,0 +1,95 @@
+# [Beta] Record your real cluster's events and replay them in the simulator
+
+You can record events from your real cluster and replay them in the simulator. This feature is useful for reproducing issues that occur in your real cluster.
+
+## Record events
+
+To record events from your real cluster, follow these steps:
+
+1. Set `recorderEnabled` to `true`.
+2. Set `kubeConfig` to the path of the kubeconfig file for your cluster.
+   - This feature only requires read permission for the resources it records.
+3. Set `recordFilePath` to the path of the file to save the recorded events to.
+4. Make sure the file path is mounted into the simulator container.
+
+```yaml
+recorderEnabled: true
+kubeConfig: "/path/to/your-cluster-kubeconfig"
+recordFilePath: "/path/to/recorded-events.json"
+```
+
+```yaml
+volumes:
+  ...
+  - ./path/to/recorded-events.json:/path/to/recorded-events.json
+```
+
+> [!NOTE]
+> When a file already exists at `recordFilePath`, the simulator backs it up in the same directory by adding a timestamp to the filename, and then creates a new file for recording.
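+
+For reference, the file at `recordFilePath` holds a flat JSON array of recorded events. The sketch below is illustrative only: the `time`, `event`, and `resource` fields mirror the recorder's `Record` type, while the Pod object and the timestamp are placeholders.
+
+```json
+[
+  {
+    "time": "2024-01-01T00:00:00Z",
+    "event": "Add",
+    "resource": {
+      "apiVersion": "v1",
+      "kind": "Pod",
+      "metadata": {
+        "name": "pod-1",
+        "namespace": "default"
+      }
+    }
+  }
+]
+```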
+
+### Resources to record
+
+It records the events of the following resources:
+
+- Pods
+- Nodes
+- PersistentVolumes
+- PersistentVolumeClaims
+- StorageClasses
+
+You can tweak which resources to record via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):
+
+```go
+recorderOptions := recorder.Options{
+	Path: cfg.RecordFilePath,
+	// GVRs is a list of GroupVersionResource that will be recorded.
+	// If it's nil, DefaultGVRs are used.
+	GVRs: []schema.GroupVersionResource{
+		{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
+	},
+}
+```
+
+## Replay events
+
+To replay the recorded events in the simulator, follow these steps:
+
+1. Set `replayEnabled` to `true`.
+2. Set `recordFilePath` to the path of the file where the events are recorded.
+
+```yaml
+replayEnabled: true
+recordFilePath: "/path/to/recorded-events.json"
+```
+
+### Resources to replay
+
+It replays the events of the following resources:
+
+- Pods
+- Nodes
+- PersistentVolumes
+- PersistentVolumeClaims
+- StorageClasses
+
+You can tweak which resources to replay via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):
+
+```go
+resourceApplierOptions := resourceapplier.Options{
+	// GVRsToApply is a list of GroupVersionResource that will be replayed.
+	// If GVRsToApply is nil, defaultGVRs are used.
+	GVRsToApply: []schema.GroupVersionResource{
+		{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
+	},
+
+	// Actually, more options are available...
+
+	// FilterBeforeCreating is a list of additional filtering functions that are applied before creating resources.
+	FilterBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
+	// MutateBeforeCreating is a list of additional mutating functions that are applied before creating resources.
+	MutateBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
+	// FilterBeforeUpdating is a list of additional filtering functions that are applied before updating resources.
+	FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
+	// MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources.
+ MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{}, +} +``` diff --git a/simulator/recorder/recorder.go b/simulator/recorder/recorder.go new file mode 100644 index 000000000..7425f131e --- /dev/null +++ b/simulator/recorder/recorder.go @@ -0,0 +1,193 @@ +package recorder + +import ( + "context" + "encoding/json" + "errors" + "io" + "os" + "path/filepath" + "time" + + "golang.org/x/xerrors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +type Event string + +var ( + Add Event = "Add" + Update Event = "Update" + Delete Event = "Delete" +) + +type Service struct { + client dynamic.Interface + gvrs []schema.GroupVersionResource + path string + recordCh chan Record +} + +type Record struct { + Time time.Time `json:"time"` + Event Event `json:"event"` + Resource unstructured.Unstructured `json:"resource"` +} + +var DefaultGVRs = []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "namespaces"}, + {Group: "scheduling.k8s.io", Version: "v1", Resource: "priorityclasses"}, + {Group: "storage.k8s.io", Version: "v1", Resource: "storageclasses"}, + {Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, + {Group: "", Version: "v1", Resource: "nodes"}, + {Group: "", Version: "v1", Resource: "persistentvolumes"}, + {Group: "", Version: "v1", Resource: "pods"}, +} + +type Options struct { + GVRs []schema.GroupVersionResource + Path string +} + +func New(client dynamic.Interface, options Options) *Service { + gvrs := DefaultGVRs + if options.GVRs != nil { + gvrs = options.GVRs + } + + return &Service{ + client: client, + gvrs: gvrs, + path: options.Path, + recordCh: make(chan Record), + } +} + +func (s *Service) Run(ctx context.Context) error { + infFact := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.client, 0, metav1.NamespaceAll, nil) + for _, gvr := range s.gvrs { + inf := infFact.ForResource(gvr).Informer() + _, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { s.RecordEvent(obj, Add) }, + UpdateFunc: func(_, obj interface{}) { s.RecordEvent(obj, Update) }, + DeleteFunc: func(obj interface{}) { s.RecordEvent(obj, Delete) }, + }) + if err != nil { + return xerrors.Errorf("failed to add event handler: %w", err) + } + infFact.Start(ctx.Done()) + infFact.WaitForCacheSync(ctx.Done()) + } + + err := s.backup() + if err != nil { + klog.Error("Failed to backup record: ", err) + } + + go s.record(ctx) + + return nil +} + +func (s *Service) RecordEvent(obj interface{}, e Event) { + unstructObj, ok := obj.(*unstructured.Unstructured) + if !ok { + klog.Error("Failed to convert runtime.Object to *unstructured.Unstructured") + return + } + + r := Record{ + Event: e, + Time: time.Now(), + Resource: *unstructObj, + } + + s.recordCh <- r +} + +func (s *Service) record(ctx context.Context) { + records := []Record{} + for { + select { + case r := <-s.recordCh: + records = append(records, r) + + tempFile, err := os.CreateTemp("", "record_temp.json") + if err != nil { + klog.Error("Failed to open file: ", err) + continue + } + + b, err := json.Marshal(records) + if err != nil { + klog.Error("Failed to marshal record: ", err) + continue + } + + _, err = tempFile.Write(b) + if err != nil { + klog.Error("Failed to write record: ", err) + continue + } + + tempFile.Close() + + if 
err = moveFile(tempFile.Name(), s.path); err != nil { + klog.Error("Failed to rename file: ", err) + continue + } + + case <-ctx.Done(): + return + + default: + time.Sleep(1 * time.Second) + } + } +} + +func (s *Service) backup() error { + f, err := os.Stat(s.path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return xerrors.Errorf("failed to get file info: %w", err) + } + + if f.IsDir() { + return xerrors.New("record file is a directory") + } + + name := filepath.Base(s.path) + dir := filepath.Dir(s.path) + backupPath := filepath.Join(dir, f.ModTime().Format("2006-01-02_150405_")+name) + return moveFile(s.path, backupPath) +} + +func moveFile(srcPath, destPath string) error { + src, err := os.Open(srcPath) + if err != nil { + return err + } + defer src.Close() + + dest, err := os.Create(destPath) + if err != nil { + return err + } + defer dest.Close() + + _, err = io.Copy(dest, src) + if err != nil { + return err + } + + return os.Remove(srcPath) +} diff --git a/simulator/recorder/recorder_test.go b/simulator/recorder/recorder_test.go new file mode 100644 index 000000000..bcdd5d6ad --- /dev/null +++ b/simulator/recorder/recorder_test.go @@ -0,0 +1,386 @@ +package recorder + +import ( + "context" + "encoding/json" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "golang.org/x/xerrors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + dynamicFake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/restmapper" + appsv1 "k8s.io/kubernetes/pkg/apis/apps/v1" + schedulingv1 "k8s.io/kubernetes/pkg/apis/scheduling/v1" + storagev1 "k8s.io/kubernetes/pkg/apis/storage/v1" +) + +func TestRecorder(t *testing.T) { + t.Parallel() + tests := []struct { + name string + resourceToCreate []unstructured.Unstructured + resourceToUpdate []unstructured.Unstructured + resourceToDelete []unstructured.Unstructured + want []Record + wantErr bool + }{ + { + name: "should record creating pods", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-2", + "namespace": "default", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-2", + "namespace": "default", + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "should record updating a pod", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }, + }, + }, + resourceToUpdate: 
[]unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + "nodeName": "node-1", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }, + }, + }, + { + Event: Update, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + "nodeName": "node-1", + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "should record deleting a pod", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + resourceToDelete: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + { + Event: Delete, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tempPath := path.Join(os.TempDir(), strings.ReplaceAll(tt.name, " ", "_")+".json") + defer os.Remove(tempPath) + + s := runtime.NewScheme() + corev1.AddToScheme(s) + appsv1.AddToScheme(s) + schedulingv1.AddToScheme(s) + storagev1.AddToScheme(s) + client := dynamicFake.NewSimpleDynamicClient(s) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + service := New(client, Options{Path: tempPath}) + err := service.Run(ctx) + if (err != nil) != tt.wantErr { + t.Errorf("Service.Record() error = %v, wantErr %v", err, tt.wantErr) + return + } + + err = apply(ctx, client, tt.resourceToCreate, tt.resourceToUpdate, tt.resourceToDelete) + if err != nil { + t.Fatal(err) + } + + err = assert(ctx, tempPath, tt.want) + if err != nil { + t.Fatal(err) + } + }) + } +} + +func apply(ctx context.Context, client *dynamicFake.FakeDynamicClient, resourceToCreate []unstructured.Unstructured, resourceToUpdate []unstructured.Unstructured, resourceToDelete []unstructured.Unstructured) error { + for _, resource := range resourceToCreate { + gvr, err := findGVR(&resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + _, err 
= client.Resource(gvr).Namespace(ns).Create(ctx, &resource, metav1.CreateOptions{}) + if err != nil { + return xerrors.Errorf("failed to create a pod: %w", err) + } + } + + for _, resource := range resourceToUpdate { + gvr, err := findGVR(&resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + _, err = client.Resource(gvr).Namespace(ns).Update(ctx, &resource, metav1.UpdateOptions{}) + if err != nil { + return xerrors.Errorf("failed to update a pod: %w", err) + } + } + + for _, resource := range resourceToDelete { + gvr, err := findGVR(&resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + err = client.Resource(gvr).Namespace(ns).Delete(ctx, resource.GetName(), metav1.DeleteOptions{}) + if err != nil { + return xerrors.Errorf("failed to delete a pod: %w", err) + } + } + + return nil +} + +func assert(ctx context.Context, filePath string, want []Record) error { + var finalErr error + err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(context.Context) (bool, error) { + got := []Record{} + var b []byte + b, err := os.ReadFile(filePath) + if err != nil { + finalErr = xerrors.Errorf("failed to read the records: %w", err) + return false, nil + } + + err = json.Unmarshal(b, &got) + if err != nil { + finalErr = xerrors.Errorf("failed to unmarshal the records: %w", err) + return false, nil + } + + if len(got) != len(want) { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got, want) + return false, nil + } + + for i := range got { + if got[i].Event != want[i].Event { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got[i].Event, want[i].Event) + return true, finalErr + } + + if diff := cmp.Diff(want[i].Resource, got[i].Resource); diff != "" { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got[i].Resource, want[i].Resource) + return true, finalErr + } + } + + return true, nil + }) + + return err +} + +var ( + resources = []*restmapper.APIGroupResources{ + { + Group: metav1.APIGroup{ + Versions: []metav1.GroupVersionForDiscovery{ + {Version: "v1"}, + }, + }, + VersionedResources: map[string][]metav1.APIResource{ + "v1": { + {Name: "pods", Namespaced: true, Kind: "Pod"}, + }, + }, + }, + } + mapper = restmapper.NewDiscoveryRESTMapper(resources) +) + +func findGVR(obj *unstructured.Unstructured) (schema.GroupVersionResource, error) { + gvk := obj.GroupVersionKind() + m, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return schema.GroupVersionResource{}, err + } + + return m.Resource, nil +} diff --git a/simulator/replayer/mock_resourceapplier/resourceapplier.go b/simulator/replayer/mock_resourceapplier/resourceapplier.go new file mode 100644 index 000000000..46f498219 --- /dev/null +++ b/simulator/replayer/mock_resourceapplier/resourceapplier.go @@ -0,0 +1,84 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: simulator/resourceapplier/resourceapplier.go +// +// Generated by this command: +// +// mockgen -source=simulator/resourceapplier/resourceapplier.go -destination=simulator/replayer/mock_resourceapplier/resourceapplier.go +// + +// Package mock_resourceapplier is a generated GoMock package. 
+package mock_resourceapplier + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" + unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// MockResourceApplier is a mock of ResourceApplier interface. +type MockResourceApplier struct { + ctrl *gomock.Controller + recorder *MockResourceApplierMockRecorder + isgomock struct{} +} + +// MockResourceApplierMockRecorder is the mock recorder for MockResourceApplier. +type MockResourceApplierMockRecorder struct { + mock *MockResourceApplier +} + +// NewMockResourceApplier creates a new mock instance. +func NewMockResourceApplier(ctrl *gomock.Controller) *MockResourceApplier { + mock := &MockResourceApplier{ctrl: ctrl} + mock.recorder = &MockResourceApplierMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockResourceApplier) EXPECT() *MockResourceApplierMockRecorder { + return m.recorder +} + +// Create mocks base method. +func (m *MockResourceApplier) Create(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Create indicates an expected call of Create. +func (mr *MockResourceApplierMockRecorder) Create(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockResourceApplier)(nil).Create), ctx, resource) +} + +// Delete mocks base method. +func (m *MockResourceApplier) Delete(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockResourceApplierMockRecorder) Delete(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockResourceApplier)(nil).Delete), ctx, resource) +} + +// Update mocks base method. +func (m *MockResourceApplier) Update(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Update", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Update indicates an expected call of Update. 
+func (mr *MockResourceApplierMockRecorder) Update(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockResourceApplier)(nil).Update), ctx, resource) +} diff --git a/simulator/replayer/replayer.go b/simulator/replayer/replayer.go new file mode 100644 index 000000000..11e0dbb1d --- /dev/null +++ b/simulator/replayer/replayer.go @@ -0,0 +1,79 @@ +package replayer + +import ( + "context" + "encoding/json" + "os" + + "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog" + + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" +) + +type Service struct { + applier ResourceApplier + path string +} + +type ResourceApplier interface { + Create(ctx context.Context, resource *unstructured.Unstructured) error + Update(ctx context.Context, resource *unstructured.Unstructured) error + Delete(ctx context.Context, resource *unstructured.Unstructured) error +} + +type Options struct { + Path string +} + +func New(applier ResourceApplier, options Options) *Service { + return &Service{applier: applier, path: options.Path} +} + +func (s *Service) Replay(ctx context.Context) error { + records := []recorder.Record{} + + b, err := os.ReadFile(s.path) + if err != nil { + return err + } + + if err := json.Unmarshal(b, &records); err != nil { + return err + } + + for _, record := range records { + if err := s.applyEvent(ctx, record); err != nil { + return xerrors.Errorf("failed to replay event: %w", err) + } + } + + return nil +} + +func (s *Service) applyEvent(ctx context.Context, record recorder.Record) error { + switch record.Event { + case recorder.Add: + if err := s.applier.Create(ctx, &record.Resource); err != nil { + if errors.IsAlreadyExists(err) { + klog.Warningf("resource already exists: %v", err) + } else { + return xerrors.Errorf("failed to create resource: %w", err) + } + } + case recorder.Update: + if err := s.applier.Update(ctx, &record.Resource); err != nil { + return xerrors.Errorf("failed to update resource: %w", err) + } + case recorder.Delete: + if err := s.applier.Delete(ctx, &record.Resource); err != nil { + return xerrors.Errorf("failed to delete resource: %w", err) + } + default: + return xerrors.Errorf("unknown event: %v", record.Event) + } + + return nil +} diff --git a/simulator/replayer/replayer_test.go b/simulator/replayer/replayer_test.go new file mode 100644 index 000000000..6f449984c --- /dev/null +++ b/simulator/replayer/replayer_test.go @@ -0,0 +1,134 @@ +package replayer + +import ( + "context" + "encoding/json" + "os" + "testing" + + "go.uber.org/mock/gomock" + "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" + "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer/mock_resourceapplier" +) + +func TestService_Replay(t *testing.T) { + t.Parallel() + tests := []struct { + name string + records []recorder.Record + prepareMockFn func(*mock_resourceapplier.MockResourceApplier) + wantErr bool + }{ + { + name: "no error when Create is successful", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: 
func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + }, + wantErr: false, + }, + { + name: "should return error if Create raise an error", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(xerrors.Errorf("failed to create resource")) + }, + wantErr: true, + }, + { + name: "ignore AlreadyExists error when Create raise an error", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(errors.NewAlreadyExists(schema.GroupResource{}, "resource already exists")) + }, + wantErr: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockApplier := mock_resourceapplier.NewMockResourceApplier(ctrl) + tt.prepareMockFn(mockApplier) + + tempFile, err := os.CreateTemp("", "record.json") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + defer os.Remove(tempFile.Name()) + + b, err := json.Marshal(tt.records) + if err != nil { + t.Fatalf("failed to marshal records: %v", err) + } + + _, err = tempFile.Write(b) + if err != nil { + t.Fatalf("failed to write records: %v", err) + } + + err = tempFile.Close() + if err != nil { + t.Fatalf("failed to close temp file: %v", err) + } + + service := New(mockApplier, Options{tempFile.Name()}) + + err = service.Replay(context.Background()) + if (err != nil) != tt.wantErr { + t.Errorf("Service.Replay() error = %v", err) + } + }) + } +} diff --git a/simulator/resourceapplier/resourceapplier.go b/simulator/resourceapplier/resourceapplier.go index 6c191e36c..13dea0ad0 100644 --- a/simulator/resourceapplier/resourceapplier.go +++ b/simulator/resourceapplier/resourceapplier.go @@ -26,7 +26,7 @@ type Clients struct { } type Options struct { - GVRsToSync []schema.GroupVersionResource + GVRsToApply []schema.GroupVersionResource FilterBeforeCreating map[schema.GroupVersionResource][]FilteringFunction MutateBeforeCreating map[schema.GroupVersionResource][]MutatingFunction FilterBeforeUpdating map[schema.GroupVersionResource][]FilteringFunction @@ -56,7 +56,7 @@ func New(dynamicClient dynamic.Interface, restMapper meta.RESTMapper, options Op filterBeforeUpdating: map[schema.GroupVersionResource][]FilteringFunction{}, mutateBeforeUpdating: map[schema.GroupVersionResource][]MutatingFunction{}, - GVRsToSync: options.GVRsToSync, + GVRsToSync: options.GVRsToApply, } for gvr, fn := range mandatoryFilterForCreating { diff --git a/simulator/server/di/di.go b/simulator/server/di/di.go index e46ebfd43..d44d4656d 100644 --- a/simulator/server/di/di.go +++ b/simulator/server/di/di.go @@ -13,6 +13,8 @@ import ( configv1 "k8s.io/kube-scheduler/config/v1" "sigs.k8s.io/kube-scheduler-simulator/simulator/oneshotimporter" + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" + 
"sigs.k8s.io/kube-scheduler-simulator/simulator/replayer" "sigs.k8s.io/kube-scheduler-simulator/simulator/reset" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourcewatcher" @@ -29,6 +31,8 @@ type Container struct { oneshotClusterResourceImporter OneShotClusterResourceImporter resourceSyncer ResourceSyncer resourceWatcherService ResourceWatcherService + recorderSerivce RecorderService + replayService ReplayService } // NewDIContainer initializes Container. @@ -43,10 +47,14 @@ func NewDIContainer( initialSchedulerCfg *configv1.KubeSchedulerConfiguration, externalImportEnabled bool, resourceSyncEnabled bool, + recordEnabled bool, + replayEnabled bool, externalClient clientset.Interface, externalDynamicClient dynamic.Interface, simulatorPort int, resourceapplierOptions resourceapplier.Options, + recorderOptions recorder.Options, + replayerOptions replayer.Options, ) (*Container, error) { c := &Container{} @@ -68,6 +76,12 @@ func NewDIContainer( c.resourceSyncer = syncer.New(externalDynamicClient, resourceApplierService) } c.resourceWatcherService = resourcewatcher.NewService(client) + if recordEnabled { + c.recorderSerivce = recorder.New(externalDynamicClient, recorderOptions) + } + if replayEnabled { + c.replayService = replayer.New(resourceApplierService, replayerOptions) + } return c, nil } @@ -98,6 +112,16 @@ func (c *Container) ResourceSyncer() ResourceSyncer { return c.resourceSyncer } +// RecorderService returns RecorderService. +func (c *Container) RecorderService() RecorderService { + return c.recorderSerivce +} + +// ReplayService returns ReplayService. +func (c *Container) ReplayService() ReplayService { + return c.replayService +} + // ResourceWatcherService returns ResourceWatcherService. func (c *Container) ResourceWatcherService() ResourceWatcherService { return c.resourceWatcherService diff --git a/simulator/server/di/interface.go b/simulator/server/di/interface.go index 00150362d..662c14912 100644 --- a/simulator/server/di/interface.go +++ b/simulator/server/di/interface.go @@ -34,18 +34,32 @@ type ResetService interface { Reset(ctx context.Context) error } -// OneShotClusterResourceImporter represents a service to import resources from an target cluster when starting the simulator. +// OneShotClusterResourceImporter represents a service to import resources from a target cluster when starting the simulator. type OneShotClusterResourceImporter interface { ImportClusterResources(ctx context.Context, labelSelector metav1.LabelSelector) error } -// ResourceSyncer represents a service to constantly sync resources from an target cluster. +// ResourceSyncer represents a service to constantly sync resources from a target cluster. type ResourceSyncer interface { // Run starts the resource syncer. // It should be run until the context is canceled. Run(ctx context.Context) error } +// RecorderService represents a service to record events in a target cluster. +type RecorderService interface { + // Run starts the recorder. + // It should be run until the context is canceled. + Run(ctx context.Context) error +} + +// ReplayService represents a service to replay events recorded in a file. +type ReplayService interface { + // Replay replays the recorded events. + // It should be run until the context is canceled. + Replay(ctx context.Context) error +} + // ResourceWatcherService represents service for watch k8s resources. 
 type ResourceWatcherService interface {
 	ListWatch(ctx context.Context, stream streamwriter.ResponseStream, lrVersions *resourcewatcher.LastResourceVersions) error