Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #3235 +/- ##
===========================================
+ Coverage 46.76% 47.75% +0.98%
===========================================
Files 295 463 +168
Lines 17172 33979 +16807
===========================================
+ Hits 8031 16227 +8196
- Misses 8287 16436 +8149
- Partials 854 1316 +462 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| // those signals' original behavior is exit with dump ths stack, so we try to keep the behavior | ||
| for _, dumpSignal := range DumpHeapShutdownSignals { | ||
| if sig == dumpSignal { | ||
| debug.WriteHeapDump(os.Stdout.Fd()) | ||
| } | ||
| } | ||
| os.Exit(0) | ||
|
|
||
| }() | ||
| } | ||
| } | ||
|
|
||
| // BeforeShutdown provides processing flow before shutdown | ||
| func BeforeShutdown() { | ||
| destroyAllRegistries() | ||
| // waiting for a short time so that the clients have enough time to get the notification that server shutdowns | ||
| // The value of configuration depends on how long the clients will get notification. | ||
| waitAndAcceptNewRequests() | ||
|
|
||
| // reject sending/receiving the new request, but keeping waiting for accepting requests | ||
| waitForSendingAndReceivingRequests() | ||
|
|
||
| // destroy all protocols |
There was a problem hiding this comment.
do not delete these comments
There was a problem hiding this comment.
Sorry, I will restore annotations
|
| var ok bool | ||
| tripleProtocol = tp.(*TripleProtocol) | ||
| if !ok { |
There was a problem hiding this comment.
类型断言写错了:tripleProtocol = tp.(*TripleProtocol) 没有捕获 ok,而 var ok bool 声明后从未赋值,始终是 false。导致 if !ok { return nil } 永远成立,回调永远提前返回,grpc.GracefulStop() 从不执行。triple 的 graceful shutdown 完全失效。
建议改为:
tripleProtocol, ok := tp.(*TripleProtocol)
if !ok {
return nil
}There was a problem hiding this comment.
Sorry, I'll correct it
| func SetGracefulShutdownCallback(name string, f GracefulShutdownCallback) { | ||
| gracefulShutdownCallbacks[name] = f | ||
| } | ||
|
|
||
| // GetGracefulShutdownCallback returns protocol's graceful shutdown callback | ||
| func GetGracefulShutdownCallback(name string) (GracefulShutdownCallback, bool) { | ||
| f, ok := gracefulShutdownCallbacks[name] | ||
| return f, ok | ||
| } | ||
|
|
||
| // GetAllGracefulShutdownCallbacks returns all protocol's graceful shutdown callbacks | ||
| func GetAllGracefulShutdownCallbacks() map[string]GracefulShutdownCallback { | ||
| return gracefulShutdownCallbacks |
There was a problem hiding this comment.
gracefulShutdownCallbacks 是全局 map,SetGracefulShutdownCallback 和 GetAllGracefulShutdownCallbacks 没有任何锁保护,Go race detector 会报警。
GetAllGracefulShutdownCallbacks 直接返回内部 map 引用,调用方若并发修改则 crash。
建议:加 sync.RWMutex 保护;GetAllGracefulShutdownCallbacks 返回副本而不是原始 map。
There was a problem hiding this comment.
Sorry, I'll correct it
| func notifyLongConnectionConsumers() { | ||
| logger.Info("Graceful shutdown --- Notify long connection consumers.") | ||
|
|
||
| notifyTimeout := 3 * time.Second |
There was a problem hiding this comment.
notifyTimeout 硬编码 3s,与 ShutdownConfig 中 StepTimeout、ConsumerUpdateWaitTime 等配置体系完全割裂,无法通过配置控制。
建议:函数签名改为接收 shutdown *global.ShutdownConfig,复用已有的 timeout 配置字段,或新增 NotifyTimeout 字段。
| if attempt < maxRetries { | ||
| delay := baseDelay | ||
| for i := 0; i < attempt; i++ { | ||
| delay *= 2 |
There was a problem hiding this comment.
三次重试的总等待时间:500ms + 1s + 2s = 3.5s,超过了外层 context timeout 3s,实际最多只能完成 1-2 次重试,配置自相矛盾。
另外 go.mod 里本就有 github.com/cenkalti/backoff/v4 依赖,不需要手写指数退避逻辑,直接用更可靠。
建议:要么把 notifyTimeout 调大(至少 10s),要么减少重试次数/延迟;退避逻辑改用 cenkalti/backoff/v4。
| isConnectionError := strings.Contains(errMsg, "client has closed") || | ||
| strings.Contains(errMsg, "connection") || |
There was a problem hiding this comment.
strings.Contains(errMsg, "connection") 过于宽泛,任何包含 "connection" 字样的业务错误(如 "database connection pool exhausted"、"connection refused by business logic")都会被误判为连接关闭错误,将正常 invoker 标记为 closing 并 SetAvailable(false),导致请求被永久拒绝直到 30s 过期。
建议:改用 errors.Is 对已知错误类型匹配,或缩窄字符串匹配为 "connection reset"、"use of closed network connection" 等具体错误。
There was a problem hiding this comment.
Sorry, I'll correct it
| if time.Now().Before(expireTime.(time.Time)) { | ||
| return true | ||
| } | ||
| f.closingInvokers.Delete(key) |
There was a problem hiding this comment.
markClosingInvoker 调用了 bi.SetAvailable(false),30s 过期后 closingInvokers.Delete(key) 把节点从 map 移除,但 IsAvailable() 仍然是 false,负载均衡器会永久跳过该节点,产生永久不可用的 invoker。
建议:closingInvokers.Delete(key) 之后同步调用 bi.SetAvailable(true) 恢复状态;或者干脆不调用 SetAvailable(false),只靠 closingInvokers 的过期机制控制路由。
| gp.serverLock.Lock() | ||
| defer gp.serverLock.Unlock() | ||
|
|
||
| for _, server := range gp.serverMap { | ||
| server.GracefulStop() |
There was a problem hiding this comment.
GracefulStop() 是阻塞调用,会等待所有活跃 RPC 完成。在持有 serverLock 的情况下调用,若 GracefulStop() 等待的 RPC handler 内部触发了 Export() 等需要 serverLock 的操作,会死锁。triple.go 的对应实现存在同样问题。
建议:先拷贝 server 列表,释放锁,再逐一调用 GracefulStop():
gp.serverLock.Lock()
servers := make([]*Server, 0, len(gp.serverMap))
for _, s := range gp.serverMap {
servers = append(servers, s)
}
gp.serverLock.Unlock()
for _, s := range servers {
s.GracefulStop()
}|
|
||
| // add closing flag to response | ||
| if f.isClosing() { | ||
| result.AddAttachment(constant.GracefulShutdownClosingKey, "true") |
There was a problem hiding this comment.
result 可能为 nil(invoker 返回 nil result 时),直接调用 result.AddAttachment 会 panic。
建议:加 nil 检查:
if f.isClosing() && result != nil {
result.AddAttachment(constant.GracefulShutdownClosingKey, "true")
}
Alanxtl
left a comment
There was a problem hiding this comment.
don't forget to write a sample (integration test) for this feature
| // SetGracefulShutdownCallback sets protocol-level graceful shutdown callback | ||
| func SetGracefulShutdownCallback(name string, f GracefulShutdownCallback) { | ||
| gracefulShutdownCallbacks[name] = f | ||
| } | ||
|
|
||
| // GetGracefulShutdownCallback returns protocol's graceful shutdown callback | ||
| func GetGracefulShutdownCallback(name string) (GracefulShutdownCallback, bool) { | ||
| f, ok := gracefulShutdownCallbacks[name] | ||
| return f, ok | ||
| } | ||
|
|
||
| // GetAllGracefulShutdownCallbacks returns all protocol's graceful shutdown callbacks | ||
| func GetAllGracefulShutdownCallbacks() map[string]GracefulShutdownCallback { | ||
| return gracefulShutdownCallbacks | ||
| } |
There was a problem hiding this comment.
- getter和setter风格的命名不是go的习惯,可以考虑改成
func RegisterGracefulShutdownCallback(name string, f GracefulShutdownCallback)
func LookupGracefulShutdownCallback(name string) (GracefulShutdownCallback, bool)
func GracefulShutdownCallbacks() map[string]GracefulShutdownCallback- 并发访问问题需要再考虑一下
- 这个SetGracefulShutdownCallback是否允许重复注册,如:
extension.SetGracefulShutdownCallback(GRPC, cb1)
extension.SetGracefulShutdownCallback(GRPC, cb2)如果不允许需要加一下判断
- 不要直接把内部 map 返回出去,如果需要的话可以返回一个拷贝
| /** | ||
| * AddCustomShutdownCallback | ||
| * you should not make any assumption about the order. | ||
| * For example, if you have more than one callbacks, and you wish the order is: | ||
| * callback1() | ||
| * callback2() | ||
| * ... | ||
| * callbackN() | ||
| * Then you should put then together: | ||
| * func callback() { | ||
| * callback1() | ||
| * callback2() | ||
| * ... | ||
| * callbackN() | ||
| * } | ||
| * I think the order of custom callbacks should be decided by the users. | ||
| * Even though I can design a mechanism to support the ordered custom callbacks, | ||
| * the benefit of that mechanism is low. | ||
| * And it may introduce much complication for another users. | ||
| */ |
|
|
||
| if bi, ok := invoker.(*base.BaseInvoker); ok { | ||
| bi.SetAvailable(false) | ||
| logger.Infof("Graceful shutdown: set invoker unavailable: %s, IsAvailable now=%v", |
There was a problem hiding this comment.
| logger.Infof("Graceful shutdown: set invoker unavailable: %s, IsAvailable now=%v", | |
| logger.Infof("Graceful shutdown --- Set invoker unavailable: %s, IsAvailable now=%v", |
| expireTime := time.Now().Add(f.getClosingInvokerExpireTime()) | ||
| f.closingInvokers.Store(key, expireTime) | ||
|
|
||
| logger.Infof("Graceful shutdown: connection error detected for invoker: %s, marking as closing, will expire at %v, IsAvailable=%v", |
| defaultStepTimeout = 3 * time.Second | ||
| defaultConsumerUpdateWaitTime = 3 * time.Second | ||
| defaultOfflineRequestWindowTimeout = 3 * time.Second | ||
| // retry config |
There was a problem hiding this comment.
| // retry config | |
| // retry config |
| // SetAvailable sets available flag | ||
| func (bi *BaseInvoker) SetAvailable(available bool) { | ||
| bi.available.Store(available) | ||
| } |



What kind of change does this PR introduce?
What is the current behavior?
Related Issue: #1977
What is the new behavior?
This PR adds graceful shutdown enhancement with the following features:
SetGracefulShutdownCallbackfor protocol-specific shutdown logicGracefulStop()