From 7d8469045cd98f09f614c48851a94146313ccbad Mon Sep 17 00:00:00 2001 From: David Porter Date: Wed, 29 Oct 2025 22:39:26 -0700 Subject: [PATCH 1/9] Squashing commit Signed-off-by: David Porter --- common/domain/attrValidator.go | 10 ++ common/domain/errors.go | 1 + common/domain/handler.go | 128 +++++++++++++++++- common/domain/handler_MasterCluster_test.go | 2 - .../domain/handler_NotMasterCluster_test.go | 2 - common/domain/handler_test.go | 18 +-- common/persistence/data_manager_interfaces.go | 7 + common/types/shared.go | 9 +- common/types/shared_test.go | 35 +++++ 9 files changed, 192 insertions(+), 20 deletions(-) diff --git a/common/domain/attrValidator.go b/common/domain/attrValidator.go index a6403b07859..ccd1c6b012c 100644 --- a/common/domain/attrValidator.go +++ b/common/domain/attrValidator.go @@ -48,6 +48,16 @@ func newAttrValidator( } } +func (d *AttrValidatorImpl) validateLocalDomainUpdateRequest(updateRequest *types.UpdateDomainRequest) error { + if updateRequest.ActiveClusterName != nil || updateRequest.ActiveClusters != nil || updateRequest.Clusters != nil { + return errLocalDomainsCannotFailover + } + if updateRequest.FailoverTimeoutInSeconds != nil { + return errLocalDomainsCannotFailover + } + return nil +} + func (d *AttrValidatorImpl) validateDomainConfig(config *persistence.DomainConfig) error { if config.Retention < int32(d.minRetentionDays) { return errInvalidRetentionPeriod diff --git a/common/domain/errors.go b/common/domain/errors.go index 1a52346b457..a732e8c3846 100644 --- a/common/domain/errors.go +++ b/common/domain/errors.go @@ -33,6 +33,7 @@ var ( errOngoingGracefulFailover = &types.BadRequestError{Message: "Cannot start concurrent graceful failover."} errInvalidGracefulFailover = &types.BadRequestError{Message: "Cannot start graceful failover without updating active cluster or in local domain."} errActiveClusterNameRequired = &types.BadRequestError{Message: "ActiveClusterName is required for all global domains."} + 
errLocalDomainsCannotFailover = &types.BadRequestError{Message: "Local domains cannot perform failovers or change replication configuration"} errInvalidRetentionPeriod = &types.BadRequestError{Message: "A valid retention period is not set on request."} errInvalidArchivalConfig = &types.BadRequestError{Message: "Invalid to enable archival without specifying a uri."} diff --git a/common/domain/handler.go b/common/domain/handler.go index 07b3ef4926a..5e3d0feff92 100644 --- a/common/domain/handler.go +++ b/common/domain/handler.go @@ -427,6 +427,7 @@ func (d *handlerImpl) UpdateDomain( return nil, err } + // todo (david.porter) remove this and push the deepcopy into each of the branches getResponse := currentDomainState.DeepCopy() info := getResponse.Info @@ -439,10 +440,15 @@ func (d *handlerImpl) UpdateDomain( isGlobalDomain := getResponse.IsGlobalDomain gracefulFailoverEndTime := getResponse.FailoverEndTime currentActiveCluster := replicationConfig.ActiveClusterName - currentActiveClusters := replicationConfig.ActiveClusters.DeepCopy() previousFailoverVersion := getResponse.PreviousFailoverVersion lastUpdatedTime := time.Unix(0, getResponse.LastUpdatedTime) + if !isGlobalDomain { + return d.updateLocalDomain(ctx, updateRequest, getResponse, notificationVersion) + } + // todo (active-active) refactor the rest of this method to remove all branching for global domain variations + // and the split between domain update and failover + // whether history archival config changed historyArchivalConfigChanged := false // whether visibility archival config changed @@ -632,7 +638,7 @@ func (d *handlerImpl) UpdateDomain( failoverType, ¤tActiveCluster, updateRequest.ActiveClusterName, - currentActiveClusters, + currentDomainState.ReplicationConfig.GetActiveClusters(), replicationConfig.ActiveClusters, )) if err != nil { @@ -663,6 +669,8 @@ func (d *handlerImpl) UpdateDomain( return nil, err } } + // todo (david.porter) remove this - all domains at this point are global + // 
leaving during the refactor just for clarity if isGlobalDomain { if err = d.domainReplicator.HandleTransmissionTask( ctx, @@ -691,6 +699,122 @@ func (d *handlerImpl) UpdateDomain( return response, nil } +func (d *handlerImpl) updateLocalDomain(ctx context.Context, + updateRequest *types.UpdateDomainRequest, + currentState *persistence.GetDomainResponse, + notificationVersion int64, +) (*types.UpdateDomainResponse, error) { + + err := d.domainAttrValidator.validateLocalDomainUpdateRequest(updateRequest) + if err != nil { + return nil, err + } + + // whether history archival config changed + historyArchivalConfigChanged := false + // whether visibility archival config changed + visibilityArchivalConfigChanged := false + // whether anything other than active cluster is changed + configurationChanged := false + + intendedDomainState := currentState.DeepCopy() + + configVersion := currentState.ConfigVersion + + now := d.timeSource.Now() + + lastUpdatedTime := time.Unix(0, currentState.LastUpdatedTime) + + // Update history archival state + historyArchivalConfigChanged, err = d.updateHistoryArchivalState(intendedDomainState.Config, updateRequest) + if err != nil { + return nil, err + } + + // Update visibility archival state + visibilityArchivalConfigChanged, err = d.updateVisibilityArchivalState(intendedDomainState.Config, updateRequest) + if err != nil { + return nil, err + } + + // Update domain info + info, domainInfoChanged := d.updateDomainInfo( + updateRequest, + intendedDomainState.Info, + ) + + // Update domain config + config, domainConfigChanged, err := d.updateDomainConfiguration( + updateRequest.GetName(), + intendedDomainState.Config, + updateRequest, + ) + if err != nil { + return nil, err + } + + // Update domain bad binary + config, deleteBinaryChanged, err := d.updateDeleteBadBinary( + config, + updateRequest.DeleteBadBinary, + ) + if err != nil { + return nil, err + } + + configurationChanged = historyArchivalConfigChanged || 
visibilityArchivalConfigChanged || domainInfoChanged || domainConfigChanged || deleteBinaryChanged + + if err = d.domainAttrValidator.validateDomainConfig(config); err != nil { + return nil, err + } + + if err = d.domainAttrValidator.validateDomainReplicationConfigForLocalDomain( + intendedDomainState.ReplicationConfig, + ); err != nil { + return nil, err + } + + if configurationChanged { + // Check the failover cool down time + if lastUpdatedTime.Add(d.config.FailoverCoolDown(info.Name)).After(now) { + d.logger.Debugf("Domain was last updated at %v, failoverCoolDown: %v, current time: %v.", lastUpdatedTime, d.config.FailoverCoolDown(info.Name), now) + return nil, errDomainUpdateTooFrequent + } + + // set the versions + if configurationChanged { + configVersion = intendedDomainState.ConfigVersion + 1 + } + + lastUpdatedTime = now + + updateReq := createUpdateRequest( + info, + config, + intendedDomainState.ReplicationConfig, + configVersion, + intendedDomainState.FailoverVersion, + intendedDomainState.FailoverNotificationVersion, + intendedDomainState.FailoverEndTime, + intendedDomainState.PreviousFailoverVersion, + lastUpdatedTime, + notificationVersion, + ) + + err = d.domainManager.UpdateDomain(ctx, &updateReq) + if err != nil { + return nil, err + } + } + response := &types.UpdateDomainResponse{ + IsGlobalDomain: false, + FailoverVersion: intendedDomainState.FailoverVersion, + } + response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(info, config, intendedDomainState.ReplicationConfig) + + return response, nil +} + // FailoverDomain handles failover of the domain to a different cluster func (d *handlerImpl) FailoverDomain( ctx context.Context, diff --git a/common/domain/handler_MasterCluster_test.go b/common/domain/handler_MasterCluster_test.go index ad56a75aa1e..ce0bf2f4837 100644 --- a/common/domain/handler_MasterCluster_test.go +++ b/common/domain/handler_MasterCluster_test.go @@ -427,8 +427,6 @@ func (s 
*domainHandlerGlobalDomainEnabledPrimaryClusterSuite) TestUpdateGetDomai VisibilityArchivalStatus: types.ArchivalStatusDisabled.Ptr(), VisibilityArchivalURI: common.StringPtr(""), BadBinaries: &types.BadBinaries{Binaries: map[string]*types.BadBinaryInfo{}}, - ActiveClusterName: common.StringPtr(s.ClusterMetadata.GetCurrentClusterName()), - Clusters: clusters, }) s.Nil(err) fnTest( diff --git a/common/domain/handler_NotMasterCluster_test.go b/common/domain/handler_NotMasterCluster_test.go index 943169cee33..b75e07382cb 100644 --- a/common/domain/handler_NotMasterCluster_test.go +++ b/common/domain/handler_NotMasterCluster_test.go @@ -398,8 +398,6 @@ func (s *domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite) TestUpdateGetDo VisibilityArchivalStatus: types.ArchivalStatusDisabled.Ptr(), VisibilityArchivalURI: common.StringPtr(""), BadBinaries: &types.BadBinaries{Binaries: map[string]*types.BadBinaryInfo{}}, - ActiveClusterName: common.StringPtr(s.ClusterMetadata.GetCurrentClusterName()), - Clusters: clusters, }) s.Nil(err) fnTest( diff --git a/common/domain/handler_test.go b/common/domain/handler_test.go index 2268ac1c6d5..f309cdf4dfa 100644 --- a/common/domain/handler_test.go +++ b/common/domain/handler_test.go @@ -2210,7 +2210,7 @@ func TestHandler_UpdateDomain(t *testing.T) { }, }, { - name: "Success case - local domain force failover", + name: "Error case - local domain force failover - shoudl not be able to failover a local domain", setupMock: func(domainManager *persistence.MockDomainManager, updateRequest *types.UpdateDomainRequest, archivalMetadata *archiver.MockArchivalMetadata, timeSource clock.MockedTimeSource, _ *MockReplicator) { domainResponse := &persistence.GetDomainResponse{ ReplicationConfig: &persistence.DomainReplicationConfig{ @@ -2239,17 +2239,6 @@ func TestHandler_UpdateDomain(t *testing.T) { domainManager.EXPECT().GetMetadata(ctx).Return(&persistence.GetMetadataResponse{}, nil).Times(1) domainManager.EXPECT().GetDomain(ctx, 
&persistence.GetDomainRequest{Name: updateRequest.GetName()}). Return(domainResponse, nil).Times(1) - archivalConfig := archiver.NewArchivalConfig( - commonconstants.ArchivalDisabled, - dynamicproperties.GetStringPropertyFn(commonconstants.ArchivalDisabled), - false, - dynamicproperties.GetBoolPropertyFn(false), - commonconstants.ArchivalDisabled, - "") - archivalMetadata.On("GetHistoryConfig").Return(archivalConfig).Times(1) - archivalMetadata.On("GetVisibilityConfig").Return(archivalConfig).Times(1) - timeSource.Advance(time.Hour) - domainManager.EXPECT().UpdateDomain(ctx, gomock.Any()).Return(nil).Times(1) }, request: &types.UpdateDomainRequest{ Name: constants.TestDomainName, @@ -2281,6 +2270,7 @@ func TestHandler_UpdateDomain(t *testing.T) { }, } }, + err: errLocalDomainsCannotFailover, }, { name: "Error case - GetMetadata error", @@ -2417,7 +2407,7 @@ func TestHandler_UpdateDomain(t *testing.T) { }, }, { - name: "Error case - handleGracefulFailover error", + name: "Error case - handleGracefulFailover error in the case of a global domain - it should return an error to the user", setupMock: func(domainManager *persistence.MockDomainManager, updateRequest *types.UpdateDomainRequest, archivalMetadata *archiver.MockArchivalMetadata, _ clock.MockedTimeSource, _ *MockReplicator) { domainManager.EXPECT().GetMetadata(ctx).Return(&persistence.GetMetadataResponse{}, nil).Times(1) domainManager.EXPECT().GetDomain(ctx, &persistence.GetDomainRequest{Name: updateRequest.GetName()}). 
@@ -2427,6 +2417,7 @@ func TestHandler_UpdateDomain(t *testing.T) { }, ReplicationConfig: &persistence.DomainReplicationConfig{}, Config: &persistence.DomainConfig{}, + IsGlobalDomain: true, }, nil).Times(1) archivalConfig := archiver.NewArchivalConfig( commonconstants.ArchivalDisabled, @@ -2452,6 +2443,7 @@ func TestHandler_UpdateDomain(t *testing.T) { Return(&persistence.GetDomainResponse{ ReplicationConfig: &persistence.DomainReplicationConfig{}, Config: &persistence.DomainConfig{}, + IsGlobalDomain: true, Info: &persistence.DomainInfo{ Name: constants.TestDomainName, }, diff --git a/common/persistence/data_manager_interfaces.go b/common/persistence/data_manager_interfaces.go index ee4581bf34c..be1f1a2a971 100644 --- a/common/persistence/data_manager_interfaces.go +++ b/common/persistence/data_manager_interfaces.go @@ -1933,6 +1933,13 @@ func (t *TimerTaskInfo) ToTask() (Task, error) { } } +func (c *DomainReplicationConfig) GetActiveClusters() *types.ActiveClusters { + if c != nil && c.ActiveClusters != nil { + return c.ActiveClusters + } + return nil +} + // ToNilSafeCopy // TODO: it seems that we just need a nil safe shardInfo, deep copy is not necessary func (s *ShardInfo) ToNilSafeCopy() *ShardInfo { diff --git a/common/types/shared.go b/common/types/shared.go index 8347c4e1b31..3cbf91f4d21 100644 --- a/common/types/shared.go +++ b/common/types/shared.go @@ -2731,7 +2731,14 @@ func (v *ActiveClusters) DeepCopy() *ActiveClusters { if v.AttributeScopes != nil { result.AttributeScopes = make(map[string]ClusterAttributeScope, len(v.AttributeScopes)) for scopeType, scope := range v.AttributeScopes { - result.AttributeScopes[scopeType] = scope + copiedScope := ClusterAttributeScope{} + if scope.ClusterAttributes != nil { + copiedScope.ClusterAttributes = make(map[string]ActiveClusterInfo, len(scope.ClusterAttributes)) + for attrName, attrInfo := range scope.ClusterAttributes { + copiedScope.ClusterAttributes[attrName] = attrInfo + } + } + 
result.AttributeScopes[scopeType] = copiedScope } } return result diff --git a/common/types/shared_test.go b/common/types/shared_test.go index 695752b78a0..7bdaf23567b 100644 --- a/common/types/shared_test.go +++ b/common/types/shared_test.go @@ -130,6 +130,41 @@ func TestActiveClustersConfigDeepCopy(t *testing.T) { } } +// Todo (david.porter) delete this test and codegen this +func TestActiveClustersDeepCopyMutationIsolation(t *testing.T) { + + t.Run("modifying nested ClusterAttributes map in original should not affect copy", func(t *testing.T) { + original := &ActiveClusters{ + AttributeScopes: map[string]ClusterAttributeScope{ + "region": { + ClusterAttributes: map[string]ActiveClusterInfo{ + "us-east-1": { + ActiveClusterName: "cluster1", + FailoverVersion: 100, + }, + }, + }, + }, + } + + copied := original.DeepCopy() + + assert.Equal(t, original, copied) + + scope := original.AttributeScopes["region"] + scope.ClusterAttributes["us-west-1"] = ActiveClusterInfo{ + ActiveClusterName: "cluster2", + FailoverVersion: 200, + } + original.AttributeScopes["region"] = scope + + assert.Len(t, original.AttributeScopes["region"].ClusterAttributes, 2) + assert.Len(t, copied.AttributeScopes["region"].ClusterAttributes, 1) + assert.Contains(t, original.AttributeScopes["region"].ClusterAttributes, "us-west-1") + assert.NotContains(t, copied.AttributeScopes["region"].ClusterAttributes, "us-west-1") + }) +} + func TestIsActiveActiveDomain(t *testing.T) { tests := []struct { name string From 0e7faeede0f39442bbe16429db8337400fb0e319 Mon Sep 17 00:00:00 2001 From: David Porter Date: Wed, 29 Oct 2025 23:43:16 -0700 Subject: [PATCH 2/9] Wip, refactor failvoers Signed-off-by: David Porter --- common/domain/handler.go | 229 ++++++++++++++++++++++++++++++++++++++- common/types/shared.go | 6 + 2 files changed, 233 insertions(+), 2 deletions(-) diff --git a/common/domain/handler.go b/common/domain/handler.go index 5e3d0feff92..66d5a5988a2 100644 --- a/common/domain/handler.go +++ 
b/common/domain/handler.go @@ -446,8 +446,10 @@ func (d *handlerImpl) UpdateDomain( if !isGlobalDomain { return d.updateLocalDomain(ctx, updateRequest, getResponse, notificationVersion) } - // todo (active-active) refactor the rest of this method to remove all branching for global domain variations - // and the split between domain update and failover + + if updateRequest.IsAFailoverRequest() { + return d.handleFailoverRequest(ctx, updateRequest, getResponse, notificationVersion) + } // whether history archival config changed historyArchivalConfigChanged := false @@ -699,6 +701,229 @@ func (d *handlerImpl) UpdateDomain( return response, nil } +func (d *handlerImpl) handleFailoverRequest(ctx context.Context, + updateRequest *types.UpdateDomainRequest, + currentState *persistence.GetDomainResponse, + notificationVersion int64, +) (*types.UpdateDomainResponse, error) { + + // intendedDomainState will be modified + // into the intended shape by the functions here + intendedDomainState := currentState.DeepCopy() + + configVersion := currentState.ConfigVersion + failoverVersion := currentState.FailoverVersion + failoverNotificationVersion := currentState.FailoverNotificationVersion + isGlobalDomain := currentState.IsGlobalDomain + gracefulFailoverEndTime := currentState.FailoverEndTime + currentActiveCluster := currentState.ReplicationConfig.ActiveClusterName + previousFailoverVersion := currentState.PreviousFailoverVersion + lastUpdatedTime := time.Unix(0, currentState.LastUpdatedTime) + wasActiveActive := currentState.ReplicationConfig.IsActiveActive() + now := d.timeSource.Now() + + var activeClusterChanged bool + var configurationChanged bool + + // Update replication config + replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig( + currentState.Info.Name, + intendedDomainState.ReplicationConfig, + updateRequest, + ) + if err != nil { + return nil, err + } + if !activeClusterChanged && !replicationConfigChanged { + return 
nil, &types.BadRequestError{Message: "a failover was requested, but there was no change detected, the configuration was not updated"} + } + + // Handle graceful failover request + if updateRequest.FailoverTimeoutInSeconds != nil { + gracefulFailoverEndTime, previousFailoverVersion, err = d.handleGracefulFailover( + updateRequest, + replicationConfig, + currentActiveCluster, + gracefulFailoverEndTime, + failoverVersion, + activeClusterChanged, + isGlobalDomain, + ) + if err != nil { + return nil, err + } + } + + // replication config is a subset of config, + configurationChanged = replicationConfigChanged + + if err = d.domainAttrValidator.validateDomainConfig(intendedDomainState.Config); err != nil { + return nil, err + } + + err = d.validateDomainReplicationConfigForUpdateDomain(replicationConfig, isGlobalDomain, configurationChanged, activeClusterChanged) + if err != nil { + return nil, err + } + + // Check the failover cool down time + if lastUpdatedTime.Add(d.config.FailoverCoolDown(intendedDomainState.Info.Name)).After(now) { + d.logger.Debugf("Domain was last updated at %v, failoverCoolDown: %v, current time: %v.", lastUpdatedTime, d.config.FailoverCoolDown(intendedDomainState.Info.Name), now) + return nil, errDomainUpdateTooFrequent + } + + // set the versions + if configurationChanged { + configVersion++ + } + + var failoverType constants.FailoverType = constants.FailoverTypeGrace + + // Force failover cleans graceful failover state + if updateRequest.FailoverTimeoutInSeconds == nil { + failoverType = constants.FailoverTypeForce + gracefulFailoverEndTime = nil + previousFailoverVersion = constants.InitialPreviousFailoverVersion + } + + // Cases: + // 1. active-passive domain's ActiveClusterName is changed + // 2. active-passive domain is being migrated to active-active + // 3. active-active domain's ActiveClusters is changed + isActiveActive := replicationConfig.IsActiveActive() + + // case 1. 
active-passive domain's ActiveClusterName is changed + if !wasActiveActive && !isActiveActive { + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion, + updateRequest.Name, + ) + + d.logger.Debug("active-passive domain failover", + tag.WorkflowDomainName(intendedDomainState.Info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(intendedDomainState.Info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + updateRequest.ActiveClusterName, + nil, + nil, + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) + } + } + + // case 2. active-passive domain is being migrated to active-active + if !wasActiveActive && isActiveActive { + // for active-passive to active-active migration, + // we increment failover version so top level failoverVersion is updated and domain data is replicated. + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion+1, //todo: (active-active): Let's review if we need to increment + // this for cluster-attr failover changes. It may not be necessary to increment + updateRequest.Name, + ) + + d.logger.Debug("active-passive domain is being migrated to active-active", + tag.WorkflowDomainName(intendedDomainState.Info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(intendedDomainState.Info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + updateRequest.ActiveClusterName, + nil, + replicationConfig.ActiveClusters, + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) + } + } + + // case 3. 
active-active domain's ActiveClusters is changed + if wasActiveActive && isActiveActive { + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion+1, //todo: (active-active): Let's review if we need to increment + // this for cluster-attr failover changes. It may not be necessary to increment + updateRequest.Name, + ) + + d.logger.Debug("active-active domain failover", + tag.WorkflowDomainName(intendedDomainState.Info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(intendedDomainState.Info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + updateRequest.ActiveClusterName, + currentState.ReplicationConfig.GetActiveClusters(), + intendedDomainState.ReplicationConfig.GetActiveClusters(), + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) + } + } + + failoverNotificationVersion = notificationVersion + + lastUpdatedTime = now + + updateReq := createUpdateRequest( + intendedDomainState.Info, + intendedDomainState.Config, + replicationConfig, + configVersion, + failoverVersion, + failoverNotificationVersion, + gracefulFailoverEndTime, + previousFailoverVersion, + lastUpdatedTime, + notificationVersion, + ) + + err = d.domainManager.UpdateDomain(ctx, &updateReq) + if err != nil { + return nil, err + } + if err = d.domainReplicator.HandleTransmissionTask( + ctx, + types.DomainOperationUpdate, + intendedDomainState.Info, + intendedDomainState.Config, + replicationConfig, + configVersion, + failoverVersion, + previousFailoverVersion, + isGlobalDomain, + ); err != nil { + return nil, err + } + response := &types.UpdateDomainResponse{ + IsGlobalDomain: isGlobalDomain, + FailoverVersion: failoverVersion, + } + response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(intendedDomainState.Info, 
intendedDomainState.Config, intendedDomainState.ReplicationConfig) + + d.logger.Info("faiover request succeeded", + tag.WorkflowDomainName(intendedDomainState.Info.Name), + tag.WorkflowDomainID(intendedDomainState.Info.ID), + ) + return response, nil +} + func (d *handlerImpl) updateLocalDomain(ctx context.Context, updateRequest *types.UpdateDomainRequest, currentState *persistence.GetDomainResponse, diff --git a/common/types/shared.go b/common/types/shared.go index 3cbf91f4d21..68102326a1d 100644 --- a/common/types/shared.go +++ b/common/types/shared.go @@ -7943,6 +7943,12 @@ type UpdateDomainRequest struct { FailoverTimeoutInSeconds *int32 `json:"failoverTimeoutInSeconds,omitempty"` } +// IsAFailoverRequest will return true if the update request includes +// an active cluster change or a change to one of the domain's cluster-attribute fields +func (v *UpdateDomainRequest) IsAFailoverRequest() bool { + return v.ActiveClusterName != nil || (v.ActiveClusters != nil && len(v.ActiveClusters.AttributeScopes) > 0) +} + // GetName is an internal getter (TBD...) 
func (v *UpdateDomainRequest) GetName() (o string) { if v != nil { From eaca259a79300bc06e6067c296e254f8abf03d2e Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 31 Oct 2025 13:37:37 -0700 Subject: [PATCH 3/9] Refactor of handler 2 Signed-off-by: David Porter --- common/domain/errors.go | 1 + common/domain/handler.go | 441 +++++++++++++--------------------- common/domain/handler_test.go | 2 +- common/types/shared.go | 16 +- 4 files changed, 186 insertions(+), 274 deletions(-) diff --git a/common/domain/errors.go b/common/domain/errors.go index a732e8c3846..73e3ffcfa5a 100644 --- a/common/domain/errors.go +++ b/common/domain/errors.go @@ -32,6 +32,7 @@ var ( errGracefulFailoverInActiveCluster = &types.BadRequestError{Message: "Cannot start the graceful failover from an active cluster to an active cluster."} errOngoingGracefulFailover = &types.BadRequestError{Message: "Cannot start concurrent graceful failover."} errInvalidGracefulFailover = &types.BadRequestError{Message: "Cannot start graceful failover without updating active cluster or in local domain."} + errInvalidFailoverNoChangeDetected = &types.BadRequestError{Message: "a failover was requested, but there was no change detected, the configuration was not updated"} errActiveClusterNameRequired = &types.BadRequestError{Message: "ActiveClusterName is required for all global domains."} errLocalDomainsCannotFailover = &types.BadRequestError{Message: "Local domains cannot perform failovers or change replication configuration"} diff --git a/common/domain/handler.go b/common/domain/handler.go index 66d5a5988a2..e5b75758577 100644 --- a/common/domain/handler.go +++ b/common/domain/handler.go @@ -427,280 +427,34 @@ func (d *handlerImpl) UpdateDomain( return nil, err } - // todo (david.porter) remove this and push the deepcopy into each of the branches - getResponse := currentDomainState.DeepCopy() - - info := getResponse.Info - config := getResponse.Config - replicationConfig := getResponse.ReplicationConfig - 
wasActiveActive := replicationConfig.IsActiveActive() - configVersion := getResponse.ConfigVersion - failoverVersion := getResponse.FailoverVersion - failoverNotificationVersion := getResponse.FailoverNotificationVersion - isGlobalDomain := getResponse.IsGlobalDomain - gracefulFailoverEndTime := getResponse.FailoverEndTime - currentActiveCluster := replicationConfig.ActiveClusterName - previousFailoverVersion := getResponse.PreviousFailoverVersion - lastUpdatedTime := time.Unix(0, getResponse.LastUpdatedTime) + isGlobalDomain := currentDomainState.IsGlobalDomain if !isGlobalDomain { - return d.updateLocalDomain(ctx, updateRequest, getResponse, notificationVersion) + return d.updateLocalDomain(ctx, updateRequest, currentDomainState, notificationVersion) } if updateRequest.IsAFailoverRequest() { - return d.handleFailoverRequest(ctx, updateRequest, getResponse, notificationVersion) - } - - // whether history archival config changed - historyArchivalConfigChanged := false - // whether visibility archival config changed - visibilityArchivalConfigChanged := false - // whether active cluster is changed - activeClusterChanged := false - // whether anything other than active cluster is changed - configurationChanged := false - - // Update history archival state - historyArchivalConfigChanged, err = d.updateHistoryArchivalState(config, updateRequest) - if err != nil { - return nil, err - } - - // Update visibility archival state - visibilityArchivalConfigChanged, err = d.updateVisibilityArchivalState(config, updateRequest) - if err != nil { - return nil, err - } - - // Update domain info - info, domainInfoChanged := d.updateDomainInfo( - updateRequest, - info, - ) - - // Update domain config - config, domainConfigChanged, err := d.updateDomainConfiguration( - updateRequest.GetName(), - config, - updateRequest, - ) - if err != nil { - return nil, err - } - - // Update domain bad binary - config, deleteBinaryChanged, err := d.updateDeleteBadBinary( - config, - 
updateRequest.DeleteBadBinary, - ) - if err != nil { - return nil, err + return d.handleFailoverRequest(ctx, updateRequest, currentDomainState, notificationVersion) } - // Update replication config - replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig( - getResponse.Info.Name, - replicationConfig, - updateRequest, - ) - if err != nil { - return nil, err - } - - // Handle graceful failover request - if updateRequest.FailoverTimeoutInSeconds != nil { - gracefulFailoverEndTime, previousFailoverVersion, err = d.handleGracefulFailover( - updateRequest, - replicationConfig, - currentActiveCluster, - gracefulFailoverEndTime, - failoverVersion, - activeClusterChanged, - isGlobalDomain, - ) - if err != nil { - return nil, err - } - } - - configurationChanged = historyArchivalConfigChanged || visibilityArchivalConfigChanged || domainInfoChanged || domainConfigChanged || deleteBinaryChanged || replicationConfigChanged - - if err = d.domainAttrValidator.validateDomainConfig(config); err != nil { - return nil, err - } - - err = d.validateDomainReplicationConfigForUpdateDomain(replicationConfig, isGlobalDomain, configurationChanged, activeClusterChanged) - if err != nil { - return nil, err - } - - if configurationChanged || activeClusterChanged { - now := d.timeSource.Now() - // Check the failover cool down time - if lastUpdatedTime.Add(d.config.FailoverCoolDown(info.Name)).After(now) { - d.logger.Debugf("Domain was last updated at %v, failoverCoolDown: %v, current time: %v.", lastUpdatedTime, d.config.FailoverCoolDown(info.Name), now) - return nil, errDomainUpdateTooFrequent - } - - // set the versions - if configurationChanged { - configVersion++ - } - - if activeClusterChanged && isGlobalDomain { - var failoverType constants.FailoverType = constants.FailoverTypeGrace - - // Force failover cleans graceful failover state - if updateRequest.FailoverTimeoutInSeconds == nil { - failoverType = constants.FailoverTypeForce - 
gracefulFailoverEndTime = nil - previousFailoverVersion = constants.InitialPreviousFailoverVersion - } - - // Cases: - // 1. active-passive domain's ActiveClusterName is changed - // 2. active-passive domain is being migrated to active-active - // 3. active-active domain's ActiveClusters is changed - isActiveActive := replicationConfig.IsActiveActive() - - // case 1. active-passive domain's ActiveClusterName is changed - if !wasActiveActive && !isActiveActive { - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion, - updateRequest.Name, - ) - - d.logger.Debug("active-passive domain failover", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) - - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - updateRequest.ActiveClusterName, - nil, - nil, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) - } - } - - // case 2. active-passive domain is being migrated to active-active - if !wasActiveActive && isActiveActive { - // for active-passive to active-active migration, - // we increment failover version so top level failoverVersion is updated and domain data is replicated. - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion+1, //todo: (active-active): Let's review if we need to increment - // this for cluster-attr failover changes. 
It may not be necessary to increment - updateRequest.Name, - ) - - d.logger.Debug("active-passive domain is being migrated to active-active", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) - - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - updateRequest.ActiveClusterName, - nil, - replicationConfig.ActiveClusters, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) - } - } - - // case 3. active-active domain's ActiveClusters is changed - if wasActiveActive && isActiveActive { - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion+1, //todo: (active-active): Let's review if we need to increment - // this for cluster-attr failover changes. It may not be necessary to increment - updateRequest.Name, - ) - - d.logger.Debug("active-active domain failover", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) - - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - updateRequest.ActiveClusterName, - currentDomainState.ReplicationConfig.GetActiveClusters(), - replicationConfig.ActiveClusters, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) - } - } - - failoverNotificationVersion = notificationVersion - } - - lastUpdatedTime = now - - updateReq := createUpdateRequest( - info, - config, - replicationConfig, - configVersion, - failoverVersion, - failoverNotificationVersion, - gracefulFailoverEndTime, - previousFailoverVersion, - lastUpdatedTime, - notificationVersion, - ) + return d.updateGlobalDomainConfiguration(ctx, updateRequest, currentDomainState, notificationVersion) +} - err = d.domainManager.UpdateDomain(ctx, 
&updateReq) - if err != nil { - return nil, err - } - } - // todo (david.porter) remove this - all domains at this point are global - // leaving during the refactor just for clarity - if isGlobalDomain { - if err = d.domainReplicator.HandleTransmissionTask( - ctx, - types.DomainOperationUpdate, - info, - config, - replicationConfig, - configVersion, - failoverVersion, - previousFailoverVersion, - isGlobalDomain, - ); err != nil { - return nil, err - } - } - response := &types.UpdateDomainResponse{ - IsGlobalDomain: isGlobalDomain, - FailoverVersion: failoverVersion, +// All domain updates are throttled by the cool down time (incorrectly called 'failover' cool down). +// The guard is an anti-flapping measure. +func (d *handlerImpl) ensureUpdateOrFailoverCooldown(currentDomainState *persistence.GetDomainResponse) error { + lastUpdatedTime := time.Unix(0, currentDomainState.LastUpdatedTime) + now := d.timeSource.Now() + if lastUpdatedTime.Add(d.config.FailoverCoolDown(currentDomainState.Info.Name)).After(now) { + d.logger.Debugf("Domain was last updated at %v, failoverCoolDown: %v, current time: %v.", lastUpdatedTime, d.config.FailoverCoolDown(currentDomainState.Info.Name), now) + return errDomainUpdateTooFrequent } - response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(info, config, replicationConfig) - - d.logger.Info("Update domain succeeded", - tag.WorkflowDomainName(info.Name), - tag.WorkflowDomainID(info.ID), - ) - return response, nil + return nil } +// For global domains only, this is assumed to be invoked when +// the incoming request is either specifying an active-cluster parameter to update +// or active_clusters in the case of an AA domain func (d *handlerImpl) handleFailoverRequest(ctx context.Context, updateRequest *types.UpdateDomainRequest, currentState *persistence.GetDomainResponse, @@ -735,7 +489,12 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, return nil, err } if 
!activeClusterChanged && !replicationConfigChanged { - return nil, &types.BadRequestError{Message: "a failover was requested, but there was no change detected, the configuration was not updated"} + return nil, errInvalidFailoverNoChangeDetected + } + + err = d.ensureUpdateOrFailoverCooldown(currentState) + if err != nil { + return nil, err } // Handle graceful failover request @@ -924,6 +683,154 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, return response, nil } +// updateGlobalDomainConfiguration handles the update of a global domain configuration +// this excludes failover/active_cluster/active_clusters updates. They are grouped under +// forms of failover +func (d *handlerImpl) updateGlobalDomainConfiguration(ctx context.Context, + updateRequest *types.UpdateDomainRequest, + currentDomainState *persistence.GetDomainResponse, + notificationVersion int64, +) (*types.UpdateDomainResponse, error) { + + // intendedDomainState will be modified + // into the intended shape by the functions here + intendedDomainState := currentDomainState.DeepCopy() + + configVersion := currentDomainState.ConfigVersion + failoverVersion := currentDomainState.FailoverVersion + failoverNotificationVersion := currentDomainState.FailoverNotificationVersion + isGlobalDomain := currentDomainState.IsGlobalDomain + gracefulFailoverEndTime := currentDomainState.FailoverEndTime + previousFailoverVersion := currentDomainState.PreviousFailoverVersion + lastUpdatedTime := time.Unix(0, currentDomainState.LastUpdatedTime) + + now := d.timeSource.Now() + + // whether history archival config changed + historyArchivalConfigChanged := false + // whether visibility archival config changed + visibilityArchivalConfigChanged := false + // whether active cluster is changed + activeClusterChanged := false + // whether anything other than active cluster is changed + configurationChanged := false + + // Update history archival state + historyArchivalConfigChanged, err := 
d.updateHistoryArchivalState(intendedDomainState.Config, updateRequest) + if err != nil { + return nil, err + } + + // Update visibility archival state + visibilityArchivalConfigChanged, err = d.updateVisibilityArchivalState(intendedDomainState.Config, updateRequest) + if err != nil { + return nil, err + } + + // Update domain info + info, domainInfoChanged := d.updateDomainInfo( + updateRequest, + intendedDomainState.Info, + ) + + // Update domain config + config, domainConfigChanged, err := d.updateDomainConfiguration( + updateRequest.GetName(), + intendedDomainState.Config, + updateRequest, + ) + if err != nil { + return nil, err + } + + // Update domain bad binary + config, deleteBinaryChanged, err := d.updateDeleteBadBinary( + config, + updateRequest.DeleteBadBinary, + ) + if err != nil { + return nil, err + } + + // Update replication config + replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig( + intendedDomainState.Info.Name, + intendedDomainState.ReplicationConfig, + updateRequest, + ) + if err != nil { + return nil, err + } + + configurationChanged = historyArchivalConfigChanged || visibilityArchivalConfigChanged || domainInfoChanged || domainConfigChanged || deleteBinaryChanged || replicationConfigChanged + + if err = d.domainAttrValidator.validateDomainConfig(config); err != nil { + return nil, err + } + + err = d.validateDomainReplicationConfigForUpdateDomain(replicationConfig, isGlobalDomain, configurationChanged, activeClusterChanged) + if err != nil { + return nil, err + } + + err = d.ensureUpdateOrFailoverCooldown(currentDomainState) + if err != nil { + return nil, err + } + + if configurationChanged || activeClusterChanged { + + // set the versions + if configurationChanged { + configVersion++ + } + + lastUpdatedTime = now + + updateReq := createUpdateRequest( + info, + config, + replicationConfig, + configVersion, + failoverVersion, + failoverNotificationVersion, + gracefulFailoverEndTime, + 
previousFailoverVersion, + lastUpdatedTime, + notificationVersion, + ) + + err = d.domainManager.UpdateDomain(ctx, &updateReq) + if err != nil { + return nil, err + } + } + if err = d.domainReplicator.HandleTransmissionTask( + ctx, + types.DomainOperationUpdate, + info, + config, + replicationConfig, + configVersion, + failoverVersion, + previousFailoverVersion, + isGlobalDomain, + ); err != nil { + return nil, err + } + response := &types.UpdateDomainResponse{ + IsGlobalDomain: isGlobalDomain, + FailoverVersion: failoverVersion, + } + response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(info, config, replicationConfig) + + d.logger.Info("Update domain succeeded", + tag.WorkflowDomainName(info.Name), + tag.WorkflowDomainID(info.ID), + ) + return response, nil +} + func (d *handlerImpl) updateLocalDomain(ctx context.Context, updateRequest *types.UpdateDomainRequest, currentState *persistence.GetDomainResponse, @@ -1000,12 +907,6 @@ func (d *handlerImpl) updateLocalDomain(ctx context.Context, } if configurationChanged { - // Check the failover cool down time - if lastUpdatedTime.Add(d.config.FailoverCoolDown(info.Name)).After(now) { - d.logger.Debugf("Domain was last updated at %v, failoverCoolDown: %v, current time: %v.", lastUpdatedTime, d.config.FailoverCoolDown(info.Name), now) - return nil, errDomainUpdateTooFrequent - } - // set the versions if configurationChanged { configVersion = intendedDomainState.ConfigVersion + 1 diff --git a/common/domain/handler_test.go b/common/domain/handler_test.go index f309cdf4dfa..c726275a53e 100644 --- a/common/domain/handler_test.go +++ b/common/domain/handler_test.go @@ -2433,7 +2433,7 @@ func TestHandler_UpdateDomain(t *testing.T) { Name: constants.TestDomainName, FailoverTimeoutInSeconds: common.Int32Ptr(1), }, - err: errInvalidGracefulFailover, + err: errInvalidFailoverNoChangeDetected, }, { name: "Error case - validateDomainConfig error", diff --git a/common/types/shared.go 
b/common/types/shared.go index 68102326a1d..5b8c7846199 100644 --- a/common/types/shared.go +++ b/common/types/shared.go @@ -7943,10 +7943,20 @@ type UpdateDomainRequest struct { FailoverTimeoutInSeconds *int32 `json:"failoverTimeoutInSeconds,omitempty"` } -// IsAFailoverRequest will return true if the update request includes -// an active cluster change or a change to one of the domain's cluster-attribute fields +// IsAFailoverRequest identifies if any part of the request is a failover request +// and if so, will return true +// this includes: +// +// - an active cluster change (force failover) +// - any failover timeout values (for graceful failover) +// - or a change to one of the domain's cluster-attribute fields (active-active failover) +// +// this is not a validation function +// and doesn't attempt to give valid or coherent combinations func (v *UpdateDomainRequest) IsAFailoverRequest() bool { - return v.ActiveClusterName != nil || (v.ActiveClusters != nil && len(v.ActiveClusters.AttributeScopes) > 0) + return v.ActiveClusterName != nil || + (v.FailoverTimeoutInSeconds != nil && *v.FailoverTimeoutInSeconds > 0) || + (v.ActiveClusters != nil && len(v.ActiveClusters.AttributeScopes) > 0) } // GetName is an internal getter (TBD...) 
From e7b7cdf6eff720784034d503659558b98c59998c Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 31 Oct 2025 14:07:23 -0700 Subject: [PATCH 4/9] expected calls unncessar Signed-off-by: David Porter --- service/frontend/api/handler_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/service/frontend/api/handler_test.go b/service/frontend/api/handler_test.go index 02c180e8f1f..c66408f2ca0 100644 --- a/service/frontend/api/handler_test.go +++ b/service/frontend/api/handler_test.go @@ -1574,8 +1574,6 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_FailOver() { s.mockMetadataMgr.On("GetDomain", mock.Anything, mock.Anything).Return(getDomainResp, nil) s.mockMetadataMgr.On("UpdateDomain", mock.Anything, mock.Anything).Return(nil) - s.mockArchivalMetadata.On("GetHistoryConfig").Return(archiver.NewArchivalConfig("enabled", dynamicproperties.GetStringPropertyFn("disabled"), false, dynamicproperties.GetBoolPropertyFn(false), "disabled", "some random URI")) - s.mockArchivalMetadata.On("GetVisibilityConfig").Return(archiver.NewArchivalConfig("enabled", dynamicproperties.GetStringPropertyFn("disabled"), false, dynamicproperties.GetBoolPropertyFn(false), "disabled", "some random URI")) s.mockProducer.On("Publish", mock.Anything, mock.Anything).Return(nil).Once() s.mockResource.RemoteFrontendClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any()). 
Return(describeDomainResponseServer, nil).AnyTimes() From c1784c8b06d469412114c9cc779213b5d9d2acaf Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 31 Oct 2025 14:27:36 -0700 Subject: [PATCH 5/9] A little more cleanup Signed-off-by: David Porter --- common/domain/handler.go | 244 +++++++++++++++++++-------------------- 1 file changed, 122 insertions(+), 122 deletions(-) diff --git a/common/domain/handler.go b/common/domain/handler.go index e5b75758577..3aabb41d1c5 100644 --- a/common/domain/handler.go +++ b/common/domain/handler.go @@ -467,7 +467,6 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, configVersion := currentState.ConfigVersion failoverVersion := currentState.FailoverVersion - failoverNotificationVersion := currentState.FailoverNotificationVersion isGlobalDomain := currentState.IsGlobalDomain gracefulFailoverEndTime := currentState.FailoverEndTime currentActiveCluster := currentState.ReplicationConfig.ActiveClusterName @@ -479,6 +478,9 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, var activeClusterChanged bool var configurationChanged bool + // will be set to the notification version of the domain after the update + var failoverNotificationVersion int64 = -1 + // Update replication config replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig( currentState.Info.Name, @@ -702,7 +704,6 @@ func (d *handlerImpl) updateGlobalDomainConfiguration(ctx context.Context, isGlobalDomain := currentDomainState.IsGlobalDomain gracefulFailoverEndTime := currentDomainState.FailoverEndTime previousFailoverVersion := currentDomainState.PreviousFailoverVersion - lastUpdatedTime := time.Unix(0, currentDomainState.LastUpdatedTime) now := d.timeSource.Now() @@ -785,8 +786,6 @@ func (d *handlerImpl) updateGlobalDomainConfiguration(ctx context.Context, configVersion++ } - lastUpdatedTime = now - updateReq := createUpdateRequest( info, config, @@ -796,7 +795,7 @@ func (d *handlerImpl) 
updateGlobalDomainConfiguration(ctx context.Context, failoverNotificationVersion, gracefulFailoverEndTime, previousFailoverVersion, - lastUpdatedTime, + now, notificationVersion, ) @@ -967,14 +966,15 @@ func (d *handlerImpl) FailoverDomain( wasActiveActive := replicationConfig.IsActiveActive() configVersion := getResponse.ConfigVersion failoverVersion := getResponse.FailoverVersion - failoverNotificationVersion := getResponse.FailoverNotificationVersion - isGlobalDomain := getResponse.IsGlobalDomain gracefulFailoverEndTime := getResponse.FailoverEndTime currentActiveCluster := replicationConfig.ActiveClusterName currentActiveClusters := replicationConfig.ActiveClusters.DeepCopy() previousFailoverVersion := getResponse.PreviousFailoverVersion lastUpdatedTime := time.Unix(0, getResponse.LastUpdatedTime) + // will be set to the notification version of the domain after the update + var failoverNotificationVersion int64 = -1 + updateRequest := &types.UpdateDomainRequest{ Name: failoverRequest.DomainName, ActiveClusterName: failoverRequest.DomainActiveClusterName, @@ -990,6 +990,10 @@ func (d *handlerImpl) FailoverDomain( return nil, err } + if !activeClusterChanged && !replicationConfigChanged { + return nil, errInvalidFailoverNoChangeDetected + } + // Handle graceful failover request if updateRequest.FailoverTimeoutInSeconds != nil { gracefulFailoverEndTime, previousFailoverVersion, err = d.handleGracefulFailover( @@ -999,14 +1003,14 @@ func (d *handlerImpl) FailoverDomain( gracefulFailoverEndTime, failoverVersion, activeClusterChanged, - isGlobalDomain, + true, ) if err != nil { return nil, err } } - err = d.validateDomainReplicationConfigForUpdateDomain(replicationConfig, isGlobalDomain, replicationConfigChanged, activeClusterChanged) + err = d.validateDomainReplicationConfigForUpdateDomain(replicationConfig, true, replicationConfigChanged, activeClusterChanged) if err != nil { return nil, err } @@ -1023,113 +1027,111 @@ func (d *handlerImpl) FailoverDomain( 
configVersion++ } - if activeClusterChanged && isGlobalDomain { - var failoverType constants.FailoverType = constants.FailoverTypeGrace + var failoverType constants.FailoverType = constants.FailoverTypeGrace - // Force failover cleans graceful failover state - if updateRequest.FailoverTimeoutInSeconds == nil { - failoverType = constants.FailoverTypeForce - gracefulFailoverEndTime = nil - previousFailoverVersion = constants.InitialPreviousFailoverVersion - } + // Force failover cleans graceful failover state + if updateRequest.FailoverTimeoutInSeconds == nil { + failoverType = constants.FailoverTypeForce + gracefulFailoverEndTime = nil + previousFailoverVersion = constants.InitialPreviousFailoverVersion + } - // Cases: - // 1. active-passive domain's ActiveClusterName is changed - // 2. active-passive domain is being migrated to active-active - // 3. active-active domain's ActiveClusters is changed - isActiveActive := replicationConfig.IsActiveActive() - - // case 1. active-passive domain's ActiveClusterName is changed - if !wasActiveActive && !isActiveActive { - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion, - updateRequest.Name, - ) - - d.logger.Debug("active-passive domain failover", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) - - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - updateRequest.ActiveClusterName, - nil, - nil, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) - } - } + // Cases: + // 1. active-passive domain's ActiveClusterName is changed + // 2. active-passive domain is being migrated to active-active + // 3. active-active domain's ActiveClusters is changed + isActiveActive := replicationConfig.IsActiveActive() - // case 2. 
active-passive domain is being migrated to active-active - if !wasActiveActive && isActiveActive { - // for active-passive to active-active migration, - // we increment failover version so top level failoverVersion is updated and domain data is replicated. - - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion+1, //todo: (active-active): Let's review if we need to increment - // this for cluster-attr failover changes. It may not be necessary to increment - updateRequest.Name, - ) - - d.logger.Debug("active-passive domain is being migrated to active-active", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) - - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - updateRequest.ActiveClusterName, - nil, - replicationConfig.ActiveClusters, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) - } + // case 1. active-passive domain's ActiveClusterName is changed + if !wasActiveActive && !isActiveActive { + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion, + updateRequest.Name, + ) + + d.logger.Debug("active-passive domain failover", + tag.WorkflowDomainName(info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + updateRequest.ActiveClusterName, + nil, + nil, + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) } + } - // case 3. 
active-active domain's ActiveClusters is changed - if wasActiveActive && isActiveActive { - // top level failover version is not used for task versions for active-active domains but we still increment it - // to indicate there was a change in replication config - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion+1, //todo: (active-active): Let's review if we need to increment - // this for cluster-attr failover changes. It may not be necessary to increment - updateRequest.Name, - ) - - d.logger.Debug("active-active domain failover", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) - - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - nil, - currentActiveClusters, - replicationConfig.ActiveClusters, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) - } + // case 2. active-passive domain is being migrated to active-active + if !wasActiveActive && isActiveActive { + // for active-passive to active-active migration, + // we increment failover version so top level failoverVersion is updated and domain data is replicated. + + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion+1, //todo: (active-active): Let's review if we need to increment + // this for cluster-attr failover changes. 
It may not be necessary to increment + updateRequest.Name, + ) + + d.logger.Debug("active-passive domain is being migrated to active-active", + tag.WorkflowDomainName(info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + updateRequest.ActiveClusterName, + nil, + replicationConfig.ActiveClusters, + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) } + } - failoverNotificationVersion = notificationVersion + // case 3. active-active domain's ActiveClusters is changed + if wasActiveActive && isActiveActive { + // top level failover version is not used for task versions for active-active domains but we still increment it + // to indicate there was a change in replication config + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion+1, //todo: (active-active): Let's review if we need to increment + // this for cluster-attr failover changes. 
It may not be necessary to increment + updateRequest.Name, + ) + + d.logger.Debug("active-active domain failover", + tag.WorkflowDomainName(info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + nil, + currentActiveClusters, + replicationConfig.ActiveClusters, + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) + } } + failoverNotificationVersion = notificationVersion + lastUpdatedTime = now updateReq := createUpdateRequest( @@ -1154,20 +1156,18 @@ func (d *handlerImpl) FailoverDomain( return nil, err } - if isGlobalDomain { - if err = d.domainReplicator.HandleTransmissionTask( - ctx, - types.DomainOperationUpdate, - info, - config, - replicationConfig, - configVersion, - failoverVersion, - previousFailoverVersion, - isGlobalDomain, - ); err != nil { - return nil, err - } + if err = d.domainReplicator.HandleTransmissionTask( + ctx, + types.DomainOperationUpdate, + info, + config, + replicationConfig, + configVersion, + failoverVersion, + previousFailoverVersion, + true, + ); err != nil { + return nil, err } domainInfo, configuration, replicationConfiguration := d.createResponse(info, config, replicationConfig) @@ -1181,7 +1181,7 @@ func (d *handlerImpl) FailoverDomain( Configuration: configuration, ReplicationConfiguration: replicationConfiguration, FailoverVersion: failoverVersion, - IsGlobalDomain: isGlobalDomain, + IsGlobalDomain: true, }, nil } From c4c0ae7d4a45a3ffd9319df86aa8b6a81a76e7a2 Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 31 Oct 2025 16:17:10 -0700 Subject: [PATCH 6/9] cleaned up further Signed-off-by: David Porter --- common/constants/constants.go | 2 +- common/domain/handler.go | 66 +++++++++++++---------------------- common/types/shared.go | 1 + 3 files changed, 27 insertions(+), 42 deletions(-) diff --git 
a/common/constants/constants.go b/common/constants/constants.go index 5e509139cb8..5f4c540955f 100644 --- a/common/constants/constants.go +++ b/common/constants/constants.go @@ -293,7 +293,7 @@ type ( ) const ( - FailoverTypeForce = iota + 1 + FailoverTypeForce FailoverType = iota + 1 FailoverTypeGrace ) diff --git a/common/domain/handler.go b/common/domain/handler.go index 3aabb41d1c5..ef47e6588fe 100644 --- a/common/domain/handler.go +++ b/common/domain/handler.go @@ -470,16 +470,22 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, isGlobalDomain := currentState.IsGlobalDomain gracefulFailoverEndTime := currentState.FailoverEndTime currentActiveCluster := currentState.ReplicationConfig.ActiveClusterName - previousFailoverVersion := currentState.PreviousFailoverVersion - lastUpdatedTime := time.Unix(0, currentState.LastUpdatedTime) wasActiveActive := currentState.ReplicationConfig.IsActiveActive() now := d.timeSource.Now() + failoverType := constants.FailoverTypeForce + var activeClusterChanged bool var configurationChanged bool // will be set to the notification version of the domain after the update - var failoverNotificationVersion int64 = -1 + intendedDomainState.FailoverNotificationVersion = types.UndefinedFailoverVersion + // not used except for graceful failover requests, but specifically set to -1 + // so as to be explicitly undefined + intendedDomainState.PreviousFailoverVersion = constants.InitialPreviousFailoverVersion + + // by default, we assume a force failover and that any preexisting graceful failover state is invalidated + intendedDomainState.FailoverEndTime = nil // Update replication config replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig( @@ -501,7 +507,8 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, // Handle graceful failover request if updateRequest.FailoverTimeoutInSeconds != nil { - gracefulFailoverEndTime, previousFailoverVersion, err = 
d.handleGracefulFailover( + failoverType = constants.FailoverTypeGrace + gracefulFailoverEndTime, previousFailoverVersion, err := d.handleGracefulFailover( updateRequest, replicationConfig, currentActiveCluster, @@ -513,6 +520,8 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, if err != nil { return nil, err } + intendedDomainState.FailoverEndTime = gracefulFailoverEndTime + intendedDomainState.PreviousFailoverVersion = previousFailoverVersion } // replication config is a subset of config, @@ -527,26 +536,11 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, return nil, err } - // Check the failover cool down time - if lastUpdatedTime.Add(d.config.FailoverCoolDown(intendedDomainState.Info.Name)).After(now) { - d.logger.Debugf("Domain was last updated at %v, failoverCoolDown: %v, current time: %v.", lastUpdatedTime, d.config.FailoverCoolDown(intendedDomainState.Info.Name), now) - return nil, errDomainUpdateTooFrequent - } - // set the versions if configurationChanged { configVersion++ } - var failoverType constants.FailoverType = constants.FailoverTypeGrace - - // Force failover cleans graceful failover state - if updateRequest.FailoverTimeoutInSeconds == nil { - failoverType = constants.FailoverTypeForce - gracefulFailoverEndTime = nil - previousFailoverVersion = constants.InitialPreviousFailoverVersion - } - // Cases: // 1. active-passive domain's ActiveClusterName is changed // 2. 
active-passive domain is being migrated to active-active @@ -638,9 +632,7 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, } } - failoverNotificationVersion = notificationVersion - - lastUpdatedTime = now + intendedDomainState.FailoverNotificationVersion = notificationVersion updateReq := createUpdateRequest( intendedDomainState.Info, @@ -648,10 +640,10 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, replicationConfig, configVersion, failoverVersion, - failoverNotificationVersion, - gracefulFailoverEndTime, - previousFailoverVersion, - lastUpdatedTime, + intendedDomainState.FailoverNotificationVersion, + intendedDomainState.FailoverEndTime, + intendedDomainState.PreviousFailoverVersion, + now, notificationVersion, ) @@ -667,7 +659,7 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, replicationConfig, configVersion, failoverVersion, - previousFailoverVersion, + intendedDomainState.PreviousFailoverVersion, isGlobalDomain, ); err != nil { return nil, err @@ -700,10 +692,7 @@ func (d *handlerImpl) updateGlobalDomainConfiguration(ctx context.Context, configVersion := currentDomainState.ConfigVersion failoverVersion := currentDomainState.FailoverVersion - failoverNotificationVersion := currentDomainState.FailoverNotificationVersion isGlobalDomain := currentDomainState.IsGlobalDomain - gracefulFailoverEndTime := currentDomainState.FailoverEndTime - previousFailoverVersion := currentDomainState.PreviousFailoverVersion now := d.timeSource.Now() @@ -792,9 +781,9 @@ func (d *handlerImpl) updateGlobalDomainConfiguration(ctx context.Context, replicationConfig, configVersion, failoverVersion, - failoverNotificationVersion, - gracefulFailoverEndTime, - previousFailoverVersion, + currentDomainState.FailoverNotificationVersion, + intendedDomainState.FailoverEndTime, + intendedDomainState.PreviousFailoverVersion, now, notificationVersion, ) @@ -812,7 +801,7 @@ func (d *handlerImpl) updateGlobalDomainConfiguration(ctx 
context.Context, replicationConfig, configVersion, failoverVersion, - previousFailoverVersion, + intendedDomainState.PreviousFailoverVersion, isGlobalDomain, ); err != nil { return nil, err @@ -854,8 +843,6 @@ func (d *handlerImpl) updateLocalDomain(ctx context.Context, now := d.timeSource.Now() - lastUpdatedTime := time.Unix(0, currentState.LastUpdatedTime) - // Update history archival state historyArchivalConfigChanged, err = d.updateHistoryArchivalState(intendedDomainState.Config, updateRequest) if err != nil { @@ -911,8 +898,6 @@ func (d *handlerImpl) updateLocalDomain(ctx context.Context, configVersion = intendedDomainState.ConfigVersion + 1 } - lastUpdatedTime = now - updateReq := createUpdateRequest( info, config, @@ -922,7 +907,7 @@ func (d *handlerImpl) updateLocalDomain(ctx context.Context, intendedDomainState.FailoverNotificationVersion, intendedDomainState.FailoverEndTime, intendedDomainState.PreviousFailoverVersion, - lastUpdatedTime, + now, notificationVersion, ) @@ -1882,10 +1867,9 @@ func (d *handlerImpl) handleGracefulFailover( return nil, 0, errOngoingGracefulFailover } endTime := d.timeSource.Now().Add(time.Duration(updateRequest.GetFailoverTimeoutInSeconds()) * time.Second).UnixNano() - gracefulFailoverEndTime = &endTime previousFailoverVersion := failoverVersion - return gracefulFailoverEndTime, previousFailoverVersion, nil + return &endTime, previousFailoverVersion, nil } func (d *handlerImpl) validateDomainReplicationConfigForUpdateDomain( diff --git a/common/types/shared.go b/common/types/shared.go index edadec9fd25..f40cc437da0 100644 --- a/common/types/shared.go +++ b/common/types/shared.go @@ -2543,6 +2543,7 @@ func (e *ClusterAttributeNotFoundError) Error() string { // Failover versions are valid for Int64 >= 0 // and, but implication, 0 is a valid failover version. To distinguish between it and an undefined // failover version, we use a negative value. 
+// todo (david.porter) move this to constants package and give it a type const UndefinedFailoverVersion = int64(-1) // GetFailoverVersionForAttribute returns the failover version for a given attribute. From 428fc5f190eb3d9ebb69994c0045c95f9b1e3bc3 Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 31 Oct 2025 16:43:24 -0700 Subject: [PATCH 7/9] Fixed up a bit further Signed-off-by: David Porter --- common/domain/handler.go | 123 ++++++++++++++++++---------------- common/domain/handler_test.go | 48 +------------ 2 files changed, 68 insertions(+), 103 deletions(-) diff --git a/common/domain/handler.go b/common/domain/handler.go index ef47e6588fe..ab7fbc6a920 100644 --- a/common/domain/handler.go +++ b/common/domain/handler.go @@ -465,30 +465,32 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, // into the intended shape by the functions here intendedDomainState := currentState.DeepCopy() - configVersion := currentState.ConfigVersion - failoverVersion := currentState.FailoverVersion isGlobalDomain := currentState.IsGlobalDomain - gracefulFailoverEndTime := currentState.FailoverEndTime + currentActiveCluster := currentState.ReplicationConfig.ActiveClusterName wasActiveActive := currentState.ReplicationConfig.IsActiveActive() now := d.timeSource.Now() + // by default, we assume failovers are of type force failoverType := constants.FailoverTypeForce var activeClusterChanged bool var configurationChanged bool - // will be set to the notification version of the domain after the update + // will be set to the domain notification version after the update intendedDomainState.FailoverNotificationVersion = types.UndefinedFailoverVersion // not used except for graceful failover requests, but specifically set to -1 // so as to be explicitly undefined intendedDomainState.PreviousFailoverVersion = constants.InitialPreviousFailoverVersion // by default, we assume a force failover and that any preexisting graceful failover state is invalidated + // if there's a 
duration of failover time to occur (such as in graceful failover) this will be re-set. + // But if there's an existing graceful failover and a subsequent force + // we want to ensure that it'll be ended immmediately. intendedDomainState.FailoverEndTime = nil // Update replication config - replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig( + replicationCfg, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig( currentState.Info.Name, intendedDomainState.ReplicationConfig, updateRequest, @@ -499,27 +501,29 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, if !activeClusterChanged && !replicationConfigChanged { return nil, errInvalidFailoverNoChangeDetected } + intendedDomainState.ReplicationConfig = replicationCfg err = d.ensureUpdateOrFailoverCooldown(currentState) if err != nil { return nil, err } - // Handle graceful failover request + // if the failover 'graceful' - as indicated as having a FailoverTimeoutInSeconds, + // then we set some additional parameters for the graceful failover if updateRequest.FailoverTimeoutInSeconds != nil { - failoverType = constants.FailoverTypeGrace gracefulFailoverEndTime, previousFailoverVersion, err := d.handleGracefulFailover( updateRequest, - replicationConfig, + intendedDomainState.ReplicationConfig, currentActiveCluster, - gracefulFailoverEndTime, - failoverVersion, + currentState.FailoverEndTime, + currentState.FailoverVersion, activeClusterChanged, isGlobalDomain, ) if err != nil { return nil, err } + failoverType = constants.FailoverTypeGrace intendedDomainState.FailoverEndTime = gracefulFailoverEndTime intendedDomainState.PreviousFailoverVersion = previousFailoverVersion } @@ -531,33 +535,34 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, return nil, err } - err = d.validateDomainReplicationConfigForUpdateDomain(replicationConfig, isGlobalDomain, configurationChanged, activeClusterChanged) + err = 
d.validateDomainReplicationConfigForFailover(intendedDomainState.ReplicationConfig, configurationChanged, activeClusterChanged) if err != nil { return nil, err } - // set the versions + // increment the in the configuration fencing token to ensure that configurations + // are applied in order if configurationChanged { - configVersion++ + intendedDomainState.ConfigVersion++ } // Cases: // 1. active-passive domain's ActiveClusterName is changed // 2. active-passive domain is being migrated to active-active // 3. active-active domain's ActiveClusters is changed - isActiveActive := replicationConfig.IsActiveActive() + isActiveActive := intendedDomainState.ReplicationConfig.IsActiveActive() // case 1. active-passive domain's ActiveClusterName is changed if !wasActiveActive && !isActiveActive { - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion, + intendedDomainState.FailoverVersion = d.clusterMetadata.GetNextFailoverVersion( + intendedDomainState.ReplicationConfig.ActiveClusterName, + currentState.FailoverVersion, updateRequest.Name, ) d.logger.Debug("active-passive domain failover", tag.WorkflowDomainName(intendedDomainState.Info.Name), - tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-version", intendedDomainState.FailoverVersion), tag.Dynamic("failover-type", failoverType), ) @@ -578,16 +583,16 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, if !wasActiveActive && isActiveActive { // for active-passive to active-active migration, // we increment failover version so top level failoverVersion is updated and domain data is replicated. 
- failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion+1, //todo: (active-active): Let's review if we need to increment + intendedDomainState.FailoverVersion = d.clusterMetadata.GetNextFailoverVersion( + intendedDomainState.ReplicationConfig.ActiveClusterName, + currentState.FailoverVersion+1, //todo: (active-active): Let's review if we need to increment // this for cluster-attr failover changes. It may not be necessary to increment updateRequest.Name, ) d.logger.Debug("active-passive domain is being migrated to active-active", tag.WorkflowDomainName(intendedDomainState.Info.Name), - tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-version", intendedDomainState.FailoverVersion), tag.Dynamic("failover-type", failoverType), ) @@ -597,7 +602,7 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, ¤tActiveCluster, updateRequest.ActiveClusterName, nil, - replicationConfig.ActiveClusters, + intendedDomainState.ReplicationConfig.ActiveClusters, )) if err != nil { d.logger.Warn("failed to update failover history", tag.Error(err)) @@ -606,16 +611,16 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, // case 3. active-active domain's ActiveClusters is changed if wasActiveActive && isActiveActive { - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion+1, //todo: (active-active): Let's review if we need to increment + intendedDomainState.FailoverVersion = d.clusterMetadata.GetNextFailoverVersion( + intendedDomainState.ReplicationConfig.ActiveClusterName, + currentState.FailoverVersion+1, //todo: (active-active): Let's review if we need to increment // this for cluster-attr failover changes. 
It may not be necessary to increment updateRequest.Name, ) d.logger.Debug("active-active domain failover", tag.WorkflowDomainName(intendedDomainState.Info.Name), - tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-version", intendedDomainState.FailoverVersion), tag.Dynamic("failover-type", failoverType), ) @@ -632,14 +637,17 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, } } + // this is an intended failover step, to capture the current global domain configuration + // (represented by notification version) and set it when running failover to allow + // systems like graceful failover to dedup failover processes and callbacks intendedDomainState.FailoverNotificationVersion = notificationVersion updateReq := createUpdateRequest( intendedDomainState.Info, intendedDomainState.Config, - replicationConfig, - configVersion, - failoverVersion, + intendedDomainState.ReplicationConfig, + intendedDomainState.ConfigVersion, + intendedDomainState.FailoverVersion, intendedDomainState.FailoverNotificationVersion, intendedDomainState.FailoverEndTime, intendedDomainState.PreviousFailoverVersion, @@ -656,9 +664,9 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, types.DomainOperationUpdate, intendedDomainState.Info, intendedDomainState.Config, - replicationConfig, - configVersion, - failoverVersion, + intendedDomainState.ReplicationConfig, + intendedDomainState.ConfigVersion, + intendedDomainState.FailoverVersion, intendedDomainState.PreviousFailoverVersion, isGlobalDomain, ); err != nil { @@ -666,7 +674,7 @@ func (d *handlerImpl) handleFailoverRequest(ctx context.Context, } response := &types.UpdateDomainResponse{ IsGlobalDomain: isGlobalDomain, - FailoverVersion: failoverVersion, + FailoverVersion: intendedDomainState.FailoverVersion, } response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(intendedDomainState.Info, intendedDomainState.Config, 
intendedDomainState.ReplicationConfig) @@ -758,7 +766,7 @@ func (d *handlerImpl) updateGlobalDomainConfiguration(ctx context.Context, return nil, err } - err = d.validateDomainReplicationConfigForUpdateDomain(replicationConfig, isGlobalDomain, configurationChanged, activeClusterChanged) + err = d.validateGlobalDomainReplicationConfigForUpdateDomain(replicationConfig, configurationChanged, activeClusterChanged) if err != nil { return nil, err } @@ -995,7 +1003,7 @@ func (d *handlerImpl) FailoverDomain( } } - err = d.validateDomainReplicationConfigForUpdateDomain(replicationConfig, true, replicationConfigChanged, activeClusterChanged) + err = d.validateDomainReplicationConfigForFailover(replicationConfig, replicationConfigChanged, activeClusterChanged) if err != nil { return nil, err } @@ -1872,38 +1880,39 @@ func (d *handlerImpl) handleGracefulFailover( return &endTime, previousFailoverVersion, nil } -func (d *handlerImpl) validateDomainReplicationConfigForUpdateDomain( +func (d *handlerImpl) validateGlobalDomainReplicationConfigForUpdateDomain( replicationConfig *persistence.DomainReplicationConfig, - isGlobalDomain bool, configurationChanged bool, activeClusterChanged bool, ) error { var err error - if isGlobalDomain { - if err = d.domainAttrValidator.validateDomainReplicationConfigForGlobalDomain( - replicationConfig, - ); err != nil { - return err - } - - if configurationChanged && activeClusterChanged && !replicationConfig.IsActiveActive() { - return errCannotDoDomainFailoverAndUpdate - } + if err = d.domainAttrValidator.validateDomainReplicationConfigForGlobalDomain( + replicationConfig, + ); err != nil { + return err + } - if !activeClusterChanged && !d.clusterMetadata.IsPrimaryCluster() { - return errNotPrimaryCluster - } - } else { - if err = d.domainAttrValidator.validateDomainReplicationConfigForLocalDomain( - replicationConfig, - ); err != nil { - return err - } + if configurationChanged && activeClusterChanged && !replicationConfig.IsActiveActive() { + 
return errCannotDoDomainFailoverAndUpdate } + if !activeClusterChanged && !d.clusterMetadata.IsPrimaryCluster() { + return errNotPrimaryCluster + } return nil } +// validateDomainReplicationConfigForFailover is to check if the replication config +// is valid and sane for failovers. It is only for global domains +func (d *handlerImpl) validateDomainReplicationConfigForFailover( + replicationConfig *persistence.DomainReplicationConfig, + configurationChanged bool, + activeClusterChanged bool, +) error { + // todo (add any additional failover validation here) + return d.validateGlobalDomainReplicationConfigForUpdateDomain(replicationConfig, configurationChanged, activeClusterChanged) +} + func (d *handlerImpl) activeClustersFromRegisterRequest(registerRequest *types.RegisterDomainRequest) (*types.ActiveClusters, error) { if !registerRequest.GetIsGlobalDomain() || registerRequest.ActiveClusters == nil { // local or active-passive domain diff --git a/common/domain/handler_test.go b/common/domain/handler_test.go index c726275a53e..1dfb0834cc5 100644 --- a/common/domain/handler_test.go +++ b/common/domain/handler_test.go @@ -4350,7 +4350,7 @@ func TestActiveClustersFromRegisterRequest(t *testing.T) { } } -func TestValidateDomainReplicationConfigForUpdateDomain(t *testing.T) { +func TestValidateDomainReplicationConfigForFailover(t *testing.T) { tests := []struct { name string replicationConfig *persistence.DomainReplicationConfig @@ -4360,49 +4360,6 @@ func TestValidateDomainReplicationConfigForUpdateDomain(t *testing.T) { isPrimaryCluster bool expectedErr error }{ - { - name: "local domain with valid config", - replicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: cluster.TestCurrentClusterName, - Clusters: []*persistence.ClusterReplicationConfig{ - {ClusterName: cluster.TestCurrentClusterName}, - }, - }, - isGlobalDomain: false, - configurationChanged: false, - activeClusterChanged: false, - isPrimaryCluster: true, - expectedErr: nil, - }, - { - 
name: "local domain with invalid active cluster", - replicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: cluster.TestAlternativeClusterName, - Clusters: []*persistence.ClusterReplicationConfig{ - {ClusterName: cluster.TestAlternativeClusterName}, - }, - }, - isGlobalDomain: false, - configurationChanged: false, - activeClusterChanged: false, - isPrimaryCluster: true, - expectedErr: &types.BadRequestError{}, - }, - { - name: "local domain with invalid cluster configuration", - replicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: cluster.TestCurrentClusterName, - Clusters: []*persistence.ClusterReplicationConfig{ - {ClusterName: cluster.TestCurrentClusterName}, - {ClusterName: cluster.TestAlternativeClusterName}, - }, - }, - isGlobalDomain: false, - configurationChanged: false, - activeClusterChanged: false, - isPrimaryCluster: true, - expectedErr: &types.BadRequestError{}, - }, { name: "global domain with valid config on primary cluster - no changes", replicationConfig: &persistence.DomainReplicationConfig{ @@ -4565,9 +4522,8 @@ func TestValidateDomainReplicationConfigForUpdateDomain(t *testing.T) { logger: log.NewNoop(), } - err := handler.validateDomainReplicationConfigForUpdateDomain( + err := handler.validateDomainReplicationConfigForFailover( tc.replicationConfig, - tc.isGlobalDomain, tc.configurationChanged, tc.activeClusterChanged, ) From 1166834aebb621b32f56b4b473f152f10ec4dcd7 Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 31 Oct 2025 16:51:04 -0700 Subject: [PATCH 8/9] reverting some changes to the failvoer endpoint for this PR Signed-off-by: David Porter --- common/domain/handler.go | 237 ++++++++++++++++++++------------------- 1 file changed, 119 insertions(+), 118 deletions(-) diff --git a/common/domain/handler.go b/common/domain/handler.go index ab7fbc6a920..41b6edddd99 100644 --- a/common/domain/handler.go +++ b/common/domain/handler.go @@ -827,6 +827,7 @@ func (d *handlerImpl) 
updateGlobalDomainConfiguration(ctx context.Context, return response, nil } + func (d *handlerImpl) updateLocalDomain(ctx context.Context, updateRequest *types.UpdateDomainRequest, currentState *persistence.GetDomainResponse, @@ -959,15 +960,14 @@ func (d *handlerImpl) FailoverDomain( wasActiveActive := replicationConfig.IsActiveActive() configVersion := getResponse.ConfigVersion failoverVersion := getResponse.FailoverVersion + failoverNotificationVersion := getResponse.FailoverNotificationVersion + isGlobalDomain := getResponse.IsGlobalDomain gracefulFailoverEndTime := getResponse.FailoverEndTime currentActiveCluster := replicationConfig.ActiveClusterName currentActiveClusters := replicationConfig.ActiveClusters.DeepCopy() previousFailoverVersion := getResponse.PreviousFailoverVersion lastUpdatedTime := time.Unix(0, getResponse.LastUpdatedTime) - // will be set to the notification version of the domain after the update - var failoverNotificationVersion int64 = -1 - updateRequest := &types.UpdateDomainRequest{ Name: failoverRequest.DomainName, ActiveClusterName: failoverRequest.DomainActiveClusterName, @@ -983,10 +983,6 @@ func (d *handlerImpl) FailoverDomain( return nil, err } - if !activeClusterChanged && !replicationConfigChanged { - return nil, errInvalidFailoverNoChangeDetected - } - // Handle graceful failover request if updateRequest.FailoverTimeoutInSeconds != nil { gracefulFailoverEndTime, previousFailoverVersion, err = d.handleGracefulFailover( @@ -996,14 +992,14 @@ func (d *handlerImpl) FailoverDomain( gracefulFailoverEndTime, failoverVersion, activeClusterChanged, - true, + isGlobalDomain, ) if err != nil { return nil, err } } - err = d.validateDomainReplicationConfigForFailover(replicationConfig, replicationConfigChanged, activeClusterChanged) + err = d.validateGlobalDomainReplicationConfigForUpdateDomain(replicationConfig, replicationConfigChanged, activeClusterChanged) if err != nil { return nil, err } @@ -1020,110 +1016,112 @@ func (d *handlerImpl) 
FailoverDomain( configVersion++ } - var failoverType constants.FailoverType = constants.FailoverTypeGrace - - // Force failover cleans graceful failover state - if updateRequest.FailoverTimeoutInSeconds == nil { - failoverType = constants.FailoverTypeForce - gracefulFailoverEndTime = nil - previousFailoverVersion = constants.InitialPreviousFailoverVersion - } + if activeClusterChanged && isGlobalDomain { + var failoverType constants.FailoverType = constants.FailoverTypeGrace - // Cases: - // 1. active-passive domain's ActiveClusterName is changed - // 2. active-passive domain is being migrated to active-active - // 3. active-active domain's ActiveClusters is changed - isActiveActive := replicationConfig.IsActiveActive() - - // case 1. active-passive domain's ActiveClusterName is changed - if !wasActiveActive && !isActiveActive { - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion, - updateRequest.Name, - ) - - d.logger.Debug("active-passive domain failover", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) - - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - updateRequest.ActiveClusterName, - nil, - nil, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) + // Force failover cleans graceful failover state + if updateRequest.FailoverTimeoutInSeconds == nil { + failoverType = constants.FailoverTypeForce + gracefulFailoverEndTime = nil + previousFailoverVersion = constants.InitialPreviousFailoverVersion } - } - - // case 2. active-passive domain is being migrated to active-active - if !wasActiveActive && isActiveActive { - // for active-passive to active-active migration, - // we increment failover version so top level failoverVersion is updated and domain data is replicated. 
- failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion+1, //todo: (active-active): Let's review if we need to increment - // this for cluster-attr failover changes. It may not be necessary to increment - updateRequest.Name, - ) - - d.logger.Debug("active-passive domain is being migrated to active-active", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) - - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - updateRequest.ActiveClusterName, - nil, - replicationConfig.ActiveClusters, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) + // Cases: + // 1. active-passive domain's ActiveClusterName is changed + // 2. active-passive domain is being migrated to active-active + // 3. active-active domain's ActiveClusters is changed + isActiveActive := replicationConfig.IsActiveActive() + + // case 1. active-passive domain's ActiveClusterName is changed + if !wasActiveActive && !isActiveActive { + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion, + updateRequest.Name, + ) + + d.logger.Debug("active-passive domain failover", + tag.WorkflowDomainName(info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + updateRequest.ActiveClusterName, + nil, + nil, + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) + } } - } - // case 3. 
active-active domain's ActiveClusters is changed - if wasActiveActive && isActiveActive { - // top level failover version is not used for task versions for active-active domains but we still increment it - // to indicate there was a change in replication config - failoverVersion = d.clusterMetadata.GetNextFailoverVersion( - replicationConfig.ActiveClusterName, - failoverVersion+1, //todo: (active-active): Let's review if we need to increment - // this for cluster-attr failover changes. It may not be necessary to increment - updateRequest.Name, - ) - - d.logger.Debug("active-active domain failover", - tag.WorkflowDomainName(info.Name), - tag.Dynamic("failover-version", failoverVersion), - tag.Dynamic("failover-type", failoverType), - ) + // case 2. active-passive domain is being migrated to active-active + if !wasActiveActive && isActiveActive { + // for active-passive to active-active migration, + // we increment failover version so top level failoverVersion is updated and domain data is replicated. + + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion+1, //todo: (active-active): Let's review if we need to increment + // this for cluster-attr failover changes. 
It may not be necessary to increment + updateRequest.Name, + ) + + d.logger.Debug("active-passive domain is being migrated to active-active", + tag.WorkflowDomainName(info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + updateRequest.ActiveClusterName, + nil, + replicationConfig.ActiveClusters, + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) + } + } - err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( - now, - failoverType, - ¤tActiveCluster, - nil, - currentActiveClusters, - replicationConfig.ActiveClusters, - )) - if err != nil { - d.logger.Warn("failed to update failover history", tag.Error(err)) + // case 3. active-active domain's ActiveClusters is changed + if wasActiveActive && isActiveActive { + // top level failover version is not used for task versions for active-active domains but we still increment it + // to indicate there was a change in replication config + failoverVersion = d.clusterMetadata.GetNextFailoverVersion( + replicationConfig.ActiveClusterName, + failoverVersion+1, //todo: (active-active): Let's review if we need to increment + // this for cluster-attr failover changes. 
It may not be necessary to increment + updateRequest.Name, + ) + + d.logger.Debug("active-active domain failover", + tag.WorkflowDomainName(info.Name), + tag.Dynamic("failover-version", failoverVersion), + tag.Dynamic("failover-type", failoverType), + ) + + err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent( + now, + failoverType, + ¤tActiveCluster, + nil, + currentActiveClusters, + replicationConfig.ActiveClusters, + )) + if err != nil { + d.logger.Warn("failed to update failover history", tag.Error(err)) + } } - } - failoverNotificationVersion = notificationVersion + failoverNotificationVersion = notificationVersion + } lastUpdatedTime = now @@ -1149,18 +1147,20 @@ func (d *handlerImpl) FailoverDomain( return nil, err } - if err = d.domainReplicator.HandleTransmissionTask( - ctx, - types.DomainOperationUpdate, - info, - config, - replicationConfig, - configVersion, - failoverVersion, - previousFailoverVersion, - true, - ); err != nil { - return nil, err + if isGlobalDomain { + if err = d.domainReplicator.HandleTransmissionTask( + ctx, + types.DomainOperationUpdate, + info, + config, + replicationConfig, + configVersion, + failoverVersion, + previousFailoverVersion, + isGlobalDomain, + ); err != nil { + return nil, err + } } domainInfo, configuration, replicationConfiguration := d.createResponse(info, config, replicationConfig) @@ -1174,10 +1174,11 @@ func (d *handlerImpl) FailoverDomain( Configuration: configuration, ReplicationConfiguration: replicationConfiguration, FailoverVersion: failoverVersion, - IsGlobalDomain: true, + IsGlobalDomain: isGlobalDomain, }, nil } + // DeleteDomain deletes a domain func (d *handlerImpl) DeleteDomain( ctx context.Context, From 2924260ef23faf783e9f77c6fa1641155e880a19 Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 31 Oct 2025 17:01:45 -0700 Subject: [PATCH 9/9] lint Signed-off-by: David Porter --- common/domain/handler.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/common/domain/handler.go 
b/common/domain/handler.go index 41b6edddd99..d47b845ceb7 100644 --- a/common/domain/handler.go +++ b/common/domain/handler.go @@ -827,7 +827,6 @@ func (d *handlerImpl) updateGlobalDomainConfiguration(ctx context.Context, return response, nil } - func (d *handlerImpl) updateLocalDomain(ctx context.Context, updateRequest *types.UpdateDomainRequest, currentState *persistence.GetDomainResponse, @@ -1178,7 +1177,6 @@ func (d *handlerImpl) FailoverDomain( }, nil } - // DeleteDomain deletes a domain func (d *handlerImpl) DeleteDomain( ctx context.Context,