diff --git a/README.md b/README.md index 21a3bba..e7a386b 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,25 @@ -## podDeployer-operator -### pod按照容器权重顺序启动operator +## podDeployer-operator 按照容器权重顺序启动 operator ![]() +### 有重大 bug! patch 不能保证容器顺序启动 + ### 项目思路与设计 设计背景: -集群在部署服务时,可能会需要多个container部署在同一个pod中的场景。当不同的container有顺序需要依赖时,可以采用k8s 提供的hook来做命令脚本 +集群在部署服务时,可能会需要多个 container 部署在同一个 pod 中的场景。当不同的 container 有顺序需要依赖时,可以采用 k8s 提供的 hook 来做命令脚本,也能使用 patch 进行操作。 -思路:对deployment使用clientSet patch操作实现。 +思路:对 deployment daemonset statefulset 使用 clientSet patch 操作实现。 ### 项目功能 - +- 使用 patch 操作实现容器顺序启动功能(仅仅用于模拟)。 可参考 [deployment yaml参考](yaml/example_deployment.yaml) [statefulset yaml参考](yaml/example_statefulset.yaml) [daemonset yaml参考](yaml/example_daemonset.yaml) ```yaml apiVersion: api.practice.com/v1alpha1 kind: Poddeployer metadata: - name: mypoddeployer + name: mypoddeployer-deployment namespace: default spec: + type: apps/v1/deployments # 可选择使用哪种类型,目前支持三种:apps/v1/deployments apps/v1/statefulsets apps/v1/daemonsets + # 资源对象的原生 Spec + # 目前支持三种:deployment_spec statefulset_spec daemonset_spec deployment_spec: replicas: 1 template: @@ -47,5 +51,4 @@ spec: value: 100 - image: example4 value: 1000 - ``` \ No newline at end of file diff --git a/helm/.helmignore b/helm/.helmignore new file mode 100644 index 0000000..0e8a0eb --- /dev/null +++ b/helm/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/helm/Chart.yaml b/helm/Chart.yaml new file mode 100644 index 0000000..fc38992 --- /dev/null +++ b/helm/Chart.yaml @@ -0,0 +1,24 @@ +apiVersion: v2 +name: helm +description: A Helm chart for Kubernetes + +# A chart can be either an 'application' or a 'library' chart. +# +# Application charts are a collection of templates that can be packaged into versioned archives +# to be deployed. +# +# Library charts provide useful utilities or functions for the chart developer. They're included as +# a dependency of application charts to inject those utilities and functions into the rendering +# pipeline. Library charts do not define any templates and therefore cannot be deployed. +type: application + +# This is the chart version. This version number should be incremented each time you make changes +# to the chart and its templates, including the app version. +# Versions are expected to follow Semantic Versioning (https://semver.org/) +version: 0.1.0 + +# This is the version number of the application being deployed. This version number should be +# incremented each time you make changes to the application. Versions are not expected to +# follow Semantic Versioning. They should reflect the version the application is using. +# It is recommended to use it with quotes. +appVersion: "1.16.0" diff --git a/helm/templates/deployment.yaml b/helm/templates/deployment.yaml new file mode 100644 index 0000000..7834f58 --- /dev/null +++ b/helm/templates/deployment.yaml @@ -0,0 +1,50 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Values.name }} + namespace: {{ .Values.namespace }} + labels: + app: {{ .Values.name }} +spec: + replicas: {{ .Values.replicaCount }} + selector: + matchLabels: + app: {{ .Values.name }} + template: + metadata: + labels: + app: {{ .Values.name }} + spec: + serviceAccountName: {{ .Values.serviceAccountName }} + nodeName: vm-0-16-centos + containers: + - name: {{ .Values.name }} + image: "{{ .Values.image }}" + imagePullPolicy: IfNotPresent + env: + - name: "Release" + value: "1" + workingDir: "/app" + command: [ "./mypoddeployeroperator" ] + {{- if .Values.service.ports }} + ports: + {{- range .Values.service.ports }} + - name: port-80 + containerPort: 80 + protocol: TCP + {{- end }} + {{- end }} + resources: + {{- toYaml .Values.resources | nindent 12 }} + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} \ No newline at end of file diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml new file mode 100644 index 0000000..7df11ae --- /dev/null +++ b/helm/templates/service.yaml @@ -0,0 +1,21 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.serviceName }} + namespace: {{ .Values.namespace }} + labels: + app: {{ .Values.name }} +spec: + type: {{ .Values.service.type }} + {{- if .Values.service.ports }} + ports: + {{- range .Values.service.ports }} + - port: {{ .port }} + targetPort: {{ .port }} + nodePort: {{ .nodePort }} + protocol: TCP + name: port-{{ .port }} + {{- end }} + {{- end }} + selector: + app: {{ .Values.name }} diff --git a/helm/values.yaml b/helm/values.yaml new file mode 100644 index 0000000..cd78a0f --- /dev/null +++ b/helm/values.yaml @@ -0,0 +1,68 @@ +# Default values for helm. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +replicaCount: 1 +name: mypoddeployer-controller +namespace: default +serviceName: mypoddeployer-svc +serviceAccountName: mypoddeployer-sa +image: poddeployeroperator:v1 + + +imagePullSecrets: [] +nameOverride: "" + +serviceAccount: + # Specifies whether a service account should be created + create: true + # Annotations to add to the service account + annotations: {} + # The name of the service account to use. + # If not set and create is true, a name is generated using the fullname template + name: "" + +podAnnotations: {} + +podSecurityContext: {} + # fsGroup: 2000 + +securityContext: {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + +service: + type: NodePort + ports: + - port: 8888 + nodePort: 31131 + + +resources: {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +autoscaling: + enabled: false + minReplicas: 1 + maxReplicas: 100 + targetCPUUtilizationPercentage: 80 + # targetMemoryUtilizationPercentage: 80 + +nodeSelector: {} + +tolerations: [] + +affinity: {} diff --git a/main.go b/main.go index efd6f1e..4fece8a 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "github.com/myoperator/poddeployer/pkg/controller" "github.com/myoperator/poddeployer/pkg/k8sconfig" v1 "k8s.io/api/apps/v1" + v12 "k8s.io/api/core/v1" _ "k8s.io/code-generator" "k8s.io/klog/v2" "log" @@ -32,7 +33,7 @@ func main() { var d time.Duration = 0 // 1. 管理器初始化 mgr, err := manager.New(k8sconfig.K8sRestConfig(), manager.Options{ - Logger: logf.Log.WithName("poddeployer-operator"), + Logger: logf.Log.WithName("poddeployer-operator"), SyncPeriod: &d, // resync不设置触发 }) if err != nil { @@ -48,7 +49,7 @@ func main() { } // 3. 控制器相关 - podReStarterCtl := controller.NewPodDeployerController() + podReStarterCtl := controller.NewPodDeployerController(k8sconfig.DynamicClient) err = builder.ControllerManagedBy(mgr). For(&podrestarterv1alpha1.Poddeployer{}). @@ -56,8 +57,11 @@ func main() { handler.Funcs{ DeleteFunc: podReStarterCtl.DeploymentDeleteHandler, }, - ). - Complete(podReStarterCtl) + ).Watches(&source.Kind{Type: &v12.Pod{}}, + handler.Funcs{ + DeleteFunc: podReStarterCtl.PodDeleteHandler, + }, + ).Complete(podReStarterCtl) errC := make(chan error) diff --git a/pkg/apis/podDeployer/v1alpha1/types.go b/pkg/apis/podDeployer/v1alpha1/types.go index c859cfb..0f57858 100644 --- a/pkg/apis/podDeployer/v1alpha1/types.go +++ b/pkg/apis/podDeployer/v1alpha1/types.go @@ -18,8 +18,11 @@ type Poddeployer struct { } type PodDeployerSpec struct { - DeploymentSpec appsv1.DeploymentSpec `json:"deployment_spec"` - PriorityImages []PriorityImage `json:"priority_images,omitempty"` + Type string `json:"type"` + DaemonSetSpec appsv1.DaemonSetSpec `json:"daemonset_spec,omitempty"` + StatefulSetSpec appsv1.StatefulSetSpec `json:"statefulset_spec,omitempty"` + DeploymentSpec appsv1.DeploymentSpec `json:"deployment_spec,omitempty"` + PriorityImages []PriorityImage `json:"priority_images,omitempty"` } type PriorityImage struct { diff --git a/pkg/apis/podDeployer/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/podDeployer/v1alpha1/zz_generated.deepcopy.go index 78bf86e..c25f10c 100644 --- a/pkg/apis/podDeployer/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/podDeployer/v1alpha1/zz_generated.deepcopy.go @@ -29,6 +29,8 @@ import ( func (in *PodDeployerSpec) DeepCopyInto(out *PodDeployerSpec) { *out = *in in.DeploymentSpec.DeepCopyInto(&out.DeploymentSpec) + in.DaemonSetSpec.DeepCopyInto(&out.DaemonSetSpec) + in.StatefulSetSpec.DeepCopyInto(&out.StatefulSetSpec) if in.PriorityImages != nil { in, out := &in.PriorityImages, &out.PriorityImages *out = make([]PriorityImage, len(*in)) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index aca5848..0d437a0 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -2,37 +2,32 @@ package controller import ( "context" - "fmt" podrestarterv1alpha1 "github.com/myoperator/poddeployer/pkg/apis/podDeployer/v1alpha1" - appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sync" + "strings" "time" ) -var wg = sync.WaitGroup{} - type PodDeployerController struct { client.Client + DynamicClient dynamic.Interface } -func NewPodDeployerController() *PodDeployerController { - return &PodDeployerController{} +func NewPodDeployerController(dc dynamic.Interface) *PodDeployerController { + return &PodDeployerController{DynamicClient: dc} } // Reconcile 调协loop func (r *PodDeployerController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { podDeployer := &podrestarterv1alpha1.Poddeployer{} - var deployment appsv1.Deployment - deployment.Name = podDeployer.Name - deployment.Namespace = podDeployer.Namespace - err := r.Get(ctx, req.NamespacedName, podDeployer) if err != nil { if client.IgnoreNotFound(err) != nil { @@ -43,24 +38,73 @@ func (r *PodDeployerController) Reconcile(ctx context.Context, req reconcile.Req } klog.Info(podDeployer) - err = r.handleDeployment(ctx, podDeployer, deployment) - if err != nil { - klog.Error("handler deployment err: ", err) - return reconcile.Result{}, err + // 非填,默认 "apps/v1/deployments" + if podDeployer.Spec.Type == "" { + podDeployer.Spec.Type = "apps/v1/deployments" } - if len(podDeployer.Spec.DeploymentSpec.Template.Spec.Containers) != 1 || len(podDeployer.Spec.PriorityImages) != 0 { - klog.Info("do patch image to deployment...") - klog.Info(SetOtherContainers) - // 执行patch操作 - for _, container := range SetOtherContainers { - time.Sleep(time.Second * 15) - patchDeployment(podDeployer.Name, podDeployer.Namespace, &container) + if podDeployer.Spec.Type == "apps/v1/deployments" { + err = r.handleDeployment(ctx, podDeployer) + if err != nil { + klog.Error("handler deployment err: ", err) + return reconcile.Result{}, err } - } - return reconcile.Result{}, nil + if len(podDeployer.Spec.DeploymentSpec.Template.Spec.Containers) != 1 || len(podDeployer.Spec.PriorityImages) != 0 { + klog.Info("do patch image to deployment...") + klog.Info(SetOtherContainers) + // 执行patch操作 + for _, container := range SetOtherContainers { + time.Sleep(time.Second * 15) + err = patchResource(podDeployer.Name, podDeployer.Namespace, &container, parseGVR(podDeployer.Spec.Type)) + if err != nil { + klog.Error("patch deployment err: ", err) + return reconcile.Result{}, err + } + } + } + } else if podDeployer.Spec.Type == "apps/v1/statefulsets" { + err = r.handleStatefulSet(ctx, podDeployer) + if err != nil { + klog.Error("handler statefulset err: ", err) + return reconcile.Result{}, err + } + if len(podDeployer.Spec.StatefulSetSpec.Template.Spec.Containers) != 1 || len(podDeployer.Spec.PriorityImages) != 0 { + klog.Info("do patch image to statefulset...") + klog.Info(SetOtherContainers) + // 执行patch操作 + for _, container := range SetOtherContainers { + time.Sleep(time.Second * 15) + err = patchResource(podDeployer.Name, podDeployer.Namespace, &container, parseGVR(podDeployer.Spec.Type)) + if err != nil { + klog.Error("patch statefulset err: ", err) + return reconcile.Result{}, err + } + } + } + } else { + err = r.handleDaemonSet(ctx, podDeployer) + if err != nil { + klog.Error("handler daemonSet err: ", err) + return reconcile.Result{}, err + } + + if len(podDeployer.Spec.DaemonSetSpec.Template.Spec.Containers) != 1 || len(podDeployer.Spec.PriorityImages) != 0 { + klog.Info("do patch image to daemonSet...") + klog.Info(SetOtherContainers) + // 执行patch操作 + for _, container := range SetOtherContainers { + time.Sleep(time.Second * 15) + err = patchResource(podDeployer.Name, podDeployer.Namespace, &container, parseGVR(podDeployer.Spec.Type)) + if err != nil { + klog.Error("patch daemonSet err: ", err) + return reconcile.Result{}, err + } + } + } + } + return reconcile.Result{}, nil } // InjectClient 使用controller-runtime 需要注入的client @@ -71,12 +115,51 @@ func (r *PodDeployerController) InjectClient(c client.Client) error { func (r *PodDeployerController) DeploymentDeleteHandler(event event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { for _, ref := range event.Object.GetOwnerReferences() { - if ref.Kind == podrestarterv1alpha1.PodDeployerApiVersion && ref.APIVersion == podrestarterv1alpha1.PodDeployerApiVersion { - // 重新入列,这样删除pod后,就会进入调和loop,发现ownerReference还在,会立即创建出新的pod。 - fmt.Println("被删除的对象名称是", event.Object.GetName(), event.Object.GetObjectKind()) + if ref.Kind == podrestarterv1alpha1.PodDeployerKind && ref.APIVersion == podrestarterv1alpha1.PodDeployerApiVersion { + // 重新入列 + klog.Infof("deleted deployment object [%v] name [%v]]\n", event.Object.GetName(), event.Object.GetObjectKind().GroupVersionKind().Kind) limitingInterface.Add(reconcile.Request{ NamespacedName: types.NamespacedName{Name: ref.Name, Namespace: event.Object.GetNamespace()}}) } } } + +// FIXME: 目前没效果,因为 pod OwnerReferences 没有設置 PodDeployer +func (r *PodDeployerController) PodDeleteHandler(event event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { + for _, ref := range event.Object.GetOwnerReferences() { + if ref.Kind == podrestarterv1alpha1.PodDeployerKind && ref.APIVersion == podrestarterv1alpha1.PodDeployerApiVersion { + // 重新入列 + klog.Infof("deleted pod object [%v] name [%v]]\n", event.Object.GetName(), event.Object.GetObjectKind().GroupVersionKind().Kind) + limitingInterface.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{Name: ref.Name, + Namespace: event.Object.GetNamespace()}}) + } + } +} + +// parseGVR 解析并指定资源对象 "apps/v1/deployments" "core/v1/pods" "batch/v1/jobs" +func parseGVR(gvr string) schema.GroupVersionResource { + var group, version, resource string + gvList := strings.Split(gvr, "/") + + // 防止越界 + if len(gvList) < 2 { + panic("gvr input error, please input like format apps/v1/deployments or core/v1/pods") + } + + if len(gvList) < 3 { + group = "" + version = gvList[0] + resource = gvList[1] + } else { + if gvList[0] == "core" { + gvList[0] = "" + } + group, version, resource = gvList[0], gvList[1], gvList[2] + } + + return schema.GroupVersionResource{ + Group: group, Version: version, Resource: resource, + } +} diff --git a/pkg/controller/resource.go b/pkg/controller/resource.go index 4de9876..bdf618c 100644 --- a/pkg/controller/resource.go +++ b/pkg/controller/resource.go @@ -10,6 +10,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/klog/v2" @@ -21,17 +22,18 @@ var ( SetOtherContainers = make([]v1.Container, 0) ) -func (r *PodDeployerController) handleDeployment(ctx context.Context, podDeployer *podrestarterv1alpha1.Poddeployer, deployment appsv1.Deployment) error { - +// handleDeployment 处理顺序的主要逻辑:使用依赖第一个容器创建 deployment, +// 再按照顺序 patch 其他容器 +func (r *PodDeployerController) handleDeployment(ctx context.Context, podDeployer *podrestarterv1alpha1.Poddeployer) error { + var deployment appsv1.Deployment deployment.Name = podDeployer.Name deployment.Namespace = podDeployer.Namespace - + // 创建 deployment 对象 mutateDeploymentRes, err := ctrl.CreateOrUpdate(ctx, r.Client, &deployment, func() error { - // TODO: 处理deployment image的数量 - // 1. 如果image数量为1或没有设置PriorityImages,没有必要做其他操作 - // 2. 如果有设置PriorityImages,需要先遍历PriorityImages的image name 与 container name - // 只留最高优先集的image,把其他的都取出来,并且使用patch再加入,完成一个调协 - + // TODO: 处理 image 的数量 + // 1. 如果 image 数量为 1 或没有设置 PriorityImages ,没有必要做其他操作 + // 2. 如果有设置 PriorityImages,需要先遍历 PriorityImages的 image name 与 container name + // 只留最高优先集的 image,把其他的都取出来,并且使用 patch 再加入,完成一个调协 if len(podDeployer.Spec.DeploymentSpec.Template.Spec.Containers) == 1 || len(podDeployer.Spec.PriorityImages) == 0 { klog.Info("no need to go priority process...") mutateDeployment(podDeployer, &deployment) @@ -46,21 +48,20 @@ func (r *PodDeployerController) handleDeployment(ctx context.Context, podDeploye return err } mutateDeployment(podDeployer, &deployment) - } - // 设置ownerReferences + // 设置 ownerReferences + c := true deployment.OwnerReferences = append(deployment.OwnerReferences, metav1.OwnerReference{ APIVersion: podDeployer.APIVersion, Kind: podDeployer.Kind, Name: podDeployer.Name, UID: podDeployer.UID, + Controller: &c, }) - return nil }) if err != nil { - return err } @@ -68,6 +69,124 @@ func (r *PodDeployerController) handleDeployment(ctx context.Context, podDeploye return nil } +func (r *PodDeployerController) handleStatefulSet(ctx context.Context, podDeployer *podrestarterv1alpha1.Poddeployer) error { + var statefulSet appsv1.StatefulSet + statefulSet.Name = podDeployer.Name + statefulSet.Namespace = podDeployer.Namespace + // 创建 statefulSet 对象 + mutateStatefulSetRes, err := ctrl.CreateOrUpdate(ctx, r.Client, &statefulSet, func() error { + // TODO: 处理 image 的数量 + // 1. 如果 image 数量为 1 或没有设置 PriorityImages ,没有必要做其他操作 + // 2. 如果有设置 PriorityImages,需要先遍历 PriorityImages的 image name 与 container name + // 只留最高优先集的 image,把其他的都取出来,并且使用 patch 再加入,完成一个调协 + if len(podDeployer.Spec.StatefulSetSpec.Template.Spec.Containers) == 1 || len(podDeployer.Spec.PriorityImages) == 0 { + klog.Info("no need to go priority process...") + mutateStatefulSet(podDeployer, &statefulSet) + } else { + klog.Info("need to go priority process...") + priorityImages := calculatePriorityImages(podDeployer) + setOtherContainers, err := handleStatefulSetImageSort(priorityImages, podDeployer) + klog.Info("setOtherContainers: ", setOtherContainers) + SetOtherContainers = setOtherContainers + if err != nil { + klog.Error("err: ", err) + return err + } + mutateStatefulSet(podDeployer, &statefulSet) + } + + // 设置 ownerReferences + c := true + statefulSet.OwnerReferences = append(statefulSet.OwnerReferences, metav1.OwnerReference{ + APIVersion: podDeployer.APIVersion, + Kind: podDeployer.Kind, + Name: podDeployer.Name, + UID: podDeployer.UID, + Controller: &c, + }) + return nil + }) + if err != nil { + return err + } + + klog.Info("CreateOrUpdate", "StatefulSet", mutateStatefulSetRes) + return nil +} + +func (r *PodDeployerController) handleDaemonSet(ctx context.Context, podDeployer *podrestarterv1alpha1.Poddeployer) error { + var daemonSet appsv1.DaemonSet + daemonSet.Name = podDeployer.Name + daemonSet.Namespace = podDeployer.Namespace + // 创建 deployment 对象 + mutateDaemonSetRes, err := ctrl.CreateOrUpdate(ctx, r.Client, &daemonSet, func() error { + // TODO: 处理 image 的数量 + // 1. 如果 image 数量为 1 或没有设置 PriorityImages ,没有必要做其他操作 + // 2. 如果有设置 PriorityImages,需要先遍历 PriorityImages的 image name 与 container name + // 只留最高优先集的 image,把其他的都取出来,并且使用 patch 再加入,完成一个调协 + if len(podDeployer.Spec.DaemonSetSpec.Template.Spec.Containers) == 1 || len(podDeployer.Spec.PriorityImages) == 0 { + klog.Info("no need to go priority process...") + mutateDaemonSet(podDeployer, &daemonSet) + } else { + klog.Info("need to go priority process...") + priorityImages := calculatePriorityImages(podDeployer) + setOtherContainers, err := handleDaemonSetImageSort(priorityImages, podDeployer) + klog.Info("setOtherContainers: ", setOtherContainers) + SetOtherContainers = setOtherContainers + if err != nil { + klog.Error("err: ", err) + return err + } + mutateDaemonSet(podDeployer, &daemonSet) + } + + // 设置 ownerReferences + c := true + daemonSet.OwnerReferences = append(daemonSet.OwnerReferences, metav1.OwnerReference{ + APIVersion: podDeployer.APIVersion, + Kind: podDeployer.Kind, + Name: podDeployer.Name, + UID: podDeployer.UID, + Controller: &c, + }) + return nil + }) + if err != nil { + return err + } + klog.Info("CreateOrUpdate", "DaemonSet", mutateDaemonSetRes) + return nil +} + +// mutateStatefulSet 设置 labels 标签,目前无其他作用 +func mutateStatefulSet(podDeployer *podrestarterv1alpha1.Poddeployer, statefulSet *appsv1.StatefulSet) { + statefulSet.Spec = podDeployer.Spec.StatefulSetSpec + labels := map[string]string{ + "podDeployer": podDeployer.Name, + } + selector := metav1.LabelSelector{ + MatchLabels: labels, + } + statefulSet.Spec.Template.Labels = map[string]string{} + statefulSet.Spec.Selector = &selector + statefulSet.Spec.Template.Labels["podDeployer"] = podDeployer.Name +} + +// mutateStatefulSet 设置 labels 标签,目前无其他作用 +func mutateDaemonSet(podDeployer *podrestarterv1alpha1.Poddeployer, daemonSet *appsv1.DaemonSet) { + daemonSet.Spec = podDeployer.Spec.DaemonSetSpec + labels := map[string]string{ + "podDeployer": podDeployer.Name, + } + selector := metav1.LabelSelector{ + MatchLabels: labels, + } + daemonSet.Spec.Template.Labels = map[string]string{} + daemonSet.Spec.Selector = &selector + daemonSet.Spec.Template.Labels["podDeployer"] = podDeployer.Name +} + +// mutateDeployment 设置 labels 标签,目前无其他作用 func mutateDeployment(podDeployer *podrestarterv1alpha1.Poddeployer, deployment *appsv1.Deployment) { deployment.Spec = podDeployer.Spec.DeploymentSpec labels := map[string]string{ @@ -81,7 +200,7 @@ func mutateDeployment(podDeployer *podrestarterv1alpha1.Poddeployer, deployment deployment.Spec.Template.Labels["podDeployer"] = podDeployer.Name } -// 为image排序 +// calculatePriorityImages 为 image 排序 func calculatePriorityImages(podDeployer *podrestarterv1alpha1.Poddeployer) []podrestarterv1alpha1.PriorityImage { // 找出对应的 imageList := podDeployer.Spec.PriorityImages @@ -92,7 +211,7 @@ func calculatePriorityImages(podDeployer *podrestarterv1alpha1.Poddeployer) []po return imageList } -// 替换image,并返回剩下排序后的images +// handleDeploymentImageSort 替换 image,并返回剩下排序后的 images func handleDeploymentImageSort(priorityImages []podrestarterv1alpha1.PriorityImage, podDeployer *podrestarterv1alpha1.Poddeployer) ([]v1.Container, error) { if len(priorityImages) != len(podDeployer.Spec.DeploymentSpec.Template.Spec.Containers) { return nil, errors.New("priorityImage len error") @@ -114,15 +233,59 @@ func handleDeploymentImageSort(priorityImages []podrestarterv1alpha1.PriorityIma return setOtherContainers, nil } +// handleStatefulSetImageSort 替换 image,并返回剩下排序后的 images +func handleStatefulSetImageSort(priorityImages []podrestarterv1alpha1.PriorityImage, podDeployer *podrestarterv1alpha1.Poddeployer) ([]v1.Container, error) { + if len(priorityImages) != len(podDeployer.Spec.StatefulSetSpec.Template.Spec.Containers) { + return nil, errors.New("priorityImage len error") + } + firstContainer := make([]v1.Container, 0) + setOtherContainers := make([]v1.Container, 0) + + for k, v := range priorityImages { + for _, container := range podDeployer.Spec.StatefulSetSpec.Template.Spec.Containers { + // 第一个放进来 + if v.Image == container.Name && k == 0 { + firstContainer = append(firstContainer, container) + } else if v.Image == container.Name { + setOtherContainers = append(setOtherContainers, container) + } + } + } + podDeployer.Spec.StatefulSetSpec.Template.Spec.Containers = firstContainer + return setOtherContainers, nil +} + +// handleDeploymentImageSort 替换 image,并返回剩下排序后的 images +func handleDaemonSetImageSort(priorityImages []podrestarterv1alpha1.PriorityImage, podDeployer *podrestarterv1alpha1.Poddeployer) ([]v1.Container, error) { + if len(priorityImages) != len(podDeployer.Spec.DaemonSetSpec.Template.Spec.Containers) { + return nil, errors.New("priorityImage len error") + } + firstContainer := make([]v1.Container, 0) + setOtherContainers := make([]v1.Container, 0) + + for k, v := range priorityImages { + for _, container := range podDeployer.Spec.DaemonSetSpec.Template.Spec.Containers { + // 第一个放进来 + if v.Image == container.Name && k == 0 { + firstContainer = append(firstContainer, container) + } else if v.Image == container.Name { + setOtherContainers = append(setOtherContainers, container) + } + } + } + podDeployer.Spec.DaemonSetSpec.Template.Spec.Containers = firstContainer + return setOtherContainers, nil +} + type patchOperation struct { Op string `json:"op"` Path string `json:"path"` Value interface{} `json:"value,omitempty"` } -// patchDeployment 使用deployment patch的方式顺序执行pod -func patchDeployment(deploymentName, namespace string, container *v1.Container) { - klog.Info("do deployment patch....") +// patchResource 使用 dynamic client patch +func patchResource(deploymentName, namespace string, container *v1.Container, gvr schema.GroupVersionResource) error { + klog.Infof("do resource patch [%v/%v]", namespace, deploymentName) pa := make([]patchOperation, 0) p := patchOperation{ @@ -133,26 +296,28 @@ func patchDeployment(deploymentName, namespace string, container *v1.Container) pa = append(pa, p) patchBytes, err := json.Marshal(&pa) if err != nil { - klog.Error(err) - return + klog.Error("patch marshal error: ", err) + return err } jsonPatch, err := jsonpatch.DecodePatch(patchBytes) if err != nil { klog.Error("DecodePatch error: ", err) - return + return err } + jsonPatchBytes, err := json.Marshal(jsonPatch) if err != nil { klog.Error("json Marshal error: ", err) - return + return err } - klog.Info(string(jsonPatchBytes)) - _, err = k8sconfig.ClientSet.AppsV1().Deployments(namespace). - Patch(context.TODO(), deploymentName, types.JSONPatchType, - jsonPatchBytes, metav1.PatchOptions{}) + klog.Infof("patch operation: %v", string(jsonPatchBytes)) + _, err = k8sconfig.DynamicClient.Resource(gvr).Namespace(namespace).Patch(context.TODO(), deploymentName, + types.JSONPatchType, jsonPatchBytes, metav1.PatchOptions{}) + if err != nil { klog.Error("patch error: ", err) - return + return err } + return nil } diff --git a/pkg/k8sconfig/init_client.go b/pkg/k8sconfig/init_client.go index ed8552f..2576ddc 100644 --- a/pkg/k8sconfig/init_client.go +++ b/pkg/k8sconfig/init_client.go @@ -1,13 +1,14 @@ package k8sconfig import ( + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "log" ) -// InitClient 初始化k8s-client -func InitClient(config *rest.Config) kubernetes.Interface { +// initClient 初始化 k8s-client +func initClient(config *rest.Config) kubernetes.Interface { c, err := kubernetes.NewForConfig(config) if err != nil { log.Fatal(err) @@ -15,8 +16,22 @@ func InitClient(config *rest.Config) kubernetes.Interface { return c } -var ClientSet kubernetes.Interface +// initDynamicClient 初始化 k8s-dynamic-client +func initDynamicClient(config *rest.Config) dynamic.Interface { + c, err := dynamic.NewForConfig(config) + if err != nil { + log.Fatal(err) + } + + return c +} + +var ( + ClientSet kubernetes.Interface + DynamicClient dynamic.Interface +) func init() { - ClientSet = InitClient(K8sRestConfig()) + ClientSet = initClient(K8sRestConfig()) + DynamicClient = initDynamicClient(K8sRestConfig()) } diff --git a/pkg/k8sconfig/init_k8s_config.go b/pkg/k8sconfig/init_k8s_config.go index b145fa6..b989634 100644 --- a/pkg/k8sconfig/init_k8s_config.go +++ b/pkg/k8sconfig/init_k8s_config.go @@ -25,7 +25,6 @@ func K8sRestConfig() *rest.Config { config.Insecure = true klog.Info("run outside the cluster") return config - } // k8sRestConfigInPod 集群内部POD里使用 diff --git a/pkg/util/find.go b/pkg/util/find.go deleted file mode 100644 index 504d477..0000000 --- a/pkg/util/find.go +++ /dev/null @@ -1,70 +0,0 @@ -package util - -import ( - "context" - "github.com/myoperator/poddeployer/pkg/k8sconfig" - appv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" -) - -// GetPodsByDeployment 根据传入Deployment获取当前"正在"使用的pod -func GetPodsByDeployment(depName, ns string) []v1.Pod { - - deployment, err := k8sconfig.ClientSet.AppsV1().Deployments(ns).Get(context.TODO(), - depName, metav1.GetOptions{}) - if err != nil { - klog.Error("create clientSet error: ", err) - return nil - } - rsIdList := getRsIdsByDeployment(deployment, k8sconfig.ClientSet) - podsList := make([]v1.Pod, 0) - for _, rs := range rsIdList { - pods := getPodsByReplicaSet(rs, k8sconfig.ClientSet, ns) - podsList = append(podsList, pods...) - } - - return podsList -} - -// getPodsByReplicaSet 根据传入的ReplicaSet查询到需要的pod -func getPodsByReplicaSet(rs appv1.ReplicaSet, clientSet kubernetes.Interface, ns string) []v1.Pod { - pods, err := clientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - klog.Error("list pod error: ", err) - return nil - } - - ret := make([]v1.Pod, 0) - for _, p := range pods.Items { - // 找到 pod OwnerReferences uid相同的 - if p.OwnerReferences != nil && len(p.OwnerReferences) == 1 { - if p.OwnerReferences[0].UID == rs.UID { - ret = append(ret, p) - } - } - } - return ret -} - -// getRsIdsByDeployment 根据传入的dep,获取到相关连的rs列表(滚更后的ReplicaSet就没用了) -func getRsIdsByDeployment(dep *appv1.Deployment, clientSet kubernetes.Interface) []appv1.ReplicaSet { - // 需要使用match labels过滤 - rsList, err := clientSet.AppsV1().ReplicaSets(dep.Namespace). - List(context.TODO(), metav1.ListOptions{ - LabelSelector: labels.Set(dep.Spec.Selector.MatchLabels).String(), - }) - if err != nil { - klog.Error("list ReplicaSets error: ", err) - return nil - } - - ret := make([]appv1.ReplicaSet, 0) - for _, rs := range rsList.Items { - ret = append(ret, rs) - } - return ret -} diff --git a/test/test_dynamic_client.go b/test/test_dynamic_client.go new file mode 100644 index 0000000..842cae2 --- /dev/null +++ b/test/test_dynamic_client.go @@ -0,0 +1,47 @@ +package main + +import ( + "context" + "github.com/myoperator/poddeployer/pkg/k8sconfig" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "log" + "strings" +) + +func main() { + + //_, err := k8sconfig.DynamicClient.Resource(parseGVR("apps/v1/statefulsets")).Patch(context.TODO(), "mypoddeployer", + // types.JSONPatchType, jsonPatchBytes, metav1.PatchOptions{}) + + _, err := k8sconfig.DynamicClient.Resource(parseGVR("apps/v1/deployments")).Namespace("default").Get(context.TODO(), "mypoddeployer-2", metav1.GetOptions{}) + if err != nil { + log.Fatal(err) + } +} + +// parseGVR 解析并指定资源对象 "apps/v1/deployments" "core/v1/pods" "batch/v1/jobs" +func parseGVR(gvr string) schema.GroupVersionResource { + var group, version, resource string + gvList := strings.Split(gvr, "/") + + // 防止越界 + if len(gvList) < 2 { + panic("gvr input error, please input like format apps/v1/deployments or core/v1/pods") + } + + if len(gvList) < 3 { + group = "" + version = gvList[0] + resource = gvList[1] + } else { + if gvList[0] == "core" { + gvList[0] = "" + } + group, version, resource = gvList[0], gvList[1], gvList[2] + } + + return schema.GroupVersionResource{ + Group: group, Version: version, Resource: resource, + } +} diff --git a/yaml/deploy.yaml b/yaml/deploy.yaml index b06d5e3..b6148f5 100644 --- a/yaml/deploy.yaml +++ b/yaml/deploy.yaml @@ -43,7 +43,7 @@ spec: type: NodePort ports: - port: 8888 - nodePort: 31130 + nodePort: 31131 selector: app: mypodrestarter-controller --- \ No newline at end of file diff --git a/yaml/deploy_docker.yaml b/yaml/deploy_docker.yaml index 798da49..2fe169d 100644 --- a/yaml/deploy_docker.yaml +++ b/yaml/deploy_docker.yaml @@ -36,7 +36,7 @@ spec: type: NodePort ports: - port: 8888 - nodePort: 31130 + nodePort: 31131 selector: app: mypoddeployer-controller --- \ No newline at end of file diff --git a/yaml/example_daemonset.yaml b/yaml/example_daemonset.yaml new file mode 100644 index 0000000..39a8476 --- /dev/null +++ b/yaml/example_daemonset.yaml @@ -0,0 +1,39 @@ +apiVersion: api.practice.com/v1alpha1 +kind: Poddeployer +metadata: + name: mypoddeployer-daemonsets + namespace: default +spec: + type: apps/v1/daemonsets + daemonset_spec: + replicas: 3 + selector: + matchLabels: + app: myapp-pod + template: + metadata: + labels: + app: myapp-pod + spec: + containers: + - name: example1 + image: busybox:1.34 + command: + - "sleep" + - "3600" + - name: example2 + image: nginx:1.18-alpine + ports: + - containerPort: 80 + priority_images: # image的权重排序 + - image: example1 + value: 200 + - image: example2 + value: 50 + + + + + + + diff --git a/yaml/example.yaml b/yaml/example_deployment.yaml similarity index 58% rename from yaml/example.yaml rename to yaml/example_deployment.yaml index 6fb40ac..7f54d84 100644 --- a/yaml/example.yaml +++ b/yaml/example_deployment.yaml @@ -1,9 +1,10 @@ apiVersion: api.practice.com/v1alpha1 kind: Poddeployer metadata: - name: mypoddeployer + name: mypoddeployer-deployment namespace: default spec: + type: apps/v1/deployments deployment_spec: replicas: 1 template: @@ -15,26 +16,26 @@ spec: - "sleep" - "3600" - name: example2 - image: nginx:1.14.2 - ports: - - containerPort: 80 - - name: example3 - image: nginx:1.14.2 - ports: - - containerPort: 81 - - name: example4 image: nginx:1.18-alpine ports: - - containerPort: 82 + - containerPort: 80 +# - name: example3 +# image: redis:latest +# ports: +# - containerPort: 81 +# - name: example4 +# image: nginx:1.18-alpine +# ports: +# - containerPort: 82 priority_images: # image的权重排序 - image: example1 value: 200 - image: example2 value: 50 - - image: example3 - value: 100 - - image: example4 - value: 1000 +# - image: example3 +# value: 100 +# - image: example4 +# value: 1000 diff --git a/yaml/example_statefulset.yaml b/yaml/example_statefulset.yaml new file mode 100644 index 0000000..780bc15 --- /dev/null +++ b/yaml/example_statefulset.yaml @@ -0,0 +1,39 @@ +apiVersion: api.practice.com/v1alpha1 +kind: Poddeployer +metadata: + name: mypoddeployer-statefulset + namespace: default +spec: + type: apps/v1/statefulsets + statefulset_spec: + replicas: 3 + selector: + matchLabels: + app: myapp-pod + template: + metadata: + labels: + app: myapp-pod + spec: + containers: + - name: example1 + image: busybox:1.34 + command: + - "sleep" + - "3600" + - name: example2 + image: nginx:1.18-alpine + ports: + - containerPort: 80 + priority_images: # image的权重排序 + - image: example1 + value: 200 + - image: example2 + value: 50 + + + + + + + diff --git a/yaml/rbac.yaml b/yaml/rbac.yaml index eea751d..7556209 100644 --- a/yaml/rbac.yaml +++ b/yaml/rbac.yaml @@ -39,10 +39,15 @@ rules: - apps resources: - deployments + - statefulsets + - daemonsets verbs: - get - list - watch + - patch + - create + - update - apiGroups: - networking.k8s.io resources: @@ -81,7 +86,7 @@ rules: - apiGroups: - api.practice.com resources: - - podDeployer + - poddeployers verbs: - create - delete @@ -93,13 +98,13 @@ rules: - apiGroups: - api.practice.com resources: - - podDeployer/finalizers + - poddeployers/finalizers verbs: - update - apiGroups: - api.practice.com resources: - - podDeployer/status + - poddeployers/status verbs: - get - patch