Skip to content

Commit

Permalink
✨ Add a priority queue (#3014)
Browse files Browse the repository at this point in the history
* :sparkling: POC of a priority queue

This change contains the POC of a priority workqueue that allows to
prioritize events over one another. It is opt-in and will by default
de-prioritize events originating from the initial listwatch and from
periodic resyncs.

* Use a btree, it is faster

```
$ benchstat slice.txt btree.txt
goos: darwin
goarch: arm64
pkg: sigs.k8s.io/controller-runtime/pkg/controllerworkqueue
cpu: Apple M2 Pro
              │  slice.txt  │              btree.txt              │
              │   sec/op    │   sec/op     vs base                │
AddGetDone-10   5.078m ± 0%   1.163m ± 0%  -77.09% (p=0.000 n=10)

              │  slice.txt   │              btree.txt               │
              │     B/op     │     B/op      vs base                │
AddGetDone-10   55.11Ki ± 0%   46.98Ki ± 0%  -14.75% (p=0.000 n=10)

              │  slice.txt  │              btree.txt              │
              │  allocs/op  │  allocs/op   vs base                │
AddGetDone-10   3.000k ± 0%   1.000k ± 0%  -66.67% (p=0.000 n=10)
```

* Add fuzztest and fix bug it found

* Fix handler

* Move into package priorityqueue

* Metric: Adds are only counted if the object didn't exist yet

* Validate correct usage of btree and tick

* Add retry metrics

* Fix missing notification when item is added

* Add tests for handler

* Controller tests

* Add some benchmarks

* Make Add non-blocking

* Revert "Make Add non-blocking"

This reverts commit ce23de5.

Speedup is tiny and at the expense of increased mem usage (which due to
increasing GC pressure is likely the explanation why its so small), so
doesn't seem worth it overall:
```
goos: darwin
goarch: arm64
pkg: sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue
cpu: Apple M2 Pro
                    │ blocking.txt │          non-blocking.txt          │
                    │    sec/op    │   sec/op     vs base               │
AddGetDone-10          1.320m ± 1%   1.410m ± 0%  +6.81% (p=0.000 n=10)
AddOnly-10             373.9µ ± 1%   343.2µ ± 1%  -8.22% (p=0.000 n=10)
AddLockContended-10    375.8µ ± 1%   342.8µ ± 1%  -8.78% (p=0.000 n=10)
geomean                570.3µ        549.4µ       -3.66%

                    │ blocking.txt │            non-blocking.txt             │
                    │     B/op     │     B/op      vs base                   │
AddGetDone-10         109.9Ki ± 0%   164.2Ki ± 0%     +49.42% (p=0.000 n=10)
AddOnly-10              553.0 ± 2%   56045.0 ± 0%  +10034.72% (p=0.000 n=10)
AddLockContended-10     569.0 ± 6%   56045.0 ± 0%   +9749.74% (p=0.000 n=10)
geomean               3.207Ki        78.94Ki        +2361.60%

                    │ blocking.txt │            non-blocking.txt             │
                    │  allocs/op   │  allocs/op    vs base                   │
AddGetDone-10          3.013k ± 0%    5.001k ± 0%     +65.98% (p=0.000 n=10)
AddOnly-10              16.00 ± 6%   2000.00 ± 0%  +12400.00% (p=0.000 n=10)
AddLockContended-10     16.00 ± 6%   2000.00 ± 0%  +12400.00% (p=0.000 n=10)
geomean                 91.71         2.715k        +2860.01%
```

* Remove unneccesarry timestamp

* Consolidate require directiv

* Godocs and simplification

* Fix priorityqueue defaulting

* Avoid unnecessary else when returning
  • Loading branch information
alvaroaleman authored Dec 6, 2024
1 parent 203ef4e commit aea2e32
Show file tree
Hide file tree
Showing 20 changed files with 1,599 additions and 106 deletions.
8 changes: 8 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ issues:
- linters:
- dupl
path: _test\.go
- linters:
- revive
path: .*/internal/.*
- linters:
- unused
# Seems to incorrectly trigger on the two implementations that are only
# used through an interface and not directly..?
path: pkg/controller/priorityqueue/metrics\.go

run:
go: "1.23"
Expand Down
3 changes: 3 additions & 0 deletions .gomodcheck.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ excludedModules:
# --- test dependencies:
- github.com/onsi/ginkgo/v2
- github.com/onsi/gomega

# --- We want a newer version with generics support for this
- github.com/google/btree
73 changes: 73 additions & 0 deletions examples/priorityqueue/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"
"time"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/builder"
kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func init() {
}

func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}

