@@ -618,11 +618,16 @@ Series::flushFileBased( iterations_iterator begin, iterations_iterator end )
618618 switch ( openIterationIfDirty ( it->first , it->second ) )
619619 {
620620 using IO = IterationOpened;
621+ case IO::RemainsClosed:
622+ // we might need to proceed further if the close status is
623+ // ClosedInFrontend
624+ // hence no continue here
625+ // otherwise, we might forget to close files physically
626+ break ;
621627 case IO::HasBeenOpened:
628+ // continue below
622629 it->second .flush ();
623630 break ;
624- case IO::RemainsClosed:
625- break ;
626631 }
627632
628633 // Phase 2
@@ -701,11 +706,15 @@ Series::flushGorVBased( iterations_iterator begin, iterations_iterator end )
701706 switch ( openIterationIfDirty ( it->first , it->second ) )
702707 {
703708 using IO = IterationOpened;
709+ case IO::RemainsClosed:
710+ // we might need to proceed further if the close status is
711+ // ClosedInFrontend
712+ // hence no continue here
713+ break ;
704714 case IO::HasBeenOpened:
715+ // continue below
705716 it->second .flush ();
706717 break ;
707- case IO::RemainsClosed:
708- break ;
709718 }
710719
711720 // Phase 2
@@ -739,6 +748,7 @@ Series::flushGorVBased( iterations_iterator begin, iterations_iterator end )
739748 {
740749 using IO = IterationOpened;
741750 case IO::HasBeenOpened:
751+ series.m_currentlyActiveIterations .emplace ( it->first );
742752 if ( !it->second .written () )
743753 {
744754 it->second .parent () = getWritable ( &series.iterations );
@@ -1250,9 +1260,15 @@ Series::advance(
12501260 * opening an iteration's file by beginning a step on it.
12511261 * So, return now.
12521262 */
1263+ iteration.get ().m_closed = internal::CloseStatus::ClosedInBackend;
12531264 return AdvanceStatus::OK;
12541265 }
12551266
1267+ if ( mode == AdvanceMode::ENDSTEP )
1268+ {
1269+ flushStep ( /* doFlush = */ false );
1270+ }
1271+
12561272 Parameter< Operation::ADVANCE > param;
12571273 if ( itData.m_closed == internal::CloseStatus::ClosedTemporarily &&
12581274 series.m_iterationEncoding == IterationEncoding::fileBased )
@@ -1324,6 +1340,28 @@ Series::advance(
13241340 return *param.status ;
13251341}
13261342
1343+ void Series::flushStep ( bool doFlush )
1344+ {
1345+ auto & series = get ();
1346+ if ( !series.m_currentlyActiveIterations .empty () )
1347+ {
1348+ // @todo I don't think this understands changing extents over time
1349+ // Not strictly necessary yet but it might come biting us later
1350+ Parameter< Operation::WRITE_ATT > wAttr;
1351+ wAttr.changesOverSteps = true ;
1352+ wAttr.name = " snapshot" ;
1353+ wAttr.resource = std::vector< unsigned long long >{
1354+ series.m_currentlyActiveIterations .begin (),
1355+ series.m_currentlyActiveIterations .end () };
1356+ wAttr.dtype = Datatype::VEC_ULONGLONG;
1357+ IOHandler ()->enqueue ( IOTask ( &series.iterations , wAttr ) );
1358+ if ( doFlush )
1359+ {
1360+ IOHandler ()->flush ();
1361+ }
1362+ }
1363+ }
1364+
13271365auto Series::openIterationIfDirty ( uint64_t index, Iteration iteration )
13281366 -> IterationOpened
13291367{
@@ -1586,6 +1624,7 @@ SeriesData::~SeriesData()
15861624 {
15871625 Series impl{ { this , []( auto const * ){} } };
15881626 impl.flush ();
1627+ impl.flushStep ( /* doFlush = */ true );
15891628 }
15901629 }
15911630 catch ( std::exception const & ex )
0 commit comments