@@ -21,6 +21,8 @@ package conntrack
21
21
22
22
import (
23
23
"errors"
24
+ "net"
25
+ "strconv"
24
26
"time"
25
27
26
28
"github.com/vishvananda/netlink"
@@ -34,8 +36,14 @@ import (
34
36
netutils "k8s.io/utils/net"
35
37
)
36
38
39
+ // Kubernetes UDP services can be affected by stale conntrack entries.
40
+ // These entries may point to endpoints that no longer exist,
41
+ // leading to packet loss and connectivity problems.
42
+
37
43
// CleanStaleEntries scans conntrack table and removes any entries
38
44
// for a service that do not correspond to a serving endpoint.
45
+ // List existing conntrack entries and calculate the desired conntrack state
46
+ // based on the current Services and Endpoints.
39
47
func CleanStaleEntries (ct Interface , ipFamily v1.IPFamily ,
40
48
svcPortMap proxy.ServicePortMap , endpointsMap proxy.EndpointsMap ) {
41
49
@@ -52,7 +60,7 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
52
60
}
53
61
}
54
62
55
- // serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs)
63
+ // serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) and Service Port
56
64
// to the set of serving endpoint IPs.
57
65
serviceIPEndpointIPs := make (map [string ]sets.Set [string ])
58
66
// serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs.
@@ -79,14 +87,28 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
79
87
}
80
88
}
81
89
82
- serviceIPEndpointIPs [svc .ClusterIP ().String ()] = endpointIPs
90
+ // a Service without endpoints does not require to clean the conntrack entries associated.
91
+ if endpointIPs .Len () == 0 {
92
+ continue
93
+ }
94
+
95
+ // we need to filter entries that are directed to a Service IP:Port frontend
96
+ // that does not have a backend as part of the endpoints IPs
97
+ portStr := strconv .Itoa (svc .Port ())
98
+ // clusterIP:Port
99
+ serviceIPEndpointIPs [net .JoinHostPort (svc .ClusterIP ().String (), portStr )] = endpointIPs
100
+ // loadbalancerIP:Port
83
101
for _ , loadBalancerIP := range svc .LoadBalancerVIPs () {
84
- serviceIPEndpointIPs [loadBalancerIP .String ()] = endpointIPs
102
+ serviceIPEndpointIPs [net . JoinHostPort ( loadBalancerIP .String (), portStr )] = endpointIPs
85
103
}
104
+ // externalIP:Port
86
105
for _ , externalIP := range svc .ExternalIPs () {
87
- serviceIPEndpointIPs [externalIP .String ()] = endpointIPs
106
+ serviceIPEndpointIPs [net . JoinHostPort ( externalIP .String (), portStr )] = endpointIPs
88
107
}
108
+ // we need to filter entries that are directed to a *:NodePort
109
+ // that does not have a backend as part of the endpoints IPs
89
110
if svc .NodePort () != 0 {
111
+ // *:NodePort
90
112
serviceNodePortEndpointIPs [svc .NodePort ()] = endpointIPs
91
113
}
92
114
}
@@ -98,26 +120,25 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
98
120
continue
99
121
}
100
122
101
- origDst := entry .Forward .DstIP .String ()
102
- origPortDst := int (entry .Forward .DstPort )
103
- replySrc := entry .Reverse .SrcIP .String ()
123
+ origDst := entry .Forward .DstIP .String () // match Service IP
124
+ origPortDst := int (entry .Forward .DstPort ) // match Service Port
125
+ origPortDstStr := strconv .Itoa (origPortDst )
126
+ replySrc := entry .Reverse .SrcIP .String () // match Serving Endpoint IP
104
127
105
128
// if the original destination (--orig-dst) of the entry is service IP (ClusterIP,
106
- // LoadBalancerIPs or ExternalIPs) and the reply source (--reply-src) is not IP of
107
- // any serving endpoint, we clear the entry.
108
- if _ , ok := serviceIPEndpointIPs [origDst ]; ok {
109
- if ! serviceIPEndpointIPs [origDst ].Has (replySrc ) {
110
- filters = append (filters , filterForNAT (origDst , replySrc , v1 .ProtocolUDP ))
111
- }
129
+ // LoadBalancerIPs or ExternalIPs) and (--orig-port-dst) of the flow is service Port
130
+ // and the reply source (--reply-src) is not IP of any serving endpoint, we clear the entry.
131
+ endpoints , ok := serviceIPEndpointIPs [net .JoinHostPort (origDst , origPortDstStr )]
132
+ if ok && ! endpoints .Has (replySrc ) {
133
+ filters = append (filters , filterForIPPortNAT (origDst , replySrc , entry .Forward .DstPort , v1 .ProtocolUDP ))
112
134
}
113
135
114
136
// if the original port destination (--orig-port-dst) of the flow is service
115
137
// NodePort and the reply source (--reply-src) is not IP of any serving endpoint,
116
138
// we clear the entry.
117
- if _ , ok := serviceNodePortEndpointIPs [origPortDst ]; ok {
118
- if ! serviceNodePortEndpointIPs [origPortDst ].Has (replySrc ) {
119
- filters = append (filters , filterForPortNAT (replySrc , origPortDst , v1 .ProtocolUDP ))
120
- }
139
+ endpoints , ok = serviceNodePortEndpointIPs [origPortDst ]
140
+ if ok && ! endpoints .Has (replySrc ) {
141
+ filters = append (filters , filterForPortNAT (replySrc , origPortDst , v1 .ProtocolUDP ))
121
142
}
122
143
}
123
144
@@ -145,14 +166,16 @@ var protocolMap = map[v1.Protocol]uint8{
145
166
v1 .ProtocolSCTP : unix .IPPROTO_SCTP ,
146
167
}
147
168
148
- // filterForNAT returns *conntrackFilter to delete the conntrack entries for connections
149
- // specified by the destination IP (original direction) and source IP (reply direction).
150
- func filterForNAT (origin , dest string , protocol v1.Protocol ) * conntrackFilter {
169
+ // filterForIPPortNAT returns *conntrackFilter to delete the conntrack entries for connections
170
+ // specified by the destination IP (original direction) and destination port (original direction)
171
+ // and source IP (reply direction).
172
+ func filterForIPPortNAT (origin , dest string , dstPort uint16 , protocol v1.Protocol ) * conntrackFilter {
151
173
klog .V (6 ).InfoS ("Adding conntrack filter for cleanup" , "org-dst" , origin , "reply-src" , dest , "protocol" , protocol )
152
174
return & conntrackFilter {
153
175
protocol : protocolMap [protocol ],
154
176
original : & connectionTuple {
155
- dstIP : netutils .ParseIPSloppy (origin ),
177
+ dstIP : netutils .ParseIPSloppy (origin ),
178
+ dstPort : dstPort ,
156
179
},
157
180
reply : & connectionTuple {
158
181
srcIP : netutils .ParseIPSloppy (dest ),
0 commit comments