From d9bc803ae2cd25ff841d8915c42cfa147de517b7 Mon Sep 17 00:00:00 2001 From: Ryan Alameddine Date: Sun, 23 Feb 2025 13:27:34 -0800 Subject: [PATCH] feat(hydroflow): Re-introduced poll_futures. --- Cargo.lock | 10 +- benches/Cargo.toml | 5 + benches/benches/futures.rs | 175 ++++++++++++++++++ dfir_lang/src/graph/ops/mod.rs | 2 + dfir_lang/src/graph/ops/poll_futures.rs | 111 +++++++++++ .../src/graph/ops/poll_futures_ordered.rs | 26 +++ dfir_rs/tests/surface_poll_futures.rs | 94 ++++++++++ dfir_rs/tests/surface_poll_futures_ordered.rs | 94 ++++++++++ hydro_lang/src/ir.rs | 81 ++++++++ hydro_lang/src/rewrites/persist_pullup.rs | 22 +++ hydro_lang/src/stream.rs | 107 +++++++++++ hydro_test_local/Cargo.toml | 1 + hydro_test_local/src/local/futures.rs | 97 ++++++++++ hydro_test_local/src/local/mod.rs | 1 + 14 files changed, 822 insertions(+), 4 deletions(-) create mode 100644 benches/benches/futures.rs create mode 100644 dfir_lang/src/graph/ops/poll_futures.rs create mode 100644 dfir_lang/src/graph/ops/poll_futures_ordered.rs create mode 100644 dfir_rs/tests/surface_poll_futures.rs create mode 100644 dfir_rs/tests/surface_poll_futures_ordered.rs create mode 100644 hydro_test_local/src/local/futures.rs diff --git a/Cargo.lock b/Cargo.lock index c4febfb60bfd..6f9b975358fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -356,6 +356,7 @@ dependencies = [ "criterion", "dfir_rs", "differential-dataflow-master", + "futures", "nameof", "rand", "rand_distr", @@ -1707,6 +1708,7 @@ dependencies = [ "rand", "stageleft", "stageleft_tool", + "tokio", ] [[package]] @@ -3896,9 +3898,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.3" +version = "1.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "bytes", @@ -3914,9 +3916,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", diff --git a/benches/Cargo.toml b/benches/Cargo.toml index bcfbbfa78b59..2d571adec298 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -16,6 +16,7 @@ workspace = true criterion = { version = "0.5.0", features = [ "async_tokio", "html_reports" ] } dfir_rs = { path = "../dfir_rs", features = [ "debugging" ] } nameof = "1.0.0" +futures = "0.3" rand = "0.8.0" rand_distr = "0.4.3" seq-macro = "0.2.0" @@ -67,3 +68,7 @@ harness = false [[bench]] name = "words_diamond" harness = false + +[[bench]] +name = "futures" +harness = false \ No newline at end of file diff --git a/benches/benches/futures.rs b/benches/benches/futures.rs new file mode 100644 index 000000000000..87d80ea609f3 --- /dev/null +++ b/benches/benches/futures.rs @@ -0,0 +1,175 @@ +use std::cell::RefCell; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, Criterion}; +use hydroflow::hydroflow_syntax; +use hydroflow::scheduled::graph::Hydroflow; + +const NUM_ELEMS: u32 = 3000; + +/// A future which returns () after it manually woken +pub struct ManualFut { + done: Rc>, + waker: Rc>>, +} +impl ManualFut { + pub fn new(done: Rc>, waker: Rc>>) -> ManualFut { + ManualFut { done, waker } + } +} + +impl Future for ManualFut { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match *self.done.borrow() { + true => Poll::Ready(()), + false => { + self.waker.replace(Some(cx.waker().clone())); + Poll::Pending + } + } + } +} + +fn benchmark_immediately_available(c: &mut Criterion) { + c.bench_function("futures/immediately_available", |b| { + b.iter_batched( + || { + let mut df = hydroflow_syntax! { + source_iter(0..NUM_ELEMS) + -> map(|x| async move { + x + }) + -> defer_tick() + -> poll_futures() + -> for_each(|_| {}); + }; + + df.run_tick(); // skip loading and mapping to future + df + }, + |mut df| { + df.run_tick(); + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +type WakeInfo = (Rc>, Vec>>>); + +fn benchmark_delayed(c: &mut Criterion) { + fn setup<'a>(count: u32, wake_one: bool) -> (Hydroflow<'a>, WakeInfo) { + let done = Rc::new(RefCell::new(false)); + let mut wakers = Vec::new(); + + let range = 0..count; + let futs = range + .map(|i| { + let waker = Rc::new(RefCell::new(None)); + let d = if !wake_one || i == 0 { + wakers.push(waker.clone()); + done.clone() + } else { + Rc::new(RefCell::new(false)) + }; + ManualFut::new(d, waker) + }) + .collect::>(); + + let df = { + hydroflow_syntax! { + source_iter(futs) + -> poll_futures() + -> for_each(|_| {}); + } + }; + + (df, (done, wakers)) + } + + fn wake_all((done, wakers): WakeInfo) { + *done.borrow_mut() = true; + wakers.into_iter().for_each(|waker| { + if let Some(waker) = waker.borrow_mut().take() { + waker.wake(); + } else { + panic!("waker not found but future should have been polled") + } + }) + } + + // Tick with the initial poll + c.bench_function("futures/delayed/initial", |b| { + b.iter_batched( + || setup(NUM_ELEMS, false).0, + |mut df| { + df.run_tick(); + }, + criterion::BatchSize::SmallInput, + ); + }); + + // Tick when no results are available + c.bench_function("futures/delayed/waiting", |b| { + b.iter_batched( + || { + let (mut df, wakes) = setup(NUM_ELEMS, true); + df.run_tick(); + df.run_tick(); + df.run_tick(); + wake_all(wakes); + df + }, + |mut df| { + df.run_tick(); + }, + criterion::BatchSize::SmallInput, + ); + }); + + // Tick when results became available + c.bench_function("futures/delayed/ready", |b| { + b.iter_batched( + || { + let (mut df, wakes) = setup(NUM_ELEMS, false); + df.run_tick(); + wake_all(wakes); + df + }, + |mut df| { + df.run_tick(); + }, + criterion::BatchSize::SmallInput, + ); + }); + // Tick after all results have been consumed + c.bench_function("futures/delayed/done", |b| { + b.iter_batched( + || { + let (mut df, wakes) = setup(NUM_ELEMS, false); + df.run_tick(); + wake_all(wakes); + df.run_tick(); + df + }, + |mut df| { + df.run_tick(); + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +criterion_group!( + name=futures; + config=Criterion::default().measurement_time(Duration::from_secs(30)); + targets=benchmark_immediately_available, + benchmark_delayed +); +criterion_main!(futures); diff --git a/dfir_lang/src/graph/ops/mod.rs b/dfir_lang/src/graph/ops/mod.rs index 3baeac215bfe..288bb505f75d 100644 --- a/dfir_lang/src/graph/ops/mod.rs +++ b/dfir_lang/src/graph/ops/mod.rs @@ -296,6 +296,8 @@ declare_ops![ persist_mut::PERSIST_MUT, persist_mut_keyed::PERSIST_MUT_KEYED, prefix::PREFIX, + poll_futures::POLL_FUTURES, + poll_futures_ordered::POLL_FUTURES_ORDERED, py_udf::PY_UDF, reduce::REDUCE, spin::SPIN, diff --git a/dfir_lang/src/graph/ops/poll_futures.rs b/dfir_lang/src/graph/ops/poll_futures.rs new file mode 100644 index 000000000000..bcb2fab354ca --- /dev/null +++ b/dfir_lang/src/graph/ops/poll_futures.rs @@ -0,0 +1,111 @@ +use quote::quote_spanned; +use syn::Ident; + +use super::{ + OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1 +}; + +pub const POLL_FUTURES: OperatorConstraints = OperatorConstraints { + name: "poll_futures", + categories: &[OperatorCategory::Map], + hard_range_inn: RANGE_1, + soft_range_inn: RANGE_1, + hard_range_out: RANGE_1, + soft_range_out: RANGE_1, + num_args: 0, + persistence_args: RANGE_0, + type_args: RANGE_0, + is_external_input: false, + has_singleton_output: false, + flo_type: None, + ports_inn: None, + ports_out: None, + input_delaytype_fn: |_| None, + write_fn: move |wc, _| poll_futures_writer( + Ident::new("FuturesUnordered", wc.op_span), + Ident::new("push", wc.op_span), + wc) +}; + +pub fn poll_futures_writer(future_type: Ident, push_fn: Ident, wc @ &WriteContextArgs { + root, + context, + op_span, + ident, + inputs, + outputs, + is_pull, + .. +} : &WriteContextArgs) -> Result { + let futures_ident = wc.make_ident("futures"); + + let write_prologue = quote_spanned! {op_span=> + let #futures_ident = df.add_state( + ::std::cell::RefCell::new( + #root::futures::stream::#future_type::new() + ) + ); + }; + + let write_iterator = if is_pull { + let input = &inputs[0]; + quote_spanned! {op_span=> + let #ident = { + let mut out = ::std::vec::Vec::new(); + + #input + .for_each(|fut| { + let mut fut = ::std::boxed::Box::pin(fut); + if let #root::futures::task::Poll::Ready(val) = #root::futures::Future::poll(::std::pin::Pin::as_mut(&mut fut), &mut ::std::task::Context::from_waker(&#context.waker())) { + out.push(val); + } else { + #context + .state_ref(#futures_ident) + .borrow_mut() + .#push_fn(fut); + } + }); + while let #root::futures::task::Poll::Ready(Some(val)) = + #root::futures::Stream::poll_next(::std::pin::Pin::new(&mut *#context + .state_ref(#futures_ident) + .borrow_mut() + ), &mut ::std::task::Context::from_waker(&#context.waker())) + { + out.push(val); + } + + ::std::iter::IntoIterator::into_iter(out) + }; + } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = { + let mut out = #output; + let consumer = #root::pusherator::for_each::ForEach::new(|fut| { + let fut = ::std::boxed::Box::pin(fut); + #context + .state_ref(#futures_ident) + .borrow_mut() + .#push_fn(fut); + #context.schedule_subgraph(#context.current_subgraph(), true); + }); + while let #root::futures::task::Poll::Ready(Some(val)) = + #root::futures::Stream::poll_next(::std::pin::Pin::new(&mut *#context + .state_ref(#futures_ident) + .borrow_mut() + ), &mut ::std::task::Context::from_waker(&#context.waker())) + { + #root::pusherator::Pusherator::give(&mut out, val) + } + + consumer + }; + } + }; + Ok(OperatorWriteOutput { + write_prologue, + write_iterator, + ..Default::default() + }) +} diff --git a/dfir_lang/src/graph/ops/poll_futures_ordered.rs b/dfir_lang/src/graph/ops/poll_futures_ordered.rs new file mode 100644 index 000000000000..884de57a2320 --- /dev/null +++ b/dfir_lang/src/graph/ops/poll_futures_ordered.rs @@ -0,0 +1,26 @@ +use syn::Ident; + +use super::{ + poll_futures::poll_futures_writer, OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1 +}; + +pub const POLL_FUTURES_ORDERED: OperatorConstraints = OperatorConstraints { + name: "poll_futures_ordered", + categories: &[OperatorCategory::Map], + hard_range_inn: RANGE_1, + soft_range_inn: RANGE_1, + hard_range_out: RANGE_1, + soft_range_out: RANGE_1, + num_args: 0, + persistence_args: RANGE_0, + type_args: RANGE_0, + is_external_input: false, + has_singleton_output: false, + flo_type: None, + ports_inn: None, + ports_out: None, + input_delaytype_fn: |_| None, + write_fn: move |wc, _| poll_futures_writer(Ident::new("FuturesOrdered", wc.op_span), + Ident::new("push_back", wc.op_span), + wc) +}; diff --git a/dfir_rs/tests/surface_poll_futures.rs b/dfir_rs/tests/surface_poll_futures.rs new file mode 100644 index 000000000000..c976ea416c7c --- /dev/null +++ b/dfir_rs/tests/surface_poll_futures.rs @@ -0,0 +1,94 @@ +use std::collections::HashSet; + +use dfir_rs::dfir_syntax; +use dfir_rs::util::collect_ready_async; +use multiplatform_test::multiplatform_test; +use tokio::time::{sleep, Duration}; + +#[multiplatform_test(dfir, env_tracing)] +async fn single_batch_test() { + let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::(); + + let mut df = dfir_syntax! { + source_iter(0..10) + -> map(|x| async move { + sleep(Duration::from_millis(100)).await; + x + }) + -> poll_futures() + -> for_each(|x| result_send.send(x).unwrap()); + }; + + let handle = tokio::task::spawn(async move { + sleep(Duration::from_secs(1)).await; + assert_eq!( + HashSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + collect_ready_async::, _>(&mut result_recv).await + ); + }); + + tokio::time::timeout(Duration::from_secs(2), df.run_async()) + .await + .expect_err("Expected time out"); + + handle.await.unwrap(); +} + +#[multiplatform_test(dfir, env_tracing)] +async fn multi_batch_test() { + let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::(); + + let mut df = dfir_syntax! { + source_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]) + -> map(|x| async move { + sleep(Duration::from_millis(10*x)).await; + x + }) + -> poll_futures() + -> for_each(|x| result_send.send(x).unwrap()); + }; + + let handle = tokio::task::spawn(async move { + sleep(Duration::from_secs(1)).await; + assert_eq!( + HashSet::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]), + collect_ready_async::, _>(&mut result_recv).await + ); + }); + + tokio::time::timeout(Duration::from_secs(2), df.run_async()) + .await + .expect_err("Expected time out"); + + handle.await.unwrap(); +} + +#[multiplatform_test(dfir, env_tracing)] +async fn pusherator_test() { + let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::(); + + let mut df = dfir_syntax! { + ins = source_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]) + -> tee(); + + ins -> for_each(|_| {}); + ins -> map(|x| async move { + sleep(Duration::from_millis(10*x)).await; + x + }) -> poll_futures() -> for_each(|x| result_send.send(x).unwrap()); + }; + + let handle = tokio::task::spawn(async move { + sleep(Duration::from_secs(1)).await; + assert_eq!( + HashSet::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]), + collect_ready_async::, _>(&mut result_recv).await + ); + }); + + tokio::time::timeout(Duration::from_secs(2), df.run_async()) + .await + .expect_err("Expected time out"); + + handle.await.unwrap(); +} diff --git a/dfir_rs/tests/surface_poll_futures_ordered.rs b/dfir_rs/tests/surface_poll_futures_ordered.rs new file mode 100644 index 000000000000..55aded3fb969 --- /dev/null +++ b/dfir_rs/tests/surface_poll_futures_ordered.rs @@ -0,0 +1,94 @@ +use std::collections::HashSet; + +use dfir_rs::dfir_syntax; +use dfir_rs::util::collect_ready_async; +use multiplatform_test::multiplatform_test; +use tokio::time::{sleep, Duration}; + +#[multiplatform_test(dfir, env_tracing)] +async fn single_batch_test() { + let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::(); + + let mut df = dfir_syntax! { + source_iter(0..10) + -> map(|x| async move { + sleep(Duration::from_millis(100)).await; + x + }) + -> poll_futures_ordered() + -> for_each(|x| result_send.send(x).unwrap()); + }; + + let handle = tokio::task::spawn(async move { + sleep(Duration::from_secs(1)).await; + assert_eq!( + Vec::from_iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + collect_ready_async::, _>(&mut result_recv).await + ); + }); + + tokio::time::timeout(Duration::from_secs(2), df.run_async()) + .await + .expect_err("Expected time out"); + + handle.await.unwrap(); +} + +#[multiplatform_test(dfir, env_tracing)] +async fn multi_batch_test() { + let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::(); + + let mut df = dfir_syntax! { + source_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]) + -> map(|x| async move { + sleep(Duration::from_millis(10*x)).await; + x + }) + -> poll_futures_ordered() + -> for_each(|x| result_send.send(x).unwrap()); + }; + + let handle = tokio::task::spawn(async move { + sleep(Duration::from_secs(1)).await; + assert_eq!( + Vec::from_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]), + collect_ready_async::, _>(&mut result_recv).await + ); + }); + + tokio::time::timeout(Duration::from_secs(2), df.run_async()) + .await + .expect_err("Expected time out"); + + handle.await.unwrap(); +} + +#[multiplatform_test(dfir, env_tracing)] +async fn pusherator_test() { + let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::(); + + let mut df = dfir_syntax! { + ins = source_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]) + -> tee(); + + ins -> for_each(|_| {}); + ins -> map(|x| async move { + sleep(Duration::from_millis(10*x)).await; + x + }) -> poll_futures_ordered() -> for_each(|x| result_send.send(x).unwrap()); + }; + + let handle = tokio::task::spawn(async move { + sleep(Duration::from_secs(1)).await; + assert_eq!( + HashSet::from_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]), + collect_ready_async::, _>(&mut result_recv).await + ); + }); + + tokio::time::timeout(Duration::from_secs(2), df.run_async()) + .await + .expect_err("Expected time out"); + + handle.await.unwrap(); +} diff --git a/hydro_lang/src/ir.rs b/hydro_lang/src/ir.rs index cc1ba451acf6..04c250581669 100644 --- a/hydro_lang/src/ir.rs +++ b/hydro_lang/src/ir.rs @@ -584,6 +584,15 @@ pub enum HydroNode { metadata: HydroIrMetadata, }, + PollFutures { + input: Box, + metadata: HydroIrMetadata, + }, + PollFuturesOrdered { + input: Box, + metadata: HydroIrMetadata, + }, + Map { f: DebugExpr, input: Box, @@ -830,6 +839,8 @@ impl<'a> HydroNode { } HydroNode::Map { input, .. } + | HydroNode::PollFutures { input, .. } + | HydroNode::PollFuturesOrdered { input, .. } | HydroNode::FlatMap { input, .. } | HydroNode::Filter { input, .. } | HydroNode::FilterMap { input, .. } @@ -950,6 +961,14 @@ impl<'a> HydroNode { neg: Box::new(neg.deep_clone(seen_tees)), metadata: metadata.clone(), }, + HydroNode::PollFutures { input, metadata } => HydroNode::PollFutures { + input: Box::new(input.deep_clone(seen_tees)), + metadata: metadata.clone(), + }, + HydroNode::PollFuturesOrdered { input, metadata } => HydroNode::PollFuturesOrdered { + input: Box::new(input.deep_clone(seen_tees)), + metadata: metadata.clone(), + }, HydroNode::Map { f, input, metadata } => HydroNode::Map { f: f.clone(), input: Box::new(input.deep_clone(seen_tees)), @@ -1467,6 +1486,64 @@ impl<'a> HydroNode { (stream_ident, pos_location_id) } + HydroNode::PollFutures { input, .. } => { + let (input_ident, input_location_id) = + input.emit_core(builders_or_callback, built_tees, next_stmt_id); + + let futures_id = *next_stmt_id; + *next_stmt_id += 1; + + let futures_ident = + syn::Ident::new(&format!("stream_{}", futures_id), Span::call_site()); + + match builders_or_callback { + BuildersOrCallback::Builders(graph_builders) => { + let builder = graph_builders.entry(input_location_id).or_default(); + builder.add_dfir( + parse_quote! { + #futures_ident = #input_ident -> poll_futures(); + }, + None, + Some(&futures_id.to_string()), + ); + } + BuildersOrCallback::Callback(_, ref mut node_callback) => { + node_callback(self, futures_id); + } + } + + (futures_ident, input_location_id) + } + + HydroNode::PollFuturesOrdered { input, .. } => { + let (input_ident, input_location_id) = + input.emit_core(builders_or_callback, built_tees, next_stmt_id); + + let futures_id = *next_stmt_id; + *next_stmt_id += 1; + + let futures_ident = + syn::Ident::new(&format!("stream_{}", futures_id), Span::call_site()); + + match builders_or_callback { + BuildersOrCallback::Builders(graph_builders) => { + let builder = graph_builders.entry(input_location_id).or_default(); + builder.add_dfir( + parse_quote! { + #futures_ident = #input_ident -> poll_futures_ordered(); + }, + None, + Some(&futures_id.to_string()), + ); + } + BuildersOrCallback::Callback(_, ref mut node_callback) => { + node_callback(self, futures_id); + } + } + + (futures_ident, input_location_id) + } + HydroNode::Map { f, input, .. } => { let (input_ident, input_location_id) = input.emit_core(builders_or_callback, built_tees, next_stmt_id); @@ -2010,6 +2087,8 @@ impl<'a> HydroNode { HydroNode::Join { metadata, .. } => metadata, HydroNode::Difference { metadata, .. } => metadata, HydroNode::AntiJoin { metadata, .. } => metadata, + HydroNode::PollFutures { metadata, .. } => metadata, + HydroNode::PollFuturesOrdered { metadata, .. } => metadata, HydroNode::Map { metadata, .. } => metadata, HydroNode::FlatMap { metadata, .. } => metadata, HydroNode::Filter { metadata, .. } => metadata, @@ -2100,6 +2179,8 @@ impl<'a> HydroNode { HydroNode::AntiJoin { pos, neg, .. } => { format!("AntiJoin({}, {})", pos.print_root(), neg.print_root()) } + HydroNode::PollFutures { .. } => "PollFutures()".to_string(), + HydroNode::PollFuturesOrdered { .. } => "PollFuturesOrdered()".to_string(), HydroNode::Map { f, .. } => format!("Map({:?})", f), HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f), HydroNode::Filter { f, .. } => format!("Filter({:?})", f), diff --git a/hydro_lang/src/rewrites/persist_pullup.rs b/hydro_lang/src/rewrites/persist_pullup.rs index 48908ee1467d..dcf235d2387d 100644 --- a/hydro_lang/src/rewrites/persist_pullup.rs +++ b/hydro_lang/src/rewrites/persist_pullup.rs @@ -45,6 +45,28 @@ fn persist_pullup_node( } } + HydroNode::PollFutures { + input: mb!(* HydroNode::Persist { inner: behind_persist, .. }), + metadata, + } => HydroNode::Persist { + inner: Box::new(HydroNode::PollFutures { + input: behind_persist, + metadata: metadata.clone(), + }), + metadata: metadata.clone(), + }, + + HydroNode::PollFuturesOrdered { + input: mb!(* HydroNode::Persist { inner: behind_persist, .. }), + metadata, + } => HydroNode::Persist { + inner: Box::new(HydroNode::PollFuturesOrdered { + input: behind_persist, + metadata: metadata.clone(), + }), + metadata: metadata.clone(), + }, + HydroNode::Map { f, input: mb!(* HydroNode::Persist { inner: behind_persist, .. }), diff --git a/hydro_lang/src/stream.rs b/hydro_lang/src/stream.rs index 06045fc015db..ee5c069ade59 100644 --- a/hydro_lang/src/stream.rs +++ b/hydro_lang/src/stream.rs @@ -1,4 +1,5 @@ use std::cell::RefCell; +use std::future::Future; use std::hash::Hash; use std::marker::PhantomData; use std::ops::Deref; @@ -1573,6 +1574,112 @@ impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B, Order> Stream`, produces a new stream of the resulting `T` outputs. + /// Future outputs are produced as available, regardless of input arrival order. + /// + /// # Example + /// ```rust + /// # use std::collections::HashSet; + /// # use dfir_rs::futures::StreamExt; + /// # use hydro_lang::*; + /// # tokio_test::block_on(test_util::stream_transform_test( + /// |process| { + /// let tick = process.tick(); + /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8])) + /// .map(q!(|x| async move { + /// // tokio::time::sleep works, import then just sleep does not, unsure why + /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + /// x + /// })) + /// .poll_futures() + /// }, + /// |mut stream| async move { + /// assert_eq!( + /// HashSet::::from_iter(1..10), + /// HashSet::from_iter(vec![stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap()]) + /// ); + /// }, + /// # )); + pub fn poll_futures(self) -> Stream + where + T: Future, + { + Stream::new( + self.location.clone(), + HydroNode::PollFutures { + input: Box::new(self.ir_node.into_inner()), + metadata: self.location.new_node_metadata::(), + }, + ) + } + + /// Consumes a stream of `Future`, produces a new stream of the resulting `T` outputs. + /// Future outputs are produced in the same order as the input stream. + /// + /// # Example + /// ```rust + /// # use std::collections::HashSet; + /// # use dfir_rs::futures::StreamExt; + /// # use hydro_lang::*; + /// # tokio_test::block_on(test_util::stream_transform_test( + /// |process| { + /// let tick = process.tick(); + /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8])) + /// .map(q!(|x| async move { + /// // tokio::time::sleep works, import then just sleep does not, unsure why + /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + /// x + /// })) + /// .poll_futures_ordered() + /// }, + /// |mut stream| async move { + /// assert_eq!( + /// vec![2, 3, 1, 9, 6, 5, 4, 7, 8], + /// vec![stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap()] + /// ); + /// }, + /// # )); + pub fn poll_futures_ordered(self) -> Stream + where + T: Future, + { + Stream::new( + self.location.clone(), + HydroNode::PollFuturesOrdered { + input: Box::new(self.ir_node.into_inner()), + metadata: self.location.new_node_metadata::(), + }, + ) + } + + // fn test() { + // use std::collections::HashSet; + + // use dfir_rs::futures::StreamExt; + // use tokio::time::Duration; + // use super::*; + // // use hydro_lang::*; + // tokio_test::block_on(test_util::stream_transform_test( + // |process| { + // let tick = process.tick(); + // process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8])) + // .map(q!(|x| async move { + // // tokio::time::sleep works, import then just sleep does not, unsure why + // tokio::time::sleep(Duration::from_millis(10)).await; + // x + // })) + // .poll_futures() + // }, + // |stream| async move { + + // assert_eq!( + // HashSet::::from_iter(1..10), + // HashSet::from_iter(vec![stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap(), stream.next().await.unwrap()]) + // ); + // }, + // )); + // } + /// Given a tick, returns a stream corresponding to a batch of elements segmented by /// that tick. These batches are guaranteed to be contiguous across ticks and preserve /// the order of the input. diff --git a/hydro_test_local/Cargo.toml b/hydro_test_local/Cargo.toml index 5488699f8c58..1281f99ae717 100644 --- a/hydro_test_local/Cargo.toml +++ b/hydro_test_local/Cargo.toml @@ -19,6 +19,7 @@ stageleft = { path = "../stageleft", version = "^0.6.0" } rand = "0.8.0" hydro_test_local_macro = { path = "../hydro_test_local_macro" } +tokio = "1.43.0" [build-dependencies] stageleft_tool = { path = "../stageleft_tool", version = "^0.5.0" } diff --git a/hydro_test_local/src/local/futures.rs b/hydro_test_local/src/local/futures.rs new file mode 100644 index 000000000000..2af6a2791879 --- /dev/null +++ b/hydro_test_local/src/local/futures.rs @@ -0,0 +1,97 @@ +use std::time::Duration; + +use dfir_rs::tokio; +use dfir_rs::tokio::sync::mpsc::UnboundedSender; +use hydro_lang::deploy::SingleProcessGraph; +use hydro_lang::dfir_rs::scheduled::graph::Dfir; +use hydro_lang::*; +use stageleft::{q, Quoted, RuntimeData}; + +#[stageleft::entry] +pub fn unordered<'a>( + flow: FlowBuilder<'a>, + output: RuntimeData<&'a UnboundedSender>, +) -> impl Quoted<'a, Dfir<'a>> { + let process = flow.process::<()>(); + + process + .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8])) + .map(q!(|x| async move { + // tokio::time::sleep works, import then just sleep does not, unsure why + tokio::time::sleep(Duration::from_millis(10)).await; + x + })) + .poll_futures() + .for_each(q!(|x| output.send(x).unwrap())); + + flow.compile_no_network::() +} + +#[stageleft::entry] +pub fn ordered<'a>( + flow: FlowBuilder<'a>, + output: RuntimeData<&'a UnboundedSender>, +) -> impl Quoted<'a, Dfir<'a>> { + let process = flow.process::<()>(); + + process + .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8])) + .map(q!(|x| async move { + // tokio::time::sleep works, import then just sleep does not, unsure why + tokio::time::sleep(Duration::from_millis(10)).await; + x + })) + .poll_futures_ordered() + .for_each(q!(|x| output.send(x).unwrap())); + + flow.compile_no_network::() +} + +#[cfg(stageleft_runtime)] +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::time::Duration; + + use dfir_rs::util::collect_ready_async; + + #[tokio::test] + async fn test_unordered() { + let (out, mut out_recv) = dfir_rs::util::unbounded_channel(); + + let mut flow = super::unordered!(&out); + let handle = tokio::task::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + assert_eq!( + HashSet::from_iter(1..10), + collect_ready_async::, _>(&mut out_recv).await + ); + }); + + tokio::time::timeout(Duration::from_secs(2), flow.run_async()) + .await + .expect_err("Expected time out"); + + handle.await.unwrap(); + } + + #[tokio::test] + async fn test_ordered() { + let (out, mut out_recv) = dfir_rs::util::unbounded_channel(); + + let mut flow = super::ordered!(&out); + let handle = tokio::task::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + assert_eq!( + Vec::from_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]), + collect_ready_async::, _>(&mut out_recv).await + ); + }); + + tokio::time::timeout(Duration::from_secs(2), flow.run_async()) + .await + .expect_err("Expected time out"); + + handle.await.unwrap(); + } +} diff --git a/hydro_test_local/src/local/mod.rs b/hydro_test_local/src/local/mod.rs index c6c723e588a7..3ba5d3d51a39 100644 --- a/hydro_test_local/src/local/mod.rs +++ b/hydro_test_local/src/local/mod.rs @@ -2,6 +2,7 @@ pub mod chat_app; pub mod compute_pi; pub mod count_elems; pub mod first_ten; +pub mod futures; pub mod graph_reachability; pub mod negation; pub mod teed_join;