diff --git a/cmd/app.go b/cmd/app.go index 49354a5d..bb6985b1 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -6,7 +6,9 @@ import ( "fmt" "os" "os/signal" + "runtime" "strconv" + "strings" "syscall" "k8s.io/component-base/logs" @@ -14,6 +16,7 @@ import ( "sigs.k8s.io/cloud-provider-kind/pkg/config" "sigs.k8s.io/cloud-provider-kind/pkg/controller" + "sigs.k8s.io/kind/pkg/cluster" kindcmd "sigs.k8s.io/kind/pkg/cmd" ) @@ -98,5 +101,32 @@ func Main() { klog.Infof("**** Dumping load balancers logs to: %s", logDumpDir) } - controller.New(logger).Run(ctx) + // some platforms require to enable tunneling for the LoadBalancers + if runtime.GOOS == "darwin" || runtime.GOOS == "windows" || isWSL2() { + config.DefaultConfig.LoadBalancerConnectivity = config.Tunnel + } + + // default control plane connectivity to portmap, it will be + // overriden if the first cluster added detects direct + // connecitivity + config.DefaultConfig.ControlPlaneConnectivity = config.Portmap + + // initialize kind provider + option, err := cluster.DetectNodeProvider() + if err != nil { + klog.Fatalf("can not detect cluster provider: %v", err) + } + kindProvider := cluster.NewProvider( + option, + cluster.ProviderWithLogger(logger), + ) + controller.New(kindProvider).Run(ctx) +} + +func isWSL2() bool { + if v, err := os.ReadFile("/proc/version"); err == nil { + return strings.Contains(string(v), "WSL2") + } + + return false } diff --git a/pkg/config/config.go b/pkg/config/config.go index ce45b1c2..0943701e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -7,4 +7,20 @@ var DefaultConfig = &Config{} type Config struct { EnableLogDump bool LogDir string + // Platforms like Mac or Windows can not access the containers directly + // so we do a double hop, enable container portmapping for the LoadBalancer containter + // and do userspace proxying from the original port to the portmaps. + // If the cloud-provider-kind runs in a container on these platforms only enables portmapping. + LoadBalancerConnectivity Connectivity + // Type of connectivity between the cloud-provider-kind and the clusters + ControlPlaneConnectivity Connectivity } + +type Connectivity int + +const ( + Unknown Connectivity = iota + Direct + Portmap + Tunnel +) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 329ae85c..e474a363 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "sync" "time" "k8s.io/apimachinery/pkg/util/sets" @@ -20,14 +21,16 @@ import ( controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers" ccmfeatures "k8s.io/controller-manager/pkg/features" "k8s.io/klog/v2" + cpkconfig "sigs.k8s.io/cloud-provider-kind/pkg/config" "sigs.k8s.io/cloud-provider-kind/pkg/constants" "sigs.k8s.io/cloud-provider-kind/pkg/container" "sigs.k8s.io/cloud-provider-kind/pkg/loadbalancer" "sigs.k8s.io/cloud-provider-kind/pkg/provider" "sigs.k8s.io/kind/pkg/cluster" - "sigs.k8s.io/kind/pkg/log" ) +var once sync.Once + type Controller struct { kind *cluster.Provider clusters map[string]*ccm @@ -40,12 +43,10 @@ type ccm struct { cancelFn context.CancelFunc } -func New(logger log.Logger) *Controller { +func New(provider *cluster.Provider) *Controller { controllersmetrics.Register() return &Controller{ - kind: cluster.NewProvider( - cluster.ProviderWithLogger(logger), - ), + kind: provider, clusters: make(map[string]*ccm), } } @@ -108,9 +109,8 @@ func (c *Controller) Run(ctx context.Context) { } } -// getKubeClient returns a kubeclient depending if the ccm runs inside a container -// inside the same docker network that the kind cluster or run externally in the host -// It tries first to connect to the external endpoint +// getKubeClient returns a kubeclient for the cluster passed as argument +// It tries first to connect to the internal endpoint. func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kubernetes.Interface, error) { httpClient := &http.Client{ Timeout: 5 * time.Second, @@ -118,8 +118,8 @@ func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kuberne TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, }, } - // try internal first - for _, internal := range []bool{false, true} { + // prefer internal (direct connectivity) over no-internal (commonly portmap) + for _, internal := range []bool{true, false} { kconfig, err := c.kind.KubeConfig(cluster, internal) if err != nil { klog.Errorf("Failed to get kubeconfig for cluster %s: %v", cluster, err) @@ -157,6 +157,14 @@ func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kuberne klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err) continue } + // the first cluster will give us the type of connectivity between + // cloud-provider-kind and the clusters and load balancer containers. + // In Linux or containerized cloud-provider-kind this will be direct. + once.Do(func() { + if internal { + cpkconfig.DefaultConfig.ControlPlaneConnectivity = cpkconfig.Direct + } + }) return kubeClient, err } return nil, fmt.Errorf("can not find a working kubernetes clientset") diff --git a/pkg/loadbalancer/proxy.go b/pkg/loadbalancer/proxy.go index e239ef4f..17eef488 100644 --- a/pkg/loadbalancer/proxy.go +++ b/pkg/loadbalancer/proxy.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "net" "net/http" "strconv" "strings" @@ -17,6 +18,7 @@ import ( "k8s.io/klog/v2" netutils "k8s.io/utils/net" + "sigs.k8s.io/cloud-provider-kind/pkg/config" "sigs.k8s.io/cloud-provider-kind/pkg/container" ) @@ -319,15 +321,26 @@ func waitLoadBalancerReady(ctx context.Context, name string, timeout time.Durati if err != nil { return err } - port, ok := portmaps[strconv.Itoa(envoyAdminPort)] - if !ok { - return fmt.Errorf("envoy admin port %d not found, got %v", envoyAdminPort, portmaps) + + var authority string + if config.DefaultConfig.ControlPlaneConnectivity == config.Direct { + ipv4, _, err := container.IPs(name) + if err != nil { + return err + } + authority = net.JoinHostPort(ipv4, strconv.Itoa(envoyAdminPort)) + } else { + port, ok := portmaps[strconv.Itoa(envoyAdminPort)] + if !ok { + return fmt.Errorf("envoy admin port %d not found, got %v", envoyAdminPort, portmaps) + } + authority = net.JoinHostPort("127.0.0.1", port) } httpClient := http.DefaultClient err = wait.PollUntilContextTimeout(ctx, 1*time.Second, timeout, true, func(ctx context.Context) (done bool, err error) { // iptables port forwarding on localhost only works for IPv4 - resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:%s/ready", port)) + resp, err := httpClient.Get(fmt.Sprintf("http://%s/ready", authority)) if err != nil { klog.V(2).Infof("unexpected error trying to get load balancer %s readiness :%v", name, err) return false, nil diff --git a/pkg/loadbalancer/server.go b/pkg/loadbalancer/server.go index 3075c8fb..b827073d 100644 --- a/pkg/loadbalancer/server.go +++ b/pkg/loadbalancer/server.go @@ -8,7 +8,6 @@ import ( "fmt" "os" "path" - "runtime" "strings" v1 "k8s.io/api/core/v1" @@ -29,20 +28,13 @@ var _ cloudprovider.LoadBalancer = &Server{} func NewServer() cloudprovider.LoadBalancer { s := &Server{} - if runtime.GOOS == "darwin" || runtime.GOOS == "windows" || isWSL2() { + + if config.DefaultConfig.LoadBalancerConnectivity == config.Tunnel { s.tunnelManager = NewTunnelManager() } return s } -func isWSL2() bool { - if v, err := os.ReadFile("/proc/version"); err == nil { - return strings.Contains(string(v), "WSL2") - } - - return false -} - func (s *Server) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) { // report status name := loadBalancerName(clusterName, service)