Skip to content

Commit e26eacc

Browse files
committed
add multiple executors in canary and config
1 parent 2ff73d9 commit e26eacc

6 files changed

Lines changed: 229 additions & 12 deletions

File tree

cmd/sharddistributor-canary/main.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/uber/cadence/common/clock"
1919
"github.com/uber/cadence/common/log"
2020
"github.com/uber/cadence/service/sharddistributor/canary"
21+
canaryConfig "github.com/uber/cadence/service/sharddistributor/canary/config"
2122
"github.com/uber/cadence/service/sharddistributor/canary/executors"
2223
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
2324
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
@@ -32,6 +33,7 @@ const (
3233
defaultFixedNamespace = "shard-distributor-canary"
3334
defaultEphemeralNamespace = "shard-distributor-canary-ephemeral"
3435
defaultCanaryGRPCPort = 7953 // Port for canary to receive ping requests
36+
defaultNumExecutors = 1
3537

3638
shardDistributorServiceName = "cadence-shard-distributor"
3739
)
@@ -42,10 +44,14 @@ func runApp(c *cli.Context) {
4244
ephemeralNamespace := c.String("ephemeral-namespace")
4345
canaryGRPCPort := c.Int("canary-grpc-port")
4446

45-
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint, canaryGRPCPort)).Run()
47+
numExecutors := c.Int("num-executors")
48+
numFixedExecutors := max(c.Int("num-fixed-executors"), numExecutors)
49+
numEphemeralxecutors := max(c.Int("num-ephemeral-executors"), numExecutors)
50+
51+
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint, canaryGRPCPort, numFixedExecutors, numEphemeralxecutors)).Run()
4652
}
4753

48-
func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort int) fx.Option {
54+
func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort int, numFixedExecutors, numEphemeral int) fx.Option {
4955
configuration := clientcommon.Config{
5056
Namespaces: []clientcommon.NamespaceConfig{
5157
{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED},
@@ -130,7 +136,18 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string, canaryGRPCPort in
130136
}),
131137

132138
// Include the canary module - it will set up spectator peer choosers and canary client
133-
canary.Module(canary.NamespacesNames{FixedNamespace: fixedNamespace, EphemeralNamespace: ephemeralNamespace, ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace, SharddistributorServiceName: shardDistributorServiceName}),
139+
canary.Module(canary.NamespacesNames{
140+
FixedNamespace: fixedNamespace,
141+
EphemeralNamespace: ephemeralNamespace,
142+
ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace,
143+
SharddistributorServiceName: shardDistributorServiceName,
144+
Config: canaryConfig.Config{
145+
Canary: canaryConfig.CanaryConfig{
146+
NumFixedExecutors: numFixedExecutors,
147+
NumEphemeralExecutors: numEphemeral,
148+
},
149+
},
150+
}),
134151
)
135152
}
136153

@@ -166,6 +183,21 @@ func buildCLI() *cli.App {
166183
Value: defaultCanaryGRPCPort,
167184
Usage: "port for canary to receive ping requests",
168185
},
186+
&cli.IntFlag{
187+
Name: "num-executors",
188+
Value: defaultNumExecutors,
189+
Usage: "number of executors for fixed and ephemeral to start. Overrides num-fixed-executors and num-ephemeral-executors flags",
190+
},
191+
&cli.IntFlag{
192+
Name: "num-fixed-executors",
193+
Value: defaultNumExecutors,
194+
Usage: "number of executors of fixed namespace to start. Don't use with num-executors",
195+
},
196+
&cli.IntFlag{
197+
Name: "num-ephemeral-executors",
198+
Value: defaultNumExecutors,
199+
Usage: "number of executors of ephemeral namespace to start. Don't use with num-executors",
200+
},
169201
},
170202
Action: func(c *cli.Context) error {
171203
runApp(c)

cmd/sharddistributor-canary/main_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,12 @@ import (
88
)
99

1010
func TestDependenciesAreSatisfied(t *testing.T) {
11-
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint, defaultCanaryGRPCPort)))
11+
assert.NoError(t, fx.ValidateApp(opts(
12+
defaultFixedNamespace,
13+
defaultEphemeralNamespace,
14+
defaultShardDistributorEndpoint,
15+
defaultCanaryGRPCPort,
16+
defaultNumExecutors,
17+
defaultNumExecutors,
18+
)))
1219
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package config
2+
3+
// Config is the configuration for the shard distributor canary
4+
type Config struct {
5+
Canary CanaryConfig `yaml:"canary"`
6+
}
7+
8+
type CanaryConfig struct {
9+
// NumFixedExecutors is the number of executors of fixed namespace
10+
// Values more than 1 will create multiple executors processing the same fixed namespace
11+
// Default: 1
12+
NumFixedExecutors int `yaml:"numFixedExecutors"`
13+
14+
// NumEphemeralExecutors is the number of executors of ephemeral namespace
15+
// Values more than 1 will create multiple executors processing the same ephemeral namespace
16+
// Default: 1
17+
NumEphemeralExecutors int `yaml:"numEphemeralExecutors"`
18+
}

