diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index c3ae317b04..09207fa302 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -417,7 +417,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
 	}
 
 	errChan := make(chan error, 1)
-	runnables := newRunnables(options.BaseContext, errChan)
+	runnables := newRunnables(options.BaseContext, errChan).withLogger(options.Logger)
 	return &controllerManager{
 		stopProcedureEngaged: ptr.To(int64(0)),
 		cluster:              cluster,
diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go
index 247a33f9dc..25bc02c3e1 100644
--- a/pkg/manager/manager_test.go
+++ b/pkg/manager/manager_test.go
@@ -1886,6 +1886,54 @@ var _ = Describe("manger.Manager", func() {
 			Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
 		})
 
+		It("should not leak goroutines when a runnable returns an error slowly after being signaled to stop", func() {
+			// This test reproduces the race condition where the manager's Start method
+			// exits due to context cancellation, leaving no one to drain errChan.
+
+			currentGRs := goleak.IgnoreCurrent()
+
+			// Create a manager with a very short graceful shutdown timeout to reliably trigger the race condition.
+			shortGracefulShutdownTimeout := 10 * time.Millisecond
+			m, err := New(cfg, Options{
+				GracefulShutdownTimeout: &shortGracefulShutdownTimeout,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			// Add slow runnables that will each return an error after some delay.
+			for i := 0; i < 3; i++ {
+				slowRunnable := RunnableFunc(func(c context.Context) error {
+					<-c.Done()
+
+					// Simulate some work that delays the error from being returned.
+					// Choosing a large delay to reliably trigger the race condition.
+					time.Sleep(100 * time.Millisecond)
+
+					// This simulates the race condition where runnables try to send
+					// errors after the manager has stopped reading from errChan.
+					return errors.New("slow runnable error")
+				})
+
+				Expect(m.Add(slowRunnable)).To(Succeed())
+			}
+
+			ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+			defer cancel()
+			go func() {
+				defer GinkgoRecover()
+				Expect(m.Start(ctx)).To(HaveOccurred()) // We expect an error here because the slow runnables return errors.
+			}()
+
+			// Wait for the context to be cancelled.
+			<-ctx.Done()
+
+			// Give time for any leaks to become apparent. This makes sure we don't raise a false alarm on goroutine leaks while runnables are still running.
+			time.Sleep(300 * time.Millisecond)
+
+			// Force-close keep-alive connections.
+			clientTransport.CloseIdleConnections()
+			Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
+		})
+
 		It("should provide a function to get the Config", func() {
 			m, err := New(cfg, Options{})
 			Expect(err).NotTo(HaveOccurred())
diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go
index db5cda7c88..4bf4aff43c 100644
--- a/pkg/manager/runnable_group.go
+++ b/pkg/manager/runnable_group.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"sync"
 
+	"github.com/go-logr/logr"
 	"sigs.k8s.io/controller-runtime/pkg/webhook"
 )
 
@@ -46,6 +47,16 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
 	}
 }
 
+// withLogger returns the runnables with the logger set for all runnable groups.
+func (r *runnables) withLogger(logger logr.Logger) *runnables {
+	r.HTTPServers.withLogger(logger)
+	r.Webhooks.withLogger(logger)
+	r.Caches.withLogger(logger)
+	r.LeaderElection.withLogger(logger)
+	r.Others.withLogger(logger)
+	return r
+}
+
 // Add adds a runnable to closest group of runnable that they belong to.
 //
 // Add should be able to be called before and after Start, but not after StopAndWait.
@@ -105,6 +116,9 @@ type runnableGroup struct {
 	// wg is an internal sync.WaitGroup that allows us to properly stop
 	// and wait for all the runnables to finish before returning.
 	wg *sync.WaitGroup
+
+	// logger is used for logging when errors are dropped during shutdown.
+	logger logr.Logger
 }
 
 func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
@@ -113,12 +127,18 @@ func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnable
 		errChan: errChan,
 		ch:      make(chan *readyRunnable),
 		wg:      new(sync.WaitGroup),
+		logger:  logr.Discard(), // Default to a no-op logger.
 	}
 
 	r.ctx, r.cancel = context.WithCancel(baseContext())
 	return r
 }
 
+// withLogger sets the logger for this runnable group.
+func (r *runnableGroup) withLogger(logger logr.Logger) {
+	r.logger = logger
+}
+
 // Started returns true if the group has started.
 func (r *runnableGroup) Started() bool {
 	r.start.Lock()
@@ -224,7 +244,27 @@ func (r *runnableGroup) reconcile() {
 
 			// Start the runnable.
 			if err := rn.Start(r.ctx); err != nil {
-				r.errChan <- err
+				// Check whether we are in the shutdown process.
+				r.stop.RLock()
+				isStopped := r.stopped
+				r.stop.RUnlock()
+
+				if isStopped {
+					// During shutdown, try to send the error first (the error drain goroutine might still be running),
+					// but drop it if sending would block, to prevent goroutine leaks.
+					select {
+					case r.errChan <- err:
+						// Error sent successfully (the error drain goroutine is still running).
+					default:
+						// The error drain goroutine has exited; drop the error to prevent a goroutine leak.
+						if !errors.Is(err, context.Canceled) { // Don't log context.Canceled errors, as they are expected during shutdown.
+							r.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err)
+						}
+					}
+				} else {
+					// During normal operation, always try to send errors (may block briefly).
+					r.errChan <- err
+				}
 			}
 		}(runnable)
 	}
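For reviewers, a standalone illustration of why the unconditional r.errChan <- err could leak: errChan is buffered with capacity 1, and once the shutdown-side drain loop stops reading, only the first late error fits in the buffer while every further blocking send pins its goroutine forever. This is a simplified sketch, not the actual manager shutdown code; the channel capacity and timings only mirror the scenario in the new test.

package main

import (
	"fmt"
	"time"
)

func main() {
	errChan := make(chan error, 1) // same capacity as the manager's errChan
	stopDraining := make(chan struct{})

	// Shutdown side: drain errors only until the graceful timeout fires.
	go func() {
		for {
			select {
			case err := <-errChan:
				fmt.Println("drained:", err)
			case <-stopDraining:
				return // from here on, nobody reads errChan
			}
		}
	}()
	time.AfterFunc(10*time.Millisecond, func() { close(stopDraining) })

	// Runnable side: three slow runnables report errors after the drainer is gone.
	time.Sleep(50 * time.Millisecond)
	for i := 0; i < 3; i++ {
		err := fmt.Errorf("slow runnable error %d", i)
		// An unconditional `errChan <- err` would block forever once the buffer is
		// full; the non-blocking send drops the error instead, matching the fix.
		select {
		case errChan <- err:
			fmt.Println("sent:", err)
		default:
			fmt.Println("dropped:", err)
		}
	}
}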
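On the wiring side, the logger that withLogger fans out to the runnable groups is the manager's Options.Logger, so the "error dropped during shutdown to prevent goroutine leak" message surfaces through whatever logger the caller configures. A minimal usage sketch, assuming the stock controller-runtime zap helper; the explicit GracefulShutdownTimeout just shows the knob the new test shortens.

package main

import (
	"context"
	"os"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/client/config"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	logger := zap.New(zap.UseDevMode(true))

	// Options.Logger is what New hands to newRunnables(...).withLogger(...), so
	// errors dropped after the shutdown deadline are logged here instead of
	// blocking (and leaking) the runnable's goroutine.
	gracefulTimeout := 30 * time.Second
	mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{
		Logger:                  logger,
		GracefulShutdownTimeout: &gracefulTimeout,
	})
	if err != nil {
		logger.Error(err, "unable to create manager")
		os.Exit(1)
	}

	if err := mgr.Start(context.Background()); err != nil {
		logger.Error(err, "manager exited with error")
		os.Exit(1)
	}
}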