Skip to content

Commit bb7ff49

Browse files
unify failover and updateDomain failover paths
Signed-off-by: David Porter <[email protected]>
1 parent 2924260, commit bb7ff49

File tree

4 files changed

+41
-228
lines changed

4 files changed

+41
-228
lines changed

common/domain/handler.go

Lines changed: 9 additions & 224 deletions
Original file line number | Diff line number | Diff line change
@@ -939,242 +939,27 @@ func (d *handlerImpl) FailoverDomain(
939939
failoverRequest *types.FailoverDomainRequest,
940940
) (*types.FailoverDomainResponse, error) {
941941

942-
// must get the metadata (notificationVersion) first
943-
// this version can be regarded as the lock on the v2 domain table
944-
// and since we do not know which table will return the domain afterwards
945-
// this call has to be made
946-
metadata, err := d.domainManager.GetMetadata(ctx)
947-
if err != nil {
948-
return nil, err
949-
}
950-
notificationVersion := metadata.NotificationVersion
951-
getResponse, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: failoverRequest.GetDomainName()})
952-
if err != nil {
953-
return nil, err
954-
}
955-
956-
info := getResponse.Info
957-
config := getResponse.Config
958-
replicationConfig := getResponse.ReplicationConfig
959-
wasActiveActive := replicationConfig.IsActiveActive()
960-
configVersion := getResponse.ConfigVersion
961-
failoverVersion := getResponse.FailoverVersion
962-
failoverNotificationVersion := getResponse.FailoverNotificationVersion
963-
isGlobalDomain := getResponse.IsGlobalDomain
964-
gracefulFailoverEndTime := getResponse.FailoverEndTime
965-
currentActiveCluster := replicationConfig.ActiveClusterName
966-
currentActiveClusters := replicationConfig.ActiveClusters.DeepCopy()
967-
previousFailoverVersion := getResponse.PreviousFailoverVersion
968-
lastUpdatedTime := time.Unix(0, getResponse.LastUpdatedTime)
969-
970-
updateRequest := &types.UpdateDomainRequest{
971-
Name: failoverRequest.DomainName,
972-
ActiveClusterName: failoverRequest.DomainActiveClusterName,
973-
}
974-
975-
// Update replication config
976-
replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig(
977-
getResponse.Info.Name,
978-
replicationConfig,
979-
updateRequest,
980-
)
942+
currentDomainState, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: failoverRequest.GetDomainName()})
981943
if err != nil {
982944
return nil, err
983945
}
984946

