@@ -413,6 +413,13 @@ void StorageManager::setPrimaryForPartitionDispatched(
413413 pinfo.setPrimaryLeaseId (primaryLeaseId);
414414 pinfo.setPrimaryStatus (bmqp_ctrlmsg::PrimaryStatus::E_PASSIVE);
415415
416+ if (!d_isQueueKeyInfoMapVecInitialized) {
417+ // We must wait until queue key info map is initialized before
418+ // processing primary detection in the Partition FSM.
419+
420+ // RETURN
421+ }
422+
416423 if (primaryNode->nodeId () ==
417424 d_clusterData_p->membership ().selfNode ()->nodeId ()) {
418425 processPrimaryDetect (partitionId, primaryNode, primaryLeaseId);
@@ -472,6 +479,7 @@ void StorageManager::processPrimaryDetect(int partitionId,
472479 BSLS_ASSERT_SAFE (primaryNode->nodeId () ==
473480 d_clusterData_p->membership ().selfNode ()->nodeId ());
474481
482+ BALL_LOG_ERROR << " xxm" ;
475483 if (d_cluster_p->isStopping ()) {
476484 BALL_LOG_WARN << d_clusterData_p->identity ().description ()
477485 << " Partition [" << partitionId << " ]: "
@@ -480,6 +488,13 @@ void StorageManager::processPrimaryDetect(int partitionId,
480488 return ; // RETURN
481489 }
482490
491+ BALL_LOG_ERROR << " xxm" ;
492+ // The fact that we are processing primary detection retraoactively implies
493+ // that queue key info map must have been initialized. We cannot set this
494+ // flag earlier, or else there could be race condition. TODO Explain
495+ // further.
496+ d_isQueueKeyInfoMapVecInitialized = true ;
497+
483498 BALL_LOG_INFO << d_clusterData_p->identity ().description ()
484499 << " Partition [" << partitionId << " ]: "
485500 << " Self Transition to Primary in the Partition FSM." ;
@@ -509,6 +524,7 @@ void StorageManager::processReplicaDetect(int partitionId,
509524 BSLS_ASSERT_SAFE (primaryNode->nodeId () !=
510525 d_clusterData_p->membership ().selfNode ()->nodeId ());
511526
527+ BALL_LOG_ERROR << " xxm" ;
512528 if (d_cluster_p->isStopping ()) {
513529 BALL_LOG_WARN << d_clusterData_p->identity ().description ()
514530 << " Partition [" << partitionId << " ]: "
@@ -517,6 +533,13 @@ void StorageManager::processReplicaDetect(int partitionId,
517533 return ; // RETURN
518534 }
519535
536+ BALL_LOG_ERROR << " xxm" ;
537+ // The fact that we are processing primary detection retraoactively implies
538+ // that queue key info map must have been initialized. We cannot set this
539+ // flag earlier, or else there could be race condition. TODO Explain
540+ // further.
541+ d_isQueueKeyInfoMapVecInitialized = true ;
542+
520543 BALL_LOG_INFO << d_clusterData_p->identity ().description ()
521544 << " Partition [" << partitionId << " ]: "
522545 << " Self Transition to Replica in the Partition FSM." ;
@@ -3933,13 +3956,7 @@ void StorageManager::initializeQueueKeyInfoMap(
39333956 BSLS_ASSERT_SAFE (d_dispatcher_p->inDispatcherThread (d_cluster_p));
39343957
39353958 if (d_isQueueKeyInfoMapVecInitialized) {
3936- BALL_LOG_WARN << d_clusterData_p->identity ().description ()
3937- << " : Queue key info map should only be initialized "
3938- << " once, but the initalization method is called more "
3939- << " than once. This can happen if the node goes "
3940- << " back-and-forth between healing and healed FSM "
3941- << " states. Please check." ;
3942-
3959+ // Queue key info map should only be initialized once at startup.
39433960 return ; // RETURN
39443961 }
39453962
@@ -3949,6 +3966,7 @@ void StorageManager::initializeQueueKeyInfoMap(
39493966 bdlf::MemFnUtil::memFn (&QueueKeyInfoMap::empty)));
39503967
39513968 // Populate 'd_queueKeyInfoMapVec' from cluster state
3969+ BALL_LOG_ERROR << " xxm" ;
39523970 for (DomainStatesCIter dscit = clusterState.domainStates ().cbegin ();
39533971 dscit != clusterState.domainStates ().cend ();
39543972 ++dscit) {
@@ -3972,7 +3990,41 @@ void StorageManager::initializeQueueKeyInfoMap(
39723990 }
39733991 }
39743992
3975- d_isQueueKeyInfoMapVecInitialized = true ;
3993+ // TODO We don't set 'd_isQueueKeyInfoMapVecInitialized' to true here
3994+ // because TODO
3995+
3996+ // Loop for each partition in cluster state
3997+
3998+ BALL_LOG_ERROR << " xxm" ;
3999+ for (size_t pid = 0 ; pid < d_partitionInfoVec.size (); ++pid) {
4000+ mqbs::FileStore* fs = d_fileStores.at (pid).get ();
4001+ BSLS_ASSERT_SAFE (fs);
4002+ if (d_partitionInfoVec.at (pid).primary ()->nodeId () ==
4003+ d_clusterData_p->membership ().selfNode ()->nodeId ()) {
4004+ BALL_LOG_ERROR << " xxm" ;
4005+ BALL_LOG_ERROR << d_partitionInfoVec.at (pid).primary ();
4006+ BALL_LOG_ERROR << " xxm" ;
4007+ BALL_LOG_ERROR << d_partitionInfoVec.at (pid).primaryLeaseId ();
4008+ fs->execute (bdlf::BindUtil::bind (
4009+ &StorageManager::processPrimaryDetect,
4010+ this ,
4011+ pid,
4012+ d_partitionInfoVec.at (pid).primary (),
4013+ d_partitionInfoVec.at (pid).primaryLeaseId ()));
4014+ }
4015+ else {
4016+ BALL_LOG_ERROR << " xxm" ;
4017+ BALL_LOG_ERROR << d_partitionInfoVec.at (pid).primary ();
4018+ BALL_LOG_ERROR << " xxm" ;
4019+ BALL_LOG_ERROR << d_partitionInfoVec.at (pid).primaryLeaseId ();
4020+ fs->execute (bdlf::BindUtil::bind (
4021+ &StorageManager::processReplicaDetect,
4022+ this ,
4023+ pid,
4024+ d_partitionInfoVec.at (pid).primary (),
4025+ d_partitionInfoVec.at (pid).primaryLeaseId ()));
4026+ }
4027+ }
39764028}
39774029
39784030void StorageManager::registerQueue (const bmqt::Uri& uri,
0 commit comments