Intermediate result blocked approach to aggregation memory management #15591

Conversation
Hi @Rachelint I think I have an alternative proposal that seems relatively easy to implement.
Really appreciated. The design in this PR indeed still introduces quite a few code changes... I tried not to modify anything about
But I found this way would introduce too much extra cost... Maybe we place the
I have finished development (and testing) of all the needed common structs!
It is very close; I just need to add more tests!
```rust
fn generate_group_indices(len: usize) -> Vec<usize> {
    (0..len).collect()
}
```
Does using indices `0..len` mean this benchmark is testing the case where each input row has a new unique group?
If this is the case, then I think it would make sense that the benchmark shows lots of time allocating / zeroing memory: the bulk of the work will be copying each value into a new accumulator slot.
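To make the distinction concrete, here is a hypothetical pair of index generators (not code from this PR): `(0..len)` assigns every row its own group, so every row creates a new accumulator slot, while a modulo pattern would instead exercise the repeated-groups path.

```rust
// Hypothetical benchmark inputs, for illustration only (not from the PR).
// All-unique groups: every input row lands in a fresh accumulator slot,
// so the benchmark is dominated by allocating/zeroing new group state.
fn generate_unique_group_indices(len: usize) -> Vec<usize> {
    (0..len).collect()
}

// Repeated groups: only `num_groups` distinct slots exist, so most rows
// update existing state instead of creating it.
fn generate_repeated_group_indices(len: usize, num_groups: usize) -> Vec<usize> {
    (0..len).map(|i| i % num_groups).collect()
}

fn main() {
    let unique = generate_unique_group_indices(4);
    assert_eq!(unique, vec![0, 1, 2, 3]); // 4 rows, 4 distinct groups

    let repeated = generate_repeated_group_indices(6, 2);
    assert_eq!(repeated, vec![0, 1, 0, 1, 0, 1]); // 6 rows, 2 distinct groups
    println!("{:?} {:?}", unique, repeated);
}
```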
I have tried enlarging the block size (8 * batch, 16 * batch...), but it seems to make only a slight difference to performance. So after the experiment, I think
I think I nearly understand the reason for this; it is possibly caused by
Got it!
I think it may be possible in this situation?
Yes, in the current implementation, it only helps performance with
Yeah I think that was expected.
Yeah it is quite efficient, although problematic for large inputs
If I don't misunderstand, does it mean a strategy like this:
Agreed. It also leads to large memory usage, because we only release memory after all the batches are returned (we hold the really large single batch in memory, return only slices of it, and release the memory all at once after all slices are returned).
Yes - exactly!
I wonder what the plan is for this PR? From what I understand, it currently improves performance for aggregates with large numbers of groups, but (slightly) slows down aggregates with smaller numbers of groups. I think this is due to accessing group storage via two indirections (block index / offset within the block). It seems like the proposal is to have some sort of adaptive structure that uses one-part indexes for small numbers of groups and then switches to two-part indexes for larger numbers.
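The two-part indexing discussed here can be sketched roughly as follows. This is an illustrative model, not the PR's actual types: the block size and the `BlockedStorage` name are assumptions, but the shape matches the described trade-off, since old blocks are never reallocated or copied, at the cost of a shift/mask plus two lookups per access.

```rust
// Illustrative sketch of blocked group storage (not the PR's actual code).
// A flat group index is split into (block_id, offset): block_id in the high
// bits, offset in the low bits. When a block fills up, a new one is
// allocated; existing blocks are never resized or moved.

const BLOCK_BITS: u32 = 12; // assumed block size: 4096 slots per block
const BLOCK_SIZE: usize = 1 << BLOCK_BITS;
const BLOCK_MASK: usize = BLOCK_SIZE - 1;

struct BlockedStorage<T> {
    blocks: Vec<Vec<T>>,
}

impl<T> BlockedStorage<T> {
    fn new() -> Self {
        Self { blocks: Vec::new() }
    }

    /// Append one value and return its flat (two-part) group index.
    fn push(&mut self, value: T) -> usize {
        if self.blocks.last().map_or(true, |b| b.len() == BLOCK_SIZE) {
            // Allocate a fresh block instead of resizing: no copy of old data.
            self.blocks.push(Vec::with_capacity(BLOCK_SIZE));
        }
        let block_id = self.blocks.len() - 1;
        let block = self.blocks.last_mut().unwrap();
        block.push(value);
        (block_id << BLOCK_BITS) | (block.len() - 1)
    }

    fn get(&self, index: usize) -> &T {
        // The "two indirections": block lookup, then offset lookup.
        &self.blocks[index >> BLOCK_BITS][index & BLOCK_MASK]
    }
}

fn main() {
    let mut storage: BlockedStorage<u64> = BlockedStorage::new();
    let mut indices = Vec::new();
    for v in 0..10_000u64 {
        indices.push(storage.push(v));
    }
    // Values are retrievable through their two-part indices.
    assert_eq!(*storage.get(indices[0]), 0);
    assert_eq!(*storage.get(indices[9_999]), 9_999);
    // 10_000 slots at 4096 per block -> 3 blocks allocated, none resized.
    assert_eq!(storage.blocks.len(), 3);
    println!("blocks: {}", storage.blocks.len());
}
```

The flat-`Vec` path avoids the shift/mask and one lookup per access, which is consistent with the small slowdown observed for low-cardinality aggregates.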
I am experimenting with what we can do based on this PR to improve performance further, like [memory reuse] (#15591 (comment)). Actually #16135 is part of that attempt. My biggest concern is whether we can get a more obvious improvement to make the change worthwhile...
As I understood it, the way to get a bigger improvement would be to implement the chunked approach for more group storage / aggregates, so that more queries in our benchmarks (like ClickBench) could use the new code path. Though of course that would make this PR even bigger. We could also make a "POC" type PR with some more implementation to prove out the performance, and then break it into smaller pieces for review 🤔
Yes, I am trying to implement it for
But according to the flamegraph, I found that all such queries' bottleneck is actually
I think the benefit from this one currently is:
Do you think it is still worth continuing to push forward for the above benefits?
Without having a full understanding of this PR (I have just been following the conversation because the change is exciting), my 2¢ is: for us, memory management is currently one of the biggest thorns with DataFusion. It is quite hard at the moment to run with a fixed memory budget, given the mix of exceeding memory through under-accounting and refusing to run operators that can't spill / try to claim more memory than they actually use.
I agree with @adriangb; even if it doesn't provide performance improvements, it is still super valuable to push it forward.
Thanks @adriangb @Dandandan. I just started my new job this week and am a bit busy, but I will continue to push it forward this weekend. The new targets for this one may be
So the rest of the work, I think:
Thanks for all your help @Rachelint and congratulations on the new job!
thanks @Rachelint and congratulations!
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
It is unfortunate we never figured out how to get this over the line 😢 Thank you for the effort anyways @Rachelint
I am sorry... but actually I still want to continue pushing it forward... However, it has been too busy these recent few months...
Maybe if/when you are able to return to it with a fresh set of eyes after a break we'll make progress. No worries at all -- I totally understand.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
Maybe someday we'll get back to it 😢
Which issue does this PR close?

Rationale for this change

As mentioned in #7065, we use a single `Vec` to manage the aggregation intermediate results, both in `GroupAccumulator` and in `GroupValues`. It is simple but not efficient enough for high-cardinality aggregation, because when the `Vec` is not large enough, we need to allocate a new `Vec` and copy all data over from the old one.

So this PR introduces a blocked approach to manage the aggregation intermediate results. In this approach we never resize the `Vec`; instead we split the data into blocks, and when the capacity is not enough, we just allocate a new block. Details can be seen in #7065.

What changes are included in this PR?

`PrimitiveGroupsAccumulator` and `GroupValuesPrimitive` as the example

Are these changes tested?

Tested by existing tests, plus new unit tests and new fuzz tests.

Are there any user-facing changes?

Two functions are added to the `GroupValues` and `GroupAccumulator` traits. But as you can see, there are default implementations for them, and users can choose to actually support the blocked approach when they want better performance for their `udaf`s.
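The opt-in pattern described above (new trait methods with backward-compatible defaults) can be sketched like this. The trait and method names here are illustrative assumptions for the sketch, not the exact names added to DataFusion's traits:

```rust
// Illustrative sketch of opt-in blocked support via trait default methods.
// `GroupsAccumulatorExt`, `supports_blocked_groups`, and `alter_block_size`
// are assumed names for illustration, not the PR's actual API.
trait GroupsAccumulatorExt {
    /// Default: blocked group management is not supported, so all existing
    /// implementations keep compiling and working unchanged.
    fn supports_blocked_groups(&self) -> bool {
        false
    }

    /// Default: ignore the requested block size (flat, single-Vec mode).
    fn alter_block_size(&mut self, _block_size: Option<usize>) -> Result<(), String> {
        Ok(())
    }
}

// An existing accumulator: relies entirely on the defaults.
struct LegacyAccumulator;
impl GroupsAccumulatorExt for LegacyAccumulator {}

// An accumulator that opts in to the blocked approach.
struct BlockedAccumulator {
    block_size: Option<usize>,
}
impl GroupsAccumulatorExt for BlockedAccumulator {
    fn supports_blocked_groups(&self) -> bool {
        true
    }
    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<(), String> {
        self.block_size = block_size;
        Ok(())
    }
}

fn main() {
    let legacy = LegacyAccumulator;
    assert!(!legacy.supports_blocked_groups()); // default behavior preserved

    let mut blocked = BlockedAccumulator { block_size: None };
    assert!(blocked.supports_blocked_groups());
    blocked.alter_block_size(Some(4096)).unwrap();
    assert_eq!(blocked.block_size, Some(4096));
    println!("ok");
}
```

The engine would query `supports_blocked_groups()` before choosing the blocked code path, so only accumulators that explicitly opt in pay (and benefit from) the new behavior.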