From 5c4133d900306e0c7fe9fec3b87df566d8dbe16b Mon Sep 17 00:00:00 2001 From: "@saza-ku" Date: Mon, 16 Dec 2024 17:44:15 +0900 Subject: [PATCH 01/12] implement recorder and replayer --- simulator/cmd/simulator/simulator.go | 24 +- simulator/config.yaml | 28 +- simulator/config/config.go | 65 ++- simulator/config/v1alpha1/types.go | 12 + simulator/docs/import-cluster-resources.md | 10 +- .../docs/record-and-replay-cluster-events.md | 95 +++++ simulator/recorder/recorder.go | 193 +++++++++ simulator/recorder/recorder_test.go | 386 ++++++++++++++++++ .../mock_resourceapplier/resourceapplier.go | 84 ++++ simulator/replayer/replayer.go | 79 ++++ simulator/replayer/replayer_test.go | 134 ++++++ simulator/resourceapplier/resourceapplier.go | 4 +- simulator/server/di/di.go | 24 ++ simulator/server/di/interface.go | 18 +- 14 files changed, 1138 insertions(+), 18 deletions(-) create mode 100644 simulator/docs/record-and-replay-cluster-events.md create mode 100644 simulator/recorder/recorder.go create mode 100644 simulator/recorder/recorder_test.go create mode 100644 simulator/replayer/mock_resourceapplier/resourceapplier.go create mode 100644 simulator/replayer/replayer.go create mode 100644 simulator/replayer/replayer_test.go diff --git a/simulator/cmd/simulator/simulator.go b/simulator/cmd/simulator/simulator.go index 3a5305c05..f948decfb 100644 --- a/simulator/cmd/simulator/simulator.go +++ b/simulator/cmd/simulator/simulator.go @@ -20,6 +20,8 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/kube-scheduler-simulator/simulator/config" + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" + "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier" "sigs.k8s.io/kube-scheduler-simulator/simulator/server" "sigs.k8s.io/kube-scheduler-simulator/simulator/server/di" @@ -58,7 +60,7 @@ func startSimulator() error { importClusterResourceClient := &clientset.Clientset{} var importClusterDynamicClient dynamic.Interface - if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled { + if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled || cfg.RecorderEnabled { importClusterResourceClient, err = clientset.NewForConfig(cfg.ExternalKubeClientCfg) if err != nil { return xerrors.Errorf("creates a new Clientset for the ExternalKubeClientCfg: %w", err) @@ -94,7 +96,11 @@ func startSimulator() error { return xerrors.Errorf("kubeapi-server is not ready: %w", err) } - dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceapplier.Options{}) + recorderOptions := recorder.Options{Path: cfg.RecordFilePath} + replayerOptions := replayer.Options{Path: cfg.RecordFilePath} + resourceApplierOptions := resourceapplier.Options{} + + dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, cfg.RecorderEnabled, cfg.ReplayerEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceApplierOptions, recorderOptions, replayerOptions) if err != nil { return xerrors.Errorf("create di container: %w", err) } @@ -110,6 +116,13 @@ func startSimulator() error { } } + // If ReplayEnabled is enabled, the simulator replays the recorded resources. + if cfg.ReplayerEnabled { + if err := dic.ReplayService().Replay(ctx); err != nil { + return xerrors.Errorf("replay resources: %w", err) + } + } + dic.SchedulerService().SetSchedulerConfig(cfg.InitialSchedulerCfg) if cfg.ResourceSyncEnabled { @@ -119,6 +132,13 @@ func startSimulator() error { } } + if cfg.RecorderEnabled { + // Start the recorder to record events in the target cluster. + if err = dic.RecorderService().Run(ctx); err != nil { + return xerrors.Errorf("start recording: %w", err) + } + } + // start simulator server s := server.NewSimulatorServer(cfg, dic) shutdownFn, err := s.Start(cfg.Port) diff --git a/simulator/config.yaml b/simulator/config.yaml index e2d141a9e..7ed80002e 100644 --- a/simulator/config.yaml +++ b/simulator/config.yaml @@ -19,8 +19,8 @@ etcdURL: "http://127.0.0.1:2379" corsAllowedOriginList: - "http://localhost:3000" -# This is for the beta feature "One-shot importing cluster's resources" -# and "Continuous syncing cluster's resources". +# This is for the beta feature "One-shot importing cluster's resources", +# "Continuous syncing cluster's resources" and "Recording cluster's events". # This variable is used to find Kubeconfig required to access your # cluster for importing resources to scheduler simulator. kubeConfig: "/kubeconfig.yaml" @@ -38,12 +38,32 @@ kubeSchedulerConfigPath: "" # This variable indicates whether the simulator will # import resources from a user cluster specified by kubeConfig. # Note that it only imports the resources once when the simulator is started. -# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. # This is still a beta feature. externalImportEnabled: false # This variable indicates whether the simulator will # keep syncing resources from an user cluster's or not. -# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. # Note, this is still a beta feature. resourceSyncEnabled: false + +# This variable indicates whether the simulator will +# record events from a user cluster specified by kubeConfig. +# You cannot make both of recorderEnabled and replayEnabled true because those features would be conflicted. +# Note, this is still a beta feature. +recorderEnabled: false + +# This variable indicates whether the simulator will +# replay events recorded in the file specified by recordFilePath. +# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true +# because those features would be conflicted. +# You cannot make both of recorderEnabled and replayEnabled true because those features would be conflicted. +# Note, this is still a beta feature. +replayEnabled: true + +# The path to a file which the simulator will record +# events to or replay events from. +recordFilePath: "/record/record.json" diff --git a/simulator/config/config.go b/simulator/config/config.go index e80f3b9df..2d074200e 100644 --- a/simulator/config/config.go +++ b/simulator/config/config.go @@ -42,6 +42,12 @@ type Config struct { ResourceImportLabelSelector metav1.LabelSelector // ResourceSyncEnabled indicates whether the simulator will keep syncing resources from a target cluster. ResourceSyncEnabled bool + // RecorderEnabled indicates whether the simulator will record events from a target cluster. + RecorderEnabled bool + // ReplayerEnabled indicates whether the simulator will replay events recorded in a file. + ReplayerEnabled bool + // RecordFilePath is the path to the file where the simulator records events. + RecordFilePath string // ExternalKubeClientCfg is KubeConfig to get resources from external cluster. // This field should be set when ExternalImportEnabled == true or ResourceSyncEnabled == true. ExternalKubeClientCfg *rest.Config @@ -84,11 +90,17 @@ func NewConfig() (*Config, error) { externalimportenabled := getExternalImportEnabled() resourceSyncEnabled := getResourceSyncEnabled() + recorderEnabled := getRecorderEnabled() + replayerEnabled := getReplayerEnabled() + recordFilePath := getRecordFilePath() externalKubeClientCfg := &rest.Config{} - if externalimportenabled && resourceSyncEnabled { - return nil, xerrors.Errorf("externalImportEnabled and resourceSyncEnabled cannot be used simultaneously.") + if hasTwoOrMoreTrue(externalimportenabled, resourceSyncEnabled, replayerEnabled) { + return nil, xerrors.Errorf("externalImportEnabled, resourceSyncEnabled and replayerEnabled cannot be used simultaneously.") } - if externalimportenabled || resourceSyncEnabled { + if replayerEnabled && recorderEnabled { + return nil, xerrors.Errorf("recorderEnabled and replayerEnabled cannot be used simultaneously.") + } + if externalimportenabled || resourceSyncEnabled || recorderEnabled { externalKubeClientCfg, err = clientcmd.BuildConfigFromFlags("", configYaml.KubeConfig) if err != nil { return nil, xerrors.Errorf("get kube clientconfig: %w", err) @@ -110,6 +122,9 @@ func NewConfig() (*Config, error) { ResourceImportLabelSelector: configYaml.ResourceImportLabelSelector, ExternalKubeClientCfg: externalKubeClientCfg, ResourceSyncEnabled: resourceSyncEnabled, + RecorderEnabled: recorderEnabled, + ReplayerEnabled: replayerEnabled, + RecordFilePath: recordFilePath, }, nil } @@ -272,6 +287,40 @@ func getResourceSyncEnabled() bool { return resourceSyncEnabled } +// getRecorderEnabled reads RECORDER_ENABLED and converts it to bool +// if empty from the config file. +// This function will return `true` if `RECORDER_ENABLED` is "1". +func getRecorderEnabled() bool { + recorderEnabledString := os.Getenv("RECORDER_ENABLED") + if recorderEnabledString == "" { + recorderEnabledString = strconv.FormatBool(configYaml.RecorderEnabled) + } + recorderEnabled, _ := strconv.ParseBool(recorderEnabledString) + return recorderEnabled +} + +// getReplayerEnabled reads REPLAYER_ENABLED and converts it to bool +// if empty from the config file. +// This function will return `true` if `REPLAYER_ENABLED` is "1". +func getReplayerEnabled() bool { + replayerEnabledString := os.Getenv("REPLAYER_ENABLED") + if replayerEnabledString == "" { + replayerEnabledString = strconv.FormatBool(configYaml.ReplayerEnabled) + } + replayerEnabled, _ := strconv.ParseBool(replayerEnabledString) + return replayerEnabled +} + +// getRecordFilePath reads RECORD_FILE_PATH +// if empty from the config file. +func getRecordFilePath() string { + recordFilePath := os.Getenv("RECORD_FILE_PATH") + if recordFilePath == "" { + recordFilePath = configYaml.RecordFilePath + } + return recordFilePath +} + func decodeSchedulerCfg(buf []byte) (*configv1.KubeSchedulerConfiguration, error) { decoder := scheme.Codecs.UniversalDeserializer() obj, _, err := decoder.Decode(buf, nil, nil) @@ -299,3 +348,13 @@ func GetKubeClientConfig() (*rest.Config, error) { } return clientConfig, nil } + +func hasTwoOrMoreTrue(values ...bool) bool { + count := 0 + for _, v := range values { + if v { + count++ + } + } + return count >= 2 +} diff --git a/simulator/config/v1alpha1/types.go b/simulator/config/v1alpha1/types.go index fe0100510..55756d111 100644 --- a/simulator/config/v1alpha1/types.go +++ b/simulator/config/v1alpha1/types.go @@ -67,6 +67,18 @@ type SimulatorConfiguration struct { // sync resources from an user cluster's or not. ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"` + // This variable indicates whether the simulator will + // record events from an user cluster's or not. + RecorderEnabled bool `json:"recorderEnabled,omitempty"` + + // This variable indicates whether the simulator will + // replay events recorded in a file or not. + ReplayerEnabled bool `json:"replayEnabled,omitempty"` + + // The path to a file which the simulator will record + // events to or replay events from. + RecordFilePath string `json:"recordFilePath,omitempty"` + // This variable indicates whether an external scheduler // is used. ExternalSchedulerEnabled bool `json:"externalSchedulerEnabled,omitempty"` diff --git a/simulator/docs/import-cluster-resources.md b/simulator/docs/import-cluster-resources.md index 3e61df651..ef9d81cc3 100644 --- a/simulator/docs/import-cluster-resources.md +++ b/simulator/docs/import-cluster-resources.md @@ -68,10 +68,10 @@ It imports the following resources, which the scheduler's default plugins take i If you need to, you can tweak which resources to import via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go): ```go -dic, err := di.NewDIContainer(..., resourceapplier.Options{ - // GVRsToSync is a list of GroupVersionResource that will be synced. - // If GVRsToSync is nil, defaultGVRs are used. - GVRsToSync: []schema.GroupVersionResource{ +resourceApplierOptions := resourceapplier.Options{ + // GVRsToApply is a list of GroupVersionResource that will be synced. + // If GVRsToApply is nil, defaultGVRs are used. + GVRsToApply: []schema.GroupVersionResource{ {Group: "your-group", Version: "v1", Resource: "your-custom-resources"}, }, @@ -85,7 +85,7 @@ dic, err := di.NewDIContainer(..., resourceapplier.Options{ FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{}, // MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources. MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{}, -}) +} ``` > [!NOTE] diff --git a/simulator/docs/record-and-replay-cluster-events.md b/simulator/docs/record-and-replay-cluster-events.md new file mode 100644 index 000000000..ff0356b63 --- /dev/null +++ b/simulator/docs/record-and-replay-cluster-events.md @@ -0,0 +1,95 @@ +# [Beta] Record your real cluster's events and replay them in the simulator + +You can record events from your real cluster and replay them in the simulator. This feature is useful for reproducing issues that occur in your real cluster. + +## Record events + +To record events from your real cluster, you need to follow these steps: + +1. Set `true` to `recorderEnabled`. +2. Set the path of the kubeconfig file for your cluster to `KubeConfig`. + - This feature only requires the read permission for events. +3. Set the path of the file to save the recorded events to `recordedFilePath`. +4. Make sure the file path is mounted to the simulator container. + +```yaml +recorderEnabled: true +kubeConfig: "/path/to/your-cluster-kubeconfig" +recordedFilePath: "/path/to/recorded-events.json" +``` + +```yaml +volumes: + ... + - ./path/to/recorded-events.json:/path/to/recorded-events.json +``` + +> [!NOTE] +> When a file already exists at `recordedFilePath`, it backs up the file in the same directory adding a timestamp to the filename and creates a new file for recording. + +### Resources to record + +It records the events of the following resources: + +- Pods +- Nodes +- PersistentVolumes +- PersistentVolumeClaims +- StorageClasses + +You can tweak which resources to record via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go): + +```go +recorderOptions := recorder.Options{Path: cfg.RecordFilePath, +// GVRs is a list of GroupVersionResource that will be recorded. +// If it's nil, DefaultGVRs are used. + GVRs: []schema.GroupVersionResource{ + {Group: "your-group", Version: "v1", Resource: "your-custom-resources"}, + }, +} +``` + +## Replay events + +To replay the recorded events in the simulator, you need to follow these steps: + +1. Set `true` to `replayerEnabled`. +2. Set the path of the file where the events are recorded to `recordedFilePath`. + +```yaml +replayerEnabled: true +recordedFilePath: "/path/to/recorded-events.json" +``` + +### Resources to replay + +It replays the events of the following resources: + +- Pods +- Nodes +- PersistentVolumes +- PersistentVolumeClaims +- StorageClasses + +You can tweak which resources to replay via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go): + +```go +resourceApplierOptions := resourceapplier.Options{ + // GVRsToApply is a list of GroupVersionResource that will be replayed. + // If GVRsToApply is nil, defaultGVRs are used. + GVRsToApply: []schema.GroupVersionResource{ + {Group: "your-group", Version: "v1", Resource: "your-custom-resources"}, + }, + + // Actually, more options are available... + + // FilterBeforeCreating is a list of additional filtering functions that are applied before creating resources. + FilterBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{}, + // MutateBeforeCreating is a list of additional mutating functions that are applied before creating resources. + MutateBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{}, + // FilterBeforeUpdating is a list of additional filtering functions that are applied before updating resources. + FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{}, + // MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources. + MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{}, +} +``` diff --git a/simulator/recorder/recorder.go b/simulator/recorder/recorder.go new file mode 100644 index 000000000..7425f131e --- /dev/null +++ b/simulator/recorder/recorder.go @@ -0,0 +1,193 @@ +package recorder + +import ( + "context" + "encoding/json" + "errors" + "io" + "os" + "path/filepath" + "time" + + "golang.org/x/xerrors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +type Event string + +var ( + Add Event = "Add" + Update Event = "Update" + Delete Event = "Delete" +) + +type Service struct { + client dynamic.Interface + gvrs []schema.GroupVersionResource + path string + recordCh chan Record +} + +type Record struct { + Time time.Time `json:"time"` + Event Event `json:"event"` + Resource unstructured.Unstructured `json:"resource"` +} + +var DefaultGVRs = []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "namespaces"}, + {Group: "scheduling.k8s.io", Version: "v1", Resource: "priorityclasses"}, + {Group: "storage.k8s.io", Version: "v1", Resource: "storageclasses"}, + {Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, + {Group: "", Version: "v1", Resource: "nodes"}, + {Group: "", Version: "v1", Resource: "persistentvolumes"}, + {Group: "", Version: "v1", Resource: "pods"}, +} + +type Options struct { + GVRs []schema.GroupVersionResource + Path string +} + +func New(client dynamic.Interface, options Options) *Service { + gvrs := DefaultGVRs + if options.GVRs != nil { + gvrs = options.GVRs + } + + return &Service{ + client: client, + gvrs: gvrs, + path: options.Path, + recordCh: make(chan Record), + } +} + +func (s *Service) Run(ctx context.Context) error { + infFact := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.client, 0, metav1.NamespaceAll, nil) + for _, gvr := range s.gvrs { + inf := infFact.ForResource(gvr).Informer() + _, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { s.RecordEvent(obj, Add) }, + UpdateFunc: func(_, obj interface{}) { s.RecordEvent(obj, Update) }, + DeleteFunc: func(obj interface{}) { s.RecordEvent(obj, Delete) }, + }) + if err != nil { + return xerrors.Errorf("failed to add event handler: %w", err) + } + infFact.Start(ctx.Done()) + infFact.WaitForCacheSync(ctx.Done()) + } + + err := s.backup() + if err != nil { + klog.Error("Failed to backup record: ", err) + } + + go s.record(ctx) + + return nil +} + +func (s *Service) RecordEvent(obj interface{}, e Event) { + unstructObj, ok := obj.(*unstructured.Unstructured) + if !ok { + klog.Error("Failed to convert runtime.Object to *unstructured.Unstructured") + return + } + + r := Record{ + Event: e, + Time: time.Now(), + Resource: *unstructObj, + } + + s.recordCh <- r +} + +func (s *Service) record(ctx context.Context) { + records := []Record{} + for { + select { + case r := <-s.recordCh: + records = append(records, r) + + tempFile, err := os.CreateTemp("", "record_temp.json") + if err != nil { + klog.Error("Failed to open file: ", err) + continue + } + + b, err := json.Marshal(records) + if err != nil { + klog.Error("Failed to marshal record: ", err) + continue + } + + _, err = tempFile.Write(b) + if err != nil { + klog.Error("Failed to write record: ", err) + continue + } + + tempFile.Close() + + if err = moveFile(tempFile.Name(), s.path); err != nil { + klog.Error("Failed to rename file: ", err) + continue + } + + case <-ctx.Done(): + return + + default: + time.Sleep(1 * time.Second) + } + } +} + +func (s *Service) backup() error { + f, err := os.Stat(s.path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return xerrors.Errorf("failed to get file info: %w", err) + } + + if f.IsDir() { + return xerrors.New("record file is a directory") + } + + name := filepath.Base(s.path) + dir := filepath.Dir(s.path) + backupPath := filepath.Join(dir, f.ModTime().Format("2006-01-02_150405_")+name) + return moveFile(s.path, backupPath) +} + +func moveFile(srcPath, destPath string) error { + src, err := os.Open(srcPath) + if err != nil { + return err + } + defer src.Close() + + dest, err := os.Create(destPath) + if err != nil { + return err + } + defer dest.Close() + + _, err = io.Copy(dest, src) + if err != nil { + return err + } + + return os.Remove(srcPath) +} diff --git a/simulator/recorder/recorder_test.go b/simulator/recorder/recorder_test.go new file mode 100644 index 000000000..bcdd5d6ad --- /dev/null +++ b/simulator/recorder/recorder_test.go @@ -0,0 +1,386 @@ +package recorder + +import ( + "context" + "encoding/json" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "golang.org/x/xerrors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + dynamicFake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/restmapper" + appsv1 "k8s.io/kubernetes/pkg/apis/apps/v1" + schedulingv1 "k8s.io/kubernetes/pkg/apis/scheduling/v1" + storagev1 "k8s.io/kubernetes/pkg/apis/storage/v1" +) + +func TestRecorder(t *testing.T) { + t.Parallel() + tests := []struct { + name string + resourceToCreate []unstructured.Unstructured + resourceToUpdate []unstructured.Unstructured + resourceToDelete []unstructured.Unstructured + want []Record + wantErr bool + }{ + { + name: "should record creating pods", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-2", + "namespace": "default", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-2", + "namespace": "default", + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "should record updating a pod", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }, + }, + }, + resourceToUpdate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + "nodeName": "node-1", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }, + }, + }, + { + Event: Update, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + "nodeName": "node-1", + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "should record deleting a pod", + resourceToCreate: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + resourceToDelete: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + want: []Record{ + { + Event: Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + { + Event: Delete, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tempPath := path.Join(os.TempDir(), strings.ReplaceAll(tt.name, " ", "_")+".json") + defer os.Remove(tempPath) + + s := runtime.NewScheme() + corev1.AddToScheme(s) + appsv1.AddToScheme(s) + schedulingv1.AddToScheme(s) + storagev1.AddToScheme(s) + client := dynamicFake.NewSimpleDynamicClient(s) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + service := New(client, Options{Path: tempPath}) + err := service.Run(ctx) + if (err != nil) != tt.wantErr { + t.Errorf("Service.Record() error = %v, wantErr %v", err, tt.wantErr) + return + } + + err = apply(ctx, client, tt.resourceToCreate, tt.resourceToUpdate, tt.resourceToDelete) + if err != nil { + t.Fatal(err) + } + + err = assert(ctx, tempPath, tt.want) + if err != nil { + t.Fatal(err) + } + }) + } +} + +func apply(ctx context.Context, client *dynamicFake.FakeDynamicClient, resourceToCreate []unstructured.Unstructured, resourceToUpdate []unstructured.Unstructured, resourceToDelete []unstructured.Unstructured) error { + for _, resource := range resourceToCreate { + gvr, err := findGVR(&resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + _, err = client.Resource(gvr).Namespace(ns).Create(ctx, &resource, metav1.CreateOptions{}) + if err != nil { + return xerrors.Errorf("failed to create a pod: %w", err) + } + } + + for _, resource := range resourceToUpdate { + gvr, err := findGVR(&resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + _, err = client.Resource(gvr).Namespace(ns).Update(ctx, &resource, metav1.UpdateOptions{}) + if err != nil { + return xerrors.Errorf("failed to update a pod: %w", err) + } + } + + for _, resource := range resourceToDelete { + gvr, err := findGVR(&resource) + if err != nil { + return xerrors.Errorf("failed to find GVR: %w", err) + } + ns := resource.GetNamespace() + + err = client.Resource(gvr).Namespace(ns).Delete(ctx, resource.GetName(), metav1.DeleteOptions{}) + if err != nil { + return xerrors.Errorf("failed to delete a pod: %w", err) + } + } + + return nil +} + +func assert(ctx context.Context, filePath string, want []Record) error { + var finalErr error + err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(context.Context) (bool, error) { + got := []Record{} + var b []byte + b, err := os.ReadFile(filePath) + if err != nil { + finalErr = xerrors.Errorf("failed to read the records: %w", err) + return false, nil + } + + err = json.Unmarshal(b, &got) + if err != nil { + finalErr = xerrors.Errorf("failed to unmarshal the records: %w", err) + return false, nil + } + + if len(got) != len(want) { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got, want) + return false, nil + } + + for i := range got { + if got[i].Event != want[i].Event { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got[i].Event, want[i].Event) + return true, finalErr + } + + if diff := cmp.Diff(want[i].Resource, got[i].Resource); diff != "" { + finalErr = xerrors.Errorf("Service.Record() got = %v, want %v", got[i].Resource, want[i].Resource) + return true, finalErr + } + } + + return true, nil + }) + + return err +} + +var ( + resources = []*restmapper.APIGroupResources{ + { + Group: metav1.APIGroup{ + Versions: []metav1.GroupVersionForDiscovery{ + {Version: "v1"}, + }, + }, + VersionedResources: map[string][]metav1.APIResource{ + "v1": { + {Name: "pods", Namespaced: true, Kind: "Pod"}, + }, + }, + }, + } + mapper = restmapper.NewDiscoveryRESTMapper(resources) +) + +func findGVR(obj *unstructured.Unstructured) (schema.GroupVersionResource, error) { + gvk := obj.GroupVersionKind() + m, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return schema.GroupVersionResource{}, err + } + + return m.Resource, nil +} diff --git a/simulator/replayer/mock_resourceapplier/resourceapplier.go b/simulator/replayer/mock_resourceapplier/resourceapplier.go new file mode 100644 index 000000000..46f498219 --- /dev/null +++ b/simulator/replayer/mock_resourceapplier/resourceapplier.go @@ -0,0 +1,84 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: simulator/resourceapplier/resourceapplier.go +// +// Generated by this command: +// +// mockgen -source=simulator/resourceapplier/resourceapplier.go -destination=simulator/replayer/mock_resourceapplier/resourceapplier.go +// + +// Package mock_resourceapplier is a generated GoMock package. +package mock_resourceapplier + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" + unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// MockResourceApplier is a mock of ResourceApplier interface. +type MockResourceApplier struct { + ctrl *gomock.Controller + recorder *MockResourceApplierMockRecorder + isgomock struct{} +} + +// MockResourceApplierMockRecorder is the mock recorder for MockResourceApplier. +type MockResourceApplierMockRecorder struct { + mock *MockResourceApplier +} + +// NewMockResourceApplier creates a new mock instance. +func NewMockResourceApplier(ctrl *gomock.Controller) *MockResourceApplier { + mock := &MockResourceApplier{ctrl: ctrl} + mock.recorder = &MockResourceApplierMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockResourceApplier) EXPECT() *MockResourceApplierMockRecorder { + return m.recorder +} + +// Create mocks base method. +func (m *MockResourceApplier) Create(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Create indicates an expected call of Create. +func (mr *MockResourceApplierMockRecorder) Create(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockResourceApplier)(nil).Create), ctx, resource) +} + +// Delete mocks base method. +func (m *MockResourceApplier) Delete(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockResourceApplierMockRecorder) Delete(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockResourceApplier)(nil).Delete), ctx, resource) +} + +// Update mocks base method. +func (m *MockResourceApplier) Update(ctx context.Context, resource *unstructured.Unstructured) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Update", ctx, resource) + ret0, _ := ret[0].(error) + return ret0 +} + +// Update indicates an expected call of Update. +func (mr *MockResourceApplierMockRecorder) Update(ctx, resource any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockResourceApplier)(nil).Update), ctx, resource) +} diff --git a/simulator/replayer/replayer.go b/simulator/replayer/replayer.go new file mode 100644 index 000000000..11e0dbb1d --- /dev/null +++ b/simulator/replayer/replayer.go @@ -0,0 +1,79 @@ +package replayer + +import ( + "context" + "encoding/json" + "os" + + "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog" + + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" +) + +type Service struct { + applier ResourceApplier + path string +} + +type ResourceApplier interface { + Create(ctx context.Context, resource *unstructured.Unstructured) error + Update(ctx context.Context, resource *unstructured.Unstructured) error + Delete(ctx context.Context, resource *unstructured.Unstructured) error +} + +type Options struct { + Path string +} + +func New(applier ResourceApplier, options Options) *Service { + return &Service{applier: applier, path: options.Path} +} + +func (s *Service) Replay(ctx context.Context) error { + records := []recorder.Record{} + + b, err := os.ReadFile(s.path) + if err != nil { + return err + } + + if err := json.Unmarshal(b, &records); err != nil { + return err + } + + for _, record := range records { + if err := s.applyEvent(ctx, record); err != nil { + return xerrors.Errorf("failed to replay event: %w", err) + } + } + + return nil +} + +func (s *Service) applyEvent(ctx context.Context, record recorder.Record) error { + switch record.Event { + case recorder.Add: + if err := s.applier.Create(ctx, &record.Resource); err != nil { + if errors.IsAlreadyExists(err) { + klog.Warningf("resource already exists: %v", err) + } else { + return xerrors.Errorf("failed to create resource: %w", err) + } + } + case recorder.Update: + if err := s.applier.Update(ctx, &record.Resource); err != nil { + return xerrors.Errorf("failed to update resource: %w", err) + } + case recorder.Delete: + if err := s.applier.Delete(ctx, &record.Resource); err != nil { + return xerrors.Errorf("failed to delete resource: %w", err) + } + default: + return xerrors.Errorf("unknown event: %v", record.Event) + } + + return nil +} diff --git a/simulator/replayer/replayer_test.go b/simulator/replayer/replayer_test.go new file mode 100644 index 000000000..6f449984c --- /dev/null +++ b/simulator/replayer/replayer_test.go @@ -0,0 +1,134 @@ +package replayer + +import ( + "context" + "encoding/json" + "os" + "testing" + + "go.uber.org/mock/gomock" + "golang.org/x/xerrors" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" + "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer/mock_resourceapplier" +) + +func TestService_Replay(t *testing.T) { + t.Parallel() + tests := []struct { + name string + records []recorder.Record + prepareMockFn func(*mock_resourceapplier.MockResourceApplier) + wantErr bool + }{ + { + name: "no error when Create is successful", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + }, + wantErr: false, + }, + { + name: "should return error if Create raise an error", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(xerrors.Errorf("failed to create resource")) + }, + wantErr: true, + }, + { + name: "ignore AlreadyExists error when Create raise an error", + records: []recorder.Record{ + { + Event: recorder.Add, + Resource: unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "pod-1", + "namespace": "default", + }, + }, + }, + }, + }, + prepareMockFn: func(applier *mock_resourceapplier.MockResourceApplier) { + applier.EXPECT().Create(gomock.Any(), gomock.Any()).Return(errors.NewAlreadyExists(schema.GroupResource{}, "resource already exists")) + }, + wantErr: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockApplier := mock_resourceapplier.NewMockResourceApplier(ctrl) + tt.prepareMockFn(mockApplier) + + tempFile, err := os.CreateTemp("", "record.json") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + defer os.Remove(tempFile.Name()) + + b, err := json.Marshal(tt.records) + if err != nil { + t.Fatalf("failed to marshal records: %v", err) + } + + _, err = tempFile.Write(b) + if err != nil { + t.Fatalf("failed to write records: %v", err) + } + + err = tempFile.Close() + if err != nil { + t.Fatalf("failed to close temp file: %v", err) + } + + service := New(mockApplier, Options{tempFile.Name()}) + + err = service.Replay(context.Background()) + if (err != nil) != tt.wantErr { + t.Errorf("Service.Replay() error = %v", err) + } + }) + } +} diff --git a/simulator/resourceapplier/resourceapplier.go b/simulator/resourceapplier/resourceapplier.go index 6c191e36c..13dea0ad0 100644 --- a/simulator/resourceapplier/resourceapplier.go +++ b/simulator/resourceapplier/resourceapplier.go @@ -26,7 +26,7 @@ type Clients struct { } type Options struct { - GVRsToSync []schema.GroupVersionResource + GVRsToApply []schema.GroupVersionResource FilterBeforeCreating map[schema.GroupVersionResource][]FilteringFunction MutateBeforeCreating map[schema.GroupVersionResource][]MutatingFunction FilterBeforeUpdating map[schema.GroupVersionResource][]FilteringFunction @@ -56,7 +56,7 @@ func New(dynamicClient dynamic.Interface, restMapper meta.RESTMapper, options Op filterBeforeUpdating: map[schema.GroupVersionResource][]FilteringFunction{}, mutateBeforeUpdating: map[schema.GroupVersionResource][]MutatingFunction{}, - GVRsToSync: options.GVRsToSync, + GVRsToSync: options.GVRsToApply, } for gvr, fn := range mandatoryFilterForCreating { diff --git a/simulator/server/di/di.go b/simulator/server/di/di.go index e46ebfd43..d44d4656d 100644 --- a/simulator/server/di/di.go +++ b/simulator/server/di/di.go @@ -13,6 +13,8 @@ import ( configv1 "k8s.io/kube-scheduler/config/v1" "sigs.k8s.io/kube-scheduler-simulator/simulator/oneshotimporter" + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" + "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer" "sigs.k8s.io/kube-scheduler-simulator/simulator/reset" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourcewatcher" @@ -29,6 +31,8 @@ type Container struct { oneshotClusterResourceImporter OneShotClusterResourceImporter resourceSyncer ResourceSyncer resourceWatcherService ResourceWatcherService + recorderSerivce RecorderService + replayService ReplayService } // NewDIContainer initializes Container. @@ -43,10 +47,14 @@ func NewDIContainer( initialSchedulerCfg *configv1.KubeSchedulerConfiguration, externalImportEnabled bool, resourceSyncEnabled bool, + recordEnabled bool, + replayEnabled bool, externalClient clientset.Interface, externalDynamicClient dynamic.Interface, simulatorPort int, resourceapplierOptions resourceapplier.Options, + recorderOptions recorder.Options, + replayerOptions replayer.Options, ) (*Container, error) { c := &Container{} @@ -68,6 +76,12 @@ func NewDIContainer( c.resourceSyncer = syncer.New(externalDynamicClient, resourceApplierService) } c.resourceWatcherService = resourcewatcher.NewService(client) + if recordEnabled { + c.recorderSerivce = recorder.New(externalDynamicClient, recorderOptions) + } + if replayEnabled { + c.replayService = replayer.New(resourceApplierService, replayerOptions) + } return c, nil } @@ -98,6 +112,16 @@ func (c *Container) ResourceSyncer() ResourceSyncer { return c.resourceSyncer } +// RecorderService returns RecorderService. +func (c *Container) RecorderService() RecorderService { + return c.recorderSerivce +} + +// ReplayService returns ReplayService. +func (c *Container) ReplayService() ReplayService { + return c.replayService +} + // ResourceWatcherService returns ResourceWatcherService. func (c *Container) ResourceWatcherService() ResourceWatcherService { return c.resourceWatcherService diff --git a/simulator/server/di/interface.go b/simulator/server/di/interface.go index 00150362d..662c14912 100644 --- a/simulator/server/di/interface.go +++ b/simulator/server/di/interface.go @@ -34,18 +34,32 @@ type ResetService interface { Reset(ctx context.Context) error } -// OneShotClusterResourceImporter represents a service to import resources from an target cluster when starting the simulator. +// OneShotClusterResourceImporter represents a service to import resources from a target cluster when starting the simulator. type OneShotClusterResourceImporter interface { ImportClusterResources(ctx context.Context, labelSelector metav1.LabelSelector) error } -// ResourceSyncer represents a service to constantly sync resources from an target cluster. +// ResourceSyncer represents a service to constantly sync resources from a target cluster. type ResourceSyncer interface { // Run starts the resource syncer. // It should be run until the context is canceled. Run(ctx context.Context) error } +// RecorderService represents a service to record events in a target cluster. +type RecorderService interface { + // Run starts the recorder. + // It should be run until the context is canceled. + Run(ctx context.Context) error +} + +// ReplayService represents a service to replay events recorded in a file. +type ReplayService interface { + // Replay replays the recorded events. + // It should be run until the context is canceled. + Replay(ctx context.Context) error +} + // ResourceWatcherService represents service for watch k8s resources. type ResourceWatcherService interface { ListWatch(ctx context.Context, stream streamwriter.ResponseStream, lrVersions *resourcewatcher.LastResourceVersions) error From 9e8feeb2ca7299576405f74c79a0106bba32e18c Mon Sep 17 00:00:00 2001 From: saza-ku Date: Mon, 27 Jan 2025 17:19:15 +0900 Subject: [PATCH 02/12] fix docs --- README.md | 1 + ...d => record-and-replay-cluster-changes.md} | 30 +++++++++---------- 2 files changed, 16 insertions(+), 15 deletions(-) rename simulator/docs/{record-and-replay-cluster-events.md => record-and-replay-cluster-changes.md} (69%) diff --git a/README.md b/README.md index a084df412..d2d7b3440 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ you can refer to the [documentation](./simulator/docs/simulator-server-config.md See the following docs to know more about simulator: - [import-cluster-resources.md](./simulator/docs/import-cluster-resources.md): describes how you can import resources in your cluster to the simulator so that you can simulate scheduling based on your cluster's situation. +- [record-and-replay-cluster-changes.md](./simulator/docs/record-and-replay-cluster-changes.md): describes how you can record and replay the resources changes in the simulator. - [how-it-works.md](simulator/docs/how-it-works.md): describes about how the simulator works. - [kube-apiserver.md](simulator/docs/kube-apiserver.md): describe about kube-apiserver in simulator. (how you can configure and access) - [api.md](simulator/docs/api.md): describes about HTTP server the simulator has. (mainly for the webUI) diff --git a/simulator/docs/record-and-replay-cluster-events.md b/simulator/docs/record-and-replay-cluster-changes.md similarity index 69% rename from simulator/docs/record-and-replay-cluster-events.md rename to simulator/docs/record-and-replay-cluster-changes.md index ff0356b63..7e04d8592 100644 --- a/simulator/docs/record-and-replay-cluster-events.md +++ b/simulator/docs/record-and-replay-cluster-changes.md @@ -1,35 +1,35 @@ -# [Beta] Record your real cluster's events and replay them in the simulator +# [Beta] Record your real cluster's changes in resources and replay them in the simulator -You can record events from your real cluster and replay them in the simulator. This feature is useful for reproducing issues that occur in your real cluster. +You can record resource addition/update/deletion at your real cluster. This feature is useful for reproducing issues that occur in your real cluster. -## Record events +## Record changes -To record events from your real cluster, you need to follow these steps: +To record changes from your real cluster, you need to follow these steps: 1. Set `true` to `recorderEnabled`. 2. Set the path of the kubeconfig file for your cluster to `KubeConfig`. - - This feature only requires the read permission for events. -3. Set the path of the file to save the recorded events to `recordedFilePath`. + - This feature only requires the read permission for resources. +3. Set the path of the file to save the recorded changes to `recordedFilePath`. 4. Make sure the file path is mounted to the simulator container. ```yaml recorderEnabled: true kubeConfig: "/path/to/your-cluster-kubeconfig" -recordedFilePath: "/path/to/recorded-events.json" +recordedFilePath: "/path/to/recorded-changes.json" ``` ```yaml volumes: ... - - ./path/to/recorded-events.json:/path/to/recorded-events.json + - ./path/to/recorded-changes.json:/path/to/recorded-changes.json ``` > [!NOTE] -> When a file already exists at `recordedFilePath`, it backs up the file in the same directory adding a timestamp to the filename and creates a new file for recording. +> When a file already exists at `recordedFilePath`, it puts out an error. ### Resources to record -It records the events of the following resources: +It records the changes of the following resources: - Pods - Nodes @@ -49,21 +49,21 @@ recorderOptions := recorder.Options{Path: cfg.RecordFilePath, } ``` -## Replay events +## Replay changes -To replay the recorded events in the simulator, you need to follow these steps: +To replay the recorded changes in the simulator, you need to follow these steps: 1. Set `true` to `replayerEnabled`. -2. Set the path of the file where the events are recorded to `recordedFilePath`. +2. Set the path of the file where the changes are recorded to `recordedFilePath`. ```yaml replayerEnabled: true -recordedFilePath: "/path/to/recorded-events.json" +recordedFilePath: "/path/to/recorded-changes.json" ``` ### Resources to replay -It replays the events of the following resources: +It replays the changes of the following resources: - Pods - Nodes From f8ca2fb7aa470f36a5b74b7c4fc749239ac61098 Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 15:10:47 +0900 Subject: [PATCH 03/12] buffering records --- simulator/Makefile | 2 +- simulator/cmd/recorder/recorder.go | 4 + simulator/cmd/simulator/simulator.go | 2 +- simulator/recorder/recorder.go | 142 +++++++++++---------------- simulator/recorder/recorder_test.go | 51 ++++++---- 5 files changed, 98 insertions(+), 103 deletions(-) create mode 100644 simulator/cmd/recorder/recorder.go diff --git a/simulator/Makefile b/simulator/Makefile index dbdc28724..5c5c2dcc4 100644 --- a/simulator/Makefile +++ b/simulator/Makefile @@ -14,7 +14,7 @@ format: .PHONY: test test: - go test ./... + go test -count=1 -v ./... .PHONY: mod-download mod-download: ## Downloads the Go module diff --git a/simulator/cmd/recorder/recorder.go b/simulator/cmd/recorder/recorder.go new file mode 100644 index 000000000..da29a2cad --- /dev/null +++ b/simulator/cmd/recorder/recorder.go @@ -0,0 +1,4 @@ +package main + +func main() { +} diff --git a/simulator/cmd/simulator/simulator.go b/simulator/cmd/simulator/simulator.go index f948decfb..8bad1ddae 100644 --- a/simulator/cmd/simulator/simulator.go +++ b/simulator/cmd/simulator/simulator.go @@ -96,7 +96,7 @@ func startSimulator() error { return xerrors.Errorf("kubeapi-server is not ready: %w", err) } - recorderOptions := recorder.Options{Path: cfg.RecordFilePath} + recorderOptions := recorder.Options{RecordDir: cfg.RecordFilePath} replayerOptions := replayer.Options{Path: cfg.RecordFilePath} resourceApplierOptions := resourceapplier.Options{} diff --git a/simulator/recorder/recorder.go b/simulator/recorder/recorder.go index 7425f131e..8480be1fd 100644 --- a/simulator/recorder/recorder.go +++ b/simulator/recorder/recorder.go @@ -3,10 +3,9 @@ package recorder import ( "context" "encoding/json" - "errors" - "io" + "fmt" "os" - "path/filepath" + "path" "time" "golang.org/x/xerrors" @@ -27,11 +26,14 @@ var ( Delete Event = "Delete" ) +const defaultBufferSize = 1000 + type Service struct { - client dynamic.Interface - gvrs []schema.GroupVersionResource - path string - recordCh chan Record + client dynamic.Interface + gvrs []schema.GroupVersionResource + path string + recordCh chan Record + bufferSize int } type Record struct { @@ -51,8 +53,9 @@ var DefaultGVRs = []schema.GroupVersionResource{ } type Options struct { - GVRs []schema.GroupVersionResource - Path string + GVRs []schema.GroupVersionResource + RecordDir string + BufferSize *int } func New(client dynamic.Interface, options Options) *Service { @@ -61,11 +64,17 @@ func New(client dynamic.Interface, options Options) *Service { gvrs = options.GVRs } + bufferSize := defaultBufferSize + if options.BufferSize != nil { + bufferSize = *options.BufferSize + } + return &Service{ - client: client, - gvrs: gvrs, - path: options.Path, - recordCh: make(chan Record), + client: client, + gvrs: gvrs, + path: options.RecordDir, + recordCh: make(chan Record, bufferSize), + bufferSize: bufferSize, } } @@ -85,11 +94,6 @@ func (s *Service) Run(ctx context.Context) error { infFact.WaitForCacheSync(ctx.Done()) } - err := s.backup() - if err != nil { - klog.Error("Failed to backup record: ", err) - } - go s.record(ctx) return nil @@ -112,82 +116,52 @@ func (s *Service) RecordEvent(obj interface{}, e Event) { } func (s *Service) record(ctx context.Context) { - records := []Record{} + records := make([]Record, 0, s.bufferSize) + count := 0 + writeRecord := func() { + defer func() { + count++ + records = make([]Record, 0, s.bufferSize) + }() + + filePath := path.Join(s.path, fmt.Sprintf("record-%018d.json", count)) + file, err := os.Create(filePath) + if err != nil { + klog.Error("Failed to create record file: ", err) + return + } + + b, err := json.Marshal(records) + if err != nil { + klog.Error("Failed to marshal record: ", err) + return + } + + _, err = file.Write(b) + if err != nil { + klog.Error("Failed to write record: ", err) + return + } + } + for { select { case r := <-s.recordCh: records = append(records, r) - - tempFile, err := os.CreateTemp("", "record_temp.json") - if err != nil { - klog.Error("Failed to open file: ", err) - continue - } - - b, err := json.Marshal(records) - if err != nil { - klog.Error("Failed to marshal record: ", err) - continue - } - - _, err = tempFile.Write(b) - if err != nil { - klog.Error("Failed to write record: ", err) - continue - } - - tempFile.Close() - - if err = moveFile(tempFile.Name(), s.path); err != nil { - klog.Error("Failed to rename file: ", err) - continue + if len(records) == s.bufferSize { + writeRecord() } case <-ctx.Done(): + if len(records) > 0 { + writeRecord() + } return default: - time.Sleep(1 * time.Second) - } - } -} - -func (s *Service) backup() error { - f, err := os.Stat(s.path) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return nil + if len(records) > 0 { + writeRecord() + } } - return xerrors.Errorf("failed to get file info: %w", err) } - - if f.IsDir() { - return xerrors.New("record file is a directory") - } - - name := filepath.Base(s.path) - dir := filepath.Dir(s.path) - backupPath := filepath.Join(dir, f.ModTime().Format("2006-01-02_150405_")+name) - return moveFile(s.path, backupPath) -} - -func moveFile(srcPath, destPath string) error { - src, err := os.Open(srcPath) - if err != nil { - return err - } - defer src.Close() - - dest, err := os.Create(destPath) - if err != nil { - return err - } - defer dest.Close() - - _, err = io.Copy(dest, src) - if err != nil { - return err - } - - return os.Remove(srcPath) } diff --git a/simulator/recorder/recorder_test.go b/simulator/recorder/recorder_test.go index bcdd5d6ad..f6c622d7a 100644 --- a/simulator/recorder/recorder_test.go +++ b/simulator/recorder/recorder_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "os" "path" - "strings" "testing" "time" @@ -241,8 +240,12 @@ func TestRecorder(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - tempPath := path.Join(os.TempDir(), strings.ReplaceAll(tt.name, " ", "_")+".json") - defer os.Remove(tempPath) + dir := path.Join(t.TempDir(), tt.name) + err := os.MkdirAll(dir, 0755) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) s := runtime.NewScheme() corev1.AddToScheme(s) @@ -254,8 +257,8 @@ func TestRecorder(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - service := New(client, Options{Path: tempPath}) - err := service.Run(ctx) + service := New(client, Options{RecordDir: dir}) + err = service.Run(ctx) if (err != nil) != tt.wantErr { t.Errorf("Service.Record() error = %v, wantErr %v", err, tt.wantErr) return @@ -266,7 +269,7 @@ func TestRecorder(t *testing.T) { t.Fatal(err) } - err = assert(ctx, tempPath, tt.want) + err = assert(ctx, dir, tt.want) if err != nil { t.Fatal(err) } @@ -317,21 +320,35 @@ func apply(ctx context.Context, client *dynamicFake.FakeDynamicClient, resourceT return nil } -func assert(ctx context.Context, filePath string, want []Record) error { +func assert(ctx context.Context, dirPath string, want []Record) error { var finalErr error - err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(context.Context) (bool, error) { - got := []Record{} - var b []byte - b, err := os.ReadFile(filePath) + wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(context.Context) (bool, error) { + files, err := os.ReadDir(dirPath) if err != nil { - finalErr = xerrors.Errorf("failed to read the records: %w", err) + finalErr = xerrors.Errorf("failed to read the record directory: %w", err) return false, nil } - err = json.Unmarshal(b, &got) - if err != nil { - finalErr = xerrors.Errorf("failed to unmarshal the records: %w", err) - return false, nil + got := []Record{} + for _, file := range files { + if file.IsDir() { + continue + } + + b, err := os.ReadFile(path.Join(dirPath, file.Name())) + if err != nil { + finalErr = xerrors.Errorf("failed to read the record file: %w", err) + return false, nil + } + + var records []Record + err = json.Unmarshal(b, &records) + if err != nil { + finalErr = xerrors.Errorf("failed to unmarshal the records: %w", err) + return false, nil + } + + got = append(got, records...) } if len(got) != len(want) { @@ -354,7 +371,7 @@ func assert(ctx context.Context, filePath string, want []Record) error { return true, nil }) - return err + return finalErr } var ( From 43a330ff75b2ea2d472d6a2c08b6c5814ac159de Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 17:27:45 +0900 Subject: [PATCH 04/12] make recorder cli --- simulator/cmd/recorder/kubeconfig.yaml | 14 ++++++ simulator/cmd/recorder/recorder.go | 67 ++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 simulator/cmd/recorder/kubeconfig.yaml diff --git a/simulator/cmd/recorder/kubeconfig.yaml b/simulator/cmd/recorder/kubeconfig.yaml new file mode 100644 index 000000000..b9a117804 --- /dev/null +++ b/simulator/cmd/recorder/kubeconfig.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Config + +clusters: + - cluster: + server: http://localhost:3132 + name: simulator + +contexts: + - context: + cluster: simulator + name: simulator + +current-context: simulator diff --git a/simulator/cmd/recorder/recorder.go b/simulator/cmd/recorder/recorder.go index da29a2cad..6bded456a 100644 --- a/simulator/cmd/recorder/recorder.go +++ b/simulator/cmd/recorder/recorder.go @@ -1,4 +1,71 @@ package main +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + "golang.org/x/xerrors" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog" + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" +) + +var dirFlag = flag.String("dir", "", "directory to store the recorded resources") +var kubeConfigFlag = flag.String("kubeconfig", "", "path to kubeconfig file") + func main() { + if err := startRecorder(); err != nil { + klog.Fatalf("failed with error on running simulator: %+v", err) + } +} + +func startRecorder() error { + flag.Parse() + + if err := validateFlags(); err != nil { + return err + } + + restCfg, err := clientcmd.BuildConfigFromFlags("", *kubeConfigFlag) + if err != nil { + return xerrors.Errorf("load kubeconfig: %w", err) + } + + client := dynamic.NewForConfigOrDie(restCfg) + fmt.Println(restCfg.Host) + + recorder := recorder.New(client, recorder.Options{ + RecordDir: *dirFlag, + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := recorder.Run(ctx); err != nil { + return xerrors.Errorf("run recorder: %w", err) + } + + // Block until signal is received + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + <-ch + + return nil +} + +func validateFlags() error { + if *dirFlag == "" { + return xerrors.New("dir flag is required") + } + + if *kubeConfigFlag == "" { + return xerrors.New("kubeconfig flag is required") + } + + return nil } From f2bc165ff6879f2af727453a973c1561b64ecba0 Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 17:46:30 +0900 Subject: [PATCH 05/12] fix replayer to read all files in record directory --- simulator/cmd/simulator/simulator.go | 2 +- simulator/config.yaml | 2 +- simulator/replayer/replayer.go | 34 +++++++++++++++++++++++----- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/simulator/cmd/simulator/simulator.go b/simulator/cmd/simulator/simulator.go index 8bad1ddae..dc103275a 100644 --- a/simulator/cmd/simulator/simulator.go +++ b/simulator/cmd/simulator/simulator.go @@ -97,7 +97,7 @@ func startSimulator() error { } recorderOptions := recorder.Options{RecordDir: cfg.RecordFilePath} - replayerOptions := replayer.Options{Path: cfg.RecordFilePath} + replayerOptions := replayer.Options{RecordDir: cfg.RecordFilePath} resourceApplierOptions := resourceapplier.Options{} dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, cfg.RecorderEnabled, cfg.ReplayerEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceApplierOptions, recorderOptions, replayerOptions) diff --git a/simulator/config.yaml b/simulator/config.yaml index 7ed80002e..e0d5dfbcc 100644 --- a/simulator/config.yaml +++ b/simulator/config.yaml @@ -66,4 +66,4 @@ replayEnabled: true # The path to a file which the simulator will record # events to or replay events from. -recordFilePath: "/record/record.json" +recordFilePath: "./simulator/record" diff --git a/simulator/replayer/replayer.go b/simulator/replayer/replayer.go index 11e0dbb1d..2838af9ee 100644 --- a/simulator/replayer/replayer.go +++ b/simulator/replayer/replayer.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "os" + "path" "golang.org/x/xerrors" "k8s.io/apimachinery/pkg/api/errors" @@ -14,8 +15,8 @@ import ( ) type Service struct { - applier ResourceApplier - path string + applier ResourceApplier + recordDir string } type ResourceApplier interface { @@ -25,23 +26,44 @@ type ResourceApplier interface { } type Options struct { - Path string + RecordDir string } func New(applier ResourceApplier, options Options) *Service { - return &Service{applier: applier, path: options.Path} + return &Service{applier: applier, recordDir: options.RecordDir} } func (s *Service) Replay(ctx context.Context) error { + files, err := os.ReadDir(s.recordDir) + if err != nil { + return xerrors.Errorf("failed to read record directory: %w", err) + } + + for _, file := range files { + if file.IsDir() { + continue + } + + filePath := path.Join(s.recordDir, file.Name()) + if err := s.replayRecordsFromFile(ctx, filePath, s.applier); err != nil { + return xerrors.Errorf("failed to replay records from file: %w", err) + } + } + + return nil +} + +func (s *Service) replayRecordsFromFile(ctx context.Context, path string, applier ResourceApplier) error { records := []recorder.Record{} - b, err := os.ReadFile(s.path) + b, err := os.ReadFile(path) if err != nil { return err } if err := json.Unmarshal(b, &records); err != nil { - return err + klog.Warningf("failed to unmarshal records from file %s: %v", path, err) + return nil } for _, record := range records { From f801c9abbdbbd0b44c300948f907d8c69a56d5a2 Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 18:33:02 +0900 Subject: [PATCH 06/12] remove recorder from simulator --- compose.yml | 1 + simulator/cmd/simulator/simulator.go | 15 +++--------- simulator/config.yaml | 16 ++++--------- simulator/config/config.go | 35 +++++++--------------------- simulator/config/v1alpha1/types.go | 9 ++----- simulator/replayer/replayer.go | 4 ++-- simulator/server/di/di.go | 12 ---------- 7 files changed, 20 insertions(+), 72 deletions(-) diff --git a/compose.yml b/compose.yml index 35d8bc0b2..0bc9c3ad0 100644 --- a/compose.yml +++ b/compose.yml @@ -29,6 +29,7 @@ services: - ./simulator/kubeconfig.yaml:/kubeconfig.yaml - /var/run/docker.sock:/var/run/docker.sock - conf:/config + - ./simulator/record:/record environment: - PORT=1212 - KUBE_SCHEDULER_SIMULATOR_ETCD_URL=http://simulator-cluster:2379 diff --git a/simulator/cmd/simulator/simulator.go b/simulator/cmd/simulator/simulator.go index dc103275a..57bee7f25 100644 --- a/simulator/cmd/simulator/simulator.go +++ b/simulator/cmd/simulator/simulator.go @@ -20,7 +20,6 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/kube-scheduler-simulator/simulator/config" - "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier" "sigs.k8s.io/kube-scheduler-simulator/simulator/server" @@ -60,7 +59,7 @@ func startSimulator() error { importClusterResourceClient := &clientset.Clientset{} var importClusterDynamicClient dynamic.Interface - if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled || cfg.RecorderEnabled { + if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled { importClusterResourceClient, err = clientset.NewForConfig(cfg.ExternalKubeClientCfg) if err != nil { return xerrors.Errorf("creates a new Clientset for the ExternalKubeClientCfg: %w", err) @@ -96,11 +95,10 @@ func startSimulator() error { return xerrors.Errorf("kubeapi-server is not ready: %w", err) } - recorderOptions := recorder.Options{RecordDir: cfg.RecordFilePath} - replayerOptions := replayer.Options{RecordDir: cfg.RecordFilePath} + replayerOptions := replayer.Options{RecordDir: cfg.RecordDirPath} resourceApplierOptions := resourceapplier.Options{} - dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, cfg.RecorderEnabled, cfg.ReplayerEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceApplierOptions, recorderOptions, replayerOptions) + dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, cfg.ReplayerEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceApplierOptions, replayerOptions) if err != nil { return xerrors.Errorf("create di container: %w", err) } @@ -132,13 +130,6 @@ func startSimulator() error { } } - if cfg.RecorderEnabled { - // Start the recorder to record events in the target cluster. - if err = dic.RecorderService().Run(ctx); err != nil { - return xerrors.Errorf("start recording: %w", err) - } - } - // start simulator server s := server.NewSimulatorServer(cfg, dic) shutdownFn, err := s.Start(cfg.Port) diff --git a/simulator/config.yaml b/simulator/config.yaml index e0d5dfbcc..e83970606 100644 --- a/simulator/config.yaml +++ b/simulator/config.yaml @@ -20,7 +20,7 @@ corsAllowedOriginList: - "http://localhost:3000" # This is for the beta feature "One-shot importing cluster's resources", -# "Continuous syncing cluster's resources" and "Recording cluster's events". +# "Continuous syncing cluster's resources" and "Replaying cluster's events". # This variable is used to find Kubeconfig required to access your # cluster for importing resources to scheduler simulator. kubeConfig: "/kubeconfig.yaml" @@ -51,19 +51,11 @@ externalImportEnabled: false resourceSyncEnabled: false # This variable indicates whether the simulator will -# record events from a user cluster specified by kubeConfig. -# You cannot make both of recorderEnabled and replayEnabled true because those features would be conflicted. -# Note, this is still a beta feature. -recorderEnabled: false - -# This variable indicates whether the simulator will -# replay events recorded in the file specified by recordFilePath. +# replay events recorded in the file specified by recordDirPath. # You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true # because those features would be conflicted. -# You cannot make both of recorderEnabled and replayEnabled true because those features would be conflicted. # Note, this is still a beta feature. replayEnabled: true -# The path to a file which the simulator will record -# events to or replay events from. -recordFilePath: "./simulator/record" +# The path to a directory where the record files are stored. +recordDirPath: "/record" diff --git a/simulator/config/config.go b/simulator/config/config.go index 2d074200e..ba84b58ff 100644 --- a/simulator/config/config.go +++ b/simulator/config/config.go @@ -42,12 +42,10 @@ type Config struct { ResourceImportLabelSelector metav1.LabelSelector // ResourceSyncEnabled indicates whether the simulator will keep syncing resources from a target cluster. ResourceSyncEnabled bool - // RecorderEnabled indicates whether the simulator will record events from a target cluster. - RecorderEnabled bool // ReplayerEnabled indicates whether the simulator will replay events recorded in a file. ReplayerEnabled bool - // RecordFilePath is the path to the file where the simulator records events. - RecordFilePath string + // RecordDirPath is the path to the file where the simulator records events. + RecordDirPath string // ExternalKubeClientCfg is KubeConfig to get resources from external cluster. // This field should be set when ExternalImportEnabled == true or ResourceSyncEnabled == true. ExternalKubeClientCfg *rest.Config @@ -90,17 +88,13 @@ func NewConfig() (*Config, error) { externalimportenabled := getExternalImportEnabled() resourceSyncEnabled := getResourceSyncEnabled() - recorderEnabled := getRecorderEnabled() replayerEnabled := getReplayerEnabled() - recordFilePath := getRecordFilePath() + recordFilePath := getRecordDirPath() externalKubeClientCfg := &rest.Config{} if hasTwoOrMoreTrue(externalimportenabled, resourceSyncEnabled, replayerEnabled) { return nil, xerrors.Errorf("externalImportEnabled, resourceSyncEnabled and replayerEnabled cannot be used simultaneously.") } - if replayerEnabled && recorderEnabled { - return nil, xerrors.Errorf("recorderEnabled and replayerEnabled cannot be used simultaneously.") - } - if externalimportenabled || resourceSyncEnabled || recorderEnabled { + if externalimportenabled || resourceSyncEnabled { externalKubeClientCfg, err = clientcmd.BuildConfigFromFlags("", configYaml.KubeConfig) if err != nil { return nil, xerrors.Errorf("get kube clientconfig: %w", err) @@ -122,9 +116,8 @@ func NewConfig() (*Config, error) { ResourceImportLabelSelector: configYaml.ResourceImportLabelSelector, ExternalKubeClientCfg: externalKubeClientCfg, ResourceSyncEnabled: resourceSyncEnabled, - RecorderEnabled: recorderEnabled, ReplayerEnabled: replayerEnabled, - RecordFilePath: recordFilePath, + RecordDirPath: recordFilePath, }, nil } @@ -287,18 +280,6 @@ func getResourceSyncEnabled() bool { return resourceSyncEnabled } -// getRecorderEnabled reads RECORDER_ENABLED and converts it to bool -// if empty from the config file. -// This function will return `true` if `RECORDER_ENABLED` is "1". -func getRecorderEnabled() bool { - recorderEnabledString := os.Getenv("RECORDER_ENABLED") - if recorderEnabledString == "" { - recorderEnabledString = strconv.FormatBool(configYaml.RecorderEnabled) - } - recorderEnabled, _ := strconv.ParseBool(recorderEnabledString) - return recorderEnabled -} - // getReplayerEnabled reads REPLAYER_ENABLED and converts it to bool // if empty from the config file. // This function will return `true` if `REPLAYER_ENABLED` is "1". @@ -311,12 +292,12 @@ func getReplayerEnabled() bool { return replayerEnabled } -// getRecordFilePath reads RECORD_FILE_PATH +// getRecordDirPath reads RECORD_FILE_PATH // if empty from the config file. -func getRecordFilePath() string { +func getRecordDirPath() string { recordFilePath := os.Getenv("RECORD_FILE_PATH") if recordFilePath == "" { - recordFilePath = configYaml.RecordFilePath + recordFilePath = configYaml.RecordDirPath } return recordFilePath } diff --git a/simulator/config/v1alpha1/types.go b/simulator/config/v1alpha1/types.go index 55756d111..442e4ea3a 100644 --- a/simulator/config/v1alpha1/types.go +++ b/simulator/config/v1alpha1/types.go @@ -67,17 +67,12 @@ type SimulatorConfiguration struct { // sync resources from an user cluster's or not. ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"` - // This variable indicates whether the simulator will - // record events from an user cluster's or not. - RecorderEnabled bool `json:"recorderEnabled,omitempty"` - // This variable indicates whether the simulator will // replay events recorded in a file or not. ReplayerEnabled bool `json:"replayEnabled,omitempty"` - // The path to a file which the simulator will record - // events to or replay events from. - RecordFilePath string `json:"recordFilePath,omitempty"` + // The path to a directory where the record files are stored. + RecordDirPath string `json:"recordDirPath,omitempty"` // This variable indicates whether an external scheduler // is used. diff --git a/simulator/replayer/replayer.go b/simulator/replayer/replayer.go index 2838af9ee..d79e7b3ba 100644 --- a/simulator/replayer/replayer.go +++ b/simulator/replayer/replayer.go @@ -45,7 +45,7 @@ func (s *Service) Replay(ctx context.Context) error { } filePath := path.Join(s.recordDir, file.Name()) - if err := s.replayRecordsFromFile(ctx, filePath, s.applier); err != nil { + if err := s.replayRecordsFromFile(ctx, filePath); err != nil { return xerrors.Errorf("failed to replay records from file: %w", err) } } @@ -53,7 +53,7 @@ func (s *Service) Replay(ctx context.Context) error { return nil } -func (s *Service) replayRecordsFromFile(ctx context.Context, path string, applier ResourceApplier) error { +func (s *Service) replayRecordsFromFile(ctx context.Context, path string) error { records := []recorder.Record{} b, err := os.ReadFile(path) diff --git a/simulator/server/di/di.go b/simulator/server/di/di.go index d44d4656d..b09d6ecb6 100644 --- a/simulator/server/di/di.go +++ b/simulator/server/di/di.go @@ -13,7 +13,6 @@ import ( configv1 "k8s.io/kube-scheduler/config/v1" "sigs.k8s.io/kube-scheduler-simulator/simulator/oneshotimporter" - "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" "sigs.k8s.io/kube-scheduler-simulator/simulator/replayer" "sigs.k8s.io/kube-scheduler-simulator/simulator/reset" "sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier" @@ -31,7 +30,6 @@ type Container struct { oneshotClusterResourceImporter OneShotClusterResourceImporter resourceSyncer ResourceSyncer resourceWatcherService ResourceWatcherService - recorderSerivce RecorderService replayService ReplayService } @@ -47,13 +45,11 @@ func NewDIContainer( initialSchedulerCfg *configv1.KubeSchedulerConfiguration, externalImportEnabled bool, resourceSyncEnabled bool, - recordEnabled bool, replayEnabled bool, externalClient clientset.Interface, externalDynamicClient dynamic.Interface, simulatorPort int, resourceapplierOptions resourceapplier.Options, - recorderOptions recorder.Options, replayerOptions replayer.Options, ) (*Container, error) { c := &Container{} @@ -76,9 +72,6 @@ func NewDIContainer( c.resourceSyncer = syncer.New(externalDynamicClient, resourceApplierService) } c.resourceWatcherService = resourcewatcher.NewService(client) - if recordEnabled { - c.recorderSerivce = recorder.New(externalDynamicClient, recorderOptions) - } if replayEnabled { c.replayService = replayer.New(resourceApplierService, replayerOptions) } @@ -112,11 +105,6 @@ func (c *Container) ResourceSyncer() ResourceSyncer { return c.resourceSyncer } -// RecorderService returns RecorderService. -func (c *Container) RecorderService() RecorderService { - return c.recorderSerivce -} - // ReplayService returns ReplayService. func (c *Container) ReplayService() ReplayService { return c.replayService From c51eba5201acdd1c76468ca7bf0da82c7fb1d1a6 Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 18:41:37 +0900 Subject: [PATCH 07/12] fix replayer test --- simulator/Makefile | 2 +- simulator/recorder/recorder.go | 5 +++++ simulator/recorder/recorder_test.go | 9 +++------ simulator/replayer/replayer_test.go | 14 +++++++++++--- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/simulator/Makefile b/simulator/Makefile index 5c5c2dcc4..dbdc28724 100644 --- a/simulator/Makefile +++ b/simulator/Makefile @@ -14,7 +14,7 @@ format: .PHONY: test test: - go test -count=1 -v ./... + go test ./... .PHONY: mod-download mod-download: ## Downloads the Go module diff --git a/simulator/recorder/recorder.go b/simulator/recorder/recorder.go index 8480be1fd..6ebe7f4a1 100644 --- a/simulator/recorder/recorder.go +++ b/simulator/recorder/recorder.go @@ -94,6 +94,11 @@ func (s *Service) Run(ctx context.Context) error { infFact.WaitForCacheSync(ctx.Done()) } + err := os.MkdirAll(s.path, 0755) + if err != nil { + return xerrors.Errorf("failed to create record directory: %w", err) + } + go s.record(ctx) return nil diff --git a/simulator/recorder/recorder_test.go b/simulator/recorder/recorder_test.go index f6c622d7a..1af9d68d6 100644 --- a/simulator/recorder/recorder_test.go +++ b/simulator/recorder/recorder_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "os" "path" + "strings" "testing" "time" @@ -240,11 +241,7 @@ func TestRecorder(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - dir := path.Join(t.TempDir(), tt.name) - err := os.MkdirAll(dir, 0755) - if err != nil { - t.Fatal(err) - } + dir := path.Join(t.TempDir(), strings.ReplaceAll(tt.name, " ", "_")) defer os.RemoveAll(dir) s := runtime.NewScheme() @@ -258,7 +255,7 @@ func TestRecorder(t *testing.T) { defer cancel() service := New(client, Options{RecordDir: dir}) - err = service.Run(ctx) + err := service.Run(ctx) if (err != nil) != tt.wantErr { t.Errorf("Service.Record() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/simulator/replayer/replayer_test.go b/simulator/replayer/replayer_test.go index 6f449984c..3f0f85582 100644 --- a/simulator/replayer/replayer_test.go +++ b/simulator/replayer/replayer_test.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "os" + "path" + "strings" "testing" "go.uber.org/mock/gomock" @@ -102,11 +104,17 @@ func TestService_Replay(t *testing.T) { mockApplier := mock_resourceapplier.NewMockResourceApplier(ctrl) tt.prepareMockFn(mockApplier) - tempFile, err := os.CreateTemp("", "record.json") + recordDir := path.Join(os.TempDir(), strings.ReplaceAll(tt.name, " ", "_")) + filePath := path.Join(recordDir, "record.json") + err := os.MkdirAll(recordDir, 0755) + if err != nil { + t.Fatalf("failed to create record directory: %v", err) + } + tempFile, err := os.Create(filePath) if err != nil { t.Fatalf("failed to create temp file: %v", err) } - defer os.Remove(tempFile.Name()) + defer os.RemoveAll(recordDir) b, err := json.Marshal(tt.records) if err != nil { @@ -123,7 +131,7 @@ func TestService_Replay(t *testing.T) { t.Fatalf("failed to close temp file: %v", err) } - service := New(mockApplier, Options{tempFile.Name()}) + service := New(mockApplier, Options{RecordDir: recordDir}) err = service.Replay(context.Background()) if (err != nil) != tt.wantErr { From 5461dcb07d81844ecd1b6d68fb3052a111e2231b Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 18:49:19 +0900 Subject: [PATCH 08/12] fix docs --- compose.yml | 1 - simulator/cmd/recorder/recorder.go | 7 +--- .../docs/record-and-replay-cluster-changes.md | 41 ++++++++----------- 3 files changed, 19 insertions(+), 30 deletions(-) diff --git a/compose.yml b/compose.yml index 0bc9c3ad0..35d8bc0b2 100644 --- a/compose.yml +++ b/compose.yml @@ -29,7 +29,6 @@ services: - ./simulator/kubeconfig.yaml:/kubeconfig.yaml - /var/run/docker.sock:/var/run/docker.sock - conf:/config - - ./simulator/record:/record environment: - PORT=1212 - KUBE_SCHEDULER_SIMULATOR_ETCD_URL=http://simulator-cluster:2379 diff --git a/simulator/cmd/recorder/recorder.go b/simulator/cmd/recorder/recorder.go index 6bded456a..f029b39e8 100644 --- a/simulator/cmd/recorder/recorder.go +++ b/simulator/cmd/recorder/recorder.go @@ -3,7 +3,6 @@ package main import ( "context" "flag" - "fmt" "os" "os/signal" "syscall" @@ -37,11 +36,9 @@ func startRecorder() error { } client := dynamic.NewForConfigOrDie(restCfg) - fmt.Println(restCfg.Host) - recorder := recorder.New(client, recorder.Options{ - RecordDir: *dirFlag, - }) + recorderOptions := recorder.Options{RecordDir: *dirFlag} + recorder := recorder.New(client, recorderOptions) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/simulator/docs/record-and-replay-cluster-changes.md b/simulator/docs/record-and-replay-cluster-changes.md index 7e04d8592..fa7da38e4 100644 --- a/simulator/docs/record-and-replay-cluster-changes.md +++ b/simulator/docs/record-and-replay-cluster-changes.md @@ -6,26 +6,11 @@ You can record resource addition/update/deletion at your real cluster. This feat To record changes from your real cluster, you need to follow these steps: -1. Set `true` to `recorderEnabled`. -2. Set the path of the kubeconfig file for your cluster to `KubeConfig`. - - This feature only requires the read permission for resources. -3. Set the path of the file to save the recorded changes to `recordedFilePath`. -4. Make sure the file path is mounted to the simulator container. - -```yaml -recorderEnabled: true -kubeConfig: "/path/to/your-cluster-kubeconfig" -recordedFilePath: "/path/to/recorded-changes.json" -``` - -```yaml -volumes: - ... - - ./path/to/recorded-changes.json:/path/to/recorded-changes.json -``` +1. Go to `simulator` directory. +2. Start the recorder by running `go run cmd/recorder/recorder.go --dir /path/to/directory-to-store-recorded-changes --kubeconfig /path/to/kubeconfig`. > [!NOTE] -> When a file already exists at `recordedFilePath`, it puts out an error. +> When a file already exists at the value of `--dir`, it will be overwritten. ### Resources to record @@ -37,12 +22,12 @@ It records the changes of the following resources: - PersistentVolumeClaims - StorageClasses -You can tweak which resources to record via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go): +You can tweak which resources to record via the option in [/simulator/cmd/recorder/recorder.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/recorder/recorder.go): ```go -recorderOptions := recorder.Options{Path: cfg.RecordFilePath, -// GVRs is a list of GroupVersionResource that will be recorded. -// If it's nil, DefaultGVRs are used. +recorderOptions := recorder.Options{RecordDir: *dirFlag, + // GVRs is a list of GroupVersionResource that will be recorded. + // If it's nil, DefaultGVRs are used. GVRs: []schema.GroupVersionResource{ {Group: "your-group", Version: "v1", Resource: "your-custom-resources"}, }, @@ -55,10 +40,18 @@ To replay the recorded changes in the simulator, you need to follow these steps: 1. Set `true` to `replayerEnabled`. 2. Set the path of the file where the changes are recorded to `recordedFilePath`. +3. Make sure the file path is mounted to the simulator server container. -```yaml + +```yaml:config.yaml replayerEnabled: true -recordedFilePath: "/path/to/recorded-changes.json" +recordedFilePath: "/path/to/directory-to-store-recorded-changes" +``` + +```yaml:compose.yml +volumes: + ... + - ./path/to/directory-to-store-recorded-changes:/path/to/directory-to-store-recorded-changes ``` ### Resources to replay From 1fcc3bd38836b598dfdd2055335cec5c29a04f87 Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 19:09:15 +0900 Subject: [PATCH 09/12] add timeout option --- simulator/cmd/recorder/recorder.go | 43 ++++++++++++++----- .../docs/record-and-replay-cluster-changes.md | 3 ++ 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/simulator/cmd/recorder/recorder.go b/simulator/cmd/recorder/recorder.go index f029b39e8..d88a4574c 100644 --- a/simulator/cmd/recorder/recorder.go +++ b/simulator/cmd/recorder/recorder.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "syscall" + "time" "golang.org/x/xerrors" "k8s.io/client-go/dynamic" @@ -14,8 +15,11 @@ import ( "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" ) -var dirFlag = flag.String("dir", "", "directory to store the recorded resources") -var kubeConfigFlag = flag.String("kubeconfig", "", "path to kubeconfig file") +var ( + recordDir string + kubeConfig string + timeout int +) func main() { if err := startRecorder(); err != nil { @@ -24,38 +28,51 @@ func main() { } func startRecorder() error { - flag.Parse() - - if err := validateFlags(); err != nil { + if err := parseOptions(); err != nil { return err } - restCfg, err := clientcmd.BuildConfigFromFlags("", *kubeConfigFlag) + restCfg, err := clientcmd.BuildConfigFromFlags("", kubeConfig) if err != nil { return xerrors.Errorf("load kubeconfig: %w", err) } client := dynamic.NewForConfigOrDie(restCfg) - recorderOptions := recorder.Options{RecordDir: *dirFlag} + recorderOptions := recorder.Options{RecordDir: recordDir} recorder := recorder.New(client, recorderOptions) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + if timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second) + defer cancel() + } if err := recorder.Run(ctx); err != nil { return xerrors.Errorf("run recorder: %w", err) } // Block until signal is received - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - <-ch + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + select { + case <-quit: + case <-ctx.Done(): + } return nil } -func validateFlags() error { +func parseOptions() error { + var ( + dirFlag = flag.String("dir", "", "directory to store the recorded resources") + kubeConfigFlag = flag.String("kubeconfig", "", "path to kubeconfig file") + timeoutFlag = flag.Int("timeout", 0, "timeout in seconds for the simulator to run") + ) + + flag.Parse() + if *dirFlag == "" { return xerrors.New("dir flag is required") } @@ -64,5 +81,9 @@ func validateFlags() error { return xerrors.New("kubeconfig flag is required") } + recordDir = *dirFlag + kubeConfig = *kubeConfigFlag + timeout = *timeoutFlag + return nil } diff --git a/simulator/docs/record-and-replay-cluster-changes.md b/simulator/docs/record-and-replay-cluster-changes.md index fa7da38e4..c34af077c 100644 --- a/simulator/docs/record-and-replay-cluster-changes.md +++ b/simulator/docs/record-and-replay-cluster-changes.md @@ -12,6 +12,9 @@ To record changes from your real cluster, you need to follow these steps: > [!NOTE] > When a file already exists at the value of `--dir`, it will be overwritten. +> [!NOTE] +> You can add `--timeout` option to set the timeout for the recorder. The value is in seconds. If not set, the recorder will run until it's stopped. + ### Resources to record It records the changes of the following resources: From c5fd9c31ec4bea4b1aa46ec596da4b06f54c916a Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 19:14:23 +0900 Subject: [PATCH 10/12] fix recordFilePath to recordDirPath --- simulator/config.yaml | 2 +- simulator/config/config.go | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/simulator/config.yaml b/simulator/config.yaml index e83970606..6d11cb1c5 100644 --- a/simulator/config.yaml +++ b/simulator/config.yaml @@ -55,7 +55,7 @@ resourceSyncEnabled: false # You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true # because those features would be conflicted. # Note, this is still a beta feature. -replayEnabled: true +replayEnabled: false # The path to a directory where the record files are stored. recordDirPath: "/record" diff --git a/simulator/config/config.go b/simulator/config/config.go index ba84b58ff..e82f1509e 100644 --- a/simulator/config/config.go +++ b/simulator/config/config.go @@ -89,7 +89,7 @@ func NewConfig() (*Config, error) { externalimportenabled := getExternalImportEnabled() resourceSyncEnabled := getResourceSyncEnabled() replayerEnabled := getReplayerEnabled() - recordFilePath := getRecordDirPath() + recordDirPath := getRecordDirPath() externalKubeClientCfg := &rest.Config{} if hasTwoOrMoreTrue(externalimportenabled, resourceSyncEnabled, replayerEnabled) { return nil, xerrors.Errorf("externalImportEnabled, resourceSyncEnabled and replayerEnabled cannot be used simultaneously.") @@ -117,7 +117,7 @@ func NewConfig() (*Config, error) { ExternalKubeClientCfg: externalKubeClientCfg, ResourceSyncEnabled: resourceSyncEnabled, ReplayerEnabled: replayerEnabled, - RecordDirPath: recordFilePath, + RecordDirPath: recordDirPath, }, nil } @@ -292,14 +292,14 @@ func getReplayerEnabled() bool { return replayerEnabled } -// getRecordDirPath reads RECORD_FILE_PATH +// getRecordDirPath reads RECORD_DIR_PATH // if empty from the config file. func getRecordDirPath() string { - recordFilePath := os.Getenv("RECORD_FILE_PATH") - if recordFilePath == "" { - recordFilePath = configYaml.RecordDirPath + recordDirPath := os.Getenv("RECORD_DIR_PATH") + if recordDirPath == "" { + recordDirPath = configYaml.RecordDirPath } - return recordFilePath + return recordDirPath } func decodeSchedulerCfg(buf []byte) (*configv1.KubeSchedulerConfiguration, error) { From ffacb0df0e2e145e681285ae847feb8c6b9a8921 Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 19:15:55 +0900 Subject: [PATCH 11/12] fix docs --- simulator/docs/record-and-replay-cluster-changes.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/simulator/docs/record-and-replay-cluster-changes.md b/simulator/docs/record-and-replay-cluster-changes.md index c34af077c..668211a0d 100644 --- a/simulator/docs/record-and-replay-cluster-changes.md +++ b/simulator/docs/record-and-replay-cluster-changes.md @@ -28,7 +28,7 @@ It records the changes of the following resources: You can tweak which resources to record via the option in [/simulator/cmd/recorder/recorder.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/recorder/recorder.go): ```go -recorderOptions := recorder.Options{RecordDir: *dirFlag, +recorderOptions := recorder.Options{RecordDir: recordDir, // GVRs is a list of GroupVersionResource that will be recorded. // If it's nil, DefaultGVRs are used. GVRs: []schema.GroupVersionResource{ @@ -42,13 +42,13 @@ recorderOptions := recorder.Options{RecordDir: *dirFlag, To replay the recorded changes in the simulator, you need to follow these steps: 1. Set `true` to `replayerEnabled`. -2. Set the path of the file where the changes are recorded to `recordedFilePath`. -3. Make sure the file path is mounted to the simulator server container. +2. Set the path of the directory where the changes are recorded to `recordDirPath`. +3. Make sure the directory is mounted to the simulator server container. ```yaml:config.yaml replayerEnabled: true -recordedFilePath: "/path/to/directory-to-store-recorded-changes" +recordDirPath: "/path/to/directory-to-store-recorded-changes" ``` ```yaml:compose.yml From d5a40a9b7bee156798d858991200dc7eb021dd91 Mon Sep 17 00:00:00 2001 From: saza-ku Date: Wed, 5 Feb 2025 19:24:22 +0900 Subject: [PATCH 12/12] fix for lint --- simulator/cmd/recorder/recorder.go | 1 + simulator/recorder/recorder.go | 42 ++++++++++++++++++----------- simulator/recorder/recorder_test.go | 8 +++--- simulator/replayer/replayer_test.go | 2 +- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/simulator/cmd/recorder/recorder.go b/simulator/cmd/recorder/recorder.go index d88a4574c..90fa751d4 100644 --- a/simulator/cmd/recorder/recorder.go +++ b/simulator/cmd/recorder/recorder.go @@ -12,6 +12,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" + "sigs.k8s.io/kube-scheduler-simulator/simulator/recorder" ) diff --git a/simulator/recorder/recorder.go b/simulator/recorder/recorder.go index 6ebe7f4a1..010f44c09 100644 --- a/simulator/recorder/recorder.go +++ b/simulator/recorder/recorder.go @@ -94,7 +94,7 @@ func (s *Service) Run(ctx context.Context) error { infFact.WaitForCacheSync(ctx.Done()) } - err := os.MkdirAll(s.path, 0755) + err := os.MkdirAll(s.path, 0o755) if err != nil { return xerrors.Errorf("failed to create record directory: %w", err) } @@ -130,21 +130,9 @@ func (s *Service) record(ctx context.Context) { }() filePath := path.Join(s.path, fmt.Sprintf("record-%018d.json", count)) - file, err := os.Create(filePath) + err := writeToFile(filePath, records) if err != nil { - klog.Error("Failed to create record file: ", err) - return - } - - b, err := json.Marshal(records) - if err != nil { - klog.Error("Failed to marshal record: ", err) - return - } - - _, err = file.Write(b) - if err != nil { - klog.Error("Failed to write record: ", err) + klog.Errorf("failed to write records to file: %v", err) return } } @@ -170,3 +158,27 @@ func (s *Service) record(ctx context.Context) { } } } + +func writeToFile(filePath string, records []Record) error { + file, err := os.Create(filePath) + if err != nil { + return xerrors.Errorf("failed to create record file: %w", err) + } + + b, err := json.Marshal(records) + if err != nil { + return xerrors.Errorf("failed to marshal records: %w", err) + } + + _, err = file.Write(b) + if err != nil { + return xerrors.Errorf("failed to write records: %w", err) + } + + err = file.Close() + if err != nil { + return xerrors.Errorf("failed to close file: %w", err) + } + + return nil +} diff --git a/simulator/recorder/recorder_test.go b/simulator/recorder/recorder_test.go index 1af9d68d6..3403cb6ac 100644 --- a/simulator/recorder/recorder_test.go +++ b/simulator/recorder/recorder_test.go @@ -276,7 +276,7 @@ func TestRecorder(t *testing.T) { func apply(ctx context.Context, client *dynamicFake.FakeDynamicClient, resourceToCreate []unstructured.Unstructured, resourceToUpdate []unstructured.Unstructured, resourceToDelete []unstructured.Unstructured) error { for _, resource := range resourceToCreate { - gvr, err := findGVR(&resource) + gvr, err := findGVR(resource) if err != nil { return xerrors.Errorf("failed to find GVR: %w", err) } @@ -289,7 +289,7 @@ func apply(ctx context.Context, client *dynamicFake.FakeDynamicClient, resourceT } for _, resource := range resourceToUpdate { - gvr, err := findGVR(&resource) + gvr, err := findGVR(resource) if err != nil { return xerrors.Errorf("failed to find GVR: %w", err) } @@ -302,7 +302,7 @@ func apply(ctx context.Context, client *dynamicFake.FakeDynamicClient, resourceT } for _, resource := range resourceToDelete { - gvr, err := findGVR(&resource) + gvr, err := findGVR(resource) if err != nil { return xerrors.Errorf("failed to find GVR: %w", err) } @@ -389,7 +389,7 @@ var ( mapper = restmapper.NewDiscoveryRESTMapper(resources) ) -func findGVR(obj *unstructured.Unstructured) (schema.GroupVersionResource, error) { +func findGVR(obj unstructured.Unstructured) (schema.GroupVersionResource, error) { gvk := obj.GroupVersionKind() m, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { diff --git a/simulator/replayer/replayer_test.go b/simulator/replayer/replayer_test.go index 3f0f85582..e2dcb0958 100644 --- a/simulator/replayer/replayer_test.go +++ b/simulator/replayer/replayer_test.go @@ -106,7 +106,7 @@ func TestService_Replay(t *testing.T) { recordDir := path.Join(os.TempDir(), strings.ReplaceAll(tt.name, " ", "_")) filePath := path.Join(recordDir, "record.json") - err := os.MkdirAll(recordDir, 0755) + err := os.MkdirAll(recordDir, 0o755) if err != nil { t.Fatalf("failed to create record directory: %v", err) }