Skip to content

Commit bcabd9d

Browse files
authored
fix(active-active): Fix active workflow check for history APIs (#7347)
<!-- Describe what has changed in this PR -->
**What changed?**
- Fix the active workflow check for history APIs
- Update GetActiveClusterInfoByWorkflow to allow an empty runID

<!-- Tell your future self why you made these changes -->
**Why?**
To fix the active cluster check for active-active domains.

<!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? -->
**How did you test it?**
Unit tests and simulation tests.

<!-- Assuming the worst case, what can be broken when deploying this change to production? -->
**Potential risks**

<!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md -->
**Release notes**

<!-- Are there any documentation updates that should be made for config (https://cadenceworkflow.io/docs/operation-guide/setup/)? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs -->
**Documentation Changes**
1 parent cc0fd87 commit bcabd9d

14 files changed

Lines changed: 232 additions & 231 deletions

common/activecluster/manager.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/uber/cadence/common/log"
3434
"github.com/uber/cadence/common/log/tag"
3535
"github.com/uber/cadence/common/metrics"
36+
"github.com/uber/cadence/common/persistence"
3637
"github.com/uber/cadence/common/types"
3738
)
3839

@@ -91,6 +92,21 @@ func NewManager(
9192
}
9293

9394
func (m *managerImpl) getClusterSelectionPolicy(ctx context.Context, domainID, wfID, rID string) (*types.ActiveClusterSelectionPolicy, error) {
95+
shardID := common.WorkflowIDToHistoryShard(wfID, m.numShards)
96+
executionManager, err := m.executionManagerProvider.GetExecutionManager(shardID)
97+
if err != nil {
98+
return nil, err
99+
}
100+
if rID == "" {
101+
execution, err := executionManager.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
102+
DomainID: domainID,
103+
WorkflowID: wfID,
104+
})
105+
if err != nil {
106+
return nil, err
107+
}
108+
rID = execution.RunID
109+
}
94110
// Check if the policy is already in the cache. create a key from domainID, wfID, rID
95111
key := fmt.Sprintf("%s:%s:%s", domainID, wfID, rID)
96112
cacheData := m.workflowPolicyCache.Get(key)
@@ -104,12 +120,6 @@ func (m *managerImpl) getClusterSelectionPolicy(ctx context.Context, domainID, w
104120
m.logger.Warn(fmt.Sprintf("Cache data for key %s is of type %T, not a *types.ActiveClusterSelectionPolicy", key, cacheData))
105121
}
106122

