Skip to content

Commit 7a6d59c

Browse files
Do not double compress subscribe messages (#2665)
1 parent eeb3333 commit 7a6d59c

File tree

1 file changed

+126
-22
lines changed

1 file changed

+126
-22
lines changed

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 126 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use crate::worker_metrics::WORKER_METRICS;
2323
use parking_lot::RwLock;
2424
use prometheus::IntGauge;
2525
use spacetimedb_client_api_messages::websocket::{
26-
self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe,
27-
UnsubscribeMulti,
26+
self as ws, BsatnFormat, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate,
27+
Unsubscribe, UnsubscribeMulti,
2828
};
2929
use spacetimedb_execution::pipelined::PipelinedProject;
3030
use spacetimedb_expr::check::parse_and_type_sub;
@@ -170,7 +170,6 @@ impl ModuleSubscriptions {
170170
auth,
171171
)?;
172172

173-
let comp = sender.config.compression;
174173
let table_id = query.subscribed_table_id();
175174
let table_name = query.subscribed_table_name();
176175

@@ -187,10 +186,34 @@ impl ModuleSubscriptions {
187186
let tx = DeltaTx::from(tx);
188187

189188
Ok(match sender.config.protocol {
190-
Protocol::Binary => collect_table_update(&plans, table_id, table_name.into(), comp, &tx, update_type)
191-
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)),
192-
Protocol::Text => collect_table_update(&plans, table_id, table_name.into(), comp, &tx, update_type)
193-
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)),
189+
Protocol::Binary => {
190+
collect_table_update(
191+
&plans,
192+
table_id,
193+
table_name.into(),
194+
// We will compress the outer server message,
195+
// after we release the tx lock.
196+
// There's no need to compress the inner table update too.
197+
Compression::None,
198+
&tx,
199+
update_type,
200+
)
201+
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))
202+
}
203+
Protocol::Text => {
204+
collect_table_update(
205+
&plans,
206+
table_id,
207+
table_name.into(),
208+
// We will compress the outer server message,
209+
// after we release the tx lock,
210+
// There's no need to compress the inner table update too.
211+
Compression::None,
212+
&tx,
213+
update_type,
214+
)
215+
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))
216+
}
194217
}?)
195218
}
196219

@@ -213,16 +236,31 @@ impl ModuleSubscriptions {
213236
},
214237
auth,
215238
)?;
216-
let comp = sender.config.compression;
217239

