Skip to content

Commit 2648215

Browse files
authored
fix(active-active): Child workflows should always use parent's active cluster policy (#7348)
<!-- Describe what has changed in this PR --> **What changed?** Update child workflow to use parent's active cluster selection policy <!-- Tell your future self why have you made these changes --> **Why?** Child workflow should have the same active cluster selection policy as parent workflow. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Simulation tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent 2a4f120 commit 2648215

File tree

10 files changed

+172
-7
lines changed

10 files changed

+172
-7
lines changed

.github/workflows/replication-simulation.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ jobs:
1414
matrix:
1515
scenario:
1616
- activeactive
17+
- activeactive_child
1718
- activeactive_same_wfid
1819
- activeactive_same_wfid_signalwithstart
1920
# TODO(active-active): Re-enable this scenario once we have fixed the auto-forwarding issue
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# This file is used as dynamicconfig override for "activeactive_child" replication simulation scenario configured via simulation/replication/testdata/replication_simulation_activeactive_child.yaml
2+
system.writeVisibilityStoreName:
3+
- value: "db"
4+
system.readVisibilityStoreName:
5+
- value: "db"
6+
history.replicatorTaskBatchSize:
7+
- value: 25
8+
constraints: {}
9+
frontend.failoverCoolDown:
10+
- value: 5s
11+
history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
12+
- value: 10ms
13+
history.standbyTaskMissingEventsResendDelay:
14+
- value: 5s
15+
history.standbyTaskMissingEventsDiscardDelay:
16+
- value: 10s
17+
history.standbyClusterDelay:
18+
- value: 10s
19+
history.enableTransferQueueV2:
20+
- value: true
21+
history.enableTimerQueueV2:
22+
- value: true

