Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const MAX_COUNT_BEFORE_YIELD: usize = 1000;
const MAX_UPPERS_FOLLOWER_PRODUCT: usize = 31;

type TaskIdVec = SmallVec<[TaskId; 4]>;
type TaskIdWithCountVec = SmallVec<[(TaskId, u32); 2]>;

/// Returns true, when a node is aggregating its children and a partial subgraph.
pub fn is_aggregating_node(aggregation_number: u32) -> bool {
Expand Down Expand Up @@ -226,11 +227,21 @@ pub enum AggregationUpdateJob {
upper_ids: TaskIdVec,
new_follower_id: TaskId,
},
/// Notifies multiple upper tasks that one of its inner tasks has a new follower.
InnerOfUppersHasNewFollowerWithCount {
upper_ids: TaskIdWithCountVec,
new_follower_id: TaskId,
},
/// Notifies an upper task that one of its inner tasks has new followers.
InnerOfUpperHasNewFollowers {
upper_id: TaskId,
new_follower_ids: TaskIdVec,
},
/// Notifies an upper task that one of its inner tasks has new followers.
InnerOfUpperHasNewFollowersWithCount {
upper_id: TaskId,
new_follower_ids: TaskIdWithCountVec,
},
/// Notifies multiple upper tasks that one of its inner tasks has new followers.
InnerOfUppersHasNewFollowers(Box<InnerOfUppersHasNewFollowersJob>),
/// Notifies multiple upper tasks that one of its inner tasks has lost a follower.
Expand Down Expand Up @@ -1026,6 +1037,7 @@ impl AggregationUpdateQueue {
ctx,
new_follower_ids[0],
upper_ids[0],
1,
);
} else if uppers > followers {
if let Some(new_follower_id) = new_follower_ids.pop() {
Expand Down Expand Up @@ -1058,7 +1070,18 @@ impl AggregationUpdateQueue {
new_follower_id,
} => {
if upper_ids.len() == 1 {
self.inner_of_upper_has_new_follower(ctx, new_follower_id, upper_ids[0]);
self.inner_of_upper_has_new_follower(ctx, new_follower_id, upper_ids[0], 1);
} else {
self.inner_of_uppers_has_new_follower(ctx, new_follower_id, upper_ids);
}
}
AggregationUpdateJob::InnerOfUppersHasNewFollowerWithCount {
upper_ids,
new_follower_id,
} => {
if upper_ids.len() == 1 {
let (id, count) = upper_ids[0];
self.inner_of_upper_has_new_follower(ctx, new_follower_id, id, count);
} else {
self.inner_of_uppers_has_new_follower(ctx, new_follower_id, upper_ids);
}
Expand All @@ -1068,7 +1091,18 @@ impl AggregationUpdateQueue {
new_follower_ids,
} => {
if new_follower_ids.len() == 1 {
self.inner_of_upper_has_new_follower(ctx, new_follower_ids[0], upper_id);
self.inner_of_upper_has_new_follower(ctx, new_follower_ids[0], upper_id, 1);
} else {
self.inner_of_upper_has_new_followers(ctx, new_follower_ids, upper_id);
}
}
AggregationUpdateJob::InnerOfUpperHasNewFollowersWithCount {
upper_id,
new_follower_ids,
} => {
if new_follower_ids.len() == 1 {
let (id, count) = new_follower_ids[0];
self.inner_of_upper_has_new_follower(ctx, id, upper_id, count);
} else {
self.inner_of_upper_has_new_followers(ctx, new_follower_ids, upper_id);
}
Expand All @@ -1077,7 +1111,7 @@ impl AggregationUpdateQueue {
upper_id,
new_follower_id,
} => {
self.inner_of_upper_has_new_follower(ctx, new_follower_id, upper_id);
self.inner_of_upper_has_new_follower(ctx, new_follower_id, upper_id, 1);
}
AggregationUpdateJob::InnerOfUppersLostFollowers(mut boxed) => {
let InnerOfUppersLostFollowersJob {
Expand Down Expand Up @@ -1826,11 +1860,11 @@ impl AggregationUpdateQueue {
}
}

fn inner_of_uppers_has_new_follower(
fn inner_of_uppers_has_new_follower<T: TaskIdWithOptionalCount, const N: usize>(
&mut self,
ctx: &mut impl ExecuteContext,
new_follower_id: TaskId,
mut upper_ids: TaskIdVec,
mut upper_ids: SmallVec<[T; N]>,
) {
#[cfg(feature = "trace_aggregation_update")]
let _span =
Expand All @@ -1844,11 +1878,12 @@ impl AggregationUpdateQueue {
);
get_aggregation_number(&follower)
};
let mut upper_upper_ids_with_new_follower = SmallVec::new();
let mut upper_upper_ids_with_new_follower = FxHashMap::default();
let mut tasks_for_which_increment_active_count = SmallVec::new();
let mut is_active = false;

swap_retain(&mut upper_ids, |&mut upper_id| {
swap_retain(&mut upper_ids, |upper_item| {
let (upper_id, count) = upper_item.task_id_and_count();
let mut upper = ctx.task(
upper_id,
// For performance reasons this should stay `Meta` and not `All`
Expand All @@ -1866,7 +1901,7 @@ impl AggregationUpdateQueue {
Follower {
task: new_follower_id
},
1
count
) {
// May optimize the task
if count!(upper, Follower).is_power_of_two() {
Expand All @@ -1882,7 +1917,11 @@ impl AggregationUpdateQueue {
}
}
// notify uppers about new follower
upper_upper_ids_with_new_follower.extend(iter_uppers(&upper));
for upper_id in iter_uppers(&upper) {
*upper_upper_ids_with_new_follower
.entry(upper_id)
.or_insert(0) += 1;
}
}

// Balancing is only needed when they are equal. This is not perfect from
Expand Down Expand Up @@ -1913,8 +1952,9 @@ impl AggregationUpdateQueue {
);
let mut uppers_count: Option<usize> = None;
let mut persistent_uppers = 0;
swap_retain(&mut upper_ids, |&mut upper_id| {
if update_count!(follower, Upper { task: upper_id }, 1) {
swap_retain(&mut upper_ids, |upper_item| {
let (upper_id, count) = upper_item.task_id_and_count();
if update_count!(follower, Upper { task: upper_id }, count) {
// It's a new upper
let uppers_count = uppers_count.get_or_insert_with(|| {
let count =
Expand Down Expand Up @@ -1950,33 +1990,37 @@ impl AggregationUpdateQueue {
if has_data || !is_active {
// add data to upper
// For performance reasons this should stay `Meta` and not `All`
ctx.for_each_task_meta(upper_ids.iter().copied(), |mut upper, ctx| {
if has_data {
let diff = data.apply(&mut upper, ctx.should_track_activeness(), self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
self.push(
AggregatedDataUpdateJob {
upper_ids,
update: diff,
}
.into(),
)
ctx.for_each_task_meta(
upper_ids.iter().map(|entry| entry.task_id()),
|mut upper, ctx| {
if has_data {
let diff =
data.apply(&mut upper, ctx.should_track_activeness(), self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
self.push(
AggregatedDataUpdateJob {
upper_ids,
update: diff,
}
.into(),
)
}
}
}
if !is_active {
// We need to check this again, since this might have changed in the
// meantime due to race conditions
if upper.has_key(&CachedDataItemKey::Activeness {}) {
is_active = true;
if !is_active {
// We need to check this again, since this might have changed in the
// meantime due to race conditions
if upper.has_key(&CachedDataItemKey::Activeness {}) {
is_active = true;
}
}
}
});
},
);
}
if !children.is_empty() {
self.push(
InnerOfUppersHasNewFollowersJob {
upper_ids: upper_ids.clone(),
upper_ids: upper_ids.into_iter().map(|entry| entry.task_id()).collect(),
new_follower_ids: children,
}
.into(),
Expand All @@ -1997,17 +2041,17 @@ impl AggregationUpdateQueue {
if !upper_upper_ids_with_new_follower.is_empty() {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("new follower").entered();
self.push(AggregationUpdateJob::InnerOfUppersHasNewFollower {
upper_ids: upper_upper_ids_with_new_follower,
self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowerWithCount {
upper_ids: upper_upper_ids_with_new_follower.into_iter().collect(),
new_follower_id,
});
}
}

fn inner_of_upper_has_new_followers(
fn inner_of_upper_has_new_followers<T: TaskIdWithOptionalCount, const N: usize>(
&mut self,
ctx: &mut impl ExecuteContext,
new_follower_ids: TaskIdVec,
new_follower_ids: SmallVec<[T; N]>,
upper_id: TaskId,
) {
#[cfg(feature = "trace_aggregation_update")]
Expand All @@ -2019,13 +2063,14 @@ impl AggregationUpdateQueue {

let mut followers_with_aggregation_number = new_follower_ids
.into_iter()
.map(|new_follower_id| {
.map(|new_follower_entry| {
let (new_follower_id, count) = new_follower_entry.task_id_and_count();
let follower = ctx.task(
new_follower_id,
// For performance reasons this should stay `Meta` and not `All`
TaskDataCategory::Meta,
);
(new_follower_id, get_aggregation_number(&follower))
(new_follower_id, count, get_aggregation_number(&follower))
})
.collect::<SmallVec<[_; 4]>>();

Expand All @@ -2050,13 +2095,13 @@ impl AggregationUpdateQueue {

if !is_root_node(upper_aggregation_number) {
followers_with_aggregation_number.retain(
|(follower_id, follower_aggregation_number)| {
|(follower_id, count, follower_aggregation_number)| {
if upper_aggregation_number > *follower_aggregation_number {
// It's an inner node, continue with the list
return true;
}
// It's a follower of the upper node
if update_count!(upper, Follower { task: *follower_id }, 1) {
if update_count!(upper, Follower { task: *follower_id }, *count) {
// May optimize the task
if count!(upper, Follower).is_power_of_two() {
self.push_optimize_task(upper_id);
Expand Down Expand Up @@ -2089,16 +2134,16 @@ impl AggregationUpdateQueue {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("new inner").entered();
let mut upper_data_updates = Vec::new();
let mut upper_new_followers = SmallVec::new();
let mut upper_new_followers = FxHashMap::default();
swap_retain(
&mut inner_tasks_with_aggregation_number,
|&mut (inner_id, _)| {
|&mut (inner_id, count, _)| {
let mut inner = ctx.task(
inner_id,
// For performance reasons this should stay `Meta` and not `All`
TaskDataCategory::Meta,
);
if update_count!(inner, Upper { task: upper_id }, 1) {
if update_count!(inner, Upper { task: upper_id }, count) {
if count!(inner, Upper).is_power_of_two() {
self.push_optimize_task(inner_id);
}
Expand All @@ -2112,7 +2157,9 @@ impl AggregationUpdateQueue {
if !data.is_empty() {
upper_data_updates.push(data);
}
upper_new_followers.extend(children);
for &child_id in &children {
*upper_new_followers.entry(child_id).or_insert(0) += 1;
}

// Balancing is only needed when they are equal (or could have become equal
// in the meantime). This is not perfect from
Expand All @@ -2134,9 +2181,9 @@ impl AggregationUpdateQueue {
);

if !upper_new_followers.is_empty() {
self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers {
self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowersWithCount {
upper_id,
new_follower_ids: upper_new_followers,
new_follower_ids: upper_new_followers.into_iter().collect(),
});
}
if !upper_data_updates.is_empty() {
Expand Down Expand Up @@ -2192,7 +2239,7 @@ impl AggregationUpdateQueue {
self.extend_find_and_schedule_dirty(
inner_tasks_with_aggregation_number
.into_iter()
.map(|(id, _)| id),
.map(|(id, _, _)| id),
);
}
}
Expand Down Expand Up @@ -2225,6 +2272,7 @@ impl AggregationUpdateQueue {
ctx: &mut impl ExecuteContext,
new_follower_id: TaskId,
upper_id: TaskId,
count: u32,
) {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("process new follower").entered();
Expand Down Expand Up @@ -2258,7 +2306,7 @@ impl AggregationUpdateQueue {
Follower {
task: new_follower_id
},
1
count
) {
// May optimize the task
if count!(upper, Follower).is_power_of_two() {
Expand Down Expand Up @@ -2306,7 +2354,7 @@ impl AggregationUpdateQueue {
// For performance reasons this should stay `Meta` and not `All`
TaskDataCategory::Meta,
);
if update_count!(inner, Upper { task: upper_id }, 1) {
if update_count!(inner, Upper { task: upper_id }, count) {
if count!(inner, Upper).is_power_of_two() {
self.push_optimize_task(new_follower_id);
}
Expand Down Expand Up @@ -2685,6 +2733,31 @@ impl Operation for AggregationUpdateQueue {
}
}

trait TaskIdWithOptionalCount {
fn task_id(&self) -> TaskId;
fn task_id_and_count(&self) -> (TaskId, u32);
}

impl TaskIdWithOptionalCount for TaskId {
fn task_id(&self) -> TaskId {
*self
}

fn task_id_and_count(&self) -> (TaskId, u32) {
(*self, 1)
}
}

impl TaskIdWithOptionalCount for (TaskId, u32) {
fn task_id(&self) -> TaskId {
self.0
}

fn task_id_and_count(&self) -> (TaskId, u32) {
(self.0, self.1)
}
}

struct RetryTimeout;

const MAX_YIELD_DURATION: Duration = Duration::from_millis(1);
Expand Down
Loading