Skip to content

Commit 4412be9

Browse files
committed
Add tests for the busy and dead cases which are hard to trigger without a buggy system
1 parent e584ad1 commit 4412be9

File tree

1 file changed

+111
-16
lines changed

1 file changed

+111
-16
lines changed

src/store/fs/util/entity_manager.rs

Lines changed: 111 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -364,25 +364,26 @@ mod main_actor {
364364
}
365365
}
366366

367+
#[must_use = "this function may return a future that must be spawned by the caller"]
367368
/// Friendly version of `spawn_boxed` that does the boxing
368-
pub async fn spawn<F, Fut>(&mut self, id: P::EntityId, f: F, tasks: &mut JoinSet<()>)
369+
pub async fn spawn<F, Fut>(
370+
&mut self,
371+
id: P::EntityId,
372+
f: F,
373+
) -> Option<impl Future<Output = ()> + Send + 'static>
369374
where
370375
F: FnOnce(SpawnArg<P>) -> Fut + Send + 'static,
371376
Fut: Future<Output = ()> + Send + 'static,
372377
{
373-
let task = self
374-
.spawn_boxed(
375-
id,
376-
Box::new(|x| {
377-
Box::pin(async move {
378-
f(x).await;
379-
})
380-
}),
381-
)
382-
.await;
383-
if let Some(task) = task {
384-
tasks.spawn(task);
385-
}
378+
self.spawn_boxed(
379+
id,
380+
Box::new(|x| {
381+
Box::pin(async move {
382+
f(x).await;
383+
})
384+
}),
385+
)
386+
.await
386387
}
387388

388389
#[must_use = "this function may return a future that must be spawned by the caller"]
@@ -627,7 +628,7 @@ mod main_actor {
627628
SelectOutcome::TaskDone(result) => {
628629
// Handle completed task
629630
if let Err(e) = result {
630-
eprintln!("Task failed: {e:?}");
631+
error!("Task failed: {e:?}");
631632
}
632633
}
633634
}
@@ -789,6 +790,7 @@ mod tests {
789790
use std::collections::HashMap;
790791

791792
use n0_future::{BufferedStreamExt, StreamExt};
793+
use testresult::TestResult;
792794

793795
use super::*;
794796

@@ -813,7 +815,10 @@ mod tests {
813815
//! global state from the entity state.
814816
use std::{
815817
collections::{HashMap, HashSet},
816-
sync::{Arc, Mutex},
818+
sync::{
819+
atomic::{AtomicUsize, Ordering},
820+
Arc, Mutex,
821+
},
817822
time::Instant,
818823
};
819824

@@ -972,6 +977,96 @@ mod tests {
972977
}
973978
}
974979
}
980+
981+
/// If a task is so busy that it can't drain it's inbox in time, we will
982+
/// get a SpawnArg::Busy instead of access to the actual state.
983+
///
984+
/// This will only happen if the system is seriously overloaded, since
985+
/// the entity actor just spawns tasks for each message. So here we
986+
/// simulate it by just not spawning the task as we are supposed to.
987+
#[tokio::test]
988+
async fn test_busy() -> TestResult<()> {
989+
let mut state = EntityManagerState::<Counters>::new(
990+
Arc::new(Mutex::new(Global::default())),
991+
1024,
992+
8,
993+
8,
994+
2,
995+
);
996+
let active = Arc::new(AtomicUsize::new(0));
997+
let busy = Arc::new(AtomicUsize::new(0));
998+
let inc = || {
999+
let active = active.clone();
1000+
let busy = busy.clone();
1001+
|arg: SpawnArg<Counters>| async move {
1002+
match arg {
1003+
SpawnArg::Active(_) => {
1004+
active.fetch_add(1, Ordering::SeqCst);
1005+
}
1006+
SpawnArg::Busy => {
1007+
busy.fetch_add(1, Ordering::SeqCst);
1008+
}
1009+
SpawnArg::Dead => {
1010+
println!("Entity actor is dead");
1011+
}
1012+
}
1013+
}
1014+
};
1015+
let fut1 = state.spawn(1, inc()).await;
1016+
assert!(fut1.is_some(), "First spawn should give us a task to spawn");
1017+
for _ in 0..9 {
1018+
let fut = state.spawn(1, inc()).await;
1019+
assert!(
1020+
fut.is_none(),
1021+
"Subsequent spawns should assume first task has been spawned"
1022+
);
1023+
}
1024+
assert_eq!(
1025+
active.load(Ordering::SeqCst),
1026+
0,
1027+
"Active should have never been called, since we did not spawn the task!"
1028+
);
1029+
assert_eq!(busy.load(Ordering::SeqCst), 2, "Busy should have been called two times, since we sent 10 msgs to a queue with capacity 8, and nobody is draining it");
1030+
Ok(())
1031+
}
1032+
1033+
/// If there is a panic in any of the fns that run on an entity actor,
1034+
/// the entire entity becomes dead. This can not be recovered from, and
1035+
/// trying to spawn a new task on the dead entity actor will result in
1036+
/// a SpawnArg::Dead.
1037+
#[tokio::test]
1038+
async fn test_dead() -> TestResult<()> {
1039+
let manager = EntityManager::<Counters>::new(
1040+
Arc::new(Mutex::new(Global::default())),
1041+
Options::default(),
1042+
);
1043+
let (tx, rx) = oneshot::channel();
1044+
let killer = |arg: SpawnArg<Counters>| async move {
1045+
match arg {
1046+
SpawnArg::Active(_) => {
1047+
tx.send(()).ok();
1048+
panic!("Panic to kill the task");
1049+
}
1050+
_ => {}
1051+
}
1052+
};
1053+
// spawn a task that kills the entity actor
1054+
manager.spawn(1, killer).await?;
1055+
rx.await.expect("Failed to receive kill confirmation");
1056+
let (tx, rx) = oneshot::channel();
1057+
let counter = |arg: SpawnArg<Counters>| async move {
1058+
match arg {
1059+
SpawnArg::Dead => {
1060+
tx.send(()).ok();
1061+
}
1062+
_ => {}
1063+
}
1064+
};
1065+
// // spawn another task on the - now dead - entity actor
1066+
manager.spawn(1, counter).await?;
1067+
rx.await.expect("Failed to receive dead confirmation");
1068+
Ok(())
1069+
}
9751070
}
9761071

9771072
mod fs {

0 commit comments

Comments
 (0)