diff --git a/pkg/controller/workload/bpfcache/endpoint.go b/pkg/controller/workload/bpfcache/endpoint.go
index 434cd2b6c..04ab36847 100644
--- a/pkg/controller/workload/bpfcache/endpoint.go
+++ b/pkg/controller/workload/bpfcache/endpoint.go
@@ -18,6 +18,7 @@ package bpfcache
 
 import (
     "errors"
+    "fmt"
 
     "github.com/cilium/ebpf"
     "istio.io/istio/pkg/util/sets"
@@ -69,22 +70,34 @@ func (c *Cache) EndpointDelete(key *EndpointKey) error {
     return err
 }
 
-// EndpointSwap update the last endpoint index and remove the current endpoint
-func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, prio uint32) error {
+// EndpointSwap replaces the current endpoint with the last endpoint; the caller deletes the duplicated last endpoint afterwards
+func (c *Cache) EndpointSwap(currentIndex, backendUid, lastIndex uint32, serviceId uint32, prio uint32) error {
+    if currentIndex > lastIndex {
+        return fmt.Errorf("currentIndex %d > lastIndex %d", currentIndex, lastIndex)
+    }
+    var k EndpointKey // key of the endpoint used as the replacement
     if currentIndex == lastIndex {
-        return c.EndpointDelete(&EndpointKey{
+        if lastIndex <= 1 {
+            // only one endpoint left, nothing to swap
+            return nil
+        }
+        // the current endpoint is the last one, replace it with the lastIndex-1 endpoint
+        k = EndpointKey{
+            ServiceId:    serviceId,
+            Prio:         prio,
+            BackendIndex: lastIndex - 1,
+        }
+    } else {
+        // replace the current endpoint with the last endpoint
+        k = EndpointKey{
             ServiceId:    serviceId,
             Prio:         prio,
             BackendIndex: lastIndex,
-        })
-    }
-    lastKey := &EndpointKey{
-        ServiceId:    serviceId,
-        Prio:         prio,
-        BackendIndex: lastIndex,
+        }
     }
+
     lastValue := &EndpointValue{}
-    if err := c.EndpointLookup(lastKey, lastValue); err != nil {
+    if err := c.EndpointLookup(&k, lastValue); err != nil {
         return err
     }
 
@@ -93,29 +106,19 @@ func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, p
         Prio:         prio,
         BackendIndex: currentIndex,
     }
-    currentValue := &EndpointValue{}
-    if err := c.EndpointLookup(currentKey, currentValue); err != nil {
-        return err
-    }
 
-    // update the last endpoint's index, in other word delete the current endpoint
+    // replace the current endpoint with the last endpoint
     if err := c.bpfMap.KmeshEndpoint.Update(currentKey, lastValue, ebpf.UpdateAny); err != nil {
         return err
     }
 
-    // delete the duplicate last endpoint
-    if err := c.bpfMap.KmeshEndpoint.Delete(lastKey); err != nil {
-        return err
-    }
-
     // delete index for the current endpoint
-    c.endpointKeys[currentValue.BackendUid].Delete(*currentKey)
-    if len(c.endpointKeys[currentValue.BackendUid]) == 0 {
-        delete(c.endpointKeys, currentValue.BackendUid)
+    c.endpointKeys[backendUid].Delete(*currentKey)
+    if len(c.endpointKeys[backendUid]) == 0 {
+        delete(c.endpointKeys, backendUid)
     }
 
-    // update the last endpoint index
-    c.endpointKeys[lastValue.BackendUid].Delete(*lastKey)
+    // add another index for the last endpoint's backend
     c.endpointKeys[lastValue.BackendUid].Insert(*currentKey)
     return nil
 }
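Reviewer note (illustrative, not part of the patch): with this change EndpointSwap only copies the replacement endpoint over the slot being freed; shrinking the map is left to the caller. A minimal sketch of the intended call sequence against the types in this package. The helper name removeAt is made up, and real callers also shrink the service map's endpoint count between the two steps (see deleteEndpoint in the processor changes below).

// removeAt removes the endpoint at index cur (backend uid curUid) out of last
// entries for (serviceId, prio) using the swap-and-pop pattern.
func removeAt(c *Cache, serviceId, prio, cur, curUid, last uint32) error {
    // 1. copy the last endpoint over the slot being freed
    if err := c.EndpointSwap(cur, curUid, last, serviceId, prio); err != nil {
        return err
    }
    // 2. drop the now-duplicated last slot
    return c.EndpointDelete(&EndpointKey{ServiceId: serviceId, Prio: prio, BackendIndex: last})
}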
diff --git a/pkg/controller/workload/bpfcache/endpoint_test.go b/pkg/controller/workload/bpfcache/endpoint_test.go
new file mode 100644
index 000000000..416af7f7a
--- /dev/null
+++ b/pkg/controller/workload/bpfcache/endpoint_test.go
@@ -0,0 +1,86 @@
+/*
+ * Copyright The Kmesh Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package bpfcache
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+)
+
+func TestEndpointSwap(t *testing.T) {
+    workloadMap := NewFakeWorkloadMap(t)
+    defer CleanupFakeWorkloadMap(workloadMap)
+
+    c := NewCache(workloadMap)
+    endpointsMap := map[*EndpointKey]*EndpointValue{
+        {ServiceId: 1, Prio: 1, BackendIndex: 1}: {BackendUid: 1},
+        {ServiceId: 1, Prio: 1, BackendIndex: 2}: {BackendUid: 2},
+        {ServiceId: 1, Prio: 1, BackendIndex: 3}: {BackendUid: 3},
+        {ServiceId: 1, Prio: 1, BackendIndex: 4}: {BackendUid: 4},
+        {ServiceId: 1, Prio: 1, BackendIndex: 5}: {BackendUid: 5},
+    }
+    for k, v := range endpointsMap {
+        c.EndpointUpdate(k, v)
+    }
+
+    // invalid currentIndex
+    err := c.EndpointSwap(6, 5, 5, 1, 1)
+    assert.ErrorContains(t, err, "> lastIndex")
+
+    // update mid element 3 with 5 -> 1 2 5 4 5
+    err = c.EndpointSwap(3, 3, 5, 1, 1)
+    assert.Nil(t, err)
+
+    lastKey := &EndpointKey{
+        ServiceId:    1,
+        Prio:         1,
+        BackendIndex: 5,
+    }
+    // 1 2 5 4 5 -> 1 2 5 4
+    if err := c.EndpointDelete(lastKey); err != nil {
+        t.Errorf("EndpointDelete [%#v] failed: %v", lastKey, err)
+    }
+
+    // delete the last element 4 -> 1 2 5 5
+    err = c.EndpointSwap(4, 4, 4, 1, 1)
+    assert.Nil(t, err)
+
+    lastKey.BackendIndex = 4
+    // 1 2 5 5 -> 1 2 5
+    if err := c.EndpointDelete(lastKey); err != nil {
+        t.Errorf("EndpointDelete [%#v] failed: %v", lastKey, err)
+    }
+
+    // delete the first element 1 2 5 -> 5 2 5
+    err = c.EndpointSwap(1, 1, 3, 1, 1)
+    assert.Nil(t, err)
+    lastKey.BackendIndex = 3
+    // 5 2 5 -> 5 2
+    if err := c.EndpointDelete(lastKey); err != nil {
+        t.Errorf("EndpointDelete [%#v] failed: %v", lastKey, err)
+    }
+
+    eps := c.GetAllEndpointsForService(1)
+    assert.Equal(t, len(eps), 2)
+
+    assert.Equal(t, 2, len(c.endpointKeys))
+    epKs := c.GetEndpointKeys(2)
+    assert.Equal(t, 1, epKs.Len())
+    epKs = c.GetEndpointKeys(5)
+    assert.Equal(t, 1, epKs.Len())
+}
diff --git a/pkg/controller/workload/cache/endpoint_cache.go b/pkg/controller/workload/cache/endpoint_cache.go
index 41346e2d2..4ec63686c 100644
--- a/pkg/controller/workload/cache/endpoint_cache.go
+++ b/pkg/controller/workload/cache/endpoint_cache.go
@@ -30,7 +30,11 @@ type Endpoint struct {
 type EndpointCache interface {
     List(uint32) map[uint32]Endpoint // Endpoint slice by ServiceId
 
     AddEndpointToService(ep Endpoint, serviceID uint32)
+    // DeleteEndpoint deletes an endpoint regardless of its priority
     DeleteEndpoint(workloadID, serviceID uint32)
+    // DeleteEndpointWithPriority deletes an endpoint only when it has the given priority
+    DeleteEndpointWithPriority(serviceID, workloadID, prio uint32)
+    // DeleteEndpointByServiceId deletes all endpoints belonging to a given service
     DeleteEndpointByServiceId(uint32)
 }
 
@@ -62,6 +66,16 @@ func (s *endpointCache) DeleteEndpoint(serviceID, workloadID uint32) {
     delete(s.endpointsByServiceId[serviceID], workloadID)
 }
 
+func (s *endpointCache) DeleteEndpointWithPriority(serviceID, workloadID, prio uint32) {
+    s.mutex.Lock()
+    defer s.mutex.Unlock()
+    if s.endpointsByServiceId[serviceID] != nil {
+        if ep, ok := s.endpointsByServiceId[serviceID][workloadID]; ok && ep.Prio == prio {
+            delete(s.endpointsByServiceId[serviceID], workloadID)
+        }
+    }
+}
+
 func (s *endpointCache) DeleteEndpointByServiceId(serviceId uint32) {
     s.mutex.Lock()
     defer s.mutex.Unlock()
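Reviewer note (illustrative, not part of the patch): the priority-aware delete exists because updateEndpointOneByOne re-adds a workload under its new priority before cleaning up the old record, and the cache is keyed only by service and workload uid (the processor passes the backend uid as AddEndpointToService's second argument). A rough sketch with made-up ids of the situation it guards against:

// exampleMoveBetweenPriorityGroups shows why the Prio check matters; ec is any
// EndpointCache implementation.
func exampleMoveBetweenPriorityGroups(ec EndpointCache) {
    // Workload 9 of service 5 has just been re-added under its new priority 0;
    // since the cache is keyed by workload uid, this overwrites the old prio-2 entry.
    ec.AddEndpointToService(Endpoint{ServiceId: 5, Prio: 0, BackendIndex: 1}, 9)

    // Cleaning up the stale prio-2 record must not drop the entry written above.
    // A plain DeleteEndpoint(5, 9) would; the priority-aware call is a no-op here.
    ec.DeleteEndpointWithPriority(5, 9, 2)
}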
diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go
index c8963af23..8421f1abd 100644
--- a/pkg/controller/workload/workload_processor.go
+++ b/pkg/controller/workload/workload_processor.go
@@ -309,24 +309,23 @@ func (p *Processor) removeServiceResourceFromBpfMap(svc *workloadapi.Service, na
 }
 
 // addWorkloadToService update service & endpoint bpf map when a workload has new bound services
-func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, uid uint32, priority uint32) (error, bpf.EndpointKey) {
+func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, workloadUid uint32, priority uint32) (error, bpf.EndpointKey) {
     var (
-        err error
-        ek  = bpf.EndpointKey{}
-        ev  = bpf.EndpointValue{}
+        ek = bpf.EndpointKey{}
+        ev = bpf.EndpointValue{}
     )
 
     sv.EndpointCount[priority]++
     ek.BackendIndex = sv.EndpointCount[priority]
     ek.ServiceId = sk.ServiceId
     ek.Prio = priority
-    ev.BackendUid = uid
-    if err = p.bpf.EndpointUpdate(&ek, &ev); err != nil {
+    ev.BackendUid = workloadUid
+    if err := p.bpf.EndpointUpdate(&ek, &ev); err != nil {
         log.Errorf("Update endpoint map failed, err:%s", err)
         return err, ek
     }
     p.EndpointCache.AddEndpointToService(cache.Endpoint{ServiceId: ek.ServiceId, Prio: ek.Prio, BackendIndex: ek.BackendIndex}, ev.BackendUid)
-    if err = p.bpf.ServiceUpdate(sk, sv); err != nil {
+    if err := p.bpf.ServiceUpdate(sk, sv); err != nil {
         log.Errorf("Update ServiceUpdate map failed, err:%s", err)
         return err, ek
     }
@@ -346,12 +345,11 @@ func (p *Processor) handleWorkloadUnboundServices(workload *workloadapi.Workload
 // handleWorkloadNewBoundServices handles when a workload's belonging services added
 func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workload, newServices []uint32) error {
     var (
-        err error
-        sk  = bpf.ServiceKey{}
-        sv  = bpf.ServiceValue{}
+        sk = bpf.ServiceKey{}
+        sv = bpf.ServiceValue{}
     )
 
-    if newServices == nil {
+    if len(newServices) == 0 {
         return nil
     }
 
@@ -360,9 +358,10 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa
     for _, svcUid := range newServices {
         sk.ServiceId = svcUid
         // the service already stored in map, add endpoint
-        if err = p.bpf.ServiceLookup(&sk, &sv); err == nil {
+        if err := p.bpf.ServiceLookup(&sk, &sv); err == nil {
             if sv.LbPolicy == uint32(workloadapi.LoadBalancing_UNSPECIFIED_MODE) { // random mode
-                if err, _ = p.addWorkloadToService(&sk, &sv, workloadId, 0); err != nil { // In random mode, we save all workload to max priority
+                // In random mode, we save all workloads to the highest priority group
+                if err, _ = p.addWorkloadToService(&sk, &sv, workloadId, 0); err != nil {
                     log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
                     return err
                 }
@@ -381,16 +380,15 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa
     return nil
 }
 
-func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
+func (p *Processor) updateWorkloadInBackendMap(workload *workloadapi.Workload) error {
     var (
-        err         error
-        bk          = bpf.BackendKey{}
-        bv          = bpf.BackendValue{}
-        networkMode = workload.GetNetworkMode()
+        err error
+        bk  = bpf.BackendKey{}
+        bv  = bpf.BackendValue{}
     )
 
-    uid := p.hashName.Hash(workload.GetUid())
-    log.Debugf("updateWorkload: workload %s, backendUid: %v", workload.GetUid(), uid)
+    backendUid := p.hashName.Hash(workload.GetUid())
+    log.Debugf("updateWorkloadInBackendMap: workload %s, backendUid: %v", workload.GetUid(), backendUid)
 
     if waypoint := workload.GetWaypoint(); waypoint != nil && waypoint.GetAddress() != nil {
         nets.CopyIpByteFromSlice(&bv.WaypointAddr, waypoint.GetAddress().Address)
@@ -407,20 +405,30 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
     }
 
     for _, ip := range workload.GetAddresses() {
-        bk.BackendUid = uid
+        bk.BackendUid = backendUid
         nets.CopyIpByteFromSlice(&bv.Ip, ip)
         if err = p.bpf.BackendUpdate(&bk, &bv); err != nil {
             log.Errorf("Update backend map failed, err:%s", err)
             return err
         }
+    }
+    return nil
+}
 
-        // we should not store frontend data of hostname network mode pods
-        // please see https://github.com/kmesh-net/kmesh/issues/631
-        if networkMode != workloadapi.NetworkMode_HOST_NETWORK {
-            if err = p.storePodFrontendData(uid, ip); err != nil {
-                log.Errorf("storePodFrontendData failed, err:%s", err)
-                return err
-            }
+func (p *Processor) updateWorkloadInFrontendMap(workload *workloadapi.Workload) error {
+    // we should not store frontend data of hostname network mode pods
+    // please see https://github.com/kmesh-net/kmesh/issues/631
+    if workload.GetNetworkMode() == workloadapi.NetworkMode_HOST_NETWORK {
+        return nil
+    }
+
+    backendUid := p.hashName.Hash(workload.GetUid())
+    log.Debugf("updateWorkloadInFrontendMap: workload %s, backendUid: %v", workload.GetUid(), backendUid)
+
+    for _, ip := range workload.GetAddresses() {
+        if err := p.storePodFrontendData(backendUid, ip); err != nil {
+            log.Errorf("storePodFrontendData failed, err:%s", err)
+            return err
         }
     }
     return nil
 }
@@ -454,6 +462,13 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
         return p.removeWorkloadFromBpfMap(workload.Uid)
     }
 
+    // 1. update workload in backend map
+    if err := p.updateWorkloadInBackendMap(workload); err != nil {
+        log.Errorf("updateWorkloadInBackendMap %s failed: %v", workload.Uid, err)
+        return err
+    }
+
+    // 2~3. update workload in endpoint map and service map
     unboundedEndpointKeys, newServices := p.compareWorkloadServices(workload)
     if err := p.handleWorkloadUnboundServices(workload, unboundedEndpointKeys); err != nil {
         log.Errorf("handleWorkloadUnboundServices %s failed: %v", workload.ResourceName(), err)
@@ -466,9 +481,9 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
         return err
     }
 
-    // update frontend and backend bpf map
-    if err := p.updateWorkload(workload); err != nil {
-        log.Errorf("updateWorkload %s failed: %v", workload.Uid, err)
+    // 4. update workload in frontend map
+    if err := p.updateWorkloadInFrontendMap(workload); err != nil {
+        log.Errorf("updateWorkloadInFrontendMap %s failed: %v", workload.Uid, err)
         return err
     }
@@ -494,7 +509,7 @@ func (p *Processor) compareWorkloadServices(workload *workloadapi.Workload) ([]b
     return unboundedEndpointKeys, newServices
 }
 
-func (p *Processor) storeServiceFrontendData(serviceId uint32, service *workloadapi.Service) error {
+func (p *Processor) updateServiceFrontendMap(serviceId uint32, service *workloadapi.Service) error {
     var (
         err error
         fk  = bpf.FrontendKey{}
@@ -505,7 +520,7 @@
     for _, networkAddress := range service.GetAddresses() {
         nets.CopyIpByteFromSlice(&fk.Ip, networkAddress.Address)
         if err = p.bpf.FrontendUpdate(&fk, &fv); err != nil {
-            log.Errorf("Update Frontend failed, err:%s", err)
+            log.Errorf("frontend map update err:%s", err)
             return err
         }
     }
@@ -538,7 +553,7 @@ func (p *Processor) updateEndpointOneByOne(serviceId uint32, epsUpdate []cache.E
         }
         ev := bpf.EndpointValue{}
         if err := p.bpf.EndpointLookup(&ek, &ev); err != nil { // get backend Uid
-            return err
+            return fmt.Errorf("lookup endpoint %#v failed: %v", ek, err)
         }
 
         // Calc Priority
@@ -557,28 +572,27 @@ func (p *Processor) updateEndpointOneByOne(serviceId uint32, epsUpdate []cache.E
         sKey := bpf.ServiceKey{ServiceId: serviceId}
         sValue := bpf.ServiceValue{}
         if err := p.bpf.ServiceLookup(&sKey, &sValue); err != nil {
-            log.Errorf("Lookup service %d failed: %v", sKey.ServiceId, err)
+            return fmt.Errorf("lookup service %v failed: %v", serviceId, err)
         }
-        // add ek first
+        // add ek to the new priority group first
         if err, _ := p.addWorkloadToService(&sKey, &sValue, ev.BackendUid, prio); err != nil {
-            log.Errorf("addWorkloadToService workload %d service %d failed: %v", ev.BackendUid, sKey.ServiceId, err)
-            return err
+            return fmt.Errorf("update endpoint %d priority to %d failed: %v", ev.BackendUid, prio, err)
         }
 
-        eksDeletes := []bpfcache.EndpointKey{ek}
-        // delete ek
-        if err := p.deleteEndpointRecords(eksDeletes); err != nil {
-            log.Errorf("delete [%#v] from endpoint map failed: %s", eksDeletes, err)
+        epKeys := []bpfcache.EndpointKey{ek}
+        // delete ek from the old priority group
+        if err := p.deleteEndpointRecords(epKeys); err != nil {
+            return fmt.Errorf("delete endpoint %d from old priority group %d failed: %v", ev.BackendUid, ek.Prio, err)
         }
     }
     return nil
 }
 
-// updateEndpoint is called when service lb policy is changed to update the endpoint priority.
+// updateEndpointPriority is called when service lb policy is changed to update the endpoint priority.
 // toLLb indicates whether we are performing a locality load balance update.
 // If toLLb is true, it means we need to calculate priority; otherwise,
 // it represents a random strategy, in which case we just set the priority to 0.
-func (p *Processor) updateEndpoint(serviceId uint32, toLLb bool) error {
+func (p *Processor) updateEndpointPriority(serviceId uint32, toLLb bool) error {
     endpoints := p.EndpointCache.List(serviceId)
     endpointSlice := make([]cache.Endpoint, 0, len(endpoints))
     for _, endpoint := range endpoints {
@@ -594,16 +608,17 @@
         }
     }
 }
 
-func (p *Processor) storeServiceData(serviceName string, waypoint *workloadapi.GatewayAddress, ports []*workloadapi.Port, lb *workloadapi.LoadBalancing) error {
-    var (
-        err      error
-        sk       = bpf.ServiceKey{}
-        oldValue = bpf.ServiceValue{}
-    )
+func (p *Processor) updateServiceMap(service *workloadapi.Service) error {
+    sk := bpf.ServiceKey{}
+    oldValue := bpf.ServiceValue{}
+    newValue := bpf.ServiceValue{}
 
-    sk.ServiceId = p.hashName.Hash(serviceName)
+    serviceName := service.ResourceName()
+    waypoint := service.Waypoint
+    ports := service.Ports
+    lb := service.LoadBalancing
 
-    newValue := bpf.ServiceValue{}
+    sk.ServiceId = p.hashName.Hash(serviceName)
     newValue.LbPolicy = uint32(lb.GetMode()) // set loadbalance mode
 
     if waypoint != nil && waypoint.GetAddress() != nil {
@@ -629,49 +644,55 @@
         }
     }
 
-    // Already exists, it means this is service update.
-    if err = p.bpf.ServiceLookup(&sk, &oldValue); err == nil {
+    if err := p.bpf.ServiceLookup(&sk, &oldValue); err == nil {
+        // Already exists, it means this is a service update.
         newValue.EndpointCount = oldValue.EndpointCount
-    }
+        // if it is a policy update
+        if newValue.LbPolicy != oldValue.LbPolicy {
+            // transition from locality load balancing to random
+            if newValue.LbPolicy == uint32(workloadapi.LoadBalancing_UNSPECIFIED_MODE) {
+                // In locality load balancing mode, the workloads are stored according to the calculated corresponding priorities.
+                // When switching from locality load balancing mode to random, we first update the endpoint map, as at this point,
+                // there might not be any workload with the highest priority, and directly switching the service's LB policy could
+                // lead to unexpected service disruptions.
+                if err = p.updateEndpointPriority(sk.ServiceId, false); err != nil { // this will change bpf map totally
+                    return fmt.Errorf("update endpoint priority failed: %v", err)
+                }
+                updateValue := bpf.ServiceValue{}
+                if err = p.bpf.ServiceLookup(&sk, &updateValue); err != nil {
+                    return fmt.Errorf("service map lookup %v failed: %v", sk.ServiceId, err)
+                }
+                updateValue.LbPolicy = newValue.LbPolicy
+                if err = p.bpf.ServiceUpdate(&sk, &updateValue); err != nil {
+                    return fmt.Errorf("service map update failed: %v", err)
+                }
+                return nil
+            } else if oldValue.LbPolicy == uint32(workloadapi.LoadBalancing_UNSPECIFIED_MODE) { // from random to locality loadbalance
+                // In random mode, the workloads are stored with the highest priority. When switching from random mode to locality
+                // load balancing, we first update the service map to quickly initiate the transition of the strategy. Subsequently,
+                // we update the endpoint map. During this update process, the load balancer may briefly exhibit abnormal random behavior,
+                // after which it will fully transition to the locality load balancing mode.
+                if err = p.bpf.ServiceUpdate(&sk, &newValue); err != nil {
+                    return fmt.Errorf("service map update lb policy failed: %v", err)
+                }
 
-    // if it is a policy update
-    if newValue.LbPolicy != oldValue.LbPolicy {
-        if newValue.LbPolicy == uint32(workloadapi.LoadBalancing_UNSPECIFIED_MODE) { // from locality loadbalance to random
-            // In locality load balancing mode, the workloads are stored according to the calculated corresponding priorities.
-            // When switching from locality load balancing mode to random, we first update the endpoint map, as at this point,
-            // there might not be any workload with the highest priority, and directly switching the service's LB policy could
-            // lead to unexpected service disruptions.
-            if err = p.updateEndpoint(sk.ServiceId, false); err != nil { // this will change bpf map totally
-                log.Errorf("UpdateEndpoint failed, err:%s", err)
-            }
-            updateValue := bpf.ServiceValue{}
-            if err = p.bpf.ServiceLookup(&sk, &updateValue); err != nil {
-                log.Warnf("Lookup Service failed, err:%s", err)
-            }
-            updateValue.LbPolicy = newValue.LbPolicy
-            if err = p.bpf.ServiceUpdate(&sk, &updateValue); err != nil {
-                log.Errorf("Update Service failed, err:%s", err)
-            }
-            return nil
-        } else if oldValue.LbPolicy == uint32(workloadapi.LoadBalancing_UNSPECIFIED_MODE) { // from random to locality loadbalance
-            // In random mode, the workloads are stored with the highest priority. When switching from random mode to locality
-            // load balancing, we first update the service map to quickly initiate the transition of the strategy. Subsequently,
-            // we update the endpoint map. During this update process, the load balancer may briefly exhibit abnormal random behavior,
-            // after which it will fully transition to the locality load balancing mode.
-            if err = p.bpf.ServiceUpdate(&sk, &newValue); err != nil { // update service first
-                log.Errorf("Update Service failed, err:%s", err)
-            }
-            if err = p.updateEndpoint(sk.ServiceId, true); err != nil { // update new service bpf map
-                log.Errorf("UpdateEndpoint failed, err:%s", err)
+                if err = p.updateEndpointPriority(sk.ServiceId, true); err != nil {
+                    return fmt.Errorf("update endpoint priority failed: %v", err)
+                }
+                return nil
+            }
-            return nil
-        }
-    } else { // normal update
-        if err = p.bpf.ServiceUpdate(&sk, &newValue); err != nil {
-            log.Errorf("Update Service failed, err:%s", err)
-        }
         }
     }
+    // normal update
+    if err := p.bpf.ServiceUpdate(&sk, &newValue); err != nil {
+        return fmt.Errorf("service map update failed: %v", err)
+    }
+
+    if err := p.updateServiceFrontendMap(sk.ServiceId, service); err != nil {
+        return fmt.Errorf("updateServiceFrontendMap failed: %v", err)
+    }
+
     return nil
 }
 
@@ -708,18 +729,9 @@ func (p *Processor) handleService(service *workloadapi.Service) error {
         }
     }
 
     p.ServiceCache.AddOrUpdateService(service)
 
-    serviceName := service.ResourceName()
-    serviceId := p.hashName.Hash(serviceName)
-
-    // store in frontend
-    if err := p.storeServiceFrontendData(serviceId, service); err != nil {
-        log.Errorf("storeServiceFrontendData failed, err:%s", err)
-        return err
-    }
-
-    // get endpoint from ServiceCache, and update service and endpoint map
-    if err := p.storeServiceData(serviceName, service.GetWaypoint(), service.GetPorts(), service.GetLoadBalancing()); err != nil {
-        log.Errorf("storeServiceData failed, err:%s", err)
+    // update service and endpoint map
+    if err := p.updateServiceMap(service); err != nil {
+        log.Errorf("update service %s maps failed: %v", service.ResourceName(), err)
         return err
     }
@@ -906,7 +918,8 @@ func (p *Processor) handleRemovedAuthzPolicyDuringRestart(rbac *auth.Rbac) {
     }
 }
 
-// deleteEndpointRecords deletes endpoint from endpoint map and simultaneously update service map
+// deleteEndpointRecords deletes endpoints from the endpoint map, moving the last endpoint of each priority group into the vacated slot,
+// and then updates the service map's endpoint count accordingly.
 func (p *Processor) deleteEndpointRecords(endpointKeys []bpf.EndpointKey) error {
     var (
         sk = bpf.ServiceKey{}
@@ -918,45 +931,75 @@ func (p *Processor) deleteEndpointRecords(endpointKeys []bpf.EndpointKey) error
         return nil
     }
 
+    // sort endpointKeys so that the endpoint with the highest priority and the largest BackendIndex is deleted first;
+    // this way a deletion never changes the BackendIndex of the endpoints deleted after it
+    sort.Slice(endpointKeys, func(i, j int) bool {
+        if endpointKeys[i].Prio == endpointKeys[j].Prio {
+            return endpointKeys[i].BackendIndex > endpointKeys[j].BackendIndex
+        }
+        return endpointKeys[i].Prio > endpointKeys[j].Prio
+    })
+
     for _, ek := range endpointKeys {
         sk.ServiceId = ek.ServiceId
-        // 1. find the service
         if err := p.bpf.ServiceLookup(&sk, &sv); err == nil {
-            // 2. find the last indexed endpoint of the service
-            currentIndex := ek.BackendIndex
-            if err := p.bpf.EndpointSwap(currentIndex, sv.EndpointCount[ek.Prio], sk.ServiceId, ek.Prio); err != nil {
-                log.Errorf("swap workload endpoint index failed: %s", err)
-                return err
-            }
-
             if err = p.bpf.EndpointLookup(&ek, &ev); err != nil {
-                log.Errorf("Lookup endpoint %#v failed: %s", ek, err)
+                log.Errorf("Lookup endpoint %#v failed: %v", ek, err)
+                continue
             }
-            p.EndpointCache.DeleteEndpoint(ek.ServiceId, ev.BackendUid)
-            sv.EndpointCount[ek.Prio] = sv.EndpointCount[ek.Prio] - 1
-            if err = p.bpf.ServiceUpdate(&sk, &sv); err != nil {
-                log.Errorf("ServiceUpdate failed: %s", err)
-                return err
+            if err := p.deleteEndpoint(ek, ev, sv, sk); err != nil {
+                log.Errorf("deleteEndpoint failed: %v", err)
+                continue
             }
+            p.EndpointCache.DeleteEndpointWithPriority(ek.ServiceId, ev.BackendUid, ek.Prio)
         } else {
             // service not exist, we should also delete the endpoint
-            log.Errorf("service %d not found, should not occur: %v", ek.ServiceId, err)
+            log.Warnf("service %d not found, should not occur: %v", ek.ServiceId, err)
             if err = p.bpf.EndpointLookup(&ek, &ev); err != nil {
                 log.Errorf("Lookup endpoint %#v failed: %s", ek, err)
+                continue
             }
             // delete endpoint from map
             if err := p.bpf.EndpointDelete(&ek); err != nil {
                 log.Errorf("EndpointDelete [%#v] failed: %v", ek, err)
-                return err
+                continue
             }
-            p.EndpointCache.DeleteEndpoint(ek.ServiceId, ev.BackendUid)
+            p.EndpointCache.DeleteEndpointWithPriority(ek.ServiceId, ev.BackendUid, ek.Prio)
         }
     }
     return nil
 }
 
+// To make sure the bpf prog can always read a healthy endpoint, update the bpf maps in the following order:
+// 1. replace the current endpoint with the last endpoint
+// 2. update the service map's endpoint count
+// 3. delete the last endpoint
+func (p *Processor) deleteEndpoint(ek bpf.EndpointKey, ev bpf.EndpointValue, sv bpf.ServiceValue, sk bpf.ServiceKey) error {
+    if err := p.bpf.EndpointSwap(ek.BackendIndex, ev.BackendUid, sv.EndpointCount[ek.Prio], sk.ServiceId, ek.Prio); err != nil {
+        log.Errorf("swap workload endpoint index failed: %s", err)
+        return err
+    }
+
+    sv.EndpointCount[ek.Prio] = sv.EndpointCount[ek.Prio] - 1
+    if err := p.bpf.ServiceUpdate(&sk, &sv); err != nil {
+        log.Errorf("ServiceUpdate %#v failed: %v", sk, err)
+        return err
+    }
+
+    lastKey := &bpf.EndpointKey{
+        ServiceId:    sk.ServiceId,
+        Prio:         ek.Prio,
+        BackendIndex: sv.EndpointCount[ek.Prio] + 1,
+    }
+    if err := p.bpf.EndpointDelete(lastKey); err != nil {
+        log.Errorf("EndpointDelete [%#v] failed: %v", lastKey, err)
+        return err
+    }
+    return nil
+}
+
 func (p *Processor) storeWorkloadPolicies(uid string, polices []string) {
     var (
         key = bpf.WorkloadPolicyKey{}
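Reviewer note (illustrative, not part of the patch): the descending sort in deleteEndpointRecords is what keeps the recorded keys valid while several endpoints of one service are removed in a row; deleting the largest index of a priority group first means a swap never relocates an endpoint whose key is still waiting to be deleted. A tiny self-contained illustration of the ordering it produces:

package main

import (
    "fmt"
    "sort"
)

type key struct{ prio, idx uint32 }

func main() {
    keys := []key{{0, 2}, {1, 4}, {0, 7}, {1, 1}}
    // same comparison as the sort.Slice call in deleteEndpointRecords
    sort.Slice(keys, func(i, j int) bool {
        if keys[i].prio == keys[j].prio {
            return keys[i].idx > keys[j].idx
        }
        return keys[i].prio > keys[j].prio
    })
    fmt.Println(keys) // [{1 4} {1 1} {0 7} {0 2}]
}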
diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go
index 6709a1c75..9963c1174 100644
--- a/pkg/controller/workload/workload_processor_test.go
+++ b/pkg/controller/workload/workload_processor_test.go
@@ -782,17 +782,19 @@ func TestLBPolicyUpdate(t *testing.T) {
     checkServiceMap(t, p, p.hashName.Hash(randomSvc.ResourceName()), randomSvc, 3, 0)
     assert.Equal(t, 1, p.bpf.ServiceCount())
     // check endpoint map
-    t.Log("1. check endpoint map")
+    t.Log("2. check endpoint map")
     checkEndpointMap(t, p, randomSvc, backendUid)
     assert.Equal(t, 4, p.bpf.EndpointCount())
 
     // check backend map
+    t.Log("3. check backend map")
     for _, wl := range []*workloadapi.Workload{wl1, wl2, wl3, wl4} {
         checkBackendMap(t, p, p.hashName.Hash(wl.ResourceName()), wl)
     }
     assert.Equal(t, 4, p.bpf.BackendCount())
 
     // 2. Locality Loadbalance Update from random to locality LB
+    t.Log("lb policy update to locality lb")
     addr = serviceToAddress(llbSvc)
     res2.Resources = append(res2.Resources, &service_discovery_v3.Resource{
         Resource: protoconv.MessageToAny(addr),
@@ -803,14 +805,14 @@ func TestLBPolicyUpdate(t *testing.T) {
     assert.Equal(t, 5, p.bpf.FrontendCount())
 
     // check service map
-    t.Log("2. check service map")
+    t.Log("4. check service map")
     checkServiceMap(t, p, p.hashName.Hash(llbSvc.ResourceName()), llbSvc, 0, 1)
     checkServiceMap(t, p, p.hashName.Hash(llbSvc.ResourceName()), llbSvc, 1, 1)
     checkServiceMap(t, p, p.hashName.Hash(llbSvc.ResourceName()), llbSvc, 2, 1)
     checkServiceMap(t, p, p.hashName.Hash(llbSvc.ResourceName()), llbSvc, 3, 1)
     assert.Equal(t, 1, p.bpf.ServiceCount())
     // check endpoint map
-    t.Log("2. check endpoint map")
+    t.Log("5. check endpoint map")
     checkEndpointMap(t, p, llbSvc, backendUid)
     assert.Equal(t, 4, p.bpf.EndpointCount())
 
@@ -825,14 +827,14 @@ func TestLBPolicyUpdate(t *testing.T) {
     assert.Equal(t, 5, p.bpf.FrontendCount())
 
     // check service map
-    t.Log("3. check service map")
+    t.Log("6. check service map")
     checkServiceMap(t, p, p.hashName.Hash(randomSvc.ResourceName()), randomSvc, 0, 4) // 4 1
     checkServiceMap(t, p, p.hashName.Hash(randomSvc.ResourceName()), randomSvc, 1, 0) // 0 1
     checkServiceMap(t, p, p.hashName.Hash(randomSvc.ResourceName()), randomSvc, 2, 0) // 0 1
     checkServiceMap(t, p, p.hashName.Hash(randomSvc.ResourceName()), randomSvc, 3, 0) // 0 1
     assert.Equal(t, 1, p.bpf.ServiceCount())
     // check endpoint map
-    t.Log("3. check endpoint map")
+    t.Log("7. check endpoint map")
     checkEndpointMap(t, p, randomSvc, backendUid)
     assert.Equal(t, 4, p.bpf.EndpointCount())
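Reviewer note (illustrative, not part of the patch): the three-step order in deleteEndpoint above (swap, shrink the advertised count, then drop the tail slot) can be pictured on a plain index-keyed map; after every individual step, each index up to the advertised count still resolves to a live backend:

package main

import "fmt"

func main() {
    backends := map[uint32]uint32{1: 10, 2: 20, 3: 30} // BackendIndex -> backend uid
    count := uint32(3)                                 // what the service map advertises
    cur := uint32(1)                                   // delete the backend at index 1

    backends[cur] = backends[count] // 1. swap: index 1 now points at uid 30
    count--                         // 2. shrink the advertised count
    delete(backends, count+1)       // 3. drop the duplicated tail slot
    fmt.Println(backends, count)    // map[1:30 2:20] 2
}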