Skip to content

Commit d26b63a

Browse files
committed
Add a SystemParam primitive for deferred mutations; allow #[derive]ing more types of SystemParam (#6817)
# Objective One pattern to increase parallelism is deferred mutation: instead of directly mutating the world (and preventing other systems from running at the same time), you queue up operations to be applied to the world at the end of the stage. The most common example of this pattern uses the `Commands` SystemParam. In order to avoid the overhead associated with commands, some power users may want to add their own deferred mutation behavior. To do this, you must implement the unsafe trait `SystemParam`, which interfaces with engine internals in a way that we'd like users to be able to avoid. ## Solution Add the `Deferred<T>` primitive `SystemParam`, which encapsulates the deferred mutation pattern. This can be combined with other types of `SystemParam` to safely and ergonomically create powerful custom types. Essentially, this is just a variant of `Local<T>` which can run code at the end of the stage. This type is used in the engine to derive `Commands` and `ParallelCommands`, which removes a bunch of unsafe boilerplate. ### Example ```rust use bevy_ecs::system::{Deferred, SystemBuffer}; /// Sends events with a delay, but may run in parallel with other event writers. #[derive(SystemParam)] pub struct BufferedEventWriter<'s, E: Event> { queue: Deferred<'s, EventQueue<E>>, } struct EventQueue<E>(Vec<E>); impl<'s, E: Event> BufferedEventWriter<'s, E> { /// Queues up an event to be sent at the end of this stage. pub fn send(&mut self, event: E) { self.queue.0.push(event); } } // The `SystemBuffer` trait controls how [`Deferred`] gets applied at the end of the stage. impl<E: Event> SystemBuffer for EventQueue<E> { fn apply(&mut self, world: &mut World) { let mut events = world.resource_mut::<Events<E>>(); for e in self.0.drain(..) { events.send(e); } } } ``` --- ## Changelog + Added the `SystemParam` type `Deferred<T>`, which can be used to defer `World` mutations. Powered by the new trait `SystemBuffer`.
1 parent 952735f commit d26b63a

File tree

4 files changed

+210
-69
lines changed

4 files changed

+210
-69
lines changed

crates/bevy_ecs/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ pub mod prelude {
4343
system::{
4444
adapter as system_adapter,
4545
adapter::{dbg, error, ignore, info, unwrap, warn},
46-
Commands, In, IntoPipeSystem, IntoSystem, Local, NonSend, NonSendMut, ParallelCommands,
47-
ParamSet, Query, Res, ResMut, Resource, System, SystemParamFunction,
46+
Commands, Deferred, In, IntoPipeSystem, IntoSystem, Local, NonSend, NonSendMut,
47+
ParallelCommands, ParamSet, Query, Res, ResMut, Resource, System, SystemParamFunction,
4848
},
4949
world::{FromWorld, World},
5050
};

crates/bevy_ecs/src/system/commands/mod.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@ mod command_queue;
22
mod parallel_scope;
33

44
use crate::{
5+
self as bevy_ecs,
56
bundle::Bundle,
67
entity::{Entities, Entity},
78
world::{FromWorld, World},
89
};
10+
use bevy_ecs_macros::SystemParam;
911
use bevy_utils::tracing::{error, info};
1012
pub use command_queue::CommandQueue;
1113
pub use parallel_scope::*;
1214
use std::marker::PhantomData;
1315

14-
use super::Resource;
16+
use super::{Deferred, Resource, SystemBuffer, SystemMeta};
1517

1618
/// A [`World`] mutation.
1719
///
@@ -97,22 +99,31 @@ pub trait Command: Send + 'static {
9799
/// [`System::apply_buffers`]: crate::system::System::apply_buffers
98100
/// [`apply_system_buffers`]: crate::schedule::apply_system_buffers
99101
/// [`Schedule::apply_system_buffers`]: crate::schedule::Schedule::apply_system_buffers
102+
#[derive(SystemParam)]
100103
pub struct Commands<'w, 's> {
101-
queue: &'s mut CommandQueue,
104+
queue: Deferred<'s, CommandQueue>,
102105
entities: &'w Entities,
103106
}
104107

