Skip to content

Commit

Permalink
[WIP] More validation work
Browse files Browse the repository at this point in the history
- Move dependency validation from Validator to SortObjs.
- Move SortObjs call from solver to applier/destroyer.
  This allows the resulting errors to be treated as validation errors.
- Modify SortObjs to return a MultiError so that all validation errors
  can be sent and invalid objects can be skipped or cause early exit.
- Add invalid objects to the TestContext so they can be retained in
  the inventory (only if already present). This primarily applies to
  invalid annotations and dependencies. Objects without name or kind
  should never be added to the inventory.
- Remove invalid objects from the applySet/pruneSet, rather than
  filtering and skipping them in the apply/prune/wait tasks. This
  reduces the number of events that are redundant now with validation
  events.
- Handle CyclicDependencyError as a validation error. It applies to
  multiple objects, instead of just one, like ValidationError.
- Handle validation of dependency errors in destroyer as well as
  the applier.
- Replace many -OrDie object commands that were panicing when an
  invalid object was found. Add ValidateObjMetadata to do the
  validation that the -OrDie functions were doing.
- Replace MultiValidationError with a more generic MultiError.
- Simplify ValidationError to optionally wrap MultiError.
- Modify ValidationError output to be easier to read.
- Add e2e test for invalid object handling.
  • Loading branch information
karlkfi committed Dec 2, 2021
1 parent 52a0ca3 commit 63c2cfc
Show file tree
Hide file tree
Showing 43 changed files with 1,213 additions and 507 deletions.
140 changes: 98 additions & 42 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sort"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
Expand All @@ -25,7 +24,9 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/multierror"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/graph"
"sigs.k8s.io/cli-utils/pkg/object/validator"
"sigs.k8s.io/cli-utils/pkg/ordering"
statusfactory "sigs.k8s.io/cli-utils/pkg/util/factory"
Expand Down Expand Up @@ -143,17 +144,13 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
}

// Validate objects
invalidObjects, err := validateObjects(objects, mapper, eventChannel)
if err != nil {
klog.V(4).Infof("calculated %d invalid objs", len(invalidObjects))
if options.ValidationPolicy == validator.ExitEarly {
handleError(eventChannel, err)
return
}
}

// TODO: figure out where invalid objects will fail in subsequent steps.
// Example: missing name or namespace breaks UnstructuredToObjMeta!
var validationErrors []error
v := &validator.Validator{Mapper: mapper}
if err := v.Validate(objects); err != nil {
validationErrors = append(validationErrors, err)
}

applyObjs, pruneObjs, err := a.prepareObjects(invInfo, objects, options)
if err != nil {
Expand All @@ -162,6 +159,46 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
}
klog.V(4).Infof("calculated %d apply objs; %d prune objs", len(applyObjs), len(pruneObjs))

// Build list of apply sets, based on dependency ordering.
applySets, err := graph.SortObjs(applyObjs)
if err != nil {
validationErrors = append(validationErrors, err)
}
klog.V(4).Infof("apply sets: %d", len(applySets))
// Build list of prune sets, based on reverse dependency ordering.
pruneSets, err := graph.ReverseSortObjs(pruneObjs)
if err != nil {
validationErrors = append(validationErrors, err)
}
klog.V(4).Infof("prune sets: %d", len(pruneSets))
// Handle validation events
validationErrors = multierror.Unwrap(validationErrors...)
klog.V(4).Infof("validation errors: %d", len(validationErrors))
if len(validationErrors) > 0 {
for _, err := range validationErrors {
handleValidationError(eventChannel, err)
}
// Exit early, if configured to do so
if options.ValidationPolicy == validator.ExitEarly {
handleError(eventChannel, multierror.New(validationErrors))
return
}
}
// Extract unique IDs from ValidationErrors
invalidIds := invalidIds(validationErrors)
klog.V(4).Infof("invalid objects: %d", len(invalidIds))
// Don't apply or prune invalid objects
// TODO: what about dependencies?
applySets = removeObjects(applySets, invalidIds)
pruneSets = removeObjects(pruneSets, invalidIds)
// Build a TaskContext for passing info between tasks
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
// Register invalid objects to be retained in the inventory, if present.
for _, id := range invalidIds {
taskContext.AddInvalidObject(id)
}

