Skip to content

Commit b69dd78

Browse files
authored
fix(active-active): Fix auto-forwarding for active-active domains (#7356)
<!-- Describe what has changed in this PR --> **What changed?** - Update active cluster manager with a new method to get current workflow state and active cluster selection policy - Fix auto-forwarding for active-active domains - Add new test case to simulation tests <!-- Tell your future self why you have made these changes --> **Why?** Bug fix <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** unit tests && simulation tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent aafa165 commit b69dd78

File tree

14 files changed

+647
-31
lines changed

14 files changed

+647
-31
lines changed

.github/workflows/replication-simulation.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ jobs:
1515
scenario:
1616
- activeactive
1717
- activeactive_child
18+
- activeactive_invalid_cluster_attribute
1819
- activeactive_same_wfid
1920
- activeactive_same_wfid_signalwithstart
20-
# TODO(active-active): Re-enable this scenario once we have fixed the auto-forwarding issue
21-
# - activeactive_same_wfid_signalwithstart_delayed
21+
- activeactive_same_wfid_signalwithstart_delayed
2222
- activeactive_cron
2323
- activeactive_regional_failover
2424
- activeactive_regional_failover_start_same_wfid

common/activecluster/manager.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@ import (
4040
type DomainIDToDomainFn func(id string) (*cache.DomainCacheEntry, error)
4141

4242
const (
43-
LookupNewWorkflowOpName = "LookupNewWorkflow"
44-
LookupWorkflowOpName = "LookupWorkflow"
45-
GetActiveClusterInfoByClusterAttributeOpName = "GetActiveClusterInfoByClusterAttribute"
46-
GetActiveClusterInfoByWorkflowOpName = "GetActiveClusterInfoByWorkflow"
47-
GetActiveClusterSelectionPolicyForWorkflowOpName = "GetActiveClusterSelectionPolicyForWorkflow"
48-
DomainIDToDomainFnErrorReason = "domain_id_to_name_fn_error"
43+
LookupNewWorkflowOpName = "LookupNewWorkflow"
44+
LookupWorkflowOpName = "LookupWorkflow"
45+
GetActiveClusterInfoByClusterAttributeOpName = "GetActiveClusterInfoByClusterAttribute"
46+
GetActiveClusterInfoByWorkflowOpName = "GetActiveClusterInfoByWorkflow"
47+
GetActiveClusterSelectionPolicyForWorkflowOpName = "GetActiveClusterSelectionPolicyForWorkflow"
48+
GetActiveClusterSelectionPolicyForCurrentWorkflowOpName = "GetActiveClusterSelectionPolicyForCurrentWorkflow"
49+
DomainIDToDomainFnErrorReason = "domain_id_to_name_fn_error"
4950

5051
workflowPolicyCacheTTL = 10 * time.Second
5152
workflowPolicyCacheMaxCount = 1000
@@ -260,3 +261,40 @@ func (m *managerImpl) GetActiveClusterSelectionPolicyForWorkflow(ctx context.Con
260261
}
261262
return plcy, nil
262263
}
264+
265+
func (m *managerImpl) GetActiveClusterSelectionPolicyForCurrentWorkflow(ctx context.Context, domainID, wfID string) (res *types.ActiveClusterSelectionPolicy, running bool, e error) {
266+
d, scope, err := m.getDomainAndScope(domainID, GetActiveClusterSelectionPolicyForCurrentWorkflowOpName)
267+
if err != nil {
268+
return nil, false, err
269+
}
270+
defer m.handleError(scope, &e, time.Now())
271+
if !d.GetReplicationConfig().IsActiveActive() {
272+
// Not an active-active domain. return nil
273+
m.logger.Debug("GetActiveClusterSelectionPolicyForCurrentWorkflow: not an active-active domain. returning nil",
274+
tag.WorkflowDomainID(domainID),
275+
tag.WorkflowID(wfID),
276+
)
277+
return nil, false, nil
278+
}
279+
280+
shardID := common.WorkflowIDToHistoryShard(wfID, m.numShards)
281+
executionManager, err := m.executionManagerProvider.GetExecutionManager(shardID)
282+
if err != nil {
283+
return nil, false, err
284+
}
285+
execution, err := executionManager.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
286+
DomainID: domainID,
287+
WorkflowID: wfID,
288+
})
289+
if err != nil {
290+
return nil, false, err
291+
}
292+
if persistence.IsWorkflowRunning(execution.State) {
293+
policy, err := m.getClusterSelectionPolicy(ctx, domainID, wfID, execution.RunID)
294+
if err != nil {
295+
return nil, false, err
296+
}
297+
return policy, true, nil
298+
}
299+
return nil, false, nil
300+
}

common/activecluster/manager_mock.go

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)