Skip to content

Commit

Permalink
Various fixes (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
flavio authored Apr 22, 2024
1 parent 17e2492 commit 3b6e1db
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 65 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
workflow_call:
push:
branches:
- 'main'
- "main"
pull_request:

# Declare default permissions as read only.
Expand Down Expand Up @@ -38,5 +38,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@3cfe3a4abbb849e10058ce4af15d205b6da42804 # v4.0.0
with:
version: v1.56.2
version: v1.57.2
skip-cache: true
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ test: manifests generate fmt vet envtest ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./... -coverprofile cover.out

GOLANGCI_LINT = $(shell pwd)/bin/golangci-lint
GOLANGCI_LINT_VERSION ?= v1.54.2
GOLANGCI_LINT_VERSION ?= v1.57.2
golangci-lint:
@[ -f $(GOLANGCI_LINT) ] || { \
set -e ;\
Expand Down
12 changes: 10 additions & 2 deletions api/v1alpha1/shim_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,17 @@ type RuntimeClassSpec struct {
Handler string `json:"handler"`
}

// +kubebuilder:validation:Enum=rolling;recreate
type RolloutStrategyType string

const (
RolloutStrategyTypeRolling RolloutStrategyType = "rolling"
RolloutStrategyTypeRecreate RolloutStrategyType = "recreate"
)

type RolloutStrategy struct {
Type string `json:"type"`
Rolling RollingSpec `json:"rolling,omitempty"`
Type RolloutStrategyType `json:"type"`
Rolling RollingSpec `json:"rolling,omitempty"`
}

type RollingSpec struct {
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/runtime.kwasm.sh_shims.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ spec:
- maxUpdate
type: object
type:
enum:
- rolling
- recreate
type: string
required:
- type
Expand Down
125 changes: 65 additions & 60 deletions internal/controller/shim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"errors"
"fmt"
"math"
"os"
Expand All @@ -38,13 +39,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kwasmv1 "github.com/spinkube/runtime-class-manager/api/v1alpha1"
rcmv1 "github.com/spinkube/runtime-class-manager/api/v1alpha1"
)

const (
KwasmOperatorFinalizer = "kwasm.sh/finalizer"
INSTALL = "install"
UNINSTALL = "uninstall"
RCMOperatorFinalizer = "rcm.spinkube.dev/finalizer"
INSTALL = "install"
UNINSTALL = "uninstall"
ProvisioningStatusProvisioned = "provisioned"
ProvisioningStatusPending = "pending"
)

// ShimReconciler reconciles a Shim object
Expand All @@ -60,7 +63,7 @@ type ShimReconciler struct {
// SetupWithManager sets up the controller with the Manager.
func (sr *ShimReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&kwasmv1.Shim{}).
For(&rcmv1.Shim{}).
// As we create and own the created jobs
// Jobs are important for us to update the Shims installation status
// on respective nodes
Expand Down Expand Up @@ -91,12 +94,20 @@ func (sr *ShimReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
ctx = log.WithContext(ctx)

// 1. Check if the shim resource exists
var shimResource kwasmv1.Shim
var shimResource rcmv1.Shim
if err := sr.Client.Get(ctx, req.NamespacedName, &shimResource); err != nil {
log.Err(err).Msg("Unable to fetch shimResource")
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// Ensure the finalizer is called even if a return happens before
defer func() {
err := sr.ensureFinalizerForShim(ctx, &shimResource, RCMOperatorFinalizer)
if err != nil {
log.Error().Msgf("Failed to ensure finalizer: %s", err)
}
}()

// 2. Get list of nodes where this shim is supposed to be deployed on
nodes, err := sr.getNodeListFromShimsNodeSelctor(ctx, &shimResource)
if err != nil {
Expand Down Expand Up @@ -137,15 +148,11 @@ func (sr *ShimReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// 4. Deploy job to each node in list
if len(nodes.Items) > 0 {
_, err = sr.handleInstallShim(ctx, &shimResource, nodes)
if err != nil {
return ctrl.Result{}, err
}
} else {
log.Info().Msg("No nodes found")
}

err = sr.ensureFinalizerForShim(ctx, &shimResource, KwasmOperatorFinalizer)
return ctrl.Result{}, client.IgnoreNotFound(err)
return ctrl.Result{}, err
}

// findShimsToReconcile finds all Shims that need to be reconciled.
Expand All @@ -154,7 +161,7 @@ func (sr *ShimReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// that the shim is deployed on the node if it should be.
func (sr *ShimReconciler) findShimsToReconcile(ctx context.Context, node client.Object) []reconcile.Request {
_ = node
shimList := &kwasmv1.ShimList{}
shimList := &rcmv1.ShimList{}
listOps := &client.ListOptions{
Namespace: "",
}
Expand All @@ -175,15 +182,15 @@ func (sr *ShimReconciler) findShimsToReconcile(ctx context.Context, node client.
return requests
}

func (sr *ShimReconciler) updateStatus(ctx context.Context, shim *kwasmv1.Shim, nodes *corev1.NodeList) error {
func (sr *ShimReconciler) updateStatus(ctx context.Context, shim *rcmv1.Shim, nodes *corev1.NodeList) error {
log := log.Ctx(ctx)

shim.Status.NodeCount = len(nodes.Items)
shim.Status.NodeReadyCount = 0

if len(nodes.Items) > 0 {
for _, node := range nodes.Items {
if node.Labels[shim.Name] == "provisioned" {
if node.Labels[shim.Name] == ProvisioningStatusProvisioned {
shim.Status.NodeReadyCount++
}
}
Expand All @@ -205,43 +212,50 @@ func (sr *ShimReconciler) updateStatus(ctx context.Context, shim *kwasmv1.Shim,
}

// handleInstallShim deploys a Job to each node in a list.
func (sr *ShimReconciler) handleInstallShim(ctx context.Context, shim *kwasmv1.Shim, nodes *corev1.NodeList) (ctrl.Result, error) {
func (sr *ShimReconciler) handleInstallShim(ctx context.Context, shim *rcmv1.Shim, nodes *corev1.NodeList) (ctrl.Result, error) {
log := log.Ctx(ctx)

switch shim.Spec.RolloutStrategy.Type {
case "rolling":
case rcmv1.RolloutStrategyTypeRolling:
{
log.Debug().Msgf("Rolling strategy selected: maxUpdate=%d", shim.Spec.RolloutStrategy.Rolling.MaxUpdate)
return ctrl.Result{}, errors.New("Rolling strategy not implemented yet")
}
case "recreate":
case rcmv1.RolloutStrategyTypeRecreate:
{
log.Debug().Msgf("Recreate strategy selected")
for i := range nodes.Items {
node := nodes.Items[i]

shimProvisioned := node.Labels[shim.Name] == "provisioned"
shimPending := node.Labels[shim.Name] == "pending"
if !shimProvisioned && !shimPending {
err := sr.deployJobOnNode(ctx, shim, node, INSTALL)
if err != nil {
return ctrl.Result{}, err
}
} else {
log.Info().Msgf("Shim %s already provisioned on Node %s", shim.Name, node.Name)
}
}
return sr.recreateStrategyRollout(ctx, shim, nodes)
}
default:
{
log.Debug().Msgf("No rollout strategy selected; using default: rolling")
log.Debug().Msgf("No rollout strategy selected; using default: recreate")
return sr.recreateStrategyRollout(ctx, shim, nodes)
}
}
}

return ctrl.Result{}, nil
func (sr *ShimReconciler) recreateStrategyRollout(ctx context.Context, shim *rcmv1.Shim, nodes *corev1.NodeList) (ctrl.Result, error) {
log := log.Ctx(ctx)
shimInstallationErrors := []error{}
for i := range nodes.Items {
node := nodes.Items[i]

shimProvisioned := node.Labels[shim.Name] == ProvisioningStatusProvisioned
shimPending := node.Labels[shim.Name] == ProvisioningStatusPending
if !shimProvisioned && !shimPending {
err := sr.deployJobOnNode(ctx, shim, node, INSTALL)
shimInstallationErrors = append(shimInstallationErrors, err)
}

if shimProvisioned {
log.Info().Msgf("Shim %s already provisioned on Node %s", shim.Name, node.Name)
}
}
return ctrl.Result{}, errors.Join(shimInstallationErrors...)
}

// deployUninstallJob deploys an uninstall Job for a Shim.
func (sr *ShimReconciler) deployJobOnNode(ctx context.Context, shim *kwasmv1.Shim, node corev1.Node, jobType string) error {
func (sr *ShimReconciler) deployJobOnNode(ctx context.Context, shim *rcmv1.Shim, node corev1.Node, jobType string) error {
log := log.Ctx(ctx)

if err := sr.Client.Get(ctx, types.NamespacedName{Name: node.Name}, &node); err != nil {
Expand All @@ -255,7 +269,7 @@ func (sr *ShimReconciler) deployJobOnNode(ctx context.Context, shim *kwasmv1.Shi

switch jobType {
case INSTALL:
err := sr.updateNodeLabels(ctx, &node, shim, "pending")
err := sr.updateNodeLabels(ctx, &node, shim, ProvisioningStatusPending)
if err != nil {
log.Error().Msgf("Unable to update node label %s: %s", shim.Name, err)
}
Expand Down Expand Up @@ -297,7 +311,7 @@ func (sr *ShimReconciler) deployJobOnNode(ctx context.Context, shim *kwasmv1.Shi
return nil
}

func (sr *ShimReconciler) updateNodeLabels(ctx context.Context, node *corev1.Node, shim *kwasmv1.Shim, status string) error {
func (sr *ShimReconciler) updateNodeLabels(ctx context.Context, node *corev1.Node, shim *rcmv1.Shim, status string) error {
node.Labels[shim.Name] = status

if err := sr.Update(ctx, node); err != nil {
Expand All @@ -308,16 +322,12 @@ func (sr *ShimReconciler) updateNodeLabels(ctx context.Context, node *corev1.Nod
}

// createJobManifest creates a Job manifest for a Shim.
func (sr *ShimReconciler) createJobManifest(shim *kwasmv1.Shim, node *corev1.Node, operation string) (*batchv1.Job, error) {
func (sr *ShimReconciler) createJobManifest(shim *rcmv1.Shim, node *corev1.Node, operation string) (*batchv1.Job, error) {
priv := true
name := node.Name + "-" + shim.Name + "-" + operation
nameMax := int(math.Min(float64(len(name)), 63))

job := &batchv1.Job{
TypeMeta: metav1.TypeMeta{
Kind: "Job",
APIVersion: "batch/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name[:nameMax],
Namespace: os.Getenv("CONTROLLER_NAMESPACE"),
Expand Down Expand Up @@ -397,7 +407,7 @@ func (sr *ShimReconciler) createJobManifest(shim *kwasmv1.Shim, node *corev1.Nod
}

// handleDeployRuntimeClass deploys a RuntimeClass for a Shim.
func (sr *ShimReconciler) handleDeployRuntimeClass(ctx context.Context, shim *kwasmv1.Shim) (ctrl.Result, error) {
func (sr *ShimReconciler) handleDeployRuntimeClass(ctx context.Context, shim *rcmv1.Shim) (ctrl.Result, error) {
log := log.Ctx(ctx)

log.Info().Msgf("Deploying RuntimeClass: %s", shim.Spec.RuntimeClass.Name)
Expand All @@ -423,7 +433,7 @@ func (sr *ShimReconciler) handleDeployRuntimeClass(ctx context.Context, shim *kw
}

// createRuntimeClassManifest creates a RuntimeClass manifest for a Shim.
func (sr *ShimReconciler) createRuntimeClassManifest(shim *kwasmv1.Shim) (*nodev1.RuntimeClass, error) {
func (sr *ShimReconciler) createRuntimeClassManifest(shim *rcmv1.Shim) (*nodev1.RuntimeClass, error) {
name := shim.Name
nameMax := int(math.Min(float64(len(name)), 63))

Expand All @@ -433,14 +443,9 @@ func (sr *ShimReconciler) createRuntimeClassManifest(shim *kwasmv1.Shim) (*nodev
}

runtimeClass := &nodev1.RuntimeClass{
TypeMeta: metav1.TypeMeta{
Kind: "RuntimeClass",
APIVersion: "node.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name[:nameMax],
Namespace: os.Getenv("CONTROLLER_NAMESPACE"),
Labels: map[string]string{name[:nameMax]: "true"},
Name: name[:nameMax],
Labels: map[string]string{name[:nameMax]: "true"},
},
Handler: shim.Spec.RuntimeClass.Handler,
Scheduling: &nodev1.Scheduling{
Expand All @@ -456,7 +461,7 @@ func (sr *ShimReconciler) createRuntimeClassManifest(shim *kwasmv1.Shim) (*nodev
}

// handleDeleteShim deletes all possible child resources of a Shim. It will ignore NotFound errors.
func (sr *ShimReconciler) handleDeleteShim(ctx context.Context, shim *kwasmv1.Shim, nodes *corev1.NodeList) error {
func (sr *ShimReconciler) handleDeleteShim(ctx context.Context, shim *rcmv1.Shim, nodes *corev1.NodeList) error {
// deploy uninstall job on every node in node list
for i := range nodes.Items {
node := nodes.Items[i]
Expand All @@ -473,15 +478,15 @@ func (sr *ShimReconciler) handleDeleteShim(ctx context.Context, shim *kwasmv1.Sh
return nil
}

func (sr *ShimReconciler) getNodeListFromShimsNodeSelctor(ctx context.Context, shim *kwasmv1.Shim) (*corev1.NodeList, error) {
func (sr *ShimReconciler) getNodeListFromShimsNodeSelctor(ctx context.Context, shim *rcmv1.Shim) (*corev1.NodeList, error) {
nodes := &corev1.NodeList{}
if shim.Spec.NodeSelector != nil {
err := sr.List(ctx, nodes, client.InNamespace(shim.Namespace), client.MatchingLabels(shim.Spec.NodeSelector))
err := sr.List(ctx, nodes, client.MatchingLabels(shim.Spec.NodeSelector))
if err != nil {
return &corev1.NodeList{}, fmt.Errorf("failed to get node list: %w", err)
}
} else {
err := sr.List(ctx, nodes, client.InNamespace(shim.Namespace))
err := sr.List(ctx, nodes)
if err != nil {
return &corev1.NodeList{}, fmt.Errorf("failed to get node list: %w", err)
}
Expand All @@ -491,7 +496,7 @@ func (sr *ShimReconciler) getNodeListFromShimsNodeSelctor(ctx context.Context, s
}

// runtimeClassExists checks whether a RuntimeClass for a Shim exists.
func (sr *ShimReconciler) runtimeClassExists(ctx context.Context, shim *kwasmv1.Shim) (bool, error) {
func (sr *ShimReconciler) runtimeClassExists(ctx context.Context, shim *rcmv1.Shim) (bool, error) {
log := log.Ctx(ctx)

if shim.Spec.RuntimeClass.Name != "" {
Expand All @@ -509,7 +514,7 @@ func (sr *ShimReconciler) runtimeClassExists(ctx context.Context, shim *kwasmv1.
}

// getRuntimeClass finds a RuntimeClass.
func (sr *ShimReconciler) getRuntimeClass(ctx context.Context, shim *kwasmv1.Shim) (*nodev1.RuntimeClass, error) {
func (sr *ShimReconciler) getRuntimeClass(ctx context.Context, shim *rcmv1.Shim) (*nodev1.RuntimeClass, error) {
rc := nodev1.RuntimeClass{}
err := sr.Client.Get(ctx, types.NamespacedName{Name: shim.Spec.RuntimeClass.Name, Namespace: shim.Namespace}, &rc)
if err != nil {
Expand All @@ -519,9 +524,9 @@ func (sr *ShimReconciler) getRuntimeClass(ctx context.Context, shim *kwasmv1.Shi
}

// removeFinalizerFromShim removes the finalizer from a Shim.
func (sr *ShimReconciler) removeFinalizerFromShim(ctx context.Context, shim *kwasmv1.Shim) error {
if controllerutil.ContainsFinalizer(shim, KwasmOperatorFinalizer) {
controllerutil.RemoveFinalizer(shim, KwasmOperatorFinalizer)
func (sr *ShimReconciler) removeFinalizerFromShim(ctx context.Context, shim *rcmv1.Shim) error {
if controllerutil.ContainsFinalizer(shim, RCMOperatorFinalizer) {
controllerutil.RemoveFinalizer(shim, RCMOperatorFinalizer)
if err := sr.Client.Update(ctx, shim); err != nil {
return fmt.Errorf("failed to remove finalizer: %w", err)
}
Expand All @@ -530,7 +535,7 @@ func (sr *ShimReconciler) removeFinalizerFromShim(ctx context.Context, shim *kwa
}

// ensureFinalizerForShim ensures the finalizer is present on a Shim resource.
func (sr *ShimReconciler) ensureFinalizerForShim(ctx context.Context, shim *kwasmv1.Shim, finalizer string) error {
func (sr *ShimReconciler) ensureFinalizerForShim(ctx context.Context, shim *rcmv1.Shim, finalizer string) error {
if !controllerutil.ContainsFinalizer(shim, finalizer) {
controllerutil.AddFinalizer(shim, finalizer)
if err := sr.Client.Update(ctx, shim); err != nil {
Expand Down

0 comments on commit 3b6e1db

Please sign in to comment.