Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/microshift/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ func newCommand() *cobra.Command {
cmd.AddCommand(cmds.NewRestoreCommand())
cmd.AddCommand(cmds.NewHealthcheckCommand())
cmd.AddCommand(cmds.NewAddNodeCommand())
cmd.AddCommand(cmds.NewPromoteLearnerCommand())
return cmd
}
14 changes: 6 additions & 8 deletions pkg/cmd/addnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func runAddNode(ctx context.Context, opts *AddNodeOptions) error {
}

nodeName := cfg.CanonicalNodeName()
if isNodeAlreadyInCluster(ctx, client, nodeName) {
if isNodeInKubernetesCluster(ctx, client, nodeName) {
klog.Infof("Node %s is already part of the cluster. Skipping join process.", nodeName)
return nil
}
Expand All @@ -126,12 +126,12 @@ func runAddNode(ctx context.Context, opts *AddNodeOptions) error {
}
klog.Info("Etcd certificates generated successfully")

clusterMembers, err := getClusterNodes(ctx, client)
etcdMembers, err := getEtcdClusterNodes(ctx, client)
if err != nil {
return fmt.Errorf("failed to get cluster information: %w", err)
}

if err := configureEtcdForCluster(ctx, cfg, clusterMembers, opts.Learner); err != nil {
if err := configureEtcdForCluster(ctx, cfg, etcdMembers, opts.Learner); err != nil {
return fmt.Errorf("failed to configure etcd for cluster: %w", err)
}

Expand Down Expand Up @@ -374,17 +374,14 @@ func generateEtcdCertificates(cfg *config.Config) error {
return nil
}

func getClusterNodes(ctx context.Context, client kubernetes.Interface) ([]string, error) {
func getEtcdClusterNodes(ctx context.Context, client kubernetes.Interface) ([]string, error) {
nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list nodes: %w", err)
}

var members []string
for _, node := range nodes.Items {
if !isNodeReady(&node) {
continue
}
nodeIP := ""
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
Expand All @@ -393,6 +390,7 @@ func getClusterNodes(ctx context.Context, client kubernetes.Interface) ([]string
}
}
if nodeIP != "" {
//TODO net.JoinHostPort
members = append(members, fmt.Sprintf("%s=https://%s:2380", node.Name, nodeIP))
}
}
Expand Down Expand Up @@ -532,7 +530,7 @@ func restartMicroShift() error {
return nil
}

func isNodeAlreadyInCluster(ctx context.Context, client kubernetes.Interface, nodeName string) bool {
func isNodeInKubernetesCluster(ctx context.Context, client kubernetes.Interface, nodeName string) bool {
_, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return false
Expand Down
95 changes: 95 additions & 0 deletions pkg/cmd/promotelearner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package cmd

import (
"context"
"fmt"

"github.com/openshift/microshift/pkg/config"
"github.com/openshift/microshift/pkg/controllers"
"github.com/openshift/microshift/pkg/version"
"github.com/spf13/cobra"

"k8s.io/klog/v2"
)

// NewPromoteLearnerCommand returns the "promote-learner" cobra command.
//
// The command runs runPromoteLearner with the command's context. It is marked
// hidden unless the build variant reported by version.Get() is
// version.BuildVariantCommunity.
func NewPromoteLearnerCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "promote-learner",
		Short: "Promote learner node to member in the etcd cluster",
		// The help text must describe what runPromoteLearner really does: it
		// promotes the member and logs a reminder, it does not restart the service.
		Long: `This command promotes the local etcd instance from a learner node to a full voting member within an existing MicroShift etcd cluster.
It:
- Connects to the etcd cluster using the current node's configuration.
- Verifies that the local etcd instance is currently a learner.
- Issues a promote request to the cluster.
The command does not restart MicroShift. After a successful promotion, restart the MicroShift service to make the membership change effective for the API server.
Use this command only after the learner node has fully caught up with the cluster and you are ready for it to become a voting member.
`,
		RunE: func(cmd *cobra.Command, args []string) error {
			return runPromoteLearner(cmd.Context())
		},
	}

	if version.Get().BuildVariant != version.BuildVariantCommunity {
		cmd.Hidden = true
	}

	return cmd
}

// runPromoteLearner drives the learner promotion: it loads the active
// MicroShift configuration and hands it to promoteEtcdLearner.
//
// Any failure is returned wrapped with a description of the step that failed.
// On success it only logs a reminder to restart the MicroShift service; it does
// not restart anything itself.
func runPromoteLearner(ctx context.Context) error {
	klog.Info("Starting learner promotion process")

	activeCfg, loadErr := config.ActiveConfig()
	if loadErr != nil {
		return fmt.Errorf("failed to load MicroShift configuration: %w", loadErr)
	}

	klog.Info("Promoting etcd learner to member")
	promoteErr := promoteEtcdLearner(ctx, activeCfg)
	if promoteErr != nil {
		return fmt.Errorf("failed to promote etcd learner: %w", promoteErr)
	}

	klog.Info("etcd node successfully promoted to member. Restart MicroShift service")
	return nil
}

// promoteEtcdLearner promotes the etcd member named after this node from
// learner to voting member.
//
// It obtains an etcd client through controllers.GetClusterEtcdClient (using the
// admin kubeconfig path from cfg), lists the cluster members and looks for the
// member whose name equals cfg.CanonicalNodeName().
//
// It returns an error when:
//   - the etcd client cannot be created or the member list cannot be fetched,
//   - no member carries this node's name,
//   - the matching member is not a learner,
//   - the promote request itself fails.
//
// Underlying errors are wrapped with %w so callers can inspect the chain with
// errors.Is / errors.As.
func promoteEtcdLearner(ctx context.Context, cfg *config.Config) error {
	nodeName := cfg.CanonicalNodeName()

	client, err := controllers.GetClusterEtcdClient(ctx, cfg.KubeConfigPath(config.KubeAdmin))
	if err != nil {
		return fmt.Errorf("failed to create etcd client: %w", err)
	}
	// Close errors are deliberately ignored: the client is only used for this
	// one-shot operation and there is nothing useful to do on failure.
	defer func() { _ = client.Close() }()

	memberResponse, err := client.MemberList(ctx)
	if err != nil {
		return fmt.Errorf("failed to list etcd members: %w", err)
	}

	found, learner := false, false
	var id uint64
	for _, member := range memberResponse.Members {
		if member.Name != nodeName {
			continue
		}
		found = true
		if member.IsLearner {
			learner = true
			id = member.ID
		}
	}

	if !found {
		return fmt.Errorf("node %s is not in the etcd cluster", nodeName)
	}
	if !learner {
		return fmt.Errorf("node %s is not a learner", nodeName)
	}

	response, err := client.MemberPromote(ctx, id)
	if err != nil {
		return fmt.Errorf("failed to promote etcd learner: %w", err)
	}
	klog.Infof("Successfully promoted etcd learner %s with response: %v", nodeName, response)
	return nil
}
84 changes: 84 additions & 0 deletions pkg/controllers/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"time"

"github.com/openshift/microshift/pkg/config"
"github.com/openshift/microshift/pkg/util/cryptomaterial"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
klog "k8s.io/klog/v2"

"go.etcd.io/etcd/client/pkg/v3/transport"
Expand Down Expand Up @@ -251,3 +256,82 @@ func getEtcdClient(ctx context.Context) (*clientv3.Client, error) {
}
return cli, nil
}

// GetClusterEtcdClient creates a new etcd client for the cluster whose
// endpoints are restricted to voting (non-learner) members.
//
// It works in three steps:
//  1. Build a TLS config from the etcd API-server client certificate and the
//     etcd signer CA found under the MicroShift certificates directory.
//  2. List the Kubernetes nodes using the kubeconfig at kubeConfigPath and, for
//     every node that reports an InternalIP, query the etcd status of
//     <ip>:2379 through a temporary client connected to localhost.
//  3. Create the returned client with the endpoints of all nodes whose status
//     reports IsLearner == false. If no such endpoint was found, the client
//     falls back to https://localhost:2379.
//
// Nodes without an InternalIP and nodes whose status query fails are skipped.
// The caller owns the returned client and must Close it.
func GetClusterEtcdClient(ctx context.Context, kubeConfigPath string) (*clientv3.Client, error) {
	const (
		localEndpoint = "https://localhost:2379"
		etcdPort      = "2379"
		etcdTimeout   = 5 * time.Second
	)

	certsDir := cryptomaterial.CertsDirectory(config.DataDir)
	etcdAPIServerClientCertDir := cryptomaterial.EtcdAPIServerClientCertDir(certsDir)

	tlsInfo := transport.TLSInfo{
		CertFile:      cryptomaterial.ClientCertPath(etcdAPIServerClientCertDir),
		KeyFile:       cryptomaterial.ClientKeyPath(etcdAPIServerClientCertDir),
		TrustedCAFile: cryptomaterial.CACertPath(cryptomaterial.EtcdSignerDir(certsDir)),
	}
	tlsConfig, err := tlsInfo.ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to build etcd client TLS config: %w", err)
	}

	// Temporary client used only to probe each node's etcd status; it is
	// closed before returning, so close errors are intentionally ignored.
	probeClient, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{localEndpoint},
		DialTimeout: etcdTimeout,
		TLS:         tlsConfig,
		Context:     ctx,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create local etcd client: %w", err)
	}
	defer func() { _ = probeClient.Close() }()

	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err != nil {
		return nil, fmt.Errorf("failed to load kubeconfig from %s: %w", kubeConfigPath, err)
	}
	adminClient, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create admin kubernetes client: %w", err)
	}

	nodes, err := adminClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to list nodes: %w", err)
	}

	var memberEndpoints []string
	// Index the slice instead of ranging by value to avoid copying each Node.
	for i := range nodes.Items {
		node := &nodes.Items[i]

		var nodeIP string
		for _, addr := range node.Status.Addresses {
			if addr.Type == corev1.NodeInternalIP {
				nodeIP = addr.Address
				break
			}
		}
		if nodeIP == "" {
			continue
		}

		endpoint := net.JoinHostPort(nodeIP, etcdPort)

		// Bound each probe so a single unresponsive member cannot stall the
		// whole discovery when the caller's context has no deadline.
		statusCtx, cancel := context.WithTimeout(ctx, etcdTimeout)
		status, err := probeClient.Status(statusCtx, endpoint)
		cancel()
		if err != nil {
			// Best effort: an unreachable node is simply not used as an endpoint.
			klog.V(2).Infof("Skipping etcd endpoint %s of node %s: %v", endpoint, node.Name, err)
			continue
		}
		if status != nil && !status.IsLearner {
			memberEndpoints = append(memberEndpoints, fmt.Sprintf("https://%s", endpoint))
		}
	}
	if len(memberEndpoints) == 0 {
		memberEndpoints = []string{localEndpoint}
	}

	clusterClient, err := clientv3.New(clientv3.Config{
		Endpoints:   memberEndpoints,
		DialTimeout: etcdTimeout,
		TLS:         tlsConfig,
		Context:     ctx,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create cluster etcd client: %w", err)
	}
	return clusterClient, nil
}
Loading