service/sharddistributor/canary/executors/executors.go

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"go.uber.org/fx"
55

66
"github.com/uber/cadence/client/sharddistributor"
7+
"github.com/uber/cadence/service/sharddistributor/canary/config"
78
"github.com/uber/cadence/service/sharddistributor/canary/externalshardassignment"
89
"github.com/uber/cadence/service/sharddistributor/canary/processor"
910
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
@@ -31,11 +32,57 @@ type ExecutorEphemeralResult struct {
3132
Executor executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"`
3233
}
3334

35+
type ExecutorsResult struct {
36+
fx.Out
37+
Executors []executorclient.Executor[*processor.ShardProcessor] `group:"executor-fixed-proc,flatten"`
38+
}
39+
40+
type ExecutorsEphemeralResult struct {
41+
fx.Out
42+
Executors []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc,flatten"`
43+
}
44+
45+
func NewExecutorsWithFixedNamespace(params executorclient.Params[*processor.ShardProcessor], namespace string, numExecutors int) (ExecutorsResult, error) {
46+
var result ExecutorsResult
47+
48+
if numExecutors <= 0 {
49+
numExecutors = 1
50+
}
51+
52+
for i := 0; i < numExecutors; i++ {
53+
executor, err := executorclient.NewExecutorWithNamespace(params, namespace)
54+
if err != nil {
55+
return ExecutorsResult{}, err
56+
}
57+
result.Executors = append(result.Executors, executor)
58+
}
59+
60+
return result, nil
61+
}
62+
3463
func NewExecutorWithFixedNamespace(params executorclient.Params[*processor.ShardProcessor], namespace string) (ExecutorResult, error) {
3564
executor, err := executorclient.NewExecutorWithNamespace(params, namespace)
3665
return ExecutorResult{Executor: executor}, err
3766
}
3867

