Skip to content

Commit

Permalink
format async-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
guswynn committed May 19, 2022
1 parent 2e70713 commit 68e7231
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/storage/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,8 +950,16 @@ where
let sync_activator = scope.sync_activator_for(&info.address[..]);
let base_metrics = base_metrics.clone();
let source_connector = source_connector.clone();
let mut source_reader = Box::pin(async_stream::stream! {
let mut timestamper = match ReclockOperator::new(name.clone(), storage_metadata, now, timestamp_frequency.clone(), as_of.clone()).await {
let mut source_reader = Box::pin(async_stream::stream!({
let mut timestamper = match ReclockOperator::new(
name.clone(),
storage_metadata,
now,
timestamp_frequency.clone(),
as_of.clone(),
)
.await
{
Ok(t) => t,
Err(e) => {
error!("Failed to create source {} timestamper: {:#}", name, e);
Expand Down Expand Up @@ -1037,7 +1045,7 @@ where
}
}
}
});
}));

let activator = scope.activator_for(&info.address[..]);
move |cap, output| {
Expand Down

0 comments on commit 68e7231

Please sign in to comment.