Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1380 from ExpediaInc/infinite-reconciliation-loop…
Browse files Browse the repository at this point in the history
…-rawstatuscollection

Infinite reconciliation loop (rawstatuscollection)
  • Loading branch information
k8s-ci-robot authored Mar 25, 2021
2 parents 66abaa4 + c7f9e8c commit e13973c
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) ut
}

collectedStatus, collectedResourceStatus := dispatcher.CollectedStatus()
klog.V(4).Infof("Setting the federated status '%v'", collectedResourceStatus)
klog.V(4).Infof("Setting the federated status '%v' for %s %q", collectedResourceStatus, kind, key)
return s.setFederatedStatus(fedResource, status.AggregateSuccess, &collectedStatus, &collectedResourceStatus, enableRawResourceStatusCollection)
}

Expand Down
49 changes: 42 additions & 7 deletions pkg/controller/sync/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,25 @@ type CollectedResourceStatus struct {
// whether status should be written to the API.
func SetFederatedStatus(fedObject *unstructured.Unstructured, reason AggregateReason, collectedStatus CollectedPropagationStatus, collectedResourceStatus CollectedResourceStatus, resourceStatusCollection bool) (bool, error) {
resource := &GenericFederatedResource{}

err := util.UnstructuredToInterface(fedObject, resource)
if err != nil {
return false, errors.Wrapf(err, "Failed to unmarshall to generic resource")
}

// we apply to collectedResourceStatus the same marshalling applied to GenericFederatedResource
// so the resources can be actually comparable later on
normalizedCollectedResourceStatus, err := normalizeStatus(collectedResourceStatus)
if err != nil {
return false, errors.Wrap(err, "Failed to normalize status")
}

if resource.Status == nil {
resource.Status = &GenericFederatedStatus{}
}

changed := resource.Status.update(fedObject.GetGeneration(), reason, collectedStatus, collectedResourceStatus, resourceStatusCollection)
changed := resource.Status.update(fedObject.GetGeneration(), reason, collectedStatus, *normalizedCollectedResourceStatus, resourceStatusCollection)

if !changed {
return false, nil
}
Expand Down Expand Up @@ -177,7 +187,7 @@ func (s *GenericFederatedStatus) update(generation int64, reason AggregateReason
}
}

clustersChanged := s.setClusters(collectedStatus.StatusMap, collectedResourceStatus.StatusMap)
clustersChanged := s.setClusters(collectedStatus.StatusMap, collectedResourceStatus.StatusMap, resourceStatusCollection)

// Indicate that changes were propagated if either status.clusters
// was changed or if existing resources were updated (which could
Expand All @@ -196,8 +206,8 @@ func (s *GenericFederatedStatus) update(generation int64, reason AggregateReason
// setClusters sets the status.clusters slice from propagation and resource status
// maps. Returns a boolean indication of whether the status.clusters was
// modified.
func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}) bool {
if !s.clustersDiffer(statusMap, resourceStatusMap) {
func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}, resourceStatusCollection bool) bool {
if !s.clustersDiffer(statusMap, resourceStatusMap, resourceStatusCollection) {
return false
}
s.Clusters = []GenericClusterStatus{}
Expand All @@ -214,9 +224,9 @@ func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, res

// clustersDiffer checks whether `status.clusters` differs from the
// given status map.
func (s *GenericFederatedStatus) clustersDiffer(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}) bool {
if len(s.Clusters) != len(statusMap) || len(s.Clusters) != len(resourceStatusMap) {
klog.V(4).Info("Clusters differs from the size")
func (s *GenericFederatedStatus) clustersDiffer(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}, resourceStatusCollection bool) bool {
if len(s.Clusters) != len(statusMap) || resourceStatusCollection && len(s.Clusters) != len(resourceStatusMap) {
klog.V(4).Infof("Clusters differs from the size: clusters = %v, statusMap = %v, resourceStatusMap = %v", s.Clusters, statusMap, resourceStatusMap)
return true
}
for _, status := range s.Clusters {
Expand Down Expand Up @@ -278,3 +288,28 @@ func (s *GenericFederatedStatus) setPropagationCondition(reason AggregateReason,

return updateRequired
}

func normalizeStatus(collectedResourceStatus CollectedResourceStatus) (*CollectedResourceStatus, error) {
if len(collectedResourceStatus.StatusMap) == 0 {
return &collectedResourceStatus, nil
}
cleanedStatus := CollectedResourceStatus{
StatusMap: map[string]interface{}{},
ResourcesUpdated: collectedResourceStatus.ResourcesUpdated,
}

for key, value := range collectedResourceStatus.StatusMap {
content, err := json.Marshal(value)
if err != nil {
return nil, errors.Wrapf(err, "Failed to marshall collected resource status for cluster %s", key)
}
var status interface{}
err = json.Unmarshal(content, &status)
if err != nil {
return nil, errors.Wrapf(err, "Failed to unmarshall collected resource status as interface for cluster %s", key)
}
cleanedStatus.StatusMap[key] = status
}

return &cleanedStatus, nil
}
234 changes: 206 additions & 28 deletions pkg/controller/sync/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,67 +17,187 @@ limitations under the License.
package status

import (
"reflect"
"testing"

apiv1 "k8s.io/api/core/v1"
)

func TestGenericPropagationStatusUpdateChanged(t *testing.T) {
testCases := map[string]struct {
generation int64
reason AggregateReason
statusMap PropagationStatusMap
resourceStatusMap map[string]interface{}
resourcesUpdated bool
expectedChanged bool
generation int64
reason AggregateReason
statusMap PropagationStatusMap
resourceStatusMap map[string]interface{}
remoteStatus interface{}
resourcesUpdated bool
expectedChanged bool
resourceStatusCollection bool
}{
"No change in clusters indicates unchanged": {
"Cluster not propagated indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterNotReady,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"Cluster not propagated indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterNotReady,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"Cluster status not retrieved indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"Cluster status not retrieved indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"No collected remote status indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"No collected remote status indicates unchanged with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: false,
},
"No change in clusters indicates unchanged with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": false,
"stage": "absent",
"cluster1": map[string]interface{}{},
},
resourcesUpdated: false,
expectedChanged: true,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"No change in clusters with update indicates changed": {
"No change in clusters indicates unchanged with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": false,
"stage": "absent",
"cluster1": map[string]interface{}{},
},
resourcesUpdated: true,
expectedChanged: true,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"Change in clusters indicates changed": {
"No change in clusters with update indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": true,
"stage": "deployed",
"cluster1": map[string]interface{}{},
},
expectedChanged: true,
resourcesUpdated: true,
resourceStatusCollection: true,
expectedChanged: true,
},
"Transition indicates changed": {
reason: NamespaceNotFederated,
expectedChanged: true,
"No change in clusters with update indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
resourcesUpdated: true,
resourceStatusCollection: false,
expectedChanged: true,
},
"Changed generation indicates changed": {
generation: 1,
expectedChanged: true,
"No change in remote status indicates unchanged with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{
"status": map[string]interface{}{
"status": "remoteStatus",
},
},
},
remoteStatus: map[string]interface{}{
"status": map[string]interface{}{
"status": "remoteStatus",
},
},
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: false,
},
"Change in clusters indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
"cluster2": ClusterPropagationOK,
},
resourceStatusCollection: true,
expectedChanged: true,
},
"Change in clusters indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
"cluster2": ClusterPropagationOK,
},
resourceStatusCollection: false,
expectedChanged: true,
},
"Transition indicates changed with remote status collection enabled": {
reason: NamespaceNotFederated,
resourceStatusCollection: true,
expectedChanged: true,
},
"Transition indicates changed with remote status collection disabled": {
reason: NamespaceNotFederated,
resourceStatusCollection: false,
expectedChanged: true,
},
"Changed generation indicates changed with remote status collection enabled": {
generation: 1,
resourceStatusCollection: true,
expectedChanged: true,
},
"Changed generation indicates changed with remote status collection disabled": {
generation: 1,
resourceStatusCollection: false,
expectedChanged: true,
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
fedStatus := &GenericFederatedStatus{
Clusters: []GenericClusterStatus{
{
Name: "cluster1",
Name: "cluster1",
RemoteStatus: tc.remoteStatus,
},
},
Conditions: []*GenericCondition{
Expand All @@ -95,10 +215,68 @@ func TestGenericPropagationStatusUpdateChanged(t *testing.T) {
StatusMap: tc.resourceStatusMap,
ResourcesUpdated: tc.resourcesUpdated,
}
changed := fedStatus.update(tc.generation, tc.reason, collectedStatus, collectedResourceStatus, true)
changed := fedStatus.update(tc.generation, tc.reason, collectedStatus, collectedResourceStatus, tc.resourceStatusCollection)
if tc.expectedChanged != changed {
t.Fatalf("Expected changed to be %v, got %v", tc.expectedChanged, changed)
}
})
}
}

