diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index d122a4676812..1d24627d2be6 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -42,6 +42,7 @@ import ( "github.com/k0sproject/k0s/pkg/constant" "github.com/k0sproject/k0s/pkg/k0scontext" "github.com/k0sproject/k0s/pkg/kubernetes" + "github.com/k0sproject/k0s/pkg/leaderelection" "github.com/k0sproject/k0s/pkg/performance" "github.com/k0sproject/k0s/pkg/telemetry" "github.com/k0sproject/k0s/pkg/token" @@ -311,7 +312,7 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de // One leader elector per controller if singleController { - leaderElector = &leaderelector.Dummy{Leader: true} + leaderElector = leaderelector.Off() } else { // The name used to be hardcoded in the component itself // At some point we need to rename this. @@ -324,7 +325,10 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de K0sVars: c.K0sVars, KubeClientFactory: adminClientFactory, IgnoredStacks: []string{ + controller.AutopilotStackName, controller.ClusterConfigStackName, + controller.EtcdMemberStackName, + controller.HelmExtensionStackName, controller.SystemRBACStackName, controller.WindowsStackName, }, @@ -373,8 +377,8 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de if err != nil { return err } - clusterComponents.Add(ctx, controller.NewCRD(c.K0sVars.ManifestsDir, "etcd", controller.WithStackName("etcd-member"))) - nodeComponents.Add(ctx, etcdReconciler) + clusterComponents.Add(ctx, controller.NewCRDStack(adminClientFactory, "etcd", controller.WithStackName(controller.EtcdMemberStackName))) + clusterComponents.Add(ctx, etcdReconciler) } perfTimer.Checkpoint("starting-certificates-init") @@ -420,7 +424,7 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de if flags.EnableDynamicConfig { clusterComponents.Add(ctx, controller.NewClusterConfigInitializer( adminClientFactory, - leaderElector, + func() leaderelection.Status { status, _ := leaderElector.CurrentStatus(); return status }, nodeConfig, )) @@ -439,7 +443,6 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de )) if !slices.Contains(flags.DisableComponents, constant.HelmComponentName) { - clusterComponents.Add(ctx, controller.NewCRD(c.K0sVars.ManifestsDir, "helm")) clusterComponents.Add(ctx, controller.NewExtensionsController( c.K0sVars, adminClientFactory, @@ -448,13 +451,13 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de } if !slices.Contains(flags.DisableComponents, constant.AutopilotComponentName) { - clusterComponents.Add(ctx, controller.NewCRD(c.K0sVars.ManifestsDir, "autopilot")) + clusterComponents.Add(ctx, controller.NewCRDStack(adminClientFactory, controller.AutopilotStackName)) } if enableK0sEndpointReconciler { clusterComponents.Add(ctx, controller.NewEndpointReconciler( nodeConfig, - leaderElector, + func() leaderelection.Status { status, _ := leaderElector.CurrentStatus(); return status }, adminClientFactory, net.DefaultResolver, nodeConfig.PrimaryAddressFamily(), diff --git a/inttest/addons/addons_test.go b/inttest/addons/addons_test.go index 7a87255b674d..e8e60ebf467c 100644 --- a/inttest/addons/addons_test.go +++ b/inttest/addons/addons_test.go @@ -8,6 +8,7 @@ import ( "context" "crypto" "crypto/x509" + "encoding/json" "errors" "fmt" "os" @@ -24,7 +25,6 @@ import ( "github.com/cloudflare/cfssl/initca" "github.com/cloudflare/cfssl/signer" "github.com/cloudflare/cfssl/signer/local" - "github.com/k0sproject/k0s/internal/pkg/templatewriter" "github.com/k0sproject/k0s/inttest/common" helmv1beta1 "github.com/k0sproject/k0s/pkg/apis/helm/v1beta1" k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" @@ -34,11 +34,14 @@ import ( "github.com/stretchr/testify/require" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/cli" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - k8s "k8s.io/client-go/kubernetes" - "k8s.io/utils/ptr" + "k8s.io/client-go/dynamic" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" crlog "sigs.k8s.io/controller-runtime/pkg/log" @@ -173,13 +176,7 @@ func (as *AddonsSuite) TestHelmBasedAddons() { as.Run("Rename chart in Helm extension", func() { as.renameChart(ctx) }) - values := map[string]any{ - "replicaCount": 2, - "image": map[string]any{ - "pullPolicy": "Always", - }, - } - as.doTestAddonUpdate(addonName, values) + as.doTestAddonUpdate(ctx, addonName) chart := as.waitForTestRelease(addonName, "0.6.0", metav1.NamespaceDefault, 2) as.Require().NoError(as.checkCustomValues(chart.Status.ReleaseName)) as.deleteRelease(chart) @@ -243,19 +240,22 @@ func (as *AddonsSuite) renameChart(ctx context.Context) { func (as *AddonsSuite) deleteRelease(chart *helmv1beta1.Chart) { ctx := as.Context() as.T().Logf("Deleting chart %s/%s", chart.Namespace, chart.Name) - ssh, err := as.SSH(ctx, as.ControllerNode(0)) + kubeconfig, err := as.GetKubeConfig(as.ControllerNode(0)) as.Require().NoError(err) - defer ssh.Disconnect() - _, err = ssh.ExecWithOutput(ctx, "rm /var/lib/k0s/manifests/helm/0_helm_extension_test-addon.yaml") + dyn, err := dynamic.NewForConfig(kubeconfig) as.Require().NoError(err) - cfg, err := as.GetKubeConfig(as.ControllerNode(0)) - as.Require().NoError(err) - k8sclient, err := k8s.NewForConfig(cfg) + + _, err = patchResource( + ctx, dyn.Resource(k0sv1beta1.SchemeGroupVersion.WithResource("clusterconfigs")).Namespace(metav1.NamespaceSystem), "k0s", + patchOp{"test", "/spec/extensions/helm/charts/0/name", chart.Status.ReleaseName}, + patchOp{"remove", "/spec/extensions/helm/charts/0", nil}, + ) as.Require().NoError(err) as.Require().NoError(wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(pollCtx context.Context) (done bool, err error) { as.T().Logf("Expecting have no secrets left for release %s/%s", chart.Status.Namespace, chart.Status.ReleaseName) - items, err := k8sclient.CoreV1().Secrets(chart.Status.Namespace).List(pollCtx, metav1.ListOptions{ - LabelSelector: "name=" + chart.Status.ReleaseName, + secrets := dyn.Resource(corev1.SchemeGroupVersion.WithResource("secrets")) + items, err := secrets.Namespace(chart.Status.Namespace).List(pollCtx, metav1.ListOptions{ + LabelSelector: fields.OneTermEqualSelector("name", chart.Status.ReleaseName).String(), }) if err != nil { if ctxErr := context.Cause(ctx); ctxErr != nil { @@ -271,19 +271,16 @@ func (as *AddonsSuite) deleteRelease(chart *helmv1beta1.Chart) { return true, nil })) - chartClient, err := client.New(cfg, client.Options{Scheme: k0sscheme.Scheme}) - as.Require().NoError(err) - as.T().Logf("Expecting chart %s/%s to be deleted", chart.Namespace, chart.Name) var lastResourceVersion string as.Require().NoError(wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) { - var found helmv1beta1.Chart - err := chartClient.Get(ctx, client.ObjectKey{Namespace: chart.Namespace, Name: chart.Name}, &found) + charts := dyn.Resource(helmv1beta1.SchemeGroupVersion.WithResource("charts")) + found, err := charts.Namespace(chart.Namespace).Get(ctx, chart.Name, metav1.GetOptions{}) switch { case err == nil: - if lastResourceVersion == "" || lastResourceVersion != found.ResourceVersion { + if lastResourceVersion == "" || lastResourceVersion != found.GetResourceVersion() { as.T().Log("Chart not yet deleted") - lastResourceVersion = found.ResourceVersion + lastResourceVersion = found.GetResourceVersion() } return false, nil @@ -461,33 +458,25 @@ func (as *AddonsSuite) checkCustomValues(releaseName string) error { }) } -func (as *AddonsSuite) doTestAddonUpdate(addonName string, values map[string]any) { - path := fmt.Sprintf("/var/lib/k0s/manifests/helm/0_helm_extension_%s.yaml", addonName) - valuesBytes, err := yaml.Marshal(values) +func (as *AddonsSuite) doTestAddonUpdate(ctx context.Context, addonName string) { + kubeconfig, err := as.GetKubeConfig(as.ControllerNode(0)) + as.Require().NoError(err) + dyn, err := dynamic.NewForConfig(kubeconfig) as.Require().NoError(err) - tw := templatewriter.TemplateWriter{ - Name: "testChartUpdate", - Template: chartCrdTemplate, - Data: struct { - Name string - ChartName string - Values string - Version string - TargetNS string - ForceUpgrade *bool - }{ - Name: "test-addon", - ChartName: "ealenn/echo-server", - Values: string(valuesBytes), - Version: "0.5.0", - TargetNS: metav1.NamespaceDefault, - ForceUpgrade: ptr.To(false), - }, - } - buf := bytes.NewBuffer([]byte{}) - as.Require().NoError(tw.WriteToBuffer(buf)) - as.PutFile(as.ControllerNode(0), path, buf.String()) + _, err = patchResource( + ctx, dyn.Resource(k0sv1beta1.SchemeGroupVersion.WithResource("clusterconfigs")).Namespace(metav1.NamespaceSystem), "k0s", + patchOp{"test", "/spec/extensions/helm/charts/0/name", addonName}, + patchOp{"replace", "/spec/extensions/helm/charts/0", map[string]any{ + "name": addonName, + "chartname": "ealenn/echo-server", + "values": `{"replicaCount": 2, "image": {"pullPolicy": "Always"}}`, + "version": "0.5.0", + "namespace": metav1.NamespaceDefault, + "forceUpgrade": false, + }}, + ) + as.Require().NoError(err) } func TestAddonsSuite(t *testing.T) { @@ -507,6 +496,21 @@ func TestAddonsSuite(t *testing.T) { suite.Run(t, &s) } +type patchOp = struct { + Op string `json:"op"` + Path string `json:"path"` + Value any `json:"value,omitempty"` +} + +//nolint:unparam // returned object not yet used +func patchResource(ctx context.Context, c dynamic.ResourceInterface, name string, ops ...patchOp) (*unstructured.Unstructured, error) { + if patch, err := json.Marshal(ops); err != nil { + return nil, err + } else { + return c.Patch(ctx, name, types.JSONPatchType, patch, metav1.PatchOptions{}) + } +} + type k0sConfigParams struct { BasicAddonName string @@ -550,25 +554,3 @@ spec: ` var k0sConfigWithAddonTemplate = template.Must(template.New("k0sConfigWithAddon").Parse(k0sConfigWithAddonRawTemplate)) - -// TODO: this actually duplicates logic from the controller code -// better to somehow handle it by programmatic api -const chartCrdTemplate = ` -apiVersion: helm.k0sproject.io/v1beta1 -kind: Chart -metadata: - name: k0s-addon-chart-{{ .Name }} - namespace: ` + metav1.NamespaceSystem + ` - finalizers: - - helm.k0sproject.io/uninstall-helm-release -spec: - chartName: {{ .ChartName }} - releaseName: {{ .Name }} - values: | -{{ .Values | nindent 4 }} - version: {{ .Version }} - namespace: {{ .TargetNS }} -{{- if ne .ForceUpgrade nil }} - forceUpgrade: {{ .ForceUpgrade }} -{{- end }} -` diff --git a/pkg/applier/manager_test.go b/pkg/applier/manager_test.go index dcbf11d3c592..dfce611c672e 100644 --- a/pkg/applier/manager_test.go +++ b/pkg/applier/manager_test.go @@ -38,13 +38,13 @@ import ( func TestManager_AppliesStacks(t *testing.T) { k0sVars, err := config.NewCfgVars(nil, t.TempDir()) require.NoError(t, err) - leaderElector := leaderelector.Dummy{Leader: true} + leaderElector := leaderelector.Off() clients := testutil.NewFakeClientFactory() underTest := applier.Manager{ K0sVars: k0sVars, KubeClientFactory: clients, - LeaderElector: &leaderElector, + LeaderElector: leaderElector, } // A stack that exists on disk before the manager is started. @@ -100,14 +100,14 @@ data: {} func TestManager_IgnoresStacks(t *testing.T) { k0sVars, err := config.NewCfgVars(nil, t.TempDir()) require.NoError(t, err) - leaderElector := leaderelector.Dummy{Leader: true} + leaderElector := leaderelector.Off() clients := testutil.NewFakeClientFactory() underTest := applier.Manager{ K0sVars: k0sVars, IgnoredStacks: []string{"ignored"}, KubeClientFactory: clients, - LeaderElector: &leaderElector, + LeaderElector: leaderElector, } ignored := filepath.Join(k0sVars.ManifestsDir, "ignored") diff --git a/pkg/applier/stack.go b/pkg/applier/stack.go index c45a5a3f9b43..cf27d748ac77 100644 --- a/pkg/applier/stack.go +++ b/pkg/applier/stack.go @@ -4,11 +4,13 @@ package applier import ( + "cmp" "context" "crypto/md5" "encoding/hex" "errors" "fmt" + "io" "slices" "sync" "time" @@ -23,15 +25,60 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/restmapper" "k8s.io/utils/ptr" + "github.com/avast/retry-go" jsonpatch "github.com/evanphx/json-patch" "github.com/sirupsen/logrus" ) +func ApplyStack(ctx context.Context, clients kubernetes.ClientFactoryInterface, src io.Reader, srcName, stackName string) error { + infos, err := resource.NewLocalBuilder(). + Unstructured(). + Stream(src, srcName). + Flatten(). + Do(). + Infos() + if err != nil { + return err + } + + resources := make([]*unstructured.Unstructured, len(infos)) + for i := range infos { + resources[i] = infos[i].Object.(*unstructured.Unstructured) + } + + var lastErr error + if err := retry.Do( + func() error { + stack := Stack{ + Name: stackName, + Resources: resources, + Clients: clients, + } + lastErr = stack.Apply(ctx, true) + return lastErr + }, + retry.Context(ctx), + retry.LastErrorOnly(true), + retry.OnRetry(func(attempt uint, err error) { + logrus.WithFields(logrus.Fields{ + "component": "applier", + "stack": stackName, + "attempt": attempt + 1, + }).WithError(err).Debug("Failed to apply stack, retrying after backoff") + }), + ); err != nil { + return cmp.Or(lastErr, err) + } + + return nil +} + // Stack is a k8s resource bundle type Stack struct { Name string diff --git a/pkg/autopilot/controller/setup.go b/pkg/autopilot/controller/setup.go index 70e2a3be25dd..d2ba37cb938e 100644 --- a/pkg/autopilot/controller/setup.go +++ b/pkg/autopilot/controller/setup.go @@ -9,7 +9,6 @@ import ( "context" "fmt" "runtime" - "time" apv1beta2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2" apcli "github.com/k0sproject/k0s/pkg/autopilot/client" @@ -17,12 +16,10 @@ import ( apconst "github.com/k0sproject/k0s/pkg/autopilot/constant" "github.com/k0sproject/k0s/pkg/build" "github.com/k0sproject/k0s/pkg/component/status" - "github.com/k0sproject/k0s/pkg/kubernetes/watch" "github.com/avast/retry-go" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" - extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -81,7 +78,7 @@ func (sc *setupController) Run(ctx context.Context) error { if err := retry.Do(func() error { logger.Infof("Attempting to create controlnode '%s'", controlNodeName) - if err := sc.createControlNode(ctx, sc.clientFactory, controlNodeName, kubeletNodeName); err != nil { + if err := sc.createControlNode(ctx, controlNodeName, kubeletNodeName); err != nil { return fmt.Errorf("create controlnode '%s' attempt failed, retrying: %w", controlNodeName, err) } @@ -112,7 +109,7 @@ func createNamespace(ctx context.Context, cf apcli.FactoryInterface, name string // createControlNode creates a new control node, ignoring errors if one already exists // for this physical host. -func (sc *setupController) createControlNode(ctx context.Context, cf apcli.FactoryInterface, name, nodeName string) error { +func (sc *setupController) createControlNode(ctx context.Context, name, nodeName string) error { logger := sc.log.WithField("component", "setup") client, err := sc.clientFactory.GetK0sClient() if err != nil { @@ -122,13 +119,6 @@ func (sc *setupController) createControlNode(ctx context.Context, cf apcli.Facto // Create the ControlNode object if needed node, err := client.AutopilotV1beta2().ControlNodes().Get(ctx, name, metav1.GetOptions{}) if errors.IsNotFound(err) { - logger.Info("Autopilot 'controlnodes' CRD not found, waiting...") - if err := sc.waitForControlNodesCRD(ctx, cf); err != nil { - return fmt.Errorf("while waiting for autopilot 'controlnodes' CRD: %w", err) - } - - logger.Info("Autopilot 'controlnodes' CRD found, continuing") - logger.Infof("ControlNode '%s' not found, creating", name) mode := apconst.K0SControlNodeModeController if sc.enableWorker { @@ -208,36 +198,3 @@ func getControllerAPIAddress() (string, error) { return status.ClusterConfig.Spec.API.Address, nil } - -// waitForControlNodesCRD waits until the controlnodes CRD is established for -// max 2 minutes. -func (sc *setupController) waitForControlNodesCRD(ctx context.Context, cf apcli.FactoryInterface) error { - extClient, err := cf.GetExtensionClient() - if err != nil { - return fmt.Errorf("unable to obtain extensions client: %w", err) - } - - ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() - return watch.CRDs(extClient.CustomResourceDefinitions()). - WithObjectName("controlnodes."+apv1beta2.GroupName). - WithErrorCallback(func(err error) (time.Duration, error) { - if retryDelay, e := watch.IsRetryable(err); e == nil { - sc.log.WithError(err).Debugf( - "Encountered transient error while waiting for autopilot 'controlnodes' CRD, retrying in %s", - retryDelay, - ) - return retryDelay, nil - } - return 0, err - }). - Until(ctx, func(item *extensionsv1.CustomResourceDefinition) (bool, error) { - for _, cond := range item.Status.Conditions { - if cond.Type == extensionsv1.Established { - return cond.Status == extensionsv1.ConditionTrue, nil - } - } - - return false, nil - }) -} diff --git a/pkg/component/controller/apiendpointreconciler.go b/pkg/component/controller/apiendpointreconciler.go index 05897c677a9d..171d8508750e 100644 --- a/pkg/component/controller/apiendpointreconciler.go +++ b/pkg/component/controller/apiendpointreconciler.go @@ -12,8 +12,8 @@ import ( "time" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" - "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" + "github.com/k0sproject/k0s/pkg/leaderelection" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -36,7 +36,7 @@ type APIEndpointReconciler struct { externalAddress string apiServerPort int - leaderElector leaderelector.Interface + leaderStatus func() leaderelection.Status kubeClientFactory kubeutil.ClientFactoryInterface resolver resolver afnet string @@ -45,7 +45,7 @@ type APIEndpointReconciler struct { } // NewEndpointReconciler creates new endpoint reconciler -func NewEndpointReconciler(nodeConfig *v1beta1.ClusterConfig, leaderElector leaderelector.Interface, kubeClientFactory kubeutil.ClientFactoryInterface, resolver resolver, primaryAddressFamily v1beta1.PrimaryAddressFamilyType) *APIEndpointReconciler { +func NewEndpointReconciler(nodeConfig *v1beta1.ClusterConfig, leaderStatus func() leaderelection.Status, kubeClientFactory kubeutil.ClientFactoryInterface, resolver resolver, primaryAddressFamily v1beta1.PrimaryAddressFamilyType) *APIEndpointReconciler { var afnet string switch primaryAddressFamily { case v1beta1.PrimaryFamilyIPv4: @@ -58,7 +58,7 @@ func NewEndpointReconciler(nodeConfig *v1beta1.ClusterConfig, leaderElector lead logger: logrus.WithFields(logrus.Fields{"component": "endpointreconciler"}), externalAddress: nodeConfig.Spec.API.ExternalHost(), apiServerPort: nodeConfig.Spec.API.ExternalPort(), - leaderElector: leaderElector, + leaderStatus: leaderStatus, stopCh: make(chan struct{}), kubeClientFactory: kubeClientFactory, resolver: resolver, @@ -101,7 +101,7 @@ func (a *APIEndpointReconciler) Stop() error { } func (a *APIEndpointReconciler) reconcileEndpoints(ctx context.Context) error { - if !a.leaderElector.IsLeader() { + if a.leaderStatus() != leaderelection.StatusLeading { a.logger.Debug("Not the leader, not reconciling API endpoints") return nil } diff --git a/pkg/component/controller/apiendpointreconciler_test.go b/pkg/component/controller/apiendpointreconciler_test.go index 0ad3b6c765aa..bd8459bb4f26 100644 --- a/pkg/component/controller/apiendpointreconciler_test.go +++ b/pkg/component/controller/apiendpointreconciler_test.go @@ -10,7 +10,7 @@ import ( "testing" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" - "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" + "github.com/k0sproject/k0s/pkg/leaderelection" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -36,7 +36,8 @@ func TestBasicReconcilerWithNoLeader(t *testing.T) { config := getFakeConfig() - r := NewEndpointReconciler(config, &leaderelector.Dummy{Leader: false}, fakeFactory, fakeResolver{}, v1beta1.PrimaryFamilyIPv4) + leaderStatus := func() leaderelection.Status { return leaderelection.StatusPending } + r := NewEndpointReconciler(config, leaderStatus, fakeFactory, fakeResolver{}, v1beta1.PrimaryFamilyIPv4) ctx := t.Context() assert.NoError(t, r.Init(ctx)) @@ -70,7 +71,8 @@ func TestBasicReconcilerWithNoExistingEndpoint(t *testing.T) { fakeFactory := testutil.NewFakeClientFactory() config := getFakeConfig() - r := NewEndpointReconciler(config, &leaderelector.Dummy{Leader: true}, fakeFactory, fakeResolver{}, test.afnet) + leaderStatus := func() leaderelection.Status { return leaderelection.StatusLeading } + r := NewEndpointReconciler(config, leaderStatus, fakeFactory, fakeResolver{}, test.afnet) ctx := t.Context() assert.NoError(t, r.Init(ctx)) @@ -100,7 +102,8 @@ func TestBasicReconcilerWithEmptyEndpointSubset(t *testing.T) { assert.NoError(t, err) config := getFakeConfig() - r := NewEndpointReconciler(config, &leaderelector.Dummy{Leader: true}, fakeFactory, fakeResolver{}, v1beta1.PrimaryFamilyIPv4) + leaderStatus := func() leaderelection.Status { return leaderelection.StatusLeading } + r := NewEndpointReconciler(config, leaderStatus, fakeFactory, fakeResolver{}, v1beta1.PrimaryFamilyIPv4) assert.NoError(t, r.Init(ctx)) @@ -136,7 +139,8 @@ func TestReconcilerWithNoNeedForUpdate(t *testing.T) { config := getFakeConfig() - r := NewEndpointReconciler(config, &leaderelector.Dummy{Leader: true}, fakeFactory, fakeResolver{}, v1beta1.PrimaryFamilyIPv4) + leaderStatus := func() leaderelection.Status { return leaderelection.StatusLeading } + r := NewEndpointReconciler(config, leaderStatus, fakeFactory, fakeResolver{}, v1beta1.PrimaryFamilyIPv4) assert.NoError(t, r.Init(ctx)) @@ -173,7 +177,8 @@ func TestReconcilerWithNeedForUpdate(t *testing.T) { config := getFakeConfig() - r := NewEndpointReconciler(config, &leaderelector.Dummy{Leader: true}, fakeFactory, fakeResolver{}, v1beta1.PrimaryFamilyIPv4) + leaderStatus := func() leaderelection.Status { return leaderelection.StatusLeading } + r := NewEndpointReconciler(config, leaderStatus, fakeFactory, fakeResolver{}, v1beta1.PrimaryFamilyIPv4) assert.NoError(t, r.Init(ctx)) assert.NoError(t, r.reconcileEndpoints(ctx)) diff --git a/pkg/component/controller/autopilot.go b/pkg/component/controller/autopilot.go index 21fc3d991982..4b03ab7219f2 100644 --- a/pkg/component/controller/autopilot.go +++ b/pkg/component/controller/autopilot.go @@ -18,6 +18,8 @@ import ( "github.com/sirupsen/logrus" ) +const AutopilotStackName = "autopilot" + var _ manager.Component = (*Autopilot)(nil) type Autopilot struct { diff --git a/pkg/component/controller/clusterconfig.go b/pkg/component/controller/clusterconfig.go index 0421c640dd60..d14e15067781 100644 --- a/pkg/component/controller/clusterconfig.go +++ b/pkg/component/controller/clusterconfig.go @@ -16,10 +16,10 @@ import ( "github.com/k0sproject/k0s/pkg/applier" k0sv1beta1client "github.com/k0sproject/k0s/pkg/client/clientset/typed/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/component/controller/clusterconfig" - "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/constant" "github.com/k0sproject/k0s/pkg/kubernetes" + "github.com/k0sproject/k0s/pkg/leaderelection" "github.com/k0sproject/k0s/static" corev1 "k8s.io/api/core/v1" @@ -142,7 +142,7 @@ func (r *ClusterConfigReconciler) reportStatus(ctx context.Context, config *k0sv type ClusterConfigInitializer struct { log logrus.FieldLogger clients kubernetes.ClientFactoryInterface - leaderElector leaderelector.Interface + leaderStatus func() leaderelection.Status initialConfig *k0sv1beta1.ClusterConfig } @@ -160,11 +160,11 @@ func (i *ClusterConfigInitializer) Start(ctx context.Context) error { // Stop implements [manager.Component]. func (*ClusterConfigInitializer) Stop() error { return nil } -func NewClusterConfigInitializer(clients kubernetes.ClientFactoryInterface, leaderElector leaderelector.Interface, initialConfig *k0sv1beta1.ClusterConfig) *ClusterConfigInitializer { +func NewClusterConfigInitializer(clients kubernetes.ClientFactoryInterface, leaderStatus func() leaderelection.Status, initialConfig *k0sv1beta1.ClusterConfig) *ClusterConfigInitializer { return &ClusterConfigInitializer{ log: logrus.WithField("component", "clusterConfigInitializer"), clients: clients, - leaderElector: leaderElector, + leaderStatus: leaderStatus, initialConfig: initialConfig, } } @@ -180,7 +180,7 @@ func (i *ClusterConfigInitializer) ensureClusterConfigExistence(ctx context.Cont start := time.Now() var stackApplied bool for { - if i.leaderElector.IsLeader() { + if i.leaderStatus() == leaderelection.StatusLeading { if stackApplied { err = nil } else { diff --git a/pkg/component/controller/clusterconfig_test.go b/pkg/component/controller/clusterconfig_test.go index de0be056b4ad..99dbf2551b79 100644 --- a/pkg/component/controller/clusterconfig_test.go +++ b/pkg/component/controller/clusterconfig_test.go @@ -13,8 +13,8 @@ import ( "github.com/k0sproject/k0s/internal/testutil" k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/component/controller" - "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" "github.com/k0sproject/k0s/pkg/constant" + "github.com/k0sproject/k0s/pkg/leaderelection" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,12 +27,12 @@ import ( func TestClusterConfigInitializer_Create(t *testing.T) { clients := testutil.NewFakeClientFactory() - leaderElector := leaderelector.Dummy{Leader: true} + leaderStatus := func() leaderelection.Status { return leaderelection.StatusLeading } initialConfig := k0sv1beta1.DefaultClusterConfig() initialConfig.ResourceVersion = "42" underTest := controller.NewClusterConfigInitializer( - clients, &leaderElector, initialConfig.DeepCopy(), + clients, leaderStatus, initialConfig.DeepCopy(), ) require.NoError(t, underTest.Init(t.Context())) @@ -57,11 +57,11 @@ func TestClusterConfigInitializer_Create(t *testing.T) { func TestClusterConfigInitializer_NoConfig(t *testing.T) { clients := testutil.NewFakeClientFactory() - leaderElector := leaderelector.Dummy{Leader: false} + leaderStatus := func() leaderelection.Status { return leaderelection.StatusPending } initialConfig := k0sv1beta1.DefaultClusterConfig() underTest := controller.NewClusterConfigInitializer( - clients, &leaderElector, initialConfig.DeepCopy(), + clients, leaderStatus, initialConfig.DeepCopy(), ) ctx, cancel := context.WithCancelCause(t.Context()) @@ -85,16 +85,15 @@ func TestClusterConfigInitializer_NoConfig(t *testing.T) { } func TestClusterConfigInitializer_Exists(t *testing.T) { - test := func(t *testing.T, leader bool) *testutil.FakeClientFactory { + test := func(t *testing.T, leaderStatus leaderelection.Status) *testutil.FakeClientFactory { existingConfig := k0sv1beta1.DefaultClusterConfig() existingConfig.ResourceVersion = "42" clients := testutil.NewFakeClientFactory(existingConfig) - leaderElector := leaderelector.Dummy{Leader: leader} initialConfig := existingConfig.DeepCopy() initialConfig.ResourceVersion = "1337" underTest := controller.NewClusterConfigInitializer( - clients, &leaderElector, initialConfig, + clients, func() leaderelection.Status { return leaderStatus }, initialConfig, ) require.NoError(t, underTest.Init(t.Context())) diff --git a/pkg/component/controller/crd.go b/pkg/component/controller/crd.go index 87def326e40d..975312f70c81 100644 --- a/pkg/component/controller/crd.go +++ b/pkg/component/controller/crd.go @@ -4,25 +4,23 @@ package controller import ( + "bytes" "context" "fmt" "io/fs" + "iter" "path" - "path/filepath" - "github.com/k0sproject/k0s/internal/pkg/dir" - "github.com/k0sproject/k0s/internal/pkg/file" + "github.com/k0sproject/k0s/pkg/applier" "github.com/k0sproject/k0s/pkg/component/manager" - "github.com/k0sproject/k0s/pkg/constant" + "github.com/k0sproject/k0s/pkg/kubernetes" "github.com/k0sproject/k0s/static" ) -var _ manager.Component = (*CRD)(nil) - -// CRD unpacks bundled CRD definitions to the filesystem -type CRD struct { - bundle string - manifestsDir string +// CRDStack applies bundled CRDs. +type CRDStack struct { + clients kubernetes.ClientFactoryInterface + bundle string crdOpts } @@ -33,8 +31,10 @@ type crdOpts struct { type CRDOption func(*crdOpts) -// NewCRD build new CRD -func NewCRD(manifestsDir, bundle string, opts ...CRDOption) *CRD { +var _ manager.Component = (*CRDStack)(nil) + +// Creates a new CRD stack for the given bundle. +func NewCRDStack(clients kubernetes.ClientFactoryInterface, bundle string, opts ...CRDOption) *CRDStack { var options crdOpts for _, opt := range opts { opt(&options) @@ -45,10 +45,10 @@ func NewCRD(manifestsDir, bundle string, opts ...CRDOption) *CRD { options.assetsDir = bundle } - return &CRD{ - bundle: bundle, - manifestsDir: manifestsDir, - crdOpts: options, + return &CRDStack{ + clients: clients, + bundle: bundle, + crdOpts: options, } } @@ -60,36 +60,54 @@ func WithCRDAssetsDir(assetsDir string) CRDOption { return func(opts *crdOpts) { opts.assetsDir = assetsDir } } -func (c CRD) Init(context.Context) error { - return dir.Init(filepath.Join(c.manifestsDir, c.stackName), constant.ManifestsDirMode) -} +// Applies this CRD stack. Implements [manager.Component]. +func (c *CRDStack) Init(ctx context.Context) error { + var crds bytes.Buffer + for content, err := range eachCRD(c.assetsDir) { + if err != nil { + return err + } -// Run unpacks manifests from bindata -func (c CRD) Start(context.Context) error { - crds, err := fs.ReadDir(static.CRDs, c.assetsDir) - if err != nil { - return fmt.Errorf("can't unbundle CRD `%s` manifests: %w", c.bundle, err) + crds.WriteString("\n---\n") + crds.Write(content) } - for _, entry := range crds { - filename := entry.Name() - src := path.Join(c.assetsDir, filename) - dst := filepath.Join(c.manifestsDir, c.stackName, fmt.Sprintf("%s-crd-%s", c.bundle, filename)) - - content, err := fs.ReadFile(static.CRDs, src) - if err != nil { - return fmt.Errorf("failed to fetch CRD %s manifest %s: %w", c.bundle, filename, err) - } - if err := file.AtomicWithTarget(dst). - WithPermissions(constant.CertMode). - Write(content); err != nil { - return fmt.Errorf("failed to save CRD %s manifest %s to FS: %w", c.bundle, filename, err) - } + if err := applier.ApplyStack(ctx, c.clients, &crds, c.bundle, c.stackName); err != nil { + return fmt.Errorf("failed to apply %s CRD stack: %w", c.bundle, err) } return nil } -func (c CRD) Stop() error { +// Start implements [manager.Component]. It does nothing. +func (c *CRDStack) Start(context.Context) error { return nil } + +// Stop implements [manager.Component]. It does nothing. +func (*CRDStack) Stop() error { + return nil +} + +// Iterates over the contents of each CRD in the given asset directory. +func eachCRD(assetsDir string) iter.Seq2[[]byte, error] { + return func(yield func([]byte, error) bool) { + crdFiles, err := fs.ReadDir(static.CRDs, assetsDir) + if err != nil { + yield(nil, fmt.Errorf("failed to read %s CRD stack: %w", assetsDir, err)) + return + } + + for _, entry := range crdFiles { + filename := entry.Name() + content, err := fs.ReadFile(static.CRDs, path.Join(assetsDir, filename)) + if err != nil { + yield(nil, fmt.Errorf("failed to fetch %s CRD manifest %s: %w", assetsDir, filename, err)) + return + } + if !yield(content, nil) { + return + } + } + } +} diff --git a/pkg/component/controller/csrapprover_test.go b/pkg/component/controller/csrapprover_test.go index fe3c4ac73c83..b1adfcf83ba8 100644 --- a/pkg/component/controller/csrapprover_test.go +++ b/pkg/component/controller/csrapprover_test.go @@ -57,7 +57,7 @@ func TestBasicCRSApprover(t *testing.T) { }, }, } - c := NewCSRApprover(config, &leaderelector.Dummy{Leader: true}, fakeFactory) + c := NewCSRApprover(config, leaderelector.Off(), fakeFactory) assert.NoError(t, c.Init(ctx)) assert.NoError(t, c.approveCSR(ctx)) diff --git a/pkg/component/controller/etcd_member_reconciler.go b/pkg/component/controller/etcd_member_reconciler.go index 082c55278aac..e3eb4a963f9e 100644 --- a/pkg/component/controller/etcd_member_reconciler.go +++ b/pkg/component/controller/etcd_member_reconciler.go @@ -23,13 +23,14 @@ import ( "github.com/k0sproject/k0s/pkg/kubernetes/watch" "github.com/k0sproject/k0s/pkg/leaderelection" "github.com/sirupsen/logrus" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" nodeutil "k8s.io/component-helpers/node/util" ) +const EtcdMemberStackName = "etcd-member" + var _ manager.Component = (*EtcdMemberReconciler)(nil) func NewEtcdMemberReconciler(kubeClientFactory kubeutil.ClientFactoryInterface, k0sVars *config.CfgVars, etcdConfig *v1beta1.EtcdConfig, leaderElector leaderelector.Interface) (*EtcdMemberReconciler, error) { @@ -106,15 +107,9 @@ func (e *EtcdMemberReconciler) Start(ctx context.Context) error { } func (e *EtcdMemberReconciler) reconcile(ctx context.Context, log logrus.FieldLogger, client etcdclient.EtcdMemberInterface) { - err := e.waitForCRD(ctx) - if err != nil { - log.WithError(err).Errorf("didn't see EtcdMember CRD ready in time") - return - } - // Create the object for this node // Need to be done in retry loop as during the initial startup the etcd might not be stable - err = retry.Do( + err := retry.Do( func() error { return e.createMemberObject(ctx, client) }, @@ -209,50 +204,6 @@ func (e *EtcdMemberReconciler) Stop() error { return nil } -func (e *EtcdMemberReconciler) waitForCRD(ctx context.Context) error { - client, err := e.clientFactory.GetAPIExtensionsClient() - if err != nil { - return err - } - var lastObservedVersion string - log := logrus.WithField("component", "etcdMemberReconciler") - log.Info("waiting to see EtcdMember CRD ready") - return watch.CRDs(client.ApiextensionsV1().CustomResourceDefinitions()). - WithObjectName(fmt.Sprintf("%s.%s", "etcdmembers", "etcd.k0sproject.io")). - WithErrorCallback(func(err error) (time.Duration, error) { - if retryAfter, e := watch.IsRetryable(err); e == nil { - log.WithError(err).Infof( - "Transient error while watching etcdmember CRD"+ - ", last observed version is %q"+ - ", starting over after %s ...", - lastObservedVersion, retryAfter, - ) - return retryAfter, nil - } - - retryAfter := 10 * time.Second - log.WithError(err).Errorf( - "Failed to watch for etcdmember CRD"+ - ", last observed version is %q"+ - ", starting over after %s ...", - lastObservedVersion, retryAfter, - ) - return retryAfter, nil - }). - Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) { - lastObservedVersion = item.ResourceVersion - for _, cond := range item.Status.Conditions { - if cond.Type == apiextensionsv1.Established { - log.Infof("EtcdMember CRD status: %s", cond.Status) - return cond.Status == apiextensionsv1.ConditionTrue, nil - } - } - - return false, nil - }) - -} - func (e *EtcdMemberReconciler) createMemberObject(ctx context.Context, client etcdclient.EtcdMemberInterface) error { log := logrus.WithFields(logrus.Fields{"component": "etcdMemberReconciler", "phase": "createMemberObject"}) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) diff --git a/pkg/component/controller/extensions_controller.go b/pkg/component/controller/extensions_controller.go index 7a75498f6daa..bfd5844f8c6a 100644 --- a/pkg/component/controller/extensions_controller.go +++ b/pkg/component/controller/extensions_controller.go @@ -4,20 +4,21 @@ package controller import ( + "bytes" "context" "encoding/json" "errors" "fmt" - "io/fs" - "os" - "path/filepath" - "regexp" + "reflect" "slices" + "sync" + "text/template" "time" - "github.com/k0sproject/k0s/internal/pkg/templatewriter" + "github.com/k0sproject/k0s/internal/sync/value" helmv1beta1 "github.com/k0sproject/k0s/pkg/apis/helm/v1beta1" k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" + "github.com/k0sproject/k0s/pkg/applier" k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme" "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" "github.com/k0sproject/k0s/pkg/component/manager" @@ -28,12 +29,10 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/clientcmd" apiretry "k8s.io/client-go/util/retry" - "github.com/avast/retry-go" + "github.com/Masterminds/sprig" "github.com/bombsimon/logrusr/v4" "github.com/sirupsen/logrus" "helm.sh/helm/v3/pkg/release" @@ -49,14 +48,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -// Helm watch for Chart crd +const HelmExtensionStackName = "helm" + +// ExtensionsController reconciles Helm extension repositories and charts, and +// keeps the Helm stack up to date as long as leadership is held. type ExtensionsController struct { L *logrus.Entry helm *helm.Commands - kubeConfig string + clientFactory kubeutil.ClientFactoryInterface leaderElector leaderelector.Interface - manifestsDir string - stop context.CancelFunc + stop func() + helmConfig value.Latest[*k0sv1beta1.HelmExtensions] } var _ manager.Component = (*ExtensionsController)(nil) @@ -67,9 +69,8 @@ func NewExtensionsController(k0sVars *config.CfgVars, kubeClientFactory kubeutil return &ExtensionsController{ L: logrus.WithFields(logrus.Fields{"component": "extensions_controller"}), helm: helm.NewCommands(k0sVars), - kubeConfig: k0sVars.AdminKubeConfigPath, + clientFactory: kubeClientFactory, leaderElector: leaderElector, - manifestsDir: filepath.Join(k0sVars.ManifestsDir, "helm"), } } @@ -79,93 +80,123 @@ const ( // Run runs the extensions controller func (ec *ExtensionsController) Reconcile(ctx context.Context, clusterConfig *k0sv1beta1.ClusterConfig) error { - ec.L.Info("Extensions reconciliation started") - defer ec.L.Info("Extensions reconciliation finished") - return ec.reconcileHelmExtensions(clusterConfig.Spec.Extensions.Helm) + helmConfig := clusterConfig.Spec.Extensions.Helm.DeepCopy() + if helmConfig == nil { + helmConfig = new(k0sv1beta1.HelmExtensions) + } + ec.helmConfig.Set(helmConfig) + return nil } -// reconcileHelmExtensions creates instance of Chart CR for each chart of the config file -// it also reconciles repositories settings -// the actual helm install/update/delete management is done by ChartReconciler structure -func (ec *ExtensionsController) reconcileHelmExtensions(helmSpec *k0sv1beta1.HelmExtensions) error { - if helmSpec == nil { - return nil +func (ec *ExtensionsController) reconcileConfig(ctx context.Context, stackReconciledOnce chan<- struct{}) { + var ( + currentRepositories k0sv1beta1.RepositoriesSettings + currentCharts k0sv1beta1.ChartsSettings + ) + + for { + config, configChanged := ec.helmConfig.Peek() + leaderStatus, statusChanged := ec.leaderElector.CurrentStatus() + + var retry <-chan time.Time + if config != nil { + var fail bool + + if reflect.DeepEqual(currentRepositories, config.Repositories) { + ec.L.Debug("Helm repositories unchanged") + } else if ec.reconcileRepositories(config.Repositories) { + currentRepositories = config.Repositories + ec.L.Info("Reconciled Helm repositories") + } else { + fail = true + } + + if leaderStatus == leaderelection.StatusLeading { + if reflect.DeepEqual(currentCharts, config.Charts) { + ec.L.Debug("Helm charts unchanged") + } else { + ctx, cancel := context.WithCancelCause(ctx) + go func() { + select { + case <-statusChanged: + cancel(leaderelection.ErrLostLead) + case <-ctx.Done(): + } + }() + + if ec.reconcileStack(ctx, config.Charts) { + currentCharts = config.Charts + ec.L.Info("Reconciled ", HelmExtensionStackName, " stack") + if stackReconciledOnce != nil { + close(stackReconciledOnce) + stackReconciledOnce = nil + } + } else { + fail = true + } + } + } + + if fail { + retry = time.After(30 * time.Second) + } + } + + select { + case <-configChanged: + ec.L.Debug("Processing configuration change") + + case <-statusChanged: + ec.L.Debug("Processing leader change") + + case <-retry: + ec.L.Info("Retrying configuration reconciliation") + + case <-ctx.Done(): + return + } } +} - var errs []error - for _, repo := range helmSpec.Repositories { +func (ec *ExtensionsController) reconcileRepositories(repos k0sv1beta1.RepositoriesSettings) bool { + var fail bool + for _, repo := range repos { if err := ec.addRepo(repo); err != nil { - errs = append(errs, fmt.Errorf("can't init repository %q: %w", repo.URL, err)) + fail = true + ec.L.WithError(err).Error("Failed to reconcile Helm repository ", repo.URL) } } + return !fail +} - var fileNamesToKeep []string - for _, chart := range helmSpec.Charts { - fileName := chartManifestFileName(&chart) - fileNamesToKeep = append(fileNamesToKeep, fileName) +func (ec *ExtensionsController) reconcileStack(ctx context.Context, charts k0sv1beta1.ChartsSettings) bool { + var fail bool - path, err := ec.writeChartManifestFile(chart, fileName) + var stack bytes.Buffer + for content, err := range eachCRD(HelmExtensionStackName) { if err != nil { - errs = append(errs, fmt.Errorf("can't write file for Helm chart manifest %q: %w", chart.ChartName, err)) - continue + ec.L.WithError(err).Error("Failed to fetch Helm CRDs") + return false } - ec.L.Infof("Wrote Helm chart manifest file %q", path) + stack.WriteString("\n---\n") + stack.Write(content) } - if err := filepath.WalkDir(ec.manifestsDir, func(path string, entry fs.DirEntry, err error) error { - switch { - case !entry.Type().IsRegular(): - ec.L.Debugf("Keeping %v as it is not a regular file", entry) - case slices.Contains(fileNamesToKeep, entry.Name()): - ec.L.Debugf("Keeping %v as it belongs to a known Helm extension", entry) - case !isChartManifestFileName(entry.Name()): - ec.L.Debugf("Keeping %v as it is not a Helm chart manifest file", entry) - default: - if err := os.Remove(path); err != nil { - if !errors.Is(err, os.ErrNotExist) { - errs = append(errs, fmt.Errorf("failed to remove Helm chart manifest file, the Chart resource will remain in the cluster: %w", err)) - } - } else { - ec.L.Infof("Removed Helm chart manifest file %q", path) - } + chartTemplate := template.Must(template.New("addon_crd_manifest").Funcs(sprig.TxtFuncMap()).Parse(chartCrdTemplate)) + for _, chart := range charts { + if err := chartTemplate.Execute(&stack, chart); err != nil { + ec.L.WithError(err).Error("Failed to render Helm chart manifest ", chart.Name) + return false } - - return nil - }); err != nil { - errs = append(errs, fmt.Errorf("failed to walk Helm chart manifest directory: %w", err)) } - return errors.Join(errs...) -} - -func (ec *ExtensionsController) writeChartManifestFile(chart k0sv1beta1.Chart, fileName string) (string, error) { - tw := templatewriter.TemplateWriter{ - Path: filepath.Join(ec.manifestsDir, fileName), - Name: "addon_crd_manifest", - Template: chartCrdTemplate, - Data: struct { - k0sv1beta1.Chart - Finalizer string - }{ - Chart: chart, - Finalizer: finalizerName, - }, - } - if err := tw.Write(); err != nil { - return "", err + if err := applier.ApplyStack(ctx, ec.clientFactory, &stack, HelmExtensionStackName, HelmExtensionStackName); err != nil { + fail = true + ec.L.WithError(err).Error("Failed to apply ", HelmExtensionStackName, " stack") } - return tw.Path, nil -} - -// Determines the file name to use when storing a chart as a manifest on disk. -func chartManifestFileName(c *k0sv1beta1.Chart) string { - return fmt.Sprintf("%d_helm_extension_%s.yaml", c.Order, c.Name) -} -// Determines if the given file name is in the format for chart manifest file names. -func isChartManifestFileName(fileName string) bool { - return regexp.MustCompile(`^-?[0-9]+_helm_extension_.+\.yaml$`).MatchString(fileName) + return !fail } type ChartReconciler struct { @@ -354,13 +385,14 @@ func (ec *ExtensionsController) addRepo(repo k0sv1beta1.Repository) error { } const chartCrdTemplate = ` +--- apiVersion: helm.k0sproject.io/v1beta1 kind: Chart metadata: name: k0s-addon-chart-{{ .Name }} namespace: ` + metav1.NamespaceSystem + ` finalizers: - - {{ .Finalizer }} + - ` + finalizerName + ` spec: chartName: {{ .ChartName }} releaseName: {{ .Name }} @@ -382,46 +414,50 @@ func (ec *ExtensionsController) Init(_ context.Context) error { } // Start -func (ec *ExtensionsController) Start(ctx context.Context) error { +func (ec *ExtensionsController) Start(context.Context) error { ec.L.Debug("Starting") - mgr, err := ec.instantiateManager(ctx) + mgr, err := ec.instantiateManager() if err != nil { return fmt.Errorf("can't instantiate controller-runtime manager: %w", err) } + var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) + stackReconciledOnce := make(chan struct{}) - go func() { - defer close(done) + wg.Go(func() { ec.reconcileConfig(ctx, stackReconciledOnce) }) + wg.Go(func() { leaderelection.RunLeaderTasks(ctx, ec.leaderElector.CurrentStatus, func(ctx context.Context) { + select { + case <-stackReconciledOnce: + case <-ctx.Done(): + return + } + ec.L.Info("Running controller-runtime manager") if err := mgr.Start(ctx); err != nil { ec.L.WithError(err).Error("Failed to run controller-runtime manager") + } else { + ec.L.Info("Controller-runtime manager exited") } - ec.L.Info("Controller-runtime manager exited") }) - }() + }) ec.stop = func() { cancel() - <-done + wg.Wait() } return nil } -func (ec *ExtensionsController) instantiateManager(ctx context.Context) (crman.Manager, error) { - clientConfig, err := clientcmd.BuildConfigFromFlags("", ec.kubeConfig) +func (ec *ExtensionsController) instantiateManager() (crman.Manager, error) { + clientConfig, err := ec.clientFactory.GetRESTConfig() if err != nil { return nil, fmt.Errorf("can't build controller-runtime controller for helm extensions: %w", err) } - gk := schema.GroupKind{ - Group: helmv1beta1.GroupName, - Kind: "Chart", - } mgr, err := controllerruntime.NewManager(clientConfig, crman.Options{ Scheme: k0sscheme.Scheme, @@ -434,17 +470,6 @@ func (ec *ExtensionsController) instantiateManager(ctx context.Context) (crman.M if err != nil { return nil, fmt.Errorf("can't build controller-runtime controller for helm extensions: %w", err) } - if err := retry.Do(func() error { - _, err := mgr.GetRESTMapper().RESTMapping(gk) - if err != nil { - ec.L.Warn("Extensions CRD is not yet ready, waiting before starting ExtensionsController") - return err - } - ec.L.Info("Extensions CRD is ready, going nuts") - return nil - }, retry.Context(ctx)); err != nil { - return nil, fmt.Errorf("can't start ExtensionsReconciler, helm CRD is not registered, check CRD registration reconciler: %w", err) - } if err := builder. ControllerManagedBy(mgr). diff --git a/pkg/component/controller/extensions_controller_test.go b/pkg/component/controller/extensions_controller_test.go index 6aa5668e4c0c..d6c54733c732 100644 --- a/pkg/component/controller/extensions_controller_test.go +++ b/pkg/component/controller/extensions_controller_test.go @@ -4,17 +4,27 @@ package controller import ( - "os" - "strings" + "sync" "testing" + "testing/synctest" "time" "github.com/k0sproject/k0s/pkg/apis/helm/v1beta1" k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" + "github.com/k0sproject/k0s/pkg/config" + "sigs.k8s.io/yaml" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/ptr" + + "github.com/k0sproject/k0s/internal/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clientgotesting "k8s.io/client-go/testing" ) func TestChartNeedsUpgrade(t *testing.T) { @@ -129,122 +139,139 @@ func TestChartNeedsUpgrade(t *testing.T) { } } -func TestChartManifestFileName(t *testing.T) { - chart := k0sv1beta1.Chart{ - Name: "release", - ChartName: "k0s/chart", - TargetNS: metav1.NamespaceDefault, - } - - chart1 := k0sv1beta1.Chart{ - Name: "release", - ChartName: "k0s/chart", - TargetNS: metav1.NamespaceDefault, - Order: 1, - } - - chart2 := k0sv1beta1.Chart{ - Name: "release", - ChartName: "k0s/chart", - TargetNS: metav1.NamespaceDefault, - Order: 2, - } - - assert.Equal(t, "0_helm_extension_release.yaml", chartManifestFileName(&chart)) - assert.Equal(t, "1_helm_extension_release.yaml", chartManifestFileName(&chart1)) - assert.Equal(t, "2_helm_extension_release.yaml", chartManifestFileName(&chart2)) - assert.True(t, isChartManifestFileName("0_helm_extension_release.yaml")) -} - -func TestExtensionsController_writeChartManifestFile(t *testing.T) { - type args struct { - chart k0sv1beta1.Chart - fileName string - } +func TestExtensionsController_reconcilesCharts(t *testing.T) { tests := []struct { - name string - args args - want string + name string + chart k0sv1beta1.Chart + want string }{ { name: "forceUpgrade is nil should omit from manifest", - args: args{ - chart: k0sv1beta1.Chart{ - Name: "release", - ChartName: "k0s/chart", - Version: "0.0.1", - Values: "values", - TargetNS: metav1.NamespaceDefault, - Timeout: k0sv1beta1.BackwardCompatibleDuration( - metav1.Duration{Duration: 5 * time.Minute}, - ), - }, - fileName: "0_helm_extension_release.yaml", + chart: k0sv1beta1.Chart{ + Name: "release", + ChartName: "k0s/chart", + Version: "0.0.1", + Values: "values", + TargetNS: metav1.NamespaceDefault, + Timeout: k0sv1beta1.BackwardCompatibleDuration( + metav1.Duration{Duration: 5 * time.Minute}, + ), }, want: `apiVersion: helm.k0sproject.io/v1beta1 kind: Chart metadata: + finalizers: + - helm.k0sproject.io/uninstall-helm-release + labels: + k0s.k0sproject.io/stack: helm name: k0s-addon-chart-release namespace: ` + metav1.NamespaceSystem + ` - finalizers: - - helm.k0sproject.io/uninstall-helm-release spec: chartName: k0s/chart + namespace: default releaseName: release timeout: 5m0s - values: | + values: |2 values version: 0.0.1 - namespace: default +status: {} `, }, { name: "forceUpgrade is false should be included in manifest", - args: args{ - chart: k0sv1beta1.Chart{ - Name: "release", - ChartName: "k0s/chart", - Version: "0.0.1", - Values: "values", - TargetNS: metav1.NamespaceDefault, - ForceUpgrade: ptr.To(false), - Timeout: k0sv1beta1.BackwardCompatibleDuration( - metav1.Duration{Duration: 5 * time.Minute}, - ), - }, - fileName: "0_helm_extension_release.yaml", + chart: k0sv1beta1.Chart{ + Name: "release", + ChartName: "k0s/chart", + Version: "0.0.1", + Values: "values", + TargetNS: metav1.NamespaceDefault, + ForceUpgrade: ptr.To(false), + Timeout: k0sv1beta1.BackwardCompatibleDuration( + metav1.Duration{Duration: 5 * time.Minute}, + ), }, want: `apiVersion: helm.k0sproject.io/v1beta1 kind: Chart metadata: + finalizers: + - helm.k0sproject.io/uninstall-helm-release + labels: + k0s.k0sproject.io/stack: helm name: k0s-addon-chart-release namespace: ` + metav1.NamespaceSystem + ` - finalizers: - - helm.k0sproject.io/uninstall-helm-release spec: chartName: k0s/chart + forceUpgrade: false + namespace: default releaseName: release timeout: 5m0s - values: | + values: |2 values version: 0.0.1 - namespace: default - forceUpgrade: false +status: {} `, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ec := &ExtensionsController{ - manifestsDir: t.TempDir(), - } - path, err := ec.writeChartManifestFile(tt.args.chart, tt.args.fileName) - require.NoError(t, err) - contents, err := os.ReadFile(path) + k0sVars, err := config.NewCfgVars(nil, t.TempDir()) require.NoError(t, err) - assert.Equal(t, strings.TrimSpace(tt.want), strings.TrimSpace(string(contents))) + + synctest.Test(t, func(t *testing.T) { + cf := testutil.NewFakeClientFactory() + + // Automatically mark all added CRDs as established. + cf.DynamicClient.PrependReactor("create", "customresourcedefinitions", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + crd := action.(clientgotesting.CreateAction).GetObject().(*unstructured.Unstructured) + if err := unstructured.SetNestedSlice(crd.Object, []any{ + map[string]any{ + "type": string(apiextensionsv1.Established), + "status": string(apiextensionsv1.ConditionTrue), + }, + }, "status", "conditions"); !assert.NoError(t, err) { + return true, nil, err + } + + return false, nil, nil + }) + + underTest := NewExtensionsController(k0sVars, cf, leaderelector.Off()) + + // Spawn the config reconciliation goroutine. + ctx := t.Context() + reconciled := make(chan struct{}) + var wg sync.WaitGroup + t.Cleanup(wg.Wait) + wg.Go(func() { underTest.reconcileConfig(ctx, reconciled) }) + + // Reconcile the chart via the cluster config. + require.NoError(t, underTest.Reconcile(ctx, &k0sv1beta1.ClusterConfig{ + Spec: &k0sv1beta1.ClusterSpec{ + Extensions: &k0sv1beta1.ClusterExtensions{ + Helm: &k0sv1beta1.HelmExtensions{ + Charts: k0sv1beta1.ChartsSettings{tt.chart}, + }, + }, + }, + })) + + // Wait for the reconciliation goroutine to do its job. + <-reconciled + synctest.Wait() + + // Check that the chart has been written to the API server. + charts, err := cf.K0sClient.HelmV1beta1().Charts(metav1.NamespaceSystem).List(ctx, metav1.ListOptions{}) + require.NoError(t, err) + require.Len(t, charts.Items, 1) + + chart := &charts.Items[0] + chart.Annotations = nil // we don't assert on annotations + b, err := yaml.Marshal(chart) + require.NoError(t, err) + assert.Equal(t, tt.want, string(b)) + }) }) } } diff --git a/pkg/component/controller/leaderelector/dummy.go b/pkg/component/controller/leaderelector/dummy.go index b1814ba19b24..f76fc6960cfa 100644 --- a/pkg/component/controller/leaderelector/dummy.go +++ b/pkg/component/controller/leaderelector/dummy.go @@ -6,47 +6,45 @@ package leaderelector import ( "context" - "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/leaderelection" ) -type Dummy struct { - Leader bool +// AlwaysLeading is a dummy leader elector that reports itself as the leader +// forever. Used single-node setups where leader election is unnecessary. +type AlwaysLeading struct { + never <-chan struct{} callbacks []func() } -var _ Interface = (*Dummy)(nil) -var _ manager.Component = (*Dummy)(nil) +// Off returns an always-leading leader elector. +func Off() *AlwaysLeading { + return &AlwaysLeading{make(<-chan struct{}), nil} +} -func (l *Dummy) Init(_ context.Context) error { return nil } -func (l *Dummy) Stop() error { return nil } -func (l *Dummy) IsLeader() bool { return l.Leader } +func (*AlwaysLeading) Init(context.Context) error { + return nil +} -func (l *Dummy) AddAcquiredLeaseCallback(fn func()) { +func (l *AlwaysLeading) AddAcquiredLeaseCallback(fn func()) { l.callbacks = append(l.callbacks, fn) } -var never = make(<-chan struct{}) +func (*AlwaysLeading) AddLostLeaseCallback(func()) { +} -func (l *Dummy) CurrentStatus() (leaderelection.Status, <-chan struct{}) { - var status leaderelection.Status - if l.Leader { - status = leaderelection.StatusLeading +func (l *AlwaysLeading) Start(context.Context) error { + for _, fn := range l.callbacks { + fn() } - - return status, never + return nil } -func (l *Dummy) AddLostLeaseCallback(func()) {} +func (*AlwaysLeading) IsLeader() bool { return true } -func (l *Dummy) Start(_ context.Context) error { - if !l.Leader { - return nil - } - for _, fn := range l.callbacks { - if fn != nil { - fn() - } - } +func (l *AlwaysLeading) CurrentStatus() (leaderelection.Status, <-chan struct{}) { + return leaderelection.StatusLeading, l.never +} + +func (*AlwaysLeading) Stop() error { return nil } diff --git a/pkg/component/controller/systemrbac.go b/pkg/component/controller/systemrbac.go index 9ed22b4b28f0..b832241a632e 100644 --- a/pkg/component/controller/systemrbac.go +++ b/pkg/component/controller/systemrbac.go @@ -5,7 +5,6 @@ package controller import ( "bytes" - "cmp" "context" _ "embed" "fmt" @@ -13,14 +12,7 @@ import ( "github.com/k0sproject/k0s/pkg/applier" "github.com/k0sproject/k0s/pkg/component/manager" - "github.com/k0sproject/k0s/pkg/constant" "github.com/k0sproject/k0s/pkg/kubernetes" - - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/cli-runtime/pkg/resource" - - "github.com/avast/retry-go" - "github.com/sirupsen/logrus" ) const SystemRBACStackName = "bootstraprbac" @@ -40,43 +32,8 @@ func (s *SystemRBAC) Init(ctx context.Context) error { in = io.MultiReader(in, bytes.NewReader(apSystemRBAC)) } - infos, err := resource.NewLocalBuilder(). - Unstructured(). - Stream(in, SystemRBACStackName). - Flatten(). - Do(). - Infos() - if err != nil { - return err - } - - resources := make([]*unstructured.Unstructured, len(infos)) - for i := range infos { - resources[i] = infos[i].Object.(*unstructured.Unstructured) - } - - var lastErr error - if err := retry.Do( - func() error { - stack := applier.Stack{ - Name: SystemRBACStackName, - Resources: resources, - Clients: s.Clients, - } - lastErr := stack.Apply(ctx, true) - return lastErr - }, - retry.Context(ctx), - retry.LastErrorOnly(true), - retry.OnRetry(func(attempt uint, err error) { - logrus.WithFields(logrus.Fields{ - "component": constant.SystemRBACComponentName, - "stack": SystemRBACStackName, - "attempt": attempt + 1, - }).WithError(err).Debug("Failed to apply stack, retrying after backoff") - }), - ); err != nil { - return fmt.Errorf("failed to apply system RBAC stack: %w", cmp.Or(lastErr, err)) + if err := applier.ApplyStack(ctx, s.Clients, in, SystemRBACStackName, SystemRBACStackName); err != nil { + return fmt.Errorf("failed to apply system RBAC stack: %w", err) } return nil diff --git a/pkg/component/controller/workerconfig/reconciler_test.go b/pkg/component/controller/workerconfig/reconciler_test.go index 76bc973012e5..e45c6c0f2e42 100644 --- a/pkg/component/controller/workerconfig/reconciler_test.go +++ b/pkg/component/controller/workerconfig/reconciler_test.go @@ -57,7 +57,7 @@ func TestReconciler_Lifecycle(t *testing.T) { }, }, clients, - &leaderelector.Dummy{Leader: true}, + leaderelector.Off(), true, ) require.NoError(t, err) @@ -312,7 +312,7 @@ func TestReconciler_ResourceGeneration(t *testing.T) { }, }, clients, - &leaderelector.Dummy{Leader: true}, + leaderelector.Off(), true, ) require.NoError(t, err) @@ -495,7 +495,7 @@ func TestReconciler_ReconcilesOnChangesOnly(t *testing.T) { }, }, clients, - &leaderelector.Dummy{Leader: true}, + leaderelector.Off(), true, ) require.NoError(t, err) @@ -580,7 +580,7 @@ func TestReconciler_ReconcilesOnChangesOnly(t *testing.T) { func TestReconciler_runReconcileLoop(t *testing.T) { underTest := Reconciler{ log: newTestLogger(t), - leaderElector: &leaderelector.Dummy{Leader: true}, + leaderElector: leaderelector.Off(), } ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)