Skip to content

Commit

Permalink
support multiple ports on services
Browse files Browse the repository at this point in the history
  • Loading branch information
aojea committed Mar 31, 2024
1 parent f191e45 commit f5741fa
Show file tree
Hide file tree
Showing 2 changed files with 314 additions and 76 deletions.
141 changes: 65 additions & 76 deletions pkg/loadbalancer/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package loadbalancer
import (
"bytes"
"context"
"fmt"
"net"
"strconv"
"strings"
Expand All @@ -24,16 +25,15 @@ const proxyConfigPath = "/usr/local/etc/haproxy/haproxy.cfg"

// proxyConfigData is supplied to the loadbalancer config template
type proxyConfigData struct {
ServicePorts []string
HealthCheckPort int
BackendServers map[string][]backendServer
IPv6 bool
IPv4 bool
HealthCheckPort int // is the same for all ServicePorts
ServicePorts map[string]data // key is the IP family and Port to support MultiPort services
}

type backendServer struct {
Name string
Address string
type data struct {
// frontend
BindAddress string // *:Port for IPv4 :::Port for IPv6
// backend
Backends map[string]string // key: node name value: IP:Port
}

// proxyDefaultConfigTemplate is the loadbalancer config template
Expand All @@ -57,41 +57,17 @@ defaults
# allow to boot despite dns don't resolve backends
default-server init-addr none
{{- if .IPv4}}
frontend ipv4-frontend
{{- $bind := "*:" }}
{{- range $index, $port := .ServicePorts }}
bind {{$bind}}{{ $port }}
{{- end}}
default_backend ipv4-backend
{{end}}
{{- if .IPv6}}
frontend ipv6-frontend
{{- $bind := ":::"}}
{{- range $index, $port := .ServicePorts }}
bind {{$bind}}{{ $port }}
{{- end}}
default_backend ipv6-backend
{{end}}
{{- if .IPv4}}
backend ipv4-backend
option httpchk GET /healthz
{{- $hcport := .HealthCheckPort -}}
{{- range $i, $server := index .BackendServers "IPv4" }}
server {{ $server.Name }} {{ $server.Address }} check port {{ $hcport }} inter 5s fall 3 rise 1
{{- end}}
{{end}}
{{- if .IPv6}}
backend ipv6-backend
option httpchk GET /healthz
{{- $hcport := .HealthCheckPort -}}
{{- range $i, $server := index .BackendServers "IPv6" }}
server {{ $server.Name }} {{ $server.Address }} check port {{ $hcport }} inter 5s fall 3 rise 1
{{- end}}
{{- end}}
{{ range $index, $data := .ServicePorts }}
frontend {{$index}}-frontend
bind {{ $data.BindAddress }}
default_backend {{$index}}-backend
backend {{$index}}-backend
option httpchk GET /healthz
{{- range $server, $address := $data.Backends }}
server {{ $server }} {{ $address }} check port {{ $.HealthCheckPort }} inter 5s fall 3 rise 1
{{- end}}
{{ end }}
`

// proxyConfig returns a kubeadm config generated from config data, in particular
Expand All @@ -110,52 +86,64 @@ func proxyConfig(data *proxyConfigData) (config string, err error) {
return buff.String(), nil
}

func proxyUpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
name := loadBalancerName(clusterName, service)
func generateConfig(service *v1.Service, nodes []*v1.Node) *proxyConfigData {
if service == nil {
return nil
}
config := &proxyConfigData{
HealthCheckPort: 10256, // kube-proxy default port
BackendServers: map[string][]backendServer{},
ServicePorts: []string{},
}

config.IPv6 = len(service.Spec.IPFamilies) == 2 || service.Spec.IPFamilies[0] == v1.IPv6Protocol
config.IPv4 = len(service.Spec.IPFamilies) == 2 || service.Spec.IPFamilies[0] == v1.IPv4Protocol
hcPort := 10256 // kube-proxy default port
if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
config.HealthCheckPort = int(service.Spec.HealthCheckNodePort)
hcPort = int(service.Spec.HealthCheckNodePort)
}

backends := map[string][]backendServer{}
for _, n := range nodes {
for _, addr := range n.Status.Addresses {
if addr.Type == v1.NodeInternalIP {
if netutils.IsIPv4String(addr.Address) {
backends[string(v1.IPv4Protocol)] = append(backends[string(v1.IPv4Protocol)], backendServer{Name: n.Name, Address: addr.Address})
} else if netutils.IsIPv6String(addr.Address) {
backends[string(v1.IPv6Protocol)] = append(backends[string(v1.IPv6Protocol)], backendServer{Name: n.Name, Address: addr.Address})
}
}
}
lbConfig := &proxyConfigData{
HealthCheckPort: hcPort,
}

// TODO: support UDP
for _, port := range service.Spec.Ports {
if port.Protocol != v1.ProtocolTCP {
continue
}
config.ServicePorts = append(config.ServicePorts, strconv.Itoa(int(port.Port)))
servicePortConfig := map[string]data{}
for _, ipFamily := range service.Spec.IPFamilies {
// TODO: support UDP
for _, port := range service.Spec.Ports {
if port.Protocol != v1.ProtocolTCP {
klog.Infof("service port protocol %s not supported", port.Protocol)
continue
}
key := fmt.Sprintf("%s_%d", string(ipFamily), port.Port)
bind := `*`
if ipFamily == v1.IPv6Protocol {
bind = `::`
}
servicePortConfig[key] = data{
BindAddress: fmt.Sprintf("%s:%d", bind, port.Port),
Backends: map[string]string{},
}

for _, be := range backends {
for i := range be {
be[i].Address = net.JoinHostPort(be[i].Address, strconv.Itoa(int(port.NodePort)))
for _, n := range nodes {
for _, addr := range n.Status.Addresses {
// only internal IPs supported
if addr.Type != v1.NodeInternalIP {
klog.V(2).Infof("address type %s, only %s supported", addr.Type, v1.NodeInternalIP)
continue
}
// only addresses that match the Service IP family
if (netutils.IsIPv4String(addr.Address) && ipFamily != v1.IPv4Protocol) ||
(netutils.IsIPv6String(addr.Address) && ipFamily != v1.IPv6Protocol) {
continue
}
servicePortConfig[key].Backends[n.Name] = net.JoinHostPort(addr.Address, strconv.Itoa(int(port.NodePort)))
}
}
}
}
config.BackendServers = backends
klog.V(2).Infof("backend servers info: %v", backends)
lbConfig.ServicePorts = servicePortConfig
klog.V(2).Infof("haproxy config info: %+v", lbConfig)
return lbConfig
}

func proxyUpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
if service == nil {
return nil
}
config := generateConfig(service, nodes)
// create loadbalancer config data
loadbalancerConfig, err := proxyConfig(config)
if err != nil {
Expand All @@ -164,6 +152,7 @@ func proxyUpdateLoadBalancer(ctx context.Context, clusterName string, service *v

klog.V(2).Infof("updating loadbalancer with config %s", loadbalancerConfig)
var stdout, stderr bytes.Buffer
name := loadBalancerName(clusterName, service)
err = container.Exec(name, []string{"cp", "/dev/stdin", proxyConfigPath}, strings.NewReader(loadbalancerConfig), &stdout, &stderr)
if err != nil {
return err
Expand Down
Loading

0 comments on commit f5741fa

Please sign in to comment.