Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/replication-simulation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jobs:
matrix:
scenario:
- activeactive
- activeactive_child
- activeactive_same_wfid
- activeactive_same_wfid_signalwithstart
# TODO(active-active): Re-enable this scenario once we have fixed the auto-forwarding issue
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# This file is used as dynamicconfig override for "activeactive" replication simulation scenario configured via simulation/replication/testdata/replication_simulation_activeactive.yaml
system.writeVisibilityStoreName:
- value: "db"
system.readVisibilityStoreName:
- value: "db"
history.replicatorTaskBatchSize:
- value: 25
constraints: {}
frontend.failoverCoolDown:
- value: 5s
history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
- value: 10ms
history.standbyTaskMissingEventsResendDelay:
- value: 5s
history.standbyTaskMissingEventsDiscardDelay:
- value: 10s
history.standbyClusterDelay:
- value: 10s
history.enableTransferQueueV2:
- value: true
history.enableTimerQueueV2:
- value: true
4 changes: 3 additions & 1 deletion service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ func (t *transferActiveTaskExecutor) processStartChildExecution(
childInfo.CreateRequestID,
attributes,
mutableState.GetExecutionInfo().PartitionConfig,
mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy,
)
if err != nil {

Expand Down Expand Up @@ -1620,6 +1621,7 @@ func startWorkflowWithRetry(
requestID string,
attributes *types.StartChildWorkflowExecutionInitiatedEventAttributes,
partitionConfig map[string]string,
activeClusterSelectionPolicy *types.ActiveClusterSelectionPolicy,
) (string, error) {

// Get parent domain name
Expand Down Expand Up @@ -1652,7 +1654,7 @@ func startWorkflowWithRetry(
DelayStartSeconds: attributes.DelayStartSeconds,
JitterStartSeconds: attributes.JitterStartSeconds,
FirstRunAtTimeStamp: attributes.FirstRunAtTimestamp,
ActiveClusterSelectionPolicy: attributes.ActiveClusterSelectionPolicy,
ActiveClusterSelectionPolicy: activeClusterSelectionPolicy,
}

historyStartReq, err := common.CreateHistoryStartWorkflowRequest(task.TargetDomainID, frontendStartReq, timeSource.Now(), partitionConfig)
Expand Down
10 changes: 7 additions & 3 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Success
childInfo.CreateRequestID,
s.mockShard.GetTimeSource().Now(),
mutableState.GetExecutionInfo().PartitionConfig,
mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy,
)
require.NoError(s.T(), err)
s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), historyReq).Return(&types.StartWorkflowExecutionResponse{RunID: childExecution.RunID}, nil).Times(1)
Expand Down Expand Up @@ -1563,6 +1564,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Failure
childInfo.CreateRequestID,
s.mockShard.GetTimeSource().Now(),
mutableState.GetExecutionInfo().PartitionConfig,
mutableState.GetExecutionInfo().ActiveClusterSelectionPolicy,
)
require.NoError(s.T(), err)
s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), historyReq).Return(nil, &types.WorkflowExecutionAlreadyStartedError{}).Times(1)
Expand Down Expand Up @@ -2358,6 +2360,7 @@ func createTestChildWorkflowExecutionRequest(
requestID string,
now time.Time,
partitionConfig map[string]string,
activeClusterSelectionPolicy *types.ActiveClusterSelectionPolicy,
) (*types.HistoryStartWorkflowExecutionRequest, error) {

workflowExecution := types.WorkflowExecution{
Expand All @@ -2373,9 +2376,10 @@ func createTestChildWorkflowExecutionRequest(
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
// Use the same request ID to dedupe StartWorkflowExecution calls
RequestID: requestID,
WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy,
RetryPolicy: attributes.RetryPolicy,
RequestID: requestID,
WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy,
RetryPolicy: attributes.RetryPolicy,
ActiveClusterSelectionPolicy: activeClusterSelectionPolicy,
}

parentInfo := &types.ParentExecutionInfo{
Expand Down
8 changes: 7 additions & 1 deletion simulation/replication/replication_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ func startWorkflow(
return fmt.Errorf("workflow execution start to close timeout must be specified and should be greater than workflow duration")
}

input := mustJSON(t, &simTypes.WorkflowInput{
Duration: op.WorkflowDuration,
ActivityCount: op.ActivityCount,
ChildWorkflowID: op.ChildWorkflowID,
ChildWorkflowTimeout: op.ChildWorkflowTimeout,
})
resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).StartWorkflowExecution(ctx,
&types.StartWorkflowExecutionRequest{
RequestID: uuid.New(),
Expand All @@ -149,7 +155,7 @@ func startWorkflow(
TaskList: &types.TaskList{Name: simTypes.TasklistName},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(int32((op.WorkflowExecutionStartToCloseTimeout).Seconds())),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(5),
Input: mustJSON(t, &simTypes.WorkflowInput{Duration: op.WorkflowDuration, ActivityCount: op.ActivityCount}),
Input: input,
WorkflowIDReusePolicy: types.WorkflowIDReusePolicyAllowDuplicate.Ptr(),
DelayStartSeconds: common.Int32Ptr(op.DelayStartSeconds),
CronSchedule: op.CronSchedule,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# This file is a replication simulation scenario spec.
# It is parsed into ReplicationSimulationConfig struct.
# Replication simulation for this file can be run via ./simulation/replication/run.sh --scenario activeactive_child
# Dynamic config overrides can be set via config/dynamicconfig/replication_simulation_activeactive_child.yml
# Tests that active-active allows workflows to be created in multiple clusters

clusters:
cluster0:
grpcEndpoint: "cadence-cluster0:7833"
cluster1:
grpcEndpoint: "cadence-cluster1:7833"

# primaryCluster is where domain data is written to and replicates to others. e.g. domain registration
primaryCluster: "cluster0"

domains:
test-domain-aa:
activeClusterName: cluster0
activeClustersByRegion:
region0: cluster0
region1: cluster1
clusterAttributes:
region:
region0: cluster0
region1: cluster1

operations:
# start workflow in cluster0
- op: start_workflow
at: 0s
workflowID: wf1
workflowType: child-activity-loop-workflow
cluster: cluster0
childWorkflowID: wf1-child
childWorkflowTimeout: 65s
domain: test-domain-aa
workflowExecutionStartToCloseTimeout: 65s
workflowDuration: 35s
activeClusterSelectionPolicy:
clusterAttribute:
scope: region
name: region0

# start workflow in cluster1
- op: start_workflow
at: 0s
workflowID: wf2
workflowType: child-activity-loop-workflow
cluster: cluster1
childWorkflowID: wf2-child
childWorkflowTimeout: 65s
domain: test-domain-aa
workflowExecutionStartToCloseTimeout: 65s
workflowDuration: 35s
activeClusterSelectionPolicy:
clusterAttribute:
scope: region
name: region1

# validate that wf1 is started in cluster0 and completed in cluster1
- op: validate
at: 70s
workflowID: wf1-child
cluster: cluster0
domain: test-domain-aa
want:
status: completed
startedByWorkersInCluster: cluster0
completedByWorkersInCluster: cluster0
- op: validate
at: 70s
workflowID: wf1
cluster: cluster0
domain: test-domain-aa
want:
status: completed
startedByWorkersInCluster: cluster0
completedByWorkersInCluster: cluster0

# validate that wf2 is started and completed in cluster1
- op: validate
at: 70s
workflowID: wf2-child
cluster: cluster1
domain: test-domain-aa
want:
status: completed
startedByWorkersInCluster: cluster1
completedByWorkersInCluster: cluster1
- op: validate
at: 70s
workflowID: wf2
cluster: cluster1
domain: test-domain-aa
want:
status: completed
startedByWorkersInCluster: cluster1
completedByWorkersInCluster: cluster1
2 changes: 2 additions & 0 deletions simulation/replication/types/repl_sim_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Operation struct {
WorkflowID string `yaml:"workflowID"`
WorkflowExecutionStartToCloseTimeout time.Duration `yaml:"workflowExecutionStartToCloseTimeout"`
WorkflowDuration time.Duration `yaml:"workflowDuration"`
ChildWorkflowID string `yaml:"childWorkflowID"`
ChildWorkflowTimeout time.Duration `yaml:"childWorkflowTimeout"`
ActivityCount int `yaml:"activityCount"`
DelayStartSeconds int32 `yaml:"delayStartSeconds"`
CronSchedule string `yaml:"cronSchedule"`
Expand Down
6 changes: 4 additions & 2 deletions simulation/replication/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ const (
type OperationFunction func(t *testing.T, op *Operation, simCfg *ReplicationSimulationConfig) error

type WorkflowInput struct {
Duration time.Duration
ActivityCount int
Duration time.Duration
ActivityCount int
ChildWorkflowID string
ChildWorkflowTimeout time.Duration
}

type WorkflowOutput struct {
Expand Down
26 changes: 26 additions & 0 deletions simulation/replication/workflows/childactivityloop/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package childactivityloop

import (
"go.uber.org/cadence/workflow"

"github.com/uber/cadence/simulation/replication/types"
)

func Workflow(ctx workflow.Context, input types.WorkflowInput) (types.WorkflowOutput, error) {
logger := workflow.GetLogger(ctx)
logger.Sugar().Infof("child-activity-loop-workflow started with input: %+v", input)

cwo := workflow.ChildWorkflowOptions{
WorkflowID: input.ChildWorkflowID,
ExecutionStartToCloseTimeout: input.ChildWorkflowTimeout,
}
ctx = workflow.WithChildOptions(ctx, cwo)
var output types.WorkflowOutput
err := workflow.ExecuteChildWorkflow(ctx, "timer-activity-loop-workflow", input).Get(ctx, &output)
if err != nil {
logger.Sugar().Errorf("failed to execute child workflow: %v", err)
return types.WorkflowOutput{}, err
}

return output, nil
}
2 changes: 2 additions & 0 deletions simulation/replication/workflows/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workflows

import (
"github.com/uber/cadence/simulation/replication/workflows/activityloop"
"github.com/uber/cadence/simulation/replication/workflows/childactivityloop"
"github.com/uber/cadence/simulation/replication/workflows/query"
"github.com/uber/cadence/simulation/replication/workflows/timeractivityloop"
)
Expand All @@ -14,6 +15,7 @@ var (
"timer-activity-loop-workflow": timeractivityloop.Workflow,
"activity-loop-workflow": activityloop.Workflow,
"query-workflow": queryWFRunner.Workflow,
"child-activity-loop-workflow": childactivityloop.Workflow,
}
}
Activities = map[string]any{
Expand Down
Loading