Skip to content

Commit

Permalink
Make sure we can reach the user's requested FVM concurrency
Browse files Browse the repository at this point in the history
We previously used the FVM's `ThreadedExecutor` to execute messages on
separate threads because the FVM requires 64MiB of stack space.

1. The FVM v3 supported for 8 concurrent threads.
2. The FVM v4 supports up to the number of CPU threads available.

Unfortunately, neither version was influenced by the
`LOTUS_FVM_CONCURRENCY` environment variable.

This patch fixes this by:

1. Moving the thread-pool to the FFI itself (sharing it between FVM
versions).
2. Setting the thread-pool size equal to `LOTUS_FVM_CONCURRENCY`.

It also defaults `LOTUS_FVM_CONCURRENCY` to the number of available
CPU threads instead of the previous 4.

fixes filecoin-project/lotus#11817
  • Loading branch information
Stebalien committed Apr 6, 2024
1 parent b715c94 commit fa130da
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 66 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ serde = "1.0.117"
serde_tuple = "0.5"
safer-ffi = { version = "0.0.7", features = ["proc_macros"] }
filecoin-proofs-api = { version = "16.1", default-features = false }
yastl = "0.1.2"

[dev-dependencies]
memmap2 = "0.5"
tempfile = "3.0.8"

[features]
default = ["cuda", "multicore-sdr" ]
default = ["cuda", "multicore-sdr"]
blst-portable = ["bls-signatures/blst-portable", "blstrs/portable"]
cuda = ["filecoin-proofs-api/cuda", "rust-gpu-tools/cuda", "fvm2/cuda", "fvm3/cuda", "fvm4/cuda"]
cuda-supraseal = ["filecoin-proofs-api/cuda-supraseal", "rust-gpu-tools/cuda", "fvm3/cuda-supraseal", "fvm4/cuda-supraseal"]
Expand Down
73 changes: 10 additions & 63 deletions rust/src/fvm/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::ops::RangeInclusive;
use std::sync::{Arc, Mutex};

use anyhow::anyhow;
Expand All @@ -18,7 +17,7 @@ use super::externs::CgoExterns;
use super::types::*;

