diff --git a/src/storage/src/source/mod.rs b/src/storage/src/source/mod.rs index ecdaf80c082de..e2ab96246022e 100644 --- a/src/storage/src/source/mod.rs +++ b/src/storage/src/source/mod.rs @@ -997,11 +997,11 @@ where let mut pending_messages = vec![]; loop { tokio::select! { + // N.B. This branch is cancel-safe because `next` only borrows the underlying stream. item = source_stream.next() => { match item { - Some(Ok(message)) => pending_messages.push(Ok(message) - ), + Some(Ok(message)) => pending_messages.push(Ok(message)), // TODO: make errors definite Some(Err(e)) => pending_messages.push(Err(e)), None => {}, @@ -1009,39 +1009,25 @@ where } // It's time to timestamp a batch _ = timestamp_interval.tick() => { - let mut max_offsets = HashMap::new(); - for message in pending_messages.iter().filter_map(|m| m.as_ref().ok()) { - let entry = max_offsets.entry(message.partition.clone()).or_default(); - *entry = std::cmp::max(*entry, message.offset); - } - let (bindings, progress) = match timestamper.timestamp_offsets(&max_offsets).await { - Ok((bindings, progress)) => (bindings, progress), - Err(e) => { - error!("Error timestamping offsets: {}", e); + let mut max_offsets = HashMap::new(); + match handle_timestamper_tick::( + &mut max_offsets, + &mut timestamper, + &mut pending_messages, + source_stream.is_done() + ).await { + Some(iter) => { + for event in iter { + yield event; + } + }, + None => { return; - } - }; - - for msg in pending_messages.drain(..) { - match msg{ - Ok(message) => { - let ts = bindings.get(&message.partition).expect("timestamper didn't return partition").0; - yield Event::Message(ts, Ok(message)); - }, - Err(e) => { - // TODO: make errors definite - yield Event::Message(0, Err(e)); - }, - } + }, } - let progress_some = progress.as_option().is_some(); - yield Event::Progress(progress.into_option()); + if source_stream.is_done() { - // We just emitted the last piece of data that needed to be timestamped - if progress_some { - yield Event::Progress(None); - } - break; + return; } } } @@ -1121,6 +1107,54 @@ where } } +async fn handle_timestamper_tick<'a, S: SourceReader>( + max_offsets: &'a mut HashMap, + timestamper: &mut ReclockOperator, + pending_messages: &'a mut Vec, SourceReaderError>>, + is_done: bool, +) -> Option< + impl Iterator< + Item = Event< + Option, + Result, SourceReaderError>, + >, + > + 'a, +> { + for message in pending_messages.iter().filter_map(|m| m.as_ref().ok()) { + let entry = max_offsets.entry(message.partition.clone()).or_default(); + *entry = std::cmp::max(*entry, message.offset); + } + let (bindings, progress) = match timestamper.timestamp_offsets(&*max_offsets).await { + Ok((bindings, progress)) => (bindings, progress), + Err(e) => { + error!("Error timestamping offsets: {}", e); + return None; + } + }; + + let iter = pending_messages.drain(..).map(move |msg| { + match msg { + Ok(message) => { + let ts = bindings + .get(&message.partition) + .expect("timestamper didn't return partition") + .0; + Event::Message(ts, Ok(message)) + } + Err(e) => { + // TODO: make errors definite + Event::Message(0, Err(e)) + } + } + }); + let progress_some = progress.as_option().is_some(); + let iter = iter.chain(Some(Event::Progress(progress.into_option()))); + Some(iter.chain( + // We just emitted the last piece of data that needed to be timestamped + (is_done && progress_some).then(|| Event::Progress(None)), + )) +} + /// Take `message` and assign it the appropriate timestamps and push it into the /// dataflow layer, if possible. ///