
🐛 Fix(manager): Prevent goroutine leak on shutdown timeout #3247

Open · wants to merge 2 commits into base: main
2 changes: 1 addition & 1 deletion pkg/manager/manager.go
@@ -417,7 +417,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
}

errChan := make(chan error, 1)
runnables := newRunnables(options.BaseContext, errChan)
runnables := newRunnables(options.BaseContext, errChan).withLogger(options.Logger)
return &controllerManager{
stopProcedureEngaged: ptr.To(int64(0)),
cluster: cluster,
48 changes: 48 additions & 0 deletions pkg/manager/manager_test.go
@@ -1886,12 +1886,60 @@
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})

It("should not leak goroutines when a runnable returns error slowly after being signaled to stop", func() {
// This test reproduces the race condition where the manager's Start method
// exits due to context cancellation, leaving no one to drain errChan

currentGRs := goleak.IgnoreCurrent()

// Create manager with a very short graceful shutdown timeout to reliably trigger the race condition
shortGracefulShutdownTimeout := 10 * time.Millisecond
m, err := New(cfg, Options{
GracefulShutdownTimeout: &shortGracefulShutdownTimeout,
})
Expect(err).NotTo(HaveOccurred())

// Add slow runnables that will return an error after some delay
for i := 0; i < 3; i++ {
slowRunnable := RunnableFunc(func(c context.Context) error {
<-c.Done()

// Simulate some work that delays the error from being returned
// Choosing a large delay to reliably trigger the race condition
time.Sleep(100 * time.Millisecond)

// This simulates the race condition where runnables try to send
// errors after the manager has stopped reading from errChan
return errors.New("slow runnable error")
})

Expect(m.Add(slowRunnable)).To(Succeed())
}

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
go func() {
defer GinkgoRecover()
m.Start(ctx)
}()

// Wait for context to be cancelled
<-ctx.Done()

// Give time for any leaks to become apparent. This makes sure we don't raise a false alarm about goroutine leaks while runnables are still running.
time.Sleep(300 * time.Millisecond)

// force-close keep-alive connections
clientTransport.CloseIdleConnections()
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})

It("should provide a function to get the Config", func() {
m, err := New(cfg, Options{})
Expect(err).NotTo(HaveOccurred())
mgr, ok := m.(*controllerManager)
Expect(ok).To(BeTrue())
Expect(m.GetConfig()).To(Equal(mgr.cluster.GetConfig()))

Check failure (GitHub Actions / lint) on line 1942 in pkg/manager/manager_test.go: Error return value of `m.Start` is not checked (errcheck)
})

It("should provide a function to get the Client", func() {
33 changes: 32 additions & 1 deletion pkg/manager/runnable_group.go
@@ -5,6 +5,7 @@
"errors"
"sync"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

@@ -46,6 +47,16 @@
}
}

// withLogger sets the logger for all runnable groups.
func (r *runnables) withLogger(logger logr.Logger) *runnables {
r.HTTPServers.withLogger(logger)
r.Webhooks.withLogger(logger)
r.Caches.withLogger(logger)
r.LeaderElection.withLogger(logger)
r.Others.withLogger(logger)
return r
}

// Add adds a runnable to closest group of runnable that they belong to.
//
// Add should be able to be called before and after Start, but not after StopAndWait.
@@ -105,6 +116,9 @@
// wg is an internal sync.WaitGroup that allows us to properly stop
// and wait for all the runnables to finish before returning.
wg *sync.WaitGroup

// logger is used for logging when errors are dropped during shutdown.
logger logr.Logger
}

func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
@@ -113,12 +127,19 @@
errChan: errChan,
ch: make(chan *readyRunnable),
wg: new(sync.WaitGroup),
logger: logr.Discard(), // Default to no-op logger
}

r.ctx, r.cancel = context.WithCancel(baseContext())
return r
}

// withLogger sets the logger for this runnable group.
func (r *runnableGroup) withLogger(logger logr.Logger) *runnableGroup {

Check failure (GitHub Actions / lint) on line 138 in pkg/manager/runnable_group.go: (*runnableGroup).withLogger - result 0 (*sigs.k8s.io/controller-runtime/pkg/manager.runnableGroup) is never used (unparam)
r.logger = logger
return r
}

// Started returns true if the group has started.
func (r *runnableGroup) Started() bool {
r.start.Lock()
@@ -224,7 +245,17 @@

// Start the runnable.
if err := rn.Start(r.ctx); err != nil {
r.errChan <- err
// Send error with context awareness to prevent blocking during shutdown
select {
case r.errChan <- err:
// Error sent successfully
case <-r.ctx.Done():
Member review comment on this line (see the sketch after this hunk):
Shutdown is triggered by cancelling the context, and select is not ordered, i.e. if both clauses are valid, it's not deterministic which one it will use. Shouldn't this be a default instead to ensure we try sending on the errChan first?

// Context cancelled (shutdown), drop error to prevent blocking forever
// This prevents goroutine leaks when the error drain goroutine has exited after the timeout
if !errors.Is(err, context.Canceled) { // don't log context.Canceled errors as they are expected during shutdown
r.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err)
}
}
}
}(runnable)
}
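
To make the select-ordering concern concrete, here is a minimal, self-contained Go sketch. It is not part of this PR and not controller-runtime code; the function names sendRacy and sendPreferDelivery are hypothetical. With a cancelled context and a buffered channel that has free space, both cases of the racy select are ready, and Go chooses between them pseudo-randomly, so the error can be dropped even though a send would have succeeded. The second function shows one way to prefer delivery by attempting a non-blocking send first.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// sendRacy mirrors the PR's pattern: if both cases are ready,
// the runtime picks one pseudo-randomly and the error may be dropped.
func sendRacy(ctx context.Context, errChan chan error, err error) bool {
	select {
	case errChan <- err:
		return true
	case <-ctx.Done():
		return false // dropped
	}
}

// sendPreferDelivery is a hypothetical variant: try a non-blocking send
// first, and only fall back to the blocking select (where the error may be
// dropped on shutdown) if the channel is full.
func sendPreferDelivery(ctx context.Context, errChan chan error, err error) bool {
	select {
	case errChan <- err:
		return true
	default:
	}
	select {
	case errChan <- err:
		return true
	case <-ctx.Done():
		return false // dropped
	}
}

func main() {
	errChan := make(chan error, 1) // buffered, like the manager's errChan
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // shutdown already signalled, so ctx.Done() is always ready

	racyDrops, preferDrops := 0, 0
	for i := 0; i < 1000; i++ {
		if sendRacy(ctx, errChan, errors.New("runnable error")) {
			<-errChan // drain so the buffer is free for the next iteration
		} else {
			racyDrops++
		}
		if sendPreferDelivery(ctx, errChan, errors.New("runnable error")) {
			<-errChan
		} else {
			preferDrops++
		}
	}
	fmt.Printf("racy select dropped %d/1000, prefer-delivery dropped %d/1000\n",
		racyDrops, preferDrops)
}
```

Whether preferring delivery is the right behavior here depends on the lifecycle of the manager's error drain goroutine after the graceful shutdown timeout, which is exactly what the review thread above is discussing.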