Skip to content

Commit c5e9302

Browse files
hymmjames7132
authored andcommitted
tick local executor (bevyengine#6121)
# Objective - bevyengine#4466 broke local tasks running. - Fixes bevyengine#6120 ## Solution - Add system for ticking local executors on main thread into bevy_core where the tasks pools are initialized. - Add ticking local executors into thread executors ## Changelog - tick all thread local executors in task pool. ## Notes - ~~Not 100% sure about this PR. Ticking the local executor for the main thread in scope feels a little kludgy as it requires users of bevy_tasks to be calling scope periodically for those tasks to make progress.~~ took this out in favor of a system that ticks the local executors.
1 parent ae3dbdc commit c5e9302

File tree

5 files changed

+111
-4
lines changed

5 files changed

+111
-4
lines changed

crates/bevy_core/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0-dev" }
2020

2121
# other
2222
bytemuck = "1.5"
23+
24+
[dev-dependencies]
25+
crossbeam-channel = "0.5.0"

crates/bevy_core/src/lib.rs

+51
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ use bevy_utils::{Duration, HashSet, Instant};
2222
use std::borrow::Cow;
2323
use std::ops::Range;
2424

25+
#[cfg(not(target_arch = "wasm32"))]
26+
use bevy_ecs::schedule::IntoSystemDescriptor;
27+
#[cfg(not(target_arch = "wasm32"))]
28+
use bevy_tasks::tick_global_task_pools_on_main_thread;
29+
2530
/// Adds core functionality to Apps.
2631
#[derive(Default)]
2732
pub struct CorePlugin;
@@ -35,6 +40,13 @@ impl Plugin for CorePlugin {
3540
.unwrap_or_default()
3641
.create_default_pools();
3742

43+
#[cfg(not(target_arch = "wasm32"))]
44+
app.add_system_to_stage(
45+
bevy_app::CoreStage::Last,
46+
tick_global_task_pools_on_main_thread.at_end(),
47+
);
48+
49+
app.register_type::<Entity>().register_type::<Name>();
3850
app.register_type::<Entity>()
3951
.register_type::<Name>()
4052
.register_type::<Range<f32>>()
@@ -97,3 +109,42 @@ fn register_math_types(app: &mut App) {
97109
/// Wraps to 0 when it reaches the maximum u32 value
98110
#[derive(Default, Resource, Clone, Copy)]
99111
pub struct FrameCount(pub u32);
112+
113+
#[cfg(test)]
114+
mod tests {
115+
use super::*;
116+
use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
117+
118+
#[test]
119+
fn runs_spawn_local_tasks() {
120+
let mut app = App::new();
121+
app.add_plugin(CorePlugin);
122+
123+
let (async_tx, async_rx) = crossbeam_channel::unbounded();
124+
AsyncComputeTaskPool::get()
125+
.spawn_local(async move {
126+
async_tx.send(()).unwrap();
127+
})
128+
.detach();
129+
130+
let (compute_tx, compute_rx) = crossbeam_channel::unbounded();
131+
ComputeTaskPool::get()
132+
.spawn_local(async move {
133+
compute_tx.send(()).unwrap();
134+
})
135+
.detach();
136+
137+
let (io_tx, io_rx) = crossbeam_channel::unbounded();
138+
IoTaskPool::get()
139+
.spawn_local(async move {
140+
io_tx.send(()).unwrap();
141+
})
142+
.detach();
143+
144+
app.run();
145+
146+
async_rx.try_recv().unwrap();
147+
compute_rx.try_recv().unwrap();
148+
io_rx.try_recv().unwrap();
149+
}
150+
}

crates/bevy_tasks/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ mod single_threaded_task_pool;
1818
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};
1919

2020
mod usages;
21+
#[cfg(not(target_arch = "wasm32"))]
22+
pub use usages::tick_global_task_pools_on_main_thread;
2123
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
2224

2325
mod iter;

crates/bevy_tasks/src/task_pool.rs

+29-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
};
99

1010
use concurrent_queue::ConcurrentQueue;
11-
use futures_lite::{future, pin};
11+
use futures_lite::{future, pin, FutureExt};
1212

1313
use crate::Task;
1414

@@ -117,9 +117,16 @@ impl TaskPool {
117117

118118
thread_builder
119119
.spawn(move || {
120-
let shutdown_future = ex.run(shutdown_rx.recv());
121-
// Use unwrap_err because we expect a Closed error
122-
future::block_on(shutdown_future).unwrap_err();
120+
TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
121+
let tick_forever = async move {
122+
loop {
123+
local_executor.tick().await;
124+
}
125+
};
126+
let shutdown_future = ex.run(tick_forever.or(shutdown_rx.recv()));
127+
// Use unwrap_err because we expect a Closed error
128+
future::block_on(shutdown_future).unwrap_err();
129+
});
123130
})
124131
.expect("Failed to spawn thread.")
125132
})
@@ -314,6 +321,24 @@ impl TaskPool {
314321
{
315322
Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future)))
316323
}
324+
325+
/// Runs a function with the local executor. Typically used to tick
326+
/// the local executor on the main thread as it needs to share time with
327+
/// other things.
328+
///
329+
/// ```rust
330+
/// use bevy_tasks::TaskPool;
331+
///
332+
/// TaskPool::new().with_local_executor(|local_executor| {
333+
/// local_executor.try_tick();
334+
/// });
335+
/// ```
336+
pub fn with_local_executor<F, R>(&self, f: F) -> R
337+
where
338+
F: FnOnce(&async_executor::LocalExecutor) -> R,
339+
{
340+
Self::LOCAL_EXECUTOR.with(f)
341+
}
317342
}
318343

319344
impl Default for TaskPool {

crates/bevy_tasks/src/usages.rs

+26
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,29 @@ impl Deref for IoTaskPool {
109109
&self.0
110110
}
111111
}
112+
113+
/// Used by `bevy_core` to tick the global tasks pools on the main thread.
114+
/// This will run a maximum of 100 local tasks per executor per call to this function.
115+
#[cfg(not(target_arch = "wasm32"))]
116+
pub fn tick_global_task_pools_on_main_thread() {
117+
COMPUTE_TASK_POOL
118+
.get()
119+
.unwrap()
120+
.with_local_executor(|compute_local_executor| {
121+
ASYNC_COMPUTE_TASK_POOL
122+
.get()
123+
.unwrap()
124+
.with_local_executor(|async_local_executor| {
125+
IO_TASK_POOL
126+
.get()
127+
.unwrap()
128+
.with_local_executor(|io_local_executor| {
129+
for _ in 0..100 {
130+
compute_local_executor.try_tick();
131+
async_local_executor.try_tick();
132+
io_local_executor.try_tick();
133+
}
134+
});
135+
});
136+
});
137+
}

0 commit comments

Comments
 (0)