Skip to content

Commit 9ddcbac

Browse files
CairryCairry
authored andcommitted
✨ Supported multi instance leader elector
1 parent 73ff51a commit 9ddcbac

File tree

15 files changed

+900
-91
lines changed

15 files changed

+900
-91
lines changed

alert/alert.go

Lines changed: 190 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
package alert
22

33
import (
4+
"context"
45
"watchAlert/alert/consumer"
56
"watchAlert/alert/eval"
67
"watchAlert/alert/probing"
78
"watchAlert/internal/ctx"
9+
"watchAlert/internal/global"
10+
"watchAlert/pkg/client"
11+
"watchAlert/pkg/tools"
12+
13+
"github.com/zeromicro/go-zero/core/logc"
814
)
915

1016
var (
@@ -13,18 +19,199 @@ var (
1319

1420
ProductProbing probing.ProductProbing
1521
ConsumeProbing probing.ConsumeProbing
22+
23+
// Leader 选举器
24+
LeaderElector *tools.LeaderElector
25+
26+
// 消息订阅取消函数
27+
subscriberCancels []context.CancelFunc
28+
29+
// 选举开关
30+
leaderElectionEnabled bool
1631
)
1732

1833
func Initialize(ctx *ctx.Context) {
1934
// 初始化告警规则评估任务
2035
AlertRule = eval.NewAlertRuleEval(ctx)
21-
AlertRule.RestartAllEvals()
22-
2336
ConsumerWork = consumer.NewConsumerWork(ctx)
24-
ConsumerWork.RestartAllConsumers()
2537

2638
// 初始化拨测任务
2739
ConsumeProbing = probing.NewProbingConsumerTask(ctx)
2840
ProductProbing = probing.NewProbingTask(ctx)
41+
42+
// 检查 Leader 选举是否启用
43+
leaderElectionEnabled = global.Config.Server.EnableElection
44+
45+
if leaderElectionEnabled {
46+
// 启用 Leader 选举模式
47+
logc.Infof(ctx.Ctx, "Leader 选举已启用,开始选举流程...")
48+
LeaderElector = tools.NewLeaderElector(
49+
ctx.Ctx,
50+
client.Redis,
51+
loadRules,
52+
unloadRules,
53+
)
54+
// 启动 Leader 选举
55+
LeaderElector.Start()
56+
} else {
57+
loadRules()
58+
}
59+
}
60+
61+
// loadRules 加载所有规则(成为 Leader 时调用)
62+
func loadRules() {
63+
logc.Infof(ctx.Ctx, "本节点为 Leader 节点,开始加载规则...")
64+
65+
// 重启所有告警规则评估器
66+
AlertRule.RestartAllEvals()
67+
68+
// 重启所有故障中心消费者
69+
ConsumerWork.RestartAllConsumers()
70+
71+
// 重启所有拨测任务
2972
ProductProbing.RePushRule(&ConsumeProbing)
73+
74+
// 启动 Redis 消息订阅,监听规则变更
75+
startMessageSubscribers()
76+
}
77+
78+
// startMessageSubscribers 启动消息订阅器
79+
func startMessageSubscribers() {
80+
subscriberCancels = make([]context.CancelFunc, 0)
81+
82+
// 订阅告警规则重载消息
83+
subCtx1, cancel1 := context.WithCancel(ctx.Ctx)
84+
subscriberCancels = append(subscriberCancels, cancel1)
85+
go tools.SubscribeReloadMessages(subCtx1, client.Redis, tools.ChannelRuleReload, handleRuleReload)
86+
87+
// 订阅故障中心重载消息
88+
subCtx2, cancel2 := context.WithCancel(ctx.Ctx)
89+
subscriberCancels = append(subscriberCancels, cancel2)
90+
go tools.SubscribeReloadMessages(subCtx2, client.Redis, tools.ChannelFaultCenterReload, handleFaultCenterReload)
91+
92+
// 订阅拨测规则重载消息
93+
subCtx3, cancel3 := context.WithCancel(ctx.Ctx)
94+
subscriberCancels = append(subscriberCancels, cancel3)
95+
go tools.SubscribeReloadMessages(subCtx3, client.Redis, tools.ChannelProbingReload, handleProbingReload)
96+
}
97+
98+
// stopMessageSubscribers 停止消息订阅器
99+
func stopMessageSubscribers() {
100+
for _, cancel := range subscriberCancels {
101+
cancel()
102+
}
103+
subscriberCancels = nil
104+
logc.Infof(ctx.Ctx, "消息订阅器已停止")
105+
}
106+
107+
// handleRuleReload 处理告警规则重载消息
108+
func handleRuleReload(msg tools.ReloadMessage) {
109+
110+
// 从数据库获取规则
111+
rule := ctx.DB.Rule().GetRuleObject(msg.ID)
112+
if rule.RuleId == "" {
113+
logc.Errorf(ctx.Ctx, "规则不存在: %s", msg.ID)
114+
return
115+
}
116+
117+
switch msg.Action {
118+
case tools.ActionCreate, tools.ActionEnable:
119+
if rule.Enabled != nil && *rule.Enabled {
120+
AlertRule.Submit(rule)
121+
logc.Infof(ctx.Ctx, "[Leader] 已启动规则评估: %s", msg.Name)
122+
}
123+
124+
case tools.ActionUpdate:
125+
AlertRule.Stop(msg.ID)
126+
if rule.Enabled != nil && *rule.Enabled {
127+
AlertRule.Submit(rule)
128+
logc.Infof(ctx.Ctx, "[Leader] 已重启规则评估: %s", msg.Name)
129+
}
130+
131+
case tools.ActionDelete, tools.ActionDisable:
132+
AlertRule.Stop(msg.ID)
133+
logc.Infof(ctx.Ctx, "[Leader] 已停止规则评估: %s", msg.Name)
134+
}
135+
}
136+
137+
// handleFaultCenterReload 处理故障中心重载消息
138+
func handleFaultCenterReload(msg tools.ReloadMessage) {
139+
fc, err := ctx.DB.FaultCenter().Get(msg.TenantID, msg.ID, "")
140+
if err != nil {
141+
logc.Errorf(ctx.Ctx, "故障中心不存在: %s, err: %v", msg.ID, err)
142+
return
143+
}
144+
145+
switch msg.Action {
146+
case tools.ActionCreate, tools.ActionEnable:
147+
ConsumerWork.Submit(fc)
148+
logc.Infof(ctx.Ctx, "[Leader] 已启动故障中心消费: %s", msg.Name)
149+
150+
case tools.ActionUpdate:
151+
ConsumerWork.Stop(msg.ID)
152+
ConsumerWork.Submit(fc)
153+
logc.Infof(ctx.Ctx, "[Leader] 已重启故障中心消费: %s", msg.Name)
154+
155+
case tools.ActionDelete, tools.ActionDisable:
156+
ConsumerWork.Stop(msg.ID)
157+
logc.Infof(ctx.Ctx, "[Leader] 已停止故障中心消费: %s", msg.Name)
158+
}
159+
}
160+
161+
// handleProbingReload 处理拨测规则重载消息
162+
func handleProbingReload(msg tools.ReloadMessage) {
163+
rule, err := ctx.DB.Probing().Search(msg.TenantID, msg.ID)
164+
if err != nil {
165+
logc.Errorf(ctx.Ctx, "拨测规则不存在: %s, err: %v", msg.ID, err)
166+
return
167+
}
168+
switch msg.Action {
169+
case tools.ActionCreate, tools.ActionEnable:
170+
if rule.Enabled != nil && *rule.Enabled {
171+
ProductProbing.Add(rule)
172+
ConsumeProbing.Add(rule)
173+
logc.Infof(ctx.Ctx, "[Leader] 已启动拨测任务: %s", msg.Name)
174+
}
175+
176+
case tools.ActionUpdate:
177+
ProductProbing.Stop(msg.ID)
178+
ConsumeProbing.Stop(msg.ID)
179+
if rule.Enabled != nil && *rule.Enabled {
180+
ProductProbing.Add(rule)
181+
ConsumeProbing.Add(rule)
182+
logc.Infof(ctx.Ctx, "[Leader] 已重启拨测任务: %s", msg.Name)
183+
}
184+
185+
case tools.ActionDelete, tools.ActionDisable:
186+
ProductProbing.Stop(msg.ID)
187+
ConsumeProbing.Stop(msg.ID)
188+
logc.Infof(ctx.Ctx, "[Leader] 已停止拨测任务: %s", msg.Name)
189+
}
190+
}
191+
192+
// unloadRules 卸载所有规则(失去 Leader 时调用)
193+
func unloadRules() {
194+
logc.Infof(ctx.Ctx, "本节点失去 Leader 身份,停止所有任务...")
195+
196+
// 停止消息订阅
197+
stopMessageSubscribers()
198+
199+
// 停止所有告警规则评估器
200+
AlertRule.StopAllEvals()
201+
202+
// 停止所有故障中心消费者
203+
ConsumerWork.StopAllConsumers()
204+
205+
// 停止所有拨测任务
206+
ProductProbing.StopAllTasks()
207+
ConsumeProbing.StopAllTasks()
208+
}
209+
210+
// IsLeader 判断节点角色
211+
func IsLeader() bool {
212+
if !leaderElectionEnabled {
213+
return true
214+
}
215+
216+
return LeaderElector != nil && LeaderElector.IsLeader()
30217
}

alert/consumer/consumer.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package consumer
33
import (
44
"context"
55
"fmt"
6-
"github.com/zeromicro/go-zero/core/logc"
7-
"golang.org/x/sync/errgroup"
86
"regexp"
97
"runtime/debug"
108
"sort"
@@ -14,6 +12,9 @@ import (
1412
"watchAlert/alert/process"
1513
"watchAlert/internal/ctx"
1614
"watchAlert/internal/models"
15+
16+
"github.com/zeromicro/go-zero/core/logc"
17+
"golang.org/x/sync/errgroup"
1718
)
1819

1920
const (
@@ -30,6 +31,7 @@ type (
3031
Stop(faultCenterId string)
3132
Watch(ctx context.Context, faultCenter models.FaultCenter)
3233
RestartAllConsumers()
34+
StopAllConsumers()
3335
}
3436

3537
Consume struct {
@@ -403,3 +405,24 @@ func (c *Consume) processSilenceRule(faultCenter models.FaultCenter) {
403405
silenceCtx.PushAlertMute(*muteRule)
404406
}
405407
}
408+
409+
// StopAllConsumers 停止所有消费者
410+
func (c *Consume) StopAllConsumers() {
411+
c.ctx.Mux.Lock()
412+
defer c.ctx.Mux.Unlock()
413+
414+
count := len(c.ctx.ContextMap)
415+
if count == 0 {
416+
return
417+
}
418+
419+
logc.Infof(c.ctx.Ctx, "停止 %d 个故障中心消费者...", count)
420+
421+
// 取消所有消费任务
422+
for fcId, cancel := range c.ctx.ContextMap {
423+
cancel()
424+
delete(c.ctx.ContextMap, fcId)
425+
}
426+
427+
logc.Infof(c.ctx.Ctx, "所有故障中心消费者已停止")
428+
}

alert/consumer/upgrader.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package consumer
22

33
import (
4-
"context"
54
"fmt"
6-
"github.com/zeromicro/go-zero/core/logc"
75
"time"
86
"watchAlert/alert/mute"
97
"watchAlert/alert/process"
108
"watchAlert/internal/ctx"
119
"watchAlert/internal/models"
10+
11+
"github.com/zeromicro/go-zero/core/logc"
1212
)
1313

1414
// alarmUpgrade 处理告警升级主入口
@@ -17,7 +17,7 @@ func alarmUpgrade(ctx *ctx.Context, faultCenter models.FaultCenter, alerts map[s
1717
return nil
1818
}
1919

20-
filterAlerts := filterAlertEvents(ctx.Ctx, faultCenter, alerts)
20+
filterAlerts := filterAlertEvents(faultCenter, alerts)
2121
currentTime := time.Now().Unix()
2222

2323
confirmAggregated := createAggregatedAlert(models.ConfirmStatus, faultCenter)
@@ -139,7 +139,7 @@ func getContent(number int) string {
139139
}
140140

141141
// filterAlertEvents 过滤告警事件
142-
func filterAlertEvents(ctx context.Context, faultCenter models.FaultCenter, alerts map[string]*models.AlertCurEvent) []*models.AlertCurEvent {
142+
func filterAlertEvents(faultCenter models.FaultCenter, alerts map[string]*models.AlertCurEvent) []*models.AlertCurEvent {
143143
var newEvents []*models.AlertCurEvent
144144
for _, event := range alerts {
145145
// 过滤掉 预告警, 待恢复 状态的事件
@@ -175,10 +175,15 @@ func isMutedEvent(event *models.AlertCurEvent, faultCenter models.FaultCenter) b
175175
// sendAggregatedAlert 发送聚合后的告警函数
176176
func sendAggregatedAlert(ctx *ctx.Context, faultCenter models.FaultCenter, aggregated *AggregatedAlert) error {
177177
var noticeId string
178-
if aggregated.Status == models.ConfirmStatus {
179-
noticeId = faultCenter.GetUpgradeNoticeId(models.ConfirmStatus)
180-
} else if aggregated.Status == models.HandleStatus {
181-
noticeId = faultCenter.GetUpgradeNoticeId(models.HandleStatus)
178+
switch aggregated.Status {
179+
case models.ConfirmStatus:
180+
if faultCenter.GetStrategy(models.ConfirmStatus).GetEnabled() {
181+
noticeId = faultCenter.GetUpgradeNoticeId(models.ConfirmStatus)
182+
}
183+
case models.HandleStatus:
184+
if faultCenter.GetStrategy(models.HandleStatus).GetEnabled() {
185+
noticeId = faultCenter.GetUpgradeNoticeId(models.HandleStatus)
186+
}
182187
}
183188

184189
logc.Alert(ctx.Ctx, fmt.Sprintf("Aggregated alarm %s timeout fingerprints: %v, exceeded %d min",

alert/eval/eval.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type (
6363
Eval(ctx context.Context, rule models.AlertRule)
6464
Recover(tenantId, ruleId string, eventCacheKey models.AlertEventCacheKey, faultCenterInfoKey models.FaultCenterInfoCacheKey, curFingerprints []string)
6565
RestartAllEvals()
66+
StopAllEvals()
6667
}
6768

6869
// AlertRule 告警规则
@@ -412,3 +413,24 @@ func (t *AlertRule) getRuleList() ([]models.AlertRule, error) {
412413
}
413414
return ruleList, nil
414415
}
416+
417+
// StopAllEvals 停止所有评估器
418+
func (t *AlertRule) StopAllEvals() {
419+
t.ctx.Mux.Lock()
420+
defer t.ctx.Mux.Unlock()
421+
422+
count := len(t.ctx.ContextMap)
423+
if count == 0 {
424+
return
425+
}
426+
427+
logc.Infof(t.ctx.Ctx, "停止 %d 个规则评估器...", count)
428+
429+
// 取消所有评估任务
430+
for ruleId, cancel := range t.ctx.ContextMap {
431+
cancel()
432+
delete(t.ctx.ContextMap, ruleId)
433+
}
434+
435+
logc.Infof(t.ctx.Ctx, "所有规则评估器已停止")
436+
}

0 commit comments

Comments
 (0)