Changes from 9 commits
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
@@ -109,6 +109,7 @@ const (
GracefulShutdownProviderFilterKey = "pshutdown"
GracefulShutdownConsumerFilterKey = "cshutdown"
GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
GracefulShutdownClosingKey = "closing"
HystrixConsumerFilterKey = "hystrix_consumer"
HystrixProviderFilterKey = "hystrix_provider"
MetricsFilterKey = "metrics"
52 changes: 29 additions & 23 deletions common/extension/graceful_shutdown.go
@@ -19,35 +19,41 @@ package extension

import (
"container/list"
"context"
)

var customShutdownCallbacks = list.New()

/**
* AddCustomShutdownCallback
* you should not make any assumption about the order.
* For example, if you have more than one callbacks, and you wish the order is:
* callback1()
* callback2()
* ...
* callbackN()
* Then you should put them together:
* func callback() {
* callback1()
* callback2()
* ...
* callbackN()
* }
* I think the order of custom callbacks should be decided by the users.
* Even though I can design a mechanism to support the ordered custom callbacks,
* the benefit of that mechanism is low.
* And it may introduce much complication for another users.
*/
Comment on lines -26 to -45
Contributor

Don't delete this comment either.

// GracefulShutdownCallback is the callback for graceful shutdown
// name: protocol name such as "grpc", "tri", "dubbo"
// returns error if notify failed
type GracefulShutdownCallback func(ctx context.Context) error

var (
customShutdownCallbacks = list.New()
gracefulShutdownCallbacks = make(map[string]GracefulShutdownCallback)
)

// AddCustomShutdownCallback adds custom shutdown callback
func AddCustomShutdownCallback(callback func()) {
customShutdownCallbacks.PushBack(callback)
}

// GetAllCustomShutdownCallbacks gets all custom shutdown callbacks
// GetAllCustomShutdownCallbacks returns all custom shutdown callbacks
func GetAllCustomShutdownCallbacks() *list.List {
return customShutdownCallbacks
}
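
The removed doc comment advises wrapping order-sensitive steps in a single callback instead of relying on registration order. A minimal sketch of that advice follows; `customCallbacks`, `addCallback`, `runCallbacks`, and `trace` are names local to this example, not identifiers from the repository.

```go
package main

import (
	"container/list"
	"fmt"
)

// customCallbacks stands in for the package-level list in the diff (hypothetical name).
var customCallbacks = list.New()

// trace records the order in which steps ran, for demonstration only.
var trace []string

// addCallback appends a callback, in the same way AddCustomShutdownCallback does.
func addCallback(cb func()) {
	customCallbacks.PushBack(cb)
}

// runCallbacks invokes every stored callback and skips values that are not func().
func runCallbacks() {
	for e := customCallbacks.Front(); e != nil; e = e.Next() {
		if cb, ok := e.Value.(func()); ok {
			cb()
		}
	}
}

func main() {
	// Steps that must run in a fixed order are combined into one callback,
	// so the result does not depend on how the framework iterates.
	addCallback(func() {
		trace = append(trace, "flush")
		trace = append(trace, "close")
	})
	runCallbacks()
	fmt.Println(trace)
}
```

The type assertion in `runCallbacks` guards against a non-function value being pushed onto the list, since `container/list` stores `any`.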

// SetGracefulShutdownCallback sets protocol-level graceful shutdown callback
func SetGracefulShutdownCallback(name string, f GracefulShutdownCallback) {
gracefulShutdownCallbacks[name] = f
}

// GetGracefulShutdownCallback returns protocol's graceful shutdown callback
func GetGracefulShutdownCallback(name string) (GracefulShutdownCallback, bool) {
f, ok := gracefulShutdownCallbacks[name]
return f, ok
}

// GetAllGracefulShutdownCallbacks returns all protocol's graceful shutdown callbacks
func GetAllGracefulShutdownCallbacks() map[string]GracefulShutdownCallback {
return gracefulShutdownCallbacks
Comment on lines +46 to +58
Contributor

gracefulShutdownCallbacks is a global map, and SetGracefulShutdownCallback and GetAllGracefulShutdownCallbacks access it without any lock, so the Go race detector will flag it.

GetAllGracefulShutdownCallbacks returns a reference to the internal map directly; if a caller modifies it concurrently, the program crashes.

Suggestion: protect the map with a sync.RWMutex, and have GetAllGracefulShutdownCallbacks return a copy rather than the original map.

Contributor Author

Sorry, I'll correct it

}
Comment on lines +45 to +59
Contributor

  1. Getter/setter-style naming is not idiomatic Go. Consider renaming to:
func RegisterGracefulShutdownCallback(name string, f GracefulShutdownCallback)
func LookupGracefulShutdownCallback(name string) (GracefulShutdownCallback, bool)
func GracefulShutdownCallbacks() map[string]GracefulShutdownCallback
  2. The concurrent-access problem needs more thought.
  3. Does SetGracefulShutdownCallback allow duplicate registration? For example:
extension.SetGracefulShutdownCallback(GRPC, cb1)
extension.SetGracefulShutdownCallback(GRPC, cb2)

If it is not allowed, a check needs to be added.

  4. Do not return the internal map directly; return a copy if one is needed.
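
The review points above (renaming, locking, duplicate registration, returning a copy) could be combined roughly as follows. This is a sketch under assumptions, not the change that was merged: rejecting duplicates by returning `false` is one possible policy, and `protocolCallbacks`, `protocolCallbacksMu`, and `noopCallback` are illustrative names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// GracefulShutdownCallback matches the type introduced in the diff.
type GracefulShutdownCallback func(ctx context.Context) error

var (
	protocolCallbacksMu sync.RWMutex
	protocolCallbacks   = make(map[string]GracefulShutdownCallback)
)

// noopCallback is a placeholder callback used only by this example.
func noopCallback(context.Context) error { return nil }

// RegisterGracefulShutdownCallback stores f under name.
// Assumed policy: a second registration for the same name is rejected.
func RegisterGracefulShutdownCallback(name string, f GracefulShutdownCallback) bool {
	protocolCallbacksMu.Lock()
	defer protocolCallbacksMu.Unlock()
	if _, exists := protocolCallbacks[name]; exists {
		return false
	}
	protocolCallbacks[name] = f
	return true
}

// LookupGracefulShutdownCallback returns the callback registered for name, if any.
func LookupGracefulShutdownCallback(name string) (GracefulShutdownCallback, bool) {
	protocolCallbacksMu.RLock()
	defer protocolCallbacksMu.RUnlock()
	f, ok := protocolCallbacks[name]
	return f, ok
}

// GracefulShutdownCallbacks returns a copy, so callers cannot mutate the registry.
func GracefulShutdownCallbacks() map[string]GracefulShutdownCallback {
	protocolCallbacksMu.RLock()
	defer protocolCallbacksMu.RUnlock()
	out := make(map[string]GracefulShutdownCallback, len(protocolCallbacks))
	for name, f := range protocolCallbacks {
		out[name] = f
	}
	return out
}

func main() {
	fmt.Println(RegisterGracefulShutdownCallback("grpc", noopCallback)) // first registration
	fmt.Println(RegisterGracefulShutdownCallback("grpc", noopCallback)) // duplicate attempt
	snapshot := GracefulShutdownCallbacks()
	delete(snapshot, "grpc") // modifies only the copy
	_, ok := LookupGracefulShutdownCallback("grpc")
	fmt.Println(ok)
}
```

If overwriting an existing registration is the intended behavior instead, the existence check can be dropped and the function can return nothing.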

12 changes: 2 additions & 10 deletions config/graceful_shutdown.go
@@ -54,7 +54,6 @@ import (
const defaultShutDownTime = time.Second * 60

func gracefulShutdownInit() {
// retrieve ShutdownConfig for gracefulShutdownFilter
gracefulShutdownConsumerFilter, exist := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
if !exist {
return
@@ -63,6 +62,7 @@ func gracefulShutdownInit() {
if !exist {
return
}
// retrieve ShutdownConfig for gracefulShutdownFilter
if filter, ok := gracefulShutdownConsumerFilter.(Setter); ok && rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown())
}
@@ -84,7 +84,6 @@ func gracefulShutdownInit() {
os.Exit(0)
})
BeforeShutdown()
// those signals' original behavior is to exit and dump the stack, so we try to keep that behavior
for _, dumpSignal := range DumpHeapShutdownSignals {
if sig == dumpSignal {
debug.WriteHeapDump(os.Stdout.Fd())
@@ -99,14 +98,8 @@ func gracefulShutdownInit() {
// BeforeShutdown provides processing flow before shutdown
func BeforeShutdown() {
destroyAllRegistries()
// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
// The value of configuration depends on how long the clients will get notification.
waitAndAcceptNewRequests()

// reject sending/receiving the new request, but keeping waiting for accepting requests
waitForSendingAndReceivingRequests()

// destroy all protocols
destroyProtocols()

logger.Info("Graceful shutdown --- Execute the custom callbacks.")
@@ -142,7 +135,6 @@ func destroyProtocols() {
func destroyProviderProtocols(consumerProtocols *gxset.HashSet) {
logger.Info("Graceful shutdown --- First destroy provider's protocols. ")
for _, protocol := range rootConfig.Protocols {
// the protocol is the consumer's protocol too, we can not destroy it.
if consumerProtocols.Contains(protocol.Name) {
continue
}
@@ -216,7 +208,7 @@ func waitingConsumerProcessedTimeout(shutdownConfig *ShutdownConfig) {
}

func totalTimeout() time.Duration {
timeout := defaultShutDownTime
timeout := defaultShutDownTime // 60s
if rootConfig.Shutdown != nil && rootConfig.Shutdown.GetTimeout() > timeout {
timeout = rootConfig.Shutdown.GetTimeout()
}
4 changes: 3 additions & 1 deletion config/root_config.go
@@ -135,7 +135,6 @@ func (rc *RootConfig) Init() error {
return err
}
}

if err := rc.Application.Init(); err != nil {
return err
}
@@ -175,6 +174,7 @@ func (rc *RootConfig) Init() error {
if err := rc.MetadataReport.Init(rc); err != nil {
return err
}

if err := rc.Otel.Init(rc.Application); err != nil {
return err
}
@@ -189,6 +189,7 @@ func (rc *RootConfig) Init() error {
if err := initRouterConfig(rc); err != nil {
return err
}

// provider and consumer must be initialized last
if err := rc.Provider.Init(rc); err != nil {
return err
@@ -208,6 +209,7 @@ func (rc *RootConfig) Init() error {
func (rc *RootConfig) Start() {
startOnce.Do(func() {
gracefulShutdownInit()

rc.Consumer.Load()
rc.Provider.Load()
if err := initMetadata(rc); err != nil {
111 changes: 109 additions & 2 deletions filter/graceful_shutdown/consumer_filter.go
@@ -19,7 +19,11 @@ package graceful_shutdown

import (
"context"
"errors"
"strconv"
"strings"
"sync"
"time"
)

import (
@@ -50,7 +54,8 @@ func init() {
}

type consumerGracefulShutdownFilter struct {
shutdownConfig *global.ShutdownConfig
shutdownConfig *global.ShutdownConfig
closingInvokers sync.Map // map[string]time.Time (url key -> expire time)
}

func newConsumerGracefulShutdownFilter() filter.Filter {
@@ -62,15 +67,31 @@ func newConsumerGracefulShutdownFilter() filter.Filter {
return csf
}

// Invoke adds the requests count and block the new requests if application is closing
// Invoke adds the requests count and checks if invoker is closing
func (f *consumerGracefulShutdownFilter) Invoke(ctx context.Context, invoker base.Invoker, invocation base.Invocation) result.Result {
// check if invoker is closing
if f.isClosingInvoker(invoker) {
logger.Warnf("Graceful shutdown: skipping closing invoker: %s", invoker.GetURL().String())
return &result.RPCResult{Err: errors.New("provider is closing")}
}
f.shutdownConfig.ConsumerActiveCount.Inc()
return invoker.Invoke(ctx, invocation)
}

// OnResponse reduces the number of active processes then return the process result
func (f *consumerGracefulShutdownFilter) OnResponse(ctx context.Context, result result.Result, invoker base.Invoker, invocation base.Invocation) result.Result {
f.shutdownConfig.ConsumerActiveCount.Dec()

// check closing flag in response
if f.isClosingResponse(result) {
f.markClosingInvoker(invoker)
}

// handle request error
if result.Error() != nil {
f.handleRequestError(invoker, result.Error())
}

return result
}

@@ -91,3 +112,89 @@ func (f *consumerGracefulShutdownFilter) Set(name string, conf any) {
// do nothing
}
}

// isClosingInvoker checks if invoker is in closing list
func (f *consumerGracefulShutdownFilter) isClosingInvoker(invoker base.Invoker) bool {
key := invoker.GetURL().String()
if expireTime, ok := f.closingInvokers.Load(key); ok {
if time.Now().Before(expireTime.(time.Time)) {
return true
}
f.closingInvokers.Delete(key)
Contributor

markClosingInvoker calls bi.SetAvailable(false). After the 30s expiry, closingInvokers.Delete(key) removes the node from the map, but IsAvailable() still returns false, so the load balancer skips that node forever and the invoker becomes permanently unavailable.

Suggestion: call bi.SetAvailable(true) right after closingInvokers.Delete(key) to restore the state; or do not call SetAvailable(false) at all and rely only on the expiry of closingInvokers to control routing.
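
The second option in the comment above (routing decisions driven only by an expiring entry, with no separate availability flag to restore) might look like the sketch below. `closingTracker`, `fakeClock`, and the example URL key are hypothetical; the injected clock exists only so the example runs without sleeping.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeClock is a hand-advanced clock used to make the example deterministic.
type fakeClock struct{ current time.Time }

func (c *fakeClock) Now() time.Time          { return c.current }
func (c *fakeClock) advance(d time.Duration) { c.current = c.current.Add(d) }

// closingTracker remembers which invoker keys are closing and until when.
type closingTracker struct {
	entries sync.Map         // invoker key (string) -> expiry (time.Time)
	now     func() time.Time // clock source; time.Now in production code
}

func newClosingTracker(now func() time.Time) *closingTracker {
	return &closingTracker{now: now}
}

// mark records that key should be skipped until ttl has elapsed.
func (t *closingTracker) mark(key string, ttl time.Duration) {
	t.entries.Store(key, t.now().Add(ttl))
}

// isClosing reports whether key is still inside its closing window.
// An expired entry is removed, and because no other flag was set,
// the invoker becomes routable again without extra bookkeeping.
func (t *closingTracker) isClosing(key string) bool {
	v, ok := t.entries.Load(key)
	if !ok {
		return false
	}
	if t.now().Before(v.(time.Time)) {
		return true
	}
	t.entries.Delete(key)
	return false
}

func main() {
	clock := &fakeClock{}
	tracker := newClosingTracker(clock.Now)
	key := "tri://127.0.0.1:20000" // illustrative key only

	tracker.mark(key, 30*time.Second)
	fmt.Println(tracker.isClosing(key)) // inside the window

	clock.advance(31 * time.Second)
	fmt.Println(tracker.isClosing(key)) // window elapsed
}
```

The trade-off is that components that consult only IsAvailable() would not see the closing state, so this approach assumes the filter's own check is enough to keep traffic away from the node.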

}
return false
}

// isClosingResponse checks if response contains closing flag
func (f *consumerGracefulShutdownFilter) isClosingResponse(result result.Result) bool {
if result != nil && result.Attachments() != nil {
if v, ok := result.Attachments()[constant.GracefulShutdownClosingKey]; ok {
if v == "true" {
return true
}
}
}
return false
}

// markClosingInvoker marks invoker as closing and sets available=false
func (f *consumerGracefulShutdownFilter) markClosingInvoker(invoker base.Invoker) {
key := invoker.GetURL().String()
expireTime := time.Now().Add(f.getClosingInvokerExpireTime())
f.closingInvokers.Store(key, expireTime)

logger.Infof("Graceful shutdown: marked invoker as closing: %s, will expire at %v, IsAvailable=%v",
key, expireTime, invoker.IsAvailable())

if bi, ok := invoker.(*base.BaseInvoker); ok {
bi.SetAvailable(false)
logger.Infof("Graceful shutdown: set invoker unavailable: %s, IsAvailable now=%v",
Contributor

Suggested change
logger.Infof("Graceful shutdown: set invoker unavailable: %s, IsAvailable now=%v",
logger.Infof("Graceful shutdown --- Set invoker unavailable: %s, IsAvailable now=%v",

key, invoker.IsAvailable())
}
}

func (f *consumerGracefulShutdownFilter) getClosingInvokerExpireTime() time.Duration {
if f.shutdownConfig != nil && f.shutdownConfig.ClosingInvokerExpireTime != "" {
if duration, err := time.ParseDuration(f.shutdownConfig.ClosingInvokerExpireTime); err == nil && duration > 0 {
return duration
}
}
// default 30s, also try parsing numeric string as milliseconds
if f.shutdownConfig != nil && f.shutdownConfig.ClosingInvokerExpireTime != "" {
if ms, err := strconv.ParseInt(f.shutdownConfig.ClosingInvokerExpireTime, 10, 64); err == nil && ms > 0 {
return time.Duration(ms) * time.Millisecond
}
}
return 30 * time.Second
}
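
getClosingInvokerExpireTime checks the same precondition twice. The same fallback chain (Go duration string, then a bare integer treated as milliseconds, then 30s) can be written as a single pure function. The sketch below assumes those semantics from the diff; `parseClosingExpire` and `defaultClosingExpire` are illustrative names.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// defaultClosingExpire mirrors the 30s default used in the diff.
const defaultClosingExpire = 30 * time.Second

// parseClosingExpire converts a configured string into a positive duration.
// It accepts Go duration syntax ("45s") or a bare integer of milliseconds ("1500");
// empty, invalid, or non-positive values fall back to the default.
func parseClosingExpire(raw string) time.Duration {
	if raw == "" {
		return defaultClosingExpire
	}
	if d, err := time.ParseDuration(raw); err == nil && d > 0 {
		return d
	}
	if ms, err := strconv.ParseInt(raw, 10, 64); err == nil && ms > 0 {
		return time.Duration(ms) * time.Millisecond
	}
	return defaultClosingExpire
}

func main() {
	for _, raw := range []string{"45s", "1500", "", "-5s", "soon"} {
		fmt.Printf("%q -> %v\n", raw, parseClosingExpire(raw))
	}
}
```

Keeping the parsing free of filter state makes it easy to unit-test and avoids re-parsing logic being duplicated between the two marking paths.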

// handleRequestError handles request errors and marks invoker as unavailable for connection errors
func (f *consumerGracefulShutdownFilter) handleRequestError(invoker base.Invoker, err error) {
if err == nil {
return
}

// check for connection-related errors
errMsg := err.Error()
isConnectionError := strings.Contains(errMsg, "client has closed") ||
strings.Contains(errMsg, "connection") ||
Comment on lines +179 to +180
Contributor

strings.Contains(errMsg, "connection") is too broad. Any business error that contains the word "connection" (for example "database connection pool exhausted" or "connection refused by business logic") is misclassified as a connection-closed error, so a healthy invoker is marked as closing and SetAvailable(false) is called on it, and requests are rejected until the 30s expiry elapses.

Suggestion: use errors.Is to match known error types, or narrow the string matching to specific errors such as "connection reset" and "use of closed network connection".

Contributor Author

Sorry, I'll correct it
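
The errors.Is approach suggested in this thread could be sketched as below. It uses only standard-library sentinel errors, and it assumes the transport returns wrapped errors rather than flattened strings; if errors cross the RPC boundary as plain text, narrowed string matching would still be needed. `isConnectionError`, `wrappedReset`, and `businessErr` are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"syscall"
)

// Sample errors used by the example.
var (
	wrappedReset = fmt.Errorf("read tcp: %w", syscall.ECONNRESET)
	businessErr  = errors.New("database connection pool exhausted")
)

// isConnectionError reports whether err wraps a well-known transport failure.
// Matching on sentinel values avoids treating any message that merely
// contains the word "connection" as a closed connection.
func isConnectionError(err error) bool {
	if err == nil {
		return false
	}
	return errors.Is(err, io.EOF) ||
		errors.Is(err, io.ErrUnexpectedEOF) ||
		errors.Is(err, net.ErrClosed) ||
		errors.Is(err, syscall.EPIPE) ||
		errors.Is(err, syscall.ECONNRESET)
}

func main() {
	fmt.Println(isConnectionError(wrappedReset)) // wraps ECONNRESET
	fmt.Println(isConnectionError(businessErr))  // plain business error
}
```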

strings.Contains(errMsg, "EOF") ||
strings.Contains(errMsg, "broken pipe") ||
strings.Contains(errMsg, "gRPC") && strings.Contains(errMsg, "closing") ||
strings.Contains(errMsg, "http2") && strings.Contains(errMsg, "close")

if isConnectionError {
key := invoker.GetURL().String()
expireTime := time.Now().Add(f.getClosingInvokerExpireTime())
f.closingInvokers.Store(key, expireTime)

logger.Infof("Graceful shutdown: connection error detected for invoker: %s, marking as closing, will expire at %v, IsAvailable=%v",
Contributor @Alanxtl, Mar 9, 2026

Please update the other logger.Infof calls as well.

key, expireTime, invoker.IsAvailable())

if bi, ok := invoker.(*base.BaseInvoker); ok {
bi.SetAvailable(false)
logger.Infof("Graceful shutdown: set invoker unavailable due to connection error: %s, IsAvailable now=%v",
key, invoker.IsAvailable())
}
}
}
16 changes: 13 additions & 3 deletions filter/graceful_shutdown/provider_filter.go
@@ -43,7 +43,6 @@ var (
)

func init() {
// `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded.
extension.SetFilter(constant.GracefulShutdownProviderFilterKey, func() filter.Filter {
return newProviderGracefulShutdownFilter()
})
@@ -85,6 +84,12 @@ func (f *providerGracefulShutdownFilter) Invoke(ctx context.Context, invoker bas
// OnResponse reduces the number of active processes then return the process result
func (f *providerGracefulShutdownFilter) OnResponse(ctx context.Context, result result.Result, invoker base.Invoker, invocation base.Invocation) result.Result {
f.shutdownConfig.ProviderActiveCount.Dec()

// add closing flag to response
if f.isClosing() {
result.AddAttachment(constant.GracefulShutdownClosingKey, "true")
Contributor

result may be nil (when the invoker returns a nil result), in which case calling result.AddAttachment directly will panic.

Suggestion: add a nil check:

if f.isClosing() && result != nil {
    result.AddAttachment(constant.GracefulShutdownClosingKey, "true")
}

}

return result
}

@@ -94,15 +99,13 @@ func (f *providerGracefulShutdownFilter) Set(name string, conf any) {
switch ct := conf.(type) {
case *global.ShutdownConfig:
f.shutdownConfig = ct
// only for compatibility with old config, able to directly remove after config is deleted
case *config.ShutdownConfig:
f.shutdownConfig = compatGlobalShutdownConfig(ct)
default:
logger.Warnf("the type of config for {%s} should be *global.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
}
return
default:
// do nothing
}
}

@@ -112,3 +115,10 @@ func (f *providerGracefulShutdownFilter) rejectNewRequest() bool {
}
return f.shutdownConfig.RejectRequest.Load()
}

func (f *providerGracefulShutdownFilter) isClosing() bool {
if f.shutdownConfig == nil {
return false
}
return f.shutdownConfig.Closing.Load()
}
6 changes: 6 additions & 0 deletions global/shutdown_config.go
@@ -60,6 +60,10 @@ type ShutdownConfig struct {

// provider last received request timestamp
ProviderLastReceivedRequestTime atomic.Time

Closing atomic.Bool

ClosingInvokerExpireTime string `default:"30s" yaml:"closing-invoker-expire-time" json:"closingInvokerExpireTime,omitempty" property:"closingInvokerExpireTime"`
}

func DefaultShutdownConfig() *ShutdownConfig {
@@ -88,12 +92,14 @@ func (c *ShutdownConfig) Clone() *ShutdownConfig {
RejectRequestHandler: c.RejectRequestHandler,
InternalSignal: newInternalSignal,
OfflineRequestWindowTimeout: c.OfflineRequestWindowTimeout,
ClosingInvokerExpireTime: c.ClosingInvokerExpireTime,
}

newShutdownConfig.RejectRequest.Store(c.RejectRequest.Load())
newShutdownConfig.ConsumerActiveCount.Store(c.ConsumerActiveCount.Load())
newShutdownConfig.ProviderActiveCount.Store(c.ProviderActiveCount.Load())
newShutdownConfig.ProviderLastReceivedRequestTime.Store(c.ProviderLastReceivedRequestTime.Load())
newShutdownConfig.Closing.Store(c.Closing.Load())

return newShutdownConfig
}
2 changes: 1 addition & 1 deletion go.mod
@@ -10,7 +10,6 @@ require (
github.com/apache/dubbo-getty v1.4.10
github.com/apache/dubbo-go-hessian2 v1.12.5
github.com/apolloconfig/agollo/v4 v4.4.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/creasty/defaults v1.5.2
github.com/dop251/goja v0.0.0-20240220182346-e401ed450204
github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d
@@ -81,6 +80,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect