From cae2628b8a53b029320bdd636af63fedded1d608 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 2 Apr 2025 15:25:50 -0400 Subject: [PATCH 1/2] Rip Rayon out of reducer execution and query evaluation For testing/benchmarking purposes. One theory about our performance is that we're spending a lot of time context-switching between Tokio and Rayon threads. This build will be used in the first of a series of experiments to investigate that overhead. In this patch, we just do sequential evaluation on the Tokio worker threads where the rest of our code runs, instead of sending stuff to Rayon. Rayon use is mostly, but not entirely, removed from Core. The two next steps I am interested in: - Use parallelism, but on Tokio workers rather than Rayon threads. I.e. replace `par_iter`, `fold` and `reduce_with` calls with Tokio-isms, instead of this patch's `std::iter::Iterator` versions. - (Not discussed in meeting) continue using `par_iter` and friends, but invoke the "outer loop" from Tokio threads. I.e. retain use of `par_iter`, `fold` and `reduce_with`, but remove calls to `rayon::scope` or `rayon::spawn`. --- .../src/host/wasm_common/module_host_actor.rs | 6 ++-- crates/core/src/subscription/mod.rs | 4 +-- .../module_subscription_manager.rs | 33 ++++++++++--------- crates/core/src/subscription/subscription.rs | 5 ++- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 6bb641fb352..53792c2ca68 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -462,9 +462,9 @@ impl WasmModuleInstance { ) .entered(); - // run the call_reducer call in rayon. it's important that we don't acquire a lock inside a rayon task, - // as that can lead to deadlock. 
- let (mut tx, result) = rayon::scope(|_| tx_slot.set(tx, || self.instance.call_reducer(op, budget))); + // FOR BENCHMARKING: Just run the reducer on whatever thread we're already on, + // instead of bouncing to a Rayon thread. + let (mut tx, result) = tx_slot.set(tx, || self.instance.call_reducer(op, budget)); let ExecuteResult { energy, diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index f9131433411..6324900c657 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use anyhow::Result; use module_subscription_manager::Plan; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ ByteListLen, Compression, DatabaseUpdate, QueryUpdate, TableUpdate, WebsocketFormat, }; @@ -112,8 +111,9 @@ where Tx: Datastore + DeltaStore + Sync, F: WebsocketFormat, { + // FOR TESTING: Just evaluate sequentially. plans - .par_iter() + .iter() .map(|plan| (plan, plan.subscribed_table_id(), plan.subscribed_table_name())) .map(|(plan, table_id, table_name)| { plan.physical_plan() diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 0a15db4b252..dbfd0174e5b 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -13,7 +13,6 @@ use crate::subscription::delta::eval_delta; use crate::subscription::record_exec_metrics; use hashbrown::hash_map::OccupiedError; use hashbrown::{HashMap, HashSet}; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CompressableQueryUpdate, Compression, FormatSwitch, JsonFormat, QueryId, QueryUpdate, WebsocketFormat, }; @@ -463,8 +462,8 @@ impl SubscriptionManager { let tables = &event.status.database_update().unwrap().tables; - // Put the main work on a 
rayon compute thread. - rayon::scope(|_| { + // FOR TESTING: Just do all this work on whatever the calling thread is. + { let span = tracing::info_span!("eval_incr").entered(); let (updates, errs, metrics) = tables .iter() @@ -473,15 +472,16 @@ impl SubscriptionManager { .filter_map(|table_id| self.tables.get(table_id)) .flatten() .collect::>() - .par_iter() + // FOR TESTING: Sequential execution, rather than parallel. + .iter() .filter_map(|&hash| { self.queries .get(hash) .map(|state| (hash, &state.query, ExecutionMetrics::default())) }) - // If N clients are subscribed to a query, - // we copy the DatabaseTableUpdate N times, - // which involves cloning BSATN (binary) or product values (json). + // If N clients are subscribed to a query, + // we copy the DatabaseTableUpdate N times, + // which involves cloning BSATN (binary) or product values (json). .map(|(hash, plan, mut metrics)| { let table_id = plan.subscribed_table_id(); let table_name: Box = plan.subscribed_table_name().into(); @@ -561,7 +561,7 @@ impl SubscriptionManager { (updates, metrics) }) .fold( - || (vec![], vec![], ExecutionMetrics::default()), + (vec![], vec![], ExecutionMetrics::default()), |(mut rows, mut errs, mut agg_metrics), (result, metrics)| { match result { Ok(x) => { @@ -575,13 +575,14 @@ impl SubscriptionManager { (rows, errs, agg_metrics) }, ) - .reduce_with(|(mut acc_rows, mut acc_errs, mut acc_metrics), (rows, errs, metrics)| { - acc_rows.extend(rows); - acc_errs.extend(errs); - acc_metrics.merge(metrics); - (acc_rows, acc_errs, acc_metrics) - }) - .unwrap_or_default(); + // .reduce_with(|(mut acc_rows, mut acc_errs, mut acc_metrics), (rows, errs, metrics)| { + // acc_rows.extend(rows); + // acc_errs.extend(errs); + // acc_metrics.merge(metrics); + // (acc_rows, acc_errs, acc_metrics) + // }) + // .unwrap_or_default() + ; record_exec_metrics(&WorkloadType::Update, database_identity, metrics); @@ -685,7 +686,7 @@ impl SubscriptionManager { ); } } - }) + } } } diff --git 
a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index c904a6d6482..65151908619 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -32,7 +32,6 @@ use crate::sql::ast::SchemaViewer; use crate::vm::{build_query, TxMode}; use anyhow::Context; use itertools::Either; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -519,11 +518,11 @@ impl ExecutionSet { slow_query_threshold: Option, compression: Compression, ) -> ws::DatabaseUpdate { - // evaluate each of the execution units in this ExecutionSet in parallel + // FOR TESTING: Just do sequential execution. let tables = self .exec_units // if you need eval to run single-threaded for debugging, change this to .iter() - .par_iter() + .iter() .filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression)) .collect(); ws::DatabaseUpdate { tables } From f28dc30a5608ab61f958da5662ff57b461aec926 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Fri, 4 Apr 2025 10:03:09 -0400 Subject: [PATCH 2/2] Second experiment: use `par_iter`, but not `scope` or `spawn` This experiment is intended to test the theory that our synchronization overhead is coming from moving work off the "main thread" to Rayon workers via `scope` and `spawn`, not from `par_iter` parallel iteration. If that is the case, we should see synchronization overhead (i.e. `futex_wait` time) decreased similarly to the previous experiment relative to the control, but also see improved CPU utilization across multiple cores. 
--- crates/core/src/subscription/mod.rs | 4 +-- .../module_subscription_manager.rs | 30 +++++++++---------- crates/core/src/subscription/subscription.rs | 4 +-- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 6324900c657..f9131433411 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use anyhow::Result; use module_subscription_manager::Plan; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ ByteListLen, Compression, DatabaseUpdate, QueryUpdate, TableUpdate, WebsocketFormat, }; @@ -111,9 +112,8 @@ where Tx: Datastore + DeltaStore + Sync, F: WebsocketFormat, { - // FOR TESTING: Just evaluate sequentially. plans - .iter() + .par_iter() .map(|plan| (plan, plan.subscribed_table_id(), plan.subscribed_table_name())) .map(|(plan, table_id, table_name)| { plan.physical_plan() diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index dbfd0174e5b..c88862f668f 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -13,6 +13,7 @@ use crate::subscription::delta::eval_delta; use crate::subscription::record_exec_metrics; use hashbrown::hash_map::OccupiedError; use hashbrown::{HashMap, HashSet}; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CompressableQueryUpdate, Compression, FormatSwitch, JsonFormat, QueryId, QueryUpdate, WebsocketFormat, }; @@ -462,7 +463,8 @@ impl SubscriptionManager { let tables = &event.status.database_update().unwrap().tables; - // FOR TESTING: Just do all this work on whatever the calling thread is. + // FOR TESTING: Don't put the main work on a rayon compute thread. 
+ // Do still use `par_iter` to parallelize. { let span = tracing::info_span!("eval_incr").entered(); let (updates, errs, metrics) = tables @@ -472,16 +474,15 @@ impl SubscriptionManager { .filter_map(|table_id| self.tables.get(table_id)) .flatten() .collect::>() - // FOR TESTING: Sequential execution, rather than parallel. - .iter() + .par_iter() .filter_map(|&hash| { self.queries .get(hash) .map(|state| (hash, &state.query, ExecutionMetrics::default())) }) - // If N clients are subscribed to a query, - // we copy the DatabaseTableUpdate N times, - // which involves cloning BSATN (binary) or product values (json). + // If N clients are subscribed to a query, + // we copy the DatabaseTableUpdate N times, + // which involves cloning BSATN (binary) or product values (json). .map(|(hash, plan, mut metrics)| { let table_id = plan.subscribed_table_id(); let table_name: Box = plan.subscribed_table_name().into(); @@ -561,7 +562,7 @@ impl SubscriptionManager { (updates, metrics) }) .fold( - (vec![], vec![], ExecutionMetrics::default()), + || (vec![], vec![], ExecutionMetrics::default()), |(mut rows, mut errs, mut agg_metrics), (result, metrics)| { match result { Ok(x) => { @@ -575,14 +576,13 @@ impl SubscriptionManager { (rows, errs, agg_metrics) }, ) - // .reduce_with(|(mut acc_rows, mut acc_errs, mut acc_metrics), (rows, errs, metrics)| { - // acc_rows.extend(rows); - // acc_errs.extend(errs); - // acc_metrics.merge(metrics); - // (acc_rows, acc_errs, acc_metrics) - // }) - // .unwrap_or_default() - ; + .reduce_with(|(mut acc_rows, mut acc_errs, mut acc_metrics), (rows, errs, metrics)| { + acc_rows.extend(rows); + acc_errs.extend(errs); + acc_metrics.merge(metrics); + (acc_rows, acc_errs, acc_metrics) + }) + .unwrap_or_default(); record_exec_metrics(&WorkloadType::Update, database_identity, metrics); diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 65151908619..d4f1b9c5801 100644 --- 
a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -32,6 +32,7 @@ use crate::sql::ast::SchemaViewer; use crate::vm::{build_query, TxMode}; use anyhow::Context; use itertools::Either; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -518,11 +519,10 @@ impl ExecutionSet { slow_query_threshold: Option, compression: Compression, ) -> ws::DatabaseUpdate { - // FOR TESTING: Just do sequential execution. let tables = self .exec_units // if you need eval to run single-threaded for debugging, change this to .iter() - .iter() + .par_iter() .filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression)) .collect(); ws::DatabaseUpdate { tables }