Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

format usages of async-stream #12562

Merged
merged 1 commit into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/dataflow-types/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,15 +729,17 @@ pub trait GenericClient<C, R>: fmt::Debug + Send {
where
R: Send + 'a,
{
Box::pin(async_stream::stream! {
Box::pin(async_stream::stream!({
loop {
match self.recv().await {
Ok(Some(response)) => yield Ok(response),
Err(error) => yield Err(error),
Ok(None) => { return; }
Ok(None) => {
return;
}
}
}
})
}))
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/persist-client/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,13 @@ where
{
/// Convert listener into futures::Stream
pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<K, V, T, D>> {
async_stream::stream! {
loop{
async_stream::stream!({
loop {
for msg in self.next().await {
yield msg;
}
}
}
})
}

/// Attempt to pull out the next values of this subscription.
Expand Down
18 changes: 13 additions & 5 deletions src/storage/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,11 @@ pub trait SourceReader {
where
Self: Sized + 'a,
{
Box::pin(async_stream::stream! {
Box::pin(async_stream::stream!({
while let Some(msg) = self.next(timestamp_frequency).await {
yield msg;
}
})
}))
}
}

Expand Down Expand Up @@ -950,8 +950,16 @@ where
let sync_activator = scope.sync_activator_for(&info.address[..]);
let base_metrics = base_metrics.clone();
let source_connector = source_connector.clone();
let mut source_reader = Box::pin(async_stream::stream! {
let mut timestamper = match ReclockOperator::new(name.clone(), storage_metadata, now, timestamp_frequency.clone(), as_of.clone()).await {
let mut source_reader = Box::pin(async_stream::stream!({
let mut timestamper = match ReclockOperator::new(
name.clone(),
storage_metadata,
now,
timestamp_frequency.clone(),
as_of.clone(),
)
.await
{
Ok(t) => t,
Err(e) => {
error!("Failed to create source {} timestamper: {:#}", name, e);
Expand Down Expand Up @@ -1037,7 +1045,7 @@ where
}
}
}
});
}));

let activator = scope.activator_for(&info.address[..]);
move |cap, output| {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/source/persist_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where

// This is a generator that sets up an async `Stream` that can be continously polled to get the
// values that are `yield`-ed from it's body.
let async_stream = async_stream::try_stream! {
let async_stream = async_stream::try_stream!({
// We are reading only from worker 0. We can split the work of reading from the snapshot to
// multiple workers, but someone has to distribute the splits. Also, in the glorious
// STORAGE future, we will use multiple persist shards to back a STORAGE collection. Then,
Expand Down Expand Up @@ -112,7 +112,7 @@ where
yield event;
}
}
};
});

let mut pinned_stream = Box::pin(async_stream);

Expand Down