Skip to content

Add closing support to otel metrics instrumentation #3444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions extra/redisotel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type config struct {
meter metric.Meter

poolName string

closeChan chan struct{}
}

type baseOption interface {
Expand Down Expand Up @@ -145,3 +147,9 @@ func WithMeterProvider(mp metric.MeterProvider) MetricsOption {
conf.mp = mp
})
}

func WithCloseChan(closeChan chan struct{}) MetricsOption {
return metricsOption(func(conf *config) {
conf.closeChan = closeChan
})
}
117 changes: 73 additions & 44 deletions extra/redisotel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sync"
"time"

"go.opentelemetry.io/otel"
Expand All @@ -13,6 +14,12 @@ import (
"github.com/redis/go-redis/v9"
)

type metricsState struct {
registrations []metric.Registration
closed bool
mutex sync.Mutex
}

// InstrumentMetrics starts reporting OpenTelemetry Metrics.
//
// Based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/database-metrics.md
Expand All @@ -30,49 +37,42 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
)
}

switch rdb := rdb.(type) {
case *redis.Client:
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
var state *metricsState
if conf.closeChan != nil {
state = &metricsState{
registrations: make([]metric.Registration, 0),
closed: false,
mutex: sync.Mutex{},
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))

if err := reportPoolStats(rdb, conf); err != nil {
return err
}
if err := addMetricsHook(rdb, conf); err != nil {
return err
}
return nil
case *redis.ClusterClient:
rdb.OnNewNode(func(rdb *redis.Client) {
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
go func() {
<-conf.closeChan

if err := reportPoolStats(rdb, conf); err != nil {
otel.Handle(err)
state.mutex.Lock()
state.closed = true

for _, registration := range state.registrations {
if err := registration.Unregister(); err != nil {
otel.Handle(err)
}
}
if err := addMetricsHook(rdb, conf); err != nil {
state.mutex.Unlock()
}()
}

switch rdb := rdb.(type) {
case *redis.Client:
return registerClient(rdb, conf, state)
case *redis.ClusterClient:
rdb.OnNewNode(func(rdb *redis.Client) {
if err := registerClient(rdb, conf, state); err != nil {
otel.Handle(err)
}
})
return nil
case *redis.Ring:
rdb.OnNewNode(func(rdb *redis.Client) {
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))

if err := reportPoolStats(rdb, conf); err != nil {
otel.Handle(err)
}
if err := addMetricsHook(rdb, conf); err != nil {
if err := registerClient(rdb, conf, state); err != nil {
otel.Handle(err)
}
})
Expand All @@ -82,7 +82,38 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
}
}

func reportPoolStats(rdb *redis.Client, conf *config) error {
func registerClient(rdb *redis.Client, conf *config, state *metricsState) error {
if state != nil {
state.mutex.Lock()
defer state.mutex.Unlock()

if state.closed {
return nil
}
}

if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))

registration, err := reportPoolStats(rdb, conf)
if err != nil {
return err
}

if state != nil {
state.registrations = append(state.registrations, registration)
}

if err := addMetricsHook(rdb, conf); err != nil {
return err
}
return nil
}

func reportPoolStats(rdb *redis.Client, conf *config) (metric.Registration, error) {
labels := conf.attrs
idleAttrs := append(labels, attribute.String("state", "idle"))
usedAttrs := append(labels, attribute.String("state", "used"))
Expand All @@ -92,59 +123,59 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
metric.WithDescription("The maximum number of idle open connections allowed"),
)
if err != nil {
return err
return nil, err
}

idleMin, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.idle.min",
metric.WithDescription("The minimum number of idle open connections allowed"),
)
if err != nil {
return err
return nil, err
}

connsMax, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.max",
metric.WithDescription("The maximum number of open connections allowed"),
)
if err != nil {
return err
return nil, err
}

usage, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.usage",
metric.WithDescription("The number of connections that are currently in state described by the state attribute"),
)
if err != nil {
return err
return nil, err
}

timeouts, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.timeouts",
metric.WithDescription("The number of connection timeouts that have occurred trying to obtain a connection from the pool"),
)
if err != nil {
return err
return nil, err
}

hits, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.hits",
metric.WithDescription("The number of times free connection was found in the pool"),
)
if err != nil {
return err
return nil, err
}

misses, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.misses",
metric.WithDescription("The number of times free connection was not found in the pool"),
)
if err != nil {
return err
return nil, err
}

redisConf := rdb.Options()
_, err = conf.meter.RegisterCallback(
return conf.meter.RegisterCallback(
func(ctx context.Context, o metric.Observer) error {
stats := rdb.PoolStats()

Expand All @@ -168,8 +199,6 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
hits,
misses,
)

return err
}

func addMetricsHook(rdb *redis.Client, conf *config) error {
Expand Down
Loading