// Generic executor; uses the current (v3) engine types
pub trait CgoExecutor {
pub trait CgoExecutor: Send {
fn execute_message(
&mut self,
msg: Message,
Expand Down Expand Up @@ -65,9 +64,6 @@ pub struct MultiEngineContainer {
engines: Mutex<HashMap<EngineVersion, Arc<dyn AbstractMultiEngine + 'static>>>,
}

const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY";
const VALID_CONCURRENCY_RANGE: RangeInclusive<u32> = 1..=128;

impl TryFrom<u32> for EngineVersion {
type Error = anyhow::Error;
fn try_from(value: u32) -> Result<Self, Self::Error> {
Expand All @@ -81,41 +77,6 @@ impl TryFrom<u32> for EngineVersion {
}

impl MultiEngineContainer {
/// Constructs a new multi-engine container with the default concurrency (4).
pub fn new() -> MultiEngineContainer {
Self::with_concurrency(4)
}

/// Constructs a new multi-engine container with the concurrency specified in the
/// `LOTUS_FVM_CONCURRENCY` environment variable.
pub fn new_env() -> MultiEngineContainer {
let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) {
Some(v) => v,
None => return Self::new(),
};
let valstr = match valosstr.to_str() {
Some(s) => s,
None => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value");
return Self::new();
}
};
let concurrency: u32 = match valstr.parse() {
Ok(v) => v,
Err(e) => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}");
return Self::new();
}
};
if !VALID_CONCURRENCY_RANGE.contains(&concurrency) {
log::error!(
"{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}"
);
return Self::new();
}
Self::with_concurrency(concurrency)
}

pub fn with_concurrency(concurrency: u32) -> MultiEngineContainer {
MultiEngineContainer {
engines: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -146,12 +107,6 @@ impl MultiEngineContainer {
}
}

impl Default for MultiEngineContainer {
fn default() -> MultiEngineContainer {
MultiEngineContainer::new()
}
}

// fvm v4 implementation
mod v4 {
use anyhow::anyhow;
Expand All @@ -160,10 +115,7 @@ mod v4 {

use fvm4::call_manager::DefaultCallManager as DefaultCallManager4;
use fvm4::engine::{EnginePool as EnginePool4, MultiEngine as MultiEngine4};
use fvm4::executor::{
ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4,
ThreadedExecutor as ThreadedExecutor4,
};
use fvm4::executor::{ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4};
use fvm4::kernel::filecoin::DefaultFilecoinKernel as DefaultFilecoinKernel4;
use fvm4::machine::{DefaultMachine as DefaultMachine4, NetworkConfig};
use fvm4_shared::{chainid::ChainID, clock::ChainEpoch, message::Message};
Expand All @@ -175,14 +127,13 @@ mod v4 {
use super::Config;

type CgoMachine4 = DefaultMachine4<CgoBlockstore, CgoExterns>;
type BaseExecutor4 = DefaultExecutor4<DefaultFilecoinKernel4<DefaultCallManager4<CgoMachine4>>>;
type CgoExecutor4 = ThreadedExecutor4<BaseExecutor4>;
type CgoExecutor4 = DefaultExecutor4<DefaultFilecoinKernel4<DefaultCallManager4<CgoMachine4>>>;

fn new_executor(
engine_pool: EnginePool4,
machine: CgoMachine4,
) -> anyhow::Result<CgoExecutor4> {
Ok(ThreadedExecutor4(BaseExecutor4::new(engine_pool, machine)?))
Ok(CgoExecutor4::new(engine_pool, machine)?)
}

impl CgoExecutor for CgoExecutor4 {
Expand Down Expand Up @@ -254,8 +205,7 @@ mod v3 {
};
use fvm3::engine::{EnginePool as EnginePool3, MultiEngine as MultiEngine3};
use fvm3::executor::{
ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3,
DefaultExecutor as DefaultExecutor3, ThreadedExecutor as ThreadedExecutor3,
ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3, DefaultExecutor as DefaultExecutor3,
};
use fvm3::machine::{DefaultMachine as DefaultMachine3, NetworkConfig as NetworkConfig3};
use fvm3::trace::ExecutionEvent as ExecutionEvent3;
Expand Down Expand Up @@ -284,14 +234,13 @@ mod v3 {
use super::Config;

type CgoMachine3 = DefaultMachine3<CgoBlockstore, CgoExterns>;
type BaseExecutor3 = DefaultExecutor3<DefaultKernel3<DefaultCallManager3<CgoMachine3>>>;
type CgoExecutor3 = ThreadedExecutor3<BaseExecutor3>;
type CgoExecutor3 = DefaultExecutor3<DefaultKernel3<DefaultCallManager3<CgoMachine3>>>;

fn new_executor(
engine_pool: EnginePool3,
machine: CgoMachine3,
) -> anyhow::Result<CgoExecutor3> {
Ok(ThreadedExecutor3(BaseExecutor3::new(engine_pool, machine)?))
Ok(CgoExecutor3::new(engine_pool, machine)?)
}

impl CgoExecutor for CgoExecutor3 {
Expand Down Expand Up @@ -533,8 +482,7 @@ mod v2 {
backtrace::Cause as Cause2, DefaultCallManager as DefaultCallManager2,
};
use fvm2::executor::{
ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2,
DefaultExecutor as DefaultExecutor2, ThreadedExecutor as ThreadedExecutor2,
ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2, DefaultExecutor as DefaultExecutor2,
};
use fvm2::machine::{
DefaultMachine as DefaultMachine2, MultiEngine as MultiEngine2,
Expand Down Expand Up @@ -565,11 +513,10 @@ mod v2 {
use super::Config;

type CgoMachine2 = DefaultMachine2<CgoBlockstore, CgoExterns>;
type BaseExecutor2 = DefaultExecutor2<DefaultKernel2<DefaultCallManager2<CgoMachine2>>>;
type CgoExecutor2 = ThreadedExecutor2<BaseExecutor2>;
type CgoExecutor2 = DefaultExecutor2<DefaultKernel2<DefaultCallManager2<CgoMachine2>>>;

fn new_executor(machine: CgoMachine2) -> CgoExecutor2 {
ThreadedExecutor2(BaseExecutor2::new(machine))
CgoExecutor2::new(machine)
}

fn bytes_to_block(bytes: RawBytes) -> Option<IpldBlock> {
Expand Down
69 changes: 67 additions & 2 deletions rust/src/fvm/machine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::convert::{TryFrom, TryInto};
use std::ops::RangeInclusive;

use anyhow::{anyhow, Context};
use cid::Cid;
Expand Down Expand Up @@ -27,8 +28,54 @@ use super::types::*;
use crate::destructor;
use crate::util::types::{catch_panic_response, catch_panic_response_no_default, Result};

const STACK_SIZE: usize = 64 << 20; // 20MiB

lazy_static! {
static ref ENGINES: MultiEngineContainer = MultiEngineContainer::new_env();
static ref CONCURRENCY: u32 = get_concurrency();
static ref ENGINES: MultiEngineContainer = MultiEngineContainer::with_concurrency(*CONCURRENCY);
static ref THREAD_POOL: yastl::Pool = yastl::Pool::with_config(
*CONCURRENCY as usize,
yastl::ThreadConfig::new()
.prefix("fvm")
.stack_size(STACK_SIZE)
);
}

const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY";
const VALID_CONCURRENCY_RANGE: RangeInclusive<u32> = 1..=256;

fn available_parallelism() -> u32 {
std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(8) as u32
}

fn get_concurrency() -> u32 {
let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) {
Some(v) => v,
None => return available_parallelism(),
};
let valstr = match valosstr.to_str() {
Some(s) => s,
None => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value");
return available_parallelism();
}
};
let concurrency: u32 = match valstr.parse() {
Ok(v) => v,
Err(e) => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}");
return available_parallelism();
}
};
if !VALID_CONCURRENCY_RANGE.contains(&concurrency) {
log::error!(
"{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}"
);
return available_parallelism();
}
concurrency
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -181,14 +228,32 @@ fn create_fvm_debug_machine(
)
}

fn with_new_stack<F, T>(name: &str, pool: &yastl::Pool, callback: F) -> repr_c::Box<Result<T>>
where
T: Sized + Default + Send,
F: FnOnce() -> anyhow::Result<T> + std::panic::UnwindSafe + Send,
{
let mut res = None;
pool.scoped(|scope| scope.execute(|| res = Some(catch_panic_response(name, callback))));

res.unwrap_or_else(|| {
repr_c::Box::new(Result::err(
format!("failed to schedule {name}")
.into_bytes()
.into_boxed_slice(),
))
})
}

#[ffi_export]
fn fvm_machine_execute_message(
executor: &'_ InnerFvmMachine,
message: c_slice::Ref<u8>,
chain_len: u64,
apply_kind: u64, /* 0: Explicit, _: Implicit */
) -> repr_c::Box<Result<FvmMachineExecuteResponse>> {
catch_panic_response("fvm_machine_execute_message", || {
// Execute in the thread-pool because we need a 64MiB stack.
with_new_stack("fvm_machine_execute_message", &THREAD_POOL, || {
let apply_kind = if apply_kind == 0 {
ApplyKind::Explicit
} else {
Expand Down

0 comments on commit fa130da

Please sign in to comment.