diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index cb511b557a..b786dc0527 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -14,6 +14,7 @@ // limitations under the License. // mqbblp_storagemanager.cpp -*-C++-*- +#include #include #include @@ -606,12 +607,22 @@ void StorageManager::clearPrimaryForPartitionDispatched( mqbs::FileStore* fs = d_fileStores[partitionId].get(); PartitionInfo& pinfo = d_partitionInfoVec[partitionId]; + if (primary != pinfo.primary()) { + BALL_LOG_WARN << d_clusterData_p->identity().description() + << " Partition [" << partitionId + << "]: Failed to clear primary as specified primary: " + << (primary ? primary->nodeDescription() : "**null**") + << " is different from current perceived primary: " + << (pinfo.primary() ? pinfo.primary()->nodeDescription() + : "** null **"); + return; // RETURN + } + mqbc::StorageUtil::clearPrimaryForPartition( fs, &pinfo, d_clusterData_p->identity().description(), - partitionId, - primary); + partitionId); } void StorageManager::processStorageEventDispatched( diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.h b/src/groups/mqb/mqbc/mqbc_clusterstate.h index b66e3d9cd0..ea404dad32 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.h @@ -701,7 +701,7 @@ class ClusterState { // ACCESSORS /// Return the value of the corresponding member of this object. const mqbi::Cluster* cluster() const; - const PartitionsInfo& partitionsInfo() const; + const PartitionsInfo& partitionsInfo() const; // TODO why diff name?! const DomainStates& domainStates() const; const QueueKeys& queueKeys() const; const ObserversSet& observers() const; @@ -737,7 +737,7 @@ class ClusterState { bool hasActivePrimary(int partitionId) const; /// Return a reference to the partitions info. 
- const PartitionsInfo& partitions() const; + const PartitionsInfo& partitions() const; // TODO why diff name?! /// Return a reference to the PartitionInfo corresponding to the /// specified `partitionId`. This method is the same as diff --git a/src/groups/mqb/mqbc/mqbc_partitionstatetable.h b/src/groups/mqb/mqbc/mqbc_partitionstatetable.h index bb0c79e7b9..60eaf732b4 100644 --- a/src/groups/mqb/mqbc/mqbc_partitionstatetable.h +++ b/src/groups/mqb/mqbc/mqbc_partitionstatetable.h @@ -211,10 +211,6 @@ class PartitionStateTableActions { virtual void do_storeReplicaSeq(const ARGS& args) = 0; - virtual void do_storePartitionInfo(const ARGS& args) = 0; - - virtual void do_clearPartitionInfo(const ARGS& args) = 0; - virtual void do_replicaStateRequest(const ARGS& args) = 0; virtual void do_replicaStateResponse(const ARGS& args) = 0; @@ -296,11 +292,10 @@ class PartitionStateTableActions { const ARGS& args); void - do_startWatchDog_storePartitionInfo_openRecoveryFileSet_storeSelfSeq_replicaStateRequest_checkQuorumSeq( + do_startWatchDog_openRecoveryFileSet_storeSelfSeq_replicaStateRequest_checkQuorumSeq( const ARGS& args); - void - do_startWatchDog_storePartitionInfo_openRecoveryFileSet_storeSelfSeq_primaryStateRequest( + void do_startWatchDog_openRecoveryFileSet_storeSelfSeq_primaryStateRequest( const ARGS& args); void @@ -310,16 +305,12 @@ class PartitionStateTableActions { void do_storePrimarySeq_replicaStateResponse(const ARGS& args); - void do_cleanupMetadata_clearPartitionInfo_reapplyEvent(const ARGS& args); - - void do_cleanupMetadata_clearPartitionInfo_stopWatchDog_reapplyEvent( - const ARGS& args); + void do_cleanupMetadata_reapplyEvent(const ARGS& args); - void do_cleanupMetadata_clearPartitionInfo(const ARGS& args); + void do_cleanupMetadata_stopWatchDog_reapplyEvent(const ARGS& args); void - do_cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog( - const ARGS& args); + do_cleanupMetadata_closeRecoveryFileSet_stopWatchDog(const ARGS& args); void 
do_cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfPrimary( const ARGS& args); @@ -351,8 +342,7 @@ class PartitionStateTableActions { void do_cleanupMetadata_reapplyDetectSelfReplica(const ARGS& args); void - do_resetReceiveDataCtx_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog( - const ARGS& args); + do_resetReceiveDataCtx_closeRecoveryFileSet_stopWatchDog(const ARGS& args); void do_cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfReplica( const ARGS& args); @@ -434,17 +424,17 @@ class PartitionStateTable PST_CFG( UNKNOWN, DETECT_SELF_PRIMARY, - startWatchDog_storePartitionInfo_openRecoveryFileSet_storeSelfSeq_replicaStateRequest_checkQuorumSeq, + startWatchDog_openRecoveryFileSet_storeSelfSeq_replicaStateRequest_checkQuorumSeq, PRIMARY_HEALING_STG1); PST_CFG( UNKNOWN, DETECT_SELF_REPLICA, - startWatchDog_storePartitionInfo_openRecoveryFileSet_storeSelfSeq_primaryStateRequest, + startWatchDog_openRecoveryFileSet_storeSelfSeq_primaryStateRequest, REPLICA_HEALING); PST_CFG(UNKNOWN, STOP_NODE, none, STOPPED); PST_CFG(PRIMARY_HEALING_STG1, DETECT_SELF_REPLICA, - cleanupMetadata_clearPartitionInfo_stopWatchDog_reapplyEvent, + unsupportedPrimaryDowngrade, UNKNOWN); PST_CFG(PRIMARY_HEALING_STG1, REPLICA_STATE_RQST, @@ -475,23 +465,21 @@ class PartitionStateTable REPLICA_HIGHEST_SEQ, setExpectedDataChunkRange_replicaDataRequestPull, PRIMARY_HEALING_STG2); - PST_CFG( - PRIMARY_HEALING_STG1, - RST_UNKNOWN, - cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog, - UNKNOWN); - PST_CFG( - PRIMARY_HEALING_STG1, - STOP_NODE, - cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog, - STOPPED); + PST_CFG(PRIMARY_HEALING_STG1, + RST_UNKNOWN, + cleanupMetadata_closeRecoveryFileSet_stopWatchDog, + UNKNOWN); + PST_CFG(PRIMARY_HEALING_STG1, + STOP_NODE, + cleanupMetadata_closeRecoveryFileSet_stopWatchDog, + STOPPED); PST_CFG(PRIMARY_HEALING_STG1, WATCH_DOG, cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfPrimary, UNKNOWN); 
PST_CFG(PRIMARY_HEALING_STG2, DETECT_SELF_REPLICA, - cleanupMetadata_clearPartitionInfo_stopWatchDog_reapplyEvent, + unsupportedPrimaryDowngrade, UNKNOWN); PST_CFG(PRIMARY_HEALING_STG2, FAIL_REPLICA_DATA_RSPN_PULL, @@ -533,27 +521,25 @@ class PartitionStateTable QUORUM_REPLICA_DATA_RSPN, stopWatchDog_transitionToActivePrimary, PRIMARY_HEALED); - PST_CFG( - PRIMARY_HEALING_STG2, - RST_UNKNOWN, - cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog, - UNKNOWN); + PST_CFG(PRIMARY_HEALING_STG2, + RST_UNKNOWN, + cleanupMetadata_closeRecoveryFileSet_stopWatchDog, + UNKNOWN); PST_CFG(PRIMARY_HEALING_STG2, WATCH_DOG, cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfPrimary, UNKNOWN); - PST_CFG( - PRIMARY_HEALING_STG2, - STOP_NODE, - cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog, - STOPPED); + PST_CFG(PRIMARY_HEALING_STG2, + STOP_NODE, + cleanupMetadata_closeRecoveryFileSet_stopWatchDog, + STOPPED); PST_CFG(REPLICA_HEALING, DETECT_SELF_PRIMARY, - cleanupMetadata_clearPartitionInfo_stopWatchDog_reapplyEvent, + cleanupMetadata_stopWatchDog_reapplyEvent, UNKNOWN); PST_CFG(REPLICA_HEALING, DETECT_SELF_REPLICA, - cleanupMetadata_clearPartitionInfo_stopWatchDog_reapplyEvent, + cleanupMetadata_stopWatchDog_reapplyEvent, UNKNOWN); PST_CFG(REPLICA_HEALING, REPLICA_STATE_RQST, @@ -608,27 +594,25 @@ class PartitionStateTable failureReplicaDataResponsePush_cleanupMetadata_closeRecoveryFileSet_stopWatchDog_reapplyDetectSelfReplica, UNKNOWN); PST_CFG(REPLICA_HEALING, LIVE_DATA, bufferLiveData, REPLICA_HEALING); - PST_CFG( - REPLICA_HEALING, - RST_UNKNOWN, - cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog, - UNKNOWN); + PST_CFG(REPLICA_HEALING, + RST_UNKNOWN, + cleanupMetadata_closeRecoveryFileSet_stopWatchDog, + UNKNOWN); PST_CFG(REPLICA_HEALING, WATCH_DOG, cleanupMetadata_closeRecoveryFileSet_reapplyDetectSelfReplica, UNKNOWN); - PST_CFG( - REPLICA_HEALING, - STOP_NODE, - 
cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog, - STOPPED); + PST_CFG(REPLICA_HEALING, + STOP_NODE, + cleanupMetadata_closeRecoveryFileSet_stopWatchDog, + STOPPED); PST_CFG(REPLICA_HEALED, DETECT_SELF_PRIMARY, - cleanupMetadata_clearPartitionInfo_reapplyEvent, + cleanupMetadata_reapplyEvent, UNKNOWN); PST_CFG(REPLICA_HEALED, DETECT_SELF_REPLICA, - cleanupMetadata_clearPartitionInfo_reapplyEvent, + cleanupMetadata_reapplyEvent, UNKNOWN); PST_CFG(REPLICA_HEALED, REPLICA_STATE_RQST, @@ -643,14 +627,8 @@ class PartitionStateTable ISSUE_LIVESTREAM, cleanupMetadata_reapplyDetectSelfReplica, UNKNOWN); - PST_CFG(REPLICA_HEALED, - RST_UNKNOWN, - cleanupMetadata_clearPartitionInfo, - UNKNOWN); - PST_CFG(REPLICA_HEALED, - STOP_NODE, - cleanupMetadata_clearPartitionInfo, - STOPPED); + PST_CFG(REPLICA_HEALED, RST_UNKNOWN, cleanupMetadata, UNKNOWN); + PST_CFG(REPLICA_HEALED, STOP_NODE, cleanupMetadata, STOPPED); PST_CFG(PRIMARY_HEALED, DETECT_SELF_REPLICA, unsupportedPrimaryDowngrade, @@ -665,14 +643,8 @@ class PartitionStateTable PRIMARY_STATE_RQST, storeSelfSeq_storeReplicaSeq_primaryStateResponse_replicaDataRequestPush_replicaDataRequestDrop_startSendDataChunks, PRIMARY_HEALED); - PST_CFG(PRIMARY_HEALED, - RST_UNKNOWN, - cleanupMetadata_clearPartitionInfo, - UNKNOWN); - PST_CFG(PRIMARY_HEALED, - STOP_NODE, - cleanupMetadata_clearPartitionInfo, - STOPPED); + PST_CFG(PRIMARY_HEALED, RST_UNKNOWN, cleanupMetadata, UNKNOWN); + PST_CFG(PRIMARY_HEALED, STOP_NODE, cleanupMetadata, STOPPED); #undef PST_CFG } @@ -718,11 +690,10 @@ void PartitionStateTableActions:: template void PartitionStateTableActions:: - do_startWatchDog_storePartitionInfo_openRecoveryFileSet_storeSelfSeq_replicaStateRequest_checkQuorumSeq( + do_startWatchDog_openRecoveryFileSet_storeSelfSeq_replicaStateRequest_checkQuorumSeq( const ARGS& args) { do_startWatchDog(args); - do_storePartitionInfo(args); do_openRecoveryFileSet(args); do_storeSelfSeq(args); do_replicaStateRequest(args); @@ -731,11 
+702,10 @@ void PartitionStateTableActions:: template void PartitionStateTableActions:: - do_startWatchDog_storePartitionInfo_openRecoveryFileSet_storeSelfSeq_primaryStateRequest( + do_startWatchDog_openRecoveryFileSet_storeSelfSeq_primaryStateRequest( const ARGS& args) { do_startWatchDog(args); - do_storePartitionInfo(args); do_openRecoveryFileSet(args); do_storeSelfSeq(args); do_primaryStateRequest(args); @@ -767,40 +737,27 @@ void PartitionStateTableActions::do_storePrimarySeq_replicaStateResponse( } template -void PartitionStateTableActions< - ARGS>::do_cleanupMetadata_clearPartitionInfo_reapplyEvent(const ARGS& args) +void PartitionStateTableActions::do_cleanupMetadata_reapplyEvent( + const ARGS& args) { do_cleanupMetadata(args); - do_clearPartitionInfo(args); do_reapplyEvent(args); } template -void PartitionStateTableActions:: - do_cleanupMetadata_clearPartitionInfo_stopWatchDog_reapplyEvent( - const ARGS& args) +void PartitionStateTableActions< + ARGS>::do_cleanupMetadata_stopWatchDog_reapplyEvent(const ARGS& args) { do_cleanupMetadata(args); - do_clearPartitionInfo(args); do_stopWatchDog(args); do_reapplyEvent(args); } -template -void PartitionStateTableActions::do_cleanupMetadata_clearPartitionInfo( - const ARGS& args) -{ - do_cleanupMetadata(args); - do_clearPartitionInfo(args); -} - template void PartitionStateTableActions:: - do_cleanupMetadata_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog( - const ARGS& args) + do_cleanupMetadata_closeRecoveryFileSet_stopWatchDog(const ARGS& args) { do_cleanupMetadata(args); - do_clearPartitionInfo(args); do_closeRecoveryFileSet(args); do_stopWatchDog(args); } @@ -914,11 +871,9 @@ void PartitionStateTableActions< template void PartitionStateTableActions:: - do_resetReceiveDataCtx_clearPartitionInfo_closeRecoveryFileSet_stopWatchDog( - const ARGS& args) + do_resetReceiveDataCtx_closeRecoveryFileSet_stopWatchDog(const ARGS& args) { do_resetReceiveDataCtx(args); - do_clearPartitionInfo(args); 
do_closeRecoveryFileSet(args); do_stopWatchDog(args); } diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index 581ee61c6c..3d0483b966 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -183,18 +183,13 @@ void StorageManager::onWatchDogDispatched(int partitionId) << d_watchDogTimeoutInterval.totalSeconds() << " seconds." << BMQTSK_ALARMLOG_END; - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - EventData eventDataVec; eventDataVec.emplace_back(d_clusterData_p->membership().selfNode(), -1, // placeholder requestId partitionId, 1); - dispatchEventToPartition(fs, - PartitionFSM::Event::e_WATCH_DOG, - eventDataVec); + dispatchEventToPartition(PartitionFSM::Event::e_WATCH_DOG, eventDataVec); } void StorageManager::onPartitionDoneSendDataChunksCb( @@ -218,22 +213,17 @@ void StorageManager::onPartitionDoneSendDataChunksCb( EventData eventDataVec; eventDataVec.emplace_back(destination, requestId, partitionId, 1, range); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - // Note that currently the two FSM events below are no-op if we are the // primary. As replica, we will send either success or failure // ReplicaDataResponsePull depending on 'status'. In the future, it might // no longer be no-op for primary. 
if (status != 0) { dispatchEventToPartition( - fs, PartitionFSM::Event::e_ERROR_SENDING_DATA_CHUNKS, eventDataVec); } else { dispatchEventToPartition( - fs, PartitionFSM::Event::e_DONE_SENDING_DATA_CHUNKS, eventDataVec); } @@ -295,19 +285,19 @@ void StorageManager::onPartitionRecovery(int partitionId) } } -void StorageManager::dispatchEventToPartition(mqbs::FileStore* fs, - PartitionFSM::Event::Enum event, +void StorageManager::dispatchEventToPartition(PartitionFSM::Event::Enum event, const EventData& eventDataVec) { - // executed by the cluster *DISPATCHER* thread + // executed by the cluster *DISPATCHER* or *QUEUE_DISPATCHER* thread // PRECONDITIONS - BSLS_ASSERT_SAFE(fs); BSLS_ASSERT_SAFE(eventDataVec.size() >= 1); // NOTE: it is assumed that all elements in 'eventDataVec' have the same // 'partitionId'. const int partitionId = eventDataVec[0].partitionId(); + BSLS_ASSERT_SAFE(0 <= partitionId && + partitionId < static_cast(d_fileStores.size())); if (d_cluster_p->isStopping()) { BALL_LOG_WARN << d_clusterData_p->identity().description() @@ -326,9 +316,16 @@ void StorageManager::dispatchEventToPartition(mqbs::FileStore* fs, // actions as documented in the state transition table for PartitionFSM. 
queueSp->emplace(event, eventDataVec); - fs->execute(bdlf::BindUtil::bind(&PartitionFSM::popEventAndProcess, - d_partitionFSMVec[partitionId].get(), - queueSp)); + mqbs::FileStore* fs = d_fileStores[partitionId].get(); + BSLS_ASSERT_SAFE(fs); + if (fs->inDispatcherThread()) { + d_partitionFSMVec[partitionId]->popEventAndProcess(queueSp); + } + else { + fs->execute(bdlf::BindUtil::bind(&PartitionFSM::popEventAndProcess, + d_partitionFSMVec[partitionId].get(), + queueSp)); + } } void StorageManager::setPrimaryStatusForPartitionDispatched( @@ -391,19 +388,98 @@ void StorageManager::setPrimaryStatusForPartitionDispatched( } } +void StorageManager::setPrimaryForPartitionDispatched( + int partitionId, + mqbnet::ClusterNode* primaryNode, + unsigned int primaryLeaseId) +{ + // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' + + // PRECONDITIONS + BSLS_ASSERT_SAFE(0 <= partitionId && + partitionId < static_cast(d_fileStores.size())); + BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); + BSLS_ASSERT_SAFE(primaryNode); + + PartitionInfo& pinfo = d_partitionInfoVec[partitionId]; + if (pinfo.primary() && + (pinfo.primary()->nodeId() == primaryNode->nodeId())) { + // Primary node did not change + pinfo.setPrimaryLeaseId(primaryLeaseId); + return; // RETURN + } + + pinfo.setPrimary(primaryNode); + pinfo.setPrimaryLeaseId(primaryLeaseId); + pinfo.setPrimaryStatus(bmqp_ctrlmsg::PrimaryStatus::E_PASSIVE); + + if (!d_isQueueKeyInfoMapVecInitialized) { + // We must wait until queue key info map is initialized before + // processing primary detection in the Partition FSM. 
+ + // RETURN + } + + if (primaryNode->nodeId() == + d_clusterData_p->membership().selfNode()->nodeId()) { + processPrimaryDetect(partitionId, primaryNode, primaryLeaseId); + } + else { + processReplicaDetect(partitionId, primaryNode, primaryLeaseId); + } +} + +void StorageManager::clearPrimaryForPartitionDispatched( + int partitionId, + mqbnet::ClusterNode* primary) +{ + // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' + + // PRECONDITIONS + BSLS_ASSERT_SAFE(0 <= partitionId && + partitionId < static_cast(d_fileStores.size())); + BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); + BSLS_ASSERT_SAFE(primary); + + BALL_LOG_INFO << d_clusterData_p->identity().description() + << " Partition [" << partitionId << "]: " + << "Self Transition back to Unknown in the Partition FSM."; + + mqbs::FileStore* fs = d_fileStores[partitionId].get(); + BSLS_ASSERT_SAFE(fs); + + StorageUtil::clearPrimaryForPartition( + fs, + &d_partitionInfoVec[partitionId], + d_clusterData_p->identity().description(), + partitionId); + + EventData eventDataVec; + eventDataVec.emplace_back( + d_clusterData_p->membership().selfNode(), + -1, // placeholder requestId + partitionId, + 1, + primary, + d_clusterState_p->partitionsInfo().at(partitionId).primaryLeaseId()); + + dispatchEventToPartition(PartitionFSM::Event::e_RST_UNKNOWN, eventDataVec); +} + void StorageManager::processPrimaryDetect(int partitionId, mqbnet::ClusterNode* primaryNode, unsigned int primaryLeaseId) { - // executed by the cluster *DISPATCHER* thread + // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' // PRECONDITIONS - BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(primaryNode->nodeId() == - d_clusterData_p->membership().selfNode()->nodeId()); BSLS_ASSERT_SAFE(0 <= partitionId && partitionId < static_cast(d_fileStores.size())); + BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); + BSLS_ASSERT_SAFE(primaryNode->nodeId() 
== + d_clusterData_p->membership().selfNode()->nodeId()); + BALL_LOG_ERROR << "xxm"; if (d_cluster_p->isStopping()) { BALL_LOG_WARN << d_clusterData_p->identity().description() << " Partition [" << partitionId << "]: " @@ -412,6 +488,13 @@ void StorageManager::processPrimaryDetect(int partitionId, return; // RETURN } + BALL_LOG_ERROR << "xxm"; + // The fact that we are processing primary detection retraoactively implies + // that queue key info map must have been initialized. We cannot set this + // flag earlier, or else there could be race condition. TODO Explain + // further. + d_isQueueKeyInfoMapVecInitialized = true; + BALL_LOG_INFO << d_clusterData_p->identity().description() << " Partition [" << partitionId << "]: " << "Self Transition to Primary in the Partition FSM."; @@ -424,11 +507,7 @@ void StorageManager::processPrimaryDetect(int partitionId, primaryNode, primaryLeaseId); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - - dispatchEventToPartition(fs, - PartitionFSM::Event::e_DETECT_SELF_PRIMARY, + dispatchEventToPartition(PartitionFSM::Event::e_DETECT_SELF_PRIMARY, eventDataVec); } @@ -436,15 +515,16 @@ void StorageManager::processReplicaDetect(int partitionId, mqbnet::ClusterNode* primaryNode, unsigned int primaryLeaseId) { - // executed by the cluster *DISPATCHER* thread + // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' // PRECONDITIONS - BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(primaryNode->nodeId() != - d_clusterData_p->membership().selfNode()->nodeId()); BSLS_ASSERT_SAFE(0 <= partitionId && partitionId < static_cast(d_fileStores.size())); + BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); + BSLS_ASSERT_SAFE(primaryNode->nodeId() != + d_clusterData_p->membership().selfNode()->nodeId()); + BALL_LOG_ERROR << "xxm"; if (d_cluster_p->isStopping()) { BALL_LOG_WARN << d_clusterData_p->identity().description() << " Partition [" << 
partitionId << "]: " @@ -453,6 +533,13 @@ void StorageManager::processReplicaDetect(int partitionId, return; // RETURN } + BALL_LOG_ERROR << "xxm"; + // The fact that we are processing primary detection retraoactively implies + // that queue key info map must have been initialized. We cannot set this + // flag earlier, or else there could be race condition. TODO Explain + // further. + d_isQueueKeyInfoMapVecInitialized = true; + BALL_LOG_INFO << d_clusterData_p->identity().description() << " Partition [" << partitionId << "]: " << "Self Transition to Replica in the Partition FSM."; @@ -465,11 +552,7 @@ void StorageManager::processReplicaDetect(int partitionId, primaryNode, primaryLeaseId); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - - dispatchEventToPartition(fs, - PartitionFSM::Event::e_DETECT_SELF_REPLICA, + dispatchEventToPartition(PartitionFSM::Event::e_DETECT_SELF_REPLICA, eventDataVec); } @@ -528,11 +611,7 @@ void StorageManager::processReplicaDataRequestPull( PartitionSeqNumDataRange(replicaDataRequest.beginSequenceNumber(), replicaDataRequest.endSequenceNumber())); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - - dispatchEventToPartition(fs, - PartitionFSM::Event::e_REPLICA_DATA_RQST_PULL, + dispatchEventToPartition(PartitionFSM::Event::e_REPLICA_DATA_RQST_PULL, eventDataVec); } @@ -599,11 +678,7 @@ void StorageManager::processReplicaDataRequestPush( PartitionSeqNumDataRange(replicaDataRequest.beginSequenceNumber(), replicaDataRequest.endSequenceNumber())); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - - dispatchEventToPartition(fs, - PartitionFSM::Event::e_REPLICA_DATA_RQST_PUSH, + dispatchEventToPartition(PartitionFSM::Event::e_REPLICA_DATA_RQST_PUSH, eventDataVec); } @@ -663,11 +738,7 @@ void StorageManager::processReplicaDataRequestDrop( partitionId, 1); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - - 
dispatchEventToPartition(fs, - PartitionFSM::Event::e_REPLICA_DATA_RQST_DROP, + dispatchEventToPartition(PartitionFSM::Event::e_REPLICA_DATA_RQST_DROP, eventDataVec); } @@ -713,9 +784,6 @@ void StorageManager::processPrimaryStateResponseDispatched( return; // RETURN } - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - if (context->result() != bmqt::GenericResult::e_SUCCESS) { BALL_LOG_WARN << d_clusterData_p->identity().description() << ": received FAIL_PrmryStateRspn event " @@ -726,7 +794,6 @@ void StorageManager::processPrimaryStateResponseDispatched( eventDataVec.emplace_back(responder, responseId, partitionId, 1); dispatchEventToPartition( - fs, PartitionFSM::Event::e_FAIL_PRIMARY_STATE_RSPN, eventDataVec); return; // RETURN @@ -769,8 +836,7 @@ void StorageManager::processPrimaryStateResponseDispatched( response.latestSequenceNumber(), response.firstSyncPointAfterRolloverSequenceNumber()); - dispatchEventToPartition(fs, - PartitionFSM::Event::e_PRIMARY_STATE_RSPN, + dispatchEventToPartition(PartitionFSM::Event::e_PRIMARY_STATE_RSPN, eventDataVec); } @@ -902,16 +968,14 @@ void StorageManager::processReplicaStateResponseDispatched( BSLS_ASSERT_SAFE(requestPartitionId == response.partitionId()); } - mqbs::FileStore* fs = d_fileStores[requestPartitionId].get(); if (eventDataVec.size() > 0) { - dispatchEventToPartition(fs, - PartitionFSM::Event::e_REPLICA_STATE_RSPN, + dispatchEventToPartition(PartitionFSM::Event::e_REPLICA_STATE_RSPN, eventDataVec); } if (failedEventDataVec.size() > 0) { dispatchEventToPartition( - fs, + PartitionFSM::Event::e_FAIL_REPLICA_STATE_RSPN, failedEventDataVec); } @@ -976,9 +1040,6 @@ void StorageManager::processReplicaDataResponseDispatched( return; // RETURN } - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - if (context->result() != bmqt::GenericResult::e_SUCCESS) { BALL_LOG_WARN << d_clusterData_p->identity().description() << ": received FAIL_ReplicaDataResponse event " 
@@ -991,19 +1052,16 @@ void StorageManager::processReplicaDataResponseDispatched( switch (dataType) { case bmqp_ctrlmsg::ReplicaDataType::E_PULL: { dispatchEventToPartition( - fs, PartitionFSM::Event::e_FAIL_REPLICA_DATA_RSPN_PULL, eventDataVec); } break; case bmqp_ctrlmsg::ReplicaDataType::E_PUSH: { dispatchEventToPartition( - fs, PartitionFSM::Event::e_FAIL_REPLICA_DATA_RSPN_PUSH, eventDataVec); } break; case bmqp_ctrlmsg::ReplicaDataType::E_DROP: { dispatchEventToPartition( - fs, PartitionFSM::Event::e_FAIL_REPLICA_DATA_RSPN_DROP, eventDataVec); } break; @@ -1071,18 +1129,15 @@ void StorageManager::processReplicaDataResponseDispatched( switch (dataType) { case bmqp_ctrlmsg::ReplicaDataType::E_PULL: { - dispatchEventToPartition(fs, - PartitionFSM::Event::e_REPLICA_DATA_RSPN_PULL, + dispatchEventToPartition(PartitionFSM::Event::e_REPLICA_DATA_RSPN_PULL, eventDataVec); } break; case bmqp_ctrlmsg::ReplicaDataType::E_PUSH: { - dispatchEventToPartition(fs, - PartitionFSM::Event::e_REPLICA_DATA_RSPN_PUSH, + dispatchEventToPartition(PartitionFSM::Event::e_REPLICA_DATA_RSPN_PUSH, eventDataVec); } break; case bmqp_ctrlmsg::ReplicaDataType::E_DROP: { - dispatchEventToPartition(fs, - PartitionFSM::Event::e_REPLICA_DATA_RSPN_DROP, + dispatchEventToPartition(PartitionFSM::Event::e_REPLICA_DATA_RSPN_DROP, eventDataVec); } break; case bmqp_ctrlmsg::ReplicaDataType::E_UNKNOWN: @@ -1148,24 +1203,25 @@ void StorageManager::processShutdownEventDispatched(int partitionId) mqbs::FileStore* fs = d_fileStores[partitionId].get(); BSLS_ASSERT_SAFE(fs); + StorageUtil::processShutdownEventDispatched( + d_clusterData_p, + &d_partitionInfoVec[partitionId], + fs, + partitionId); + + StorageUtil::clearPrimaryForPartition( + fs, + &d_partitionInfoVec[partitionId], + d_clusterData_p->identity().description(), + partitionId); + EventData eventDataVec; eventDataVec.emplace_back(d_clusterData_p->membership().selfNode(), -1, // placeholder requestId partitionId, 1); - bsl::shared_ptr > queueSp = - 
bsl::allocate_shared >( - d_allocator_p); - queueSp->emplace(PartitionFSM::Event::e_STOP_NODE, eventDataVec); - - d_partitionFSMVec[partitionId]->popEventAndProcess(queueSp); - - StorageUtil::processShutdownEventDispatched( - d_clusterData_p, - &d_partitionInfoVec[partitionId], - fs, - partitionId); + dispatchEventToPartition(PartitionFSM::Event::e_STOP_NODE, eventDataVec); } void StorageManager::forceFlushFileStores() @@ -1497,75 +1553,6 @@ void StorageManager::do_storeReplicaSeq(const PartitionFSMArgsSp& args) } } -void StorageManager::do_storePartitionInfo(const PartitionFSMArgsSp& args) -{ - // executed by the *QUEUE DISPATCHER* thread associated with the paritionId - // contained in 'args' - - // PRECONDITIONS - BSLS_ASSERT_SAFE(!args->eventsQueue()->empty()); - - const PartitionFSM::EventWithData& eventWithData = - args->eventsQueue()->front(); - const EventData& eventDataVec = eventWithData.second; - BSLS_ASSERT_SAFE(eventDataVec.size() == 1); - - const PartitionFSMEventData& eventData = eventDataVec[0]; - const int partitionId = eventData.partitionId(); - mqbnet::ClusterNode* primaryNode = eventData.primary(); - unsigned int primaryLeaseId = eventData.primaryLeaseId(); - - // PRECONDITIONS - BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); - - PartitionInfo& pinfo = d_partitionInfoVec[partitionId]; - if (pinfo.primary() && - (pinfo.primary()->nodeId() == primaryNode->nodeId())) { - // Primary node did not change - pinfo.setPrimaryLeaseId(primaryLeaseId); - return; // RETURN - } - - pinfo.setPrimary(primaryNode); - pinfo.setPrimaryLeaseId(primaryLeaseId); - pinfo.setPrimaryStatus(bmqp_ctrlmsg::PrimaryStatus::E_PASSIVE); - - if (d_partitionFSMVec[partitionId]->isSelfReplica()) { - d_recoveryManager_mp->setLiveDataSource(primaryNode, partitionId); - } -} - -void StorageManager::do_clearPartitionInfo(const PartitionFSMArgsSp& args) -{ - // executed by the *QUEUE DISPATCHER* thread associated with the paritionId - // contained in 'args' - - // 
PRECONDITIONS - BSLS_ASSERT_SAFE(!args->eventsQueue()->empty()); - - const PartitionFSM::EventWithData& eventWithData = - args->eventsQueue()->front(); - const EventData& eventDataVec = eventWithData.second; - BSLS_ASSERT_SAFE(eventDataVec.size() == 1); - - const PartitionFSMEventData& eventData = eventDataVec[0]; - const int partitionId = eventData.partitionId(); - mqbnet::ClusterNode* primaryNode = eventData.primary(); - BSLS_ASSERT_SAFE(0 <= partitionId && - partitionId < static_cast(d_fileStores.size())); - - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - PartitionInfo& pinfo = d_partitionInfoVec[partitionId]; - BSLS_ASSERT_SAFE(fs->inDispatcherThread()); - - StorageUtil::clearPrimaryForPartition( - fs, - &pinfo, - d_clusterData_p->identity().description(), - partitionId, - primaryNode); -} - void StorageManager::do_replicaStateRequest(const PartitionFSMArgsSp& args) { // executed by the *QUEUE DISPATCHER* thread associated with the paritionId @@ -1798,9 +1785,15 @@ void StorageManager::do_primaryStateRequest(const PartitionFSMArgsSp& args) const PartitionFSMEventData& eventData = eventDataVec[0]; const int partitionId = eventData.partitionId(); + mqbnet::ClusterNode* primary = eventData.primary(); BSLS_ASSERT_SAFE(0 <= partitionId && partitionId < static_cast(d_fileStores.size())); + BSLS_ASSERT_SAFE(primary->nodeId() == + d_partitionInfoVec[partitionId].primary()->nodeId()); + BSLS_ASSERT_SAFE(d_partitionFSMVec[partitionId]->isSelfReplica()); + + d_recoveryManager_mp->setLiveDataSource(primary, partitionId); RequestManagerType::RequestSp request = d_clusterData_p->requestManager().createRequest(); @@ -1824,24 +1817,20 @@ void StorageManager::do_primaryStateRequest(const PartitionFSMArgsSp& args) primaryStateRequest.firstSyncPointAfterRolloverSequenceNumber() = getSelfFirstSyncPointAfterRolloverSequenceNumber(partitionId); - mqbnet::ClusterNode* destNode = eventData.primary(); - - BSLS_ASSERT_SAFE(destNode); - request->setResponseCb( 
bdlf::BindUtil::bind(&StorageManager::processPrimaryStateResponse, this, bdlf::PlaceHolders::_1, - destNode)); + primary)); bmqt::GenericResult::Enum status = d_clusterData_p->cluster().sendRequest( request, - destNode, + primary, bsls::TimeInterval(10)); if (bmqt::GenericResult::e_SUCCESS != status) { EventData failedEventDataVec; - failedEventDataVec.emplace_back(destNode, + failedEventDataVec.emplace_back(primary, -1, // placeholder responseId partitionId, 1); @@ -3600,7 +3589,6 @@ void StorageManager::do_unsupportedPrimaryDowngrade( BSLS_ASSERT_SAFE(0 <= partitionId && partitionId < static_cast(d_fileStores.size())); - BSLS_ASSERT_SAFE(d_partitionFSMVec[partitionId]->isSelfPrimary()); BSLS_ASSERT_SAFE(eventWithData.first == PartitionFSM::Event::e_DETECT_SELF_REPLICA); @@ -3917,14 +3905,23 @@ void StorageManager::stop() d_isStarted = false; for (size_t pid = 0; pid < d_fileStores.size(); ++pid) { + mqbs::FileStore* fs = d_fileStores[pid].get(); + BSLS_ASSERT_SAFE(fs); + + fs->execute( + bdlf::BindUtil::bind(&StorageUtil::clearPrimaryForPartition, + fs, + &d_partitionInfoVec[pid], + d_clusterData_p->identity().description(), + pid)); + EventData eventDataVec; eventDataVec.emplace_back(d_clusterData_p->membership().selfNode(), -1, // placeholder requestId pid, 1); - dispatchEventToPartition(d_fileStores[pid].get(), - PartitionFSM::Event::e_STOP_NODE, + dispatchEventToPartition(PartitionFSM::Event::e_STOP_NODE, eventDataVec); } @@ -3959,13 +3956,7 @@ void StorageManager::initializeQueueKeyInfoMap( BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p)); if (d_isQueueKeyInfoMapVecInitialized) { - BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": Queue key info map should only be initialized " - << "once, but the initalization method is called more " - << "than once. This can happen if the node goes " - << "back-and-forth between healing and healed FSM " - << "states. 
Please check."; - + // Queue key info map should only be initialized once at startup. return; // RETURN } @@ -3975,6 +3966,7 @@ void StorageManager::initializeQueueKeyInfoMap( bdlf::MemFnUtil::memFn(&QueueKeyInfoMap::empty))); // Populate 'd_queueKeyInfoMapVec' from cluster state + BALL_LOG_ERROR << "xxm"; for (DomainStatesCIter dscit = clusterState.domainStates().cbegin(); dscit != clusterState.domainStates().cend(); ++dscit) { @@ -3998,7 +3990,41 @@ void StorageManager::initializeQueueKeyInfoMap( } } - d_isQueueKeyInfoMapVecInitialized = true; + // TODO We don't set 'd_isQueueKeyInfoMapVecInitialized' to true here + // because TODO + + // Loop for each partition in cluster state + + BALL_LOG_ERROR << "xxm"; + for (size_t pid = 0; pid < d_partitionInfoVec.size(); ++pid) { + mqbs::FileStore* fs = d_fileStores.at(pid).get(); + BSLS_ASSERT_SAFE(fs); + if (d_partitionInfoVec.at(pid).primary()->nodeId() == + d_clusterData_p->membership().selfNode()->nodeId()) { + BALL_LOG_ERROR << "xxm"; + BALL_LOG_ERROR << d_partitionInfoVec.at(pid).primary(); + BALL_LOG_ERROR << "xxm"; + BALL_LOG_ERROR << d_partitionInfoVec.at(pid).primaryLeaseId(); + fs->execute(bdlf::BindUtil::bind( + &StorageManager::processPrimaryDetect, + this, + pid, + d_partitionInfoVec.at(pid).primary(), + d_partitionInfoVec.at(pid).primaryLeaseId())); + } + else { + BALL_LOG_ERROR << "xxm"; + BALL_LOG_ERROR << d_partitionInfoVec.at(pid).primary(); + BALL_LOG_ERROR << "xxm"; + BALL_LOG_ERROR << d_partitionInfoVec.at(pid).primaryLeaseId(); + fs->execute(bdlf::BindUtil::bind( + &StorageManager::processReplicaDetect, + this, + pid, + d_partitionInfoVec.at(pid).primary(), + d_partitionInfoVec.at(pid).primaryLeaseId())); + } + } } void StorageManager::registerQueue(const bmqt::Uri& uri, @@ -4208,13 +4234,15 @@ void StorageManager::setPrimaryForPartition(int partitionId, partitionId < static_cast(d_fileStores.size())); BSLS_ASSERT_SAFE(primaryNode); - if (primaryNode->nodeId() == - 
d_clusterData_p->membership().selfNode()->nodeId()) { - processPrimaryDetect(partitionId, primaryNode, primaryLeaseId); - } - else { - processReplicaDetect(partitionId, primaryNode, primaryLeaseId); - } + mqbs::FileStore* fs = d_fileStores[partitionId].get(); + BSLS_ASSERT_SAFE(fs); + + fs->execute( + bdlf::BindUtil::bind(&StorageManager::setPrimaryForPartitionDispatched, + this, + partitionId, + primaryNode, + primaryLeaseId)); } void StorageManager::clearPrimaryForPartition(int partitionId, @@ -4231,25 +4259,14 @@ void StorageManager::clearPrimaryForPartition(int partitionId, BSLS_ASSERT_SAFE( !d_clusterState_p->partitionsInfo().at(partitionId).primaryNode()); - BALL_LOG_INFO << d_clusterData_p->identity().description() - << " Partition [" << partitionId << "]: " - << "Self Transition back to Unknown in the Partition FSM."; - - EventData eventDataVec; - eventDataVec.emplace_back( - d_clusterData_p->membership().selfNode(), - -1, // placeholder requestId - partitionId, - 1, - primary, - d_clusterState_p->partitionsInfo().at(partitionId).primaryLeaseId()); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); BSLS_ASSERT_SAFE(fs); - dispatchEventToPartition(fs, - PartitionFSM::Event::e_RST_UNKNOWN, - eventDataVec); + fs->execute(bdlf::BindUtil::bind( + &StorageManager::clearPrimaryForPartitionDispatched, + this, + partitionId, + primary)); } void StorageManager::setPrimaryStatusForPartition( @@ -4327,11 +4344,7 @@ void StorageManager::processPrimaryStateRequest( primaryStateRequest.latestSequenceNumber(), primaryStateRequest.firstSyncPointAfterRolloverSequenceNumber()); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - - dispatchEventToPartition(fs, - PartitionFSM::Event::e_PRIMARY_STATE_RQST, + dispatchEventToPartition(PartitionFSM::Event::e_PRIMARY_STATE_RQST, eventDataVec); } @@ -4387,11 +4400,7 @@ void StorageManager::processReplicaStateRequest( replicaStateRequest.latestSequenceNumber(), 
replicaStateRequest.firstSyncPointAfterRolloverSequenceNumber()); - mqbs::FileStore* fs = d_fileStores[partitionId].get(); - BSLS_ASSERT_SAFE(fs); - - dispatchEventToPartition(fs, - PartitionFSM::Event::e_REPLICA_STATE_RQST, + dispatchEventToPartition(PartitionFSM::Event::e_REPLICA_STATE_RQST, eventDataVec); } @@ -4516,17 +4525,12 @@ void StorageManager::processStorageEvent( EventData eventDataVec; eventDataVec.emplace_back(event.clusterNode(), pid, 1, event.blob()); - mqbs::FileStore* fs = d_fileStores[pid].get(); - BSLS_ASSERT_SAFE(fs); - if (rawEvent.isStorageEvent()) { - dispatchEventToPartition(fs, - PartitionFSM::Event::e_LIVE_DATA, + dispatchEventToPartition(PartitionFSM::Event::e_LIVE_DATA, eventDataVec); } else { - dispatchEventToPartition(fs, - PartitionFSM::Event::e_RECOVERY_DATA, + dispatchEventToPartition(PartitionFSM::Event::e_RECOVERY_DATA, eventDataVec); } } diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index ac4a6235eb..6c0fe49655 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -356,7 +356,8 @@ class StorageManager BSLS_KEYWORD_FINAL /// for the i-th partitionId. bsl::vector d_numReplicaDataResponsesReceivedVec; - /// Whether `d_queueKeyInfoMapVec` has been initialized. + /// Whether `d_queueKeyInfoMapVec` has been initialized. This data + /// structure only needs to be initialized once at startup, and no more. bsls::AtomicBool d_isQueueKeyInfoMapVecInitialized; /// Mapping from queue key to queue info indexed by partitionId, populated @@ -457,9 +458,9 @@ class StorageManager BSLS_KEYWORD_FINAL /// Dispatch the event to *QUEUE DISPATCHER* thread associated with /// the partitionId as per the specified `eventDataVec` with the - /// specified `event` using the specified `fs`. - void dispatchEventToPartition(mqbs::FileStore* fs, - PartitionFSM::Event::Enum event, + /// specified `event`. 
If we are already in *QUEUE DISPATCHER* thread, + /// then execute the event in place. + void dispatchEventToPartition(PartitionFSM::Event::Enum event, const EventData& eventDataVec); /// Set the primary status of the specified `partitionId` to the specified @@ -471,14 +472,31 @@ class StorageManager BSLS_KEYWORD_FINAL int partitionId, bmqp_ctrlmsg::PrimaryStatus::Value value); + /// THREAD: This method is invoked in the associated Queue dispatcher + /// thread for the specified `partitionId`. + void setPrimaryForPartitionDispatched(int partitionId, + mqbnet::ClusterNode* primaryNode, + unsigned int primaryLeaseId); + + /// THREAD: This method is invoked in the associated Queue dispatcher + /// thread for the specified `partitionId`. + void clearPrimaryForPartitionDispatched(int partitionId, + mqbnet::ClusterNode* primary); + /// Apply DETECT_SelfPrimary event to PartitionFSM using the specified /// `partitionId`, `primaryNode`, `primaryLeaseId`. + /// + /// THREAD: This method is invoked in the associated Queue dispatcher + /// thread for the specified `partitionId`. void processPrimaryDetect(int partitionId, mqbnet::ClusterNode* primaryNode, unsigned int primaryLeaseId); /// Apply DETECT_SelfReplica event to StorageFSM using the specified /// `partitionId`, `primaryNode` and `primaryLeaseId`. + /// + /// THREAD: This method is invoked in the associated Queue dispatcher + /// thread for the specified `partitionId`. 
void processReplicaDetect(int partitionId, mqbnet::ClusterNode* primaryNode, unsigned int primaryLeaseId); @@ -585,12 +603,6 @@ class StorageManager BSLS_KEYWORD_FINAL void do_storeReplicaSeq(const PartitionFSMArgsSp& args) BSLS_KEYWORD_OVERRIDE; - void do_storePartitionInfo(const PartitionFSMArgsSp& args) - BSLS_KEYWORD_OVERRIDE; - - void do_clearPartitionInfo(const PartitionFSMArgsSp& args) - BSLS_KEYWORD_OVERRIDE; - void do_replicaStateRequest(const PartitionFSMArgsSp& args) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp index 7fef142732..b3c87a628a 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp @@ -232,7 +232,7 @@ struct TestHelper { .choice() .makeReplicaStateRequest(); - replicaStateRequest.partitionId() = partitionId; + replicaStateRequest.partitionId() = partitionId; replicaStateRequest.latestSequenceNumber() = seqNum; // TODO: set sequence number once add mocked recovery manager to this @@ -649,7 +649,7 @@ struct TestHelper { .choice() .makePrimaryStateRequest(); - primaryStateRequest.partitionId() = partitionId; + primaryStateRequest.partitionId() = partitionId; primaryStateRequest.latestSequenceNumber() = seqNum; for (TestChannelMapCIter cit = d_cluster_mp->_channels().cbegin(); @@ -687,7 +687,7 @@ struct TestHelper { .choice() .makeReplicaStateResponse(); - replicaStateResponse.partitionId() = partitionId; + replicaStateResponse.partitionId() = partitionId; replicaStateResponse.latestSequenceNumber() = seqNum; for (TestChannelMapCIter cit = d_cluster_mp->_channels().cbegin(); @@ -1143,105 +1143,7 @@ static void test3_unknownDetectSelfReplica() helper.d_cluster_mp->stop(); } -static void test4_primaryHealingStage1DetectSelfReplica() -// ------------------------------------------------------------------------ -// BREATHING TEST -// -// Concerns: -// Ensure proper building and starting of the StorageManager -// 
-// Plan: -// 1) Create a StorageManager on the stack -// 2) Invoke start. -// 3) Transition to Primary healing stage 1 and then detect self replica. -// 4) Verify the actions as per FSM. -// 5) Invoke stop. -// -// Testing: -// Basic functionality. -// ------------------------------------------------------------------------ -{ - bmqtst::TestHelper::printTestName("BREATHING TEST - " - "PRIMARY HEALING STAGE 1 DETECTS SELF AS" - " REPLICA"); - - TestHelper helper; - - mqbc::StorageManager storageManager( - helper.d_cluster_mp->_clusterDefinition(), - helper.d_cluster_mp.get(), - helper.d_cluster_mp->_clusterData(), - helper.d_cluster_mp->_state(), - helper.d_cluster_mp->_clusterData()->domainFactory(), - helper.d_cluster_mp->dispatcher(), - k_WATCHDOG_TIMEOUT_DURATION, - mockOnRecoveryStatus, - mockOnPartitionPrimaryStatus, - bmqtst::TestHelperUtil::allocator()); - - bmqu::MemOutStream errorDescription; - - static const int k_PARTITION_ID = 1; - - const int rc = storageManager.start(errorDescription); - BSLS_ASSERT_OPT(rc == 0); - - BSLS_ASSERT_OPT(storageManager.partitionHealthState(k_PARTITION_ID) == - mqbc::PartitionFSM::State::e_UNKNOWN); - - const int selfNodeId = helper.d_cluster_mp->_clusterData() - ->membership() - .netCluster() - ->selfNodeId(); - - mqbnet::ClusterNode* selfNode = helper.d_cluster_mp->_clusterData() - ->membership() - .netCluster() - ->lookupNode(selfNodeId); - - helper.setPartitionPrimary(&storageManager, - k_PARTITION_ID, - 1, // primaryLeaseId - selfNode); - - BSLS_ASSERT_OPT(storageManager.partitionHealthState(k_PARTITION_ID) == - mqbc::PartitionFSM::State::e_PRIMARY_HEALING_STG1); - - helper.verifyPrimarySendsReplicaStateRqst(k_PARTITION_ID, selfNodeId); - helper.clearChannels(); - - const NodeToSeqNumCtxMap& nodeToSeqNumCtxMap = - storageManager.nodeToSeqNumCtxMap(k_PARTITION_ID); - - BSLS_ASSERT_OPT(nodeToSeqNumCtxMap.size() == 1); - - // Apply Detect Self Replica event to Node in primaryHealingStage1. 
- - const int primaryNodeId = selfNodeId + 1; - mqbnet::ClusterNode* primaryNode = helper.d_cluster_mp->_clusterData() - ->membership() - .netCluster() - ->lookupNode(primaryNodeId); - - helper.setPartitionPrimary(&storageManager, - k_PARTITION_ID, - 1, // primaryLeaseId - primaryNode); - - BMQTST_ASSERT_EQ(storageManager.nodeToSeqNumCtxMap(k_PARTITION_ID).size(), - 1U); - BMQTST_ASSERT_EQ(storageManager.partitionHealthState(k_PARTITION_ID), - mqbc::PartitionFSM::State::e_REPLICA_HEALING); - - helper.verifyReplicaSendsPrimaryStateRqst(k_PARTITION_ID, primaryNodeId); - helper.clearChannels(); - - // Stop the cluster - storageManager.stop(); - helper.d_cluster_mp->stop(); -} - -static void test5_primaryHealingStage1ReceivesReplicaStateRqst() +static void test4_primaryHealingStage1ReceivesReplicaStateRqst() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -1332,7 +1234,7 @@ static void test5_primaryHealingStage1ReceivesReplicaStateRqst() seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - replicaStateRequest.partitionId() = k_PARTITION_ID; + replicaStateRequest.partitionId() = k_PARTITION_ID; replicaStateRequest.latestSequenceNumber() = seqNum; mqbnet::ClusterNode* source = helper.d_cluster_mp->_clusterData() @@ -1355,7 +1257,7 @@ static void test5_primaryHealingStage1ReceivesReplicaStateRqst() helper.d_cluster_mp->stop(); } -static void test6_primaryHealingStage1ReceivesReplicaStateRspnQuorum() +static void test5_primaryHealingStage1ReceivesReplicaStateRspnQuorum() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -1443,7 +1345,7 @@ static void test6_primaryHealingStage1ReceivesReplicaStateRspnQuorum() seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - replicaStateResponse.partitionId() = k_PARTITION_ID; + replicaStateResponse.partitionId() = k_PARTITION_ID; replicaStateResponse.latestSequenceNumber() = seqNum; 
helper.d_cluster_mp->requestManager().processResponse(message); @@ -1464,7 +1366,7 @@ static void test6_primaryHealingStage1ReceivesReplicaStateRspnQuorum() helper.d_cluster_mp->stop(); } -static void test7_primaryHealingStage1ReceivesPrimaryStateRequestQuorum() +static void test6_primaryHealingStage1ReceivesPrimaryStateRequestQuorum() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -1554,7 +1456,7 @@ static void test7_primaryHealingStage1ReceivesPrimaryStateRequestQuorum() seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - primaryStateRequest.partitionId() = k_PARTITION_ID; + primaryStateRequest.partitionId() = k_PARTITION_ID; primaryStateRequest.latestSequenceNumber() = seqNum; storageManager.processPrimaryStateRequest(message, replica1); @@ -1579,7 +1481,7 @@ static void test7_primaryHealingStage1ReceivesPrimaryStateRequestQuorum() helper.d_cluster_mp->stop(); } -static void test8_primaryHealingStage1ReceivesPrimaryStateRqst() +static void test7_primaryHealingStage1ReceivesPrimaryStateRqst() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -1673,7 +1575,7 @@ static void test8_primaryHealingStage1ReceivesPrimaryStateRqst() seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - primaryStateRequest.partitionId() = k_PARTITION_ID; + primaryStateRequest.partitionId() = k_PARTITION_ID; primaryStateRequest.latestSequenceNumber() = seqNum; storageManager.processPrimaryStateRequest(message, replicaNode); @@ -1691,7 +1593,7 @@ static void test8_primaryHealingStage1ReceivesPrimaryStateRqst() helper.d_cluster_mp->stop(); } -static void test9_primaryHealingStage1ReceivesReplicaStateRspnNoQuorum() +static void test8_primaryHealingStage1ReceivesReplicaStateRspnNoQuorum() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -1779,7 +1681,7 @@ static void 
test9_primaryHealingStage1ReceivesReplicaStateRspnNoQuorum() seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - replicaStateResponse.partitionId() = k_PARTITION_ID; + replicaStateResponse.partitionId() = k_PARTITION_ID; replicaStateResponse.latestSequenceNumber() = seqNum; helper.d_cluster_mp->requestManager().processResponse(message); @@ -1806,7 +1708,7 @@ static void test9_primaryHealingStage1ReceivesReplicaStateRspnNoQuorum() helper.d_cluster_mp->stop(); } -static void test10_primaryHealingStage1QuorumSendsReplicaDataRequestPull() +static void test9_primaryHealingStage1QuorumSendsReplicaDataRequestPull() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -1896,20 +1798,20 @@ static void test10_primaryHealingStage1QuorumSendsReplicaDataRequestPull() seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - replicaStateResponse.partitionId() = k_PARTITION_ID; + replicaStateResponse.partitionId() = k_PARTITION_ID; replicaStateResponse.latestSequenceNumber() = seqNum; helper.d_cluster_mp->requestManager().processResponse(message); - message.rId() = k_REQUEST_ID + 1; - seqNum.sequenceNumber() = 7U; - seqNum.primaryLeaseId() = 1U; + message.rId() = k_REQUEST_ID + 1; + seqNum.sequenceNumber() = 7U; + seqNum.primaryLeaseId() = 1U; replicaStateResponse.latestSequenceNumber() = seqNum; helper.d_cluster_mp->requestManager().processResponse(message); - message.rId() = k_REQUEST_ID + 2; - seqNum.sequenceNumber() = 3U; - seqNum.primaryLeaseId() = 1U; + message.rId() = k_REQUEST_ID + 2; + seqNum.sequenceNumber() = 3U; + seqNum.primaryLeaseId() = 1U; replicaStateResponse.latestSequenceNumber() = seqNum; helper.d_cluster_mp->requestManager().processResponse(message); @@ -1936,156 +1838,7 @@ static void test10_primaryHealingStage1QuorumSendsReplicaDataRequestPull() helper.d_cluster_mp->stop(); } -static void test11_primaryHealingStage2DetectSelfReplica() -// 
------------------------------------------------------------------------ -// BREATHING TEST -// -// Concerns: -// Ensure proper building and starting of the StorageManager -// -// Plan: -// 1) Create a StorageManager on the stack -// 2) Invoke start. -// 3) Transition to Primary healing stage 2 and then detect self replica. -// 4) Verify the actions as per FSM. -// 5) Invoke stop. -// -// Testing: -// Basic functionality. -// ------------------------------------------------------------------------ -{ - bmqtst::TestHelper::printTestName("BREATHING TEST - " - "PRIMARY HEALING STAGE 2 DETECTS SELF AS" - " REPLICA"); - - TestHelper helper; - - mqbc::StorageManager storageManager( - helper.d_cluster_mp->_clusterDefinition(), - helper.d_cluster_mp.get(), - helper.d_cluster_mp->_clusterData(), - helper.d_cluster_mp->_state(), - helper.d_cluster_mp->_clusterData()->domainFactory(), - helper.d_cluster_mp->dispatcher(), - k_WATCHDOG_TIMEOUT_DURATION, - mockOnRecoveryStatus, - mockOnPartitionPrimaryStatus, - bmqtst::TestHelperUtil::allocator()); - - bmqu::MemOutStream errorDescription; - - static const int k_PARTITION_ID = 1; - - const int rc = storageManager.start(errorDescription); - BSLS_ASSERT_OPT(rc == 0); - - BSLS_ASSERT_OPT(storageManager.partitionHealthState(k_PARTITION_ID) == - mqbc::PartitionFSM::State::e_UNKNOWN); - - const int selfNodeId = helper.d_cluster_mp->_clusterData() - ->membership() - .netCluster() - ->selfNodeId(); - - mqbnet::ClusterNode* selfNode = helper.d_cluster_mp->_clusterData() - ->membership() - .netCluster() - ->lookupNode(selfNodeId); - - helper.setPartitionPrimary(&storageManager, - k_PARTITION_ID, - 1, // primaryLeaseId - selfNode); - - BSLS_ASSERT_OPT(storageManager.partitionHealthState(k_PARTITION_ID) == - mqbc::PartitionFSM::State::e_PRIMARY_HEALING_STG1); - - helper.verifyPrimarySendsReplicaStateRqst(k_PARTITION_ID, selfNodeId); - helper.clearChannels(); - - const NodeToSeqNumCtxMap& nodeToSeqNumCtxMap = - 
storageManager.nodeToSeqNumCtxMap(k_PARTITION_ID); - - BSLS_ASSERT_OPT(nodeToSeqNumCtxMap.size() == 1); - - // Receives ReplicaStateResponse from replica nodes. - static const int k_REQUEST_ID = 1; - bmqp_ctrlmsg::ControlMessage message; - message.rId() = k_REQUEST_ID; - bmqp_ctrlmsg::ReplicaStateResponse& replicaStateResponse = - message.choice() - .makeClusterMessage() - .choice() - .makePartitionMessage() - .choice() - .makeReplicaStateResponse(); - - bmqp_ctrlmsg::PartitionSequenceNumber seqNum; - seqNum.sequenceNumber() = 1U; - seqNum.primaryLeaseId() = 1U; - - replicaStateResponse.partitionId() = k_PARTITION_ID; - replicaStateResponse.latestSequenceNumber() = seqNum; - - helper.d_cluster_mp->requestManager().processResponse(message); - - message.rId() = k_REQUEST_ID + 1; - seqNum.sequenceNumber() = 7U; - seqNum.primaryLeaseId() = 1U; - replicaStateResponse.latestSequenceNumber() = seqNum; - helper.d_cluster_mp->requestManager().processResponse(message); - - message.rId() = k_REQUEST_ID + 2; - seqNum.sequenceNumber() = 3U; - seqNum.primaryLeaseId() = 1U; - replicaStateResponse.latestSequenceNumber() = seqNum; - helper.d_cluster_mp->requestManager().processResponse(message); - - BSLS_ASSERT_OPT(storageManager.nodeToSeqNumCtxMap(k_PARTITION_ID).size() == - 4U); - BSLS_ASSERT_OPT(storageManager.partitionHealthState(k_PARTITION_ID) == - mqbc::PartitionFSM::State::e_PRIMARY_HEALING_STG2); - - const NodeSeqNumPair& highestSeqNumNode = - helper.getHighestSeqNumNodeDetails( - selfNode, - storageManager.nodeToSeqNumCtxMap(k_PARTITION_ID)); - - BSLS_ASSERT_OPT(highestSeqNumNode.first != selfNode); - BSLS_ASSERT_OPT(highestSeqNumNode.second.sequenceNumber() == 7U); - - helper.verifyPrimarySendsReplicaDataRqstPull( - k_PARTITION_ID, - highestSeqNumNode.first->nodeId(), - highestSeqNumNode.second); - helper.clearChannels(); - - // Apply Detect Self Replica event to Node in primaryHealingStage2. 
- const int primaryNodeId = selfNodeId + 1; - mqbnet::ClusterNode* primaryNode = helper.d_cluster_mp->_clusterData() - ->membership() - .netCluster() - ->lookupNode(primaryNodeId); - - helper.setPartitionPrimary(&storageManager, - k_PARTITION_ID, - 1, // primaryLeaseId - primaryNode); - - BMQTST_ASSERT_EQ(storageManager.nodeToSeqNumCtxMap(k_PARTITION_ID).size(), - 1U); - BMQTST_ASSERT_EQ(storageManager.partitionHealthState(k_PARTITION_ID), - mqbc::PartitionFSM::State::e_REPLICA_HEALING); - - helper.verifyReplicaSendsPrimaryStateRqst(k_PARTITION_ID, primaryNodeId); - helper.clearChannels(); - - // Stop the cluster - storageManager.stop(); - helper.d_cluster_mp->stop(); -} - -static void test12_replicaHealingDetectSelfPrimary() +static void test10_replicaHealingDetectSelfPrimary() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -2183,7 +1936,7 @@ static void test12_replicaHealingDetectSelfPrimary() helper.d_cluster_mp->stop(); } -static void test13_replicaHealingReceivesReplicaStateRqst() +static void test11_replicaHealingReceivesReplicaStateRqst() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -2274,7 +2027,7 @@ static void test13_replicaHealingReceivesReplicaStateRqst() seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - replicaStateRequest.partitionId() = k_PARTITION_ID; + replicaStateRequest.partitionId() = k_PARTITION_ID; replicaStateRequest.latestSequenceNumber() = seqNum; storageManager.processReplicaStateRequest(message, primaryNode); @@ -2291,7 +2044,7 @@ static void test13_replicaHealingReceivesReplicaStateRqst() helper.d_cluster_mp->stop(); } -static void test14_replicaHealingReceivesPrimaryStateRspn() +static void test12_replicaHealingReceivesPrimaryStateRspn() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -2382,7 +2135,7 @@ static void test14_replicaHealingReceivesPrimaryStateRspn() 
seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - primaryStateResponse.partitionId() = k_PARTITION_ID; + primaryStateResponse.partitionId() = k_PARTITION_ID; primaryStateResponse.latestSequenceNumber() = seqNum; helper.d_cluster_mp->requestManager().processResponse(message); @@ -2397,7 +2150,7 @@ static void test14_replicaHealingReceivesPrimaryStateRspn() helper.d_cluster_mp->stop(); } -static void test15_replicaHealingReceivesFailedPrimaryStateRspn() +static void test13_replicaHealingReceivesFailedPrimaryStateRspn() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -2492,7 +2245,7 @@ static void test15_replicaHealingReceivesFailedPrimaryStateRspn() helper.d_cluster_mp->stop(); } -static void test16_replicaHealingReceivesPrimaryStateRqst() +static void test14_replicaHealingReceivesPrimaryStateRqst() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -2588,7 +2341,7 @@ static void test16_replicaHealingReceivesPrimaryStateRqst() seqNum.sequenceNumber() = 1U; seqNum.primaryLeaseId() = 1U; - primaryStateRequest.partitionId() = k_PARTITION_ID; + primaryStateRequest.partitionId() = k_PARTITION_ID; primaryStateRequest.latestSequenceNumber() = seqNum; storageManager.processPrimaryStateRequest(message, rogueNode); @@ -2606,7 +2359,7 @@ static void test16_replicaHealingReceivesPrimaryStateRqst() helper.d_cluster_mp->stop(); } -static void test17_replicaHealingReceivesReplicaDataRqstPull() +static void test15_replicaHealingReceivesReplicaDataRqstPull() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -2713,7 +2466,7 @@ static void test17_replicaHealingReceivesReplicaDataRqstPull() k_PRIMARY_SEQ_NUM.sequenceNumber() = 1U; k_PRIMARY_SEQ_NUM.primaryLeaseId() = 1U; - replicaStateRequest.partitionId() = k_PARTITION_ID; + replicaStateRequest.partitionId() = k_PARTITION_ID; replicaStateRequest.latestSequenceNumber() = 
k_PRIMARY_SEQ_NUM; storageManager.processReplicaStateRequest(message, primaryNode); @@ -2762,7 +2515,7 @@ static void test17_replicaHealingReceivesReplicaDataRqstPull() helper.d_cluster_mp->stop(); } -static void test18_primaryHealingStage1SelfHighestSendsDataChunks() +static void test16_primaryHealingStage1SelfHighestSendsDataChunks() // ------------------------------------------------------------------------ // BREATHING TEST // @@ -2866,21 +2619,21 @@ static void test18_primaryHealingStage1SelfHighestSendsDataChunks() .makeReplicaStateResponse(); bmqp_ctrlmsg::PartitionSequenceNumber k_REPLICA_SEQ_NUM_1; - k_REPLICA_SEQ_NUM_1.primaryLeaseId() = k_PRIMARY_LEASE_ID; - k_REPLICA_SEQ_NUM_1.sequenceNumber() = 3U; - replicaStateResponse.partitionId() = k_PARTITION_ID; + k_REPLICA_SEQ_NUM_1.primaryLeaseId() = k_PRIMARY_LEASE_ID; + k_REPLICA_SEQ_NUM_1.sequenceNumber() = 3U; + replicaStateResponse.partitionId() = k_PARTITION_ID; replicaStateResponse.latestSequenceNumber() = k_REPLICA_SEQ_NUM_1; helper.d_cluster_mp->requestManager().processResponse(message); bmqp_ctrlmsg::PartitionSequenceNumber k_REPLICA_SEQ_NUM_2; - k_REPLICA_SEQ_NUM_2.primaryLeaseId() = k_PRIMARY_LEASE_ID; - k_REPLICA_SEQ_NUM_2.sequenceNumber() = 5U; - message.rId() = k_REQUEST_ID + 1; + k_REPLICA_SEQ_NUM_2.primaryLeaseId() = k_PRIMARY_LEASE_ID; + k_REPLICA_SEQ_NUM_2.sequenceNumber() = 5U; + message.rId() = k_REQUEST_ID + 1; replicaStateResponse.latestSequenceNumber() = k_REPLICA_SEQ_NUM_2; helper.d_cluster_mp->requestManager().processResponse(message); - message.rId() = k_REQUEST_ID + 2; + message.rId() = k_REQUEST_ID + 2; replicaStateResponse.latestSequenceNumber() = selfSeqNum; helper.d_cluster_mp->requestManager().processResponse(message); @@ -2923,7 +2676,7 @@ static void test18_primaryHealingStage1SelfHighestSendsDataChunks() helper.d_cluster_mp->stop(); } -static void test19_fileSizesHardLimits() +static void test17_fileSizesHardLimits() // 
------------------------------------------------------------------------ // FILE SIZES HARD LIMITS // @@ -3046,28 +2799,26 @@ int main(int argc, char* argv[]) // - test21_replicaHealingReceivesReplicaDataRqstDrop(); // - test20_replicaHealingReceivesReplicaDataRqstPush(); // - test19_primaryHealedSendsDataChunks(); - case 19: test19_fileSizesHardLimits(); break; - case 18: test18_primaryHealingStage1SelfHighestSendsDataChunks(); break; - case 17: test17_replicaHealingReceivesReplicaDataRqstPull(); break; - case 16: test16_replicaHealingReceivesPrimaryStateRqst(); break; - case 15: test15_replicaHealingReceivesFailedPrimaryStateRspn(); break; - case 14: test14_replicaHealingReceivesPrimaryStateRspn(); break; - case 13: test13_replicaHealingReceivesReplicaStateRqst(); break; - case 12: test12_replicaHealingDetectSelfPrimary(); break; - case 11: test11_primaryHealingStage2DetectSelfReplica(); break; - case 10: - test10_primaryHealingStage1QuorumSendsReplicaDataRequestPull(); - break; + case 17: test17_fileSizesHardLimits(); break; + case 16: test16_primaryHealingStage1SelfHighestSendsDataChunks(); break; + case 15: test15_replicaHealingReceivesReplicaDataRqstPull(); break; + case 14: test14_replicaHealingReceivesPrimaryStateRqst(); break; + case 13: test13_replicaHealingReceivesFailedPrimaryStateRspn(); break; + case 12: test12_replicaHealingReceivesPrimaryStateRspn(); break; + case 11: test11_replicaHealingReceivesReplicaStateRqst(); break; + case 10: test10_replicaHealingDetectSelfPrimary(); break; case 9: - test9_primaryHealingStage1ReceivesReplicaStateRspnNoQuorum(); + test9_primaryHealingStage1QuorumSendsReplicaDataRequestPull(); + break; + case 8: + test8_primaryHealingStage1ReceivesReplicaStateRspnNoQuorum(); break; - case 8: test8_primaryHealingStage1ReceivesPrimaryStateRqst(); break; - case 7: - test7_primaryHealingStage1ReceivesPrimaryStateRequestQuorum(); + case 7: test7_primaryHealingStage1ReceivesPrimaryStateRqst(); break; + case 6: + 
test6_primaryHealingStage1ReceivesPrimaryStateRequestQuorum(); break; - case 6: test6_primaryHealingStage1ReceivesReplicaStateRspnQuorum(); break; - case 5: test5_primaryHealingStage1ReceivesReplicaStateRqst(); break; - case 4: test4_primaryHealingStage1DetectSelfReplica(); break; + case 5: test5_primaryHealingStage1ReceivesReplicaStateRspnQuorum(); break; + case 4: test4_primaryHealingStage1ReceivesReplicaStateRqst(); break; case 3: test3_unknownDetectSelfReplica(); break; case 2: test2_unknownDetectSelfPrimary(); break; case 1: test1_breathingTest(); break; diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index bcc2608db0..ed0ee6e5a5 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -1265,11 +1265,10 @@ int StorageUtil::assignPartitionDispatcherThreads( } void StorageUtil::clearPrimaryForPartition( - mqbs::FileStore* fs, - PartitionInfo* partitionInfo, - const bsl::string& clusterDescription, - int partitionId, - mqbnet::ClusterNode* primary) + mqbs::FileStore* fs, + PartitionInfo* partitionInfo, + const bsl::string& clusterDescription, + int partitionId) { // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' @@ -1284,10 +1283,6 @@ void StorageUtil::clearPrimaryForPartition( return; // RETURN } - if (primary != partitionInfo->primary()) { - return; // RETURN - } - BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId << "]: processing 'clear-primary' event. 
Current primary: " << partitionInfo->primary()->nodeDescription() diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index b3adfec0f9..421e25d28c 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -499,18 +499,16 @@ struct StorageUtil { const bdlb::NullableValue& queueCreationCb = bdlb::NullableValue()); - /// Clear the specified `primary` of the specified `partitionId` from - /// the specified `fs` and `partitionInfo`, using the specified - /// `clusterDescription`. Behavior is undefined unless the specified - /// `partitionId` is in range and the specified `primary` is not null. + /// Clear the primary of the specified `partitionId` from the specified + /// `fs` and `partitionInfo`, using the specified `clusterDescription`. + /// Behavior is undefined unless the specified `partitionId` is in range. /// /// THREAD: Executed by the queue dispatcher thread associated with /// 'partitionId'. static void clearPrimaryForPartition(mqbs::FileStore* fs, PartitionInfo* partitionInfo, const bsl::string& clusterDescription, - int partitionId, - mqbnet::ClusterNode* primary); + int partitionId); /// Find the minimum required disk space using the specified `config`. static bsls::Types::Uint64 diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 241320c09e..8a322739dd 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -5581,6 +5581,9 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, BSLS_ASSERT_SAFE(!queueKey.isNull()); BSLS_ASSERT_SAFE(d_fileSets.size() > 0); + BALL_LOG_ERROR << "xxm0 " << queueUri << " " << queueKey << " " + << appIdKeyPairs.size() << " " << isNewQueue; + enum { rc_SUCCESS = 0, rc_STOPPING = -1,