Skip to content

Commit d7f7828

Browse files
Add Push::size_hint and VecPush terminal operator [ci-bench]
Added `size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>))` to the `Push` trait as the push-side analog of `Pull::size_hint`. This allows producers to announce how many items they're about to send, enabling downstream operators and sinks to pre-allocate.

Trait changes:
- `Push::size_hint`: default no-op implementation
- `&mut P` blanket impl: forwards to inner
- `PushVariadic::size_hint`: new required method

Propagation through combinators:
- Map, Inspect: pass through unchanged (1:1 mapping)
- Filter, FilterMap: lower bound set to 0, upper preserved
- FlatMap, Flatten: hint becomes (0, None) — output count unknown
- Fanout, Unzip: forward to both branches
- Persist: reserves on internal Vec buffer, then forwards
- DemuxVar: forwards to all downstream pushes via PushVariadic
- ForEach, SinkPush, ResolveFutures: use default no-op

New terminal operator:
- `VecPush<Buf>`: pushes items into a `Vec`, uses `size_hint` to call `Vec::reserve(hint.0)` for pre-allocation. Gated on `alloc` feature.
- Constructor: `push::vec_push(buf)` creates a VecPush from a `&mut Vec<T>`.

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 5bccb09 commit d7f7828

33 files changed

+272
-76
lines changed

dfir_lang/src/graph/meta_graph.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -914,9 +914,8 @@ impl DfirGraph {
914914
});
915915
let send_port_code = send_ports.iter().map(|ident| {
916916
quote_spanned! {ident.span()=>
917-
let #ident = #root::dfir_pipes::push::for_each(|v| {
918-
#ident.give(Some(v));
919-
});
917+
let mut #ident = #ident.borrow_mut_give();
918+
let #ident = #root::dfir_pipes::push::vec_push(&mut *#ident);
920919
}
921920
});
922921

@@ -1217,6 +1216,14 @@ impl DfirGraph {
12171216
) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
12181217
#root::dfir_pipes::push::Push::poll_flush(self.project().inner, ctx)
12191218
}
1219+
1220+
#[inline(always)]
1221+
fn size_hint(
1222+
self: ::std::pin::Pin<&mut Self>,
1223+
hint: (usize, Option<usize>),
1224+
) {
1225+
#root::dfir_pipes::push::Push::size_hint(self.project().inner, hint)
1226+
}
12201227
}
12211228

12221229
PushGuard {

dfir_macro/src/lib.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -288,17 +288,15 @@ pub fn derive_demux_enum(item: proc_macro::TokenStream) -> proc_macro::TokenStre
288288
}),
289289
);
290290

291-
let variant_pats_sink_start_send =
292-
variants
293-
.iter()
294-
.zip(variant_localvars_sink.iter())
295-
.map(|(variant, sinkvar)| {
296-
let Variant { ident, fields, .. } = variant;
297-
let (fields_pat, push_item) = field_pattern_item(fields);
298-
quote! {
299-
Self::#ident #fields_pat => #sinkvar.as_mut().start_send(#push_item)
300-
}
301-
});
291+
let variant_pats_sink_start_send = variants.iter().zip(variant_localvars_sink.iter()).map(
292+
|(variant, sinkvar)| {
293+
let Variant { ident, fields, .. } = variant;
294+
let (fields_pat, push_item) = field_pattern_item(fields);
295+
quote! {
296+
Self::#ident #fields_pat => ::std::pin::Pin::as_mut(#sinkvar).start_send(#push_item)
297+
}
298+
},
299+
);
302300

303301
let (impl_generics_item, ty_generics, where_clause_item) = generics.split_for_impl();
304302
let (impl_generics_sink, _ty_generics_sink, where_clause_sink) =
@@ -392,11 +390,11 @@ pub fn derive_demux_enum(item: proc_macro::TokenStream) -> proc_macro::TokenStre
392390
#(
393391
let #headvar = {
394392
let __ctx = <<#variant_generics_push as #root::dfir_pipes::push::Push<#variant_output_types, ()>>::Ctx<'_> as #root::dfir_pipes::Context<'_>>::unmerge_self(__ctx);
395-
#root::dfir_pipes::push::Push::#method_name(#headvar.as_mut(), __ctx)
393+
#root::dfir_pipes::push::Push::#method_name(::std::pin::Pin::as_mut(#headvar), __ctx)
396394
};
397395
let __ctx = <<#variant_generics_push as #root::dfir_pipes::push::Push<#variant_output_types, ()>>::Ctx<'_> as #root::dfir_pipes::Context<'_>>::unmerge_other(__ctx);
398396
)*
399-
let #lastvar = #root::dfir_pipes::push::Push::#method_name(#lastvar.as_mut(), __ctx);
397+
let #lastvar = #root::dfir_pipes::push::Push::#method_name(::std::pin::Pin::as_mut(#lastvar), __ctx);
400398
// If any are pending, return pending.
401399
#(
402400
if #variant_localvars_push.is_pending() {
@@ -531,6 +529,18 @@ pub fn derive_demux_enum(item: proc_macro::TokenStream) -> proc_macro::TokenStre
531529
#push_poll_flush_body
532530
#root::dfir_pipes::push::PushStep::Done
533531
}
532+
533+
fn size_hint(
534+
( #( #variant_localvars_push, )* ): &mut #variant_generics_pinned_push_all,
535+
__size_hint: (usize, ::std::option::Option<usize>),
536+
) {
537+
#(
538+
#root::dfir_pipes::push::Push::size_hint(
539+
::std::pin::Pin::as_mut(#variant_localvars_push),
540+
__size_hint,
541+
);
542+
)*
543+
}
534544
}
535545

