@@ -88,6 +88,19 @@ pub(crate) struct Downstairs {
     /// (including automatic flushes).
     next_flush: u64,
 
+    /// Indicates whether we are eligible for replay
+    ///
+    /// We are only eligible for replay if all jobs since the last flush are
+    /// buffered (i.e. none have been retired by a `Barrier` operation).
+    can_replay: bool,
+
+    /// How many `Flush` or `Barrier` operations are pending?
+    ///
+    /// We only want to send a `Barrier` if there isn't already one pending, so
+    /// we track it here (incrementing in `submit_flush` / `submit_barrier` and
+    /// decrementing in `retire_check`).
+    pending_barrier: usize,
+
     /// Ringbuf of recently acked job IDs.
     acked_ids: AllocRingBuffer<JobId>,
 
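To make the intent of these two fields concrete, here is a minimal, self-contained sketch of how they are meant to move together: `pending_barrier` is incremented when a `Flush` or `Barrier` is submitted and decremented when one retires, while `can_replay` is recomputed at retire time based on which of the two retired. Everything in the sketch other than those two field names (the `Op` enum, the `Tracker` type, and its methods) is invented for illustration and is not part of Crucible's actual API.

// Minimal sketch of the intended bookkeeping for `can_replay` and
// `pending_barrier`.  `Op`, `Tracker`, and the methods below are invented for
// this example; only the two field names come from the diff.

#[derive(Copy, Clone, Debug, PartialEq)]
enum Op {
    Write,
    Flush,
    Barrier,
}

struct Tracker {
    /// Jobs since the last flush are still buffered, so replay is possible
    can_replay: bool,
    /// `Flush` / `Barrier` operations submitted but not yet retired
    pending_barrier: usize,
}

impl Tracker {
    fn new() -> Self {
        Self { can_replay: true, pending_barrier: 0 }
    }

    /// Mirrors the increments in `submit_flush` / `submit_barrier`
    fn submit(&mut self, op: Op) {
        if matches!(op, Op::Flush | Op::Barrier) {
            self.pending_barrier += 1;
        }
    }

    /// Mirrors the decrement and `can_replay` update in `retire_check`
    fn retire(&mut self, op: Op) {
        match op {
            Op::Flush => {
                self.pending_barrier -= 1;
                self.can_replay = true; // data is durable; replay is allowed
            }
            Op::Barrier => {
                self.pending_barrier -= 1;
                self.can_replay = false; // retired jobs can no longer be replayed
            }
            Op::Write => (), // writes alone never retire jobs
        }
    }
}

fn main() {
    let mut t = Tracker::new();
    t.submit(Op::Write);
    t.submit(Op::Barrier);
    t.retire(Op::Barrier);
    assert!(!t.can_replay); // a Barrier retired jobs without flushing them
    t.submit(Op::Flush);
    t.retire(Op::Flush);
    assert!(t.can_replay); // a Flush makes the client replay-eligible again
}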
@@ -291,6 +304,8 @@ impl Downstairs {
             },
             cfg,
             next_flush: 0,
+            can_replay: true,
+            pending_barrier: 0,
             ds_active: ActiveJobs::new(),
             gw_active: HashSet::new(),
             acked_ids: AllocRingBuffer::new(2048),
@@ -523,7 +538,7 @@ impl Downstairs {
 
         // Restart the IO task for that specific client, transitioning to a new
        // state.
-        self.clients[client_id].reinitialize(up_state);
+        self.clients[client_id].reinitialize(up_state, self.can_replay);
 
         for i in ClientId::iter() {
             // Clear per-client delay, because we're starting a new session
@@ -1887,6 +1902,7 @@ impl Downstairs {
             extent_limit: extent_under_repair,
         };
 
+        self.pending_barrier += 1;
         self.enqueue(
             next_id,
             flush,
@@ -1896,6 +1912,47 @@ impl Downstairs {
         next_id
     }
 
+    /// Checks whether a `Barrier` operation is needed
+    ///
+    /// A `Barrier` is needed if we have buffered more than
+    /// `IO_CACHED_MAX_JOBS` jobs or `IO_CACHED_MAX_BYTES` bytes of completed
+    /// work, and no other barrier (or flush) operation is in flight.
+    pub(crate) fn needs_barrier(&self) -> bool {
+        if self.pending_barrier > 0 {
+            return false;
+        }
+
+        // n.b. This may not be 100% reliable: if different Downstairs have
+        // finished a different subset of jobs, then it's theoretically possible
+        // for each DownstairsClient to be under our limits, but for the true
+        // number of cached bytes/jobs to be over the limits.
+        //
+        // It's hard to imagine how we could encounter such a situation, given
+        // job dependencies and no out-of-order execution, so this is more of a
+        // "fun fact" and less an actual concern.
+        let max_jobs = self
+            .clients
+            .iter()
+            .map(|c| {
+                let i = c.io_state_job_count();
+                i.skipped + i.done + i.error
+            })
+            .max()
+            .unwrap();
+        let max_bytes = self
+            .clients
+            .iter()
+            .map(|c| {
+                let i = c.io_state_byte_count();
+                i.skipped + i.done + i.error
+            })
+            .max()
+            .unwrap();
+
+        max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS
+            || max_bytes >= crate::IO_CACHED_MAX_BYTES
+    }
+
     pub(crate) fn submit_barrier(&mut self) -> JobId {
         let next_id = self.next_id();
         cdt::gw__barrier__start!(|| (next_id.0));
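As a rough illustration of the thresholding above, the sketch below restates the check with plain structs in place of `DownstairsClient` and made-up values for the two limits (the real `IO_CACHED_MAX_JOBS` / `IO_CACHED_MAX_BYTES` constants are defined elsewhere in the crate and are not shown in this diff). Taking the maximum over clients, rather than a true global total, is what gives rise to the undercounting caveat in the comment.

// Standalone sketch of the `needs_barrier` thresholding.  The struct, the
// constants' values, and the function are illustrative stand-ins, not the
// real crate items.

const IO_CACHED_MAX_JOBS: u64 = 10_000; // assumed value for the example
const IO_CACHED_MAX_BYTES: u64 = 1024 * 1024 * 1024; // assumed value

/// Per-client totals of jobs that have finished (done, skipped, or errored)
/// but are still cached in case they need to be replayed.
struct CachedIo {
    jobs: u64,
    bytes: u64,
}

/// True if any single client has cached at least as much as the limits allow.
/// Because this takes the per-client maximum rather than a global total, it
/// can undercount when clients have completed different subsets of jobs.
fn over_cache_limits(clients: &[CachedIo]) -> bool {
    let max_jobs = clients.iter().map(|c| c.jobs).max().unwrap_or(0);
    let max_bytes = clients.iter().map(|c| c.bytes).max().unwrap_or(0);
    max_jobs >= IO_CACHED_MAX_JOBS || max_bytes >= IO_CACHED_MAX_BYTES
}

fn main() {
    let clients = [
        CachedIo { jobs: 9_000, bytes: 1 << 20 },
        CachedIo { jobs: 10_500, bytes: 1 << 20 }, // trips the job limit
        CachedIo { jobs: 8_000, bytes: 1 << 20 },
    ];
    assert!(over_cache_limits(&clients));
}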
@@ -1906,6 +1963,7 @@ impl Downstairs {
         let dependencies = self.ds_active.deps_for_flush(next_id);
         debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");
 
+        self.pending_barrier += 1;
         self.enqueue(
             next_id,
             IOop::Barrier { dependencies },
@@ -2439,11 +2497,17 @@ impl Downstairs {
         Ok(ReplaceResult::Started)
     }
 
+    /// Checks whether the given client state should go from Offline -> Faulted
+    ///
+    /// # Panics
+    /// If the given client is not in the `Offline` state
     pub(crate) fn check_gone_too_long(
         &mut self,
         client_id: ClientId,
         up_state: &UpstairsState,
     ) {
+        assert_eq!(self.clients[client_id].state(), DsState::Offline);
+
         let byte_count = self.clients[client_id].total_bytes_outstanding();
         let work_count = self.clients[client_id].total_live_work();
         let failed = if work_count > crate::IO_OUTSTANDING_MAX_JOBS {
@@ -2458,6 +2522,13 @@ impl Downstairs {
                 "downstairs failed, too many outstanding bytes {byte_count}"
             );
             Some(ClientStopReason::TooManyOutstandingBytes)
+        } else if !self.can_replay {
+            // XXX can this actually happen?
+            warn!(
+                self.log,
+                "downstairs became ineligible for replay while offline"
+            );
+            Some(ClientStopReason::IneligibleForReplay)
         } else {
            None
        };
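The fault decision above boils down to a three-way ladder on an offline client: too many outstanding jobs, too many outstanding bytes, or loss of replay eligibility. The helper below is a simplified restatement for illustration only; the enum variants and constants are stand-ins for the crate's `ClientStopReason` variants and `IO_OUTSTANDING_*` limits, and the numbers are assumptions.

// Simplified restatement of the Offline -> Faulted decision.  The enum and
// the constant values here are stand-ins, not the real crate items.

const IO_OUTSTANDING_MAX_JOBS: u64 = 57_000; // assumed value
const IO_OUTSTANDING_MAX_BYTES: u64 = 1024 * 1024 * 1024; // assumed value

#[derive(Debug, PartialEq)]
enum FaultReason {
    TooManyOutstandingJobs,
    TooManyOutstandingBytes,
    IneligibleForReplay,
}

/// Decide whether an offline client must be faulted instead of waiting for it
/// to reconnect and replay its buffered jobs.
fn offline_fault_reason(
    work_count: u64,
    byte_count: u64,
    can_replay: bool,
) -> Option<FaultReason> {
    if work_count > IO_OUTSTANDING_MAX_JOBS {
        Some(FaultReason::TooManyOutstandingJobs)
    } else if byte_count > IO_OUTSTANDING_MAX_BYTES {
        Some(FaultReason::TooManyOutstandingBytes)
    } else if !can_replay {
        // A Barrier retired jobs this client never received, so replay cannot
        // reconstruct them; the client has to go through repair instead.
        Some(FaultReason::IneligibleForReplay)
    } else {
        None
    }
}

fn main() {
    assert_eq!(offline_fault_reason(10, 4096, true), None);
    assert_eq!(
        offline_fault_reason(10, 4096, false),
        Some(FaultReason::IneligibleForReplay)
    );
}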
@@ -2589,9 +2660,12 @@ impl Downstairs {
     /// writes and if they aren't included in replay then the write will
     /// never start.
     fn retire_check(&mut self, ds_id: JobId) {
-        if !self.is_flush(ds_id) {
-            return;
-        }
+        let job = self.ds_active.get(&ds_id).expect("checked missing job");
+        let can_replay = match job.work {
+            IOop::Flush { .. } => true,
+            IOop::Barrier { .. } => false,
+            _ => return,
+        };
 
         // Only a completed flush will remove jobs from the active queue -
         // currently we have to keep everything around for use during replay
@@ -2645,6 +2719,13 @@ impl Downstairs {
             for &id in &retired {
                 let job = self.ds_active.remove(&id);
 
+                // Update our barrier count for the removed job
+                if matches!(job.work, IOop::Flush { .. } | IOop::Barrier { .. })
+                {
+                    self.pending_barrier =
+                        self.pending_barrier.checked_sub(1).unwrap();
+                }
+
                 // Jobs should have their backpressure contribution removed when
                 // they are completed (in `process_io_completion_inner`),
                 // **not** when they are retired. We'll do a sanity check here
@@ -2666,6 +2747,9 @@ impl Downstairs {
             for cid in ClientId::iter() {
                 self.clients[cid].skipped_jobs.retain(|&x| x >= ds_id);
             }
+
+            // Update the flag indicating whether replay is allowed
+            self.can_replay = can_replay;
         }
     }
 
@@ -4176,6 +4260,13 @@ impl Downstairs {
         self.ddef = Some(ddef);
     }
 
+    /// Checks whether there are any in-progress jobs present
+    pub(crate) fn has_live_jobs(&self) -> bool {
+        self.clients
+            .iter()
+            .any(|c| c.backpressure_counters.get_jobs() > 0)
+    }
+
     /// Returns the per-client state for the given job
     ///
     /// This is a helper function to make unit tests shorter
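This diff does not show the caller, so the sketch below is only a guess at how `has_live_jobs`, `needs_barrier`, and `submit_barrier` might be tied together by the upstairs event loop: skip the barrier check entirely when nothing is in flight, and submit a `Barrier` only when the cached-work limits have been exceeded. The trait, driver function, and stub type are inventions for the example.

// Hypothetical sketch of how a caller might combine `has_live_jobs`,
// `needs_barrier`, and `submit_barrier`; the real upstairs event loop is not
// part of this diff.

/// The three operations the sketch needs from the Downstairs.
trait BarrierPolicy {
    fn has_live_jobs(&self) -> bool;
    fn needs_barrier(&self) -> bool;
    fn submit_barrier(&mut self);
}

/// Called periodically: submit a `Barrier` only when there is in-progress
/// work and the cached-work limits have been exceeded.
fn maybe_submit_barrier<T: BarrierPolicy>(ds: &mut T) {
    if ds.has_live_jobs() && ds.needs_barrier() {
        ds.submit_barrier();
    }
}

// Stub implementation so the sketch compiles and runs on its own.
struct StubDownstairs {
    live: bool,
    over_limit: bool,
    barriers_sent: usize,
}

impl BarrierPolicy for StubDownstairs {
    fn has_live_jobs(&self) -> bool {
        self.live
    }
    fn needs_barrier(&self) -> bool {
        self.over_limit
    }
    fn submit_barrier(&mut self) {
        self.barriers_sent += 1;
    }
}

fn main() {
    let mut ds = StubDownstairs {
        live: true,
        over_limit: true,
        barriers_sent: 0,
    };
    maybe_submit_barrier(&mut ds);
    assert_eq!(ds.barriers_sent, 1);
}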