108+
impl SystemBuffer for CommandQueue {
109+
#[inline]
110+
fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
111+
#[cfg(feature = "trace")]
112+
let _system_span =
113+
bevy_utils::tracing::info_span!("system_commands", name = _system_meta.name())
114+
.entered();
115+
self.apply(world);
116+
}
117+
}
118+
105119
impl<'w, 's> Commands<'w, 's> {
106120
/// Returns a new `Commands` instance from a [`CommandQueue`] and a [`World`].
107121
///
108122
/// It is not required to call this constructor when using `Commands` as a [system parameter].
109123
///
110124
/// [system parameter]: crate::system::SystemParam
111125
pub fn new(queue: &'s mut CommandQueue, world: &'w World) -> Self {
112-
Self {
113-
queue,
114-
entities: world.entities(),
115-
}
126+
Self::new_from_entities(queue, world.entities())
116127
}
117128

118129
/// Returns a new `Commands` instance from a [`CommandQueue`] and an [`Entities`] reference.
@@ -121,7 +132,10 @@ impl<'w, 's> Commands<'w, 's> {
121132
///
122133
/// [system parameter]: crate::system::SystemParam
123134
pub fn new_from_entities(queue: &'s mut CommandQueue, entities: &'w Entities) -> Self {
124-
Self { queue, entities }
135+
Self {
136+
queue: Deferred(queue),
137+
entities,
138+
}
125139
}
126140

127141
/// Pushes a [`Command`] to the queue for creating a new empty [`Entity`],

crates/bevy_ecs/src/system/commands/parallel_scope.rs

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@ use std::cell::Cell;
33
use thread_local::ThreadLocal;
44

55
use crate::{
6+
self as bevy_ecs,
67
entity::Entities,
78
prelude::World,
8-
system::{SystemMeta, SystemParam},
9+
system::{Deferred, SystemBuffer, SystemMeta, SystemParam},
910
};
1011

1112
use super::{CommandQueue, Commands};
1213

13-
/// The internal [`SystemParam`] state of the [`ParallelCommands`] type
14-
#[doc(hidden)]
1514
#[derive(Default)]
16-
pub struct ParallelCommandsState {
15+
struct ParallelCommandQueue {
1716
thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
1817
}
1918

@@ -43,41 +42,23 @@ pub struct ParallelCommandsState {
4342
/// }
4443
/// # bevy_ecs::system::assert_is_system(parallel_command_system);
4544
///```
45+
#[derive(SystemParam)]
4646
pub struct ParallelCommands<'w, 's> {
47-
state: &'s mut ParallelCommandsState,
47+
state: Deferred<'s, ParallelCommandQueue>,
4848
entities: &'w Entities,
4949
}
5050

51-
// SAFETY: no component or resource access to report
52-
unsafe impl SystemParam for ParallelCommands<'_, '_> {
53-
type State = ParallelCommandsState;
54-
type Item<'w, 's> = ParallelCommands<'w, 's>;
55-
56-
fn init_state(_: &mut World, _: &mut crate::system::SystemMeta) -> Self::State {
57-
ParallelCommandsState::default()
58-
}
59-
60-
fn apply(state: &mut Self::State, _system_meta: &SystemMeta, world: &mut World) {
51+
impl SystemBuffer for ParallelCommandQueue {
52+
#[inline]
53+
fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
6154
#[cfg(feature = "trace")]
6255
let _system_span =
6356
bevy_utils::tracing::info_span!("system_commands", name = _system_meta.name())
6457
.entered();
65-
for cq in &mut state.thread_local_storage {
58+
for cq in &mut self.thread_local_storage {
6659
cq.get_mut().apply(world);
6760
}
6861
}
69-
70-
unsafe fn get_param<'w, 's>(
71-
state: &'s mut Self::State,
72-
_: &crate::system::SystemMeta,
73-
world: &'w World,
74-
_: u32,
75-
) -> Self::Item<'w, 's> {
76-
ParallelCommands {
77-
state,
78-
entities: world.entities(),
79-
}
80-
}
8162
}
8263

8364
impl<'w, 's> ParallelCommands<'w, 's> {

crates/bevy_ecs/src/system/system_param.rs

Lines changed: 178 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
query::{
99
Access, FilteredAccess, FilteredAccessSet, QueryState, ReadOnlyWorldQuery, WorldQuery,
1010
},
11-
system::{CommandQueue, Commands, Query, SystemMeta},
11+
system::{Query, SystemMeta},
1212
world::{FromWorld, World},
1313
};
1414
pub use bevy_ecs_macros::Resource;
@@ -153,6 +153,8 @@ pub unsafe trait SystemParam: Sized {
153153

154154
/// Applies any deferred mutations stored in this [`SystemParam`]'s state.
155155
/// This is used to apply [`Commands`] during [`apply_system_buffers`](crate::prelude::apply_system_buffers).
156+
///
157+
/// [`Commands`]: crate::prelude::Commands
156158
#[inline]
157159
#[allow(unused_variables)]
158160
fn apply(state: &mut Self::State, system_meta: &SystemMeta, world: &mut World) {}
@@ -593,37 +595,6 @@ unsafe impl<'a, T: Resource> SystemParam for Option<ResMut<'a, T>> {
593595
}
594596
}
595597

596-
// SAFETY: Commands only accesses internal state
597-
unsafe impl<'w, 's> ReadOnlySystemParam for Commands<'w, 's> {}
598-
599-
// SAFETY: `Commands::get_param` does not access the world.
600-
unsafe impl SystemParam for Commands<'_, '_> {
601-
type State = CommandQueue;
602-
type Item<'w, 's> = Commands<'w, 's>;
603-
604-
fn init_state(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
605-
Default::default()
606-
}
607-
608-
fn apply(state: &mut Self::State, _system_meta: &SystemMeta, world: &mut World) {
609-
#[cfg(feature = "trace")]
610-
let _system_span =
611-
bevy_utils::tracing::info_span!("system_commands", name = _system_meta.name())
612-
.entered();
613-
state.apply(world);
614-
}
615-
616-
#[inline]
617-
unsafe fn get_param<'w, 's>(
618-
state: &'s mut Self::State,
619-
_system_meta: &SystemMeta,
620-
world: &'w World,
621-
_change_tick: u32,
622-
) -> Self::Item<'w, 's> {
623-
Commands::new(state, world)
624-
}
625-
}
626-
627598
/// SAFETY: only reads world
628599
unsafe impl<'w> ReadOnlySystemParam for &'w World {}
629600

@@ -787,6 +758,181 @@ unsafe impl<'a, T: FromWorld + Send + 'static> SystemParam for Local<'a, T> {
787758
}
788759
}
789760

761+
/// Types that can be used with [`Deferred<T>`] in systems.
762+
/// This allows storing system-local data which is used to defer [`World`] mutations.
763+
///
764+
/// Types that implement `SystemBuffer` should take care to perform as many
765+
/// computations up-front as possible. Buffers cannot be applied in parallel,
766+
/// so you should try to minimize the time spent in [`SystemBuffer::apply`].
767+
pub trait SystemBuffer: FromWorld + Send + 'static {
768+
/// Applies any deferred mutations to the [`World`].
769+
fn apply(&mut self, system_meta: &SystemMeta, world: &mut World);
770+
}
771+
772+
/// A [`SystemParam`] that stores a buffer which gets applied to the [`World`] at the end of a stage.
773+
/// This is used internally by [`Commands`] to defer `World` mutations.
774+
///
775+
/// [`Commands`]: crate::system::Commands
776+
///
777+
/// # Examples
778+
///
779+
/// By using this type to defer mutations, you can avoid mutable `World` access within
780+
/// a system, which allows it to run in parallel with more systems.
781+
///
782+
/// Note that deferring mutations is *not* free, and should only be used if
783+
/// the gains in parallelization outweigh the time it takes to apply deferred mutations.
784+
/// In general, [`Deferred`] should only be used for mutations that are infrequent,
785+
/// or which otherwise take up a small portion of a system's run-time.
786+
///
787+
/// ```
788+
/// # use bevy_ecs::prelude::*;
789+
/// // Tracks whether or not there is a threat the player should be aware of.
790+
/// #[derive(Resource, Default)]
791+
/// pub struct Alarm(bool);
792+
///
793+
/// #[derive(Component)]
794+
/// pub struct Settlement {
795+
/// // ...
796+
/// }
797+
///
798+
/// // A threat from inside the settlement.
799+
/// #[derive(Component)]
800+
/// pub struct Criminal;
801+
///
802+
/// // A threat from outside the settlement.
803+
/// #[derive(Component)]
804+
/// pub struct Monster;
805+
///
806+
/// # impl Criminal { pub fn is_threat(&self, _: &Settlement) -> bool { true } }
807+
///
808+
/// use bevy_ecs::system::{Deferred, SystemBuffer, SystemMeta};
809+
///
810+
/// // Uses deferred mutations to allow signalling the alarm from multiple systems in parallel.
811+
/// #[derive(Resource, Default)]
812+
/// struct AlarmFlag(bool);
813+
///
814+
/// impl AlarmFlag {
815+
/// /// Sounds the alarm at the end of the current stage.
816+
/// pub fn flag(&mut self) {
817+
/// self.0 = true;
818+
/// }
819+
/// }
820+
///
821+
/// impl SystemBuffer for AlarmFlag {
822+
/// // When `AlarmFlag` is used in a system, this function will get
823+
/// // called at the end of the system's stage.
824+
/// fn apply(&mut self, system_meta: &SystemMeta, world: &mut World) {
825+
/// if self.0 {
826+
/// world.resource_mut::<Alarm>().0 = true;
827+
/// self.0 = false;
828+
/// }
829+
/// }
830+
/// }
831+
///
832+
/// // Sound the alarm if there are any criminals who pose a threat.
833+
/// fn alert_criminal(
834+
/// settlements: Query<&Settlement>,
835+
/// criminals: Query<&Criminal>,
836+
/// mut alarm: Deferred<AlarmFlag>
837+
/// ) {
838+
/// let settlement = settlements.single();
839+
/// for criminal in &criminals {
840+
/// // Only sound the alarm if the criminal is a threat.
841+
/// // For this example, assume that this check is expensive to run.
842+
/// // Since the majority of this system's run-time is dominated
843+
/// // by calling `is_threat()`, we defer sounding the alarm to
844+
/// // allow this system to run in parallel with other alarm systems.
845+
/// if criminal.is_threat(settlement) {
846+
/// alarm.flag();
847+
/// }
848+
/// }
849+
/// }
850+
///
851+
/// // Sound the alarm if there is a monster.
852+
/// fn alert_monster(
853+
/// monsters: Query<&Monster>,
854+
/// mut alarm: ResMut<Alarm>
855+
/// ) {
856+
/// if monsters.iter().next().is_some() {
857+
/// // Since this system does nothing except for sounding the alarm,
858+
/// // it would be pointless to defer it, so we sound the alarm directly.
859+
/// alarm.0 = true;
860+
/// }
861+
/// }
862+
///
863+
/// let mut world = World::new();
864+
/// world.init_resource::<Alarm>();
865+
/// world.spawn(Settlement {
866+
/// // ...
867+
/// });
868+
///
869+
/// let mut schedule = Schedule::new();
870+
/// schedule
871+
/// // These two systems have no conflicts and will run in parallel.
872+
/// .add_system(alert_criminal)
873+
/// .add_system(alert_monster);
874+
///
875+
/// // There are no criminals or monsters, so the alarm is not sounded.
876+
/// schedule.run(&mut world);
877+
/// assert_eq!(world.resource::<Alarm>().0, false);
878+
///
879+
/// // Spawn a monster, which will cause the alarm to be sounded.
880+
/// let m_id = world.spawn(Monster).id();
881+
/// schedule.run(&mut world);
882+
/// assert_eq!(world.resource::<Alarm>().0, true);
883+
///
884+
/// // Remove the monster and reset the alarm.
885+
/// world.entity_mut(m_id).despawn();
886+
/// world.resource_mut::<Alarm>().0 = false;
887+
///
888+
/// // Spawn a criminal, which will cause the alarm to be sounded.
889+
/// world.spawn(Criminal);
890+
/// schedule.run(&mut world);
891+
/// assert_eq!(world.resource::<Alarm>().0, true);
892+
/// ```
893+
pub struct Deferred<'a, T: SystemBuffer>(pub(crate) &'a mut T);
894+
895+
impl<'a, T: SystemBuffer> Deref for Deferred<'a, T> {
896+
type Target = T;
897+
#[inline]
898+
fn deref(&self) -> &Self::Target {
899+
self.0
900+
}
901+
}
902+
903+
impl<'a, T: SystemBuffer> DerefMut for Deferred<'a, T> {
904+
#[inline]
905+
fn deref_mut(&mut self) -> &mut Self::Target {
906+
self.0
907+
}
908+
}
909+
910+
// SAFETY: Only local state is accessed.
911+
unsafe impl<T: SystemBuffer> ReadOnlySystemParam for Deferred<'_, T> {}
912+
913+
// SAFETY: Only local state is accessed.
914+
unsafe impl<T: SystemBuffer> SystemParam for Deferred<'_, T> {
915+
type State = SyncCell<T>;
916+
type Item<'w, 's> = Deferred<'s, T>;
917+
918+
fn init_state(world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
919+
SyncCell::new(T::from_world(world))
920+
}
921+
922+
fn apply(state: &mut Self::State, system_meta: &SystemMeta, world: &mut World) {
923+
state.get().apply(system_meta, world);
924+
}
925+
926+
unsafe fn get_param<'w, 's>(
927+
state: &'s mut Self::State,
928+
_system_meta: &SystemMeta,
929+
_world: &'w World,
930+
_change_tick: u32,
931+
) -> Self::Item<'w, 's> {
932+
Deferred(state.get())
933+
}
934+
}
935+
790936
/// Shared borrow of a non-[`Send`] resource.
791937
///
792938
/// Only `Send` resources may be accessed with the [`Res`] [`SystemParam`]. In case that the

0 commit comments

Comments
 (0)