@@ -59,6 +59,7 @@ struct WriterGroupState
5959 IDLE,
6060 PENDING,
6161 CAPACITY,
62+ CLOSED,
6263 };
6364
6465 Status m_currentStatus;
@@ -193,6 +194,9 @@ void BP5Writer::ReroutingCommunicationLoop()
193194 StateTraversal pairFinder;
194195 std::vector<int > subCoordRanks;
195196 bool firstIdleMsg = true ;
197+ std::set<int > closeAcksNeeded;
198+ std::set<int > groupIdlesNeeded;
199+ bool waitingForCloseAcks = false ;
196200
197201 // Sends are non-blocking. We use the pool to avoid the situation where the
198202 // buffer is destructed before the send is complete. If we start seeing
@@ -229,6 +233,8 @@ void BP5Writer::ReroutingCommunicationLoop()
229233 groupState[i].m_currentStatus = WriterGroupState::Status::UNKNOWN;
230234 groupState[i].m_subFileIndex = i;
231235 subCoordRanks[i] = m_Partitioning.m_Partitions [i][0 ];
236+ closeAcksNeeded.insert (subCoordRanks[i]);
237+ groupIdlesNeeded.insert (subCoordRanks[i]);
232238 }
233239 }
234240
@@ -257,7 +263,7 @@ void BP5Writer::ReroutingCommunicationLoop()
257263 {
258264 if (iAmSubCoord)
259265 {
260- return !receivedGroupClose || expectingWriteCompletion;
266+ return !receivedGroupClose || expectingWriteCompletion || waitingForCloseAcks ;
261267 }
262268
263269 return !receivedGroupClose;
@@ -308,7 +314,42 @@ void BP5Writer::ReroutingCommunicationLoop()
308314 // msg for sub coordinator
309315 m_DataPos = currentFilePos;
310316 receivedGroupClose = true ;
311- continue ;
317+
318+ std::cout << " Rank " << m_RankMPI << " sending GROUP_CLOSE_ACK to rank "
319+ << globalCoord << std::endl;
320+ adios2::helper::RerouteMessage closeAckMsg;
321+ closeAckMsg.m_MsgType = RerouteMessage::MessageType::GROUP_CLOSE_ACK;
322+ closeAckMsg.m_SrcRank = m_RankMPI;
323+ closeAckMsg.m_DestRank = globalCoord;
324+ closeAckMsg.m_WildCard = m_Aggregator->m_SubStreamIndex ;
325+ closeAckMsg.BlockingSendTo (m_Comm, globalCoord,
326+ sendBuffers.GetNextBuffer ());
327+ break ;
328+ case RerouteMessage::MessageType::GROUP_CLOSE_ACK:
329+ // msg for global coordinator
330+ {
331+ std::cout << " Rank " << m_RankMPI << " received GROUP_CLOSE_ACK from rank "
332+ << status.Source << std::endl;
333+
334+ size_t ackGroupIdx = static_cast <size_t >(message.m_WildCard );
335+ groupState[ackGroupIdx].m_currentStatus = WriterGroupState::Status::CLOSED;
336+ closeAcksNeeded.erase (status.Source );
337+
338+ if (!closeAcksNeeded.empty ())
339+ {
340+ std::stringstream ss;
341+
342+ ss << " Rank " << m_RankMPI << " still awaiting close acks from: [ " ;
343+ for (int n : closeAcksNeeded)
344+ {
345+ ss << n << " " ;
346+ }
347+ ss << " ]\n " ;
348+
349+ std::cout << ss.str ();
350+ }
351+ }
352+ break ;
312353 case RerouteMessage::MessageType::WRITER_IDLE:
313354 std::cout << " Rank " << m_RankMPI << " received WRITER_IDLE from rank "
314355 << status.Source << std::endl;
@@ -319,6 +360,7 @@ void BP5Writer::ReroutingCommunicationLoop()
319360 size_t idleGroup = static_cast <size_t >(message.m_WildCard );
320361 groupState[idleGroup].m_currentStatus = WriterGroupState::Status::IDLE;
321362 groupState[idleGroup].m_queueSize = 0 ;
363+ groupIdlesNeeded.erase (idleWriter);
322364
323365 if (firstIdleMsg)
324366 {
@@ -448,6 +490,8 @@ void BP5Writer::ReroutingCommunicationLoop()
448490 writeMoreMsg.NonBlockingSendTo (m_Comm, message.m_DestRank ,
449491 sendBuffers.GetNextBuffer ());
450492
493+ groupIdlesNeeded.insert (message.m_DestRank );
494+
451495 // Src subcoord state is returned to writing, dest subcoord state is now writing as
452496 // well
453497 groupState[message.m_SrcRank ].m_currentStatus = WriterGroupState::Status::WRITING;
@@ -474,9 +518,11 @@ void BP5Writer::ReroutingCommunicationLoop()
474518
475519 // All processes
476520 // Check if writing has finished, and alert the target SC
521+ if (!sentFinished)
477522 {
523+ // std::cout << "Rank " << m_RankMPI << " attempting to get lock on m_NotifMutex" << std::endl;
478524 std::lock_guard<std::mutex> lck (m_NotifMutex);
479- if (m_FinishedWriting && !sentFinished )
525+ if (m_FinishedWriting)
480526 {
481527 adios2::helper::RerouteMessage writeCompleteMsg;
482528 writeCompleteMsg.m_MsgType = RerouteMessage::MessageType::WRITE_COMPLETION;
@@ -488,23 +534,23 @@ void BP5Writer::ReroutingCommunicationLoop()
488534 if (!iAmSubCoord && !iAmGlobalCoord)
489535 {
490536 std::cout << " Rank " << m_RankMPI << " notifying SC (" << m_TargetCoordinator
491- << " ) of write completion -- BLOCKING" << std::endl;
537+ << " ) of write completion -- BLOCKING" << std::endl;
492538 // My only role was to write (no communication responsibility) so I am
493539 // done at this point. However, I need to do a blocking send because I
494540 // am about to return from this function, at which point my buffer pool
495541 // goes away.
496542 writeCompleteMsg.BlockingSendTo (m_Comm, m_TargetCoordinator,
497- sendBuffers.GetNextBuffer ());
543+ sendBuffers.GetNextBuffer ());
498544
499545 receivedGroupClose = true ;
500546 continue ;
501547 }
502548 else
503549 {
504550 std::cout << " Rank " << m_RankMPI << " notifying SC (" << m_TargetCoordinator
505- << " ) of write completion -- NONBLOCKING" << std::endl;
551+ << " ) of write completion -- NONBLOCKING" << std::endl;
506552 writeCompleteMsg.NonBlockingSendTo (m_Comm, m_TargetCoordinator,
507- sendBuffers.GetNextBuffer ());
553+ sendBuffers.GetNextBuffer ());
508554 }
509555
510556 sentFinished = true ;
@@ -557,7 +603,7 @@ void BP5Writer::ReroutingCommunicationLoop()
557603
558604 // Global coordinator process
559605 // Look for possible reroute-to / reroute-from pairs
560- if (iAmGlobalCoord && needStateCheck)
606+ if (iAmGlobalCoord && needStateCheck && !waitingForCloseAcks )
561607 {
562608 std:: cout << " GC (" << m_RankMPI << " ) looking for reroute candidate pair" << std::endl;
563609 std::pair<size_t , size_t > nextPair;
@@ -585,9 +631,9 @@ void BP5Writer::ReroutingCommunicationLoop()
585631 groupState[idleIdx].m_currentStatus = WriterGroupState::Status::PENDING;
586632 groupState[writerIdx].m_currentStatus = WriterGroupState::Status::PENDING;
587633 }
588- else if (result == StateTraversal::SearchResult::FINISHED)
634+ else if (result == StateTraversal::SearchResult::FINISHED && groupIdlesNeeded. empty () )
589635 {
590- std:: cout << " GC ( " << m_RankMPI << " ) all work finished, sending GROUP_CLOSE " << std::endl;
636+
591637 for (size_t i = 0 ; i < groupState.size (); ++i)
592638 {
593639 std::cout << " group " << i << " status: " << static_cast <int >(groupState[i].m_currentStatus )
@@ -599,10 +645,39 @@ void BP5Writer::ReroutingCommunicationLoop()
599645 // comm loop.
600646 for (size_t scIdx = 0 ; scIdx < subCoordRanks.size (); ++scIdx)
601647 {
602- adios2::helper::RerouteMessage closeMsg;
603- closeMsg.m_MsgType = RerouteMessage::MessageType::GROUP_CLOSE;
604- closeMsg.NonBlockingSendTo (m_Comm, scIdx, sendBuffers.GetNextBuffer ());
648+ if (subCoordRanks[scIdx] != globalCoord)
649+ {
650+ std:: cout << " Rank " << m_RankMPI << " sending GROUP_CLOSE to rank " << subCoordRanks[scIdx] << std::endl;
651+ adios2::helper::RerouteMessage closeMsg;
652+ closeMsg.m_MsgType = RerouteMessage::MessageType::GROUP_CLOSE;
653+ closeMsg.NonBlockingSendTo (m_Comm, subCoordRanks[scIdx], sendBuffers.GetNextBuffer ());
654+ }
655+ }
656+
657+ std::cout << " Rank " << m_RankMPI << " marking my own close ack as received" << std::endl;
658+ receivedGroupClose = true ;
659+ closeAcksNeeded.erase (globalCoord);
660+
661+ waitingForCloseAcks = true ;
662+ }
663+ }
664+
665+ if (iAmGlobalCoord)
666+ {
667+ if (waitingForCloseAcks && closeAcksNeeded.empty ())
668+ {
669+ std::cout << " Rank " << m_RankMPI << " got all my close acks" << std::endl;
670+ // global coordinator received the final close ack, now it can leave
671+ waitingForCloseAcks = false ;
672+ }
673+ else
674+ {
675+ std::cout << " Rank " << m_RankMPI << " still need " << closeAcksNeeded.size () << " close acks [ " ;
676+ for (int n : closeAcksNeeded)
677+ {
678+ std::cout << " " << n;
605679 }
680+ std::cout << " ]" << std::endl;
606681 }
607682 }
608683 }
0 commit comments