func run() error {
log.SetLogger(zap.New())

// Setup a Manager
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
Controller: config.Controller{UsePriorityQueue: true},
})
if err != nil {
return fmt.Errorf("failed to set up controller-manager: %w", err)
}

if err := builder.ControllerManagedBy(mgr).
For(&corev1.ConfigMap{}).
Complete(reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
log.FromContext(ctx).Info("Reconciling")
time.Sleep(10 * time.Second)

return reconcile.Result{}, nil
})); err != nil {
return fmt.Errorf("failed to set up controller: %w", err)
}

if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
return fmt.Errorf("failed to start manager: %w", err)
}

return nil
}
1 change: 1 addition & 0 deletions examples/scratch-env/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/scratch-env/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/evanphx/json-patch/v5 v5.9.0
github.com/go-logr/logr v1.4.2
github.com/go-logr/zapr v1.3.0
github.com/google/btree v1.1.3
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.0
github.com/onsi/ginkgo/v2 v2.21.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g=
github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8=
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
Expand Down
8 changes: 4 additions & 4 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (blder *TypedBuilder[request]) Watches(
) *TypedBuilder[request] {
input := WatchesInput[request]{
obj: object,
handler: eventHandler,
handler: handler.WithLowPriorityWhenUnchanged(eventHandler),
}
for _, opt := range opts {
opt.ApplyToWatches(&input)
Expand Down Expand Up @@ -317,7 +317,7 @@ func (blder *TypedBuilder[request]) doWatch() error {
}

var hdler handler.TypedEventHandler[client.Object, request]
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})))
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
Expand All @@ -341,11 +341,11 @@ func (blder *TypedBuilder[request]) doWatch() error {
}

var hdler handler.TypedEventHandler[client.Object, request]
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner(
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
blder.forInput.object,
opts...,
)))
))))
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ type Controller struct {
// NeedLeaderElection indicates whether the controller needs to use leader election.
// Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool

// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
// priority queue.
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue bool
}
12 changes: 11 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -189,11 +190,20 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
}

if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
if mgr.GetControllerOptions().UsePriorityQueue {
options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
} else {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
}
}

if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
if mgr.GetControllerOptions().UsePriorityQueue {
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
o.RateLimiter = rateLimiter
})
}
return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
Name: controllerName,
})
Expand Down
37 changes: 37 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller"
Expand Down Expand Up @@ -437,5 +438,41 @@ var _ = Describe("controller.Controller", func() {
_, ok := c.(manager.LeaderElectionRunnable)
Expect(ok).To(BeTrue())
})

It("should configure a priority queue if UsePriorityQueue is set", func() {
m, err := manager.New(cfg, manager.Options{
Controller: config.Controller{UsePriorityQueue: true},
})
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller-16", m, controller.Options{
Reconciler: rec,
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())

q := ctrl.NewQueue("foo", nil)
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
Expect(ok).To(BeTrue())
})

It("should not configure a priority queue if UsePriorityQueue is not set", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller-17", m, controller.Options{
Reconciler: rec,
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())

q := ctrl.NewQueue("foo", nil)
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
Expect(ok).To(BeFalse())
})
})
})
Loading

0 comments on commit aea2e32

Please sign in to comment.