536546
impl #impl_generics_item #root::util::demux_enum::DemuxEnumBase

dfir_pipes/src/pull/cross_singleton.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,14 @@ where
9999
PullStep::Ended(_) => PullStep::ended(),
100100
}
101101
}
102+
103+
fn size_hint(&self) -> (usize, Option<usize>) {
104+
let (mut lower, upper) = self.item_pull.size_hint();
105+
if self.singleton_state.borrow().is_none() {
106+
lower = 0;
107+
}
108+
(lower, upper)
109+
}
102110
}
103111

104112
impl<ItemPull, SinglePull, SingleState> FusedPull

dfir_pipes/src/pull/from_fn.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,9 @@ where
4949
let this = self.get_mut();
5050
(this.func)()
5151
}
52+
53+
fn size_hint(&self) -> (usize, Option<usize>) {
54+
// Depends on UDF.
55+
(0, None)
56+
}
5257
}

dfir_pipes/src/pull/mod.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ pub use stream_ready::StreamReady;
8484
#[cfg(feature = "std")]
8585
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
8686
pub use symmetric_hash_join::{
87-
NewTickJoinIter, NewTickJoinPull, SymmetricHashJoin, SymmetricHashJoinEither,
88-
symmetric_hash_join,
87+
NewTickJoinIter, SymmetricHashJoin, SymmetricHashJoinEither, symmetric_hash_join,
8988
};
9089
pub use take::Take;
9190
pub use take_while::TakeWhile;
@@ -220,12 +219,9 @@ pub trait Pull {
220219
/// That said, the implementation should provide a correct estimation,
221220
/// because otherwise it would be a violation of the trait's protocol.
222221
///
223-
/// The default implementation returns `(0, None)` which is correct for any
224-
/// pull.
225-
#[inline]
226-
fn size_hint(&self) -> (usize, Option<usize>) {
227-
(0, None)
228-
}
222+
/// A default implementation should return `(0, None)` which is correct for any
223+
/// pull. However this is not provided, to prevent oversight.
224+
fn size_hint(&self) -> (usize, Option<usize>);
229225

230226
/// Borrows this pull, allowing it to be used by reference.
231227
fn by_ref(&mut self) -> &mut Self {

dfir_pipes/src/pull/poll_fn.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,9 @@ where
5050
let this = self.get_mut();
5151
(this.func)(ctx)
5252
}
53+
54+
fn size_hint(&self) -> (usize, Option<usize>) {
55+
// Depends on UDF.
56+
(0, None)
57+
}
5358
}

dfir_pipes/src/pull/symmetric_hash_join.rs

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use pin_project_lite::pin_project;
99
use smallvec::SmallVec;
1010

1111
use crate::pull::half_join_state::HalfJoinState;
12-
use crate::pull::{FusedPull, Pull, PullStep};
13-
use crate::{Context, No, Toggle, Yes};
12+
use crate::pull::{self, FusedPull, Pull, PullStep};
13+
use crate::{Context, Toggle};
1414

1515
pin_project! {
1616
/// Pull combinator for symmetric hash join operations.
@@ -124,6 +124,11 @@ where
124124
return PullStep::ended();
125125
}
126126
}
127+
128+
fn size_hint(&self) -> (usize, Option<usize>) {
129+
// TODO(mingwei): actual estimate
130+
(0, None)
131+
}
127132
}
128133

129134
/// Iterator for new tick - iterates over all matches after both sides are drained.
@@ -204,6 +209,11 @@ where
204209
self.next_rhs_smaller()
205210
}
206211
}
212+
213+
fn size_hint(&self) -> (usize, Option<usize>) {
214+
// TODO(mingwei): proper size hint estimate
215+
(0, None)
216+
}
207217
}
208218