107-
shardID := common.WorkflowIDToHistoryShard(wfID, m.numShards)
108-
executionManager, err := m.executionManagerProvider.GetExecutionManager(shardID)
109-
if err != nil {
110-
return nil, err
111-
}
112-
113123
plcy, err := executionManager.GetActiveClusterSelectionPolicy(ctx, domainID, wfID, rID)
114124
if err != nil {
115125
return nil, err

common/activecluster/manager_test.go

Lines changed: 91 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
304304

305305
tests := []struct {
306306
name string
307+
runID string
307308
activeClusterCfg *types.ActiveClusters
308309
domainIDToNameErr error
309310
mockExecutionManagerFn func(em *persistence.MockExecutionManager)
@@ -313,7 +314,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
313314
expectedError string
314315
}{
315316
{
316-
name: "domain ID to name function returns error",
317+
name: "domain ID to name function returns error",
318+
runID: "test-run-id",
317319
activeClusterCfg: &types.ActiveClusters{
318320
AttributeScopes: map[string]types.ClusterAttributeScope{
319321
"region": {
@@ -330,7 +332,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
330332
expectedError: "failed to find domain by id",
331333
},
332334
{
333-
name: "execution manager provider returns error",
335+
name: "execution manager provider returns error",
336+
runID: "test-run-id",
334337
activeClusterCfg: &types.ActiveClusters{
335338
AttributeScopes: map[string]types.ClusterAttributeScope{
336339
"region": {
@@ -349,7 +352,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
349352
expectedError: "failed to get execution manager",
350353
},
351354
{
352-
name: "execution manager GetActiveClusterSelectionPolicy returns error",
355+
name: "execution manager GetActiveClusterSelectionPolicy returns error",
356+
runID: "test-run-id",
353357
activeClusterCfg: &types.ActiveClusters{
354358
AttributeScopes: map[string]types.ClusterAttributeScope{
355359
"region": {
@@ -369,7 +373,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
369373
expectedError: "database error",
370374
},
371375
{
372-
name: "policy not found (EntityNotExistsError) - uses empty policy with nil cluster attribute",
376+
name: "policy not found (EntityNotExistsError) - uses empty policy with nil cluster attribute",
377+
runID: "test-run-id",
373378
activeClusterCfg: &types.ActiveClusters{
374379
AttributeScopes: map[string]types.ClusterAttributeScope{
375380
"region": {
@@ -392,7 +397,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
392397
},
393398
},
394399
{
395-
name: "policy not found (EntityNotExistsError) - empty policy with cluster attribute not found",
400+
name: "policy not found (EntityNotExistsError) - empty policy with cluster attribute not found",
401+
runID: "test-run-id",
396402
activeClusterCfg: &types.ActiveClusters{
397403
AttributeScopes: map[string]types.ClusterAttributeScope{
398404
"region": {
@@ -415,7 +421,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
415421
},
416422
},
417423
{
418-
name: "policy found but cluster attribute not found in domain config",
424+
name: "policy found but cluster attribute not found in domain config",
425+
runID: "test-run-id",
419426
activeClusterCfg: &types.ActiveClusters{
420427
AttributeScopes: map[string]types.ClusterAttributeScope{
421428
"region": {
@@ -440,7 +447,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
440447
expectedError: "could not find cluster attribute &{datacenter dc1} in the domain test-domain-id's active cluster config",
441448
},
442449
{
443-
name: "successful lookup - policy found and cluster attribute exists",
450+
name: "successful lookup - policy found and cluster attribute exists",
451+
runID: "test-run-id",
444452
activeClusterCfg: &types.ActiveClusters{
445453
AttributeScopes: map[string]types.ClusterAttributeScope{
446454
"region": {
@@ -472,7 +480,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
472480
},
473481
},
474482
{
475-
name: "successful lookup - policy from cache",
483+
name: "successful lookup - policy from cache",
484+
runID: "test-run-id",
476485
activeClusterCfg: &types.ActiveClusters{
477486
AttributeScopes: map[string]types.ClusterAttributeScope{
478487
"datacenter": {
@@ -495,14 +504,17 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
495504
Name: "dc2",
496505
},
497506
},
498-
// No mock needed since policy comes from cache
507+
mockExecutionManagerFn: func(em *persistence.MockExecutionManager) {
508+
// No expectations needed since policy comes from cache, but we need the provider to be non-nil
509+
},
499510
expectedResult: &types.ActiveClusterInfo{
500511
ActiveClusterName: "cluster1",
501512
FailoverVersion: 250,
502513
},
503514
},
504515
{
505-
name: "successful lookup - nil cluster attribute in policy uses domain-level info",
516+
name: "successful lookup - nil cluster attribute in policy uses domain-level info",
517+
runID: "test-run-id",
506518
activeClusterCfg: &types.ActiveClusters{
507519
AttributeScopes: map[string]types.ClusterAttributeScope{
508520
"region": {
@@ -527,7 +539,8 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
527539
},
528540
},
529541
{
530-
name: "successful lookup - multiple scopes with different attributes",
542+
name: "successful lookup - multiple scopes with different attributes",
543+
runID: "test-run-id",
531544
activeClusterCfg: &types.ActiveClusters{
532545
AttributeScopes: map[string]types.ClusterAttributeScope{
533546
"region": {
@@ -566,6 +579,71 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
566579
FailoverVersion: 300,
567580
},
568581
},
582+
{
583+
name: "successful lookup - empty runID triggers GetCurrentExecution",
584+
runID: "",
585+
activeClusterCfg: &types.ActiveClusters{
586+
AttributeScopes: map[string]types.ClusterAttributeScope{
587+
"region": {
588+
ClusterAttributes: map[string]types.ActiveClusterInfo{
589+
"us-west": {
590+
ActiveClusterName: "cluster0",
591+
FailoverVersion: 100,
592+
},
593+
"us-east": {
594+
ActiveClusterName: "cluster1",
595+
FailoverVersion: 200,
596+
},
597+
},
598+
},
599+
},
600+
},
601+
mockExecutionManagerFn: func(em *persistence.MockExecutionManager) {
602+
// Expect GetCurrentExecution to be called first to get the runID
603+
em.EXPECT().GetCurrentExecution(gomock.Any(), &persistence.GetCurrentExecutionRequest{
604+
DomainID: "test-domain-id",
605+
WorkflowID: "test-workflow-id",
606+
}).Return(&persistence.GetCurrentExecutionResponse{
607+
RunID: "current-run-id",
608+
}, nil)
609+
// Then expect GetActiveClusterSelectionPolicy with the returned runID
610+
em.EXPECT().GetActiveClusterSelectionPolicy(gomock.Any(), "test-domain-id", "test-workflow-id", "current-run-id").
611+
Return(&types.ActiveClusterSelectionPolicy{
612+
ClusterAttribute: &types.ClusterAttribute{
613+
Scope: "region",
614+
Name: "us-east",
615+
},
616+
}, nil)
617+
},
618+
expectedResult: &types.ActiveClusterInfo{
619+
ActiveClusterName: "cluster1",
620+
FailoverVersion: 200,
621+
},
622+
},
623+
{
624+
name: "empty runID - GetCurrentExecution returns error",
625+
runID: "",
626+
activeClusterCfg: &types.ActiveClusters{
627+
AttributeScopes: map[string]types.ClusterAttributeScope{
628+
"region": {
629+
ClusterAttributes: map[string]types.ActiveClusterInfo{
630+
"us-west": {
631+
ActiveClusterName: "cluster0",
632+
FailoverVersion: 100,
633+
},
634+
},
635+
},
636+
},
637+
},
638+
mockExecutionManagerFn: func(em *persistence.MockExecutionManager) {
639+
// GetCurrentExecution returns an error
640+
em.EXPECT().GetCurrentExecution(gomock.Any(), &persistence.GetCurrentExecutionRequest{
641+
DomainID: "test-domain-id",
642+
WorkflowID: "test-workflow-id",
643+
}).Return(nil, errors.New("workflow not found"))
644+
},
645+
expectedError: "workflow not found",
646+
},
569647
}
570648

571649
for _, tc := range tests {
@@ -605,11 +683,11 @@ func TestGetActiveClusterInfoByWorkflow(t *testing.T) {
605683
assert.NoError(t, err)
606684

607685
if tc.cachedPolicy != nil {
608-
key := fmt.Sprintf("%s:%s:%s", "test-domain-id", wfID, "test-run-id")
686+
key := fmt.Sprintf("%s:%s:%s", "test-domain-id", wfID, tc.runID)
609687
mgr.(*managerImpl).workflowPolicyCache.Put(key, tc.cachedPolicy)
610688
}
611689

612-
result, err := mgr.GetActiveClusterInfoByWorkflow(context.Background(), "test-domain-id", wfID, "test-run-id")
690+
result, err := mgr.GetActiveClusterInfoByWorkflow(context.Background(), "test-domain-id", wfID, tc.runID)
613691
if tc.expectedError != "" {
614692
assert.EqualError(t, err, tc.expectedError)
615693
assert.Nil(t, result)

common/cache/domainCache.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,27 +1033,6 @@ func (entry *DomainCacheEntry) IsSampledForLongerRetention(
10331033
return false
10341034
}
10351035

1036-
// TODO(active-active): This function should accept active cluster selection policy as a parameter
1037-
func GetActiveDomainByID(cache DomainCache, currentCluster string, domainID string) (*DomainCacheEntry, error) {
1038-
if err := common.ValidateDomainUUID(domainID); err != nil {
1039-
return nil, err
1040-
}
1041-
1042-
domain, err := cache.GetDomainByID(domainID)
1043-
if err != nil {
1044-
return nil, err
1045-
}
1046-
1047-
if !domain.IsActiveIn(currentCluster) {
1048-
// return the domain record as well as the not-active-error because some callers check
1049-
// whether the domain is pending active or not
1050-
// it's not a good design, but we need to keep it for backward compatibility
1051-
return domain, domain.NewDomainNotActiveError(currentCluster, domain.GetReplicationConfig().ActiveClusterName)
1052-
}
1053-
1054-
return domain, nil
1055-
}
1056-
10571036
// IsDeprecatedOrDeleted This function checks the domain status to see if the domain has been deprecated or deleted.
10581037
func (entry *DomainCacheEntry) IsDeprecatedOrDeleted() bool {
10591038
if entry.info.Status == persistence.DomainStatusDeprecated || entry.info.Status == persistence.DomainStatusDeleted {

common/cache/domainCache_test.go

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,74 +1117,6 @@ func Test_IsSampledForLongerRetention(t *testing.T) {
11171117
require.False(t, d.IsSampledForLongerRetention(wid))
11181118
}
11191119

1120-
func Test_GetActiveDomainByID(t *testing.T) {
1121-
nonExistingUUID := uuid.New()
1122-
activeDomainUUID := uuid.New()
1123-
passiveDomainUUID := uuid.New()
1124-
1125-
activeDomain := NewGlobalDomainCacheEntryForTest(
1126-
&persistence.DomainInfo{ID: activeDomainUUID, Name: "active"},
1127-
nil,
1128-
&persistence.DomainReplicationConfig{ActiveClusterName: "A"},
1129-
0,
1130-
)
1131-
passiveDomain := NewGlobalDomainCacheEntryForTest(
1132-
&persistence.DomainInfo{ID: passiveDomainUUID, Name: "passive"},
1133-
nil,
1134-
&persistence.DomainReplicationConfig{ActiveClusterName: "B"},
1135-
0,
1136-
)
1137-
1138-
tests := []struct {
1139-
msg string
1140-
domainID string
1141-
expectDomain *DomainCacheEntry
1142-
expectedErr error
1143-
}{
1144-
{
1145-
msg: "invalid UUID",
1146-
domainID: "invalid",
1147-
expectedErr: &types.BadRequestError{Message: "Invalid domain UUID."},
1148-
},
1149-
{
1150-
msg: "non existing domain",
1151-
domainID: nonExistingUUID,
1152-
expectedErr: assert.AnError,
1153-
},
1154-
{
1155-
msg: "active domain",
1156-
domainID: activeDomainUUID,
1157-
expectDomain: activeDomain,
1158-
},
1159-
{
1160-
msg: "passive domain",
1161-
domainID: passiveDomainUUID,
1162-
expectDomain: passiveDomain,
1163-
expectedErr: &types.DomainNotActiveError{
1164-
Message: "Domain: passive is active in cluster: B, while current cluster A is a standby cluster.",
1165-
DomainName: "passive",
1166-
CurrentCluster: "A",
1167-
ActiveCluster: "B",
1168-
},
1169-
},
1170-
}
1171-
1172-
for _, tt := range tests {
1173-
t.Run(tt.msg, func(t *testing.T) {
1174-
ctrl := gomock.NewController(t)
1175-
cache := NewMockDomainCache(ctrl)
1176-
cache.EXPECT().GetDomainByID(nonExistingUUID).Return(nil, assert.AnError).AnyTimes()
1177-
cache.EXPECT().GetDomainByID(activeDomainUUID).Return(activeDomain, nil).AnyTimes()
1178-
cache.EXPECT().GetDomainByID(passiveDomainUUID).Return(passiveDomain, nil).AnyTimes()
1179-
1180-
domain, err := GetActiveDomainByID(cache, "A", tt.domainID)
1181-
1182-
assert.Equal(t, tt.expectDomain, domain)
1183-
assert.Equal(t, tt.expectedErr, err)
1184-
})
1185-
}
1186-
}
1187-
11881120
func Test_WithTimeSource(t *testing.T) {
11891121
ctrl := gomock.NewController(t)
11901122
defer ctrl.Finish()

service/history/engine/engineimpl/history_engine.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -425,10 +425,6 @@ func getScheduleID(activityID string, mutableState execution.MutableState) (int6
425425
return activityInfo.ScheduleID, nil
426426
}
427427

428-
func (e *historyEngineImpl) getActiveDomainByID(id string) (*cache.DomainCacheEntry, error) {
429-
return cache.GetActiveDomainByID(e.shard.GetDomainCache(), e.clusterMetadata.GetCurrentClusterName(), id)
430-
}
431-
432428
func (e *historyEngineImpl) getActiveDomainByWorkflow(ctx context.Context, domainID, workflowID, runID string) (*cache.DomainCacheEntry, error) {
433429
activeClusterInfo, err := e.activeClusterManager.GetActiveClusterInfoByWorkflow(ctx, domainID, workflowID, runID)
434430
if err != nil {

0 commit comments

Comments
 (0)