From c67e8a987ecb0edaeb761b8371596835cccaea1e Mon Sep 17 00:00:00 2001 From: Antonio Mindov Date: Wed, 23 Jul 2025 14:16:38 +0300 Subject: [PATCH] Add closing support to otel metrics instrumentation --- extra/redisotel/config.go | 8 +++ extra/redisotel/metrics.go | 117 +++++++++++++++++++++++-------------- 2 files changed, 81 insertions(+), 44 deletions(-) diff --git a/extra/redisotel/config.go b/extra/redisotel/config.go index 6ebd4bd56a..6d90abfd0d 100644 --- a/extra/redisotel/config.go +++ b/extra/redisotel/config.go @@ -28,6 +28,8 @@ type config struct { meter metric.Meter poolName string + + closeChan chan struct{} } type baseOption interface { @@ -145,3 +147,9 @@ func WithMeterProvider(mp metric.MeterProvider) MetricsOption { conf.mp = mp }) } + +func WithCloseChan(closeChan chan struct{}) MetricsOption { + return metricsOption(func(conf *config) { + conf.closeChan = closeChan + }) +} diff --git a/extra/redisotel/metrics.go b/extra/redisotel/metrics.go index 4974f4e8de..d9a1c72196 100644 --- a/extra/redisotel/metrics.go +++ b/extra/redisotel/metrics.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "sync" "time" "go.opentelemetry.io/otel" @@ -13,6 +14,12 @@ import ( "github.com/redis/go-redis/v9" ) +type metricsState struct { + registrations []metric.Registration + closed bool + mutex sync.Mutex +} + // InstrumentMetrics starts reporting OpenTelemetry Metrics. // // Based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/database-metrics.md @@ -30,49 +37,42 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error { ) } - switch rdb := rdb.(type) { - case *redis.Client: - if conf.poolName == "" { - opt := rdb.Options() - conf.poolName = opt.Addr + var state *metricsState + if conf.closeChan != nil { + state = &metricsState{ + registrations: make([]metric.Registration, 0), + closed: false, + mutex: sync.Mutex{}, } - conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName)) - if err := reportPoolStats(rdb, conf); err != nil { - return err - } - if err := addMetricsHook(rdb, conf); err != nil { - return err - } - return nil - case *redis.ClusterClient: - rdb.OnNewNode(func(rdb *redis.Client) { - if conf.poolName == "" { - opt := rdb.Options() - conf.poolName = opt.Addr - } - conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName)) + go func() { + <-conf.closeChan - if err := reportPoolStats(rdb, conf); err != nil { - otel.Handle(err) + state.mutex.Lock() + state.closed = true + + for _, registration := range state.registrations { + if err := registration.Unregister(); err != nil { + otel.Handle(err) + } } - if err := addMetricsHook(rdb, conf); err != nil { + state.mutex.Unlock() + }() + } + + switch rdb := rdb.(type) { + case *redis.Client: + return registerClient(rdb, conf, state) + case *redis.ClusterClient: + rdb.OnNewNode(func(rdb *redis.Client) { + if err := registerClient(rdb, conf, state); err != nil { otel.Handle(err) } }) return nil case *redis.Ring: rdb.OnNewNode(func(rdb *redis.Client) { - if conf.poolName == "" { - opt := rdb.Options() - conf.poolName = opt.Addr - } - conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName)) - - if err := reportPoolStats(rdb, conf); err != nil { - otel.Handle(err) - } - if err := addMetricsHook(rdb, conf); err != nil { + if err := registerClient(rdb, conf, state); err != nil { otel.Handle(err) } }) @@ -82,7 +82,38 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error { } } -func reportPoolStats(rdb *redis.Client, conf *config) error { +func registerClient(rdb *redis.Client, conf *config, state *metricsState) error { + if state != nil { + state.mutex.Lock() + defer state.mutex.Unlock() + + if state.closed { + return nil + } + } + + if conf.poolName == "" { + opt := rdb.Options() + conf.poolName = opt.Addr + } + conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName)) + + registration, err := reportPoolStats(rdb, conf) + if err != nil { + return err + } + + if state != nil { + state.registrations = append(state.registrations, registration) + } + + if err := addMetricsHook(rdb, conf); err != nil { + return err + } + return nil +} + +func reportPoolStats(rdb *redis.Client, conf *config) (metric.Registration, error) { labels := conf.attrs idleAttrs := append(labels, attribute.String("state", "idle")) usedAttrs := append(labels, attribute.String("state", "used")) @@ -92,7 +123,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error { metric.WithDescription("The maximum number of idle open connections allowed"), ) if err != nil { - return err + return nil, err } idleMin, err := conf.meter.Int64ObservableUpDownCounter( @@ -100,7 +131,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error { metric.WithDescription("The minimum number of idle open connections allowed"), ) if err != nil { - return err + return nil, err } connsMax, err := conf.meter.Int64ObservableUpDownCounter( @@ -108,7 +139,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error { metric.WithDescription("The maximum number of open connections allowed"), ) if err != nil { - return err + return nil, err } usage, err := conf.meter.Int64ObservableUpDownCounter( @@ -116,7 +147,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error { metric.WithDescription("The number of connections that are currently in state described by the state attribute"), ) if err != nil { - return err + return nil, err } timeouts, err := conf.meter.Int64ObservableUpDownCounter( @@ -124,7 +155,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error { metric.WithDescription("The number of connection timeouts that have occurred trying to obtain a connection from the pool"), ) if err != nil { - return err + return nil, err } hits, err := conf.meter.Int64ObservableUpDownCounter( @@ -132,7 +163,7 @@ func reportPoolStats(rdb *redis.Client, conf *config) error { metric.WithDescription("The number of times free connection was found in the pool"), ) if err != nil { - return err + return nil, err } misses, err := conf.meter.Int64ObservableUpDownCounter( @@ -140,11 +171,11 @@ func reportPoolStats(rdb *redis.Client, conf *config) error { metric.WithDescription("The number of times free connection was not found in the pool"), ) if err != nil { - return err + return nil, err } redisConf := rdb.Options() - _, err = conf.meter.RegisterCallback( + return conf.meter.RegisterCallback( func(ctx context.Context, o metric.Observer) error { stats := rdb.PoolStats() @@ -168,8 +199,6 @@ func reportPoolStats(rdb *redis.Client, conf *config) error { hits, misses, ) - - return err } func addMetricsHook(rdb *redis.Client, conf *config) error {