Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement recorder and replayer #403

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ you can refer to the [documentation](./simulator/docs/simulator-server-config.md

See the following docs to know more about simulator:
- [import-cluster-resources.md](./simulator/docs/import-cluster-resources.md): describes how you can import resources in your cluster to the simulator so that you can simulate scheduling based on your cluster's situation.
- [record-and-replay-cluster-changes.md](./simulator/docs/record-and-replay-cluster-changes.md): describes how you can record resource changes in your cluster and replay them in the simulator.
- [how-it-works.md](simulator/docs/how-it-works.md): describes how the simulator works.
- [kube-apiserver.md](simulator/docs/kube-apiserver.md): describes the kube-apiserver in the simulator (how you can configure and access it).
- [api.md](simulator/docs/api.md): describes the HTTP server the simulator has (mainly for the web UI).
Expand Down
14 changes: 14 additions & 0 deletions simulator/cmd/recorder/kubeconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: v1
kind: Config

clusters:
- cluster:
server: http://localhost:3132
name: simulator

contexts:
- context:
cluster: simulator
name: simulator

current-context: simulator
90 changes: 90 additions & 0 deletions simulator/cmd/recorder/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"context"
"flag"
"os"
"os/signal"
"syscall"
"time"

"golang.org/x/xerrors"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"

"sigs.k8s.io/kube-scheduler-simulator/simulator/recorder"
)

// Command-line options of the recorder, populated by parseOptions.
var (
	// recordDir is the directory to store the recorded resources (--dir).
	recordDir string
	// kubeConfig is the path to the kubeconfig file (--kubeconfig).
	kubeConfig string
	// timeout is the timeout in seconds (--timeout).
	// startRecorder only applies a timeout when the value is greater than 0.
	timeout int
)

// main starts the recorder and terminates the process with a fatal log
// when startRecorder returns an error.
func main() {
	if err := startRecorder(); err != nil {
		// This binary is the recorder, not the simulator; say so in the log.
		klog.Fatalf("failed with error on running recorder: %+v", err)
	}
}

// startRecorder parses the command-line options, builds a dynamic client from
// the given kubeconfig and runs the recorder with it.
//
// After the recorder has been started, it blocks until SIGINT or SIGTERM is
// received or, when a positive timeout is configured, until the timeout expires.
// It returns an error if the options are invalid, the kubeconfig cannot be
// loaded, the client cannot be created, or the recorder fails to run.
func startRecorder() error {
	if err := parseOptions(); err != nil {
		return err
	}

	restCfg, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
	if err != nil {
		return xerrors.Errorf("load kubeconfig: %w", err)
	}

	// Use NewForConfig rather than NewForConfigOrDie so that an invalid config
	// is reported through the returned error instead of a panic.
	client, err := dynamic.NewForConfig(restCfg)
	if err != nil {
		return xerrors.Errorf("create dynamic client: %w", err)
	}

	// Named rec so that the imported recorder package is not shadowed.
	rec := recorder.New(client, recorder.Options{RecordDir: recordDir})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if timeout > 0 {
		// Keep a dedicated cancel function for the timeout context instead of
		// overwriting the outer one.
		var cancelTimeout context.CancelFunc
		ctx, cancelTimeout = context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
		defer cancelTimeout()
	}

	if err := rec.Run(ctx); err != nil {
		return xerrors.Errorf("run recorder: %w", err)
	}

	// Block until a termination signal is received or the context is done.
	// NOTE(review): the context is cancelled right before the process exits;
	// confirm the recorder has persisted its records by then.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(quit)
	select {
	case <-quit:
	case <-ctx.Done():
	}

	return nil
}

