Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions downstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use rand::prelude::*;
use slog::{debug, error, info, o, warn, Logger};
use tokio::net::TcpListener;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tokio::time::{sleep, Instant};
use tokio_util::codec::FramedRead;
use uuid::Uuid;

Expand Down Expand Up @@ -1469,14 +1469,28 @@ impl ActiveConnection {
.get_next(&mut stop_at)
.or_else(|| retry.pop_front())
{
if flags.lossy && rand::random_bool(0.25) {
// Skip a job that needs to be done, moving it to the back of
// the list. This exercises job dependency tracking in the face
// of arbitrary reordering.
info!(self.log, "[lossy] skipping {}", ds_id);
retry.push_back((ds_id, work));
continue;
} else if let Some((ds_id, work)) = self
if flags.lossy {
// First, mimic the old `random() && random()` skip probability (~25%).
if rand::random_bool(0.25) {
// Move the job to the back to exercise dependency tracking in the
// face of arbitrary reordering.
info!(self.log, "[lossy] skipping {}", ds_id);
retry.push_back((ds_id, work));
continue;
}

// If we didn't skip, give ourselves another 25% chance to delay.
if rand::random_bool(0.25) {
let delay_ms = rand::random_range(50..=250);
debug!(
self.log,
"[lossy] delaying {} by {}ms", ds_id, delay_ms
);
sleep(Duration::from_millis(delay_ms as u64)).await;
}
}

if let Some((ds_id, work)) = self
.do_work(ds_id, work, flags, reqwest_client, dss, region)
.await?
{
Expand Down