Skip to content

Commit 40ca8d4

Browse files
authored
feat: [Dynamic config] Support for namespace filter and onboarding property (#7358)
<!-- Describe what has changed in this PR --> **What changed?** - Adding namespace as a filter for dynamic properties - Model the onboarding mode as a dynamic property <!-- Tell your future self why have you made these changes --> **Why?** We need to onboard to the shard distributor incrementally using a dynamic configuration <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests, the dynamic config it is not integrated yet and used in the code <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: edigregorio <[email protected]>
1 parent 18f7677 commit 40ca8d4

File tree

6 files changed

+73
-3
lines changed

6 files changed

+73
-3
lines changed

common/dynamicconfig/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,22 @@ func (c *Collection) GetStringPropertyFilteredByTaskListInfo(key dynamicproperti
447447
}
448448
}
449449

450+
// GetStringPropertyFilteredByNamespace gets property with domain filter and asserts that it's a string
451+
func (c *Collection) GetStringPropertyFilteredByNamespace(key dynamicproperties.StringKey) dynamicproperties.StringPropertyFnWithNamespaceFilters {
452+
return func(namespace string) string {
453+
filters := c.toFilterMap(dynamicproperties.NamespaceFilter(namespace))
454+
val, err := c.client.GetStringValue(
455+
key,
456+
filters,
457+
)
458+
if err != nil {
459+
c.logError(key, filters, err)
460+
return key.DefaultString()
461+
}
462+
return val
463+
}
464+
}
465+
450466
// GetBoolPropertyFilteredByDomain gets property with domain filter and asserts that it's a bool
451467
func (c *Collection) GetBoolPropertyFilteredByDomain(key dynamicproperties.BoolKey) dynamicproperties.BoolPropertyFnWithDomainFilter {
452468
return func(domain string) bool {

common/dynamicconfig/config_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,15 @@ func (s *configSuite) TestGetStringPropertyFnByTaskListInfo() {
120120
s.Equal("round-robin", value(domain, taskList, taskType))
121121
}
122122

123+
func (s *configSuite) TestGetStringPropertyFnWithNamespaceFilter() {
124+
key := dynamicproperties.MigrationMode
125+
namespace := "testService"
126+
value := s.cln.GetStringPropertyFilteredByNamespace(key)
127+
s.Equal(key.DefaultString(), value(namespace))
128+
s.client.SetValue(key, "efg")
129+
s.Equal("efg", value(namespace))
130+
}
131+
123132
func (s *configSuite) TestGetStringPropertyFilteredByRatelimitKey() {
124133
key := dynamicproperties.FrontendGlobalRatelimiterMode
125134
ratelimitKey := "user:testDomain"

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2422,14 +2422,28 @@ const (
24222422
// Default value: "hash_ring"
24232423
MatchingShardDistributionMode
24242424

2425-
// LastStringKey must be the last one in this const group
2426-
LastStringKey
2427-
24282425
// SerializationEncoding is the encoding type for blobs
24292426
// KeyName: history.serializationEncoding
24302427
// Value type: String
24312428
// Default value: "thriftrw"
24322429
SerializationEncoding
2430+
2431+
// MigrationMode is the mode the at represent the state of the migration to rely on shard distributor for the sharding mechanism
2432+
//
2433+
// "invalid" invalid mode for the migration, not expected to be used
2434+
// "local_pass" the executor library is integrated but no external call to the SD happening
2435+
// "local_pass_shadow" heartbeat calls to the SD to update the sharding state in SD
2436+
// "distributed_pass" the local sharding mechanism is sent to SD, returned by SD and applied in the onboarded service
2437+
// "onboarded" the sharding logic in SD is used
2438+
//
2439+
// KeyName: shardDistributor.migrationMode
2440+
// Value type: String
2441+
// Default value: local_pass
2442+
// Allowed filters: namespace
2443+
MigrationMode
2444+
2445+
// LastStringKey must be the last one in this const group
2446+
LastStringKey
24332447
)
24342448

24352449
const (
@@ -4939,6 +4953,12 @@ var StringKeys = map[StringKey]DynamicString{
49394953
Description: "SerializationEncoding is the encoding type for blobs",
49404954
DefaultValue: string(constants.EncodingTypeThriftRW),
49414955
},
4956+
MigrationMode: {
4957+
KeyName: "shardDistributor.migrationMode",
4958+
Description: "MigrationMode is the mode the at represent the state of the migration to rely on shard distributor for the sharding mechanism",
4959+
DefaultValue: "local_pass",
4960+
Filters: []Filter{Namespace},
4961+
},
49424962
}
49434963

49444964
var DurationKeys = map[DurationKey]DynamicDuration{

common/dynamicconfig/dynamicproperties/constants_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,13 @@ func (s *constantSuite) TestStringKey() {
248248
Description: "ReadVisibilityStoreName is key to identify which store to read visibility data from",
249249
DefaultValue: "es",
250250
},
251+
"MigrationMode": {
252+
Key: MigrationMode,
253+
KeyName: "shardDistributor.migrationMode",
254+
Filters: []Filter{Namespace},
255+
Description: "MigrationMode is the mode the at represent the state of the migration to rely on shard distributor for the sharding mechanism",
256+
DefaultValue: "local_pass",
257+
},
251258
}
252259

253260
for _, value := range testStringKeys {

common/dynamicconfig/dynamicproperties/definitions.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ type StringPropertyFnWithDomainFilter func(domain string) string
8686
// StringPropertyFnWithTaskListInfoFilters is a wrapper to get string property from dynamic config with domainID as filter
8787
type StringPropertyFnWithTaskListInfoFilters func(domain string, taskList string, taskType int) string
8888

89+
// StringPropertyFnWithNamespaceFilters is a wrapper to get string property from dynamic config with namespace as filter
90+
type StringPropertyFnWithNamespaceFilters func(namespace string) string
91+
8992
// BoolPropertyFnWithDomainFilter is a wrapper to get bool property from dynamic config with domain as filter
9093
type BoolPropertyFnWithDomainFilter func(domain string) bool
9194

@@ -113,6 +116,9 @@ type ListPropertyFn func(opts ...FilterOption) []interface{}
113116
// StringPropertyWithRatelimitKeyFilter is a wrapper to get strings (currently global ratelimiter modes) per global ratelimit key
114117
type StringPropertyWithRatelimitKeyFilter func(globalRatelimitKey string) string
115118

119+
// StringPropertyWithNamespaceFilter is a wrapper to get strings per namespace
120+
type StringPropertyWithNamespaceFilter func(namespace string) string
121+
116122
func (f IntPropertyFn) AsFloat64(opts ...FilterOption) func() float64 {
117123
return func() float64 { return float64(f(opts...)) }
118124
}

common/dynamicconfig/dynamicproperties/filter.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ func ParseFilter(filterName string) Filter {
5757
return WorkflowType
5858
case "ratelimitKey":
5959
return RatelimitKey
60+
case "namespace":
61+
return Namespace
6062
default:
6163
return UnknownFilter
6264
}
@@ -73,6 +75,7 @@ var filters = []string{
7375
"workflowID",
7476
"workflowType",
7577
"ratelimitKey",
78+
"namespace",
7679
}
7780

7881
const (
@@ -95,6 +98,8 @@ const (
9598
WorkflowType
9699
// RatelimitKey is the global ratelimit key (not a local key name)
97100
RatelimitKey
101+
// Namespace is the entity of independent shard distribution mechanism
102+
Namespace
98103

99104
// LastFilterTypeForTest must be the last one in this const group for testing purpose
100105
LastFilterTypeForTest
@@ -166,6 +171,13 @@ func RatelimitKeyFilter(key string) FilterOption {
166171
}
167172
}
168173

174+
// NamespaceFilter filters by namespace
175+
func NamespaceFilter(namespace string) FilterOption {
176+
return func(filterMap map[Filter]interface{}) {
177+
filterMap[Namespace] = namespace
178+
}
179+
}
180+
169181
// ToGetDynamicConfigFilterRequest generates a GetDynamicConfigRequest object
170182
// by converting filters to DynamicConfigFilter objects and setting values
171183
func ToGetDynamicConfigFilterRequest(configName string, filters []FilterOption) *types.GetDynamicConfigRequest {

0 commit comments

Comments
 (0)