Skip to content

Conversation

@nightkr
Copy link
Contributor

@nightkr nightkr commented Nov 22, 2021

Based on kube-rs/kube#720, but with generalizations for runtime-agnosticity, and and pin-project-lite.

Not sure about whether this ultimately belongs here or in a utility crate, but figured it was worth raising the question regardless.

Based on kube-rs/kube#720, but with generalizations
for runtime-agnosticity, and and pin-project-lite.
Comment on lines +206 to +218
pub(crate) fn rt_sleeper() -> impl Sleeper {
AsyncStdSleeper
}

#[cfg(feature = "tokio")]
fn rt_sleeper() -> impl Sleeper {
pub(crate) fn rt_sleeper() -> impl Sleeper {
TokioSleeper
}

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]

struct TokioSleeper;
pub(crate) struct TokioSleeper;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to use this in stream

#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
pub mod future;
#[cfg(feature = "futures")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't create a new feature here, since this doesn't bring in any new (runtime) dependencies, but happy to change if prefer

tokio::time::pause();
let tick = Duration::from_secs(1);
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
let rx = StreamBackoff::new(rx, crate::backoff::Constant::new(tick), TokioSleeper);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to use Tokio specifically for these marble tests, since async-std doesn't support time travel (and, regardless, Sleeper doesn't and probably shouldn't support that).

match this.state.as_mut().project() {
StateProj::BackingOff { mut backoff_sleep } => match backoff_sleep.as_mut().poll(cx) {
Poll::Ready(()) => {
// tracing::debug!(deadline = ?backoff_sleep.deadline(), "Backoff complete, waking up");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these trace messages can be valuable for debugging (since backoff is bound to look weird to the user), but I didn't want to bring in new dependencies for this. On the other hand, Tokio already brings in the dependency so I'm not sure it'd be a huge problem to add.

use tokio_1 as tokio;

#[tokio::test]
async fn stream_should_back_off() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests depend on TokioSleeper which requires the tokio feature, so they'll fail to build with the feature set futures,!tokio. Do we want to always build TokioSleeper in tests?

}

/// Dynamic backoff policy that is still deterministic and testable
struct LinearBackoff {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth promoting this to crate::backoff? Not sure I'd use it in production, but it's pretty nice to have a predictable and deterministic variant of ExponentialBackoff when writing marble tests.

///
/// If [`Backoff::next_backoff`] returns [`None`] then the backing stream is given up on, and closed.
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub fn backoff<S: TryStream, B: Backoff>(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this'd be an extension method on TryStream, but that isn't allowed for impl Trait return values.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant