Skip to content

Commit

Permalink
test(subscriber): add test for tasks being kept open
Browse files Browse the repository at this point in the history
In the Tokio instrumentation, a tracing span is created for each task
which is spawned. Since the new span is created within the context of
where `tokio::spawn()` (or similar) is called from, it gets a contextual
parent attached.

In tracing, when a span has a child span (either because the child was
created in the context of the parent, or because the parent was set
explicitly) then that span will not be closed until the child has
closed.

The result in the console subscriber is that a task which spawns another
task won't have a `dropped_at` time set until the spawned task exits,
even if the parent task exits much earlier. This causes Tokio Console to
show an incorrect lost waker warning (#345). It also affects other spans
that are entered when a task is spawned (#412).

The solution is to modify the instrumentation in Tokio so that task
spans are explicit roots (`parent: None`). This will be done as part of
enriching the Tokio instrumentation (tokio-rs/tokio#5792).

This change adds functionality to the test framework within
`console-subscriber` so that the state of a task can be set as an
expectation. The state is calculated based on 4 values:
* `console_api::tasks::Stats::dropped_at`
* `console_api::tasks::Stats::last_wake`
* `console_api::PollStats::last_poll_started`
* `console_api::PollStats::last_poll_ended`

It can then be tested that a task that spawns another task and then ends
actually goes to the `Completed` state, even if the spawned task is
still running. As of Tokio 1.33.0, this test fails, but the PR FIXME:TBD
fixes this and the test should pass from Tokio 1.34 onwards.
  • Loading branch information
hds committed Nov 17, 2023
1 parent 8269b5f commit 8b5dbe4
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 3 deletions.
50 changes: 49 additions & 1 deletion console-subscriber/tests/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::future;
use tokio::{task, time::sleep};

mod support;
use support::{assert_task, assert_tasks, ExpectedTask};
use support::{assert_task, assert_tasks, ExpectedTask, TaskState};

#[test]
fn expect_present() {
Expand Down Expand Up @@ -198,6 +198,54 @@ fn fail_polls() {
assert_task(expected_task, future);
}

#[test]
fn main_task_completes() {
let expected_task = ExpectedTask::default()
.match_default_name()
.expect_state(TaskState::Completed);

let future = async {};

assert_task(expected_task, future);
}

#[test]
#[should_panic(expected = "Test failed: Task validation failed:
- Task { name=task }: expected `state` to be Idle, but actual was Completed")]
fn fail_completed_task_is_idle() {
let expected_task = ExpectedTask::default()
.match_name("task".into())
.expect_state(TaskState::Idle);

let future = async {
_ = task::Builder::new()
.name("task")
.spawn(futures::future::ready(()))
.unwrap()
.await;
};

assert_task(expected_task, future);
}

#[test]
#[should_panic(expected = "Test failed: Task validation failed:
- Task { name=task }: expected `state` to be Completed, but actual was Idle")]
fn fail_idle_task_is_completed() {
let expected_task = ExpectedTask::default()
.match_name("task".into())
.expect_state(TaskState::Completed);

let future = async {
_ = task::Builder::new()
.name("task")
.spawn(futures::future::pending::<()>())
.unwrap();
};

assert_task(expected_task, future);
}

async fn yield_to_runtime() {
// There is a race condition that can occur when tests are run in parallel,
// caused by tokio-rs/tracing#2743. It tends to cause test failures only
Expand Down
27 changes: 26 additions & 1 deletion console-subscriber/tests/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use tokio::time::sleep;

mod support;
use support::{assert_tasks, spawn_named, ExpectedTask};
use support::{assert_tasks, spawn_named, ExpectedTask, TaskState};

/// This test asserts the behavior that was fixed in #440. Before that fix,
/// the polls of a child were also counted towards the parent (the task which
Expand Down Expand Up @@ -34,3 +34,28 @@ fn child_polls_dont_count_towards_parent_polls() {

assert_tasks(expected_tasks, future);
}

/// This test asserts that the lifetime of a task is not affected by the
/// lifetimes of tasks that it spawns. The test will pass when #345 is
/// fixed.
#[test]
fn spawner_task_with_running_children_completes() {
let expected_tasks = vec![
ExpectedTask::default()
.match_name("parent".into())
.expect_state(TaskState::Completed),
ExpectedTask::default()
.match_name("child".into())
.expect_state(TaskState::Idle),
];

let future = async {
spawn_named("parent", async {
spawn_named("child", futures::future::pending::<()>());
})
.await
.expect("joining parent failed");
};

assert_tasks(expected_tasks, future);
}
2 changes: 2 additions & 0 deletions console-subscriber/tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use subscriber::run_test;

pub(crate) use subscriber::MAIN_TASK_NAME;
pub(crate) use task::ExpectedTask;
#[allow(unused_imports)]
pub(crate) use task::TaskState;
use tokio::task::JoinHandle;

/// Assert that an `expected_task` is recorded by a console-subscriber
Expand Down
92 changes: 91 additions & 1 deletion console-subscriber/tests/support/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{error, fmt};
use std::{error, fmt, time::SystemTime};

use console_api::tasks;
use prost_types::Timestamp;

use super::MAIN_TASK_NAME;

Expand All @@ -13,6 +14,7 @@ use super::MAIN_TASK_NAME;
pub(super) struct ActualTask {
pub(super) id: u64,
pub(super) name: Option<String>,
pub(super) state: Option<TaskState>,
pub(super) wakes: u64,
pub(super) self_wakes: u64,
pub(super) polls: u64,
Expand All @@ -23,6 +25,7 @@ impl ActualTask {
Self {
id,
name: None,
state: None,
wakes: 0,
self_wakes: 0,
polls: 0,
Expand All @@ -35,6 +38,59 @@ impl ActualTask {
if let Some(poll_stats) = &stats.poll_stats {
self.polls = poll_stats.polls;
}

self.state = calculate_task_state(stats);
}
}

/// The state of a task.
///
/// The task state is an amalgamation of a various fields. It is presented in
/// this way to make testing more straight forward.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum TaskState {
/// Task has completed.
///
/// Indicates that [`dropped_at`] has some value.
///
/// [`dropped_at`]: fn@tasks::Stats::dropped_at
Completed,
/// Task is being polled.
///
/// Indicates that the task is not [`Completed`] and the
/// [`last_poll_started`] time is later than [`last_poll_ended`] (or
/// [`last_poll_ended`] has not been set).
Running,
/// Task has been scheduled.
///
/// Indicates that the task is not [`Completed`] and the [`last_wake`] time
/// is later than [`last_poll_started`].
Scheduled,
/// Task is idle.
///
/// Indicates that the task is between polls.
Idle,
}

fn calculate_task_state(stats: &tasks::Stats) -> Option<TaskState> {
if stats.dropped_at.is_some() {
return Some(TaskState::Completed);
}

fn convert(ts: &Option<Timestamp>) -> Option<SystemTime> {
ts.as_ref().map(|v| v.clone().try_into().unwrap())
}
let poll_stats = stats.poll_stats.as_ref()?;
let last_poll_started = convert(&poll_stats.last_poll_started);
let last_poll_ended = convert(&poll_stats.last_poll_ended);
let last_wake = convert(&stats.last_wake);

if last_poll_started > last_poll_ended {
Some(TaskState::Running)
} else if last_wake > last_poll_started {
Some(TaskState::Scheduled)
} else {
Some(TaskState::Idle)
}
}

Expand Down Expand Up @@ -88,6 +144,7 @@ impl fmt::Debug for TaskValidationFailure {
pub(crate) struct ExpectedTask {
match_name: Option<String>,
expect_present: Option<bool>,
expect_state: Option<TaskState>,
expect_wakes: Option<u64>,
expect_self_wakes: Option<u64>,
expect_polls: Option<u64>,
Expand All @@ -98,6 +155,7 @@ impl Default for ExpectedTask {
Self {
match_name: None,
expect_present: None,
expect_state: None,
expect_wakes: None,
expect_self_wakes: None,
expect_polls: None,
Expand Down Expand Up @@ -147,6 +205,28 @@ impl ExpectedTask {
no_expectations = false;
}

if let Some(expected_state) = &self.expect_state {
no_expectations = false;
if let Some(actual_state) = &actual_task.state {
if expected_state != actual_state {
return Err(TaskValidationFailure {
expected: self.clone(),
actual: Some(actual_task.clone()),
failure: format!(
"{self}: expected `state` to be \
{expected_state:?}, but actual was \
{actual_state}",
actual_state = actual_task
.state
.as_ref()
.map(|s| format!("{:?}", s))
.unwrap_or("None".into()),
),
});
}
}
}

if let Some(expected_wakes) = self.expect_wakes {
no_expectations = false;
if expected_wakes != actual_task.wakes {
Expand Down Expand Up @@ -239,6 +319,16 @@ impl ExpectedTask {
self
}

/// Expects that a task has a specific [`TaskState`].
///
/// To validate, the actual task must be in this state at the time
/// the test ends and the validation is performed.
#[allow(dead_code)]
pub(crate) fn expect_state(mut self, state: TaskState) -> Self {
self.expect_state = Some(state);
self
}

/// Expects that a task has a specific value for `wakes`.
///
/// To validate, the actual task matching this expected task must have
Expand Down

0 comments on commit 8b5dbe4

Please sign in to comment.