Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ bincode = "1.3"
byte-unit = "5.1.6"
bytes = { version = "1", features = ["serde"] }
camino = { version = "1.1", features = ["serde1"] }
camino-tempfile = "1.4.1"
cfg-if = { version = "1" }
chrono = { version = "0.4", features = [ "serde" ] }
clap = { version = "4.5", features = ["derive", "env", "wrap_help"] }
Expand Down
7 changes: 5 additions & 2 deletions agent-antagonist/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ edition = "2021"

[dependencies]
anyhow.workspace = true
camino-tempfile.workspace = true
clap.workspace = true
crucible-agent-client.workspace = true
crucible-common.workspace = true
crucible-downstairs.workspace = true
crucible-workspace-hack.workspace = true
futures-core.workspace = true
futures.workspace = true
rand.workspace = true
repair-client.workspace = true
reqwest.workspace = true
serde.workspace = true
signal-hook-tokio.workspace = true
signal-hook.workspace = true
slog.workspace = true
slog-term.workspace = true
slog-bunyan.workspace = true
slog-term.workspace = true
slog.workspace = true
tokio.workspace = true
uuid.workspace = true
249 changes: 248 additions & 1 deletion agent-antagonist/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ use anyhow::bail;
use anyhow::Result;
use clap::Parser;
use crucible_common::build_logger;
use crucible_common::mkdir_for_file;
use futures::StreamExt;
use rand::random;
use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;
use slog::Logger;
use slog::{info, warn};
use std::fs::File;
use std::io::Write;
use std::net::SocketAddr;
use std::process::Command;
use std::sync::{
Expand All @@ -16,8 +19,17 @@ use std::sync::{
};
use uuid::Uuid;

use rand::prelude::*;
use repair_client::types::FileType;
use repair_client::Client as RepairClient;

use crucible_common::ExtentId;
use crucible_common::RegionDefinition;
use crucible_downstairs::extent_path;
use crucible_downstairs::Extent;

use crucible_agent_client::{
types::{CreateRegion, RegionId, State as RegionState},
types::{CreateRegion, Region, RegionId, State as RegionState},
Client as CrucibleAgentClient,
};

Expand Down Expand Up @@ -60,6 +72,56 @@ enum Args {
#[clap(short, long)]
tasks: usize,
},

/// In a loop: pick a random region, read a random extent using the repair
/// API, and validate that it does not begin with zeroes. Note that this
/// will show a false positive if that extent's beginning has never been
/// written to.
AgentRepairTest {
/// Address of the crucible agent
#[clap(short, long)]
agent: SocketAddr,

/// Allow reading extents from a read-write downstairs that may not be
/// closed.
#[clap(short, long)]
allow_read_write: bool,

/// How many bytes to check if they're all zeroes
#[clap(long, default_value_t = 32768)]
verify_size: usize,

/// Use Extent::validate
#[clap(long)]
validate: bool,
},

/// In a loop, pick a random extent for a region, read that extent using the
/// repair API, and validate it does not begin with zeroes. Note that this
/// will show a false positive if that extent's beginning has never been
/// written to.
RepairTest {
/// Address of the crucible agent
#[clap(short, long)]
agent: SocketAddr,

/// Region id
#[clap(short, long)]
region_id: Uuid,

/// Allow reading extents from a read-write downstairs that may not be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems fraught with peril. Can we expect this data to be good?

Or, is this more allowing us to connect to a downstairs that is serving a RW region, but that downstairs not actually taking live IO from an upstairs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, we can't expect it to be good no, if it's accepting IO - but in cases where it's RW downstairs that's not currently doing anything, we should expect the data to be good.

/// closed.
#[clap(short, long)]
allow_read_write: bool,

/// How many bytes to check if they're all zeroes
#[clap(long, default_value_t = 32768)]
verify_size: usize,

/// Use Extent::validate
#[clap(long)]
validate: bool,
},
}

fn command(log: &Logger, bin: &'static str, args: &[&str]) -> Result<String> {
Expand Down Expand Up @@ -718,6 +780,99 @@ async fn signal_thread(log: Logger, stop_flag: Arc<AtomicBool>) {
}
}

async fn repair_test(
client: &reqwest::Client,
region_id: Uuid,
region_addr: SocketAddr,
verify_size: usize,
validate: bool,
) -> Result<()> {
let mut rng = rand::rng();

// Build a repair client
let repair_client = RepairClient::new_with_client(
&format!("http://{}", region_addr),
client.clone(),
);

// get a random extent
let def: RegionDefinition =
repair_client.get_region_info().await?.into_inner();
let eid = rng.random_range(0..def.extent_count());

let mut stream: repair_client::ByteStream = repair_client
.get_extent_file(eid, FileType::Data)
.await?
.into_inner();

if validate {
// Stream the whole extent to a tmp file, then use Extent::validate

let log = build_logger();

let mut tmpdir = camino_tempfile::tempdir().unwrap();
let base_path = tmpdir.path();
info!(log, "base path is {base_path}");
let extent_path =
extent_path(base_path.as_std_path(), ExtentId::from(eid));
info!(log, "extent path is {extent_path:?}");

mkdir_for_file(&extent_path)?;

let mut file = File::create(&extent_path)?;

while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk)?;
}

drop(file);

info!(log, "opening extent {eid} at {base_path}");

let extent = Extent::open(
base_path.as_std_path(),
&def,
ExtentId::from(eid),
true, // read-only
&log,
)?;

if let Err(e) = extent.validate() {
tmpdir.disable_cleanup(true);
return Err(e.into());
}
} else {
// Read just the beginning and look for a string of zeroes
let blank: Vec<u8> = vec![0u8; verify_size];
let mut start = vec![1u8; verify_size];
let mut i = 0;

while let Some(chunk) = stream.next().await {
let chunk = chunk?;
if i >= verify_size {
break;
}
let n = std::cmp::min(chunk.len(), verify_size - i);
start[i..(i + n)].copy_from_slice(&chunk[0..n]);
i += n;
}

if i < verify_size {
bail!("region {} extent {} truncated read!", region_id, eid);
}

if start == blank {
eprintln!(
"region {} extent {} returned with a blank beginning!",
region_id, eid,
);
}
}

Ok(())
}

#[allow(
clippy::disallowed_macros,
reason = "using `#[tokio::main]` in tests is fine"
Expand Down Expand Up @@ -838,6 +993,98 @@ async fn main() -> Result<()> {
info!(log, "All tasks have ended");
info!(log, "Summary: {:?}", result_summary);
}

Args::AgentRepairTest {
agent,
allow_read_write,
verify_size,
validate,
} => {
let mut rng = rand::rng();

let client = get_client(&agent);

let regions: Vec<Region> = client.region_list().await?.into_inner();

println!("{} total regions", regions.len());

let regions: Vec<Region> = regions
.into_iter()
.filter(|r| r.state == RegionState::Created)
.filter(|r| r.read_only || allow_read_write)
.collect();

println!("{} candidate regions", regions.len());

let dur = std::time::Duration::from_secs(60);

let client = reqwest::ClientBuilder::new()
.connect_timeout(dur)
.build()
.unwrap();

loop {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will just run forever, or until you control-C it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep!

let region: &_ = regions.choose(&mut rng).unwrap();

let mut region_addr: SocketAddr = agent;
region_addr.set_port(
region.port_number + crucible_common::REPAIR_PORT_OFFSET,
);

repair_test(
&client,
region.id.0.parse().unwrap(),
region_addr,
verify_size,
validate,
)
.await?;
}
}

Args::RepairTest {
agent,
region_id,
allow_read_write,
verify_size,
validate,
} => {
let client = get_client(&agent);

let region: Region = client
.region_get(&RegionId(region_id.to_string()))
.await?
.into_inner();

if !allow_read_write && !region.read_only {
eprintln!("region is read-write");
return Ok(());
}

let mut region_addr: SocketAddr = agent;
region_addr.set_port(
region.port_number + crucible_common::REPAIR_PORT_OFFSET,
);

let dur = std::time::Duration::from_secs(60);

let client = reqwest::ClientBuilder::new()
.connect_timeout(dur)
.build()
.unwrap();

loop {
repair_test(
&client,
region_id,
region_addr,
verify_size,
validate,
)
.await?;
}
}
}

Ok(())
}
2 changes: 2 additions & 0 deletions downstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use region::Region;
pub use admin::run_dropshot;
pub use dump::{dump_region, verify_region};
pub use dynamometer::*;
pub use extent::extent_path;
pub use extent::Extent;
pub use stats::{DsCountStat, DsStatOuter};

/// Single IO operation
Expand Down