218240
let tx = DeltaTx::from(tx);
219241
match sender.config.protocol {
220242
Protocol::Binary => {
221-
let (update, metrics) = execute_plans(queries, comp, &tx, update_type)?;
243+
let (update, metrics) = execute_plans(
244+
queries,
245+
// We will compress the outer server message,
246+
// after we release the tx lock.
247+
// There's no need to compress the inner table updates too.
248+
Compression::None,
249+
&tx,
250+
update_type,
251+
)?;
222252
Ok((FormatSwitch::Bsatn(update), metrics))
223253
}
224254
Protocol::Text => {
225-
let (update, metrics) = execute_plans(queries, comp, &tx, update_type)?;
255+
let (update, metrics) = execute_plans(
256+
queries,
257+
// We will compress the outer server message,
258+
// after we release the tx lock.
259+
// There's no need to compress the inner table updates too.
260+
Compression::None,
261+
&tx,
262+
update_type,
263+
)?;
226264
Ok((FormatSwitch::Json(update), metrics))
227265
}
228266
}
@@ -598,8 +636,6 @@ impl ModuleSubscriptions {
598636

599637
drop(guard);
600638

601-
let comp = sender.config.compression;
602-
603639
check_row_limit(
604640
&queries,
605641
&self.relational_db,
@@ -614,10 +650,26 @@ impl ModuleSubscriptions {
614650

615651
let tx = DeltaTx::from(&*tx);
616652
let (database_update, metrics) = match sender.config.protocol {
617-
Protocol::Binary => execute_plans(&queries, comp, &tx, TableUpdateType::Subscribe)
618-
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
619-
Protocol::Text => execute_plans(&queries, comp, &tx, TableUpdateType::Subscribe)
620-
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
653+
Protocol::Binary => execute_plans(
654+
&queries,
655+
// We will compress the outer server message,
656+
// after we release the tx lock.
657+
// There's no need to compress the inner table updates too.
658+
Compression::None,
659+
&tx,
660+
TableUpdateType::Subscribe,
661+
)
662+
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
663+
Protocol::Text => execute_plans(
664+
&queries,
665+
// We will compress the outer server message,
666+
// after we release the tx lock.
667+
// There's no need to compress the inner table updates too.
668+
Compression::None,
669+
&tx,
670+
TableUpdateType::Subscribe,
671+
)
672+
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
621673
};
622674

623675
record_exec_metrics(
@@ -719,8 +771,8 @@ pub struct WriteConflict;
719771
mod tests {
720772
use super::{AssertTxFn, ModuleSubscriptions};
721773
use crate::client::messages::{
722-
SerializableMessage, SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionUpdateMessage,
723-
TransactionUpdateMessage,
774+
SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
775+
SubscriptionUpdateMessage, TransactionUpdateMessage,
724776
};
725777
use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName, Protocol};
726778
use crate::db::datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
@@ -740,7 +792,7 @@ mod tests {
740792
use spacetimedb_client_api_messages::energy::EnergyQuanta;
741793
use spacetimedb_client_api_messages::websocket::{
742794
CompressableQueryUpdate, Compression, FormatSwitch, QueryId, Subscribe, SubscribeMulti, SubscribeSingle,
743-
Unsubscribe, UnsubscribeMulti,
795+
TableUpdate, Unsubscribe, UnsubscribeMulti,
744796
};
745797
use spacetimedb_execution::dml::MutDatastore;
746798
use spacetimedb_lib::bsatn::ToBsatn;
@@ -856,19 +908,27 @@ mod tests {
856908
}
857909
}
858910

859-
/// Instantiate a client connection
860-
fn client_connection(client_id: ClientActorId) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
911+
/// Instantiate a client connection with compression
912+
fn client_connection_with_compression(
913+
client_id: ClientActorId,
914+
compression: Compression,
915+
) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
861916
let (sender, rx) = ClientConnectionSender::dummy_with_channel(
862917
client_id,
863918
ClientConfig {
864919
protocol: Protocol::Binary,
865-
compression: Compression::None,
920+
compression,
866921
tx_update_full: true,
867922
},
868923
);
869924
(Arc::new(sender), rx)
870925
}
871926

927+
/// Instantiate a client connection
928+
fn client_connection(client_id: ClientActorId) -> (Arc<ClientConnectionSender>, Receiver<SerializableMessage>) {
929+
client_connection_with_compression(client_id, Compression::None)
930+
}
931+
872932
/// Insert rules into the RLS system table
873933
fn insert_rls_rules(
874934
db: &RelationalDB,
@@ -1472,6 +1532,50 @@ mod tests {
14721532
Ok(())
14731533
}
14741534

1535+
/// Test that we do not compress the results of an initial subscribe call
1536+
#[tokio::test]
1537+
async fn test_no_compression_for_subscribe() -> anyhow::Result<()> {
1538+
// Establish a client connection with compression
1539+
let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli);
1540+
1541+
let db = relational_db()?;
1542+
let subs = module_subscriptions(db.clone());
1543+
1544+
let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;
1545+
1546+
let mut inserts = vec![];
1547+
1548+
for i in 0..16_000u64 {
1549+
inserts.push((table_id, product![i]));
1550+
}
1551+
1552+
// Insert a lot of rows into `t`.
1553+
// We want to insert enough to cross any threshold there might be for compression.
1554+
commit_tx(&db, &subs, [], inserts)?;
1555+
1556+
// Subscribe to the entire table
1557+
subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;
1558+
1559+
// Assert the table updates within this message are all be uncompressed
1560+
match rx.recv().await {
1561+
Some(SerializableMessage::Subscription(SubscriptionMessage {
1562+
result:
1563+
SubscriptionResult::SubscribeMulti(SubscriptionData {
1564+
data: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
1565+
}),
1566+
..
1567+
})) => {
1568+
assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
1569+
.iter()
1570+
.all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
1571+
}
1572+
Some(_) => panic!("unexpected message from subscription"),
1573+
None => panic!("channel unexpectedly closed"),
1574+
};
1575+
1576+
Ok(())
1577+
}
1578+
14751579
/// In this test we subscribe to a join query, update the lhs table,
14761580
/// and assert that the server sends the correct delta to the client.
14771581
#[tokio::test]

0 commit comments

Comments
 (0)