// Fetch the queue (channel) of tasks that should be executed.
klog.V(4).Infoln("applier building task queue...")
taskBuilder := &solver.TaskQueueBuilder{
Expand All @@ -184,7 +221,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
// Build list of apply validation filters.
applyFilters := []filter.ValidationFilter{
filter.InvalidObjectFilter{
InvalidIds: invalidObjects,
InvalidIds: invalidIds,
Policy: options.ValidationPolicy,
},
}
Expand All @@ -200,7 +237,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
// Build list of prune validation filters.
pruneFilters := []filter.ValidationFilter{
filter.InvalidObjectFilter{
InvalidIds: invalidObjects,
InvalidIds: invalidIds,
Policy: options.ValidationPolicy,
},
filter.PreventRemoveFilter{},
Expand All @@ -209,12 +246,10 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
InvPolicy: options.InventoryPolicy,
},
filter.LocalNamespacesFilter{
LocalNamespaces: localNamespaces(invInfo, object.UnstructuredsToObjMetasOrDie(objects)),
LocalNamespaces: localNamespaces(invInfo, object.UnstructuredSetToObjMetadataSet(objects)),
},
}
// Build list of apply mutators.
// Share a thread-safe cache with the status poller.
resourceCache := cache.NewResourceCacheMap()
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Client: client,
Expand All @@ -225,8 +260,8 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
// Build the task queue by appending tasks in the proper order.
taskQueue, err := taskBuilder.
AppendInvAddTask(invInfo, applyObjs, options.DryRunStrategy).
AppendApplyWaitTasks(applyObjs, applyFilters, applyMutators, opts).
AppendPruneWaitTasks(pruneObjs, pruneFilters, opts).
AppendApplyWaitTasks(applySets, applyFilters, applyMutators, opts).
AppendPruneWaitTasks(pruneSets, pruneFilters, opts).
AppendInvSetTask(invInfo, options.DryRunStrategy).
Build()
if err != nil {
Expand All @@ -243,9 +278,8 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
}
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredsToObjMetasOrDie(append(applyObjs, pruneObjs...))
allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller)
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
PollInterval: options.PollInterval,
Expand Down Expand Up @@ -340,32 +374,54 @@ func localNamespaces(localInv inventory.InventoryInfo, localObjs []object.ObjMet
return namespaces
}

func sendValidationEvent(eventChannel chan<- event.Event, id object.ObjMetadata, err error) {
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Identifier: id,
Error: err,
},
func handleValidationError(eventChannel chan<- event.Event, err error) {
switch tErr := err.(type) {
case *validator.ValidationError:
// handle validation error about a specific object
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Identifier: tErr.Identifier,
Error: tErr,
},
}
default:
// handle general validation error (no specific object)
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Error: tErr,
},
}
}
}

