@@ -259,8 +259,7 @@ void BP5Writer::ReroutingCommunicationLoop()
259259 }
260260 }
261261
262- auto keepGoing = [&]()
263- {
262+ auto keepGoing = [&]() {
264263 if (iAmSubCoord)
265264 {
266265 return !receivedGroupClose || expectingWriteCompletion || waitingForCloseAcks;
@@ -316,14 +315,13 @@ void BP5Writer::ReroutingCommunicationLoop()
316315 receivedGroupClose = true ;
317316
318317 std::cout << " Rank " << m_RankMPI << " sending GROUP_CLOSE_ACK to rank "
319- << globalCoord << std::endl;
318+ << globalCoord << std::endl;
320319 adios2::helper::RerouteMessage closeAckMsg;
321320 closeAckMsg.m_MsgType = RerouteMessage::MessageType::GROUP_CLOSE_ACK;
322321 closeAckMsg.m_SrcRank = m_RankMPI;
323322 closeAckMsg.m_DestRank = globalCoord;
324323 closeAckMsg.m_WildCard = m_Aggregator->m_SubStreamIndex ;
325- closeAckMsg.BlockingSendTo (m_Comm, globalCoord,
326- sendBuffers.GetNextBuffer ());
324+ closeAckMsg.BlockingSendTo (m_Comm, globalCoord, sendBuffers.GetNextBuffer ());
327325 break ;
328326 case RerouteMessage::MessageType::GROUP_CLOSE_ACK:
329327 // msg for global coordinator
@@ -371,8 +369,9 @@ void BP5Writer::ReroutingCommunicationLoop()
371369 // is
372370 if (scRank != idleWriter)
373371 {
374- std::cout << " GC (" << m_RankMPI << " ) sending STATUS_INQUIRY to rank "
375- << scRank << std::endl;
372+ std::cout << " GC (" << m_RankMPI
373+ << " ) sending STATUS_INQUIRY to rank " << scRank
374+ << std::endl;
376375 adios2::helper::RerouteMessage inquiryMsg;
377376 inquiryMsg.m_MsgType = RerouteMessage::MessageType::STATUS_INQUIRY;
378377 inquiryMsg.m_SrcRank = m_RankMPI;
@@ -459,13 +458,16 @@ void BP5Writer::ReroutingCommunicationLoop()
459458 // msg for global coordinator
460459
461460 // Both the src and target subcoord states return from PENDING to their prior state
462- if (groupState[message.m_SrcRank ].m_currentStatus == WriterGroupState::Status::PENDING)
461+ if (groupState[message.m_SrcRank ].m_currentStatus ==
462+ WriterGroupState::Status::PENDING)
463463 {
464- groupState[message.m_SrcRank ].m_currentStatus = WriterGroupState::Status::WRITING;
464+ groupState[message.m_SrcRank ].m_currentStatus =
465+ WriterGroupState::Status::WRITING;
465466 groupState[message.m_SrcRank ].m_queueSize = 0 ;
466467 }
467468
468- if (groupState[message.m_DestRank ].m_currentStatus == WriterGroupState::Status::PENDING)
469+ if (groupState[message.m_DestRank ].m_currentStatus ==
470+ WriterGroupState::Status::PENDING)
469471 {
470472 groupState[message.m_DestRank ].m_currentStatus = WriterGroupState::Status::IDLE;
471473 }
@@ -520,7 +522,8 @@ void BP5Writer::ReroutingCommunicationLoop()
520522 // Check if writing has finished, and alert the target SC
521523 if (!sentFinished)
522524 {
523- // std::cout << "Rank " << m_RankMPI << " attempting to get lock on m_NotifMutex" << std::endl;
525+ // std::cout << "Rank " << m_RankMPI << " attempting to get lock on m_NotifMutex" <<
526+ // std::endl;
524527 std::lock_guard<std::mutex> lck (m_NotifMutex);
525528 if (m_FinishedWriting)
526529 {
@@ -534,7 +537,7 @@ void BP5Writer::ReroutingCommunicationLoop()
534537 if (!iAmSubCoord && !iAmGlobalCoord)
535538 {
536539 std::cout << " Rank " << m_RankMPI << " notifying SC (" << m_TargetCoordinator
537- << " ) of write completion -- BLOCKING" << std::endl;
540+ << " ) of write completion -- BLOCKING" << std::endl;
538541 // My only role was to write (no communication responsibility) so I am
539542 // done at this point. However, I need to do a blocking send because I
540543 // am about to return from this function, at which point my buffer pool
@@ -548,9 +551,9 @@ void BP5Writer::ReroutingCommunicationLoop()
548551 else
549552 {
550553 std::cout << " Rank " << m_RankMPI << " notifying SC (" << m_TargetCoordinator
551- << " ) of write completion -- NONBLOCKING" << std::endl;
554+ << " ) of write completion -- NONBLOCKING" << std::endl;
552555 writeCompleteMsg.NonBlockingSendTo (m_Comm, m_TargetCoordinator,
553- sendBuffers.GetNextBuffer ());
556+ sendBuffers.GetNextBuffer ());
554557 }
555558
556559 sentFinished = true ;
@@ -565,7 +568,8 @@ void BP5Writer::ReroutingCommunicationLoop()
565568 {
566569 // Pop the queue and send DO_WRITE
567570 int nextWriter = writerQueue.front ();
568- std::cout << " Rank " << m_RankMPI << " sending DO_WRITE to " << nextWriter << std::endl;
571+ std::cout << " Rank " << m_RankMPI << " sending DO_WRITE to " << nextWriter
572+ << std::endl;
569573 writerQueue.pop ();
570574 adios2::helper::RerouteMessage writeMsg;
571575 writeMsg.m_MsgType = RerouteMessage::MessageType::DO_WRITE;
@@ -579,7 +583,8 @@ void BP5Writer::ReroutingCommunicationLoop()
579583 }
580584 else if (!sentIdle)
581585 {
582- std::cout << " Rank " << m_RankMPI << " sending WRITER_IDLE to gc (" << globalCoord << " )" << std::endl;
586+ std::cout << " Rank " << m_RankMPI << " sending WRITER_IDLE to gc (" << globalCoord
587+ << " )" << std::endl;
583588 // Writer queue now empty, send WRITE_IDLE to the GC
584589 adios2::helper::RerouteMessage idleMsg;
585590 idleMsg.m_MsgType = RerouteMessage::MessageType::WRITER_IDLE;
@@ -605,14 +610,15 @@ void BP5Writer::ReroutingCommunicationLoop()
605610 // Look for possible reroute-to / reroute-from pairs
606611 if (iAmGlobalCoord && needStateCheck && !waitingForCloseAcks)
607612 {
608- std:: cout << " GC (" << m_RankMPI << " ) looking for reroute candidate pair" << std::endl;
613+ std::cout << " GC (" << m_RankMPI << " ) looking for reroute candidate pair" << std::endl;
609614 std::pair<size_t , size_t > nextPair;
610615 StateTraversal::SearchResult result = pairFinder.FindNextPair (groupState, nextPair);
611616
612617 if (result == StateTraversal::SearchResult::FOUND)
613618 {
614- std:: cout << " GC (" << m_RankMPI << " ) found reroute candidate pair (" << nextPair.first
615- << " , " << nextPair.second << " ), sending REROUTE_REQUEST" << std::endl;
619+ std::cout << " GC (" << m_RankMPI << " ) found reroute candidate pair ("
620+ << nextPair.first << " , " << nextPair.second
621+ << " ), sending REROUTE_REQUEST" << std::endl;
616622 // Finding a pair means there was both an idle group and a writing
617623 // group with at least one writer in its queue. With these, we will
618624 // initiate a reroute sequence.
@@ -636,7 +642,8 @@ void BP5Writer::ReroutingCommunicationLoop()
636642
637643 for (size_t i = 0 ; i < groupState.size (); ++i)
638644 {
639- std::cout << " group " << i << " status: " << static_cast <int >(groupState[i].m_currentStatus )
645+ std::cout << " group " << i
646+ << " status: " << static_cast <int >(groupState[i].m_currentStatus )
640647 << " , queue size: " << groupState[i].m_queueSize << std::endl;
641648 }
642649 // If we didn't find a pair, it could be because all the groups are
@@ -647,14 +654,17 @@ void BP5Writer::ReroutingCommunicationLoop()
647654 {
648655 if (subCoordRanks[scIdx] != globalCoord)
649656 {
650- std:: cout << " Rank " << m_RankMPI << " sending GROUP_CLOSE to rank " << subCoordRanks[scIdx] << std::endl;
657+ std::cout << " Rank " << m_RankMPI << " sending GROUP_CLOSE to rank "
658+ << subCoordRanks[scIdx] << std::endl;
651659 adios2::helper::RerouteMessage closeMsg;
652660 closeMsg.m_MsgType = RerouteMessage::MessageType::GROUP_CLOSE;
653- closeMsg.NonBlockingSendTo (m_Comm, subCoordRanks[scIdx], sendBuffers.GetNextBuffer ());
661+ closeMsg.NonBlockingSendTo (m_Comm, subCoordRanks[scIdx],
662+ sendBuffers.GetNextBuffer ());
654663 }
655664 }
656665
657- std::cout << " Rank " << m_RankMPI << " marking my own close ack as received" << std::endl;
666+ std::cout << " Rank " << m_RankMPI << " marking my own close ack as received"
667+ << std::endl;
658668 receivedGroupClose = true ;
659669 closeAcksNeeded.erase (globalCoord);
660670
@@ -672,7 +682,8 @@ void BP5Writer::ReroutingCommunicationLoop()
672682 }
673683 else
674684 {
675- std::cout << " Rank " << m_RankMPI << " still need " << closeAcksNeeded.size () << " close acks [ " ;
685+ std::cout << " Rank " << m_RankMPI << " still need " << closeAcksNeeded.size ()
686+ << " close acks [ " ;
676687 for (int n : closeAcksNeeded)
677688 {
678689 std::cout << " " << n;
0 commit comments