Intermediate result blocked approach to aggregation memory management #15591
Changes from 50 commits
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,19 +18,23 @@ | |||||||
| //! Vectorized [`GroupsAccumulator`] | ||||||||
|
|
||||||||
| use arrow::array::{ArrayRef, BooleanArray}; | ||||||||
| use datafusion_common::{not_impl_err, Result}; | ||||||||
| use datafusion_common::{not_impl_err, DataFusionError, Result}; | ||||||||
|
|
||||||||
| /// Describes how many rows should be emitted during grouping. | ||||||||
| #[derive(Debug, Clone, Copy)] | ||||||||
| pub enum EmitTo { | ||||||||
| /// Emit all groups | ||||||||
| /// Emit all groups and clear all existing group indexes | ||||||||
| All, | ||||||||
| /// Emit only the first `n` groups and shift all existing group | ||||||||
| /// indexes down by `n`. | ||||||||
| /// | ||||||||
| /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted | ||||||||
| /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`. | ||||||||
| First(usize), | ||||||||
| /// Emit the next block of the block-managed groups | ||||||||
| /// | ||||||||
| /// Similar to `EmitTo::All`, this also clears all existing group indexes | ||||||||
| NextBlock, | ||||||||
| } | ||||||||
|
|
||||||||
| impl EmitTo { | ||||||||
|
|
@@ -39,6 +43,9 @@ impl EmitTo { | |||||||
| /// remaining values in `v`. | ||||||||
| /// | ||||||||
| /// This avoids copying if Self::All | ||||||||
| /// | ||||||||
| /// NOTICE: only the `Self::All` and `Self::First` emit strategies are supported | ||||||||
|
Suggested change
|
||||||||
| /// | ||||||||
| pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> { | ||||||||
| match self { | ||||||||
| Self::All => { | ||||||||
|
|
@@ -52,6 +59,7 @@ impl EmitTo { | |||||||
| std::mem::swap(v, &mut t); | ||||||||
| t | ||||||||
| } | ||||||||
| Self::NextBlock => unreachable!("don't support take block in take_needed"), | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
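The two strategies that `take_needed` supports can be illustrated with a standalone sketch. The enum below is a local stand-in for `EmitTo`, not the DataFusion type, and returning `None` for `NextBlock` is a choice made only for this illustration (the diff above uses `unreachable!` instead).

```rust
/// Local stand-in for the `EmitTo` enum in the diff (illustration only).
#[derive(Debug, Clone, Copy)]
enum EmitTo {
    All,
    First(usize),
    NextBlock,
}

/// Removes the emitted prefix from `v` and returns it, leaving the
/// remaining values in `v`. Returns `None` for `NextBlock`, which this
/// flat-`Vec` helper cannot serve (an assumption of this sketch).
fn take_needed<T>(emit_to: EmitTo, v: &mut Vec<T>) -> Option<Vec<T>> {
    match emit_to {
        // Hand over the whole vector without copying; `v` becomes empty.
        EmitTo::All => Some(std::mem::take(v)),
        EmitTo::First(n) => {
            // `split_off` panics if `n > v.len()`, so clamp defensively.
            let n = n.min(v.len());
            // `tail` holds elements `n..`; after the swap, `v` holds the
            // tail and `tail` holds the first `n` elements.
            let mut tail = v.split_off(n);
            std::mem::swap(v, &mut tail);
            Some(tail)
        }
        EmitTo::NextBlock => None,
    }
}
```

With `First(n)`, the values that stay in `v` move to the front, which matches the "shift all existing group indexes down by `n`" wording in the enum docs.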
|
|
@@ -250,4 +258,49 @@ pub trait GroupsAccumulator: Send { | |||||||
| /// This function is called once per batch, so it should be `O(n)` to | ||||||||
| /// compute, not `O(num_groups)` | ||||||||
| fn size(&self) -> usize; | ||||||||
|
|
||||||||
| /// Returns `true` if this accumulator supports blocked groups. | ||||||||
| /// | ||||||||
| /// Blocked groups (also called the blocked management approach) is an optimization | ||||||||
| /// to reduce the cost of managing aggregation intermediate states. | ||||||||
| /// | ||||||||
| /// Here is a brief introduction to the two state management approaches: | ||||||||
| /// - Blocked approach: states are stored and managed in multiple `Vec`s, | ||||||||
| /// which we call `Block`s. Organizing states this way avoids resizing a `Vec`; | ||||||||
| /// a new `Vec` is allocated instead, which reduces cost and improves performance. | ||||||||
| /// When locating data in `Block`s, we first use `block_id` to locate the | ||||||||
| /// needed `Block`, and then use `block_offset` to locate the needed | ||||||||
| /// data within that `Block`. | ||||||||
| /// | ||||||||
| /// - Single approach: all states are stored and managed in a single large `Block`. | ||||||||
| /// So when locating data, `block_id` is always 0, and we only need `block_offset` | ||||||||
| /// to locate data in the single `Block`. | ||||||||
| /// | ||||||||
| /// For more details, see: | ||||||||
| /// <https://github.com/apache/datafusion/issues/7065> | ||||||||
| /// | ||||||||
| fn supports_blocked_groups(&self) -> bool { | ||||||||
| false | ||||||||
| } | ||||||||
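The `block_id` / `block_offset` addressing described in the doc comment can be sketched as follows. This is a minimal illustration, not the PR's implementation: the `Blocks` type, its methods, and the division/modulo mapping from a flat group index are all assumptions made here.

```rust
/// Hypothetical container for per-group state stored in fixed-size blocks.
struct Blocks<T> {
    blocks: Vec<Vec<T>>,
    block_size: usize,
}

impl<T> Blocks<T> {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be non-zero");
        Self { blocks: Vec::new(), block_size }
    }

    /// Appends a value. When the last block is full (or none exists), a
    /// fresh block is allocated, so existing blocks are never resized.
    fn push(&mut self, value: T) {
        let need_new = self
            .blocks
            .last()
            .map_or(true, |b| b.len() == self.block_size);
        if need_new {
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        self.blocks
            .last_mut()
            .expect("a block was just ensured")
            .push(value);
    }

    /// Splits a flat group index into `(block_id, block_offset)`.
    fn locate(&self, group_index: usize) -> (usize, usize) {
        (group_index / self.block_size, group_index % self.block_size)
    }

    /// Looks up a value: find the block first, then the slot inside it.
    fn get(&self, group_index: usize) -> Option<&T> {
        let (block_id, block_offset) = self.locate(group_index);
        self.blocks.get(block_id)?.get(block_offset)
    }
}
```

In the single approach, the same lookup degenerates to one block, so `block_id` is always 0 and only `block_offset` matters.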
|
|
||||||||
| /// Alter the block size in the accumulator | ||||||||
| /// | ||||||||
| /// If the target block size is `None`, it will use a single big | ||||||||
| /// block (think of it as a `Vec`) to manage the state. | ||||||||
| /// | ||||||||
| /// If the target block size is `Some(blk_size)`, it will try to | ||||||||
| /// set the block size to `blk_size`; the attempt will only succeed | ||||||||
| /// when the accumulator supports blocked mode. | ||||||||
| /// | ||||||||
| /// NOTICE: After altering block size, all data in previous will be cleared. | ||||||||
|
||||||||
| /// NOTICE: After altering block size, all data in previous will be cleared. | |
| /// NOTICE: After altering block size, all data in existing accumulators will be cleared. |
I wonder if there is any value to testing the old code path (`enable_aggregation_blocked_groups = false`) if our goal is to remove it eventually.
I recommend only testing with the flag set to the default value.
Yes, but it will only go through the new path when all accumulators in the query support the blocked approach; otherwise all of them will fall back to the flat approach.
So I think we still need to test it now, but it makes sense to remove it in the future.
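The fallback rule described in this reply could look roughly like the sketch below. The `Accumulator` trait, the two example types, and `use_blocked_path` are hypothetical names introduced for illustration; only `supports_blocked_groups` and its `false` default come from the diff.

```rust
/// Hypothetical, minimal stand-in for the relevant part of `GroupsAccumulator`.
trait Accumulator {
    /// Mirrors the default in the diff: blocked groups are opt-in.
    fn supports_blocked_groups(&self) -> bool {
        false
    }
}

/// Example accumulator that keeps the default (flat approach only).
struct FlatOnly;
/// Example accumulator that opts in to blocked groups.
struct BlockedCapable;

impl Accumulator for FlatOnly {}

impl Accumulator for BlockedCapable {
    fn supports_blocked_groups(&self) -> bool {
        true
    }
}

/// Returns `true` only if the flag is enabled and every accumulator opts
/// in; a single accumulator without support sends all of them down the
/// flat path. (An empty slice counts as "all support" in this sketch.)
fn use_blocked_path(flag_enabled: bool, accumulators: &[Box<dyn Accumulator>]) -> bool {
    flag_enabled && accumulators.iter().all(|acc| acc.supports_blocked_groups())
}
```

Because a query with one unsupported accumulator still runs the flat path even when the flag is on, both paths stay reachable, which is the reason given above for continuing to test them.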