68+
func NewExecutorsWithEphemeralNamespace(params executorclient.Params[*processorephemeral.ShardProcessor], namespace string, numExecutors int) (ExecutorsEphemeralResult, error) {
69+
var result ExecutorsEphemeralResult
70+
71+
if numExecutors <= 0 {
72+
numExecutors = 1
73+
}
74+
75+
for i := 0; i < numExecutors; i++ {
76+
executor, err := executorclient.NewExecutorWithNamespace(params, namespace)
77+
if err != nil {
78+
return ExecutorsEphemeralResult{}, err
79+
}
80+
result.Executors = append(result.Executors, executor)
81+
}
82+
83+
return result, nil
84+
}
85+
3986
func NewExecutorWithEphemeralNamespace(params executorclient.Params[*processorephemeral.ShardProcessor], namespace string) (ExecutorEphemeralResult, error) {
4087
executor, err := executorclient.NewExecutorWithNamespace(params, namespace)
4188
return ExecutorEphemeralResult{Executor: executor}, err
@@ -83,17 +130,17 @@ func NewExecutorsModule(params ExecutorsParams) {
83130
}
84131

85132
func Module(fixedNamespace, ephemeralNamespace, externalAssignmentNamespace string) fx.Option {
86-
return fx.Module(
87-
"Executors",
133+
return fx.Module("Executors",
88134
// Executor that is used for testing a namespace with fixed shards
89-
fx.Provide(
90-
func(params executorclient.Params[*processor.ShardProcessor]) (ExecutorResult, error) {
91-
return NewExecutorWithFixedNamespace(params, fixedNamespace)
92-
}),
135+
fx.Provide(func(cfg config.Config, params executorclient.Params[*processor.ShardProcessor]) (ExecutorsResult, error) {
136+
return NewExecutorsWithFixedNamespace(params, fixedNamespace, cfg.Canary.NumFixedExecutors)
137+
}),
138+
93139
// Executor that is used for testing a namespaces with ephemeral shards
94-
fx.Provide(func(params executorclient.Params[*processorephemeral.ShardProcessor]) (ExecutorEphemeralResult, error) {
95-
return NewExecutorWithEphemeralNamespace(params, ephemeralNamespace)
140+
fx.Provide(func(cfg config.Config, params executorclient.Params[*processorephemeral.ShardProcessor]) (ExecutorsEphemeralResult, error) {
141+
return NewExecutorsWithEphemeralNamespace(params, ephemeralNamespace, cfg.Canary.NumEphemeralExecutors)
96142
}),
143+
97144
// Executor used for testing a namespace where the shards are assigned externally and reflected in the state of the SD
98145
// this is reproducing the behaviour that matching service is going to have during the DistributedPassthrough mode
99146
fx.Module("Executor-with-external-assignment",

service/sharddistributor/canary/executors/executors_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,3 +232,111 @@ func createMockParams[SP executorclient.ShardProcessor](
232232
TimeSource: clock.NewMockedTimeSource(),
233233
}
234234
}
235+
236+
func TestNewExecutorsWithFixedNamespace(t *testing.T) {
237+
ctrl := gomock.NewController(t)
238+
tests := []struct {
239+
name string
240+
namespace string
241+
numExecutors int
242+
expected int
243+
}{
244+
{
245+
name: "zero executors defaults to one",
246+
namespace: "test-namespace",
247+
numExecutors: 0,
248+
expected: 1,
249+
},
250+
{
251+
name: "negative executors defaults to one",
252+
namespace: "test-namespace",
253+
numExecutors: -1,
254+
expected: 1,
255+
},
256+
{
257+
name: "one executor",
258+
namespace: "test-namespace",
259+
numExecutors: 1,
260+
expected: 1,
261+
},
262+
{
263+
name: "two executors",
264+
namespace: "test-namespace",
265+
numExecutors: 2,
266+
expected: 2,
267+
},
268+
{
269+
name: "five executors",
270+
namespace: "test-namespace",
271+
numExecutors: 5,
272+
expected: 5,
273+
},
274+
}
275+
276+
for _, tt := range tests {
277+
t.Run(tt.name, func(t *testing.T) {
278+
params := createMockParams[*processor.ShardProcessor](ctrl, tt.namespace)
279+
result, err := NewExecutorsWithFixedNamespace(params, tt.namespace, tt.numExecutors)
280+
281+
require.NoError(t, err)
282+
assert.Len(t, result.Executors, tt.expected)
283+
for _, executor := range result.Executors {
284+
assert.NotNil(t, executor)
285+
}
286+
})
287+
}
288+
}
289+
290+
func TestNewExecutorsWithEphemeralNamespace(t *testing.T) {
291+
ctrl := gomock.NewController(t)
292+
tests := []struct {
293+
name string
294+
namespace string
295+
numExecutors int
296+
expected int
297+
}{
298+
{
299+
name: "zero executors defaults to one",
300+
namespace: "test-namespace",
301+
numExecutors: 0,
302+
expected: 1,
303+
},
304+
{
305+
name: "negative executors defaults to one",
306+
namespace: "test-namespace",
307+
numExecutors: -1,
308+
expected: 1,
309+
},
310+
{
311+
name: "one executor",
312+
namespace: "test-namespace",
313+
numExecutors: 1,
314+
expected: 1,
315+
},
316+
{
317+
name: "two executors",
318+
namespace: "test-namespace",
319+
numExecutors: 2,
320+
expected: 2,
321+
},
322+
{
323+
name: "five executors",
324+
namespace: "test-namespace",
325+
numExecutors: 5,
326+
expected: 5,
327+
},
328+
}
329+
330+
for _, tt := range tests {
331+
t.Run(tt.name, func(t *testing.T) {
332+
params := createMockParams[*processorephemeral.ShardProcessor](ctrl, tt.namespace)
333+
result, err := NewExecutorsWithEphemeralNamespace(params, tt.namespace, tt.numExecutors)
334+
335+
require.NoError(t, err)
336+
assert.Len(t, result.Executors, tt.expected)
337+
for _, executor := range result.Executors {
338+
assert.NotNil(t, executor)
339+
}
340+
})
341+
}
342+
}

service/sharddistributor/canary/module.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"go.uber.org/yarpc"
66

77
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
8+
"github.com/uber/cadence/service/sharddistributor/canary/config"
89
"github.com/uber/cadence/service/sharddistributor/canary/executors"
910
"github.com/uber/cadence/service/sharddistributor/canary/factory"
1011
"github.com/uber/cadence/service/sharddistributor/canary/handler"
@@ -23,6 +24,8 @@ type NamespacesNames struct {
2324
EphemeralNamespace string
2425
ExternalAssignmentNamespace string
2526
SharddistributorServiceName string
27+
28+
Config config.Config
2629
}
2730

2831
func Module(namespacesNames NamespacesNames) fx.Option {
@@ -31,6 +34,8 @@ func Module(namespacesNames NamespacesNames) fx.Option {
3134

3235
func opts(names NamespacesNames) fx.Option {
3336
return fx.Options(
37+
fx.Supply(names.Config),
38+
3439
fx.Provide(sharddistributorv1.NewFxShardDistributorExecutorAPIYARPCClient(names.SharddistributorServiceName)),
3540
fx.Provide(sharddistributorv1.NewFxShardDistributorAPIYARPCClient(names.SharddistributorServiceName)),
3641

0 commit comments

Comments
 (0)