diff --git a/.gitignore b/.gitignore index 848f41e..61aba9d 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,4 @@ dist-ssr *.sw? deploy/docker-compose/mysql deploy/docker-compose/redis - +cache_file \ No newline at end of file diff --git a/alert/consumer/consumer.go b/alert/consumer/consumer.go index 00723fe..bc1db4d 100644 --- a/alert/consumer/consumer.go +++ b/alert/consumer/consumer.go @@ -192,7 +192,7 @@ func (c *Consume) executeTask(faultCenter models.FaultCenter, taskChan chan stru // 处理静默规则 c.processSilenceRule(faultCenter) // 获取故障中心的所有告警事件 - data, err := c.ctx.Redis.Redis().HGetAll(faultCenter.GetFaultCenterKey()).Result() + data, err := c.ctx.Cache.Cache().GetHashAll(faultCenter.GetFaultCenterKey()) if err != nil { logc.Error(c.ctx.Ctx, fmt.Sprintf("从 Redis 中获取事件信息错误, faultCenterKey: %s, err: %s", faultCenter.GetFaultCenterKey(), err.Error())) return @@ -373,7 +373,7 @@ func (c *Consume) handleAlert(faultCenter models.FaultCenter, noticeId string, a for _, event := range events { if !event.IsRecovered { event.LastSendTime = curTime - c.ctx.Redis.Event().PushEventToFaultCenter(event) + c.ctx.Cache.Event().PushEventToFaultCenter(event) } phoneNumber := func() []string { @@ -465,7 +465,7 @@ func (c *Consume) withRuleGroupByAlerts(timeInt int64, alerts []*models.AlertCur if !alert.IsRecovered { alert.LastSendTime = timeInt - c.ctx.Redis.Event().PushEventToFaultCenter(alert) + c.ctx.Cache.Event().PushEventToFaultCenter(alert) } } @@ -474,7 +474,7 @@ func (c *Consume) withRuleGroupByAlerts(timeInt int64, alerts []*models.AlertCur // removeAlertFromCache 从缓存中删除告警 func (c *Consume) removeAlertFromCache(alert *models.AlertCurEvent) { - c.ctx.Redis.Event().RemoveEventFromFaultCenter(alert.TenantId, alert.FaultCenterId, alert.Fingerprint) + c.ctx.Cache.Event().RemoveEventFromFaultCenter(alert.TenantId, alert.FaultCenterId, alert.Fingerprint) } // getNoticeData 获取 Notice 数据 @@ -499,7 +499,7 @@ func (c *Consume) RestartAllConsumers() { func (c *Consume) processSilenceRule(faultCenter models.FaultCenter) { currentTime := time.Now().Unix() - silenceCtx := c.ctx.Redis.Silence() + silenceCtx := c.ctx.Cache.Silence() // 获取静默列表中所有的id silenceIds, err := silenceCtx.GetMutesForFaultCenter(faultCenter.TenantId, faultCenter.ID) if err != nil { diff --git a/alert/eval/eval.go b/alert/eval/eval.go index c737b05..960438e 100644 --- a/alert/eval/eval.go +++ b/alert/eval/eval.go @@ -137,7 +137,7 @@ func (t *AlertRule) getEvalTimeDuration(evalTimeType string, evalInterval int64) func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey string, curFingerprints []string) { // 获取所有的故障中心的告警事件 - events, err := t.ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenterKey) + events, err := t.ctx.Cache.Event().GetAllEventsForFaultCenter(faultCenterKey) if err != nil { return } @@ -165,7 +165,7 @@ func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey st // 调整为待恢复状态 event.Status = 3 - t.ctx.Redis.Event().PushEventToFaultCenter(&event) + t.ctx.Cache.Event().PushEventToFaultCenter(&event) // 判断是否在等待时间范围内 wTime, exists := t.alarmRecoverWaitStore.Get(RuleId, fingerprint) @@ -185,14 +185,14 @@ func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey st event.IsRecovered = true event.RecoverTime = curTime event.LastSendTime = 0 - t.ctx.Redis.Event().PushEventToFaultCenter(&event) + t.ctx.Cache.Event().PushEventToFaultCenter(&event) // 触发恢复删除带恢复中的 key t.alarmRecoverWaitStore.Remove(RuleId, fingerprint) } } func (t *AlertRule) getRecoverWaitTime(faultCenterInfoKey string) int64 { - faultCenter := t.ctx.Redis.FaultCenter().GetFaultCenterInfo(faultCenterInfoKey) + faultCenter := t.ctx.Cache.FaultCenter().GetFaultCenterInfo(faultCenterInfoKey) if faultCenter.RecoverWaitTime == 0 { return 1 } diff --git a/alert/eval/query.go b/alert/eval/query.go index 286ebc4..2f2afe0 100644 --- a/alert/eval/query.go +++ b/alert/eval/query.go @@ -17,7 +17,7 @@ import ( // Metrics 包含 Prometheus、VictoriaMetrics 数据源 func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.AlertRule) []string { - pools := ctx.Redis.ProviderPools() + pools := ctx.Cache.ProviderPools() var ( resQuery []provider.Metrics externalLabels map[string]interface{} @@ -110,9 +110,9 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models. break } else if _, exist := fingerPrintMap[fingerprint]; exist { // 如果是 预告警 状态的事件,触发了恢复逻辑,但它并非是真正触发告警而恢复,所以只需要删除历史事件即可,无需继续处理恢复逻辑。 - if ctx.Redis.Event().GetEventStatusForFaultCenter(event.TenantId, event.FaultCenterId, fingerprint) == 0 { + if ctx.Cache.Event().GetEventStatusForFaultCenter(event.TenantId, event.FaultCenterId, fingerprint) == 0 { logc.Alert(ctx.Ctx, fmt.Sprintf("移除预告警恢复事件, Rule: %s, Fingerprint: %s", rule.RuleName, fingerprint)) - ctx.Redis.Event().RemoveEventFromFaultCenter(event.TenantId, event.FaultCenterId, fingerprint) + ctx.Cache.Event().RemoveEventFromFaultCenter(event.TenantId, event.FaultCenterId, fingerprint) continue } @@ -122,7 +122,7 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models. } // 获取上一次告警值 - event.Metric["value"] = ctx.Redis.Event().GetLastFiringValueForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint) + event.Metric["value"] = ctx.Cache.Event().GetLastFiringValueForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint) // 获取当前恢复值 event.Metric["recover_value"] = v.GetValue() process.PushEventToFaultCenter(ctx, &event) @@ -177,7 +177,7 @@ func logs(ctx *ctx.Context, datasourceId, datasourceType string, rule models.Ale externalLabels map[string]interface{} ) - pools := ctx.Redis.ProviderPools() + pools := ctx.Cache.ProviderPools() switch datasourceType { case provider.LokiDsProviderName: cli, err := pools.GetClient(datasourceId) @@ -379,7 +379,7 @@ func traces(ctx *ctx.Context, datasourceId, datasourceType string, rule models.A externalLabels map[string]interface{} ) - pools := ctx.Redis.ProviderPools() + pools := ctx.Cache.ProviderPools() switch datasourceType { case provider.JaegerDsProviderName: curAt := time.Now().UTC() @@ -427,7 +427,7 @@ func traces(ctx *ctx.Context, datasourceId, datasourceType string, rule models.A func cloudWatch(ctx *ctx.Context, datasourceId string, rule models.AlertRule) []string { var externalLabels map[string]interface{} - pools := ctx.Redis.ProviderPools() + pools := ctx.Cache.ProviderPools() cfg, err := pools.GetClient(datasourceId) if err != nil { logc.Errorf(ctx.Ctx, err.Error()) @@ -489,7 +489,7 @@ func kubernetesEvent(ctx *ctx.Context, datasourceId string, rule models.AlertRul return []string{} } - pools := ctx.Redis.ProviderPools() + pools := ctx.Cache.ProviderPools() cli, err := pools.GetClient(datasourceId) if err != nil { logc.Errorf(ctx.Ctx, err.Error()) diff --git a/alert/mute/mute.go b/alert/mute/mute.go index 5875f20..be6367e 100644 --- a/alert/mute/mute.go +++ b/alert/mute/mute.go @@ -59,7 +59,7 @@ func RecoverNotify(mp MuteParams) bool { // IsSilence 判断是否静默 func IsSilence(mute MuteParams) bool { - silenceCtx := ctx.Redis.Silence() + silenceCtx := ctx.Cache.Silence() // 获取静默列表中所有的id ids, err := silenceCtx.GetMutesForFaultCenter(mute.TenantId, mute.FaultCenterId) if err != nil { diff --git a/alert/probing/consumer.go b/alert/probing/consumer.go index 9918d32..f0fa241 100644 --- a/alert/probing/consumer.go +++ b/alert/probing/consumer.go @@ -36,7 +36,7 @@ func (m *ConsumeProbing) Add(r models.ProbingRule) { for { select { case <-ticker: - result, err := m.ctx.Redis.Event().GetProbingEventCache(r.GetFiringAlertCacheKey()) + result, err := m.ctx.Cache.Event().GetProbingEventCache(r.GetFiringAlertCacheKey()) if err == nil { m.handleAlert(result) } @@ -81,7 +81,7 @@ func (m *ConsumeProbing) filterEvent(alert models.ProbingEvent) bool { if !alert.IsRecovered { if alert.LastSendTime == 0 || alert.LastEvalTime >= alert.LastSendTime+alert.RepeatNoticeInterval*60 { alert.LastSendTime = time.Now().Unix() - m.ctx.Redis.Event().SetProbingEventCache(alert, 0) + m.ctx.Cache.Event().SetProbingEventCache(alert, 0) return true } } else { @@ -120,7 +120,7 @@ func (m *ConsumeProbing) getContent(alert models.ProbingEvent, noticeData models // 删除缓存 func removeAlertFromCache(alert models.ProbingEvent) { - ctx.DO().Redis.Redis().Del(alert.GetFiringAlertCacheKey()) + ctx.DO().Cache.Cache().DeleteKey(alert.GetFiringAlertCacheKey()) } func buildEvent(event models.ProbingEvent) models.AlertCurEvent { diff --git a/alert/probing/process.go b/alert/probing/process.go index c17ac56..738c843 100644 --- a/alert/probing/process.go +++ b/alert/probing/process.go @@ -21,7 +21,7 @@ func (t *ProductProbing) buildEvent(rule models.ProbingRule) models.ProbingEvent func SaveProbingEndpointEvent(event models.ProbingEvent) { firingKey := event.GetFiringAlertCacheKey() - cache := ctx.DO().Redis.Event() + cache := ctx.DO().Cache.Event() resFiring, _ := cache.GetProbingEventCache(firingKey) event.FirstTriggerTime = cache.GetProbingEventFirstTime(firingKey) event.LastEvalTime = cache.GetProbingEventLastEvalTime(firingKey) @@ -31,12 +31,12 @@ func SaveProbingEndpointEvent(event models.ProbingEvent) { func SetProbingValueMap(key string, m map[string]any) error { for k, v := range m { - ctx.DO().Redis.Redis().HSet(key, k, v) + ctx.DO().Cache.Cache().SetHashAny(key, k, v) } return nil } func GetProbingValueMap(key string) map[string]string { - result := ctx.DO().Redis.Redis().HGetAll(key).Val() + result, _ := ctx.DO().Cache.Cache().GetHashAll(key) return result } diff --git a/alert/probing/producter.go b/alert/probing/producter.go index 421a044..55b8593 100644 --- a/alert/probing/producter.go +++ b/alert/probing/producter.go @@ -173,7 +173,7 @@ func (t *ProductProbing) Evaluation(event models.ProbingEvent, option models.Eva t.cleanFrequency(t.OkFrequency, event.RuleId) }() - c := ctx.Redis.Event() + c := ctx.Cache.Event() neCache, err := c.GetProbingEventCache(event.GetFiringAlertCacheKey()) if err != nil { logc.Error(ctx.Ctx, err.Error()) diff --git a/alert/process/process.go b/alert/process/process.go index ba2fae5..8a13fbc 100644 --- a/alert/process/process.go +++ b/alert/process/process.go @@ -33,7 +33,7 @@ func PushEventToFaultCenter(ctx *ctx.Context, event *models.AlertCurEvent) { return } - eventOpt := ctx.Redis.Event() + eventOpt := ctx.Cache.Event() event.FirstTriggerTime = eventOpt.GetFirstTimeForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint) event.LastEvalTime = eventOpt.GetLastEvalTimeForFaultCenter() event.LastSendTime = eventOpt.GetLastSendTimeForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint) @@ -138,7 +138,7 @@ func RecordAlertHisEvent(ctx *ctx.Context, alert models.AlertCurEvent) error { // GetFingerPrint 获取指纹信息 func GetFingerPrint(ctx *ctx.Context, tenantId string, faultCenterId string, ruleId string) map[string]struct{} { - fingerPrints := ctx.Redis.Event().GetFingerprintsByRuleId(tenantId, faultCenterId, ruleId) + fingerPrints := ctx.Cache.Event().GetFingerprintsByRuleId(tenantId, faultCenterId, ruleId) fingerPrintMap := make(map[string]struct{}) for _, fingerPrint := range fingerPrints { fingerPrintMap[fingerPrint] = struct{}{} diff --git a/api/dashboardInfo.go b/api/dashboardInfo.go index 24ca8b1..28a0e18 100644 --- a/api/dashboardInfo.go +++ b/api/dashboardInfo.go @@ -101,7 +101,7 @@ func getUserNumber(ctx *ctx.Context) int64 { // getAlertList 获取当前告警 annotations 列表 func getAlertList(ctx *ctx.Context, faultCenter models.FaultCenter) []string { - events, err := ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenter.GetFaultCenterKey()) + events, err := ctx.Cache.Event().GetAllEventsForFaultCenter(faultCenter.GetFaultCenterKey()) if err != nil { return nil } @@ -115,7 +115,7 @@ func getAlertList(ctx *ctx.Context, faultCenter models.FaultCenter) []string { // getAlarmDistribution 获取告警分布 func getAlarmDistribution(ctx *ctx.Context, faultCenter models.FaultCenter, severity string) int64 { - events, err := ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenter.GetFaultCenterKey()) + events, err := ctx.Cache.Event().GetAllEventsForFaultCenter(faultCenter.GetFaultCenterKey()) if err != nil { return 0 } diff --git a/config/config.go b/config/config.go index 614aec7..9af102f 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,7 @@ type App struct { Jwt Jwt `json:"Jwt"` Jaeger Jaeger `json:"Jaeger"` Ldap Ldap `json:"ldap"` + Cache string `json:"Cache"` } type Server struct { diff --git a/config/config.yaml b/config/config.yaml index 6ac1dd8..15eab0b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -11,6 +11,8 @@ MySQL: dbName: watchalert timeout: 10s +Cache: default + Redis: host: w8t-redis port: 6379 diff --git a/go.mod b/go.mod index 0968f34..21f478e 100644 --- a/go.mod +++ b/go.mod @@ -106,6 +106,7 @@ require ( github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/ginkgo/v2 v2.15.0 // indirect github.com/onsi/gomega v1.31.1 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index 2e10dea..854cb33 100644 --- a/go.sum +++ b/go.sum @@ -280,6 +280,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.31.1 h1:KYppCUK+bUgAZwHOu7EXVBKyQA6ILvOESHkn/tgoqvo= github.com/onsi/gomega v1.31.1/go.mod h1:y40C95dwAD1Nz36SsEnxvfFe8FFfNxzI5eJ0EYGyAy0= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= diff --git a/initialization/init.go b/initialization/init.go index 04ea3cf..c77cfd4 100644 --- a/initialization/init.go +++ b/initialization/init.go @@ -22,8 +22,8 @@ func InitBasic() { global.Config = config.InitConfig() dbRepo := repo.NewRepoEntry() - rCache := cache.NewEntryCache() - ctx := ctx.NewContext(context.Background(), dbRepo, rCache) + rCache := cache.NewEntryCache(global.Config.Cache) + ctx := ctx.NewContext(context.Background(), dbRepo, rCache, global.Config.Cache) services.NewServices(ctx) @@ -56,7 +56,7 @@ func InitBasic() { logc.Error(ctx.Ctx, fmt.Sprintf("创建 Ai 客户端失败: %s", err.Error())) return } - ctx.Redis.ProviderPools().SetClient("AiClient", client) + ctx.Cache.ProviderPools().SetClient("AiClient", client) } } diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 0000000..6358b45 --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,16 @@ +package cache + +import ( + "time" +) + +type Cache interface { + SetKey(key, value string, expiration time.Duration) + GetKey(key string) (string, error) + DeleteKey(key string) + SetHash(key, field, value string) + SetHashAny(key, field string, value any) + DeleteHash(key, field string) + GetHash(key, field string) (string, error) + GetHashAll(key string) (map[string]string, error) +} diff --git a/internal/cache/entry.go b/internal/cache/entry.go index dff2692..f3c47f9 100644 --- a/internal/cache/entry.go +++ b/internal/cache/entry.go @@ -1,18 +1,18 @@ package cache import ( - "github.com/go-redis/redis" "watchAlert/pkg/client" ) type ( entryCache struct { - redis *redis.Client + //redis *redis.Client provider *ProviderPoolStore + cache Cache } InterEntryCache interface { - Redis() *redis.Client + Cache() Cache Silence() SilenceCacheInterface Event() EventCacheInterface ProviderPools() *ProviderPoolStore @@ -20,20 +20,27 @@ type ( } ) -func NewEntryCache() InterEntryCache { - r := client.InitRedis() - p := NewClientPoolStore() +func NewEntryCache(cacheType string) InterEntryCache { - return &entryCache{ - redis: r, - provider: p, + p := NewClientPoolStore() + switch cacheType { + case "Redis": + return &entryCache{ + cache: client.NewRedisCache(), + provider: p, + } + default: + return &entryCache{ + cache: client.InitLocalCache(), + provider: p, + } } } -func (e entryCache) Redis() *redis.Client { return e.redis } -func (e entryCache) Silence() SilenceCacheInterface { return newSilenceCacheInterface(e.redis) } -func (e entryCache) Event() EventCacheInterface { return newEventCacheInterface(e.redis) } +func (e entryCache) Cache() Cache { return e.cache } +func (e entryCache) Silence() SilenceCacheInterface { return newSilenceCacheInterface(e.cache) } +func (e entryCache) Event() EventCacheInterface { return newEventCacheInterface(e.cache) } func (e entryCache) ProviderPools() *ProviderPoolStore { return e.provider } func (e entryCache) FaultCenter() FaultCenterCacheInterface { - return newFaultCenterCacheInterface(e.redis) + return newFaultCenterCacheInterface(e.cache) } diff --git a/internal/cache/event.go b/internal/cache/event.go index b132e7a..8c6c361 100644 --- a/internal/cache/event.go +++ b/internal/cache/event.go @@ -3,19 +3,18 @@ package cache import ( "context" "encoding/json" - "github.com/go-redis/redis" "github.com/zeromicro/go-zero/core/logc" "sync" "time" "watchAlert/internal/models" - "watchAlert/pkg/client" "watchAlert/pkg/tools" ) type ( // EventCache 用于管理事件缓存操作 EventCache struct { - rc *redis.Client + //rc *redis.Client + cache Cache sync.RWMutex } @@ -39,9 +38,9 @@ type ( ) // newEventCacheInterface 创建一个新的 EventCache 实例 -func newEventCacheInterface(r *redis.Client) EventCacheInterface { +func newEventCacheInterface(c Cache) EventCacheInterface { return &EventCache{ - rc: r, + cache: c, } } @@ -51,14 +50,14 @@ func (ec *EventCache) SetProbingEventCache(event models.ProbingEvent, expiration defer ec.Unlock() eventJSON, _ := json.Marshal(event) - ec.setRedisKey(event.GetFiringAlertCacheKey(), string(eventJSON), expiration) + ec.cache.SetKey(event.GetFiringAlertCacheKey(), string(eventJSON), expiration) } // GetProbingEventCache 获取探测事件缓存 func (ec *EventCache) GetProbingEventCache(key string) (models.ProbingEvent, error) { var event models.ProbingEvent - data, err := ec.getRedisKey(key) + data, err := ec.cache.GetKey(key) if err != nil { return event, err } @@ -104,7 +103,7 @@ func (ec *EventCache) PushEventToFaultCenter(event *models.AlertCurEvent) { defer ec.Unlock() key := models.BuildCacheEventKey(event.TenantId, event.FaultCenterId) - ec.setRedisHash(key, event.Fingerprint, tools.JsonMarshal(event)) + ec.cache.SetHash(key, event.Fingerprint, tools.JsonMarshal(event)) } // RemoveEventFromFaultCenter 从故障中心的缓存中移除事件 @@ -113,7 +112,7 @@ func (ec *EventCache) RemoveEventFromFaultCenter(tenantId, faultCenterId, finger defer ec.Unlock() key := models.BuildCacheEventKey(tenantId, faultCenterId) - ec.deleteRedisHash(key, fingerprint) + ec.cache.DeleteHash(key, fingerprint) } // GetAllEventsForFaultCenter 获取故障中心的所有事件 @@ -121,7 +120,7 @@ func (ec *EventCache) GetAllEventsForFaultCenter(fcKey string) (map[string]model ec.RLock() defer ec.RUnlock() - result, err := ec.getRedisHashAll(fcKey) + result, err := ec.cache.GetHashAll(fcKey) if err != nil { return nil, err } @@ -159,7 +158,7 @@ func (ec *EventCache) GetFingerprintsByRuleId(tenantId, faultCenterId, ruleId st // GetEventFromCache 从缓存中获取事件数据 func (ec *EventCache) GetEventFromCache(tenantId, faultCenterId, fingerprint string) (models.AlertCurEvent, error) { key := models.BuildCacheEventKey(tenantId, faultCenterId) - data, err := ec.getRedisHash(key, fingerprint) + data, err := ec.cache.GetHash(key, fingerprint) if err != nil { return models.AlertCurEvent{}, err } @@ -213,27 +212,27 @@ func (ec *EventCache) GetLastFiringValueForFaultCenter(tenantId, faultCenterId, return event.Metric["value"].(float64) } -// 封装 Redis 操作 -func (ec *EventCache) setRedisKey(key, value string, expiration time.Duration) { - ec.rc.Set(key, value, expiration) -} - -func (ec *EventCache) getRedisKey(key string) (string, error) { - return ec.rc.Get(key).Result() -} - -func (ec *EventCache) setRedisHash(key, field, value string) { - client.Redis.HSet(key, field, value) -} - -func (ec *EventCache) deleteRedisHash(key, field string) { - client.Redis.HDel(key, field) -} - -func (ec *EventCache) getRedisHash(key, field string) (string, error) { - return ec.rc.HGet(key, field).Result() -} - -func (ec *EventCache) getRedisHashAll(key string) (map[string]string, error) { - return ec.rc.HGetAll(key).Result() -} +//// 封装 Redis 操作 +//func (ec *EventCache) setRedisKey(key, value string, expiration time.Duration) { +// ec.rc.Set(key, value, expiration) +//} +// +//func (ec *EventCache) getRedisKey(key string) (string, error) { +// return ec.rc.Get(key).Result() +//} +// +//func (ec *EventCache) setRedisHash(key, field, value string) { +// client.Redis.HSet(key, field, value) +//} +// +//func (ec *EventCache) deleteRedisHash(key, field string) { +// client.Redis.HDel(key, field) +//} +// +//func (ec *EventCache) getRedisHash(key, field string) (string, error) { +// return ec.rc.HGet(key, field).Result() +//} +// +//func (ec *EventCache) getRedisHashAll(key string) (map[string]string, error) { +// return ec.rc.HGetAll(key).Result() +//} diff --git a/internal/cache/faultCenter.go b/internal/cache/faultCenter.go index fd4dd13..df4ff25 100644 --- a/internal/cache/faultCenter.go +++ b/internal/cache/faultCenter.go @@ -2,9 +2,6 @@ package cache import ( "encoding/json" - "github.com/go-redis/redis" - "github.com/zeromicro/go-zero/core/logc" - "golang.org/x/net/context" "sync" "watchAlert/internal/models" "watchAlert/pkg/tools" @@ -12,7 +9,8 @@ import ( type ( FaultCenterCache struct { - rc *redis.Client + //rc *redis.Client + cache Cache sync.RWMutex } @@ -24,24 +22,20 @@ type ( ) // newFaultCenterCacheInterface 创建一个新的 FaultCenterCache 实例 -func newFaultCenterCacheInterface(r *redis.Client) FaultCenterCacheInterface { +func newFaultCenterCacheInterface(c Cache) FaultCenterCacheInterface { return &FaultCenterCache{ - rc: r, + cache: c, } } // PushFaultCenterInfo 添加 Info 数据 func (f *FaultCenterCache) PushFaultCenterInfo(center models.FaultCenter) { - err := f.rc.Set(center.GetFaultCenterInfoKey(), tools.JsonMarshal(center), 0).Err() - if err != nil { - logc.Errorf(context.Background(), err.Error()) - return - } + f.cache.SetKey(center.GetFaultCenterInfoKey(), tools.JsonMarshal(center), 0) } // GetFaultCenterInfo 获取 Info 数据 func (f *FaultCenterCache) GetFaultCenterInfo(faultCenterInfoKey string) models.FaultCenter { - result, err := f.rc.Get(faultCenterInfoKey).Result() + result, err := f.cache.GetKey(faultCenterInfoKey) if err != nil { return models.FaultCenter{} } @@ -53,5 +47,5 @@ func (f *FaultCenterCache) GetFaultCenterInfo(faultCenterInfoKey string) models. // RemoveFaultCenterInfo 删除 Info 数据 func (f *FaultCenterCache) RemoveFaultCenterInfo(faultCenterInfoKey string) { - f.rc.Del(faultCenterInfoKey) + f.cache.DeleteKey(faultCenterInfoKey) } diff --git a/internal/cache/silence.go b/internal/cache/silence.go index c5c37e0..3c9d2f0 100644 --- a/internal/cache/silence.go +++ b/internal/cache/silence.go @@ -2,17 +2,16 @@ package cache import ( "encoding/json" - "github.com/go-redis/redis" "sync" "watchAlert/internal/models" - "watchAlert/pkg/client" "watchAlert/pkg/tools" ) type ( // SilenceCache 用于管理告警静默的缓存操作 SilenceCache struct { - rc *redis.Client + //rc *redis.Client + cache Cache sync.RWMutex } @@ -26,9 +25,9 @@ type ( ) // newSilenceCacheInterface 创建一个新的 SilenceCache 实例 -func newSilenceCacheInterface(r *redis.Client) SilenceCacheInterface { +func newSilenceCacheInterface(c Cache) SilenceCacheInterface { return &SilenceCache{ - rc: r, + cache: c, } } @@ -38,7 +37,7 @@ func (sc *SilenceCache) PushMuteToFaultCenter(mute models.AlertSilences) { defer sc.Unlock() key := models.BuildCacheMuteKey(mute.TenantId, mute.FaultCenterId) - sc.setRedisHash(key, mute.Id, tools.JsonMarshal(mute)) + sc.cache.SetHash(key, mute.Id, tools.JsonMarshal(mute)) } // RemoveMuteFromFaultCenter 从故障中心的缓存中移除静默规则 @@ -47,7 +46,7 @@ func (sc *SilenceCache) RemoveMuteFromFaultCenter(tenantId, faultCenterId, id st defer sc.Unlock() key := models.BuildCacheMuteKey(tenantId, faultCenterId) - sc.deleteRedisHash(key, id) + sc.cache.DeleteHash(key, id) } func (sc *SilenceCache) GetMutesForFaultCenter(tenantId, faultCenterId string) ([]string, error) { @@ -55,7 +54,7 @@ func (sc *SilenceCache) GetMutesForFaultCenter(tenantId, faultCenterId string) ( defer sc.RUnlock() key := models.BuildCacheMuteKey(tenantId, faultCenterId) - mapping, err := sc.getRedisAllHashMap(key) + mapping, err := sc.cache.GetHashAll(key) if err != nil { return nil, err } @@ -69,35 +68,35 @@ func (sc *SilenceCache) GetMutesForFaultCenter(tenantId, faultCenterId string) ( // WithIdGetMuteFromCache 从缓存中获取静默规则 func (sc *SilenceCache) WithIdGetMuteFromCache(tenantId, faultCenterId, id string) (*models.AlertSilences, error) { key := models.BuildCacheMuteKey(tenantId, faultCenterId) - cache, err := sc.getRedisHash(key, id) + cache, err := sc.cache.GetHash(key, id) if err != nil { return nil, err } var mute models.AlertSilences - if err := json.Unmarshal(cache, &mute); err != nil { + if err := json.Unmarshal([]byte(cache), &mute); err != nil { return nil, err } return &mute, nil } -// setRedisHash 设置 Redis 哈希表中的值 -func (sc *SilenceCache) setRedisHash(key, field string, value interface{}) { - client.Redis.HSet(key, field, value) -} - -// deleteRedisHash 删除 Redis 哈希表中的值 -func (sc *SilenceCache) deleteRedisHash(key, field string) { - client.Redis.HDel(key, field) -} - -// getRedisHash 获取 Redis 哈希表中的值 -func (sc *SilenceCache) getRedisHash(key, field string) ([]byte, error) { - return sc.rc.HGet(key, field).Bytes() -} - -// getRedisAllMap 获取 Redis 哈希表Map -func (sc *SilenceCache) getRedisAllHashMap(key string) (map[string]string, error) { - return sc.rc.HGetAll(key).Result() -} +//// setRedisHash 设置 Redis 哈希表中的值 +//func (sc *SilenceCache) setRedisHash(key, field string, value interface{}) { +// client.Redis.HSet(key, field, value) +//} +// +//// deleteRedisHash 删除 Redis 哈希表中的值 +//func (sc *SilenceCache) deleteRedisHash(key, field string) { +// client.Redis.HDel(key, field) +//} +// +//// getRedisHash 获取 Redis 哈希表中的值 +//func (sc *SilenceCache) getRedisHash(key, field string) ([]byte, error) { +// return sc.rc.HGet(key, field).Bytes() +//} +// +//// getRedisAllMap 获取 Redis 哈希表Map +//func (sc *SilenceCache) getRedisAllHashMap(key string) (map[string]string, error) { +// return sc.rc.HGetAll(key).Result() +//} diff --git a/internal/middleware/Auth.go b/internal/middleware/Auth.go index 8a231ff..0afa244 100644 --- a/internal/middleware/Auth.go +++ b/internal/middleware/Auth.go @@ -50,7 +50,7 @@ func IsTokenValid(ctx *ctx.Context, tokenStr string) (int64, bool) { // 密码校验, 当修改密码后其他已登陆的终端会被下线。 var user models.Member - result, err := ctx.Redis.Redis().Get("uid-" + token.ID).Result() + result, err := ctx.Cache.Cache().GetKey("uid-" + token.ID) if err != nil { return 400, false } diff --git a/internal/repo/silence.go b/internal/repo/silence.go index 24ac422..58acbee 100644 --- a/internal/repo/silence.go +++ b/internal/repo/silence.go @@ -33,15 +33,19 @@ func (sr SilenceRepo) List(r models.AlertSilenceQuery) (models.SilenceResponse, count int64 ) db := sr.db.Model(models.AlertSilences{}) - db.Where("tenant_id = ?", r.TenantId) - db.Where("fault_center_id = ?", r.FaultCenterId) + if r.TenantId != "" && r.FaultCenterId != "" { + db.Where("tenant_id = ?", r.TenantId) + db.Where("fault_center_id = ?", r.FaultCenterId) + } if r.Query != "" { db.Where("id LIKE ? OR comment LIKE ?", "%"+r.Query+"%", "%"+r.Query+"%") } db.Count(&count) - db.Limit(int(r.Page.Size)).Offset(int((r.Page.Index - 1) * r.Page.Size)) + if r.Page.Size != 0 && r.Page.Index != 0 { + db.Limit(int(r.Page.Size)).Offset(int((r.Page.Index - 1) * r.Page.Size)) + } err := db.Find(&silenceList).Error if err != nil { return models.SilenceResponse{}, err diff --git a/internal/services/ai.go b/internal/services/ai.go index 6b1b1d5..0d0ad98 100644 --- a/internal/services/ai.go +++ b/internal/services/ai.go @@ -41,7 +41,7 @@ func (a aiService) Chat(req interface{}) (interface{}, interface{}) { return nil, err } - client, err := a.ctx.Redis.ProviderPools().GetClient("AiClient") + client, err := a.ctx.Cache.ProviderPools().GetClient("AiClient") if err != nil { return "", err } diff --git a/internal/services/datasource.go b/internal/services/datasource.go index 7042d0e..7399daf 100644 --- a/internal/services/datasource.go +++ b/internal/services/datasource.go @@ -114,7 +114,7 @@ func (ds datasourceService) WithAddClientToProviderPools(datasource models.Alert cli interface{} err error ) - pools := ds.ctx.Redis.ProviderPools() + pools := ds.ctx.Cache.ProviderPools() switch datasource.Type { case provider.PrometheusDsProvider: cli, err = provider.NewPrometheusClient(datasource) @@ -145,6 +145,6 @@ func (ds datasourceService) WithAddClientToProviderPools(datasource models.Alert } func (ds datasourceService) WithRemoveClientForProviderPools(datasourceId string) { - pools := ds.ctx.Redis.ProviderPools() + pools := ds.ctx.Cache.ProviderPools() pools.RemoveClient(datasourceId) } diff --git a/internal/services/event.go b/internal/services/event.go index 384d3dc..e8603ba 100644 --- a/internal/services/event.go +++ b/internal/services/event.go @@ -25,7 +25,7 @@ func newInterEventService(ctx *ctx.Context) InterEventService { func (e eventService) ListCurrentEvent(req interface{}) (interface{}, interface{}) { r := req.(*models.AlertCurEventQuery) - center, err := e.ctx.Redis.Event().GetAllEventsForFaultCenter(models.BuildCacheEventKey(r.TenantId, r.FaultCenterId)) + center, err := e.ctx.Cache.Event().GetAllEventsForFaultCenter(models.BuildCacheEventKey(r.TenantId, r.FaultCenterId)) if err != nil { return nil, err } diff --git a/internal/services/fault_center.go b/internal/services/fault_center.go index 835b635..cd49a7e 100644 --- a/internal/services/fault_center.go +++ b/internal/services/fault_center.go @@ -38,7 +38,7 @@ func (f faultCenterService) Create(req interface{}) (data interface{}, err inter return nil, err } - f.ctx.Redis.FaultCenter().PushFaultCenterInfo(*r) + f.ctx.Cache.FaultCenter().PushFaultCenterInfo(*r) alert.ConsumerWork.Submit(*r) return nil, nil @@ -51,7 +51,7 @@ func (f faultCenterService) Update(req interface{}) (data interface{}, err inter return nil, err } - f.ctx.Redis.FaultCenter().PushFaultCenterInfo(*r) + f.ctx.Cache.FaultCenter().PushFaultCenterInfo(*r) alert.ConsumerWork.Stop(r.ID) alert.ConsumerWork.Submit(*r) @@ -65,7 +65,7 @@ func (f faultCenterService) Delete(req interface{}) (data interface{}, err inter return nil, err } - f.ctx.Redis.FaultCenter().RemoveFaultCenterInfo(models.BuildCacheInfoKey(r.TenantId, r.ID)) + f.ctx.Cache.FaultCenter().RemoveFaultCenterInfo(models.BuildCacheInfoKey(r.TenantId, r.ID)) alert.ConsumerWork.Stop(r.ID) return nil, nil @@ -83,7 +83,7 @@ func (f faultCenterService) List(req interface{}) (data interface{}, err interfa faultCenters := data.([]models.FaultCenter) for index, fc := range data.([]models.FaultCenter) { - events, err := f.ctx.Redis.Event().GetAllEventsForFaultCenter(fc.GetFaultCenterKey()) + events, err := f.ctx.Cache.Event().GetAllEventsForFaultCenter(fc.GetFaultCenterKey()) if err != nil { return nil, err } @@ -123,7 +123,7 @@ func (f faultCenterService) Reset(req interface{}) (data interface{}, err interf alert.ConsumerWork.Stop(r.ID) data, err = f.ctx.DB.FaultCenter().Get(models.FaultCenterQuery{ID: r.ID}) - f.ctx.Redis.FaultCenter().PushFaultCenterInfo(data.(models.FaultCenter)) + f.ctx.Cache.FaultCenter().PushFaultCenterInfo(data.(models.FaultCenter)) alert.ConsumerWork.Submit(data.(models.FaultCenter)) return nil, nil diff --git a/internal/services/probing.go b/internal/services/probing.go index 43a0dab..1d6cf31 100644 --- a/internal/services/probing.go +++ b/internal/services/probing.go @@ -86,7 +86,8 @@ func (m probingService) Delete(req interface{}) (interface{}, interface{}) { m.ProductTask.Stop(r.RuleId) m.ConsumerTask.Stop(r.RuleId) - err = m.ctx.Redis.Redis().Del(res.GetFiringAlertCacheKey(), res.GetProbingMappingKey()).Err() + // TODO miss err + m.ctx.Cache.Cache().DeleteHash(res.GetFiringAlertCacheKey(), res.GetProbingMappingKey()) if err != nil { return nil, err } diff --git a/internal/services/rule.go b/internal/services/rule.go index e194a85..f273bef 100644 --- a/internal/services/rule.go +++ b/internal/services/rule.go @@ -67,9 +67,9 @@ func (rs ruleService) Update(req interface{}) (interface{}, interface{}) { logc.Infof(rs.ctx.Ctx, fmt.Sprintf("重启 RuleId 为 %s 的 Worker 进程", rule.RuleId)) } else { // 删除缓存 - fingerprints := rs.ctx.Redis.Event().GetFingerprintsByRuleId(rule.TenantId, rule.FaultCenterId, rule.RuleId) + fingerprints := rs.ctx.Cache.Event().GetFingerprintsByRuleId(rule.TenantId, rule.FaultCenterId, rule.RuleId) for _, fingerprint := range fingerprints { - rs.ctx.Redis.Event().RemoveEventFromFaultCenter(rule.TenantId, rule.FaultCenterId, fingerprint) + rs.ctx.Cache.Event().RemoveEventFromFaultCenter(rule.TenantId, rule.FaultCenterId, fingerprint) } } @@ -102,9 +102,9 @@ func (rs ruleService) Delete(req interface{}) (interface{}, interface{}) { } // 删除缓存 - fingerprints := rs.ctx.Redis.Event().GetFingerprintsByRuleId(rule.TenantId, info.FaultCenterId, rule.RuleId) + fingerprints := rs.ctx.Cache.Event().GetFingerprintsByRuleId(rule.TenantId, info.FaultCenterId, rule.RuleId) for _, fingerprint := range fingerprints { - rs.ctx.Redis.Event().RemoveEventFromFaultCenter(rule.TenantId, info.FaultCenterId, fingerprint) + rs.ctx.Cache.Event().RemoveEventFromFaultCenter(rule.TenantId, info.FaultCenterId, fingerprint) } return nil, nil diff --git a/internal/services/settings.go b/internal/services/settings.go index 161aed0..a5bc101 100644 --- a/internal/services/settings.go +++ b/internal/services/settings.go @@ -43,7 +43,7 @@ func (a settingService) Save(req interface{}) (interface{}, interface{}) { if err != nil { return nil, err } - a.ctx.Redis.ProviderPools().SetClient("AiClient", client) + a.ctx.Cache.ProviderPools().SetClient("AiClient", client) } return nil, nil diff --git a/internal/services/silence.go b/internal/services/silence.go index a018179..d6eb95c 100644 --- a/internal/services/silence.go +++ b/internal/services/silence.go @@ -46,7 +46,7 @@ func (ass alertSilenceService) Create(req interface{}) (interface{}, interface{} r.Status = 0 } - ass.ctx.Redis.Silence().PushMuteToFaultCenter(silenceEvent) + ass.ctx.Cache.Silence().PushMuteToFaultCenter(silenceEvent) err := ass.ctx.DB.Silence().Create(silenceEvent) if err != nil { return nil, err @@ -66,7 +66,7 @@ func (ass alertSilenceService) Update(req interface{}) (interface{}, interface{} r.Status = 1 } - ass.ctx.Redis.Silence().PushMuteToFaultCenter(*r) + ass.ctx.Cache.Silence().PushMuteToFaultCenter(*r) err := ass.ctx.DB.Silence().Update(*r) if err != nil { return nil, err @@ -77,7 +77,7 @@ func (ass alertSilenceService) Update(req interface{}) (interface{}, interface{} func (ass alertSilenceService) Delete(req interface{}) (interface{}, interface{}) { r := req.(*models.AlertSilenceQuery) - ass.ctx.Redis.Silence().RemoveMuteFromFaultCenter(r.TenantId, r.FaultCenterId, r.Id) + ass.ctx.Cache.Silence().RemoveMuteFromFaultCenter(r.TenantId, r.FaultCenterId, r.Id) err := ass.ctx.DB.Silence().Delete(*r) if err != nil { return nil, err diff --git a/internal/services/user.go b/internal/services/user.go index ac5955d..167d305 100644 --- a/internal/services/user.go +++ b/internal/services/user.go @@ -97,7 +97,7 @@ func (us userService) Login(req interface{}) (interface{}, interface{}) { } duration := time.Duration(global.Config.Jwt.Expire) * time.Second - us.ctx.Redis.Redis().Set("uid-"+data.UserId, tools.JsonMarshal(r), duration) + us.ctx.Cache.Cache().SetKey("uid-"+data.UserId, tools.JsonMarshal(r), duration) return tokenData, nil } diff --git a/pkg/client/localcache.go b/pkg/client/localcache.go new file mode 100644 index 0000000..ef0dd6c --- /dev/null +++ b/pkg/client/localcache.go @@ -0,0 +1,195 @@ +package client + +import ( + "context" + "encoding/gob" + "fmt" + "github.com/patrickmn/go-cache" + log "github.com/sirupsen/logrus" + "github.com/zeromicro/go-zero/core/logc" + "os" + "os/signal" + "sync" + "syscall" + "time" +) + +var ( + LocalCache *Cache + once sync.Once + cacheFile = "cache_file" +) + +type Cache struct { + cache *cache.Cache + mu sync.Mutex + shutdown chan struct{} + //wg sync.WaitGroup +} + +func InitLocalCache() *Cache { + var localCache *Cache + once.Do( + func() { + c := cache.New(cache.DefaultExpiration, cache.NoExpiration) + gob.Register(cache.Item{}) + gob.Register(map[string]interface{}{}) + gob.Register(map[string]string{}) + + if err := c.LoadFile(cacheFile); err != nil { + logc.Debugf(context.Background(), "缓存加载失败,首次启动请忽略。%v", err) + } else { + logc.Debugf(context.Background(), "加载缓存成功") + } + + localCache = &Cache{cache: c, shutdown: make(chan struct{})} + LocalCache = localCache + }) + + //localCache.wg.Add(1) + go localCache.persist() + + //localCache.wg.Add(1) + go localCache.listenSignals() + + return localCache +} + +func (c *Cache) SetKey(key, value string, expiration time.Duration) { + c.cache.Set(key, value, expiration) +} + +func (c *Cache) GetKey(key string) (string, error) { + value, exist := c.cache.Get(key) + if !exist { + return "", nil + } + strValue, ok := value.(string) + if !ok { + return "", nil + } + return strValue, nil +} + +func (c *Cache) DeleteKey(key string) { + c.cache.Delete(key) +} + +func (c *Cache) SetHash(key, field, value string) { + c.mu.Lock() + defer c.mu.Unlock() + + var hashMap map[string]string + if val, found := c.cache.Get(key); found { + hashMap, _ = val.(map[string]string) + } + if hashMap == nil { + hashMap = make(map[string]string) + } + + hashMap[field] = value + c.cache.Set(key, hashMap, cache.NoExpiration) +} + +func (c *Cache) SetHashAny(key, field string, value any) { + c.mu.Lock() + defer c.mu.Unlock() + + var hashMap map[string]interface{} + if val, found := c.cache.Get(key); found { + hashMap, _ = val.(map[string]interface{}) + } + if hashMap == nil { + hashMap = make(map[string]interface{}) + } + + hashMap[field] = value + c.cache.Set(key, hashMap, cache.NoExpiration) +} + +func (c *Cache) DeleteHash(key, field string) { + c.mu.Lock() + defer c.mu.Unlock() + + if val, found := c.cache.Get(key); found { + if hashMap, ok := val.(map[string]string); ok { + delete(hashMap, field) + c.cache.Set(key, hashMap, cache.NoExpiration) + } + } +} + +func (c *Cache) GetHash(key, field string) (string, error) { + c.mu.Lock() + defer c.mu.Unlock() + + val, found := c.cache.Get(key) + if !found { + return "", nil + } + + hashMap, ok := val.(map[string]string) + if !ok { + return "", nil + } + + value, exist := hashMap[field] + if !exist { + return "", nil + } + + return value, nil +} + +func (c *Cache) GetHashAll(key string) (map[string]string, error) { + c.mu.Lock() + defer c.mu.Unlock() + + val, found := c.cache.Get(key) + if !found { + return nil, nil + } + + hashMap, ok := val.(map[string]string) + if !ok { + return nil, nil + } + + return hashMap, nil +} + +func (c *Cache) listenSignals() { + //defer c.wg.Done() + + sigChan := make(chan os.Signal, 2) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + select { + case <-sigChan: + logc.Infof(context.Background(), "接收到终止信号,启动关闭流程") + close(c.shutdown) + case <-c.shutdown: + } +} + +func (c *Cache) persist() { + //defer c.wg.Done() + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.cache.SaveFile(cacheFile); err != nil { + log.Warn(err) + } + logc.Debugf(context.Background(), "定时缓存持久化成功") + case <-c.shutdown: + if err := c.cache.SaveFile(cacheFile); err != nil { + logc.Errorf(context.Background(), fmt.Sprintf("退出前缓存持久化异常: %v", err)) + } + logc.Debugf(context.Background(), "缓存持久化成功") + return + } + } +} diff --git a/pkg/client/redis.go b/pkg/client/redis.go index 7537921..41f1b1d 100644 --- a/pkg/client/redis.go +++ b/pkg/client/redis.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/go-redis/redis" "log" + "time" "watchAlert/internal/global" ) @@ -29,3 +30,43 @@ func InitRedis() *redis.Client { return client } + +type RedisCache struct { + *redis.Client +} + +func NewRedisCache() *RedisCache { + return &RedisCache{InitRedis()} +} + +func (c *RedisCache) SetKey(key, value string, expiration time.Duration) { + c.Set(key, value, expiration) +} + +func (c *RedisCache) GetKey(key string) (string, error) { + return c.Get(key).Result() +} + +func (c *RedisCache) SetHashAny(key, field string, value any) { + c.HSet(key, field, value) +} + +func (c *RedisCache) DeleteKey(key string) { + c.Del(key) +} + +func (c *RedisCache) SetHash(key, field, value string) { + c.HSet(key, field, value) +} + +func (c *RedisCache) DeleteHash(key, field string) { + c.HDel(key, field) +} + +func (c *RedisCache) GetHash(key, field string) (string, error) { + return c.HGet(key, field).Result() +} + +func (c *RedisCache) GetHashAll(key string) (map[string]string, error) { + return c.HGetAll(key).Result() +} diff --git a/pkg/ctx/ctx.go b/pkg/ctx/ctx.go index 453c94c..efa7cdb 100644 --- a/pkg/ctx/ctx.go +++ b/pkg/ctx/ctx.go @@ -9,34 +9,39 @@ import ( type Context struct { DB repo.InterEntryRepo - Redis cache.InterEntryCache + Cache cache.InterEntryCache Ctx context.Context Mux sync.RWMutex ConsumerContextMap map[string]context.CancelFunc + CacheType string } var ( - DB repo.InterEntryRepo - Redis cache.InterEntryCache - Ctx context.Context + DB repo.InterEntryRepo + Cache cache.InterEntryCache + Ctx context.Context + CacheType string ) -func NewContext(ctx context.Context, db repo.InterEntryRepo, redis cache.InterEntryCache) *Context { +func NewContext(ctx context.Context, db repo.InterEntryRepo, c cache.InterEntryCache, cacheType string) *Context { DB = db - Redis = redis + Cache = c Ctx = ctx + CacheType = cacheType return &Context{ DB: db, - Redis: redis, + Cache: c, Ctx: ctx, ConsumerContextMap: make(map[string]context.CancelFunc), + CacheType: cacheType, } } func DO() *Context { return &Context{ - DB: DB, - Redis: Redis, - Ctx: Ctx, + DB: DB, + Cache: Cache, + Ctx: Ctx, + CacheType: CacheType, } }