Skip to content

Commit

Permalink
feat(hydroflow): Re-introduced poll_futures.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Alameddine committed Feb 23, 2025
1 parent cf92350 commit d9bc803
Show file tree
Hide file tree
Showing 14 changed files with 822 additions and 4 deletions.
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ workspace = true
criterion = { version = "0.5.0", features = [ "async_tokio", "html_reports" ] }
dfir_rs = { path = "../dfir_rs", features = [ "debugging" ] }
nameof = "1.0.0"
futures = "0.3"
rand = "0.8.0"
rand_distr = "0.4.3"
seq-macro = "0.2.0"
Expand Down Expand Up @@ -67,3 +68,7 @@ harness = false
[[bench]]
name = "words_diamond"
harness = false

[[bench]]
name = "futures"
harness = false
175 changes: 175 additions & 0 deletions benches/benches/futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};
use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion};
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;

/// Number of items fed into `poll_futures` by every benchmark in this file:
/// it is the range length in `benchmark_immediately_available` and the `count`
/// passed to `setup` in `benchmark_delayed`.
const NUM_ELEMS: u32 = 3000;

/// A future that resolves to `()` once its shared `done` flag has been set.
///
/// While the flag is still `false`, every poll stores a clone of the polling
/// task's waker in the shared `waker` slot, so that code holding the other end
/// of that slot can wake the task manually.
pub struct ManualFut {
    done: Rc<RefCell<bool>>,
    waker: Rc<RefCell<Option<Waker>>>,
}

impl ManualFut {
    /// Creates a future driven by the given shared completion flag and waker slot.
    pub fn new(done: Rc<RefCell<bool>>, waker: Rc<RefCell<Option<Waker>>>) -> ManualFut {
        ManualFut { done, waker }
    }
}

impl Future for ManualFut {
    type Output = ();

    /// Returns `Ready(())` when the flag is set; otherwise records the current
    /// waker (overwriting any previously stored one) and returns `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let finished = *self.done.borrow();
        if finished {
            return Poll::Ready(());
        }

        let latest_waker = cx.waker().clone();
        self.waker.replace(Some(latest_waker));
        Poll::Pending
    }
}

/// Benchmarks a single tick of a graph whose `poll_futures` input consists of
/// `async` blocks that complete without ever suspending.
///
/// The untimed setup builds the graph and runs one tick up front; the timed
/// routine then runs exactly one more tick.
fn benchmark_immediately_available(c: &mut Criterion) {
    c.bench_function("futures/immediately_available", |bencher| {
        // Untimed per-iteration setup: build a fresh graph and advance it once.
        let prepare = || {
            let mut flow = hydroflow_syntax! {
                source_iter(0..NUM_ELEMS)
                    -> map(|x| async move {
                        x
                    })
                    -> defer_tick()
                    -> poll_futures()
                    -> for_each(|_| {});
            };

            flow.run_tick(); // skip loading and mapping to future
            flow
        };

        bencher.iter_batched(
            prepare,
            // Timed routine: the tick that follows the setup tick.
            |mut flow| {
                flow.run_tick();
            },
            criterion::BatchSize::SmallInput,
        );
    });
}

/// The shared completion flag together with the waker slots of every future
/// that is tied to that flag.
type WakeInfo = (Rc<RefCell<bool>>, Vec<Rc<RefCell<Option<Waker>>>>);

