Commit 54b7f28

Fix a few bugs and adjust debug logs
- One issue was indexing the global writer-group state by subcoordinator rank id rather than by the index of the group being coordinated. Fix that by adding a mapping from rank id to global state index.
- When rerouting happens on time step zero, opening without append mode could result in a zero block appearing where the rerouted rank wrote. Fix that by allowing rerouted ranks to specify forceAppend when they open their files.
1 parent 046a795 commit 54b7f28
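
For context on the first fix, here is a minimal standalone sketch of the indexing bug (the partition layout and main() scaffolding are hypothetical; groupState, scRankToIndex, and m_Partitions mirror names in the diffs below). Subcoordinator rank ids such as 0, 4, 8 are not valid subscripts into a per-group state vector, so they must first be translated to group indices:

    // Hedged sketch of the indexing bug; the layout here is made up.
    #include <cstddef>
    #include <iostream>
    #include <map>
    #include <vector>

    int main()
    {
        // Hypothetical partitioning: 12 writer ranks in 3 groups of 4, where
        // each group's subcoordinator is its first rank (ranks 0, 4, and 8).
        const std::vector<std::vector<int>> partitions = {
            {0, 1, 2, 3}, {4, 5, 6, 7}, {8, 9, 10, 11}};

        // One state slot per *group*, so valid indices are 0..2 only.
        std::vector<int> groupState(partitions.size(), 0);

        // The fix: map each subcoordinator rank id to the index of the group
        // it coordinates, mirroring scRankToIndex in the diff below.
        std::map<int, std::size_t> scRankToIndex;
        for (std::size_t i = 0; i < partitions.size(); ++i)
        {
            scRankToIndex[partitions[i][0]] = i;
        }

        const int srcRank = 8; // a subcoordinator rank id from a reroute message

        // Buggy pattern: groupState[srcRank] would index slot 8 of a
        // 3-element vector, i.e. out-of-bounds access via operator[].
        // Fixed pattern: translate the rank id to its group index first.
        const std::size_t srcIdx = scRankToIndex.at(srcRank);
        groupState[srcIdx] = 1; // safe: srcIdx == 2

        std::cout << "rank " << srcRank << " -> group index " << srcIdx << std::endl;
        return 0;
    }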

File tree: 3 files changed, +99 -73 lines

- source/adios2/engine/bp5/BP5Writer.cpp
- source/adios2/engine/bp5/BP5Writer.h
- source/adios2/engine/bp5/BP5Writer_WithRerouting.cpp

source/adios2/engine/bp5/BP5Writer.cpp

Lines changed: 2 additions & 2 deletions
@@ -1668,7 +1668,7 @@ void BP5Writer::InitTransports()
     InitBPBuffer();
 }
 
-void BP5Writer::OpenSubfile(bool useComm)
+void BP5Writer::OpenSubfile(bool useComm, bool forceAppend)
 {
     std::string cacheKey = GetCacheKey(m_Aggregator);
     auto search = m_AggregatorSpecifics.find(cacheKey);
@@ -1716,7 +1716,7 @@ void BP5Writer::OpenSubfile(bool useComm)
 
     if (m_Parameters.AggregationType == (int)AggregationType::DataSizeBased)
     {
-        if (m_WriterStep > 0)
+        if (m_WriterStep > 0 || forceAppend)
        {
            // override the mode to be append if we're opening a file that
            // was already opened by another rank.

source/adios2/engine/bp5/BP5Writer.h

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ class BP5Writer : public BP5Engine, public core::Engine
     std::map<std::string, AggTransportData> m_AggregatorSpecifics;
     helper::RankPartition GetPartitionInfo(const uint64_t rankDataSize, const int subStreams,
                                            helper::Comm const &parentComm);
-    void OpenSubfile(const bool useComm = true);
+    void OpenSubfile(const bool useComm = true, const bool forceAppend = false);
     helper::Partitioning m_Partitioning;
 
     /** Single object controlling BP buffering */
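
Because the new parameter is defaulted, existing callers are unaffected; only the rerouting path opts in. Roughly, the two call patterns (the second appears in BP5Writer_WithRerouting.cpp below; the first simply relies on the defaults):

    OpenSubfile();            // collective open; append mode only when m_WriterStep > 0
    OpenSubfile(false, true); // rerouted rank: skip collective comm, force append mode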

source/adios2/engine/bp5/BP5Writer_WithRerouting.cpp

Lines changed: 96 additions & 70 deletions
@@ -193,6 +193,7 @@ void BP5Writer::ReroutingCommunicationLoop()
     bool needStateCheck = false;
     StateTraversal pairFinder;
     std::vector<int> subCoordRanks;
+    std::map<int, size_t> scRankToIndex;
     bool firstIdleMsg = true;
     std::set<int> closeAcksNeeded;
     std::set<int> groupIdlesNeeded;
@@ -233,6 +234,7 @@ void BP5Writer::ReroutingCommunicationLoop()
         groupState[i].m_currentStatus = WriterGroupState::Status::UNKNOWN;
         groupState[i].m_subFileIndex = i;
         subCoordRanks[i] = m_Partitioning.m_Partitions[i][0];
+        scRankToIndex[m_Partitioning.m_Partitions[i][0]] = i;
         closeAcksNeeded.insert(subCoordRanks[i]);
         groupIdlesNeeded.insert(subCoordRanks[i]);
     }
@@ -332,20 +334,6 @@ void BP5Writer::ReroutingCommunicationLoop()
             size_t ackGroupIdx = static_cast<size_t>(message.m_WildCard);
             groupState[ackGroupIdx].m_currentStatus = WriterGroupState::Status::CLOSED;
             closeAcksNeeded.erase(status.Source);
-
-            if (!closeAcksNeeded.empty())
-            {
-                std::stringstream ss;
-
-                ss << "Rank " << m_RankMPI << " still awaiting close acks from: [ ";
-                for (int n : closeAcksNeeded)
-                {
-                    ss << n << " ";
-                }
-                ss << " ]\n";
-
-                std::cout << ss.str();
-            }
         }
         break;
     case RerouteMessage::MessageType::WRITER_IDLE:
@@ -455,56 +443,65 @@ void BP5Writer::ReroutingCommunicationLoop()
     case RerouteMessage::MessageType::REROUTE_REJECT:
        std::cout << "Rank " << m_RankMPI << " received REROUTE_REJECT from rank "
                  << status.Source << std::endl;
-        // msg for global coordinator
-
-        // Both the src and target subcoord states return from PENDING to their prior state
-        if (groupState[message.m_SrcRank].m_currentStatus ==
-            WriterGroupState::Status::PENDING)
        {
-            groupState[message.m_SrcRank].m_currentStatus =
-                WriterGroupState::Status::WRITING;
-            groupState[message.m_SrcRank].m_queueSize = 0;
-        }
+            // msg for global coordinator
 
-        if (groupState[message.m_DestRank].m_currentStatus ==
-            WriterGroupState::Status::PENDING)
-        {
-            groupState[message.m_DestRank].m_currentStatus = WriterGroupState::Status::IDLE;
-        }
+            size_t srcIdx = scRankToIndex[message.m_SrcRank];
+            size_t destIdx = scRankToIndex[message.m_DestRank];
+
+            // Both the src and target subcoord states return from PENDING to their prior state
+            if (groupState[srcIdx].m_currentStatus ==
+                WriterGroupState::Status::PENDING)
+            {
+                groupState[srcIdx].m_currentStatus =
+                    WriterGroupState::Status::WRITING;
+                groupState[srcIdx].m_queueSize = 0;
+            }
 
-        // The reason to check here is that global coord triggers at most
-        // one reroute sequence per iteration through the loop. Otherwise
-        // we probably don't need a state check upon receipt of rejection.
-        needStateCheck = true;
+            if (groupState[destIdx].m_currentStatus ==
+                WriterGroupState::Status::PENDING)
+            {
+                groupState[destIdx].m_currentStatus = WriterGroupState::Status::IDLE;
+            }
+
+            // The reason to check here is that global coord triggers at most
+            // one reroute sequence per iteration through the loop. Otherwise
+            // we probably don't need a state check upon receipt of rejection.
+            needStateCheck = true;
+        }
        break;
    case RerouteMessage::MessageType::REROUTE_ACK:
        std::cout << "Rank " << m_RankMPI << " received REROUTE_ACK from rank "
                  << status.Source << std::endl;
        // msg for global coordinator
+        {
+
+            std::cout << "Rank " << m_RankMPI << " sending WRITE_MORE to rank "
+                      << message.m_DestRank << std::endl;
 
-        std::cout << "Rank " << m_RankMPI << " sending WRITE_MORE to rank "
-                  << message.m_DestRank << std::endl;
-
-        // Send the lucky volunteer another writer
-        adios2::helper::RerouteMessage writeMoreMsg;
-        writeMoreMsg.m_MsgType = RerouteMessage::MessageType::WRITE_MORE;
-        writeMoreMsg.m_WildCard = message.m_WildCard; // i.e. the rerouted writer rank
-        writeMoreMsg.NonBlockingSendTo(m_Comm, message.m_DestRank,
-                                       sendBuffers.GetNextBuffer());
-
-        groupIdlesNeeded.insert(message.m_DestRank);
-
-        // Src subcoord state is returned to writing, dest subcoord state is now writing as
-        // well
-        groupState[message.m_SrcRank].m_currentStatus = WriterGroupState::Status::WRITING;
-        groupState[message.m_SrcRank].m_queueSize -= 1;
-        groupState[message.m_DestRank].m_currentStatus = WriterGroupState::Status::WRITING;
-        // groupState[message.m_DestRank].m_queueSize += 1;
-
-        // The reason to check here is that global coord triggers at most
-        // one reroute sequence per iteration through the loop. Otherwise
-        // we probably don't need a state check upon receiving reroute ack.
-        needStateCheck = true;
+            // Send the lucky volunteer another writer
+            adios2::helper::RerouteMessage writeMoreMsg;
+            writeMoreMsg.m_MsgType = RerouteMessage::MessageType::WRITE_MORE;
+            writeMoreMsg.m_WildCard = message.m_WildCard; // i.e. the rerouted writer rank
+            writeMoreMsg.NonBlockingSendTo(m_Comm, message.m_DestRank,
+                                           sendBuffers.GetNextBuffer());
+
+            groupIdlesNeeded.insert(message.m_DestRank);
+
+            // Src subcoord state is returned to writing, dest subcoord state is now writing as
+            // well
+            size_t srcIdx = scRankToIndex[message.m_SrcRank];
+            size_t destIdx = scRankToIndex[message.m_DestRank];
+            groupState[srcIdx].m_currentStatus = WriterGroupState::Status::WRITING;
+            groupState[srcIdx].m_queueSize -= 1;
+            groupState[destIdx].m_currentStatus = WriterGroupState::Status::WRITING;
+            // groupState[destIdx].m_queueSize += 1;
+
+            // The reason to check here is that global coord triggers at most
+            // one reroute sequence per iteration through the loop. Otherwise
+            // we probably don't need a state check upon receiving reroute ack.
+            needStateCheck = true;
+        }
        break;
    case RerouteMessage::MessageType::WRITE_MORE:
        std::cout << "Rank " << m_RankMPI << " received WRITE_MORE from rank "
@@ -608,7 +605,7 @@ void BP5Writer::ReroutingCommunicationLoop()
 
    // Global coordinator process
    // Look for possible reroute-to / reroute-from pairs
-    if (iAmGlobalCoord && needStateCheck && !waitingForCloseAcks)
+    if (iAmGlobalCoord /*&& needStateCheck*/ && !waitingForCloseAcks)
    {
        std::cout << "GC (" << m_RankMPI << ") looking for reroute candidate pair" << std::endl;
        std::pair<size_t, size_t> nextPair;
@@ -670,25 +667,51 @@ void BP5Writer::ReroutingCommunicationLoop()
 
            waitingForCloseAcks = true;
        }
+        else
+        {
+            std::cout << "candidate pair search returned ";
+
+            switch (result)
+            {
+            case StateTraversal::SearchResult::NOT_FOUND:
+                std::cout << "NOT FOUND" << std::endl;
+                std::cout << "  states: [ ";
+                for (size_t i = 0; i < groupState.size(); ++i)
+                {
+                    std::cout << static_cast<int>(groupState[i].m_currentStatus) << " ";
+                }
+                std::cout << "]" << std::endl;
+                break;
+            case StateTraversal::SearchResult::FOUND:
+                std::cout << "FOUND" << std::endl;
+                break;
+            case StateTraversal::SearchResult::FINISHED:
+                std::cout << "FINISHED" << std::endl;
+                break;
+            }
+        }
    }
 
    if (iAmGlobalCoord)
    {
-        if (waitingForCloseAcks && closeAcksNeeded.empty())
-        {
-            std::cout << "Rank " << m_RankMPI << " got all my close acks" << std::endl;
-            // global coordinator received the final close ack, now it can leave
-            waitingForCloseAcks = false;
-        }
-        else
+        if (waitingForCloseAcks)
        {
-            std::cout << "Rank " << m_RankMPI << " still need " << closeAcksNeeded.size()
-                      << " close acks [ ";
-            for (int n : closeAcksNeeded)
+            if (closeAcksNeeded.empty())
            {
-                std::cout << " " << n;
+                std::cout << "Rank " << m_RankMPI << " got all my close acks" << std::endl;
+                // global coordinator received the final close ack, now it can leave
+                waitingForCloseAcks = false;
+            }
+            else
+            {
+                std::cout << "Rank " << m_RankMPI << " still need " << closeAcksNeeded.size()
+                          << " close acks [ ";
+                for (int n : closeAcksNeeded)
+                {
+                    std::cout << " " << n;
+                }
+                std::cout << " ]" << std::endl;
            }
-            std::cout << " ]" << std::endl;
        }
    }
 }
@@ -742,8 +765,11 @@ void BP5Writer::WriteData_WithRerouting(format::BufferV *Data)
        m_Aggregator->m_SubStreamIndex = m_TargetIndex;
 
        // Open the subfile without doing any collective communications, since the global
-        // coordinator ensures only one rerouted rank opens this file at a time.
-        OpenSubfile(false);
+        // coordinator ensures only one rerouted rank opens this file at a time. Also,
+        // be sure to open in append mode because another rank already wrote to this file,
+        // and opening without append mode in that case can result in a block of zeros
+        // getting written.
+        OpenSubfile(false, true);
    }
 
    // align to PAGE_SIZE
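
To see why forceAppend matters here, a self-contained sketch of the zero-block failure mode the comment above describes, using plain iostream file I/O rather than the actual ADIOS2 transport classes (the file name and block contents are made up):

    // Hedged sketch of the zero-block bug; names and sizes are hypothetical.
    #include <fstream>
    #include <iostream>
    #include <string>

    int main()
    {
        const std::string subfile = "subfile.demo"; // stand-in for a BP5 data subfile

        // Writer A opens the subfile fresh and writes a 4-byte block at offset 0.
        {
            std::ofstream a(subfile, std::ios::binary);
            a << "AAAA";
        }

        // Writer B (think: a rerouted rank on time step zero) re-opens the same
        // file. A plain std::ios::out open truncates it, so A's block is gone
        // and seeking forward leaves a hole that reads back as zeros.
        {
            std::ofstream b(subfile, std::ios::binary); // truncates: the bug
            b.seekp(4);
            b << "BBBB";
        }

        // Opening with std::ios::app instead would preserve A's block; that is
        // what forceAppend arranges in the real engine.
        std::ifstream in(subfile, std::ios::binary);
        char buf[8] = {};
        in.read(buf, sizeof(buf));
        for (char c : buf)
        {
            std::cout << (c == '\0' ? '0' : c); // prints 0000BBBB with the bug
        }
        std::cout << std::endl;
        return 0;
    }

Running this prints 0000BBBB: writer B's truncating open discarded A's block and left zeros in its place, which is the "zero block" the commit message refers to; an append-mode open preserves it.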