// parseOptions registers and parses the command-line flags of the recorder and
// stores the results in the package-level recordDir, kubeConfig and timeout.
//
// It returns an error when --dir or --kubeconfig is missing, or when --timeout
// is negative. The package-level variables are only updated on success.
func parseOptions() error {
	var (
		dirFlag        = flag.String("dir", "", "directory to store the recorded resources")
		kubeConfigFlag = flag.String("kubeconfig", "", "path to kubeconfig file")
		timeoutFlag    = flag.Int("timeout", 0, "timeout in seconds for the recorder to run (0 means no timeout)")
	)

	flag.Parse()

	if *dirFlag == "" {
		return xerrors.New("dir flag is required")
	}

	if *kubeConfigFlag == "" {
		return xerrors.New("kubeconfig flag is required")
	}

	// A negative value would otherwise be silently treated as "no timeout".
	if *timeoutFlag < 0 {
		return xerrors.New("timeout flag must not be negative")
	}

	recordDir = *dirFlag
	kubeConfig = *kubeConfigFlag
	timeout = *timeoutFlag

	return nil
}
13 changes: 12 additions & 1 deletion simulator/cmd/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/kube-scheduler-simulator/simulator/config"
"sigs.k8s.io/kube-scheduler-simulator/simulator/replayer"
"sigs.k8s.io/kube-scheduler-simulator/simulator/resourceapplier"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server/di"
Expand Down Expand Up @@ -94,7 +95,10 @@ func startSimulator() error {
return xerrors.Errorf("kubeapi-server is not ready: %w", err)
}

dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceapplier.Options{})
replayerOptions := replayer.Options{RecordDir: cfg.RecordDirPath}
resourceApplierOptions := resourceapplier.Options{}

dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, cfg.ReplayerEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.Port, resourceApplierOptions, replayerOptions)
if err != nil {
return xerrors.Errorf("create di container: %w", err)
}
Expand All @@ -110,6 +114,13 @@ func startSimulator() error {
}
}

// If ReplayerEnabled is true, the simulator replays the recorded resources.
if cfg.ReplayerEnabled {
if err := dic.ReplayService().Replay(ctx); err != nil {
return xerrors.Errorf("replay resources: %w", err)
}
}

dic.SchedulerService().SetSchedulerConfig(cfg.InitialSchedulerCfg)

