Skip to content

Commit

Permalink
implement recorder and replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
saza-ku committed Jan 21, 2025
1 parent bc586b6 commit 5c4133d
Show file tree
Hide file tree
Showing 14 changed files with 1,138 additions and 18 deletions.
24 changes: 22 additions & 2 deletions simulator/cmd/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/kube-scheduler-simulator/simulator/config"
"sigs.k8s.io/kube-scheduler-simulator/simulator/recorder"
"sigs.k8s.io/kube-scheduler-simulator/simulator/replayer"
"sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server/di"
Expand Down Expand Up @@ -58,7 +60,7 @@ func startSimulator() error {

importClusterResourceClient := &clientset.Clientset{}
var importClusterDynamicClient dynamic.Interface
if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled {
if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled || cfg.RecorderEnabled {
importClusterResourceClient, err = clientset.NewForConfig(cfg.ExternalKubeClientCfg)
if err != nil {
return xerrors.Errorf("creates a new Clientset for the ExternalKubeClientCfg: %w", err)
Expand Down Expand Up @@ -94,7 +96,11 @@ func startSimulator() error {
return xerrors.Errorf("kubeapi-server is not ready: %w", err)
}

dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceapplier.Options{})
recorderOptions := recorder.Options{Path: cfg.RecordFilePath}
replayerOptions := replayer.Options{Path: cfg.RecordFilePath}
resourceApplierOptions := resourceapplier.Options{}

dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, cfg.RecorderEnabled, cfg.ReplayerEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceApplierOptions, recorderOptions, replayerOptions)
if err != nil {
return xerrors.Errorf("create di container: %w", err)
}
Expand All @@ -110,6 +116,13 @@ func startSimulator() error {
}
}

// If ReplayEnabled is enabled, the simulator replays the recorded resources.
if cfg.ReplayerEnabled {
if err := dic.ReplayService().Replay(ctx); err != nil {
return xerrors.Errorf("replay resources: %w", err)
}
}

dic.SchedulerService().SetSchedulerConfig(cfg.InitialSchedulerCfg)

if cfg.ResourceSyncEnabled {
Expand All @@ -119,6 +132,13 @@ func startSimulator() error {
}
}

if cfg.RecorderEnabled {
// Start the recorder to record events in the target cluster.
if err = dic.RecorderService().Run(ctx); err != nil {
return xerrors.Errorf("start recording: %w", err)
}
}

// start simulator server
s := server.NewSimulatorServer(cfg, dic)
shutdownFn, err := s.Start(cfg.Port)
Expand Down
28 changes: 24 additions & 4 deletions simulator/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ etcdURL: "http://127.0.0.1:2379"
corsAllowedOriginList:
- "http://localhost:3000"

# This is for the beta feature "One-shot importing cluster's resources"
# and "Continuous syncing cluster's resources".
# This is for the beta feature "One-shot importing cluster's resources",
# "Continuous syncing cluster's resources" and "Recording cluster's events".
# This variable is used to find Kubeconfig required to access your
# cluster for importing resources to scheduler simulator.
kubeConfig: "/kubeconfig.yaml"
Expand All @@ -38,12 +38,32 @@ kubeSchedulerConfigPath: ""
# This variable indicates whether the simulator will
# import resources from a user cluster specified by kubeConfig.
# Note that it only imports the resources once when the simulator is started.
# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted.
# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true
# because those features would be conflicted.
# This is still a beta feature.
externalImportEnabled: false

# This variable indicates whether the simulator will
# keep syncing resources from an user cluster's or not.
# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted.
# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true
# because those features would be conflicted.
# Note, this is still a beta feature.
resourceSyncEnabled: false

# This variable indicates whether the simulator will
# record events from a user cluster specified by kubeConfig.
# You cannot make both of recorderEnabled and replayEnabled true because those features would be conflicted.
# Note, this is still a beta feature.
recorderEnabled: false

# This variable indicates whether the simulator will
# replay events recorded in the file specified by recordFilePath.
# You cannot make two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled true
# because those features would be conflicted.
# You cannot make both of recorderEnabled and replayEnabled true because those features would be conflicted.
# Note, this is still a beta feature.
replayEnabled: true

# The path to a file which the simulator will record
# events to or replay events from.
recordFilePath: "/record/record.json"
65 changes: 62 additions & 3 deletions simulator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type Config struct {
ResourceImportLabelSelector metav1.LabelSelector
// ResourceSyncEnabled indicates whether the simulator will keep syncing resources from a target cluster.
ResourceSyncEnabled bool
// RecorderEnabled indicates whether the simulator will record events from a target cluster.
RecorderEnabled bool
// ReplayerEnabled indicates whether the simulator will replay events recorded in a file.
ReplayerEnabled bool
// RecordFilePath is the path to the file where the simulator records events.
RecordFilePath string
// ExternalKubeClientCfg is KubeConfig to get resources from external cluster.
// This field should be set when ExternalImportEnabled == true or ResourceSyncEnabled == true.
ExternalKubeClientCfg *rest.Config
Expand Down Expand Up @@ -84,11 +90,17 @@ func NewConfig() (*Config, error) {

externalimportenabled := getExternalImportEnabled()
resourceSyncEnabled := getResourceSyncEnabled()
recorderEnabled := getRecorderEnabled()
replayerEnabled := getReplayerEnabled()
recordFilePath := getRecordFilePath()
externalKubeClientCfg := &rest.Config{}
if externalimportenabled && resourceSyncEnabled {
return nil, xerrors.Errorf("externalImportEnabled and resourceSyncEnabled cannot be used simultaneously.")
if hasTwoOrMoreTrue(externalimportenabled, resourceSyncEnabled, replayerEnabled) {
return nil, xerrors.Errorf("externalImportEnabled, resourceSyncEnabled and replayerEnabled cannot be used simultaneously.")
}
if externalimportenabled || resourceSyncEnabled {
if replayerEnabled && recorderEnabled {
return nil, xerrors.Errorf("recorderEnabled and replayerEnabled cannot be used simultaneously.")
}
if externalimportenabled || resourceSyncEnabled || recorderEnabled {
externalKubeClientCfg, err = clientcmd.BuildConfigFromFlags("", configYaml.KubeConfig)
if err != nil {
return nil, xerrors.Errorf("get kube clientconfig: %w", err)
Expand All @@ -110,6 +122,9 @@ func NewConfig() (*Config, error) {
ResourceImportLabelSelector: configYaml.ResourceImportLabelSelector,
ExternalKubeClientCfg: externalKubeClientCfg,
ResourceSyncEnabled: resourceSyncEnabled,
RecorderEnabled: recorderEnabled,
ReplayerEnabled: replayerEnabled,
RecordFilePath: recordFilePath,
}, nil
}

Expand Down Expand Up @@ -272,6 +287,40 @@ func getResourceSyncEnabled() bool {
return resourceSyncEnabled
}

// getRecorderEnabled reads RECORDER_ENABLED and converts it to bool
// if empty from the config file.
// This function will return `true` if `RECORDER_ENABLED` is "1".
func getRecorderEnabled() bool {
recorderEnabledString := os.Getenv("RECORDER_ENABLED")
if recorderEnabledString == "" {
recorderEnabledString = strconv.FormatBool(configYaml.RecorderEnabled)
}
recorderEnabled, _ := strconv.ParseBool(recorderEnabledString)
return recorderEnabled
}

// getReplayerEnabled reads REPLAYER_ENABLED and converts it to bool
// if empty from the config file.
// This function will return `true` if `REPLAYER_ENABLED` is "1".
func getReplayerEnabled() bool {
replayerEnabledString := os.Getenv("REPLAYER_ENABLED")
if replayerEnabledString == "" {
replayerEnabledString = strconv.FormatBool(configYaml.ReplayerEnabled)
}
replayerEnabled, _ := strconv.ParseBool(replayerEnabledString)
return replayerEnabled
}

// getRecordFilePath reads RECORD_FILE_PATH
// if empty from the config file.
func getRecordFilePath() string {
recordFilePath := os.Getenv("RECORD_FILE_PATH")
if recordFilePath == "" {
recordFilePath = configYaml.RecordFilePath
}
return recordFilePath
}

func decodeSchedulerCfg(buf []byte) (*configv1.KubeSchedulerConfiguration, error) {
decoder := scheme.Codecs.UniversalDeserializer()
obj, _, err := decoder.Decode(buf, nil, nil)
Expand Down Expand Up @@ -299,3 +348,13 @@ func GetKubeClientConfig() (*rest.Config, error) {
}
return clientConfig, nil
}

func hasTwoOrMoreTrue(values ...bool) bool {
count := 0
for _, v := range values {
if v {
count++
}
}
return count >= 2
}
12 changes: 12 additions & 0 deletions simulator/config/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ type SimulatorConfiguration struct {
// sync resources from an user cluster's or not.
ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"`

// This variable indicates whether the simulator will
// record events from an user cluster's or not.
RecorderEnabled bool `json:"recorderEnabled,omitempty"`

// This variable indicates whether the simulator will
// replay events recorded in a file or not.
ReplayerEnabled bool `json:"replayEnabled,omitempty"`

// The path to a file which the simulator will record
// events to or replay events from.
RecordFilePath string `json:"recordFilePath,omitempty"`

// This variable indicates whether an external scheduler
// is used.
ExternalSchedulerEnabled bool `json:"externalSchedulerEnabled,omitempty"`
Expand Down
10 changes: 5 additions & 5 deletions simulator/docs/import-cluster-resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ It imports the following resources, which the scheduler's default plugins take i
If you need to, you can tweak which resources to import via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):

```go
dic, err := di.NewDIContainer(..., resourceapplier.Options{
// GVRsToSync is a list of GroupVersionResource that will be synced.
// If GVRsToSync is nil, defaultGVRs are used.
GVRsToSync: []schema.GroupVersionResource{
resourceApplierOptions := resourceapplier.Options{
// GVRsToApply is a list of GroupVersionResource that will be synced.
// If GVRsToApply is nil, defaultGVRs are used.
GVRsToApply: []schema.GroupVersionResource{
{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
},
Expand All @@ -85,7 +85,7 @@ dic, err := di.NewDIContainer(..., resourceapplier.Options{
FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
// MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources.
MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
})
}
```

> [!NOTE]
Expand Down
95 changes: 95 additions & 0 deletions simulator/docs/record-and-replay-cluster-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# [Beta] Record your real cluster's events and replay them in the simulator

You can record events from your real cluster and replay them in the simulator. This feature is useful for reproducing issues that occur in your real cluster.

## Record events

To record events from your real cluster, you need to follow these steps:

1. Set `true` to `recorderEnabled`.
2. Set the path of the kubeconfig file for your cluster to `KubeConfig`.
- This feature only requires the read permission for events.
3. Set the path of the file to save the recorded events to `recordedFilePath`.
4. Make sure the file path is mounted to the simulator container.

```yaml
recorderEnabled: true
kubeConfig: "/path/to/your-cluster-kubeconfig"
recordedFilePath: "/path/to/recorded-events.json"
```
```yaml
volumes:
...
- ./path/to/recorded-events.json:/path/to/recorded-events.json
```
> [!NOTE]
> When a file already exists at `recordedFilePath`, it backs up the file in the same directory adding a timestamp to the filename and creates a new file for recording.

### Resources to record

It records the events of the following resources:

- Pods
- Nodes
- PersistentVolumes
- PersistentVolumeClaims
- StorageClasses

You can tweak which resources to record via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):

```go
recorderOptions := recorder.Options{Path: cfg.RecordFilePath,
// GVRs is a list of GroupVersionResource that will be recorded.
// If it's nil, DefaultGVRs are used.
GVRs: []schema.GroupVersionResource{
{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
},
}
```

## Replay events

To replay the recorded events in the simulator, you need to follow these steps:

1. Set `true` to `replayerEnabled`.
2. Set the path of the file where the events are recorded to `recordedFilePath`.

```yaml
replayerEnabled: true
recordedFilePath: "/path/to/recorded-events.json"
```

### Resources to replay

It replays the events of the following resources:

- Pods
- Nodes
- PersistentVolumes
- PersistentVolumeClaims
- StorageClasses

You can tweak which resources to replay via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):

```go
resourceApplierOptions := resourceapplier.Options{
// GVRsToApply is a list of GroupVersionResource that will be replayed.
// If GVRsToApply is nil, defaultGVRs are used.
GVRsToApply: []schema.GroupVersionResource{
{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
},
// Actually, more options are available...
// FilterBeforeCreating is a list of additional filtering functions that are applied before creating resources.
FilterBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
// MutateBeforeCreating is a list of additional mutating functions that are applied before creating resources.
MutateBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
// FilterBeforeUpdating is a list of additional filtering functions that are applied before updating resources.
FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
// MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources.
MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
}
```
Loading

0 comments on commit 5c4133d

Please sign in to comment.