Skip to content

Commit

Permalink
Merge pull request #1047 from hzxuzhonghu/workload-handle-order
Browse files Browse the repository at this point in the history
Correct Workload handle order
  • Loading branch information
kmesh-bot authored Nov 30, 2024
2 parents 4674c48 + a9b9dca commit 204d46d
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 149 deletions.
53 changes: 28 additions & 25 deletions pkg/controller/workload/bpfcache/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bpfcache

import (
"errors"
"fmt"

"github.com/cilium/ebpf"
"istio.io/istio/pkg/util/sets"
Expand Down Expand Up @@ -69,22 +70,34 @@ func (c *Cache) EndpointDelete(key *EndpointKey) error {
return err
}

// EndpointSwap update the last endpoint index and remove the current endpoint
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, prio uint32) error {
// EndpointSwap replaces the current endpoint with the last endpoint
func (c *Cache) EndpointSwap(currentIndex, backendUid, lastIndex uint32, serviceId uint32, prio uint32) error {
if currentIndex > lastIndex {
return fmt.Errorf("currentIndex %d > lastIndex %d", currentIndex, lastIndex)
}
var k EndpointKey // key for the last endpoint
if currentIndex == lastIndex {
return c.EndpointDelete(&EndpointKey{
if lastIndex <= 1 {
// if only one endpoint, do nothing
return nil
}
// update the last endpoint with lastIndex-1 endpoint
k = EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex - 1,
}
} else {
// update the current endpoint with last endpoint
k = EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
})
}
lastKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
}
}

lastValue := &EndpointValue{}
if err := c.EndpointLookup(lastKey, lastValue); err != nil {
if err := c.EndpointLookup(&k, lastValue); err != nil {
return err
}

Expand All @@ -93,29 +106,19 @@ func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, p
Prio: prio,
BackendIndex: currentIndex,
}
currentValue := &EndpointValue{}
if err := c.EndpointLookup(currentKey, currentValue); err != nil {
return err
}

// update the last endpoint's index, in other word delete the current endpoint
// replace the current endpoint with the last endpoint
if err := c.bpfMap.KmeshEndpoint.Update(currentKey, lastValue, ebpf.UpdateAny); err != nil {
return err
}

// delete the duplicate last endpoint
if err := c.bpfMap.KmeshEndpoint.Delete(lastKey); err != nil {
return err
}

// delete index for the current endpoint
c.endpointKeys[currentValue.BackendUid].Delete(*currentKey)
if len(c.endpointKeys[currentValue.BackendUid]) == 0 {
delete(c.endpointKeys, currentValue.BackendUid)
c.endpointKeys[backendUid].Delete(*currentKey)
if len(c.endpointKeys[backendUid]) == 0 {
delete(c.endpointKeys, backendUid)
}

// update the last endpoint index
c.endpointKeys[lastValue.BackendUid].Delete(*lastKey)
// add another index for the last endpoint
c.endpointKeys[lastValue.BackendUid].Insert(*currentKey)
return nil
}
Expand Down
86 changes: 86 additions & 0 deletions pkg/controller/workload/bpfcache/endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bpfcache

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestEndpointSwap(t *testing.T) {
workloadMap := NewFakeWorkloadMap(t)
defer CleanupFakeWorkloadMap(workloadMap)

c := NewCache(workloadMap)
endpointsMap := map[*EndpointKey]*EndpointValue{
{ServiceId: 1, Prio: 1, BackendIndex: 1}: {BackendUid: 1},
{ServiceId: 1, Prio: 1, BackendIndex: 2}: {BackendUid: 2},
{ServiceId: 1, Prio: 1, BackendIndex: 3}: {BackendUid: 3},
{ServiceId: 1, Prio: 1, BackendIndex: 4}: {BackendUid: 4},
{ServiceId: 1, Prio: 1, BackendIndex: 5}: {BackendUid: 5},
}
for k, v := range endpointsMap {
c.EndpointUpdate(k, v)
}

// invalid currentIndex
err := c.EndpointSwap(6, 5, 5, 1, 1)
assert.ErrorContains(t, err, "> lastIndex")

// update mid element 3 with 5 -> 1 2 5 4 5
err = c.EndpointSwap(3, 3, 5, 1, 1)
assert.Nil(t, err)

lastKey := &EndpointKey{
ServiceId: 1,
Prio: 1,
BackendIndex: 5,
}
// 1 2 5 4 5 -> 1 2 5 4
if err := c.EndpointDelete(lastKey); err != nil {
t.Errorf("EndpointDelete [%#v] failed: %v", lastKey, err)
}

// delete the last element 4 -> 1 2 5 5
err = c.EndpointSwap(4, 4, 4, 1, 1)
assert.Nil(t, err)

lastKey.BackendIndex = 4
// 1 2 5 5 -> 1 2 5
if err := c.EndpointDelete(lastKey); err != nil {
t.Errorf("EndpointDelete [%#v] failed: %v", lastKey, err)
}

// delete the first element 1 2 5 -> 5 2 5
err = c.EndpointSwap(1, 1, 3, 1, 1)
assert.Nil(t, err)
lastKey.BackendIndex = 3
// 5 2 5 -> 5 2
if err := c.EndpointDelete(lastKey); err != nil {
t.Errorf("EndpointDelete [%#v] failed: %v", lastKey, err)
}

eps := c.GetAllEndpointsForService(1)
assert.Equal(t, len(eps), 2)

assert.Equal(t, 2, len(c.endpointKeys))
epKs := c.GetEndpointKeys(2)
assert.Equal(t, 1, epKs.Len())
epKs = c.GetEndpointKeys(5)
assert.Equal(t, 1, epKs.Len())
}
14 changes: 14 additions & 0 deletions pkg/controller/workload/cache/endpoint_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ type Endpoint struct {
type EndpointCache interface {
List(uint32) map[uint32]Endpoint // Endpoint slice by ServiceId
AddEndpointToService(ep Endpoint, serviceID uint32)
// DeleteEndpoint delete a endpoint regardless of the priority
DeleteEndpoint(workloadID, serviceID uint32)
// DeleteEndpointWithPriority delete a endpoint with given priority
DeleteEndpointWithPriority(serviceID, workloadID, prio uint32)
// DeleteEndpointByServiceId delete all endpoints belong to a given service
DeleteEndpointByServiceId(uint32)
}

Expand Down Expand Up @@ -62,6 +66,16 @@ func (s *endpointCache) DeleteEndpoint(serviceID, workloadID uint32) {
delete(s.endpointsByServiceId[serviceID], workloadID)
}

func (s *endpointCache) DeleteEndpointWithPriority(serviceID, workloadID, prio uint32) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.endpointsByServiceId[serviceID] != nil {
if ep, ok := s.endpointsByServiceId[serviceID][workloadID]; ok && ep.Prio == prio {
delete(s.endpointsByServiceId[serviceID], workloadID)
}
}
}

func (s *endpointCache) DeleteEndpointByServiceId(serviceId uint32) {
s.mutex.Lock()
defer s.mutex.Unlock()
Expand Down
Loading

0 comments on commit 204d46d

Please sign in to comment.