/// Benchmarks single ticks of a `poll_futures` graph fed with [`ManualFut`]s,
/// at four stages: first poll, still waiting, results just became ready, and
/// after all results were consumed.
fn benchmark_delayed(c: &mut Criterion) {
    // Builds a graph that feeds `count` manual futures into `poll_futures`.
    //
    // With `wake_one == false` every future shares the returned flag and has
    // its waker slot returned. With `wake_one == true` only the future at
    // index 0 does; the others get a private flag that is never handed out,
    // so `wake_all` cannot complete them.
    fn setup<'a>(count: u32, wake_one: bool) -> (Hydroflow<'a>, WakeInfo) {
        let shared_done = Rc::new(RefCell::new(false));
        let mut tracked_slots = Vec::new();
        let mut futs = Vec::new();

        for idx in 0..count {
            let slot = Rc::new(RefCell::new(None));
            let flag = if wake_one && idx != 0 {
                Rc::new(RefCell::new(false))
            } else {
                tracked_slots.push(Rc::clone(&slot));
                Rc::clone(&shared_done)
            };
            futs.push(ManualFut::new(flag, slot));
        }

        let df = hydroflow_syntax! {
            source_iter(futs)
                -> poll_futures()
                -> for_each(|_| {});
        };

        (df, (shared_done, tracked_slots))
    }

    // Sets the shared flag and wakes every tracked future. Panics if a tracked
    // slot holds no waker, i.e. the future was not polled beforehand.
    fn wake_all((done, wakers): WakeInfo) {
        *done.borrow_mut() = true;
        for slot in wakers {
            match slot.borrow_mut().take() {
                Some(waker) => waker.wake(),
                None => panic!("waker not found but future should have been polled"),
            }
        }
    }

    // Tick with the initial poll
    c.bench_function("futures/delayed/initial", |bencher| {
        bencher.iter_batched(
            || {
                let (df, _wake_info) = setup(NUM_ELEMS, false);
                df
            },
            |mut df| {
                df.run_tick();
            },
            criterion::BatchSize::SmallInput,
        );
    });

    // Tick when no results are available, apart from a single one: with
    // `wake_one == true`, `wake_all` completes and wakes only the first future
    // while all the others stay pending.
    c.bench_function("futures/delayed/waiting", |bencher| {
        bencher.iter_batched(
            || {
                let (mut df, wake_info) = setup(NUM_ELEMS, true);
                for _ in 0..3 {
                    df.run_tick();
                }
                wake_all(wake_info);
                df
            },
            |mut df| {
                df.run_tick();
            },
            criterion::BatchSize::SmallInput,
        );
    });

    // Tick when results became available
    c.bench_function("futures/delayed/ready", |bencher| {
        bencher.iter_batched(
            || {
                let (mut df, wake_info) = setup(NUM_ELEMS, false);
                df.run_tick();
                wake_all(wake_info);
                df
            },
            |mut df| {
                df.run_tick();
            },
            criterion::BatchSize::SmallInput,
        );
    });

    // Tick after all results have been consumed
    c.bench_function("futures/delayed/done", |bencher| {
        bencher.iter_batched(
            || {
                let (mut df, wake_info) = setup(NUM_ELEMS, false);
                df.run_tick();
                wake_all(wake_info);
                df.run_tick();
                df
            },
            |mut df| {
                df.run_tick();
            },
            criterion::BatchSize::SmallInput,
        );
    });
}

