Skip to content

Commit

Permalink
format select
Browse files Browse the repository at this point in the history
  • Loading branch information
guswynn committed May 18, 2022
1 parent 7f0e9fa commit e807e54
Showing 1 changed file with 66 additions and 32 deletions.
98 changes: 66 additions & 32 deletions src/storage/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,51 +997,37 @@ where
let mut pending_messages = vec![];
loop {
tokio::select! {

// N.B. This branch is cancel-safe because `next` only borrows the underlying stream.
item = source_stream.next() => {
match item {
Some(Ok(message)) => pending_messages.push(Ok(message)
),
Some(Ok(message)) => pending_messages.push(Ok(message)),
// TODO: make errors definite
Some(Err(e)) => pending_messages.push(Err(e)),
None => {},
}
}
// It's time to timestamp a batch
_ = timestamp_interval.tick() => {
let mut max_offsets = HashMap::new();
for message in pending_messages.iter().filter_map(|m| m.as_ref().ok()) {
let entry = max_offsets.entry(message.partition.clone()).or_default();
*entry = std::cmp::max(*entry, message.offset);
}
let (bindings, progress) = match timestamper.timestamp_offsets(&max_offsets).await {
Ok((bindings, progress)) => (bindings, progress),
Err(e) => {
error!("Error timestamping offsets: {}", e);
let mut max_offsets = HashMap::new();
match handle_timestamper_tick::<S>(
&mut max_offsets,
&mut timestamper,
&mut pending_messages,
source_stream.is_done()
).await {
Some(iter) => {
for event in iter {
yield event;
}
},
None => {
return;
}
};

for msg in pending_messages.drain(..) {
match msg{
Ok(message) => {
let ts = bindings.get(&message.partition).expect("timestamper didn't return partition").0;
yield Event::Message(ts, Ok(message));
},
Err(e) => {
// TODO: make errors definite
yield Event::Message(0, Err(e));
},
}
},
}
let progress_some = progress.as_option().is_some();
yield Event::Progress(progress.into_option());

if source_stream.is_done() {
// We just emitted the last piece of data that needed to be timestamped
if progress_some {
yield Event::Progress(None);
}
break;
return;
}
}
}
Expand Down Expand Up @@ -1121,6 +1107,54 @@ where
}
}

async fn handle_timestamper_tick<'a, S: SourceReader>(
max_offsets: &'a mut HashMap<PartitionId, MzOffset>,
timestamper: &mut ReclockOperator,
pending_messages: &'a mut Vec<Result<SourceMessage<S::Key, S::Value>, SourceReaderError>>,
is_done: bool,
) -> Option<
impl Iterator<
Item = Event<
Option<Timestamp>,
Result<SourceMessage<S::Key, S::Value>, SourceReaderError>,
>,
> + 'a,
> {
for message in pending_messages.iter().filter_map(|m| m.as_ref().ok()) {
let entry = max_offsets.entry(message.partition.clone()).or_default();
*entry = std::cmp::max(*entry, message.offset);
}
let (bindings, progress) = match timestamper.timestamp_offsets(&*max_offsets).await {
Ok((bindings, progress)) => (bindings, progress),
Err(e) => {
error!("Error timestamping offsets: {}", e);
return None;
}
};

let iter = pending_messages.drain(..).map(move |msg| {
match msg {
Ok(message) => {
let ts = bindings
.get(&message.partition)
.expect("timestamper didn't return partition")
.0;
Event::Message(ts, Ok(message))
}
Err(e) => {
// TODO: make errors definite
Event::Message(0, Err(e))
}
}
});
let progress_some = progress.as_option().is_some();
let iter = iter.chain(Some(Event::Progress(progress.into_option())));
Some(iter.chain(
// We just emitted the last piece of data that needed to be timestamped
(is_done && progress_some).then(|| Event::Progress(None)),
))
}

/// Take `message` and assign it the appropriate timestamps and push it into the
/// dataflow layer, if possible.
///
Expand Down

0 comments on commit e807e54

Please sign in to comment.