Skip to content

Commit 2f33924

Browse files
Prune queries from subscription evaluation (#2641)
1 parent 7a6d59c commit 2f33924

File tree

6 files changed

+556
-36
lines changed

6 files changed

+556
-36
lines changed

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
583583
request_id,
584584
timer,
585585
};
586-
let event = match self
586+
let (event, _) = match self
587587
.info
588588
.subscriptions
589589
.commit_and_broadcast_event(client.as_deref(), event, tx)

crates/core/src/subscription/execution_unit.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use std::time::Duration;
3737
/// as is the case for incremental joins.
3838
/// And we want to associate a hash with the entire unit of execution,
3939
/// rather than an individual plan.
40-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
40+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
4141
pub struct QueryHash {
4242
data: [u8; 32],
4343
}
@@ -49,8 +49,15 @@ impl From<QueryHash> for u256 {
4949
}
5050

5151
impl QueryHash {
52+
/// The zero value of a QueryHash
5253
pub const NONE: Self = Self { data: [0; 32] };
5354

55+
/// The min value of a QueryHash
56+
pub const MIN: Self = Self::NONE;
57+
58+
/// The max value of a QueryHash
59+
pub const MAX: Self = Self { data: [0xFFu8; 32] };
60+
5461
pub fn from_bytes(bytes: &[u8]) -> Self {
5562
Self {
5663
data: blake3::hash(bytes).into(),

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 129 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ impl ModuleSubscriptions {
712712
caller: Option<&ClientConnectionSender>,
713713
mut event: ModuleEvent,
714714
tx: MutTx,
715-
) -> Result<Result<Arc<ModuleEvent>, WriteConflict>, DBError> {
715+
) -> Result<Result<(Arc<ModuleEvent>, ExecutionMetrics), WriteConflict>, DBError> {
716716
// Take a read lock on `subscriptions` before committing tx
717717
// else it can result in subscriber receiving duplicate updates.
718718
let subscriptions = self.subscriptions.read();
@@ -742,10 +742,16 @@ impl ModuleSubscriptions {
742742
.unwrap_or_else(|| DeltaTx::from(&*read_tx));
743743

744744
let event = Arc::new(event);
745+
let mut metrics = ExecutionMetrics::default();
745746

746747
match &event.status {
747748
EventStatus::Committed(_) => {
748-
subscriptions.eval_updates(&read_tx, event.clone(), caller, &self.relational_db.database_identity())
749+
metrics.merge(subscriptions.eval_updates(
750+
&read_tx,
751+
event.clone(),
752+
caller,
753+
&self.relational_db.database_identity(),
754+
));
749755
}
750756
EventStatus::Failed(_) => {
751757
if let Some(client) = caller {
@@ -761,7 +767,7 @@ impl ModuleSubscriptions {
761767
EventStatus::OutOfEnergy => {} // ?
762768
}
763769

764-
Ok(Ok(event))
770+
Ok(Ok((event, metrics)))
765771
}
766772
}
767773

@@ -798,6 +804,7 @@ mod tests {
798804
use spacetimedb_lib::bsatn::ToBsatn;
799805
use spacetimedb_lib::db::auth::StAccess;
800806
use spacetimedb_lib::identity::AuthCtx;
807+
use spacetimedb_lib::metrics::ExecutionMetrics;
801808
use spacetimedb_lib::{bsatn, ConnectionId, ProductType, ProductValue, Timestamp};
802809
use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity};
803810
use spacetimedb_primitives::TableId;
@@ -1079,19 +1086,19 @@ mod tests {
10791086
subs: &ModuleSubscriptions,
10801087
deletes: impl IntoIterator<Item = (TableId, ProductValue)>,
10811088
inserts: impl IntoIterator<Item = (TableId, ProductValue)>,
1082-
) -> anyhow::Result<()> {
1089+
) -> anyhow::Result<ExecutionMetrics> {
10831090
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
10841091
for (table_id, row) in deletes {
10851092
tx.delete_product_value(table_id, &row)?;
10861093
}
10871094
for (table_id, row) in inserts {
10881095
db.insert(&mut tx, table_id, &bsatn::to_vec(&row)?)?;
10891096
}
1090-
assert!(matches!(
1091-
subs.commit_and_broadcast_event(None, module_event(), tx),
1092-
Ok(Ok(_))
1093-
));
1094-
Ok(())
1097+
1098+
let Ok(Ok((_, metrics))) = subs.commit_and_broadcast_event(None, module_event(), tx) else {
1099+
panic!("Encountered an error in `commit_and_broadcast_event`");
1100+
};
1101+
Ok(metrics)
10951102
}
10961103

10971104
#[test]
@@ -1689,6 +1696,119 @@ mod tests {
16891696
Ok(())
16901697
}
16911698

1699+
/// Test that we do not evaluate queries that we know will not match table update rows
1700+
#[tokio::test]
1701+
async fn test_query_pruning() -> anyhow::Result<()> {
1702+
// Establish a connection for each client
1703+
let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1));
1704+
let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2));
1705+
1706+
let db = relational_db()?;
1707+
let subs = module_subscriptions(db.clone());
1708+
1709+
let u_id = db.create_table_for_test(
1710+
"u",
1711+
&[
1712+
("i", AlgebraicType::U64),
1713+
("a", AlgebraicType::U64),
1714+
("b", AlgebraicType::U64),
1715+
],
1716+
&[0.into()],
1717+
)?;
1718+
let v_id = db.create_table_for_test(
1719+
"v",
1720+
&[
1721+
("i", AlgebraicType::U64),
1722+
("x", AlgebraicType::U64),
1723+
("y", AlgebraicType::U64),
1724+
],
1725+
&[0.into(), 1.into()],
1726+
)?;
1727+
1728+
commit_tx(
1729+
&db,
1730+
&subs,
1731+
[],
1732+
[
1733+
(u_id, product![0u64, 1u64, 1u64]),
1734+
(u_id, product![1u64, 2u64, 2u64]),
1735+
(u_id, product![2u64, 3u64, 3u64]),
1736+
(v_id, product![0u64, 4u64, 4u64]),
1737+
(v_id, product![1u64, 5u64, 5u64]),
1738+
],
1739+
)?;
1740+
1741+
let mut query_ids = 0;
1742+
1743+
// Returns (i: 0, a: 1, b: 1)
1744+
subscribe_multi(
1745+
&subs,
1746+
&[
1747+
"select u.* from u join v on u.i = v.i where v.x = 4",
1748+
"select u.* from u join v on u.i = v.i where v.x = 6",
1749+
],
1750+
tx_for_a,
1751+
&mut query_ids,
1752+
)?;
1753+
1754+
// Returns (i: 1, a: 2, b: 2)
1755+
subscribe_multi(
1756+
&subs,
1757+
&[
1758+
"select u.* from u join v on u.i = v.i where v.x = 5",
1759+
"select u.* from u join v on u.i = v.i where v.x = 7",
1760+
],
1761+
tx_for_b,
1762+
&mut query_ids,
1763+
)?;
1764+
1765+
// Wait for both subscriptions
1766+
assert!(matches!(
1767+
rx_for_a.recv().await,
1768+
Some(SerializableMessage::Subscription(SubscriptionMessage {
1769+
result: SubscriptionResult::SubscribeMulti(_),
1770+
..
1771+
}))
1772+
));
1773+
assert!(matches!(
1774+
rx_for_b.recv().await,
1775+
Some(SerializableMessage::Subscription(SubscriptionMessage {
1776+
result: SubscriptionResult::SubscribeMulti(_),
1777+
..
1778+
}))
1779+
));
1780+
1781+
// Modify a single row in `v`
1782+
let metrics = commit_tx(
1783+
&db,
1784+
&subs,
1785+
[(v_id, product![1u64, 5u64, 5u64])],
1786+
[(v_id, product![1u64, 5u64, 6u64])],
1787+
)?;
1788+
1789+
// We should only have evaluated a single query
1790+
assert_eq!(metrics.delta_queries_evaluated, 1);
1791+
assert_eq!(metrics.delta_queries_matched, 1);
1792+
1793+
// Insert a new row into `v`
1794+
let metrics = commit_tx(&db, &subs, [], [(v_id, product![2u64, 6u64, 6u64])])?;
1795+
1796+
assert_tx_update_for_table(
1797+
&mut rx_for_a,
1798+
u_id,
1799+
&ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
1800+
[product![2u64, 3u64, 3u64]],
1801+
[],
1802+
)
1803+
.await;
1804+
1805+
// We should only have evaluated a single query
1806+
assert_eq!(metrics.delta_queries_evaluated, 1);
1807+
assert_eq!(metrics.delta_queries_matched, 1);
1808+
1809+
Ok(())
1810+
}
1811+
16921812
/// Asserts that a subscription holds a tx handle for the entire length of its evaluation.
16931813
#[test]
16941814
fn test_tx_subscription_ordering() -> ResultTest<()> {

0 commit comments

Comments
 (0)