209219
impl<'a, Key, V1, V2, LhsState, RhsState> NewTickJoinIter<'a, Key, V1, V2, LhsState, RhsState>
@@ -291,50 +301,9 @@ where
291301
}
292302
}
293303

294-
pin_project! {
295-
/// Pull wrapper for the new tick iterator case.
296-
#[must_use = "`Pull`s do nothing unless polled"]
297-
pub struct NewTickJoinPull<'a, Key, V1, V2, LhsState, RhsState>
298-
where
299-
Key: Clone,
300-
V1: Clone,
301-
V2: Clone,
302-
{
303-
iter: NewTickJoinIter<'a, Key, V1, V2, LhsState, RhsState>,
304-
}
305-
}
306-
307-
impl<'a, Key, V1, V2, LhsState, RhsState> Pull
308-
for NewTickJoinPull<'a, Key, V1, V2, LhsState, RhsState>
309-
where
310-
Key: Eq + std::hash::Hash + Clone,
311-
V1: Clone,
312-
V2: Clone,
313-
LhsState: HalfJoinState<Key, V1, V2>,
314-
RhsState: HalfJoinState<Key, V2, V1>,
315-
{
316-
type Ctx<'ctx> = ();
317-
318-
type Item = (Key, (V1, V2));
319-
type Meta = ();
320-
type CanPend = No;
321-
type CanEnd = Yes;
322-
323-
fn pull(
324-
self: Pin<&mut Self>,
325-
_ctx: &mut Self::Ctx<'_>,
326-
) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
327-
let this = self.project();
328-
match this.iter.next() {
329-
Some(item) => PullStep::Ready(item, ()),
330-
None => PullStep::Ended(Yes),
331-
}
332-
}
333-
}
334-
335304
/// Type alias for the `Either` pull returned by [`symmetric_hash_join`].
336305
pub type SymmetricHashJoinEither<'a, Key, V1, V2, Lhs, Rhs, LhsState, RhsState> = Either<
337-
NewTickJoinPull<'a, Key, V1, V2, LhsState, RhsState>,
306+
pull::Iter<NewTickJoinIter<'a, Key, V1, V2, LhsState, RhsState>>,
338307
SymmetricHashJoin<Lhs, Rhs, &'a mut LhsState, &'a mut RhsState, LhsState, RhsState>,
339308
>;
340309

@@ -370,7 +339,7 @@ where
370339
} else {
371340
NewTickJoinIter::new_rhs_smaller(lhs_state, rhs_state)
372341
};
373-
SymmetricHashJoinEither::Left(NewTickJoinPull { iter })
342+
SymmetricHashJoinEither::Left(pull::iter(iter))
374343
} else {
375344
SymmetricHashJoinEither::Right(SymmetricHashJoin::new(lhs, rhs, lhs_state, rhs_state))
376345
}

dfir_pipes/src/push/demux_var.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ where
7070
/// unmerged context slice. All pushes are flushed even if one returns
7171
/// pending, so that all wakers are registered.
7272
fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
73+
74+
/// Inform all downstream pushes that approximately `hint` items are about to be sent.
75+
fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>));
7376
}
7477

7578
/// Recursive case: a push `P` followed by the rest of the variadic `Rest`.
@@ -108,6 +111,12 @@ where
108111
);
109112
PushStep::Done
110113
}
114+
115+
fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
116+
let (push, rest) = pin_project_pair(self);
117+
push.size_hint(hint);
118+
rest.size_hint(hint);
119+
}
111120
}
112121

113122
/// Base case: the empty variadic. Always ready, panics on send.
@@ -130,6 +139,8 @@ where
130139
fn poll_flush(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep<No> {
131140
PushStep::Done
132141
}
142+
143+
fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {}
133144
}
134145

135146
/// Pin-projects a pair `(A, B)` into its two pinned components.
@@ -199,6 +210,10 @@ where
199210
fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
200211
self.project().pushes.poll_flush(ctx)
201212
}
213+
214+
fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
215+
self.project().pushes.size_hint(hint);
216+
}
202217
}
203218

204219
/// Creates a [`DemuxVar`] push that dispatches each `(usize, Item)` pair to

dfir_pipes/src/push/fanout.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ where
6767
);
6868
PushStep::Done
6969
}
70+
71+
fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
72+
let this = self.project();
73+
this.push_0.size_hint(hint);
74+
this.push_1.size_hint(hint);
75+
}
7076
}
7177

7278
#[cfg(test)]

dfir_pipes/src/push/filter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,8 @@ where
5050
fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
5151
self.project().next.poll_flush(ctx)
5252
}
53+
54+
fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
55+
self.project().next.size_hint((0, hint.1));
56+
}
5357
}

0 commit comments

Comments
 (0)