From 61baaf5f5dea4abce0d7ec87c6bec3f86f80bfdb Mon Sep 17 00:00:00 2001 From: Mykhailo Alipa Date: Wed, 13 Mar 2024 23:14:21 +0100 Subject: [PATCH 1/2] specify custom health check func via ShardHealthCheckFn option --- extra/rediscensus/go.sum | 2 ++ extra/rediscmd/go.mod | 4 ++-- extra/rediscmd/go.sum | 2 ++ extra/redisotel/go.sum | 2 ++ ring.go | 21 ++++++++++++++++++--- 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/extra/rediscensus/go.sum b/extra/rediscensus/go.sum index 697db5f85..c65067e28 100644 --- a/extra/rediscensus/go.sum +++ b/extra/rediscensus/go.sum @@ -2,8 +2,10 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/extra/rediscmd/go.mod b/extra/rediscmd/go.mod index eb38857d4..e3b4be2f9 100644 --- a/extra/rediscmd/go.mod +++ b/extra/rediscmd/go.mod @@ -5,7 +5,7 @@ go 1.15 replace github.com/redis/go-redis/v9 => ../.. require ( - github.com/bsm/ginkgo/v2 v2.7.0 - github.com/bsm/gomega v1.26.0 + github.com/bsm/ginkgo/v2 v2.12.0 + github.com/bsm/gomega v1.27.10 github.com/redis/go-redis/v9 v9.5.1 ) diff --git a/extra/rediscmd/go.sum b/extra/rediscmd/go.sum index 33f9e16b4..966a0535a 100644 --- a/extra/rediscmd/go.sum +++ b/extra/rediscmd/go.sum @@ -1,7 +1,9 @@ github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= diff --git a/extra/redisotel/go.sum b/extra/redisotel/go.sum index 22049451c..29ba7275a 100644 --- a/extra/redisotel/go.sum +++ b/extra/redisotel/go.sum @@ -1,7 +1,9 @@ github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/ring.go b/ring.go index 4ae00542b..a4b21add4 100644 --- a/ring.go +++ b/ring.go @@ -22,12 +22,20 @@ import ( var errRingShardsDown = errors.New("redis: all ring shards are down") +// defaultShardHealthCheckFn is the default function used to check the shard liveness +var defaultShardHealthCheckFn = func(ctx context.Context, client *Client) bool { + err := client.Ping(ctx).Err() + return err == nil || err == pool.ErrPoolTimeout +} + //------------------------------------------------------------------------------ type ConsistentHash interface { Get(string) string } +type ShardHealthCheckFn func(ctx context.Context, client *Client) bool + type rendezvousWrapper struct { *rendezvous.Rendezvous } @@ -54,10 +62,14 @@ type RingOptions struct { // ClientName will execute the `CLIENT SETNAME ClientName` command for each conn. ClientName string - // Frequency of PING commands sent to check shards availability. + // Frequency of executing ShardHealthCheckFn to check shards availability. // Shard is considered down after 3 subsequent failed checks. HeartbeatFrequency time.Duration + // A function used to check the shard liveness + // if not set, defaults to defaultShardHealthCheckFn + ShardHealthCheckFn ShardHealthCheckFn + // NewConsistentHash returns a consistent hash that is used // to distribute keys across the shards. // @@ -113,6 +125,10 @@ func (opt *RingOptions) init() { opt.HeartbeatFrequency = 500 * time.Millisecond } + if opt.ShardHealthCheckFn == nil { + opt.ShardHealthCheckFn = defaultShardHealthCheckFn + } + if opt.NewConsistentHash == nil { opt.NewConsistentHash = newRendezvous } @@ -408,8 +424,7 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) { var rebalance bool for _, shard := range c.List() { - err := shard.Client.Ping(ctx).Err() - isUp := err == nil || err == pool.ErrPoolTimeout + isUp := c.opt.ShardHealthCheckFn(ctx, shard.Client) if shard.Vote(isUp) { internal.Logger.Printf(ctx, "ring shard state changed: %s", shard) rebalance = true From 7b064d44d6c3e45e26cb31b5e21c0115e77b9e3e Mon Sep 17 00:00:00 2001 From: Mykhailo Alipa Date: Thu, 14 Mar 2024 11:38:24 +0100 Subject: [PATCH 2/2] ShardHealthCheckFn renamed to HeartbeatFn --- ring.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/ring.go b/ring.go index a4b21add4..48238faef 100644 --- a/ring.go +++ b/ring.go @@ -22,8 +22,8 @@ import ( var errRingShardsDown = errors.New("redis: all ring shards are down") -// defaultShardHealthCheckFn is the default function used to check the shard liveness -var defaultShardHealthCheckFn = func(ctx context.Context, client *Client) bool { +// defaultHeartbeatFn is the default function used to check the shard liveness +var defaultHeartbeatFn = func(ctx context.Context, client *Client) bool { err := client.Ping(ctx).Err() return err == nil || err == pool.ErrPoolTimeout } @@ -34,8 +34,6 @@ type ConsistentHash interface { Get(string) string } -type ShardHealthCheckFn func(ctx context.Context, client *Client) bool - type rendezvousWrapper struct { *rendezvous.Rendezvous } @@ -62,13 +60,13 @@ type RingOptions struct { // ClientName will execute the `CLIENT SETNAME ClientName` command for each conn. ClientName string - // Frequency of executing ShardHealthCheckFn to check shards availability. + // Frequency of executing HeartbeatFn to check shards availability. // Shard is considered down after 3 subsequent failed checks. HeartbeatFrequency time.Duration // A function used to check the shard liveness - // if not set, defaults to defaultShardHealthCheckFn - ShardHealthCheckFn ShardHealthCheckFn + // if not set, defaults to defaultHeartbeatFn + HeartbeatFn func(ctx context.Context, client *Client) bool // NewConsistentHash returns a consistent hash that is used // to distribute keys across the shards. @@ -125,8 +123,8 @@ func (opt *RingOptions) init() { opt.HeartbeatFrequency = 500 * time.Millisecond } - if opt.ShardHealthCheckFn == nil { - opt.ShardHealthCheckFn = defaultShardHealthCheckFn + if opt.HeartbeatFn == nil { + opt.HeartbeatFn = defaultHeartbeatFn } if opt.NewConsistentHash == nil { @@ -424,7 +422,7 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) { var rebalance bool for _, shard := range c.List() { - isUp := c.opt.ShardHealthCheckFn(ctx, shard.Client) + isUp := c.opt.HeartbeatFn(ctx, shard.Client) if shard.Vote(isUp) { internal.Logger.Printf(ctx, "ring shard state changed: %s", shard) rebalance = true