Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ValidationPolicy & ValidationEvent #488

Merged
merged 1 commit into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 62 additions & 10 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,14 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje

// Validate the resources to make sure we catch those problems early
// before anything has been updated in the cluster.
validator := &validation.Validator{Mapper: mapper}
if err := validator.Validate(objects); err != nil {
handleError(eventChannel, err)
return
vCollector := &validation.Collector{}
validator := &validation.Validator{
Collector: vCollector,
Mapper: mapper,
}
validator.Validate(objects)

// Decide which objects to apply and which to prune
applyObjs, pruneObjs, err := a.prepareObjects(invInfo, objects, options)
if err != nil {
handleError(eventChannel, err)
Expand All @@ -166,6 +168,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
Mapper: mapper,
InvClient: a.invClient,
Destroy: false,
Collector: vCollector,
}
opts := solver.Options{
ServerSideOptions: options.ServerSideOptions,
Expand Down Expand Up @@ -199,7 +202,6 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
},
}
// Build list of apply mutators.
// Share a thread-safe cache with the status poller.
resourceCache := cache.NewResourceCacheMap()
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Expand All @@ -208,17 +210,43 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
ResourceCache: resourceCache,
},
}
// Build the task queue by appending tasks in the proper order.
taskQueue, err := taskBuilder.

// Build the ordered set of tasks to execute.
taskQueue := taskBuilder.
AppendInvAddTask(invInfo, applyObjs, options.DryRunStrategy).
AppendApplyWaitTasks(applyObjs, applyFilters, applyMutators, opts).
AppendPruneWaitTasks(pruneObjs, pruneFilters, opts).
AppendInvSetTask(invInfo, options.DryRunStrategy).
Build()
if err != nil {
handleError(eventChannel, err)

klog.V(4).Infof("validation errors: %d", len(vCollector.Errors))
klog.V(4).Infof("invalid objects: %d", len(vCollector.InvalidIds))

// Handle validation errors
switch options.ValidationPolicy {
case validation.ExitEarly:
err = vCollector.ToError()
if err != nil {
handleError(eventChannel, err)
return
}
case validation.SkipInvalid:
for _, err := range vCollector.Errors {
handleValidationError(eventChannel, err)
}
default:
handleError(eventChannel, fmt.Errorf("invalid ValidationPolicy: %q", options.ValidationPolicy))
return
}

// Build a TaskContext for passing info between tasks
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)

// Register invalid objects to be retained in the inventory, if present.
for _, id := range vCollector.InvalidIds {
taskContext.AddInvalidObject(id)
}

// Send event to inform the caller about the resources that
// will be applied/pruned.
eventChannel <- event.Event{
Expand All @@ -231,7 +259,6 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller)
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
PollInterval: options.PollInterval,
Expand Down Expand Up @@ -283,6 +310,9 @@ type Options struct {

// InventoryPolicy defines the inventory policy of apply.
InventoryPolicy inventory.InventoryPolicy

// ValidationPolicy defines how to handle invalid objects.
ValidationPolicy validation.Policy
}

// setDefaults set the options to the default values if they
Expand Down Expand Up @@ -322,3 +352,25 @@ func localNamespaces(localInv inventory.InventoryInfo, localObjs []object.ObjMet
}
return namespaces
}

func handleValidationError(eventChannel chan<- event.Event, err error) {
switch tErr := err.(type) {
case *validation.Error:
// handle validation error about one or more specific objects
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Identifiers: tErr.Identifiers(),
Error: tErr,
},
}
default:
// handle general validation error (no specific object)
eventChannel <- event.Event{
Type: event.ValidationType,
ValidationEvent: event.ValidationEvent{
Error: tErr,
},
}
}
}
Loading