diff --git a/pkg/provider/aws/config.go b/pkg/provider/aws/config.go index b340a76..2ef50f9 100644 --- a/pkg/provider/aws/config.go +++ b/pkg/provider/aws/config.go @@ -1,5 +1,12 @@ package aws +import "github.com/nebari-dev/nebari-infrastructure-core/pkg/storage/longhorn" + +// LonghornConfig is kept as a package-local alias so existing yaml under +// `longhorn:` still unmarshals into the same shape; the underlying type now +// lives in pkg/storage/longhorn so non-AWS providers can share install logic. +type LonghornConfig = longhorn.Config + type Config struct { Region string `yaml:"region"` StateBucket string `yaml:"state_bucket,omitempty"` @@ -19,7 +26,7 @@ type Config struct { NodeGroups map[string]NodeGroup `yaml:"node_groups"` Tags map[string]string `yaml:"tags,omitempty"` EFS *EFSConfig `yaml:"efs,omitempty"` - Longhorn *LonghornConfig `yaml:"longhorn,omitempty"` + Longhorn *longhorn.Config `yaml:"longhorn,omitempty"` } type NodeGroup struct { @@ -41,25 +48,20 @@ type Taint struct { } // LonghornEnabled returns whether Longhorn distributed block storage should -// be deployed on this AWS cluster. Defaults to true when the Longhorn config -// is nil or Enabled is not set. +// be deployed on this AWS cluster. Defaults to true when the Longhorn block +// is omitted entirely — Longhorn is the AWS storage default. The shared +// longhorn.Config defaults to disabled-when-nil because non-AWS providers +// require an explicit opt-in. func (c *Config) LonghornEnabled() bool { if c.Longhorn == nil { return true } - if c.Longhorn.Enabled == nil { - return true - } - return *c.Longhorn.Enabled + return c.Longhorn.IsEnabled() } // LonghornReplicaCount returns the number of Longhorn volume replicas. -// Defaults to 2 when not set. func (c *Config) LonghornReplicaCount() int { - if c.Longhorn == nil || c.Longhorn.ReplicaCount == 0 { - return 2 - } - return c.Longhorn.ReplicaCount + return c.Longhorn.Replicas() } type EFSConfig struct { @@ -83,10 +85,3 @@ func (c *Config) EFSStorageClassName() string { } return c.EFS.StorageClassName } - -type LonghornConfig struct { - Enabled *bool `yaml:"enabled,omitempty"` - ReplicaCount int `yaml:"replica_count,omitempty"` - DedicatedNodes bool `yaml:"dedicated_nodes,omitempty"` - NodeSelector map[string]string `yaml:"node_selector,omitempty"` -} diff --git a/pkg/provider/aws/longhorn.go b/pkg/provider/aws/longhorn.go index e4eac4c..986fd4f 100644 --- a/pkg/provider/aws/longhorn.go +++ b/pkg/provider/aws/longhorn.go @@ -1,415 +1,6 @@ package aws -import ( - "context" - "fmt" - "strings" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "helm.sh/helm/v3/pkg/action" - "helm.sh/helm/v3/pkg/chart" - "helm.sh/helm/v3/pkg/chart/loader" - "helm.sh/helm/v3/pkg/cli" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/yaml" - "k8s.io/client-go/kubernetes" - - "github.com/nebari-dev/nebari-infrastructure-core/pkg/helm" - "github.com/nebari-dev/nebari-infrastructure-core/pkg/status" -) - -const ( - longhornRepoName = "longhorn" - longhornRepoURL = "https://charts.longhorn.io" - longhornChartName = "longhorn/longhorn" - longhornChartVersion = "1.8.1" - longhornNamespace = "longhorn-system" - longhornReleaseName = "longhorn" - longhornTimeout = 10 * time.Minute - - iscsiDaemonSetTimeout = 3 * time.Minute - - // iscsiDaemonSetYAML is the Longhorn iSCSI prerequisite DaemonSet. - // Source: https://github.com/longhorn/longhorn/blob/v1.8.1/deploy/prerequisite/longhorn-iscsi-installation.yaml - // Embedded to avoid runtime HTTP fetches and support air-gapped installs. - iscsiDaemonSetYAML = `apiVersion: apps/v1 -kind: DaemonSet -metadata: - name: longhorn-iscsi-installation - namespace: longhorn-system - labels: - app: longhorn-iscsi-installation -spec: - selector: - matchLabels: - app: longhorn-iscsi-installation - template: - metadata: - labels: - app: longhorn-iscsi-installation - spec: - hostNetwork: true - hostPID: true - initContainers: - - name: iscsi-installation - command: - - nsenter - - --mount=/proc/1/ns/mnt - - -- - - bash - - -c - - | - OS=$(grep -E "^ID_LIKE=" /etc/os-release | cut -d '=' -f 2) - if [[ -z "${OS}" ]]; then - OS=$(grep -E "^ID=" /etc/os-release | cut -d '=' -f 2) - fi - if [[ "${OS}" == *"debian"* ]]; then - sudo apt-get update -q -y && sudo apt-get install -q -y open-iscsi && sudo systemctl -q enable iscsid && sudo systemctl start iscsid && sudo modprobe iscsi_tcp - elif [[ "${OS}" == *"suse"* ]]; then - sudo zypper --gpg-auto-import-keys -q refresh && sudo zypper --gpg-auto-import-keys -q install -y open-iscsi && sudo systemctl -q enable iscsid && sudo systemctl start iscsid && sudo modprobe iscsi_tcp - else - sudo yum makecache -q -y && sudo yum --setopt=tsflags=noscripts install -q -y iscsi-initiator-utils && echo "InitiatorName=$(/sbin/iscsi-iname)" > /etc/iscsi/initiatorname.iscsi && sudo systemctl -q enable iscsid && sudo systemctl start iscsid && sudo modprobe iscsi_tcp - fi - if [ $? -eq 0 ]; then echo "iscsi install successfully"; else echo "iscsi install failed error code $?"; fi - image: alpine:3.17 - securityContext: - privileged: true - containers: - - name: sleep - image: registry.k8s.io/pause:3.1 - updateStrategy: - type: RollingUpdate -` -) - -// ensureISCSI deploys the Longhorn iSCSI prerequisite DaemonSet and waits -// for all pods to become ready. This is required on Amazon Linux 2023 nodes -// which do not ship with open-iscsi/iscsi-initiator-utils. -func ensureISCSI(ctx context.Context, kubeconfigBytes []byte) error { - tracer := otel.Tracer("nebari-infrastructure-core") - ctx, span := tracer.Start(ctx, "aws.ensureISCSI") - defer span.End() - - status.Send(ctx, status.NewUpdate(status.LevelProgress, "Installing iSCSI prerequisites on cluster nodes"). - WithResource("iscsi-daemonset"). - WithAction("installing")) - - client, err := newK8sClient(kubeconfigBytes) - if err != nil { - span.RecordError(err) - return fmt.Errorf("failed to create Kubernetes client: %w", err) - } - - return ensureISCSIWithClient(ctx, client) -} - -// ensureISCSIWithClient performs the iSCSI DaemonSet deployment using the -// provided Kubernetes client interface. Separated from ensureISCSI to allow -// testing with fake clients. -func ensureISCSIWithClient(ctx context.Context, client kubernetes.Interface) error { - tracer := otel.Tracer("nebari-infrastructure-core") - ctx, span := tracer.Start(ctx, "aws.ensureISCSIWithClient") - defer span.End() - - // Create the longhorn-system namespace if it doesn't exist - if err := ensureNamespace(ctx, client, longhornNamespace); err != nil { - span.RecordError(err) - return fmt.Errorf("failed to ensure namespace %s: %w", longhornNamespace, err) - } - - // Parse the embedded DaemonSet YAML - var ds appsv1.DaemonSet - if err := yaml.NewYAMLOrJSONDecoder( - strings.NewReader(iscsiDaemonSetYAML), 4096, - ).Decode(&ds); err != nil { - span.RecordError(err) - return fmt.Errorf("failed to parse iSCSI DaemonSet YAML: %w", err) - } - - // Create or update the DaemonSet for idempotency - existing, err := client.AppsV1().DaemonSets(longhornNamespace).Get(ctx, ds.Name, metav1.GetOptions{}) - switch { - case k8serrors.IsNotFound(err): - status.Send(ctx, status.NewUpdate(status.LevelProgress, "Creating iSCSI prerequisite DaemonSet"). - WithResource("iscsi-daemonset"). - WithAction("creating")) - if _, err := client.AppsV1().DaemonSets(longhornNamespace).Create(ctx, &ds, metav1.CreateOptions{}); err != nil { - span.RecordError(err) - return fmt.Errorf("failed to create iSCSI DaemonSet: %w", err) - } - case err != nil: - span.RecordError(err) - return fmt.Errorf("failed to get iSCSI DaemonSet: %w", err) - default: - status.Send(ctx, status.NewUpdate(status.LevelInfo, "Updating existing iSCSI prerequisite DaemonSet"). - WithResource("iscsi-daemonset"). - WithAction("updating")) - ds.ResourceVersion = existing.ResourceVersion - if _, err := client.AppsV1().DaemonSets(longhornNamespace).Update(ctx, &ds, metav1.UpdateOptions{}); err != nil { - span.RecordError(err) - return fmt.Errorf("failed to update iSCSI DaemonSet: %w", err) - } - } - - // Poll until all DaemonSet pods are ready - if err := waitForDaemonSetReady(ctx, client, longhornNamespace, ds.Name, iscsiDaemonSetTimeout); err != nil { - span.RecordError(err) - return fmt.Errorf("iSCSI DaemonSet not ready: %w", err) - } - - status.Send(ctx, status.NewUpdate(status.LevelSuccess, "iSCSI prerequisites installed on all nodes"). - WithResource("iscsi-daemonset"). - WithAction("ready")) - - return nil -} - -// ensureNamespace creates a namespace if it doesn't already exist. -func ensureNamespace(ctx context.Context, client kubernetes.Interface, namespace string) error { - _, err := client.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) - if k8serrors.IsNotFound(err) { - ns := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: namespace}, - } - _, err = client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create namespace %s: %w", namespace, err) - } - return nil - } - return err -} - -// waitForDaemonSetReady polls until the DaemonSet has all desired pods ready. -func waitForDaemonSetReady(ctx context.Context, client kubernetes.Interface, namespace, name string, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - checkReady := func() (bool, error) { - ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{}) - if err != nil { - return false, err - } - return ds.Status.DesiredNumberScheduled > 0 && - ds.Status.DesiredNumberScheduled == ds.Status.NumberReady, nil - } - - // Immediate check - if ready, err := checkReady(); err == nil && ready { - return nil - } - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return fmt.Errorf("timeout waiting for DaemonSet %s/%s: %w", namespace, name, ctx.Err()) - case <-ticker.C: - ready, err := checkReady() - if err != nil { - continue - } - if ready { - return nil - } - } - } -} - -// longhornHelmValues generates the Helm values map for the Longhorn chart -// based on the AWS provider configuration. -func longhornHelmValues(cfg *Config) map[string]any { - replicaCount := cfg.LonghornReplicaCount() - - persistence := map[string]any{ - "defaultClass": true, - "defaultClassReplicaCount": replicaCount, - "defaultFsType": "ext4", - } - - settings := map[string]any{ - "replicaZoneSoftAntiAffinity": "true", - "replicaAutoBalance": "best-effort", - } - - values := map[string]any{ - "persistence": persistence, - "defaultSettings": settings, - } - - if cfg.Longhorn != nil && cfg.Longhorn.DedicatedNodes { - settings["createDefaultDiskLabeledNodes"] = true - - nodeSelector := map[string]string{"node.longhorn.io/storage": "true"} - if cfg.Longhorn.NodeSelector != nil { - nodeSelector = cfg.Longhorn.NodeSelector - } - - tolerations := []map[string]string{ - { - "key": "node.longhorn.io/storage", - "operator": "Exists", - "effect": "NoSchedule", - }, - } - - values["longhornManager"] = map[string]any{ - "nodeSelector": nodeSelector, - "tolerations": tolerations, - } - values["longhornDriver"] = map[string]any{ - "nodeSelector": nodeSelector, - "tolerations": tolerations, - } - } - - return values -} - -// loadLonghornChart locates and loads the Longhorn Helm chart. -// This is extracted to avoid duplication between install and upgrade operations. -func loadLonghornChart(chartPathOptions action.ChartPathOptions) (*chart.Chart, error) { - chartPath, err := chartPathOptions.LocateChart(longhornChartName, cli.New()) - if err != nil { - return nil, fmt.Errorf("failed to locate Longhorn chart: %w", err) - } - - loadedChart, err := loader.Load(chartPath) - if err != nil { - return nil, fmt.Errorf("failed to load Longhorn chart: %w", err) - } - - return loadedChart, nil -} - -// installLonghorn installs or upgrades Longhorn on the cluster via Helm. -func installLonghorn(ctx context.Context, kubeconfigBytes []byte, cfg *Config) error { - tracer := otel.Tracer("nebari-infrastructure-core") - _, span := tracer.Start(ctx, "aws.installLonghorn") - defer span.End() - - span.SetAttributes( - attribute.String("chart_version", longhornChartVersion), - attribute.Int("replica_count", cfg.LonghornReplicaCount()), - attribute.Bool("dedicated_nodes", cfg.Longhorn != nil && cfg.Longhorn.DedicatedNodes), - ) - - // Install iSCSI prerequisites on all nodes before Longhorn - if err := ensureISCSI(ctx, kubeconfigBytes); err != nil { - span.RecordError(err) - return fmt.Errorf("failed to install iSCSI prerequisites: %w", err) - } - - kubeconfigPath, cleanup, err := helm.WriteTempKubeconfig(kubeconfigBytes) - if err != nil { - span.RecordError(err) - return err - } - defer cleanup() - - if err := helm.AddRepo(ctx, longhornRepoName, longhornRepoURL); err != nil { - span.RecordError(err) - return fmt.Errorf("failed to add Longhorn Helm repository: %w", err) - } - - actionConfig, err := helm.NewActionConfig(kubeconfigPath, longhornNamespace) - if err != nil { - span.RecordError(err) - return fmt.Errorf("failed to create Helm action config: %w", err) - } - - // Check if release already exists (idempotency) - histClient := action.NewHistory(actionConfig) - histClient.Max = 1 - if _, err := histClient.Run(longhornReleaseName); err == nil { - status.Send(ctx, status.NewUpdate(status.LevelInfo, "Longhorn already installed, upgrading"). - WithResource("longhorn"). - WithAction("upgrading")) - return upgradeLonghorn(ctx, actionConfig, cfg) - } - - helmValues := longhornHelmValues(cfg) - - status.Send(ctx, status.NewUpdate(status.LevelProgress, "Installing Longhorn storage"). - WithResource("longhorn"). - WithAction("installing"). - WithMetadata("chart_version", longhornChartVersion)) - - client := action.NewInstall(actionConfig) - client.Namespace = longhornNamespace - client.ReleaseName = longhornReleaseName - client.CreateNamespace = true - client.Wait = true - client.Timeout = longhornTimeout - client.Version = longhornChartVersion - - loadedChart, err := loadLonghornChart(client.ChartPathOptions) - if err != nil { - span.RecordError(err) - return err - } - - release, err := client.Run(loadedChart, helmValues) - if err != nil { - span.RecordError(err) - return fmt.Errorf("failed to install Longhorn: %w", err) - } - - span.SetAttributes( - attribute.String("release_status", string(release.Info.Status)), - attribute.Int("release_version", release.Version), - ) - - status.Send(ctx, status.NewUpdate(status.LevelSuccess, "Longhorn storage installed"). - WithResource("longhorn"). - WithAction("installed"). - WithMetadata("chart_version", longhornChartVersion)) - - return nil -} - -// upgradeLonghorn upgrades an existing Longhorn installation. -func upgradeLonghorn(ctx context.Context, actionConfig *action.Configuration, cfg *Config) error { - tracer := otel.Tracer("nebari-infrastructure-core") - _, span := tracer.Start(ctx, "aws.upgradeLonghorn") - defer span.End() - - helmValues := longhornHelmValues(cfg) - - client := action.NewUpgrade(actionConfig) - client.Namespace = longhornNamespace - client.Wait = true - client.Timeout = longhornTimeout - client.Version = longhornChartVersion - - loadedChart, err := loadLonghornChart(client.ChartPathOptions) - if err != nil { - span.RecordError(err) - return err - } - - release, err := client.Run(longhornReleaseName, loadedChart, helmValues) - if err != nil { - span.RecordError(err) - return fmt.Errorf("failed to upgrade Longhorn: %w", err) - } - - span.SetAttributes( - attribute.String("release_status", string(release.Info.Status)), - attribute.Int("release_version", release.Version), - ) - - status.Send(ctx, status.NewUpdate(status.LevelSuccess, "Longhorn storage upgraded"). - WithResource("longhorn"). - WithAction("upgraded"). - WithMetadata("chart_version", longhornChartVersion)) - - return nil -} +// Longhorn install logic moved to pkg/storage/longhorn so the existing-cluster +// provider (and any future on-prem provider) can install Longhorn the same +// way the AWS provider does. AWS-specific behaviour lives in this package as +// LonghornEnabled / LonghornReplicaCount on the AWS Config (see config.go). diff --git a/pkg/provider/aws/longhorn_test.go b/pkg/provider/aws/longhorn_test.go index b4c1288..379e022 100644 --- a/pkg/provider/aws/longhorn_test.go +++ b/pkg/provider/aws/longhorn_test.go @@ -1,422 +1,30 @@ package aws -import ( - "context" - "testing" - "time" +import "testing" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" -) - -func TestLonghornHelmValues(t *testing.T) { - tests := []struct { - name string - config *Config - checkValues map[string]any // nested key checks - }{ - { - name: "default config produces base values", - config: &Config{}, - checkValues: map[string]any{ - "persistence.defaultClassReplicaCount": 2, - "persistence.defaultFsType": "ext4", - "persistence.defaultClass": true, - "defaultSettings.replicaZoneSoftAntiAffinity": "true", - "defaultSettings.replicaAutoBalance": "best-effort", - }, - }, - { - name: "custom replica count", - config: &Config{ - Longhorn: &LonghornConfig{ReplicaCount: 3}, - }, - checkValues: map[string]any{ - "persistence.defaultClassReplicaCount": 3, - }, - }, - { - name: "dedicated nodes adds nodeSelector and tolerations", - config: &Config{ - Longhorn: &LonghornConfig{ - DedicatedNodes: true, - NodeSelector: map[string]string{"node.longhorn.io/storage": "true"}, - }, - }, - checkValues: map[string]any{ - "defaultSettings.createDefaultDiskLabeledNodes": true, - }, - }, - { - name: "dedicated nodes without custom nodeSelector uses default", - config: &Config{ - Longhorn: &LonghornConfig{ - DedicatedNodes: true, - }, - }, - checkValues: map[string]any{ - "defaultSettings.createDefaultDiskLabeledNodes": true, - }, - }, - { - name: "non-dedicated nodes omits nodeSelector and tolerations", - config: &Config{ - Longhorn: &LonghornConfig{ - DedicatedNodes: false, - ReplicaCount: 2, - }, - }, - checkValues: map[string]any{ - "persistence.defaultClassReplicaCount": 2, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - values := longhornHelmValues(tt.config) - - for key, want := range tt.checkValues { - got := getNestedValue(values, key) - if got == nil { - t.Errorf("key %q not found in values", key) - continue - } - if got != want { - t.Errorf("values[%q] = %v (%T), want %v (%T)", key, got, got, want, want) - } - } - }) - } -} - -func TestLonghornHelmValuesDedicatedNodesStructure(t *testing.T) { - cfg := &Config{ - Longhorn: &LonghornConfig{ - DedicatedNodes: true, - NodeSelector: map[string]string{"node.longhorn.io/storage": "true"}, - }, - } - - values := longhornHelmValues(cfg) - - // Check longhornManager has nodeSelector - manager, ok := values["longhornManager"].(map[string]any) - if !ok { - t.Fatal("longhornManager not found or not a map") - } - ns, ok := manager["nodeSelector"].(map[string]string) - if !ok { - t.Fatal("longhornManager.nodeSelector not found or not a map[string]string") - } - if ns["node.longhorn.io/storage"] != "true" { - t.Errorf("longhornManager.nodeSelector[node.longhorn.io/storage] = %q, want %q", ns["node.longhorn.io/storage"], "true") - } - - // Check longhornManager has tolerations - tolerations, ok := manager["tolerations"].([]map[string]string) - if !ok { - t.Fatal("longhornManager.tolerations not found or not a []map[string]string") - } - if len(tolerations) != 1 { - t.Fatalf("longhornManager.tolerations length = %d, want 1", len(tolerations)) - } - if tolerations[0]["key"] != "node.longhorn.io/storage" { - t.Errorf("toleration key = %q, want %q", tolerations[0]["key"], "node.longhorn.io/storage") - } - if tolerations[0]["operator"] != "Exists" { - t.Errorf("toleration operator = %q, want %q", tolerations[0]["operator"], "Exists") - } - if tolerations[0]["effect"] != "NoSchedule" { - t.Errorf("toleration effect = %q, want %q", tolerations[0]["effect"], "NoSchedule") - } - - // Check longhornDriver has the same structure - driver, ok := values["longhornDriver"].(map[string]any) - if !ok { - t.Fatal("longhornDriver not found or not a map") - } - _, ok = driver["nodeSelector"].(map[string]string) - if !ok { - t.Fatal("longhornDriver.nodeSelector not found or not a map[string]string") - } - _, ok = driver["tolerations"].([]map[string]string) - if !ok { - t.Fatal("longhornDriver.tolerations not found or not a []map[string]string") - } -} - -func TestLonghornHelmValuesNonDedicatedOmitsNodeSelector(t *testing.T) { - cfg := &Config{ - Longhorn: &LonghornConfig{ - DedicatedNodes: false, - }, - } - - values := longhornHelmValues(cfg) - - if _, ok := values["longhornManager"]; ok { - t.Error("longhornManager should not be set when DedicatedNodes is false") - } - if _, ok := values["longhornDriver"]; ok { - t.Error("longhornDriver should not be set when DedicatedNodes is false") - } -} - -func TestEnsureNamespace(t *testing.T) { - tests := []struct { - name string - namespace string - existing []runtime.Object - }{ - { - name: "creates namespace when it does not exist", - namespace: longhornNamespace, - existing: nil, - }, - { - name: "succeeds when namespace already exists", - namespace: longhornNamespace, - existing: []runtime.Object{ - &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: longhornNamespace}, - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - client := fake.NewSimpleClientset(tt.existing...) //nolint:staticcheck - - err := ensureNamespace(context.Background(), client, tt.namespace) - if err != nil { - t.Fatalf("ensureNamespace() error = %v", err) - } - - ns, err := client.CoreV1().Namespaces().Get(context.Background(), tt.namespace, metav1.GetOptions{}) - if err != nil { - t.Fatalf("namespace %s not found after ensureNamespace: %v", tt.namespace, err) - } - if ns.Name != tt.namespace { - t.Errorf("namespace name = %q, want %q", ns.Name, tt.namespace) - } - }) - } -} - -func TestEnsureISCSIWithClient(t *testing.T) { - tests := []struct { - name string - existing []runtime.Object - wantErr bool - checkDSExists bool - checkNSExists bool - }{ - { - name: "creates namespace and DaemonSet when neither exists", - existing: nil, - wantErr: true, // DaemonSet status won't update with fake client - checkDSExists: true, - checkNSExists: true, - }, - { - name: "creates DaemonSet when namespace already exists", - existing: []runtime.Object{ - &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: longhornNamespace}, - }, - }, - wantErr: true, // DaemonSet status won't update with fake client - checkDSExists: true, - checkNSExists: true, - }, - { - name: "updates DaemonSet when it already exists", - existing: []runtime.Object{ - &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: longhornNamespace}, - }, - &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "longhorn-iscsi-installation", - Namespace: longhornNamespace, - }, - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "longhorn-iscsi-installation"}, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"app": "longhorn-iscsi-installation"}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - {Name: "sleep", Image: "registry.k8s.io/pause:3.1"}, - }, - }, - }, - }, - }, - }, - // The fake client doesn't simulate the DaemonSet controller, - // so status won't be updated after the Update call. Readiness - // polling is tested separately in TestWaitForDaemonSetReady. - wantErr: true, - checkDSExists: true, - checkNSExists: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - client := fake.NewSimpleClientset(tt.existing...) //nolint:staticcheck - - // Use a short context timeout so tests that expect a timeout - // don't block for the full iscsiDaemonSetTimeout (3 min). - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - err := ensureISCSIWithClient(ctx, client) - if tt.wantErr { - if err == nil { - t.Fatal("expected error (timeout), got nil") - } - } else { - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - } - - // Use a fresh context for verification since the test context may be expired - verifyCtx := context.Background() - - if tt.checkNSExists { - ns, nsErr := client.CoreV1().Namespaces().Get(verifyCtx, longhornNamespace, metav1.GetOptions{}) - if nsErr != nil { - t.Fatalf("namespace %s not found: %v", longhornNamespace, nsErr) - } - if ns.Name != longhornNamespace { - t.Errorf("namespace name = %q, want %q", ns.Name, longhornNamespace) - } - } - - if tt.checkDSExists { - ds, dsErr := client.AppsV1().DaemonSets(longhornNamespace).Get(verifyCtx, "longhorn-iscsi-installation", metav1.GetOptions{}) - if dsErr != nil { - t.Fatalf("DaemonSet not found: %v", dsErr) - } - if ds.Name != "longhorn-iscsi-installation" { - t.Errorf("DaemonSet name = %q, want %q", ds.Name, "longhorn-iscsi-installation") - } - if ds.Namespace != longhornNamespace { - t.Errorf("DaemonSet namespace = %q, want %q", ds.Namespace, longhornNamespace) - } - } - }) - } -} - -func TestWaitForDaemonSetReady(t *testing.T) { - tests := []struct { - name string - ds *appsv1.DaemonSet - wantErr bool - }{ - { - name: "returns immediately when DaemonSet is ready", - ds: &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-ds", - Namespace: longhornNamespace, - }, - Status: appsv1.DaemonSetStatus{ - DesiredNumberScheduled: 3, - NumberReady: 3, - }, - }, - wantErr: false, - }, - { - name: "times out when DaemonSet is not ready", - ds: &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-ds", - Namespace: longhornNamespace, - }, - Status: appsv1.DaemonSetStatus{ - DesiredNumberScheduled: 3, - NumberReady: 1, - }, - }, - wantErr: true, - }, - { - name: "times out when DesiredNumberScheduled is zero", - ds: &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-ds", - Namespace: longhornNamespace, - }, - Status: appsv1.DaemonSetStatus{ - DesiredNumberScheduled: 0, - NumberReady: 0, - }, - }, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: longhornNamespace}} - client := fake.NewSimpleClientset(ns, tt.ds) //nolint:staticcheck - - err := waitForDaemonSetReady(context.Background(), kubernetes.Interface(client), longhornNamespace, tt.ds.Name, 1*time.Second) - if tt.wantErr { - if err == nil { - t.Fatal("expected timeout error, got nil") - } - } else { - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - } - }) - } -} - -// getNestedValue retrieves a value from a nested map using a dot-separated path. -func getNestedValue(m map[string]any, path string) any { - parts := splitDotPath(path) - var current any = m - for _, part := range parts { - cm, ok := current.(map[string]any) - if !ok { - return nil +func TestAWSConfigLonghornDefaults(t *testing.T) { + t.Run("nil Longhorn block defaults to enabled (AWS opt-in)", func(t *testing.T) { + cfg := &Config{} + if !cfg.LonghornEnabled() { + t.Error("LonghornEnabled() = false on AWS config with nil Longhorn, want true") } - current, ok = cm[part] - if !ok { - return nil + if cfg.LonghornReplicaCount() == 0 { + t.Error("LonghornReplicaCount() = 0, want non-zero default") } - } - return current -} + }) + + t.Run("explicit disabled honours user", func(t *testing.T) { + disabled := false + cfg := &Config{Longhorn: &LonghornConfig{Enabled: &disabled}} + if cfg.LonghornEnabled() { + t.Error("LonghornEnabled() = true with explicit Enabled=false") + } + }) -// splitDotPath splits a dot-separated path into parts. -func splitDotPath(path string) []string { - var parts []string - start := 0 - for i := 0; i < len(path); i++ { - if path[i] == '.' { - parts = append(parts, path[start:i]) - start = i + 1 + t.Run("explicit replica count overrides default", func(t *testing.T) { + cfg := &Config{Longhorn: &LonghornConfig{ReplicaCount: 5}} + if got := cfg.LonghornReplicaCount(); got != 5 { + t.Errorf("LonghornReplicaCount() = %d, want 5", got) } - } - parts = append(parts, path[start:]) - return parts + }) } diff --git a/pkg/provider/aws/provider.go b/pkg/provider/aws/provider.go index 880575b..d0d5592 100644 --- a/pkg/provider/aws/provider.go +++ b/pkg/provider/aws/provider.go @@ -14,6 +14,7 @@ import ( "github.com/nebari-dev/nebari-infrastructure-core/pkg/config" "github.com/nebari-dev/nebari-infrastructure-core/pkg/provider" "github.com/nebari-dev/nebari-infrastructure-core/pkg/status" + "github.com/nebari-dev/nebari-infrastructure-core/pkg/storage/longhorn" "github.com/nebari-dev/nebari-infrastructure-core/pkg/tofu" ) @@ -26,9 +27,6 @@ const ( ReconcileTimeout = 30 * time.Minute AWS = "aws" - // storageClassLonghorn is the StorageClass name used when Longhorn is enabled. - storageClassLonghorn = "longhorn" - // storageClassGP2 is the default EBS StorageClass name when Longhorn is disabled. storageClassGP2 = "gp2" ) @@ -319,7 +317,7 @@ func (p *Provider) Deploy(ctx context.Context, projectName string, clusterConfig return fmt.Errorf("failed to get kubeconfig for Longhorn install: %w", err) } - if err := installLonghorn(ctx, kubeconfigBytes, awsCfg); err != nil { + if err := longhorn.Install(ctx, kubeconfigBytes, awsCfg.Longhorn); err != nil { span.RecordError(err) return fmt.Errorf("failed to install Longhorn: %w", err) } @@ -640,7 +638,7 @@ func (p *Provider) Summary(clusterConfig *config.ClusterConfig) map[string]strin // StorageClass is "longhorn" when Longhorn is enabled (default), "gp2" otherwise. // EFSStorageClass is set when EFS is enabled. func (p *Provider) InfraSettings(clusterConfig *config.ClusterConfig) provider.InfraSettings { - sc := storageClassLonghorn + sc := longhorn.StorageClassName var efsSC string rawCfg := clusterConfig.ProviderConfig() diff --git a/pkg/provider/existing/config.go b/pkg/provider/existing/config.go index f326b8d..2cab0c6 100644 --- a/pkg/provider/existing/config.go +++ b/pkg/provider/existing/config.go @@ -2,6 +2,7 @@ package existing import ( "github.com/nebari-dev/nebari-infrastructure-core/pkg/kubeconfig" + "github.com/nebari-dev/nebari-infrastructure-core/pkg/storage/longhorn" ) // Config represents configuration for connecting to a pre-existing Kubernetes cluster. @@ -16,22 +17,36 @@ type Config struct { Context string `yaml:"context"` // StorageClass is the default Kubernetes StorageClass for persistent volumes. - // Defaults to "standard" when empty. + // Defaults to "standard" when empty, or to "longhorn" when Longhorn is + // enabled below and StorageClass is left unset. StorageClass string `yaml:"storage_class,omitempty"` // LoadBalancerAnnotations are added to the Gateway's LoadBalancer Service. // Use this to pass cloud-specific annotations the Cloud Controller Manager may require for // provisioning LoadBalancers (e.g., "load-balancer.hetzner.cloud/location: ash"). LoadBalancerAnnotations map[string]string `yaml:"load_balancer_annotations,omitempty"` + + // Longhorn opts the existing-cluster provider into installing Longhorn for + // distributed/replicated block + RWX storage. The block is required to + // opt-in (nil means "do not install"). Use this on bare-metal / hetzner-k3s + // clusters that lack a managed RWX StorageClass — without it, charts that + // need RWX (e.g. jupyterhub shared-storage for group dirs) fall back to + // the in-cluster NFS-on-RWO workaround. + Longhorn *longhorn.Config `yaml:"longhorn,omitempty"` } const defaultStorageClass = "standard" // GetStorageClass returns the configured storage class or the default. +// When Longhorn is enabled and StorageClass is unset, returns "longhorn" +// so downstream charts pick up the Longhorn-managed StorageClass. func (c *Config) GetStorageClass() string { if c.StorageClass != "" { return c.StorageClass } + if c.Longhorn.IsEnabled() { + return longhorn.StorageClassName + } return defaultStorageClass } diff --git a/pkg/provider/existing/provider.go b/pkg/provider/existing/provider.go index ff25fde..94f1339 100644 --- a/pkg/provider/existing/provider.go +++ b/pkg/provider/existing/provider.go @@ -10,6 +10,7 @@ import ( "github.com/nebari-dev/nebari-infrastructure-core/pkg/config" "github.com/nebari-dev/nebari-infrastructure-core/pkg/kubeconfig" "github.com/nebari-dev/nebari-infrastructure-core/pkg/provider" + "github.com/nebari-dev/nebari-infrastructure-core/pkg/storage/longhorn" ) const ProviderName = "existing" @@ -114,10 +115,13 @@ func (p *Provider) Validate(ctx context.Context, projectName string, clusterConf return nil } -// Deploy is a no-op for existing clusters. +// Deploy installs cluster-side prerequisites for the existing cluster. +// Infrastructure is assumed to already exist (the cluster was provisioned +// out-of-band), so this is mostly a no-op except for opt-in components like +// Longhorn that the user can request via the existing-cluster config. func (p *Provider) Deploy(ctx context.Context, projectName string, clusterConfig *config.ClusterConfig, opts provider.DeployOptions) error { tracer := otel.Tracer("nebari-infrastructure-core") - _, span := tracer.Start(ctx, "existing.Deploy") + ctx, span := tracer.Start(ctx, "existing.Deploy") defer span.End() span.SetAttributes( @@ -126,6 +130,28 @@ func (p *Provider) Deploy(ctx context.Context, projectName string, clusterConfig attribute.Bool("dry_run", opts.DryRun), ) + if opts.DryRun { + return nil + } + + existingCfg, err := extractConfig(ctx, clusterConfig) + if err != nil { + span.RecordError(err) + return err + } + + if existingCfg.Longhorn.IsEnabled() { + kubeconfigBytes, err := p.GetKubeconfig(ctx, projectName, clusterConfig) + if err != nil { + span.RecordError(err) + return fmt.Errorf("failed to get kubeconfig for Longhorn install: %w", err) + } + if err := longhorn.Install(ctx, kubeconfigBytes, existingCfg.Longhorn); err != nil { + span.RecordError(err) + return fmt.Errorf("failed to install Longhorn: %w", err) + } + } + return nil } diff --git a/pkg/provider/existing/provider_test.go b/pkg/provider/existing/provider_test.go index eb4452d..c0cc399 100644 --- a/pkg/provider/existing/provider_test.go +++ b/pkg/provider/existing/provider_test.go @@ -198,6 +198,34 @@ func TestInfraSettings(t *testing.T) { wantLBAnnotationLen: 1, wantLBAnnotations: map[string]string{"load-balancer.hetzner.cloud/location": "ash"}, }, + { + name: "longhorn opt-in defaults StorageClass to longhorn", + config: map[string]any{ + "context": "my-context", + "longhorn": map[string]any{}, + }, + wantStorageClass: "longhorn", + wantMetalLB: false, + }, + { + name: "longhorn explicitly disabled keeps default StorageClass", + config: map[string]any{ + "context": "my-context", + "longhorn": map[string]any{"enabled": false}, + }, + wantStorageClass: "standard", + wantMetalLB: false, + }, + { + name: "explicit storage_class wins over longhorn block", + config: map[string]any{ + "context": "my-context", + "storage_class": "hcloud-volumes", + "longhorn": map[string]any{}, + }, + wantStorageClass: "hcloud-volumes", + wantMetalLB: false, + }, } for _, tt := range tests { diff --git a/pkg/provider/hetzner/config.go b/pkg/provider/hetzner/config.go index 56e7699..a3fa522 100644 --- a/pkg/provider/hetzner/config.go +++ b/pkg/provider/hetzner/config.go @@ -5,6 +5,8 @@ import ( "net" "regexp" "strings" + + "github.com/nebari-dev/nebari-infrastructure-core/pkg/storage/longhorn" ) // Config holds Hetzner-specific provider configuration. @@ -30,6 +32,27 @@ type Config struct { SSH *SSHConfig `yaml:"ssh,omitempty"` Network *NetworkConfig `yaml:"network,omitempty"` + + // Longhorn configures the Longhorn distributed block storage install. + // Hetzner's hcloud-volumes CSI is RWO-only; charts that need RWX (e.g. + // jupyterhub shared-storage for group dirs) require Longhorn — or another + // RWX provider — to avoid the in-cluster NFS-on-RWO workaround. + // Defaults to enabled when the block is omitted; set `enabled: false` to + // opt out. + Longhorn *longhorn.Config `yaml:"longhorn,omitempty"` +} + +// LonghornEnabled returns whether Longhorn distributed block storage should +// be deployed on this Hetzner cluster. Defaults to true when the Longhorn +// block is omitted entirely — Longhorn is the Hetzner storage default since +// hcloud-volumes is RWO-only. The shared longhorn.Config defaults to +// disabled-when-nil because non-managed providers (e.g. existing) require +// an explicit opt-in. +func (c *Config) LonghornEnabled() bool { + if c.Longhorn == nil { + return true + } + return c.Longhorn.IsEnabled() } // NodeGroup defines a pool of Hetzner Cloud instances. Exactly one node group diff --git a/pkg/provider/hetzner/provider.go b/pkg/provider/hetzner/provider.go index 387b191..bb25478 100644 --- a/pkg/provider/hetzner/provider.go +++ b/pkg/provider/hetzner/provider.go @@ -12,6 +12,7 @@ import ( "github.com/nebari-dev/nebari-infrastructure-core/pkg/config" "github.com/nebari-dev/nebari-infrastructure-core/pkg/provider" "github.com/nebari-dev/nebari-infrastructure-core/pkg/status" + "github.com/nebari-dev/nebari-infrastructure-core/pkg/storage/longhorn" ) const providerName = "hetzner" @@ -211,6 +212,21 @@ func (p *Provider) Deploy(ctx context.Context, projectName string, clusterConfig status.Send(ctx, status.NewUpdate(status.LevelInfo, "Hetzner k3s cluster created successfully"). WithResource("provider").WithAction("deploy")) + // Install Longhorn unless explicitly disabled. Hetzner's hcloud-volumes + // CSI is RWO-only; Longhorn provides the RWX StorageClass that downstream + // charts (e.g. jupyterhub shared-storage for group dirs) need. + if hCfg.LonghornEnabled() { + kubeconfigBytes, err := os.ReadFile(kubeconfigPath) //nolint:gosec // Path constructed from known cache dir + project name + if err != nil { + span.RecordError(err) + return fmt.Errorf("failed to read kubeconfig for Longhorn install: %w", err) + } + if err := longhorn.Install(ctx, kubeconfigBytes, hCfg.Longhorn); err != nil { + span.RecordError(err) + return fmt.Errorf("failed to install Longhorn: %w", err) + } + } + // Label volumes for persistence if configured if hCfg.PersistData { status.Send(ctx, status.NewUpdate(status.LevelInfo, "Labeling CSI volumes with persist=true"). @@ -341,20 +357,27 @@ func (p *Provider) GetKubeconfig(ctx context.Context, projectName string, _ *con func (p *Provider) InfraSettings(clusterConfig *config.ClusterConfig) provider.InfraSettings { settings := provider.InfraSettings{ - StorageClass: "hcloud-volumes", + StorageClass: longhorn.StorageClassName, NeedsMetalLB: false, } - // Derive LB annotations from location. + // Derive LB annotations from location, and fall back to "hcloud-volumes" + // when Longhorn is explicitly disabled. Longhorn is the Hetzner default + // because hcloud-volumes is RWO-only (see Config.LonghornEnabled). // Parse errors are intentionally ignored here: InfraSettings is called after // Validate() has already confirmed the config is parseable. If it somehow // fails (e.g., nil config in tests), we return valid defaults without annotations. rawCfg := clusterConfig.ProviderConfig() if rawCfg != nil { var hCfg Config - if err := config.UnmarshalProviderConfig(context.Background(), rawCfg, &hCfg); err == nil && hCfg.Location != "" { - settings.LoadBalancerAnnotations = map[string]string{ - "load-balancer.hetzner.cloud/location": hCfg.Location, + if err := config.UnmarshalProviderConfig(context.Background(), rawCfg, &hCfg); err == nil { + if hCfg.Location != "" { + settings.LoadBalancerAnnotations = map[string]string{ + "load-balancer.hetzner.cloud/location": hCfg.Location, + } + } + if !hCfg.LonghornEnabled() { + settings.StorageClass = "hcloud-volumes" } } } diff --git a/pkg/provider/hetzner/provider_test.go b/pkg/provider/hetzner/provider_test.go index db5fe7d..2fb295b 100644 --- a/pkg/provider/hetzner/provider_test.go +++ b/pkg/provider/hetzner/provider_test.go @@ -28,7 +28,7 @@ func TestProvider_InfraSettings(t *testing.T) { wantMLB bool }{ { - name: "default settings with location", + name: "default settings with location use longhorn", cfg: &config.ClusterConfig{ Providers: map[string]any{ "hetzner": map[string]any{ @@ -36,17 +36,32 @@ func TestProvider_InfraSettings(t *testing.T) { }, }, }, - wantSC: "hcloud-volumes", + wantSC: "longhorn", wantLBA: map[string]string{"load-balancer.hetzner.cloud/location": "ash"}, wantKBP: "", wantMLB: false, }, { - name: "nil provider config uses defaults", + name: "empty provider config defaults to longhorn", cfg: &config.ClusterConfig{ Providers: map[string]any{"hetzner": map[string]any{}}, }, + wantSC: "longhorn", + wantKBP: "", + wantMLB: false, + }, + { + name: "longhorn explicitly disabled falls back to hcloud-volumes", + cfg: &config.ClusterConfig{ + Providers: map[string]any{ + "hetzner": map[string]any{ + "location": "ash", + "longhorn": map[string]any{"enabled": false}, + }, + }, + }, wantSC: "hcloud-volumes", + wantLBA: map[string]string{"load-balancer.hetzner.cloud/location": "ash"}, wantKBP: "", wantMLB: false, }, diff --git a/pkg/storage/longhorn/config.go b/pkg/storage/longhorn/config.go new file mode 100644 index 0000000..466c608 --- /dev/null +++ b/pkg/storage/longhorn/config.go @@ -0,0 +1,63 @@ +// Package longhorn installs and manages Longhorn distributed block storage on +// a Kubernetes cluster. It is provider-agnostic and intended to be consumed by +// any NIC provider that needs RWO/RWX storage without a managed cloud offering +// (e.g. on-prem, Hetzner via hetzner-k3s, kind/k3d for development). +package longhorn + +const ( + // StorageClassName is the Longhorn-managed StorageClass providers should + // surface to downstream charts when Longhorn is enabled. + StorageClassName = "longhorn" + + // Namespace is the Kubernetes namespace Longhorn is installed into. + Namespace = "longhorn-system" + + // ReleaseName is the Helm release name used for Longhorn. + ReleaseName = "longhorn" + + // ChartVersion pins the upstream Longhorn Helm chart version. Bump + // together with iscsiDaemonSetYAML when upgrading. + ChartVersion = "1.8.1" + + chartRepoName = "longhorn" + chartRepoURL = "https://charts.longhorn.io" + chartName = "longhorn/longhorn" + + defaultReplicaCount = 2 +) + +// Config carries the user-tunable Longhorn settings shared across providers. +// +// A nil *Config means "do not install" (see IsEnabled). When the user supplies +// a non-nil Config, Enabled defaults to true so an empty block (`longhorn: {}`) +// is the minimal opt-in. ReplicaCount defaults to 2 — appropriate for small +// clusters; production deploys should raise it. +type Config struct { + Enabled *bool `yaml:"enabled,omitempty"` + ReplicaCount int `yaml:"replica_count,omitempty"` + DedicatedNodes bool `yaml:"dedicated_nodes,omitempty"` + NodeSelector map[string]string `yaml:"node_selector,omitempty"` +} + +// IsEnabled returns whether Longhorn should be installed. A nil Config (i.e. +// the user omitted the longhorn block entirely) is treated as disabled so +// providers can opt-in by setting a non-nil zero-value Config; an explicit +// Enabled=false also disables. +func (c *Config) IsEnabled() bool { + if c == nil { + return false + } + if c.Enabled == nil { + return true + } + return *c.Enabled +} + +// Replicas returns the configured replica count, falling back to the default +// when the field is unset or zero. +func (c *Config) Replicas() int { + if c == nil || c.ReplicaCount == 0 { + return defaultReplicaCount + } + return c.ReplicaCount +} diff --git a/pkg/storage/longhorn/install.go b/pkg/storage/longhorn/install.go new file mode 100644 index 0000000..7bfb1d4 --- /dev/null +++ b/pkg/storage/longhorn/install.go @@ -0,0 +1,214 @@ +package longhorn + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "helm.sh/helm/v3/pkg/action" + "helm.sh/helm/v3/pkg/chart" + "helm.sh/helm/v3/pkg/chart/loader" + "helm.sh/helm/v3/pkg/cli" + + "github.com/nebari-dev/nebari-infrastructure-core/pkg/helm" + "github.com/nebari-dev/nebari-infrastructure-core/pkg/status" +) + +const installTimeout = 10 * time.Minute + +// Install installs (or upgrades, if a release exists) Longhorn on the cluster +// the kubeconfigBytes connect to. +// +// On a fresh cluster, the iSCSI prerequisite DaemonSet is deployed and waited +// on before the Helm install. On clusters with an existing release, the iSCSI +// step is skipped (it was already provisioned on first install) and the Helm +// release is upgraded in place. +// +// Install is idempotent: re-running on an installed cluster is a no-op modulo +// any Config changes that would shift the rendered Helm values. +func Install(ctx context.Context, kubeconfigBytes []byte, cfg *Config) error { + tracer := otel.Tracer("nebari-infrastructure-core") + ctx, span := tracer.Start(ctx, "longhorn.Install") + defer span.End() + + span.SetAttributes( + attribute.String("chart_version", ChartVersion), + attribute.Int("replica_count", cfg.Replicas()), + attribute.Bool("dedicated_nodes", cfg != nil && cfg.DedicatedNodes), + ) + + kubeconfigPath, cleanup, err := helm.WriteTempKubeconfig(kubeconfigBytes) + if err != nil { + span.RecordError(err) + return err + } + defer cleanup() + + if err := helm.AddRepo(ctx, chartRepoName, chartRepoURL); err != nil { + span.RecordError(err) + return fmt.Errorf("failed to add Longhorn Helm repository: %w", err) + } + + actionConfig, err := helm.NewActionConfig(kubeconfigPath, Namespace) + if err != nil { + span.RecordError(err) + return fmt.Errorf("failed to create Helm action config: %w", err) + } + + histClient := action.NewHistory(actionConfig) + histClient.Max = 1 + if _, err := histClient.Run(ReleaseName); err == nil { + // Release exists — iSCSI was provisioned on initial install, so skip + // the 3-minute DaemonSet readiness wait and go straight to upgrade. + status.Send(ctx, status.NewUpdate(status.LevelInfo, "Longhorn already installed, upgrading"). + WithResource("longhorn"). + WithAction("upgrading")) + return upgrade(ctx, actionConfig, cfg) + } + + if err := ensureISCSI(ctx, kubeconfigBytes); err != nil { + span.RecordError(err) + return fmt.Errorf("failed to install iSCSI prerequisites: %w", err) + } + + helmValues := buildHelmValues(cfg) + + status.Send(ctx, status.NewUpdate(status.LevelProgress, "Installing Longhorn storage"). + WithResource("longhorn"). + WithAction("installing"). + WithMetadata("chart_version", ChartVersion)) + + client := action.NewInstall(actionConfig) + client.Namespace = Namespace + client.ReleaseName = ReleaseName + client.CreateNamespace = true + client.Wait = true + client.Timeout = installTimeout + client.Version = ChartVersion + + loadedChart, err := loadChart(client.ChartPathOptions) + if err != nil { + span.RecordError(err) + return err + } + + release, err := client.Run(loadedChart, helmValues) + if err != nil { + span.RecordError(err) + return fmt.Errorf("failed to install Longhorn: %w", err) + } + + span.SetAttributes( + attribute.String("release_status", string(release.Info.Status)), + attribute.Int("release_version", release.Version), + ) + + status.Send(ctx, status.NewUpdate(status.LevelSuccess, "Longhorn storage installed"). + WithResource("longhorn"). + WithAction("installed"). + WithMetadata("chart_version", ChartVersion)) + + return nil +} + +func upgrade(ctx context.Context, actionConfig *action.Configuration, cfg *Config) error { + tracer := otel.Tracer("nebari-infrastructure-core") + _, span := tracer.Start(ctx, "longhorn.upgrade") + defer span.End() + + helmValues := buildHelmValues(cfg) + + client := action.NewUpgrade(actionConfig) + client.Namespace = Namespace + client.Wait = true + client.Timeout = installTimeout + client.Version = ChartVersion + + loadedChart, err := loadChart(client.ChartPathOptions) + if err != nil { + span.RecordError(err) + return err + } + + release, err := client.Run(ReleaseName, loadedChart, helmValues) + if err != nil { + span.RecordError(err) + return fmt.Errorf("failed to upgrade Longhorn: %w", err) + } + + span.SetAttributes( + attribute.String("release_status", string(release.Info.Status)), + attribute.Int("release_version", release.Version), + ) + + status.Send(ctx, status.NewUpdate(status.LevelSuccess, "Longhorn storage upgraded"). + WithResource("longhorn"). + WithAction("upgraded"). + WithMetadata("chart_version", ChartVersion)) + + return nil +} + +func loadChart(chartPathOptions action.ChartPathOptions) (*chart.Chart, error) { + chartPath, err := chartPathOptions.LocateChart(chartName, cli.New()) + if err != nil { + return nil, fmt.Errorf("failed to locate Longhorn chart: %w", err) + } + + loaded, err := loader.Load(chartPath) + if err != nil { + return nil, fmt.Errorf("failed to load Longhorn chart: %w", err) + } + + return loaded, nil +} + +// buildHelmValues turns a Config into the values map passed to the Longhorn +// Helm chart. +func buildHelmValues(cfg *Config) map[string]any { + persistence := map[string]any{ + "defaultClass": true, + "defaultClassReplicaCount": cfg.Replicas(), + "defaultFsType": "ext4", + } + + settings := map[string]any{ + "replicaZoneSoftAntiAffinity": "true", + "replicaAutoBalance": "best-effort", + } + + values := map[string]any{ + "persistence": persistence, + "defaultSettings": settings, + } + + if cfg != nil && cfg.DedicatedNodes { + settings["createDefaultDiskLabeledNodes"] = true + + nodeSelector := map[string]string{"node.longhorn.io/storage": "true"} + if cfg.NodeSelector != nil { + nodeSelector = cfg.NodeSelector + } + + tolerations := []map[string]string{ + { + "key": "node.longhorn.io/storage", + "operator": "Exists", + "effect": "NoSchedule", + }, + } + + values["longhornManager"] = map[string]any{ + "nodeSelector": nodeSelector, + "tolerations": tolerations, + } + values["longhornDriver"] = map[string]any{ + "nodeSelector": nodeSelector, + "tolerations": tolerations, + } + } + + return values +} diff --git a/pkg/storage/longhorn/iscsi.go b/pkg/storage/longhorn/iscsi.go new file mode 100644 index 0000000..8423188 --- /dev/null +++ b/pkg/storage/longhorn/iscsi.go @@ -0,0 +1,210 @@ +package longhorn + +import ( + "context" + "fmt" + "strings" + "time" + + "go.opentelemetry.io/otel" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "github.com/nebari-dev/nebari-infrastructure-core/pkg/status" +) + +const iscsiDaemonSetTimeout = 3 * time.Minute + +// iscsiDaemonSetYAML is the Longhorn iSCSI prerequisite DaemonSet. +// Source: https://github.com/longhorn/longhorn/blob/v1.8.1/deploy/prerequisite/longhorn-iscsi-installation.yaml +// Embedded to avoid runtime HTTP fetches and support air-gapped installs. +const iscsiDaemonSetYAML = `apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: longhorn-iscsi-installation + namespace: longhorn-system + labels: + app: longhorn-iscsi-installation +spec: + selector: + matchLabels: + app: longhorn-iscsi-installation + template: + metadata: + labels: + app: longhorn-iscsi-installation + spec: + hostNetwork: true + hostPID: true + initContainers: + - name: iscsi-installation + command: + - nsenter + - --mount=/proc/1/ns/mnt + - -- + - bash + - -c + - | + OS=$(grep -E "^ID_LIKE=" /etc/os-release | cut -d '=' -f 2) + if [[ -z "${OS}" ]]; then + OS=$(grep -E "^ID=" /etc/os-release | cut -d '=' -f 2) + fi + if [[ "${OS}" == *"debian"* ]]; then + sudo apt-get update -q -y && sudo apt-get install -q -y open-iscsi && sudo systemctl -q enable iscsid && sudo systemctl start iscsid && sudo modprobe iscsi_tcp + elif [[ "${OS}" == *"suse"* ]]; then + sudo zypper --gpg-auto-import-keys -q refresh && sudo zypper --gpg-auto-import-keys -q install -y open-iscsi && sudo systemctl -q enable iscsid && sudo systemctl start iscsid && sudo modprobe iscsi_tcp + else + sudo yum makecache -q -y && sudo yum --setopt=tsflags=noscripts install -q -y iscsi-initiator-utils && echo "InitiatorName=$(/sbin/iscsi-iname)" > /etc/iscsi/initiatorname.iscsi && sudo systemctl -q enable iscsid && sudo systemctl start iscsid && sudo modprobe iscsi_tcp + fi + if [ $? -eq 0 ]; then echo "iscsi install successfully"; else echo "iscsi install failed error code $?"; fi + image: alpine:3.17 + securityContext: + privileged: true + containers: + - name: sleep + image: registry.k8s.io/pause:3.1 + updateStrategy: + type: RollingUpdate +` + +// newK8sClient builds a Kubernetes client from raw kubeconfig bytes. +func newK8sClient(kubeconfigBytes []byte) (*kubernetes.Clientset, error) { + restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigBytes) + if err != nil { + return nil, fmt.Errorf("failed to parse kubeconfig: %w", err) + } + return kubernetes.NewForConfig(restConfig) +} + +// ensureISCSI deploys the Longhorn iSCSI prerequisite DaemonSet and waits +// for all pods to become ready. Required for nodes that don't ship with +// open-iscsi/iscsi-initiator-utils (Amazon Linux 2023, k3s minimal images). +func ensureISCSI(ctx context.Context, kubeconfigBytes []byte) error { + tracer := otel.Tracer("nebari-infrastructure-core") + ctx, span := tracer.Start(ctx, "longhorn.ensureISCSI") + defer span.End() + + status.Send(ctx, status.NewUpdate(status.LevelProgress, "Installing iSCSI prerequisites on cluster nodes"). + WithResource("iscsi-daemonset"). + WithAction("installing")) + + client, err := newK8sClient(kubeconfigBytes) + if err != nil { + span.RecordError(err) + return fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + return ensureISCSIWithClient(ctx, client) +} + +// ensureISCSIWithClient is the testable inner form of ensureISCSI; takes a +// kubernetes.Interface so unit tests can inject a fake client. +func ensureISCSIWithClient(ctx context.Context, client kubernetes.Interface) error { + tracer := otel.Tracer("nebari-infrastructure-core") + ctx, span := tracer.Start(ctx, "longhorn.ensureISCSIWithClient") + defer span.End() + + if err := ensureNamespace(ctx, client, Namespace); err != nil { + span.RecordError(err) + return fmt.Errorf("failed to ensure namespace %s: %w", Namespace, err) + } + + var ds appsv1.DaemonSet + if err := yaml.NewYAMLOrJSONDecoder( + strings.NewReader(iscsiDaemonSetYAML), 4096, + ).Decode(&ds); err != nil { + span.RecordError(err) + return fmt.Errorf("failed to parse iSCSI DaemonSet YAML: %w", err) + } + + existing, err := client.AppsV1().DaemonSets(Namespace).Get(ctx, ds.Name, metav1.GetOptions{}) + switch { + case k8serrors.IsNotFound(err): + status.Send(ctx, status.NewUpdate(status.LevelProgress, "Creating iSCSI prerequisite DaemonSet"). + WithResource("iscsi-daemonset"). + WithAction("creating")) + if _, err := client.AppsV1().DaemonSets(Namespace).Create(ctx, &ds, metav1.CreateOptions{}); err != nil { + span.RecordError(err) + return fmt.Errorf("failed to create iSCSI DaemonSet: %w", err) + } + case err != nil: + span.RecordError(err) + return fmt.Errorf("failed to get iSCSI DaemonSet: %w", err) + default: + status.Send(ctx, status.NewUpdate(status.LevelInfo, "Updating existing iSCSI prerequisite DaemonSet"). + WithResource("iscsi-daemonset"). + WithAction("updating")) + ds.ResourceVersion = existing.ResourceVersion + if _, err := client.AppsV1().DaemonSets(Namespace).Update(ctx, &ds, metav1.UpdateOptions{}); err != nil { + span.RecordError(err) + return fmt.Errorf("failed to update iSCSI DaemonSet: %w", err) + } + } + + if err := waitForDaemonSetReady(ctx, client, Namespace, ds.Name, iscsiDaemonSetTimeout); err != nil { + span.RecordError(err) + return fmt.Errorf("iSCSI DaemonSet not ready: %w", err) + } + + status.Send(ctx, status.NewUpdate(status.LevelSuccess, "iSCSI prerequisites installed on all nodes"). + WithResource("iscsi-daemonset"). + WithAction("ready")) + + return nil +} + +func ensureNamespace(ctx context.Context, client kubernetes.Interface, namespace string) error { + _, err := client.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: namespace}, + } + _, err = client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create namespace %s: %w", namespace, err) + } + return nil + } + return err +} + +func waitForDaemonSetReady(ctx context.Context, client kubernetes.Interface, namespace, name string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + checkReady := func() (bool, error) { + ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return ds.Status.DesiredNumberScheduled > 0 && + ds.Status.DesiredNumberScheduled == ds.Status.NumberReady, nil + } + + if ready, err := checkReady(); err == nil && ready { + return nil + } + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for DaemonSet %s/%s: %w", namespace, name, ctx.Err()) + case <-ticker.C: + ready, err := checkReady() + if err != nil { + continue + } + if ready { + return nil + } + } + } +} diff --git a/pkg/storage/longhorn/longhorn_test.go b/pkg/storage/longhorn/longhorn_test.go new file mode 100644 index 0000000..09b7855 --- /dev/null +++ b/pkg/storage/longhorn/longhorn_test.go @@ -0,0 +1,390 @@ +package longhorn + +import ( + "context" + "testing" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +func TestBuildHelmValues(t *testing.T) { + tests := []struct { + name string + config *Config + checkValues map[string]any + }{ + { + name: "default config produces base values", + config: &Config{}, + checkValues: map[string]any{ + "persistence.defaultClassReplicaCount": defaultReplicaCount, + "persistence.defaultFsType": "ext4", + "persistence.defaultClass": true, + "defaultSettings.replicaZoneSoftAntiAffinity": "true", + "defaultSettings.replicaAutoBalance": "best-effort", + }, + }, + { + name: "custom replica count", + config: &Config{ReplicaCount: 3}, + checkValues: map[string]any{ + "persistence.defaultClassReplicaCount": 3, + }, + }, + { + name: "dedicated nodes adds nodeSelector and tolerations", + config: &Config{ + DedicatedNodes: true, + NodeSelector: map[string]string{"node.longhorn.io/storage": "true"}, + }, + checkValues: map[string]any{ + "defaultSettings.createDefaultDiskLabeledNodes": true, + }, + }, + { + name: "dedicated nodes without custom nodeSelector uses default", + config: &Config{DedicatedNodes: true}, + checkValues: map[string]any{ + "defaultSettings.createDefaultDiskLabeledNodes": true, + }, + }, + { + name: "non-dedicated nodes omits nodeSelector and tolerations", + config: &Config{DedicatedNodes: false, ReplicaCount: 2}, + checkValues: map[string]any{ + "persistence.defaultClassReplicaCount": 2, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + values := buildHelmValues(tt.config) + for key, want := range tt.checkValues { + got := getNestedValue(values, key) + if got == nil { + t.Errorf("key %q not found in values", key) + continue + } + if got != want { + t.Errorf("values[%q] = %v (%T), want %v (%T)", key, got, got, want, want) + } + } + }) + } +} + +func TestBuildHelmValuesDedicatedNodesStructure(t *testing.T) { + cfg := &Config{ + DedicatedNodes: true, + NodeSelector: map[string]string{"node.longhorn.io/storage": "true"}, + } + + values := buildHelmValues(cfg) + + manager, ok := values["longhornManager"].(map[string]any) + if !ok { + t.Fatal("longhornManager not found or not a map") + } + ns, ok := manager["nodeSelector"].(map[string]string) + if !ok { + t.Fatal("longhornManager.nodeSelector not found or not a map[string]string") + } + if ns["node.longhorn.io/storage"] != "true" { + t.Errorf("longhornManager.nodeSelector[node.longhorn.io/storage] = %q, want %q", ns["node.longhorn.io/storage"], "true") + } + + tolerations, ok := manager["tolerations"].([]map[string]string) + if !ok { + t.Fatal("longhornManager.tolerations not found or not a []map[string]string") + } + if len(tolerations) != 1 { + t.Fatalf("longhornManager.tolerations length = %d, want 1", len(tolerations)) + } + if tolerations[0]["key"] != "node.longhorn.io/storage" { + t.Errorf("toleration key = %q, want %q", tolerations[0]["key"], "node.longhorn.io/storage") + } + if tolerations[0]["operator"] != "Exists" { + t.Errorf("toleration operator = %q, want %q", tolerations[0]["operator"], "Exists") + } + if tolerations[0]["effect"] != "NoSchedule" { + t.Errorf("toleration effect = %q, want %q", tolerations[0]["effect"], "NoSchedule") + } + + driver, ok := values["longhornDriver"].(map[string]any) + if !ok { + t.Fatal("longhornDriver not found or not a map") + } + if _, ok := driver["nodeSelector"].(map[string]string); !ok { + t.Fatal("longhornDriver.nodeSelector not found or not a map[string]string") + } + if _, ok := driver["tolerations"].([]map[string]string); !ok { + t.Fatal("longhornDriver.tolerations not found or not a []map[string]string") + } +} + +func TestBuildHelmValuesNonDedicatedOmitsNodeSelector(t *testing.T) { + cfg := &Config{DedicatedNodes: false} + values := buildHelmValues(cfg) + if _, ok := values["longhornManager"]; ok { + t.Error("longhornManager should not be set when DedicatedNodes is false") + } + if _, ok := values["longhornDriver"]; ok { + t.Error("longhornDriver should not be set when DedicatedNodes is false") + } +} + +func TestConfigIsEnabled(t *testing.T) { + enabled := true + disabled := false + tests := []struct { + name string + cfg *Config + want bool + }{ + {"nil config disabled", nil, false}, + {"empty config enabled (opted-in)", &Config{}, true}, + {"explicit enabled true", &Config{Enabled: &enabled}, true}, + {"explicit enabled false", &Config{Enabled: &disabled}, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.cfg.IsEnabled(); got != tt.want { + t.Errorf("IsEnabled() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestConfigReplicas(t *testing.T) { + tests := []struct { + name string + cfg *Config + want int + }{ + {"nil config defaults", nil, defaultReplicaCount}, + {"empty config defaults", &Config{}, defaultReplicaCount}, + {"zero count defaults", &Config{ReplicaCount: 0}, defaultReplicaCount}, + {"explicit count", &Config{ReplicaCount: 5}, 5}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.cfg.Replicas(); got != tt.want { + t.Errorf("Replicas() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestEnsureNamespace(t *testing.T) { + tests := []struct { + name string + namespace string + existing []runtime.Object + }{ + { + name: "creates namespace when it does not exist", + namespace: Namespace, + }, + { + name: "succeeds when namespace already exists", + namespace: Namespace, + existing: []runtime.Object{ + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: Namespace}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client := fake.NewSimpleClientset(tt.existing...) //nolint:staticcheck + if err := ensureNamespace(context.Background(), client, tt.namespace); err != nil { + t.Fatalf("ensureNamespace() error = %v", err) + } + ns, err := client.CoreV1().Namespaces().Get(context.Background(), tt.namespace, metav1.GetOptions{}) + if err != nil { + t.Fatalf("namespace %s not found after ensureNamespace: %v", tt.namespace, err) + } + if ns.Name != tt.namespace { + t.Errorf("namespace name = %q, want %q", ns.Name, tt.namespace) + } + }) + } +} + +func TestEnsureISCSIWithClient(t *testing.T) { + tests := []struct { + name string + existing []runtime.Object + wantErr bool + checkDSExists bool + checkNSExists bool + }{ + { + name: "creates namespace and DaemonSet when neither exists", + wantErr: true, // fake client doesn't update DS status + checkDSExists: true, + checkNSExists: true, + }, + { + name: "creates DaemonSet when namespace already exists", + existing: []runtime.Object{ + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: Namespace}}, + }, + wantErr: true, + checkDSExists: true, + checkNSExists: true, + }, + { + name: "updates DaemonSet when it already exists", + existing: []runtime.Object{ + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: Namespace}}, + &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "longhorn-iscsi-installation", + Namespace: Namespace, + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "longhorn-iscsi-installation"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "longhorn-iscsi-installation"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "sleep", Image: "registry.k8s.io/pause:3.1"}, + }, + }, + }, + }, + }, + }, + wantErr: true, + checkDSExists: true, + checkNSExists: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client := fake.NewSimpleClientset(tt.existing...) //nolint:staticcheck + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err := ensureISCSIWithClient(ctx, client) + if tt.wantErr && err == nil { + t.Fatal("expected error (timeout), got nil") + } + if !tt.wantErr && err != nil { + t.Fatalf("unexpected error: %v", err) + } + + verifyCtx := context.Background() + if tt.checkNSExists { + ns, nsErr := client.CoreV1().Namespaces().Get(verifyCtx, Namespace, metav1.GetOptions{}) + if nsErr != nil { + t.Fatalf("namespace %s not found: %v", Namespace, nsErr) + } + if ns.Name != Namespace { + t.Errorf("namespace name = %q, want %q", ns.Name, Namespace) + } + } + if tt.checkDSExists { + ds, dsErr := client.AppsV1().DaemonSets(Namespace).Get(verifyCtx, "longhorn-iscsi-installation", metav1.GetOptions{}) + if dsErr != nil { + t.Fatalf("DaemonSet not found: %v", dsErr) + } + if ds.Name != "longhorn-iscsi-installation" { + t.Errorf("DaemonSet name = %q, want %q", ds.Name, "longhorn-iscsi-installation") + } + if ds.Namespace != Namespace { + t.Errorf("DaemonSet namespace = %q, want %q", ds.Namespace, Namespace) + } + } + }) + } +} + +func TestWaitForDaemonSetReady(t *testing.T) { + tests := []struct { + name string + ds *appsv1.DaemonSet + wantErr bool + }{ + { + name: "returns immediately when DaemonSet is ready", + ds: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test-ds", Namespace: Namespace}, + Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 3, NumberReady: 3}, + }, + wantErr: false, + }, + { + name: "times out when DaemonSet is not ready", + ds: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test-ds", Namespace: Namespace}, + Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 3, NumberReady: 1}, + }, + wantErr: true, + }, + { + name: "times out when DesiredNumberScheduled is zero", + ds: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test-ds", Namespace: Namespace}, + Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 0, NumberReady: 0}, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: Namespace}} + client := fake.NewSimpleClientset(ns, tt.ds) //nolint:staticcheck + err := waitForDaemonSetReady(context.Background(), kubernetes.Interface(client), Namespace, tt.ds.Name, 1*time.Second) + if tt.wantErr && err == nil { + t.Fatal("expected timeout error, got nil") + } + if !tt.wantErr && err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} + +func getNestedValue(m map[string]any, path string) any { + parts := splitDotPath(path) + var current any = m + for _, part := range parts { + cm, ok := current.(map[string]any) + if !ok { + return nil + } + current, ok = cm[part] + if !ok { + return nil + } + } + return current +} + +func splitDotPath(path string) []string { + var parts []string + start := 0 + for i := 0; i < len(path); i++ { + if path[i] == '.' { + parts = append(parts, path[start:i]) + start = i + 1 + } + } + parts = append(parts, path[start:]) + return parts +}