diff --git a/.github/workflows/replication-simulation.yml b/.github/workflows/replication-simulation.yml index e08c2bbda12..686af2af24c 100644 --- a/.github/workflows/replication-simulation.yml +++ b/.github/workflows/replication-simulation.yml @@ -14,6 +14,7 @@ jobs: matrix: scenario: - activeactive + - activeactive_child - activeactive_same_wfid - activeactive_same_wfid_signalwithstart # TODO(active-active): Re-enable this scenario once we have fixed the auto-forwarding issue diff --git a/config/dynamicconfig/replication_simulation_activeactive_child.yml b/config/dynamicconfig/replication_simulation_activeactive_child.yml new file mode 100644 index 00000000000..2ec8a252828 --- /dev/null +++ b/config/dynamicconfig/replication_simulation_activeactive_child.yml @@ -0,0 +1,22 @@ +# This file is used as dynamicconfig override for "activeactive" replication simulation scenario configured via simulation/replication/testdata/replication_simulation_activeactive.yaml +system.writeVisibilityStoreName: + - value: "db" +system.readVisibilityStoreName: + - value: "db" +history.replicatorTaskBatchSize: + - value: 25 + constraints: {} +frontend.failoverCoolDown: + - value: 5s +history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages. + - value: 10ms +history.standbyTaskMissingEventsResendDelay: + - value: 5s +history.standbyTaskMissingEventsDiscardDelay: + - value: 10s +history.standbyClusterDelay: + - value: 10s +history.enableTransferQueueV2: + - value: true +history.enableTimerQueueV2: + - value: true diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index f7e408ac025..28570a44b14 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -852,6 +852,7 @@ func (t *transferActiveTaskExecutor) processStartChildExecution( childInfo.CreateRequestID, attributes, mutableState.GetExecutionInfo().PartitionConfig, + mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy, ) if err != nil { @@ -1620,6 +1621,7 @@ func startWorkflowWithRetry( requestID string, attributes *types.StartChildWorkflowExecutionInitiatedEventAttributes, partitionConfig map[string]string, + activeClusterSelectionPolicy *types.ActiveClusterSelectionPolicy, ) (string, error) { // Get parent domain name @@ -1652,7 +1654,7 @@ func startWorkflowWithRetry( DelayStartSeconds: attributes.DelayStartSeconds, JitterStartSeconds: attributes.JitterStartSeconds, FirstRunAtTimeStamp: attributes.FirstRunAtTimestamp, - ActiveClusterSelectionPolicy: attributes.ActiveClusterSelectionPolicy, + ActiveClusterSelectionPolicy: activeClusterSelectionPolicy, } historyStartReq, err := common.CreateHistoryStartWorkflowRequest(task.TargetDomainID, frontendStartReq, timeSource.Now(), partitionConfig) diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 1b98bc37ae4..6d2d7a69a27 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -1522,6 +1522,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Success childInfo.CreateRequestID, s.mockShard.GetTimeSource().Now(), mutableState.GetExecutionInfo().PartitionConfig, + mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy, ) require.NoError(s.T(), err) s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), historyReq).Return(&types.StartWorkflowExecutionResponse{RunID: childExecution.RunID}, nil).Times(1) @@ -1563,6 +1564,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Failure childInfo.CreateRequestID, s.mockShard.GetTimeSource().Now(), mutableState.GetExecutionInfo().PartitionConfig, + mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy, ) require.NoError(s.T(), err) s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), historyReq).Return(nil, &types.WorkflowExecutionAlreadyStartedError{}).Times(1) @@ -2358,6 +2360,7 @@ func createTestChildWorkflowExecutionRequest( requestID string, now time.Time, partitionConfig map[string]string, + activeClusterSelectionPolicy *types.ActiveClusterSelectionPolicy, ) (*types.HistoryStartWorkflowExecutionRequest, error) { workflowExecution := types.WorkflowExecution{ @@ -2373,9 +2376,10 @@ func createTestChildWorkflowExecutionRequest( ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds, TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds, // Use the same request ID to dedupe StartWorkflowExecution calls - RequestID: requestID, - WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy, - RetryPolicy: attributes.RetryPolicy, + RequestID: requestID, + WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy, + RetryPolicy: attributes.RetryPolicy, + ActiveClusterSelectionPolicy: activeClusterSelectionPolicy, } parentInfo := &types.ParentExecutionInfo{ diff --git a/simulation/replication/replication_simulation_test.go b/simulation/replication/replication_simulation_test.go index d93eb179efa..d5a46267b73 100644 --- a/simulation/replication/replication_simulation_test.go +++ b/simulation/replication/replication_simulation_test.go @@ -140,6 +140,12 @@ func startWorkflow( return fmt.Errorf("workflow execution start to close timeout must be specified and should be greater than workflow duration") } + input := mustJSON(t, &simTypes.WorkflowInput{ + Duration: op.WorkflowDuration, + ActivityCount: op.ActivityCount, + ChildWorkflowID: op.ChildWorkflowID, + ChildWorkflowTimeout: op.ChildWorkflowTimeout, + }) resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).StartWorkflowExecution(ctx, &types.StartWorkflowExecutionRequest{ RequestID: uuid.New(), @@ -149,7 +155,7 @@ func startWorkflow( TaskList: &types.TaskList{Name: simTypes.TasklistName}, ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(int32((op.WorkflowExecutionStartToCloseTimeout).Seconds())), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(5), - Input: mustJSON(t, &simTypes.WorkflowInput{Duration: op.WorkflowDuration, ActivityCount: op.ActivityCount}), + Input: input, WorkflowIDReusePolicy: types.WorkflowIDReusePolicyAllowDuplicate.Ptr(), DelayStartSeconds: common.Int32Ptr(op.DelayStartSeconds), CronSchedule: op.CronSchedule, diff --git a/simulation/replication/testdata/replication_simulation_activeactive_child.yaml b/simulation/replication/testdata/replication_simulation_activeactive_child.yaml new file mode 100644 index 00000000000..87cb1c6e2b2 --- /dev/null +++ b/simulation/replication/testdata/replication_simulation_activeactive_child.yaml @@ -0,0 +1,98 @@ +# This file is a replication simulation scenario spec. +# It is parsed into ReplicationSimulationConfig struct. +# Replication simulation for this file can be run via ./simulation/replication/run.sh --scenario activeactive_child +# Dynamic config overrides can be set via config/dynamicconfig/replication_simulation_activeactive_child.yml +# Tests that active-active allows workflows to be created in multiple clusters + +clusters: + cluster0: + grpcEndpoint: "cadence-cluster0:7833" + cluster1: + grpcEndpoint: "cadence-cluster1:7833" + +# primaryCluster is where domain data is written to and replicates to others. e.g. domain registration +primaryCluster: "cluster0" + +domains: + test-domain-aa: + activeClusterName: cluster0 + activeClustersByRegion: + region0: cluster0 + region1: cluster1 + clusterAttributes: + region: + region0: cluster0 + region1: cluster1 + +operations: + # start workflow in cluster0 + - op: start_workflow + at: 0s + workflowID: wf1 + workflowType: child-activity-loop-workflow + cluster: cluster0 + childWorkflowID: wf1-child + childWorkflowTimeout: 65s + domain: test-domain-aa + workflowExecutionStartToCloseTimeout: 65s + workflowDuration: 35s + activeClusterSelectionPolicy: + clusterAttribute: + scope: region + name: region0 + + # start workflow in cluster1 + - op: start_workflow + at: 0s + workflowID: wf2 + workflowType: child-activity-loop-workflow + cluster: cluster1 + childWorkflowID: wf2-child + childWorkflowTimeout: 65s + domain: test-domain-aa + workflowExecutionStartToCloseTimeout: 65s + workflowDuration: 35s + activeClusterSelectionPolicy: + clusterAttribute: + scope: region + name: region1 + + # validate that wf1 is started in cluster0 and completed in cluster1 + - op: validate + at: 70s + workflowID: wf1-child + cluster: cluster0 + domain: test-domain-aa + want: + status: completed + startedByWorkersInCluster: cluster0 + completedByWorkersInCluster: cluster0 + - op: validate + at: 70s + workflowID: wf1 + cluster: cluster0 + domain: test-domain-aa + want: + status: completed + startedByWorkersInCluster: cluster0 + completedByWorkersInCluster: cluster0 + + # validate that wf2 is started and completed in cluster1 + - op: validate + at: 70s + workflowID: wf2-child + cluster: cluster1 + domain: test-domain-aa + want: + status: completed + startedByWorkersInCluster: cluster1 + completedByWorkersInCluster: cluster1 + - op: validate + at: 70s + workflowID: wf2 + cluster: cluster1 + domain: test-domain-aa + want: + status: completed + startedByWorkersInCluster: cluster1 + completedByWorkersInCluster: cluster1 diff --git a/simulation/replication/types/repl_sim_config.go b/simulation/replication/types/repl_sim_config.go index 2366b196333..9fc9cd6838f 100644 --- a/simulation/replication/types/repl_sim_config.go +++ b/simulation/replication/types/repl_sim_config.go @@ -93,6 +93,8 @@ type Operation struct { WorkflowID string `yaml:"workflowID"` WorkflowExecutionStartToCloseTimeout time.Duration `yaml:"workflowExecutionStartToCloseTimeout"` WorkflowDuration time.Duration `yaml:"workflowDuration"` + ChildWorkflowID string `yaml:"childWorkflowID"` + ChildWorkflowTimeout time.Duration `yaml:"childWorkflowTimeout"` ActivityCount int `yaml:"activityCount"` DelayStartSeconds int32 `yaml:"delayStartSeconds"` CronSchedule string `yaml:"cronSchedule"` diff --git a/simulation/replication/types/types.go b/simulation/replication/types/types.go index 6a8d249c623..85738834de9 100644 --- a/simulation/replication/types/types.go +++ b/simulation/replication/types/types.go @@ -36,8 +36,10 @@ const ( type OperationFunction func(t *testing.T, op *Operation, simCfg *ReplicationSimulationConfig) error type WorkflowInput struct { - Duration time.Duration - ActivityCount int + Duration time.Duration + ActivityCount int + ChildWorkflowID string + ChildWorkflowTimeout time.Duration } type WorkflowOutput struct { diff --git a/simulation/replication/workflows/childactivityloop/workflow.go b/simulation/replication/workflows/childactivityloop/workflow.go new file mode 100644 index 00000000000..af833cd4fd4 --- /dev/null +++ b/simulation/replication/workflows/childactivityloop/workflow.go @@ -0,0 +1,26 @@ +package childactivityloop + +import ( + "go.uber.org/cadence/workflow" + + "github.com/uber/cadence/simulation/replication/types" +) + +func Workflow(ctx workflow.Context, input types.WorkflowInput) (types.WorkflowOutput, error) { + logger := workflow.GetLogger(ctx) + logger.Sugar().Infof("child-activity-loop-workflow started with input: %+v", input) + + cwo := workflow.ChildWorkflowOptions{ + WorkflowID: input.ChildWorkflowID, + ExecutionStartToCloseTimeout: input.ChildWorkflowTimeout, + } + ctx = workflow.WithChildOptions(ctx, cwo) + var output types.WorkflowOutput + err := workflow.ExecuteChildWorkflow(ctx, "timer-activity-loop-workflow", input).Get(ctx, &output) + if err != nil { + logger.Sugar().Errorf("failed to execute child workflow: %v", err) + return types.WorkflowOutput{}, err + } + + return output, nil +} diff --git a/simulation/replication/workflows/workflows.go b/simulation/replication/workflows/workflows.go index 6ed1c74a467..8664911f12d 100644 --- a/simulation/replication/workflows/workflows.go +++ b/simulation/replication/workflows/workflows.go @@ -2,6 +2,7 @@ package workflows import ( "github.com/uber/cadence/simulation/replication/workflows/activityloop" + "github.com/uber/cadence/simulation/replication/workflows/childactivityloop" "github.com/uber/cadence/simulation/replication/workflows/query" "github.com/uber/cadence/simulation/replication/workflows/timeractivityloop" ) @@ -14,6 +15,7 @@ var ( "timer-activity-loop-workflow": timeractivityloop.Workflow, "activity-loop-workflow": activityloop.Workflow, "query-workflow": queryWFRunner.Workflow, + "child-activity-loop-workflow": childactivityloop.Workflow, } } Activities = map[string]any{