
Commit 18f7677

feat: [Executor-client] Passthrough logic for migration (#7345)
**What changed?**
- Implemented the logic for the migration:
  - no heartbeating in local passthrough mode
  - no assignment applied in local passthrough shadow mode
  - assignment applied in distributed passthrough and onboarded modes
  - consistency checks and metrics in passthrough shadow and distributed passthrough modes
  - static configuration to control the first step of the migration

**Why?**
These mechanisms are needed to migrate incrementally from the sharding logic built into the system to the shard distributor.

**How did you test it?**
Unit tests and local testing.

**Potential risks**
Nothing is onboarded yet, except the canary service.

**Release notes**

**Documentation Changes**

---------
Signed-off-by: edigregorio <[email protected]>
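The mode-by-mode behavior described above, summarized as a documentation-only sketch (this is not code from the commit; only the `types.MigrationMode` constants are taken from the diff below, and the package and helper names are hypothetical):

```go
// Documentation-only sketch: the executor-side behavior each migration mode is
// intended to trigger, per the PR description above.
package migrationoverview

import "github.com/uber/cadence/common/types"

// describeMode is a hypothetical helper used purely to summarize the modes.
func describeMode(m types.MigrationMode) string {
	switch m {
	case types.MigrationModeLOCALPASSTHROUGH:
		return "local sharding only: skip heartbeating entirely"
	case types.MigrationModeLOCALPASSTHROUGHSHADOW:
		return "heartbeat and compare assignments, but do not apply them"
	case types.MigrationModeDISTRIBUTEDPASSTHROUGH:
		return "compare assignments for consistency, then apply them"
	case types.MigrationModeONBOARDED:
		return "fully onboarded: apply assignments from the heartbeat response"
	default:
		return "unknown mode: log a warning and skip assignment"
	}
}
```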
1 parent bcabd9d commit 18f7677

File tree

10 files changed (+600, -17 lines)

cmd/sharddistributor-canary/main.go

Lines changed: 3 additions & 2 deletions
@@ -14,6 +14,7 @@ import (
 	"github.com/uber/cadence/common/clock"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/service/sharddistributor/canary"
+	"github.com/uber/cadence/service/sharddistributor/config"
 	"github.com/uber/cadence/service/sharddistributor/executorclient"
 	"github.com/uber/cadence/tools/common/commoncli"
 )
@@ -38,8 +39,8 @@ func runApp(c *cli.Context) {
 func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
 	config := executorclient.Config{
 		Namespaces: []executorclient.NamespaceConfig{
-			{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second},
-			{Namespace: ephemeralNamespace, HeartBeatInterval: 1 * time.Second},
+			{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED},
+			{Namespace: ephemeralNamespace, HeartBeatInterval: 1 * time.Second, MigrationMode: config.MigrationModeONBOARDED},
 		},
 	}
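For context, a namespace could opt into an intermediate migration step instead of being fully onboarded. A minimal sketch, assuming the other `config.MigrationMode*` string constants are accepted by `NamespaceConfig.MigrationMode` the same way `MigrationModeONBOARDED` is above; the package name and namespace value are hypothetical:

```go
// Sketch only: configuring one namespace in shadow mode during the rollout.
package canaryexample

import (
	"time"

	"github.com/uber/cadence/service/sharddistributor/config"
	"github.com/uber/cadence/service/sharddistributor/executorclient"
)

func shadowOpts(namespace string) executorclient.Config {
	return executorclient.Config{
		Namespaces: []executorclient.NamespaceConfig{
			{
				Namespace:         namespace,
				HeartBeatInterval: 1 * time.Second,
				// Shadow mode: heartbeat and compare, but keep local assignment logic.
				MigrationMode: config.MigrationModeLOCALPASSTHROUGHSHADOW,
			},
		},
	}
}
```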

common/types/sharddistributor.go

Lines changed: 6 additions & 6 deletions
@@ -24,7 +24,7 @@ package types
 
 import "fmt"
 
-//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus -json -output sharddistributor_statuses_enumer_generated.go
+//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go
 
 type GetShardOwnerRequest struct {
 	ShardKey string
@@ -204,9 +204,9 @@ const (
 type MigrationMode int32
 
 const (
-	MigrationModeINVALID                = 0
-	MigrationModeLOCALPASSTHROUGH       = 1
-	MigrationModeLOCALPASSTHROUGHSHADOW = 2
-	MigrationModeDISTRIBUTEDPASSTHROUGH = 3
-	MigrationModeONBOARDED              = 4
+	MigrationModeINVALID                MigrationMode = 0
+	MigrationModeLOCALPASSTHROUGH       MigrationMode = 1
+	MigrationModeLOCALPASSTHROUGHSHADOW MigrationMode = 2
+	MigrationModeDISTRIBUTEDPASSTHROUGH MigrationMode = 3
+	MigrationModeONBOARDED              MigrationMode = 4
 )
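Typing the constants as `MigrationMode` and adding the type to the `enumer` directive regenerates String/JSON helpers for it alongside the existing status enums. A rough illustration of the expected effect, assuming enumer's default naming (the generated file is not rendered in this diff, so the exact output is an assumption):

```go
// Illustration only; exact output depends on the generated enumer code.
package typesexample

import (
	"encoding/json"
	"fmt"

	"github.com/uber/cadence/common/types"
)

func printMode() {
	mode := types.MigrationModeDISTRIBUTEDPASSTHROUGH
	// With enumer defaults, the constant name is used for String() and JSON
	// encoding instead of the raw int32 value.
	fmt.Println(mode.String()) // expected: MigrationModeDISTRIBUTEDPASSTHROUGH
	b, _ := json.Marshal(mode)
	fmt.Println(string(b)) // expected: "MigrationModeDISTRIBUTEDPASSTHROUGH"
}
```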

common/types/sharddistributor_statuses_enumer_generated.go

Lines changed: 100 additions & 1 deletion
Some generated files are not rendered by default.

service/sharddistributor/config/config.go

Lines changed: 4 additions & 3 deletions
@@ -95,7 +95,8 @@ const (
 	MigrationModeONBOARDED = "onboarded"
 )
 
-var configMode = map[string]types.MigrationMode{
+// ConfigMode maps string migration mode values to types.MigrationMode
+var ConfigMode = map[string]types.MigrationMode{
 	MigrationModeINVALID:                types.MigrationModeINVALID,
 	MigrationModeLOCALPASSTHROUGH:       types.MigrationModeLOCALPASSTHROUGH,
 	MigrationModeLOCALPASSTHROUGHSHADOW: types.MigrationModeLOCALPASSTHROUGHSHADOW,
@@ -106,11 +107,11 @@ var configMode = map[string]types.MigrationMode{
 func (s *ShardDistribution) GetMigrationMode(namespace string) types.MigrationMode {
 	for _, ns := range s.Namespaces {
 		if ns.Name == namespace {
-			return configMode[ns.Mode]
+			return ConfigMode[ns.Mode]
 		}
 	}
 	// TODO in the dynamic configuration I will setup a default value
-	return configMode[MigrationModeONBOARDED]
+	return ConfigMode[MigrationModeONBOARDED]
 }
 
 // NewConfig returns new service config with default values
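With the map exported, callers outside the config package can translate static-configuration mode strings into the typed enum, either per namespace or directly. A minimal usage sketch; the package name and the namespace string are hypothetical, while `ShardDistribution.GetMigrationMode`, `ConfigMode`, and the mode constants come from the diff above:

```go
// Sketch only: resolving migration modes from static configuration.
package configexample

import (
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/sharddistributor/config"
)

func resolveModes(sd *config.ShardDistribution) (types.MigrationMode, types.MigrationMode) {
	// Per-namespace lookup; namespaces not listed in the config currently fall
	// back to ONBOARDED (see the TODO about a dynamic-configuration default).
	perNamespace := sd.GetMigrationMode("shard-distributor-canary")

	// Direct translation of a config-level string constant via the exported map.
	direct := config.ConfigMode[config.MigrationModeLOCALPASSTHROUGH]

	return perNamespace, direct
}
```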

service/sharddistributor/executorclient/client.go

Lines changed: 1 addition & 0 deletions
@@ -112,6 +112,7 @@ func newExecutorWithConfig[SP ShardProcessor](params Params[SP], namespaceConfig
 		timeSource:    params.TimeSource,
 		stopC:         make(chan struct{}),
 		metrics:       metricsScope,
+		migrationMode: namespaceConfig.GetMigrationMode(),
 	}, nil
 }

service/sharddistributor/executorclient/clientimpl.go

Lines changed: 94 additions & 4 deletions
@@ -67,6 +67,7 @@ type executorImpl[SP ShardProcessor] struct {
 	processLoopWG   sync.WaitGroup
 	assignmentMutex sync.Mutex
 	metrics         tally.Scope
+	migrationMode   types.MigrationMode
 }
 
 func (e *executorImpl[SP]) Start(ctx context.Context) {
@@ -102,6 +103,12 @@ func (e *executorImpl[SP]) AssignShardsFromLocalLogic(ctx context.Context, shard
 }
 
 func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
+	// Check if initial migration mode is LOCAL_PASSTHROUGH - if so, skip heartbeating entirely
+	if e.migrationMode == types.MigrationModeLOCALPASSTHROUGH {
+		e.logger.Info("initial migration mode is local passthrough, skipping heartbeat loop")
+		return
+	}
+
 	heartBeatTimer := e.timeSource.NewTimer(getJitteredHeartbeatDuration(e.heartBeatInterval, heartbeatJitterMax))
 	defer heartBeatTimer.Stop()
 
@@ -117,11 +124,39 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
 			return
 		case <-heartBeatTimer.Chan():
 			heartBeatTimer.Reset(getJitteredHeartbeatDuration(e.heartBeatInterval, heartbeatJitterMax))
-			shardAssignment, err := e.heartbeat(ctx)
+			shardAssignment, migrationMode, err := e.heartbeat(ctx)
 			if err != nil {
 				e.logger.Error("failed to heartbeat", tag.Error(err))
 				continue // TODO: should we stop the executor, and drop all the shards?
 			}
+
+			// Handle migration mode logic
+			switch migrationMode {
+			case types.MigrationModeLOCALPASSTHROUGH:
+				// LOCAL_PASSTHROUGH: statically assigned, stop heartbeating
+				e.logger.Info("local passthrough mode: stopping heartbeat loop")
+				return
+
+			case types.MigrationModeLOCALPASSTHROUGHSHADOW:
+				// LOCAL_PASSTHROUGH_SHADOW: check response but don't apply it
+				e.compareAssignments(shardAssignment)
+				continue
+
+			case types.MigrationModeDISTRIBUTEDPASSTHROUGH:
+				// DISTRIBUTED_PASSTHROUGH: validate then apply the assignment
+				e.compareAssignments(shardAssignment)
+				// Continue with applying the assignment from heartbeat
+
+			case types.MigrationModeONBOARDED:
+				// ONBOARDED: normal flow, apply the assignment from heartbeat
+				// Continue with normal assignment logic below
+
+			default:
+				e.logger.Warn("unknown migration mode, skipping assignment",
+					tag.Dynamic("migration-mode", migrationMode))
+				continue
+			}
+
 			if !e.assignmentMutex.TryLock() {
 				e.logger.Warn("already doing shard assignment, will skip this assignment")
 				e.metrics.Counter(metricsconstants.ShardDistributorExecutorAssignmentSkipped).Inc(1)
@@ -141,7 +176,7 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
 	}
 }
 
-func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[string]*types.ShardAssignment, err error) {
+func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[string]*types.ShardAssignment, migrationMode types.MigrationMode, err error) {
 	// Fill in the shard status reports
 	shardStatusReports := make(map[string]*types.ShardStatusReport)
 	e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
@@ -169,10 +204,19 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
 	// Send the request
 	response, err := e.shardDistributorClient.Heartbeat(ctx, request)
 	if err != nil {
-		return nil, fmt.Errorf("send heartbeat: %w", err)
+		return nil, types.MigrationModeINVALID, fmt.Errorf("send heartbeat: %w", err)
+	}
+
+	previousMode := e.migrationMode
+	currentMode := response.MigrationMode
+	if previousMode != currentMode {
+		e.logger.Info("migration mode transition",
+			tag.Dynamic("previous", previousMode),
+			tag.Dynamic("current", currentMode))
+		e.migrationMode = currentMode
 	}
 
-	return response.ShardAssignments, nil
+	return response.ShardAssignments, response.MigrationMode, nil
 }
 
 func (e *executorImpl[SP]) updateShardAssignment(ctx context.Context, shardAssignments map[string]*types.ShardAssignment) {
@@ -246,6 +290,52 @@ func (e *executorImpl[SP]) stopShardProcessors() {
 	wg.Wait()
 }
 
+// compareAssignments compares the local assignments with the heartbeat response assignments
+// and emits convergence or divergence metrics
+func (e *executorImpl[SP]) compareAssignments(heartbeatAssignments map[string]*types.ShardAssignment) {
+	// Get current local assignments
+	localAssignments := make(map[string]bool)
+	e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
+		if managedProcessor.getState() == processorStateStarted {
+			localAssignments[shardID] = true
+		}
+		return true
+	})
+
+	// Check if all local assignments are in heartbeat assignments with READY status
+	for shardID := range localAssignments {
+		assignment, exists := heartbeatAssignments[shardID]
+		if !exists || assignment.Status != types.AssignmentStatusREADY {
+			e.logger.Warn("assignment divergence: local shard not in heartbeat or not ready",
+				tag.Dynamic("shard-id", shardID))
+			e.emitMetricsConvergence(false)
+			return
+		}
+	}
+
+	// Check if all heartbeat READY assignments are in local assignments
+	for shardID, assignment := range heartbeatAssignments {
+		if assignment.Status == types.AssignmentStatusREADY {
+			if !localAssignments[shardID] {
+				e.logger.Warn("assignment divergence: heartbeat shard not in local",
+					tag.Dynamic("shard-id", shardID))
+				e.emitMetricsConvergence(false)
+				return
+			}
+		}
+	}
+
+	e.emitMetricsConvergence(true)
+}
+
+func (e *executorImpl[SP]) emitMetricsConvergence(converged bool) {
+	if converged {
+		e.metrics.Counter(metricsconstants.ShardDistributorExecutorAssignmentConvergence).Inc(1)
+	} else {
+		e.metrics.Counter(metricsconstants.ShardDistributorExecutorAssignmentDivergence).Inc(1)
+	}
+}
+
 func getJitteredHeartbeatDuration(interval time.Duration, jitterMax time.Duration) time.Duration {
 	jitterMaxNanos := int64(jitterMax)
 	randomJitterNanos := rand.Int63n(jitterMaxNanos)
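The convergence check above treats the two views as converged only when the set of locally started shards exactly matches the set of shards marked READY in the heartbeat response. A standalone sketch of that set comparison on plain maps (it mirrors the rule in `compareAssignments`, but is not the method itself; the package name is hypothetical):

```go
// Standalone mirror of the convergence rule: converged iff
// {locally started shards} == {shards READY in the heartbeat response}.
package convergenceexample

import "github.com/uber/cadence/common/types"

func converged(local map[string]bool, heartbeat map[string]*types.ShardAssignment) bool {
	// Every locally started shard must appear in the heartbeat with READY status.
	for shardID := range local {
		a, ok := heartbeat[shardID]
		if !ok || a.Status != types.AssignmentStatusREADY {
			return false
		}
	}
	// Every READY shard in the heartbeat must be started locally.
	for shardID, a := range heartbeat {
		if a.Status == types.AssignmentStatusREADY && !local[shardID] {
			return false
		}
	}
	return true
}
```

Any mismatch in either direction counts as divergence and increments the divergence counter; otherwise the convergence counter is incremented.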