985-
// Handle graceful failover request
986-
if updateRequest.FailoverTimeoutInSeconds != nil {
987-
gracefulFailoverEndTime, previousFailoverVersion, err = d.handleGracefulFailover(
988-
updateRequest,
989-
replicationConfig,
990-
currentActiveCluster,
991-
gracefulFailoverEndTime,
992-
failoverVersion,
993-
activeClusterChanged,
994-
isGlobalDomain,
995-
)
996-
if err != nil {
997-
return nil, err
998-
}
999-
}
1000-
1001-
err = d.validateGlobalDomainReplicationConfigForUpdateDomain(replicationConfig, replicationConfigChanged, activeClusterChanged)
1002-
if err != nil {
1003-
return nil, err
1004-
}
1005-
1006-
now := d.timeSource.Now()
1007-
// Check the failover cool down time
1008-
if lastUpdatedTime.Add(d.config.FailoverCoolDown(info.Name)).After(now) {
1009-
d.logger.Debugf("Domain was last updated at %v, failoverCoolDown: %v, current time: %v.", lastUpdatedTime, d.config.FailoverCoolDown(info.Name), now)
1010-
return nil, errDomainUpdateTooFrequent
1011-
}
1012-
1013-
// set the version
1014-
if replicationConfigChanged {
1015-
configVersion++
1016-
}
1017-
1018-
if activeClusterChanged && isGlobalDomain {
1019-
var failoverType constants.FailoverType = constants.FailoverTypeGrace
1020-
1021-
// Force failover cleans graceful failover state
1022-
if updateRequest.FailoverTimeoutInSeconds == nil {
1023-
failoverType = constants.FailoverTypeForce
1024-
gracefulFailoverEndTime = nil
1025-
previousFailoverVersion = constants.InitialPreviousFailoverVersion
1026-
}
1027-
1028-
// Cases:
1029-
// 1. active-passive domain's ActiveClusterName is changed
1030-
// 2. active-passive domain is being migrated to active-active
1031-
// 3. active-active domain's ActiveClusters is changed
1032-
isActiveActive := replicationConfig.IsActiveActive()
1033-
1034-
// case 1. active-passive domain's ActiveClusterName is changed
1035-
if !wasActiveActive && !isActiveActive {
1036-
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
1037-
replicationConfig.ActiveClusterName,
1038-
failoverVersion,
1039-
updateRequest.Name,
1040-
)
1041-
1042-
d.logger.Debug("active-passive domain failover",
1043-
tag.WorkflowDomainName(info.Name),
1044-
tag.Dynamic("failover-version", failoverVersion),
1045-
tag.Dynamic("failover-type", failoverType),
1046-
)
1047-
1048-
err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent(
1049-
now,
1050-
failoverType,
1051-
&currentActiveCluster,
1052-
updateRequest.ActiveClusterName,
1053-
nil,
1054-
nil,
1055-
))
1056-
if err != nil {
1057-
d.logger.Warn("failed to update failover history", tag.Error(err))
1058-
}
1059-
}
1060-
1061-
// case 2. active-passive domain is being migrated to active-active
1062-
if !wasActiveActive && isActiveActive {
1063-
// for active-passive to active-active migration,
1064-
// we increment failover version so top level failoverVersion is updated and domain data is replicated.
1065-
1066-
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
1067-
replicationConfig.ActiveClusterName,
1068-
failoverVersion+1, //todo: (active-active): Let's review if we need to increment
1069-
// this for cluster-attr failover changes. It may not be necessary to increment
1070-
updateRequest.Name,
1071-
)
1072-
1073-
d.logger.Debug("active-passive domain is being migrated to active-active",
1074-
tag.WorkflowDomainName(info.Name),
1075-
tag.Dynamic("failover-version", failoverVersion),
1076-
tag.Dynamic("failover-type", failoverType),
1077-
)
1078-
1079-
err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent(
1080-
now,
1081-
failoverType,
1082-
&currentActiveCluster,
1083-
updateRequest.ActiveClusterName,
1084-
nil,
1085-
replicationConfig.ActiveClusters,
1086-
))
1087-
if err != nil {
1088-
d.logger.Warn("failed to update failover history", tag.Error(err))
1089-
}
1090-
}
1091-
1092-
// case 3. active-active domain's ActiveClusters is changed
1093-
if wasActiveActive && isActiveActive {
1094-
// top level failover version is not used for task versions for active-active domains but we still increment it
1095-
// to indicate there was a change in replication config
1096-
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
1097-
replicationConfig.ActiveClusterName,
1098-
failoverVersion+1, //todo: (active-active): Let's review if we need to increment
1099-
// this for cluster-attr failover changes. It may not be necessary to increment
1100-
updateRequest.Name,
1101-
)
1102-
1103-
d.logger.Debug("active-active domain failover",
1104-
tag.WorkflowDomainName(info.Name),
1105-
tag.Dynamic("failover-version", failoverVersion),
1106-
tag.Dynamic("failover-type", failoverType),
1107-
)
1108-
1109-
err = updateFailoverHistoryInDomainData(info, d.config, NewFailoverEvent(
1110-
now,
1111-
failoverType,
1112-
&currentActiveCluster,
1113-
nil,
1114-
currentActiveClusters,
1115-
replicationConfig.ActiveClusters,
1116-
))
1117-
if err != nil {
1118-
d.logger.Warn("failed to update failover history", tag.Error(err))
1119-
}
1120-
}
1121-
1122-
failoverNotificationVersion = notificationVersion
947+
if !currentDomainState.IsGlobalDomain {
948+
return nil, errLocalDomainsCannotFailover
1123949
}
1124950

1125-
lastUpdatedTime = now
951+
notificationVersion := currentDomainState.NotificationVersion
1126952

1127-
updateReq := createUpdateRequest(
1128-
info,
1129-
config,
1130-
replicationConfig,
1131-
configVersion,
1132-
failoverVersion,
1133-
failoverNotificationVersion,
1134-
gracefulFailoverEndTime,
1135-
previousFailoverVersion,
1136-
lastUpdatedTime,
953+
response, err := d.handleFailoverRequest(
954+
ctx,
955+
failoverRequest.ToUpdateDomainRequest(),
956+
currentDomainState,
1137957
notificationVersion,
1138958
)
1139-
1140-
err = d.domainManager.UpdateDomain(ctx, &updateReq)
1141959
if err != nil {
1142-
d.logger.Info("Update domain's replication configs failed",
1143-
tag.WorkflowDomainName(info.Name),
1144-
tag.WorkflowDomainID(info.ID),
1145-
)
1146960
return nil, err
1147961
}
1148-
1149-
if isGlobalDomain {
1150-
if err = d.domainReplicator.HandleTransmissionTask(
1151-
ctx,
1152-
types.DomainOperationUpdate,
1153-
info,
1154-
config,
1155-
replicationConfig,
1156-
configVersion,
1157-
failoverVersion,
1158-
previousFailoverVersion,
1159-
isGlobalDomain,
1160-
); err != nil {
1161-
return nil, err
1162-
}
1163-
}
1164-
1165-
domainInfo, configuration, replicationConfiguration := d.createResponse(info, config, replicationConfig)
1166-
1167-
d.logger.Info("Update domain's replication configs succeeded",
1168-
tag.WorkflowDomainName(info.Name),
1169-
tag.WorkflowDomainID(info.ID),
1170-
)
1171-
return &types.FailoverDomainResponse{
1172-
DomainInfo: domainInfo,
1173-
Configuration: configuration,
1174-
ReplicationConfiguration: replicationConfiguration,
1175-
FailoverVersion: failoverVersion,
1176-
IsGlobalDomain: isGlobalDomain,
1177-
}, nil
962+
return response.ToFailoverDomainResponse(), nil
1178963
}
1179964