// validateObjects validates the objects and sends a ValidationEvent for each
// invalid object. The list of invalid object Ids is returned. If any objects
// are invalid, an error is also returned.
func validateObjects(
objs object.UnstructuredSet,
mapper meta.RESTMapper,
eventChannel chan<- event.Event,
) (object.ObjMetadataSet, error) {
var invalidObjects object.ObjMetadataSet
v := &validator.Validator{Mapper: mapper}
if mve := v.Validate(objs); mve != nil {
for _, ve := range mve.Errors {
invalidObjects = append(invalidObjects, ve.Identifier)
sendValidationEvent(eventChannel, ve.Identifier, mve)
// invalidIds extracts unique IDs from ValidationErrors
func invalidIds(errs []error) object.ObjMetadataSet {
var ids object.ObjMetadataSet
for _, err := range errs {
switch tErr := err.(type) {
case *validator.ValidationError:
if tErr.Identifier != object.NilObjMetadata {
ids = append(ids, tErr.Identifier)
}
case graph.CyclicDependencyError:
ids = append(ids, tErr.Identifiers...)
}
}
return ids.Unique()
}

// removeObjects removes objects from the setList that match ids in the removeSet.
// The original setList is modified and returned.
func removeObjects(setList []object.UnstructuredSet, removeSet object.ObjMetadataSet) []object.UnstructuredSet {
for i := range setList {
for j := len(setList[i]) - 1; j >= 0; j-- {
id := object.UnstructuredToObjMetadata(setList[i][j])
if removeSet.Contains(id) {
setList[i] = append(setList[i][:j], setList[i][j+1:]...)
}
}
return invalidObjects, mve
}
return invalidObjects, nil
return setList
}
2 changes: 1 addition & 1 deletion pkg/apply/cache/resource_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ResourceCache interface {
ResourceCacheReader
// Load one or more resources into the cache, generating the ObjMetadata
// from the objects.
Load(...ResourceStatus) error
Load(...ResourceStatus)
// Put the resource into the cache using the specified ID.
Put(object.ObjMetadata, ResourceStatus)
// Remove the resource associated with the ID from the cache.
Expand Down
10 changes: 2 additions & 8 deletions pkg/apply/cache/resource_cache_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package cache

import (
"fmt"
"sync"

"k8s.io/klog/v2"
Expand All @@ -28,19 +27,14 @@ func NewResourceCacheMap() *ResourceCacheMap {

// Load resources into the cache, generating the ID from the resource itself.
// Existing resources with the same ID will be replaced.
// Returns an error if any resource ID cannot be generated.
func (rc *ResourceCacheMap) Load(values ...ResourceStatus) error {
func (rc *ResourceCacheMap) Load(values ...ResourceStatus) {
rc.mu.Lock()
defer rc.mu.Unlock()

for _, value := range values {
id, err := object.UnstructuredToObjMeta(value.Resource)
if err != nil {
return fmt.Errorf("failed to generate resource ID: %w", err)
}
id := object.UnstructuredToObjMetadata(value.Resource)
rc.cache[id] = value
}
return nil
}

// Put the resource into the cache using the supplied ID, replacing any
Expand Down
45 changes: 41 additions & 4 deletions pkg/apply/destroyer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/multierror"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/graph"
"sigs.k8s.io/cli-utils/pkg/object/validator"
statusfactory "sigs.k8s.io/cli-utils/pkg/util/factory"
)

Expand Down Expand Up @@ -80,6 +83,9 @@ type DestroyerOptions struct {
// PollInterval defines how often we should poll for the status
// of resources.
PollInterval time.Duration

// ValidationPolicy defines how to handle invalid objects.
ValidationPolicy validator.ValidationPolicy
}

func setDestroyerDefaults(o *DestroyerOptions) {
Expand Down Expand Up @@ -135,9 +141,42 @@ func (d *Destroyer) Run(ctx context.Context, inv inventory.InventoryInfo, option
InvPolicy: options.InventoryPolicy,
},
}
// Build list of delete sets, based on reverse dependency ordering.
var validationErrors []error
deleteSets, err := graph.ReverseSortObjs(deleteObjs)
if err != nil {
validationErrors = append(validationErrors, err)
}
klog.V(4).Infof("delete sets: %d", len(deleteSets))
// Handle validation events
validationErrors = multierror.Unwrap(validationErrors...)
klog.V(4).Infof("validation errors: %d", len(validationErrors))
if len(validationErrors) > 0 {
for _, err := range validationErrors {
handleValidationError(eventChannel, err)
}
// Exit early, if configured to do so
if options.ValidationPolicy == validator.ExitEarly {
handleError(eventChannel, multierror.New(validationErrors))
return
}
}
// Extract unique IDs from ValidationErrors
invalidIds := invalidIds(validationErrors)
klog.V(4).Infof("invalid objects: %d", len(invalidIds))
// Don't delete invalid objects
// TODO: what about dependencies?
deleteSets = removeObjects(deleteSets, invalidIds)
// Build a TaskContext for passing info between tasks
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
// Register invalid objects to be retained in the inventory, if present.
for _, id := range invalidIds {
taskContext.AddInvalidObject(id)
}
// Build the ordered set of tasks to execute.
taskQueue, err := taskBuilder.
AppendPruneWaitTasks(deleteObjs, deleteFilters, opts).
AppendPruneWaitTasks(deleteSets, deleteFilters, opts).
AppendDeleteInvTask(inv, options.DryRunStrategy).
Build()
if err != nil {
Expand All @@ -154,10 +193,8 @@ func (d *Destroyer) Run(ctx context.Context, inv inventory.InventoryInfo, option
}
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("destroyer building TaskStatusRunner...")
deleteIds := object.UnstructuredsToObjMetasOrDie(deleteObjs)
resourceCache := cache.NewResourceCacheMap()
deleteIds := object.UnstructuredSetToObjMetadataSet(deleteObjs)
runner := taskrunner.NewTaskStatusRunner(deleteIds, d.StatusPoller)
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
klog.V(4).Infoln("destroyer running TaskStatusRunner...")
// TODO(seans): Make the poll interval configurable like the applier.
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
Expand Down
10 changes: 5 additions & 5 deletions pkg/apply/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ func (age ActionGroupEvent) String() string {
type ApplyEventOperation int

const (
ApplyUnspecified ApplyEventOperation = iota
ServersideApplied
Created
Unchanged
Configured
ApplyUnspecified ApplyEventOperation = iota
ServersideApplied // TODO: rename AppliedServerside
Created // TODO: rename AppliedNew
Unchanged // TODO: rename ApplySkipped
Configured // TODO: rename Applied
)

type ApplyEvent struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/apply/filter/invalid-object-filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func (sif InvalidObjectFilter) Filter(obj *unstructured.Unstructured) (bool, str
case validator.ApplyAll:
return false, "", nil
case validator.SkipInvalid:
id := validator.UnstructuredToObjMetadata(obj)
id := object.UnstructuredToObjMetadata(obj)
if sif.InvalidIds.Contains(id) {
// skip - invalid object
return true, fmt.Sprintf("validation policy prevented apply (policy: %q)", sif.Policy), nil
}
return false, "", nil
case validator.ExitEarly:
id := validator.UnstructuredToObjMetadata(obj)
id := object.UnstructuredToObjMetadata(obj)
if sif.InvalidIds.Contains(id) {
// shouldn't happen - should have errored earlier
return false, "", fmt.Errorf("invalid object (object: %q) ", id)
Expand Down
5 changes: 3 additions & 2 deletions pkg/apply/prune/event-factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func TestEventFactory(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
id, err := object.UnstructuredToObjMeta(tc.obj)
require.NoError(t, err)
id := object.UnstructuredToObjMetadata(tc.obj)
require.NoError(t, object.ValidateObjMetadata(id))
eventFactory := CreateEventFactory(tc.destroy, "task-0")
// Validate the "success" event"
actualEvent := eventFactory.CreateSuccessEvent(tc.obj)
Expand All @@ -42,6 +42,7 @@ func TestEventFactory(t *testing.T) {
tc.expectedType, actualEvent.Type)
}
var actualObj *unstructured.Unstructured
var err error
if tc.expectedType == event.PruneType {
if event.Pruned != actualEvent.PruneEvent.Operation {
t.Errorf("success event expected operation (Pruned), got (%s)",
Expand Down
2 changes: 1 addition & 1 deletion pkg/apply/prune/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (p *Pruner) GetPruneObjs(
objs object.UnstructuredSet,
opts Options,
) (object.UnstructuredSet, error) {
ids := object.UnstructuredsToObjMetasOrDie(objs)
ids := object.UnstructuredSetToObjMetadataSet(objs)
invIDs, err := p.InvClient.GetClusterObjs(inv, opts.DryRunStrategy)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 63c2cfc

Please sign in to comment.