func TestNormalizeStatus(t *testing.T) {
testCases := []struct {
name string
input CollectedResourceStatus
expectedResult *CollectedResourceStatus
expectedError error
}{
{
name: "CollectedResourceStatus is not modified if StatusMap is nil",
input: CollectedResourceStatus{},
expectedResult: &CollectedResourceStatus{},
},
{
name: "CollectedResourceStatus is not modified if StatusMap is empty",
input: CollectedResourceStatus{
StatusMap: map[string]interface{}{},
},
expectedResult: &CollectedResourceStatus{
StatusMap: map[string]interface{}{},
},
},
{
name: "CollectedResourceStatus StatusMap is correctly normalized and numbers are converted to float64",
input: CollectedResourceStatus{
StatusMap: map[string]interface{}{
"number": 1,
"string": "value",
"complexobj": map[string]int{
"one": 1,
},
},
},
expectedResult: &CollectedResourceStatus{
StatusMap: map[string]interface{}{
"number": float64(1),
"string": "value",
"complexobj": map[string]interface{}{
"one": float64(1),
},
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := normalizeStatus(tc.input)
if err != tc.expectedError {
t.Fatalf("Expected error to be %v, got %v", tc.expectedError, err)
}

if !reflect.DeepEqual(tc.expectedResult, actual) {
t.Fatalf("Expected result to be %#v, got %#v", tc.expectedResult, actual)
}
})
}
}

0 comments on commit e13973c

Please sign in to comment.