// Defines the `futures` benchmark group: both benchmark functions above run
// with Criterion's default configuration, except that the measurement time is
// set to 30 seconds.
criterion_group!(
name=futures;
config=Criterion::default().measurement_time(Duration::from_secs(30));
targets=benchmark_immediately_available,
benchmark_delayed
);
// Hands the `futures` group to Criterion's runner macro.
criterion_main!(futures);
2 changes: 2 additions & 0 deletions dfir_lang/src/graph/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ declare_ops![
persist_mut::PERSIST_MUT,
persist_mut_keyed::PERSIST_MUT_KEYED,
prefix::PREFIX,
poll_futures::POLL_FUTURES,
poll_futures_ordered::POLL_FUTURES_ORDERED,
py_udf::PY_UDF,
reduce::REDUCE,
spin::SPIN,
Expand Down
111 changes: 111 additions & 0 deletions dfir_lang/src/graph/ops/poll_futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use quote::quote_spanned;
use syn::Ident;

use super::{
OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1
};

/// Constraints and code generator for the `poll_futures` operator.
///
/// Takes exactly one input and one output and no arguments. Code generation
/// is delegated to [`poll_futures_writer`], configured to keep in-flight
/// futures in a `FuturesUnordered` and to insert them with `push`.
pub const POLL_FUTURES: OperatorConstraints = OperatorConstraints {
    name: "poll_futures",
    categories: &[OperatorCategory::Map],
    hard_range_inn: RANGE_1,
    soft_range_inn: RANGE_1,
    hard_range_out: RANGE_1,
    soft_range_out: RANGE_1,
    num_args: 0,
    persistence_args: RANGE_0,
    type_args: RANGE_0,
    is_external_input: false,
    has_singleton_output: false,
    flo_type: None,
    ports_inn: None,
    ports_out: None,
    input_delaytype_fn: |_| None,
    write_fn: |wc, _| {
        let collection = Ident::new("FuturesUnordered", wc.op_span);
        let insert_method = Ident::new("push", wc.op_span);
        poll_futures_writer(collection, insert_method, wc)
    },
};

/// Generates the code shared by the `poll_futures` and `poll_futures_ordered`
/// operators.
///
/// # Parameters
/// * `future_type` - name of the collection type under `#root::futures::stream`
///   that stores in-flight futures (`FuturesUnordered` or `FuturesOrdered`).
/// * `push_fn` - name of the method used to insert a boxed future into that
///   collection (`push` or `push_back`).
/// * `wc` - write context of the operator instance being generated.
///
/// # Generated code
/// * Prologue: registers an empty, `RefCell`-wrapped collection as state.
/// * Pull side: each incoming future is boxed and pinned, then either polled
///   once and emitted immediately if it is already done, or stored in the
///   collection. Afterwards the collection is drained of every result that is
///   currently ready, and all collected values are emitted as an iterator.
/// * Push side: results that are currently ready are drained into the output
///   first; the returned consumer then boxes each incoming future, stores it
///   and asks the context to schedule the current subgraph again.
///
/// # Ordering
/// For `FuturesOrdered` the immediate-emit shortcut on the pull side is only
/// taken while the collection is empty. Otherwise a future that is ready on
/// its first poll would overtake earlier futures that are still pending, and
/// results would no longer come out in submission order (the push side never
/// had this problem because it always enqueues). For `FuturesUnordered` the
/// generated code is unchanged.
pub fn poll_futures_writer(
    future_type: Ident,
    push_fn: Ident,
    wc @ &WriteContextArgs {
        root,
        context,
        op_span,
        ident,
        inputs,
        outputs,
        is_pull,
        ..
    }: &WriteContextArgs,
) -> Result<OperatorWriteOutput, ()> {
    let futures_ident = wc.make_ident("futures");

    let write_prologue = quote_spanned! {op_span=>
        let #futures_ident = df.add_state(
            ::std::cell::RefCell::new(
                #root::futures::stream::#future_type::new()
            )
        );
    };

    // Condition (in generated code) under which a freshly received future may
    // be polled and emitted right away instead of being queued.
    let may_emit_eagerly = if future_type == "FuturesOrdered" {
        // Only safe when nothing submitted earlier is still waiting in the queue.
        quote_spanned! {op_span=>
            #context
                .state_ref(#futures_ident)
                .borrow()
                .is_empty()
        }
    } else {
        quote_spanned! {op_span=> true }
    };

    let write_iterator = if is_pull {
        let input = &inputs[0];
        quote_spanned! {op_span=>
            let #ident = {
                let mut out = ::std::vec::Vec::new();

                #input
                    .for_each(|fut| {
                        let mut fut = ::std::boxed::Box::pin(fut);
                        if #may_emit_eagerly {
                            if let #root::futures::task::Poll::Ready(val) = #root::futures::Future::poll(::std::pin::Pin::as_mut(&mut fut), &mut ::std::task::Context::from_waker(&#context.waker())) {
                                out.push(val);
                                return;
                            }
                        }
                        #context
                            .state_ref(#futures_ident)
                            .borrow_mut()
                            .#push_fn(fut);
                    });
                // Collect every result that is ready right now.
                while let #root::futures::task::Poll::Ready(Some(val)) =
                    #root::futures::Stream::poll_next(::std::pin::Pin::new(&mut *#context
                        .state_ref(#futures_ident)
                        .borrow_mut()
                    ), &mut ::std::task::Context::from_waker(&#context.waker()))
                {
                    out.push(val);
                }

                ::std::iter::IntoIterator::into_iter(out)
            };
        }
    } else {
        let output = &outputs[0];
        quote_spanned! {op_span=>
            let #ident = {
                let mut out = #output;
                let consumer = #root::pusherator::for_each::ForEach::new(|fut| {
                    let fut = ::std::boxed::Box::pin(fut);
                    #context
                        .state_ref(#futures_ident)
                        .borrow_mut()
                        .#push_fn(fut);
                    #context.schedule_subgraph(#context.current_subgraph(), true);
                });
                // Forward every result that is ready right now.
                while let #root::futures::task::Poll::Ready(Some(val)) =
                    #root::futures::Stream::poll_next(::std::pin::Pin::new(&mut *#context
                        .state_ref(#futures_ident)
                        .borrow_mut()
                    ), &mut ::std::task::Context::from_waker(&#context.waker()))
                {
                    #root::pusherator::Pusherator::give(&mut out, val)
                }

                consumer
            };
        }
    };
    Ok(OperatorWriteOutput {
        write_prologue,
        write_iterator,
        ..Default::default()
    })
}
26 changes: 26 additions & 0 deletions dfir_lang/src/graph/ops/poll_futures_ordered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use syn::Ident;

use super::{
poll_futures::poll_futures_writer, OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1
};

/// Constraints and code generator for the `poll_futures_ordered` operator.
///
/// Takes exactly one input and one output and no arguments. Code generation
/// is delegated to [`poll_futures_writer`], configured to keep in-flight
/// futures in a `FuturesOrdered` and to insert them with `push_back`.
pub const POLL_FUTURES_ORDERED: OperatorConstraints = OperatorConstraints {
    name: "poll_futures_ordered",
    categories: &[OperatorCategory::Map],
    hard_range_inn: RANGE_1,
    soft_range_inn: RANGE_1,
    hard_range_out: RANGE_1,
    soft_range_out: RANGE_1,
    num_args: 0,
    persistence_args: RANGE_0,
    type_args: RANGE_0,
    is_external_input: false,
    has_singleton_output: false,
    flo_type: None,
    ports_inn: None,
    ports_out: None,
    input_delaytype_fn: |_| None,
    write_fn: |wc, _| {
        let collection = Ident::new("FuturesOrdered", wc.op_span);
        let insert_method = Ident::new("push_back", wc.op_span);
        poll_futures_writer(collection, insert_method, wc)
    },
};
Loading

0 comments on commit d9bc803

Please sign in to comment.