Skip to content

Commit e43de18

Browse files
aojeaaroradaman
andcommitted
conntrack reconciler must check the dst port
The conntrack reconciler maintains the consistency between the conntrack table on each node and the desired state of Kubernetes UDP services. A valid entry matches a service's ClusterIP, LoadBalancerIP, or ExternalIP and Service port, or any ip matching a NodePort, and has a reverse source IP matching an active endpoint for that service. Other entries are deleted. Services without endpoints and traffic not handled by kube-proxy are ignored Co-authored-by: Daman Arora <[email protected]>
1 parent 807d22f commit e43de18

File tree

3 files changed

+345
-77
lines changed

3 files changed

+345
-77
lines changed

pkg/proxy/conntrack/cleanup.go

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ package conntrack
2121

2222
import (
2323
"errors"
24+
"net"
25+
"strconv"
2426
"time"
2527

2628
"github.com/vishvananda/netlink"
@@ -34,8 +36,14 @@ import (
3436
netutils "k8s.io/utils/net"
3537
)
3638

39+
// Kubernetes UDP services can be affected by stale conntrack entries.
40+
// These entries may point to endpoints that no longer exist,
41+
// leading to packet loss and connectivity problems.
42+
3743
// CleanStaleEntries scans conntrack table and removes any entries
3844
// for a service that do not correspond to a serving endpoint.
45+
// List existing conntrack entries and calculate the desired conntrack state
46+
// based on the current Services and Endpoints.
3947
func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
4048
svcPortMap proxy.ServicePortMap, endpointsMap proxy.EndpointsMap) {
4149

@@ -52,7 +60,7 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
5260
}
5361
}
5462

55-
// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs)
63+
// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) and Service Port
5664
// to the set of serving endpoint IPs.
5765
serviceIPEndpointIPs := make(map[string]sets.Set[string])
5866
// serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs.
@@ -79,14 +87,28 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
7987
}
8088
}
8189

82-
serviceIPEndpointIPs[svc.ClusterIP().String()] = endpointIPs
90+
// a Service without endpoints does not require to clean the conntrack entries associated.
91+
if endpointIPs.Len() == 0 {
92+
continue
93+
}
94+
95+
// we need to filter entries that are directed to a Service IP:Port frontend
96+
// that does not have a backend as part of the endpoints IPs
97+
portStr := strconv.Itoa(svc.Port())
98+
// clusterIP:Port
99+
serviceIPEndpointIPs[net.JoinHostPort(svc.ClusterIP().String(), portStr)] = endpointIPs
100+
// loadbalancerIP:Port
83101
for _, loadBalancerIP := range svc.LoadBalancerVIPs() {
84-
serviceIPEndpointIPs[loadBalancerIP.String()] = endpointIPs
102+
serviceIPEndpointIPs[net.JoinHostPort(loadBalancerIP.String(), portStr)] = endpointIPs
85103
}
104+
// externalIP:Port
86105
for _, externalIP := range svc.ExternalIPs() {
87-
serviceIPEndpointIPs[externalIP.String()] = endpointIPs
106+
serviceIPEndpointIPs[net.JoinHostPort(externalIP.String(), portStr)] = endpointIPs
88107
}
108+
// we need to filter entries that are directed to a *:NodePort
109+
// that does not have a backend as part of the endpoints IPs
89110
if svc.NodePort() != 0 {
111+
// *:NodePort
90112
serviceNodePortEndpointIPs[svc.NodePort()] = endpointIPs
91113
}
92114
}
@@ -98,26 +120,25 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
98120
continue
99121
}
100122

101-
origDst := entry.Forward.DstIP.String()
102-
origPortDst := int(entry.Forward.DstPort)
103-
replySrc := entry.Reverse.SrcIP.String()
123+
origDst := entry.Forward.DstIP.String() // match Service IP
124+
origPortDst := int(entry.Forward.DstPort) // match Service Port
125+
origPortDstStr := strconv.Itoa(origPortDst)
126+
replySrc := entry.Reverse.SrcIP.String() // match Serving Endpoint IP
104127

105128
// if the original destination (--orig-dst) of the entry is service IP (ClusterIP,
106-
// LoadBalancerIPs or ExternalIPs) and the reply source (--reply-src) is not IP of
107-
// any serving endpoint, we clear the entry.
108-
if _, ok := serviceIPEndpointIPs[origDst]; ok {
109-
if !serviceIPEndpointIPs[origDst].Has(replySrc) {
110-
filters = append(filters, filterForNAT(origDst, replySrc, v1.ProtocolUDP))
111-
}
129+
// LoadBalancerIPs or ExternalIPs) and (--orig-port-dst) of the flow is service Port
130+
// and the reply source (--reply-src) is not IP of any serving endpoint, we clear the entry.
131+
endpoints, ok := serviceIPEndpointIPs[net.JoinHostPort(origDst, origPortDstStr)]
132+
if ok && !endpoints.Has(replySrc) {
133+
filters = append(filters, filterForIPPortNAT(origDst, replySrc, entry.Forward.DstPort, v1.ProtocolUDP))
112134
}
113135

114136
// if the original port destination (--orig-port-dst) of the flow is service
115137
// NodePort and the reply source (--reply-src) is not IP of any serving endpoint,
116138
// we clear the entry.
117-
if _, ok := serviceNodePortEndpointIPs[origPortDst]; ok {
118-
if !serviceNodePortEndpointIPs[origPortDst].Has(replySrc) {
119-
filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP))
120-
}
139+
endpoints, ok = serviceNodePortEndpointIPs[origPortDst]
140+
if ok && !endpoints.Has(replySrc) {
141+
filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP))
121142
}
122143
}
123144

@@ -145,14 +166,16 @@ var protocolMap = map[v1.Protocol]uint8{
145166
v1.ProtocolSCTP: unix.IPPROTO_SCTP,
146167
}
147168

148-
// filterForNAT returns *conntrackFilter to delete the conntrack entries for connections
149-
// specified by the destination IP (original direction) and source IP (reply direction).
150-
func filterForNAT(origin, dest string, protocol v1.Protocol) *conntrackFilter {
169+
// filterForIPPortNAT returns *conntrackFilter to delete the conntrack entries for connections
170+
// specified by the destination IP (original direction) and destination port (original direction)
171+
// and source IP (reply direction).
172+
func filterForIPPortNAT(origin, dest string, dstPort uint16, protocol v1.Protocol) *conntrackFilter {
151173
klog.V(6).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol)
152174
return &conntrackFilter{
153175
protocol: protocolMap[protocol],
154176
original: &connectionTuple{
155-
dstIP: netutils.ParseIPSloppy(origin),
177+
dstIP: netutils.ParseIPSloppy(origin),
178+
dstPort: dstPort,
156179
},
157180
reply: &connectionTuple{
158181
srcIP: netutils.ParseIPSloppy(dest),

0 commit comments

Comments
 (0)