Skip to content

feat-localCache #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ dist-ssr
*.sw?
deploy/docker-compose/mysql
deploy/docker-compose/redis

cache_file
10 changes: 5 additions & 5 deletions alert/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (c *Consume) executeTask(faultCenter models.FaultCenter, taskChan chan stru
// 处理静默规则
c.processSilenceRule(faultCenter)
// 获取故障中心的所有告警事件
data, err := c.ctx.Redis.Redis().HGetAll(faultCenter.GetFaultCenterKey()).Result()
data, err := c.ctx.Cache.Cache().GetHashAll(faultCenter.GetFaultCenterKey())
if err != nil {
logc.Error(c.ctx.Ctx, fmt.Sprintf("从 Redis 中获取事件信息错误, faultCenterKey: %s, err: %s", faultCenter.GetFaultCenterKey(), err.Error()))
return
Expand Down Expand Up @@ -373,7 +373,7 @@ func (c *Consume) handleAlert(faultCenter models.FaultCenter, noticeId string, a
for _, event := range events {
if !event.IsRecovered {
event.LastSendTime = curTime
c.ctx.Redis.Event().PushEventToFaultCenter(event)
c.ctx.Cache.Event().PushEventToFaultCenter(event)
}

phoneNumber := func() []string {
Expand Down Expand Up @@ -465,7 +465,7 @@ func (c *Consume) withRuleGroupByAlerts(timeInt int64, alerts []*models.AlertCur

if !alert.IsRecovered {
alert.LastSendTime = timeInt
c.ctx.Redis.Event().PushEventToFaultCenter(alert)
c.ctx.Cache.Event().PushEventToFaultCenter(alert)
}
}

Expand All @@ -474,7 +474,7 @@ func (c *Consume) withRuleGroupByAlerts(timeInt int64, alerts []*models.AlertCur

// removeAlertFromCache 从缓存中删除告警
func (c *Consume) removeAlertFromCache(alert *models.AlertCurEvent) {
c.ctx.Redis.Event().RemoveEventFromFaultCenter(alert.TenantId, alert.FaultCenterId, alert.Fingerprint)
c.ctx.Cache.Event().RemoveEventFromFaultCenter(alert.TenantId, alert.FaultCenterId, alert.Fingerprint)
}

// getNoticeData 获取 Notice 数据
Expand All @@ -499,7 +499,7 @@ func (c *Consume) RestartAllConsumers() {

func (c *Consume) processSilenceRule(faultCenter models.FaultCenter) {
currentTime := time.Now().Unix()
silenceCtx := c.ctx.Redis.Silence()
silenceCtx := c.ctx.Cache.Silence()
// 获取静默列表中所有的id
silenceIds, err := silenceCtx.GetMutesForFaultCenter(faultCenter.TenantId, faultCenter.ID)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions alert/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (t *AlertRule) getEvalTimeDuration(evalTimeType string, evalInterval int64)

func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey string, curFingerprints []string) {
// 获取所有的故障中心的告警事件
events, err := t.ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenterKey)
events, err := t.ctx.Cache.Event().GetAllEventsForFaultCenter(faultCenterKey)
if err != nil {
return
}
Expand Down Expand Up @@ -165,7 +165,7 @@ func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey st

// 调整为待恢复状态
event.Status = 3
t.ctx.Redis.Event().PushEventToFaultCenter(&event)
t.ctx.Cache.Event().PushEventToFaultCenter(&event)

// 判断是否在等待时间范围内
wTime, exists := t.alarmRecoverWaitStore.Get(RuleId, fingerprint)
Expand All @@ -185,14 +185,14 @@ func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey st
event.IsRecovered = true
event.RecoverTime = curTime
event.LastSendTime = 0
t.ctx.Redis.Event().PushEventToFaultCenter(&event)
t.ctx.Cache.Event().PushEventToFaultCenter(&event)
// 触发恢复删除带恢复中的 key
t.alarmRecoverWaitStore.Remove(RuleId, fingerprint)
}
}

func (t *AlertRule) getRecoverWaitTime(faultCenterInfoKey string) int64 {
faultCenter := t.ctx.Redis.FaultCenter().GetFaultCenterInfo(faultCenterInfoKey)
faultCenter := t.ctx.Cache.FaultCenter().GetFaultCenterInfo(faultCenterInfoKey)
if faultCenter.RecoverWaitTime == 0 {
return 1
}
Expand Down
16 changes: 8 additions & 8 deletions alert/eval/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// Metrics 包含 Prometheus、VictoriaMetrics 数据源
func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.AlertRule) []string {
pools := ctx.Redis.ProviderPools()
pools := ctx.Cache.ProviderPools()
var (
resQuery []provider.Metrics
externalLabels map[string]interface{}
Expand Down Expand Up @@ -110,9 +110,9 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.
break
} else if _, exist := fingerPrintMap[fingerprint]; exist {
// 如果是 预告警 状态的事件,触发了恢复逻辑,但它并非是真正触发告警而恢复,所以只需要删除历史事件即可,无需继续处理恢复逻辑。
if ctx.Redis.Event().GetEventStatusForFaultCenter(event.TenantId, event.FaultCenterId, fingerprint) == 0 {
if ctx.Cache.Event().GetEventStatusForFaultCenter(event.TenantId, event.FaultCenterId, fingerprint) == 0 {
logc.Alert(ctx.Ctx, fmt.Sprintf("移除预告警恢复事件, Rule: %s, Fingerprint: %s", rule.RuleName, fingerprint))
ctx.Redis.Event().RemoveEventFromFaultCenter(event.TenantId, event.FaultCenterId, fingerprint)
ctx.Cache.Event().RemoveEventFromFaultCenter(event.TenantId, event.FaultCenterId, fingerprint)
continue
}

Expand All @@ -122,7 +122,7 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.
}

// 获取上一次告警值
event.Metric["value"] = ctx.Redis.Event().GetLastFiringValueForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint)
event.Metric["value"] = ctx.Cache.Event().GetLastFiringValueForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint)
// 获取当前恢复值
event.Metric["recover_value"] = v.GetValue()
process.PushEventToFaultCenter(ctx, &event)
Expand Down Expand Up @@ -177,7 +177,7 @@ func logs(ctx *ctx.Context, datasourceId, datasourceType string, rule models.Ale
externalLabels map[string]interface{}
)

pools := ctx.Redis.ProviderPools()
pools := ctx.Cache.ProviderPools()
switch datasourceType {
case provider.LokiDsProviderName:
cli, err := pools.GetClient(datasourceId)
Expand Down Expand Up @@ -379,7 +379,7 @@ func traces(ctx *ctx.Context, datasourceId, datasourceType string, rule models.A
externalLabels map[string]interface{}
)

pools := ctx.Redis.ProviderPools()
pools := ctx.Cache.ProviderPools()
switch datasourceType {
case provider.JaegerDsProviderName:
curAt := time.Now().UTC()
Expand Down Expand Up @@ -427,7 +427,7 @@ func traces(ctx *ctx.Context, datasourceId, datasourceType string, rule models.A

func cloudWatch(ctx *ctx.Context, datasourceId string, rule models.AlertRule) []string {
var externalLabels map[string]interface{}
pools := ctx.Redis.ProviderPools()
pools := ctx.Cache.ProviderPools()
cfg, err := pools.GetClient(datasourceId)
if err != nil {
logc.Errorf(ctx.Ctx, err.Error())
Expand Down Expand Up @@ -489,7 +489,7 @@ func kubernetesEvent(ctx *ctx.Context, datasourceId string, rule models.AlertRul
return []string{}
}

pools := ctx.Redis.ProviderPools()
pools := ctx.Cache.ProviderPools()
cli, err := pools.GetClient(datasourceId)
if err != nil {
logc.Errorf(ctx.Ctx, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion alert/mute/mute.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func RecoverNotify(mp MuteParams) bool {

// IsSilence 判断是否静默
func IsSilence(mute MuteParams) bool {
silenceCtx := ctx.Redis.Silence()
silenceCtx := ctx.Cache.Silence()
// 获取静默列表中所有的id
ids, err := silenceCtx.GetMutesForFaultCenter(mute.TenantId, mute.FaultCenterId)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions alert/probing/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (m *ConsumeProbing) Add(r models.ProbingRule) {
for {
select {
case <-ticker:
result, err := m.ctx.Redis.Event().GetProbingEventCache(r.GetFiringAlertCacheKey())
result, err := m.ctx.Cache.Event().GetProbingEventCache(r.GetFiringAlertCacheKey())
if err == nil {
m.handleAlert(result)
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (m *ConsumeProbing) filterEvent(alert models.ProbingEvent) bool {
if !alert.IsRecovered {
if alert.LastSendTime == 0 || alert.LastEvalTime >= alert.LastSendTime+alert.RepeatNoticeInterval*60 {
alert.LastSendTime = time.Now().Unix()
m.ctx.Redis.Event().SetProbingEventCache(alert, 0)
m.ctx.Cache.Event().SetProbingEventCache(alert, 0)
return true
}
} else {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (m *ConsumeProbing) getContent(alert models.ProbingEvent, noticeData models

// 删除缓存
func removeAlertFromCache(alert models.ProbingEvent) {
ctx.DO().Redis.Redis().Del(alert.GetFiringAlertCacheKey())
ctx.DO().Cache.Cache().DeleteKey(alert.GetFiringAlertCacheKey())
}

func buildEvent(event models.ProbingEvent) models.AlertCurEvent {
Expand Down
6 changes: 3 additions & 3 deletions alert/probing/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (t *ProductProbing) buildEvent(rule models.ProbingRule) models.ProbingEvent

func SaveProbingEndpointEvent(event models.ProbingEvent) {
firingKey := event.GetFiringAlertCacheKey()
cache := ctx.DO().Redis.Event()
cache := ctx.DO().Cache.Event()
resFiring, _ := cache.GetProbingEventCache(firingKey)
event.FirstTriggerTime = cache.GetProbingEventFirstTime(firingKey)
event.LastEvalTime = cache.GetProbingEventLastEvalTime(firingKey)
Expand All @@ -31,12 +31,12 @@ func SaveProbingEndpointEvent(event models.ProbingEvent) {

func SetProbingValueMap(key string, m map[string]any) error {
for k, v := range m {
ctx.DO().Redis.Redis().HSet(key, k, v)
ctx.DO().Cache.Cache().SetHashAny(key, k, v)
}
return nil
}

func GetProbingValueMap(key string) map[string]string {
result := ctx.DO().Redis.Redis().HGetAll(key).Val()
result, _ := ctx.DO().Cache.Cache().GetHashAll(key)
return result
}
2 changes: 1 addition & 1 deletion alert/probing/producter.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (t *ProductProbing) Evaluation(event models.ProbingEvent, option models.Eva
t.cleanFrequency(t.OkFrequency, event.RuleId)
}()

c := ctx.Redis.Event()
c := ctx.Cache.Event()
neCache, err := c.GetProbingEventCache(event.GetFiringAlertCacheKey())
if err != nil {
logc.Error(ctx.Ctx, err.Error())
Expand Down
4 changes: 2 additions & 2 deletions alert/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func PushEventToFaultCenter(ctx *ctx.Context, event *models.AlertCurEvent) {
return
}

eventOpt := ctx.Redis.Event()
eventOpt := ctx.Cache.Event()
event.FirstTriggerTime = eventOpt.GetFirstTimeForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint)
event.LastEvalTime = eventOpt.GetLastEvalTimeForFaultCenter()
event.LastSendTime = eventOpt.GetLastSendTimeForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint)
Expand Down Expand Up @@ -138,7 +138,7 @@ func RecordAlertHisEvent(ctx *ctx.Context, alert models.AlertCurEvent) error {

// GetFingerPrint 获取指纹信息
func GetFingerPrint(ctx *ctx.Context, tenantId string, faultCenterId string, ruleId string) map[string]struct{} {
fingerPrints := ctx.Redis.Event().GetFingerprintsByRuleId(tenantId, faultCenterId, ruleId)
fingerPrints := ctx.Cache.Event().GetFingerprintsByRuleId(tenantId, faultCenterId, ruleId)
fingerPrintMap := make(map[string]struct{})
for _, fingerPrint := range fingerPrints {
fingerPrintMap[fingerPrint] = struct{}{}
Expand Down
4 changes: 2 additions & 2 deletions api/dashboardInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func getUserNumber(ctx *ctx.Context) int64 {

// getAlertList 获取当前告警 annotations 列表
func getAlertList(ctx *ctx.Context, faultCenter models.FaultCenter) []string {
events, err := ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenter.GetFaultCenterKey())
events, err := ctx.Cache.Event().GetAllEventsForFaultCenter(faultCenter.GetFaultCenterKey())
if err != nil {
return nil
}
Expand All @@ -115,7 +115,7 @@ func getAlertList(ctx *ctx.Context, faultCenter models.FaultCenter) []string {

// getAlarmDistribution 获取告警分布
func getAlarmDistribution(ctx *ctx.Context, faultCenter models.FaultCenter, severity string) int64 {
events, err := ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenter.GetFaultCenterKey())
events, err := ctx.Cache.Event().GetAllEventsForFaultCenter(faultCenter.GetFaultCenterKey())
if err != nil {
return 0
}
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type App struct {
Jwt Jwt `json:"Jwt"`
Jaeger Jaeger `json:"Jaeger"`
Ldap Ldap `json:"ldap"`
Cache string `json:"Cache"`
}

type Server struct {
Expand Down
2 changes: 2 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ MySQL:
dbName: watchalert
timeout: 10s

Cache: default

Redis:
host: w8t-redis
port: 6379
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ require (
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
github.com/onsi/gomega v1.31.1 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.31.1 h1:KYppCUK+bUgAZwHOu7EXVBKyQA6ILvOESHkn/tgoqvo=
github.com/onsi/gomega v1.31.1/go.mod h1:y40C95dwAD1Nz36SsEnxvfFe8FFfNxzI5eJ0EYGyAy0=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
Expand Down
6 changes: 3 additions & 3 deletions initialization/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func InitBasic() {
global.Config = config.InitConfig()

dbRepo := repo.NewRepoEntry()
rCache := cache.NewEntryCache()
ctx := ctx.NewContext(context.Background(), dbRepo, rCache)
rCache := cache.NewEntryCache(global.Config.Cache)
ctx := ctx.NewContext(context.Background(), dbRepo, rCache, global.Config.Cache)

services.NewServices(ctx)

Expand Down Expand Up @@ -56,7 +56,7 @@ func InitBasic() {
logc.Error(ctx.Ctx, fmt.Sprintf("创建 Ai 客户端失败: %s", err.Error()))
return
}
ctx.Redis.ProviderPools().SetClient("AiClient", client)
ctx.Cache.ProviderPools().SetClient("AiClient", client)
}
}

Expand Down
16 changes: 16 additions & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cache

import (
"time"
)

type Cache interface {
SetKey(key, value string, expiration time.Duration)
GetKey(key string) (string, error)
DeleteKey(key string)
SetHash(key, field, value string)
SetHashAny(key, field string, value any)
DeleteHash(key, field string)
GetHash(key, field string) (string, error)
GetHashAll(key string) (map[string]string, error)
}
33 changes: 20 additions & 13 deletions internal/cache/entry.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,46 @@
package cache

import (
"github.com/go-redis/redis"
"watchAlert/pkg/client"
)

type (
entryCache struct {
redis *redis.Client
//redis *redis.Client
provider *ProviderPoolStore
cache Cache
}

InterEntryCache interface {
Redis() *redis.Client
Cache() Cache
Silence() SilenceCacheInterface
Event() EventCacheInterface
ProviderPools() *ProviderPoolStore
FaultCenter() FaultCenterCacheInterface
}
)

func NewEntryCache() InterEntryCache {
r := client.InitRedis()
p := NewClientPoolStore()
func NewEntryCache(cacheType string) InterEntryCache {

return &entryCache{
redis: r,
provider: p,
p := NewClientPoolStore()
switch cacheType {
case "Redis":
return &entryCache{
cache: client.NewRedisCache(),
provider: p,
}
default:
return &entryCache{
cache: client.InitLocalCache(),
provider: p,
}
}
}

func (e entryCache) Redis() *redis.Client { return e.redis }
func (e entryCache) Silence() SilenceCacheInterface { return newSilenceCacheInterface(e.redis) }
func (e entryCache) Event() EventCacheInterface { return newEventCacheInterface(e.redis) }
func (e entryCache) Cache() Cache { return e.cache }
func (e entryCache) Silence() SilenceCacheInterface { return newSilenceCacheInterface(e.cache) }
func (e entryCache) Event() EventCacheInterface { return newEventCacheInterface(e.cache) }
func (e entryCache) ProviderPools() *ProviderPoolStore { return e.provider }
func (e entryCache) FaultCenter() FaultCenterCacheInterface {
return newFaultCenterCacheInterface(e.redis)
return newFaultCenterCacheInterface(e.cache)
}
Loading