1180965
// DeleteDomain deletes a domain

common/types/mapper/proto/api.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4514,7 +4514,8 @@ func FromFailoverDomainRequest(t *types.FailoverDomainRequest) *apiv1.FailoverDo
45144514
}
45154515
return &apiv1.FailoverDomainRequest{
45164516
DomainName: t.DomainName,
4517-
DomainActiveClusterName: *t.DomainActiveClusterName,
4517+
DomainActiveClusterName: t.GetDomainActiveClusterName(),
4518+
ActiveClusters: FromActiveClusters(t.ActiveClusters),
45184519
}
45194520
}
45204521

@@ -4525,6 +4526,7 @@ func ToFailoverDomainRequest(t *apiv1.FailoverDomainRequest) *types.FailoverDoma
45254526
return &types.FailoverDomainRequest{
45264527
DomainName: t.DomainName,
45274528
DomainActiveClusterName: common.StringPtr(t.DomainActiveClusterName),
4529+
ActiveClusters: ToActiveClusters(t.ActiveClusters),
45284530
}
45294531
}
45304532

@@ -5810,7 +5812,6 @@ func ToActiveClusters(t *apiv1.ActiveClusters) *types.ActiveClusters {
58105812

58115813
var attributeScopes map[string]types.ClusterAttributeScope
58125814

5813-
// Start with ActiveClustersByClusterAttribute if it exists
58145815
if t.ActiveClustersByClusterAttribute != nil {
58155816
attributeScopes = make(map[string]types.ClusterAttributeScope)
58165817
for scopeType, scope := range t.ActiveClustersByClusterAttribute {

common/types/mapper/thrift/shared.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1622,6 +1622,7 @@ func FromFailoverDomainRequest(t *types.FailoverDomainRequest) *shared.FailoverD
16221622
return &shared.FailoverDomainRequest{
16231623
DomainName: &t.DomainName,
16241624
DomainActiveClusterName: t.DomainActiveClusterName,
1625+
ActiveClusters: FromActiveClusters(t.ActiveClusters),
16251626
}
16261627
}
16271628

@@ -1633,6 +1634,7 @@ func ToFailoverDomainRequest(t *shared.FailoverDomainRequest) *types.FailoverDom
16331634
return &types.FailoverDomainRequest{
16341635
DomainName: t.GetDomainName(),
16351636
DomainActiveClusterName: t.DomainActiveClusterName,
1637+
ActiveClusters: ToActiveClusters(t.ActiveClusters),
16361638
}
16371639
}
16381640

common/types/shared.go

Lines changed: 27 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1779,8 +1779,20 @@ func (v *DescribeDomainResponse) GetFailoverInfo() (o *FailoverInfo) {
17791779

17801780
// FailoverDomainRequest is an internal type (TBD...)
17811781
type FailoverDomainRequest struct {
1782-
DomainName string `json:"domainName,omitempty"`
1783-
DomainActiveClusterName *string `json:"domainActiveClusterName,omitempty"`
1782+
DomainName string `json:"domainName,omitempty"`
1783+
DomainActiveClusterName *string `json:"domainActiveClusterName,omitempty"`
1784+
ActiveClusters *ActiveClusters `json:"activeClusters,omitempty"`
1785+
}
1786+
1787+
func (v *FailoverDomainRequest) ToUpdateDomainRequest() *UpdateDomainRequest {
1788+
if v == nil {
1789+
return nil
1790+
}
1791+
return &UpdateDomainRequest{
1792+
Name: v.DomainName,
1793+
ActiveClusterName: v.DomainActiveClusterName,
1794+
ActiveClusters: v.ActiveClusters,
1795+
}
17841796
}
17851797

17861798
// GetDomainName is an internal getter (TBD...)
@@ -8063,6 +8075,19 @@ type UpdateDomainResponse struct {
80638075
IsGlobalDomain bool `json:"isGlobalDomain,omitempty"`
80648076
}
80658077

8078+
func (v *UpdateDomainResponse) ToFailoverDomainResponse() *FailoverDomainResponse {
8079+
if v == nil {
8080+
return nil
8081+
}
8082+
return &FailoverDomainResponse{
8083+
DomainInfo: v.DomainInfo,
8084+
Configuration: v.Configuration,
8085+
ReplicationConfiguration: v.ReplicationConfiguration,
8086+
FailoverVersion: v.FailoverVersion,
8087+
IsGlobalDomain: v.IsGlobalDomain,
8088+
}
8089+
}
8090+
80668091
// GetDomainInfo is an internal getter (TBD...)
80678092
func (v *UpdateDomainResponse) GetDomainInfo() (o *DomainInfo) {
80688093
if v != nil && v.DomainInfo != nil {

0 commit comments

Comments
 (0)