service/history/task/transfer_active_task_executor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,7 @@ func (t *transferActiveTaskExecutor) processStartChildExecution(
852852
childInfo.CreateRequestID,
853853
attributes,
854854
mutableState.GetExecutionInfo().PartitionConfig,
855+
mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy,
855856
)
856857
if err != nil {
857858

@@ -1620,6 +1621,7 @@ func startWorkflowWithRetry(
16201621
requestID string,
16211622
attributes *types.StartChildWorkflowExecutionInitiatedEventAttributes,
16221623
partitionConfig map[string]string,
1624+
activeClusterSelectionPolicy *types.ActiveClusterSelectionPolicy,
16231625
) (string, error) {
16241626

16251627
// Get parent domain name
@@ -1652,7 +1654,7 @@ func startWorkflowWithRetry(
16521654
DelayStartSeconds: attributes.DelayStartSeconds,
16531655
JitterStartSeconds: attributes.JitterStartSeconds,
16541656
FirstRunAtTimeStamp: attributes.FirstRunAtTimestamp,
1655-
ActiveClusterSelectionPolicy: attributes.ActiveClusterSelectionPolicy,
1657+
ActiveClusterSelectionPolicy: activeClusterSelectionPolicy,
16561658
}
16571659

16581660
historyStartReq, err := common.CreateHistoryStartWorkflowRequest(task.TargetDomainID, frontendStartReq, timeSource.Now(), partitionConfig)

service/history/task/transfer_active_task_executor_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1522,6 +1522,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Success
15221522
childInfo.CreateRequestID,
15231523
s.mockShard.GetTimeSource().Now(),
15241524
mutableState.GetExecutionInfo().PartitionConfig,
1525+
mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy,
15251526
)
15261527
require.NoError(s.T(), err)
15271528
s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), historyReq).Return(&types.StartWorkflowExecutionResponse{RunID: childExecution.RunID}, nil).Times(1)
@@ -1563,6 +1564,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Failure
15631564
childInfo.CreateRequestID,
15641565
s.mockShard.GetTimeSource().Now(),
15651566
mutableState.GetExecutionInfo().PartitionConfig,
1567+
mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy,
15661568
)
15671569
require.NoError(s.T(), err)
15681570
s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), historyReq).Return(nil, &types.WorkflowExecutionAlreadyStartedError{}).Times(1)
@@ -2358,6 +2360,7 @@ func createTestChildWorkflowExecutionRequest(
23582360
requestID string,
23592361
now time.Time,
23602362
partitionConfig map[string]string,
2363+
activeClusterSelectionPolicy *types.ActiveClusterSelectionPolicy,
23612364
) (*types.HistoryStartWorkflowExecutionRequest, error) {
23622365

23632366
workflowExecution := types.WorkflowExecution{
@@ -2373,9 +2376,10 @@ func createTestChildWorkflowExecutionRequest(
23732376
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
23742377
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
23752378
// Use the same request ID to dedupe StartWorkflowExecution calls
2376-
RequestID: requestID,
2377-
WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy,
2378-
RetryPolicy: attributes.RetryPolicy,
2379+
RequestID: requestID,
2380+
WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy,
2381+
RetryPolicy: attributes.RetryPolicy,
2382+
ActiveClusterSelectionPolicy: activeClusterSelectionPolicy,
23792383
}
23802384

23812385
parentInfo := &types.ParentExecutionInfo{

simulation/replication/replication_simulation_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,12 @@ func startWorkflow(
140140
return fmt.Errorf("workflow execution start to close timeout must be specified and should be greater than workflow duration")
141141
}
142142

143+
input := mustJSON(t, &simTypes.WorkflowInput{
144+
Duration: op.WorkflowDuration,
145+
ActivityCount: op.ActivityCount,
146+
ChildWorkflowID: op.ChildWorkflowID,
147+
ChildWorkflowTimeout: op.ChildWorkflowTimeout,
148+
})
143149
resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).StartWorkflowExecution(ctx,
144150
&types.StartWorkflowExecutionRequest{
145151
RequestID: uuid.New(),
@@ -149,7 +155,7 @@ func startWorkflow(
149155
TaskList: &types.TaskList{Name: simTypes.TasklistName},
150156
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(int32((op.WorkflowExecutionStartToCloseTimeout).Seconds())),
151157
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(5),
152-
Input: mustJSON(t, &simTypes.WorkflowInput{Duration: op.WorkflowDuration, ActivityCount: op.ActivityCount}),
158+
Input: input,
153159
WorkflowIDReusePolicy: types.WorkflowIDReusePolicyAllowDuplicate.Ptr(),
154160
DelayStartSeconds: common.Int32Ptr(op.DelayStartSeconds),
155161
CronSchedule: op.CronSchedule,
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# This file is a replication simulation scenario spec.
2+
# It is parsed into ReplicationSimulationConfig struct.
3+
# Replication simulation for this file can be run via ./simulation/replication/run.sh --scenario activeactive_child
4+
# Dynamic config overrides can be set via config/dynamicconfig/replication_simulation_activeactive_child.yml
5+
# Tests that, in an active-active domain, a child workflow is started and completed in the same cluster as its parent workflow
6+
7+
clusters:
8+
cluster0:
9+
grpcEndpoint: "cadence-cluster0:7833"
10+
cluster1:
11+
grpcEndpoint: "cadence-cluster1:7833"
12+
13+
# primaryCluster is where domain data is written to and replicates to others. e.g. domain registration
14+
primaryCluster: "cluster0"
15+
16+
domains:
17+
test-domain-aa:
18+
activeClusterName: cluster0
19+
activeClustersByRegion:
20+
region0: cluster0
21+
region1: cluster1
22+
clusterAttributes:
23+
region:
24+
region0: cluster0
25+
region1: cluster1
26+
27+
operations:
28+
# start workflow in cluster0
29+
- op: start_workflow
30+
at: 0s
31+
workflowID: wf1
32+
workflowType: child-activity-loop-workflow
33+
cluster: cluster0
34+
childWorkflowID: wf1-child
35+
childWorkflowTimeout: 65s
36+
domain: test-domain-aa
37+
workflowExecutionStartToCloseTimeout: 65s
38+
workflowDuration: 35s
39+
activeClusterSelectionPolicy:
40+
clusterAttribute:
41+
scope: region
42+
name: region0
43+
44+
# start workflow in cluster1
45+
- op: start_workflow
46+
at: 0s
47+
workflowID: wf2
48+
workflowType: child-activity-loop-workflow
49+
cluster: cluster1
50+
childWorkflowID: wf2-child
51+
childWorkflowTimeout: 65s
52+
domain: test-domain-aa
53+
workflowExecutionStartToCloseTimeout: 65s
54+
workflowDuration: 35s
55+
activeClusterSelectionPolicy:
56+
clusterAttribute:
57+
scope: region
58+
name: region1
59+
60+
# validate that wf1 and its child wf1-child are started and completed in cluster0
61+
- op: validate
62+
at: 70s
63+
workflowID: wf1-child
64+
cluster: cluster0
65+
domain: test-domain-aa
66+
want:
67+
status: completed
68+
startedByWorkersInCluster: cluster0
69+
completedByWorkersInCluster: cluster0
70+
- op: validate
71+
at: 70s
72+
workflowID: wf1
73+
cluster: cluster0
74+
domain: test-domain-aa
75+
want:
76+
status: completed
77+
startedByWorkersInCluster: cluster0
78+
completedByWorkersInCluster: cluster0
79+
80+
# validate that wf2 and its child wf2-child are started and completed in cluster1
81+
- op: validate
82+
at: 70s
83+
workflowID: wf2-child
84+
cluster: cluster1
85+
domain: test-domain-aa
86+
want:
87+
status: completed
88+
startedByWorkersInCluster: cluster1
89+
completedByWorkersInCluster: cluster1
90+
- op: validate
91+
at: 70s
92+
workflowID: wf2
93+
cluster: cluster1
94+
domain: test-domain-aa
95+
want:
96+
status: completed
97+
startedByWorkersInCluster: cluster1
98+
completedByWorkersInCluster: cluster1

simulation/replication/types/repl_sim_config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ type Operation struct {
9393
WorkflowID string `yaml:"workflowID"`
9494
WorkflowExecutionStartToCloseTimeout time.Duration `yaml:"workflowExecutionStartToCloseTimeout"`
9595
WorkflowDuration time.Duration `yaml:"workflowDuration"`
96+
ChildWorkflowID string `yaml:"childWorkflowID"`
97+
ChildWorkflowTimeout time.Duration `yaml:"childWorkflowTimeout"`
9698
ActivityCount int `yaml:"activityCount"`
9799
DelayStartSeconds int32 `yaml:"delayStartSeconds"`
98100
CronSchedule string `yaml:"cronSchedule"`

simulation/replication/types/types.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ const (
3636
type OperationFunction func(t *testing.T, op *Operation, simCfg *ReplicationSimulationConfig) error
3737

3838
type WorkflowInput struct {
39-
Duration time.Duration
40-
ActivityCount int
39+
Duration time.Duration
40+
ActivityCount int
41+
ChildWorkflowID string
42+
ChildWorkflowTimeout time.Duration
4143
}
4244

4345
type WorkflowOutput struct {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package childactivityloop
2+
3+
import (
4+
"go.uber.org/cadence/workflow"
5+
6+
"github.com/uber/cadence/simulation/replication/types"
7+
)
8+
9+
// Workflow is the "child-activity-loop-workflow" used by the replication simulation.
// It logs its input, starts a single "timer-activity-loop-workflow" child using
// input.ChildWorkflowID as the child's workflow ID and input.ChildWorkflowTimeout as
// its execution start-to-close timeout, and blocks until the child finishes.
// On success the child's output is returned unchanged; if the child fails, the error
// is logged and returned together with an empty WorkflowOutput.
func Workflow(ctx workflow.Context, input types.WorkflowInput) (types.WorkflowOutput, error) {
10+
logger := workflow.GetLogger(ctx)
11+
logger.Sugar().Infof("child-activity-loop-workflow started with input: %+v", input)
12+
13+
cwo := workflow.ChildWorkflowOptions{
14+
WorkflowID: input.ChildWorkflowID,
15+
ExecutionStartToCloseTimeout: input.ChildWorkflowTimeout,
16+
}
17+
ctx = workflow.WithChildOptions(ctx, cwo)
18+
var output types.WorkflowOutput
19+
// The parent's own input is forwarded as-is to the child; Get blocks until the child completes.
err := workflow.ExecuteChildWorkflow(ctx, "timer-activity-loop-workflow", input).Get(ctx, &output)
20+
if err != nil {
21+
logger.Sugar().Errorf("failed to execute child workflow: %v", err)
22+
return types.WorkflowOutput{}, err
23+
}
24+
25+
return output, nil
26+
}

simulation/replication/workflows/workflows.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package workflows
22

33
import (
44
"github.com/uber/cadence/simulation/replication/workflows/activityloop"
5+
"github.com/uber/cadence/simulation/replication/workflows/childactivityloop"
56
"github.com/uber/cadence/simulation/replication/workflows/query"
67
"github.com/uber/cadence/simulation/replication/workflows/timeractivityloop"
78
)
@@ -14,6 +15,7 @@ var (
1415
"timer-activity-loop-workflow": timeractivityloop.Workflow,
1516
"activity-loop-workflow": activityloop.Workflow,
1617
"query-workflow": queryWFRunner.Workflow,
18+
"child-activity-loop-workflow": childactivityloop.Workflow,
1719
}
1820
}
1921
Activities = map[string]any{

0 commit comments

Comments
 (0)