if cfg.ResourceSyncEnabled {
Expand Down
20 changes: 16 additions & 4 deletions simulator/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ etcdURL: "http://127.0.0.1:2379"
corsAllowedOriginList:
- "http://localhost:3000"

# This is for the beta feature "One-shot importing cluster's resources"
# and "Continuous syncing cluster's resources".
# This is for the beta feature "One-shot importing cluster's resources",
# "Continuous syncing cluster's resources" and "Replaying cluster's events".
# This variable is used to find Kubeconfig required to access your
# cluster for importing resources to scheduler simulator.
kubeConfig: "/kubeconfig.yaml"
Expand All @@ -38,12 +38,24 @@ kubeSchedulerConfigPath: ""
# This variable indicates whether the simulator will
# import resources from a user cluster specified by kubeConfig.
# Note that it only imports the resources once when the simulator is started.
# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted.
# You cannot set two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled to true
# because those features conflict with each other.
# This is still a beta feature.
externalImportEnabled: false

# This variable indicates whether the simulator will
# keep syncing resources from an user cluster's or not.
# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted.
# You cannot set two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled to true
# because those features conflict with each other.
# Note, this is still a beta feature.
resourceSyncEnabled: false

# This variable indicates whether the simulator will
# replay events recorded in the directory specified by recordDirPath.
# You cannot set two or more of externalImportEnabled, resourceSyncEnabled and replayEnabled to true
# because those features conflict with each other.
# Note, this is still a beta feature.
replayEnabled: false

# The path to a directory where the record files are stored.
recordDirPath: "/record"
44 changes: 42 additions & 2 deletions simulator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Config struct {
ResourceImportLabelSelector metav1.LabelSelector
// ResourceSyncEnabled indicates whether the simulator will keep syncing resources from a target cluster.
ResourceSyncEnabled bool
// ReplayerEnabled indicates whether the simulator will replay events recorded in a file.
ReplayerEnabled bool
// RecordDirPath is the path to the directory where the record files to replay are stored.
RecordDirPath string
// ExternalKubeClientCfg is KubeConfig to get resources from external cluster.
// This field should be set when ExternalImportEnabled == true or ResourceSyncEnabled == true.
ExternalKubeClientCfg *rest.Config
Expand Down Expand Up @@ -84,9 +88,11 @@ func NewConfig() (*Config, error) {

externalimportenabled := getExternalImportEnabled()
resourceSyncEnabled := getResourceSyncEnabled()
replayerEnabled := getReplayerEnabled()
recordDirPath := getRecordDirPath()
externalKubeClientCfg := &rest.Config{}
if externalimportenabled && resourceSyncEnabled {
return nil, xerrors.Errorf("externalImportEnabled and resourceSyncEnabled cannot be used simultaneously.")
if hasTwoOrMoreTrue(externalimportenabled, resourceSyncEnabled, replayerEnabled) {
return nil, xerrors.Errorf("externalImportEnabled, resourceSyncEnabled and replayerEnabled cannot be used simultaneously.")
}
if externalimportenabled || resourceSyncEnabled {
externalKubeClientCfg, err = clientcmd.BuildConfigFromFlags("", configYaml.KubeConfig)
Expand All @@ -110,6 +116,8 @@ func NewConfig() (*Config, error) {
ResourceImportLabelSelector: configYaml.ResourceImportLabelSelector,
ExternalKubeClientCfg: externalKubeClientCfg,
ResourceSyncEnabled: resourceSyncEnabled,
ReplayerEnabled: replayerEnabled,
RecordDirPath: recordDirPath,
}, nil
}

Expand Down Expand Up @@ -272,6 +280,28 @@ func getResourceSyncEnabled() bool {
return resourceSyncEnabled
}

// getReplayerEnabled reports whether the replayer is enabled.
// The REPLAYER_ENABLED environment variable takes precedence; when it is
// empty, the ReplayerEnabled value from the config file is used instead.
// The environment value is parsed with strconv.ParseBool, and a value that
// cannot be parsed results in `false`.
func getReplayerEnabled() bool {
	fromEnv := os.Getenv("REPLAYER_ENABLED")
	if fromEnv == "" {
		return configYaml.ReplayerEnabled
	}
	// The parse error is intentionally dropped: an invalid value means disabled.
	enabled, _ := strconv.ParseBool(fromEnv)
	return enabled
}

// getRecordDirPath returns the directory path of the record files.
// The RECORD_DIR_PATH environment variable takes precedence; when it is
// empty, the RecordDirPath value from the config file is returned.
func getRecordDirPath() string {
	if fromEnv := os.Getenv("RECORD_DIR_PATH"); fromEnv != "" {
		return fromEnv
	}
	return configYaml.RecordDirPath
}

func decodeSchedulerCfg(buf []byte) (*configv1.KubeSchedulerConfiguration, error) {
decoder := scheme.Codecs.UniversalDeserializer()
obj, _, err := decoder.Decode(buf, nil, nil)
Expand Down Expand Up @@ -299,3 +329,13 @@ func GetKubeClientConfig() (*rest.Config, error) {
}
return clientConfig, nil
}

// hasTwoOrMoreTrue reports whether at least two of the given values are true.
// It returns false when fewer than two values are passed.
func hasTwoOrMoreTrue(values ...bool) bool {
	seenTrue := false
	for i := range values {
		if !values[i] {
			continue
		}
		if seenTrue {
			// This is the second true value.
			return true
		}
		seenTrue = true
	}
	return false
}
7 changes: 7 additions & 0 deletions simulator/config/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ type SimulatorConfiguration struct {
// sync resources from an user cluster's or not.
ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"`

// This variable indicates whether the simulator will
// replay events recorded in a file or not.
ReplayerEnabled bool `json:"replayEnabled,omitempty"`

// The path to a directory where the record files are stored.
RecordDirPath string `json:"recordDirPath,omitempty"`

// This variable indicates whether an external scheduler
// is used.
ExternalSchedulerEnabled bool `json:"externalSchedulerEnabled,omitempty"`
Expand Down
10 changes: 5 additions & 5 deletions simulator/docs/import-cluster-resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ It imports the following resources, which the scheduler's default plugins take i
If you need to, you can tweak which resources to import via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):

```go
dic, err := di.NewDIContainer(..., resourceapplier.Options{
// GVRsToSync is a list of GroupVersionResource that will be synced.
// If GVRsToSync is nil, defaultGVRs are used.
GVRsToSync: []schema.GroupVersionResource{
resourceApplierOptions := resourceapplier.Options{
// GVRsToApply is a list of GroupVersionResource that will be synced.
// If GVRsToApply is nil, defaultGVRs are used.
GVRsToApply: []schema.GroupVersionResource{
{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
},

Expand All @@ -85,7 +85,7 @@ dic, err := di.NewDIContainer(..., resourceapplier.Options{
FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
// MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources.
MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
})
}
```

> [!NOTE]
Expand Down
91 changes: 91 additions & 0 deletions simulator/docs/record-and-replay-cluster-changes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# [Beta] Record your real cluster's changes in resources and replay them in the simulator

You can record the additions, updates, and deletions of resources in your real cluster and replay them in the simulator. This feature is useful for reproducing issues that occur in your real cluster.

## Record changes

To record changes from your real cluster, you need to follow these steps:

1. Go to `simulator` directory.
2. Start the recorder by running `go run cmd/recorder/recorder.go --dir /path/to/directory-to-store-recorded-changes --kubeconfig /path/to/kubeconfig`.

> [!NOTE]
> When a file already exists at the value of `--dir`, it will be overwritten.

> [!NOTE]
> You can add `--timeout` option to set the timeout for the recorder. The value is in seconds. If not set, the recorder will run until it's stopped.

### Resources to record

It records the changes of the following resources:

- Pods
- Nodes
- PersistentVolumes
- PersistentVolumeClaims
- StorageClasses

You can tweak which resources to record via the option in [/simulator/cmd/recorder/recorder.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/recorder/recorder.go):

```go
recorderOptions := recorder.Options{RecordDir: recordDir,
// GVRs is a list of GroupVersionResource that will be recorded.
// If it's nil, DefaultGVRs are used.
GVRs: []schema.GroupVersionResource{
{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
},
}
```

## Replay changes

To replay the recorded changes in the simulator, you need to follow these steps:

1. Set `replayEnabled` to `true`.
2. Set `recordDirPath` to the path of the directory where the changes are recorded.
3. Make sure the directory is mounted to the simulator server container.


```yaml:config.yaml
replayEnabled: true
recordDirPath: "/path/to/directory-to-store-recorded-changes"
```

```yaml:compose.yml
volumes:
...
- ./path/to/directory-to-store-recorded-changes:/path/to/directory-to-store-recorded-changes
```

### Resources to replay

It replays the changes of the following resources:

- Pods
- Nodes
- PersistentVolumes
- PersistentVolumeClaims
- StorageClasses

You can tweak which resources to replay via the option in [/simulator/cmd/simulator/simulator.go](https://github.com/kubernetes-sigs/kube-scheduler-simulator/blob/master/simulator/cmd/simulator/simulator.go):

```go
resourceApplierOptions := resourceapplier.Options{
// GVRsToApply is a list of GroupVersionResource that will be replayed.
// If GVRsToApply is nil, defaultGVRs are used.
GVRsToApply: []schema.GroupVersionResource{
{Group: "your-group", Version: "v1", Resource: "your-custom-resources"},
},

// Actually, more options are available...

// FilterBeforeCreating is a list of additional filtering functions that are applied before creating resources.
FilterBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
// MutateBeforeCreating is a list of additional mutating functions that are applied before creating resources.
MutateBeforeCreating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
// FilterBeforeUpdating is a list of additional filtering functions that are applied before updating resources.
FilterBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.FilteringFunction{},
// MutateBeforeUpdating is a list of additional mutating functions that are applied before updating resources.
MutateBeforeUpdating: map[schema.GroupVersionResource][]resourceapplier.MutatingFunction{},
}
```
Loading