Skip to content

Commit b133a9c

Browse files
committed
Finish up sub I/O
1 parent e9f57f7 commit b133a9c

File tree

5 files changed

+450
-217
lines changed

5 files changed

+450
-217
lines changed

crates/corrosion-tests/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub struct TestSubsDb {
5151
matcher_conns: std::collections::BTreeMap<uuid::Uuid, types::sqlite::CrConn>,
5252
db_version: usize,
5353
pub trip: Trip,
54-
btx: BroadcastingTransactor,
54+
pub btx: BroadcastingTransactor,
5555
}
5656

5757
impl TestSubsDb {
Lines changed: 192 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
11
//! Tests for basic connection, operation, and disconnection between a UDP
22
//! server and clients
33
4-
use std::sync::Arc;
4+
use std::time::Duration;
55

6-
use corrosion::{Peer, db, pubsub, persistent::{server, client, proto::{v1 as p, ExecResponse, ExecResult}}};
6+
use corro_api_types::Statement;
7+
use corro_types::base::CrsqlDbVersion;
8+
use corrosion::{
9+
Peer, db,
10+
persistent::{
11+
ErrorCode, client,
12+
executor::BroadcastingTransactor,
13+
proto::{ExecResponse, ExecResult, v1 as p},
14+
server,
15+
},
16+
pubsub,
17+
};
718
use corrosion_tests::{self as ct, Cell};
819
use quilkin_types::{Endpoint, IcaoCode, TokenSet};
920

1021
#[derive(Clone)]
1122
struct InstaPrinter {
1223
db: corro_types::agent::SplitPool,
24+
btx: BroadcastingTransactor,
1325
}
1426

1527
impl InstaPrinter {
@@ -37,21 +49,33 @@ impl InstaPrinter {
3749
servers.push_str(&dc);
3850
servers
3951
}
52+
53+
async fn do_mutate(&self, ops: &[Statement]) -> (usize, Option<CrsqlDbVersion>, Duration) {
54+
self.btx
55+
.make_broadcastable_changes(None, |tx| {
56+
corrosion_tests::exec_interr(tx, ops.iter()).map_err(|e| {
57+
corro_types::agent::ChangeError::Rusqlite {
58+
source: e,
59+
actor_id: None,
60+
version: None,
61+
}
62+
})
63+
})
64+
.await
65+
.unwrap()
66+
}
4067
}
4168

4269
#[async_trait::async_trait]
4370
impl server::Mutator for InstaPrinter {
4471
async fn connected(&self, peer: Peer, icao: IcaoCode, qcmp_port: u16) {
4572
let mut dc = smallvec::SmallVec::<[_; 1]>::new();
46-
let mut dc = db::write::Datacenter(&mut dc);
47-
dc.insert(peer, qcmp_port, icao);
48-
4973
{
50-
let mut conn = self.db.write_priority().await.unwrap();
51-
let tx = conn.transaction().unwrap();
52-
ct::exec(&tx, dc.0.iter()).unwrap();
53-
tx.commit().unwrap();
74+
let mut dc = db::write::Datacenter(&mut dc);
75+
dc.insert(peer, qcmp_port, icao);
5476
}
77+
78+
self.do_mutate(&dc).await;
5579
}
5680

5781
async fn execute(&self, peer: Peer, statements: &[p::ServerChange]) -> ExecResponse {
@@ -85,21 +109,15 @@ impl server::Mutator for InstaPrinter {
85109
}
86110
}
87111

88-
let rows_affected = {
89-
let mut conn = self.db.write_normal().await.unwrap();
90-
let tx = conn.transaction().unwrap();
91-
let rows = ct::exec(&tx, v.iter()).unwrap();
92-
tx.commit().unwrap();
93-
rows
94-
};
112+
let (rows_affected, version, dur) = self.do_mutate(&v).await;
95113

96114
ExecResponse {
97115
results: vec![ExecResult::Execute {
98116
rows_affected,
99117
time: 0.,
100118
}],
101-
time: 0.,
102-
version: None,
119+
time: dur.as_secs_f64(),
120+
version: version.map(|v| v.0),
103121
actor_id: None,
104122
}
105123
}
@@ -125,8 +143,11 @@ struct ServerSub {
125143

126144
#[async_trait::async_trait]
127145
impl server::SubManager for ServerSub {
128-
async fn subscribe(&self, subp: pubsub::SubParamsv1) -> Result<pubsub::Subscription, pubsub::MatcherUpsertError> {
129-
unimplemented!();
146+
async fn subscribe(
147+
&self,
148+
subp: pubsub::SubParamsv1,
149+
) -> Result<pubsub::Subscription, pubsub::MatcherUpsertError> {
150+
self.ctx.subscribe(subp).await
130151
}
131152
}
132153

@@ -136,6 +157,7 @@ async fn test_quic_stream() {
136157

137158
let ip = InstaPrinter {
138159
db: db.pool.clone(),
160+
btx: db.btx.clone(),
139161
};
140162

141163
let ss = ServerSub {
@@ -152,80 +174,173 @@ async fn test_quic_stream() {
152174
.await
153175
.unwrap();
154176

155-
let mutator = client::MutationClient::connect(client, 2001, icao)
177+
let mutator = client::MutationClient::connect(client.clone(), 2001, icao)
156178
.await
157179
.unwrap();
158180
insta::assert_snapshot!("connect", ip.print().await);
159181

160182
// TODO: Pull this out into an actual type used in quilkin when we integrate the corrosion stuff in
161-
let mut actual = std::collections::BTreeMap::<IcaoCode, (Endpoint, TokenSet)>::new();
162-
let mut expected = std::collections::BTreeMap::<IcaoCode, (Endpoint, TokenSet)>::new();
163-
164-
165-
166-
167-
mutator
168-
.transactions(&[p::ServerChange::Upsert(vec![
169-
p::ServerUpsert {
170-
endpoint: Endpoint {
171-
address: std::net::Ipv4Addr::new(1, 2, 3, 4).into(),
172-
port: 2002,
173-
},
174-
icao,
175-
tokens: [[20; 2]].into(),
183+
let mut actual = std::collections::BTreeMap::<Endpoint, (IcaoCode, TokenSet)>::new();
184+
let mut expected = std::collections::BTreeMap::<Endpoint, (IcaoCode, TokenSet)>::new();
185+
186+
let (subscriber, sub) = client::SubscriptionClient::connect(
187+
client,
188+
corrosion::pubsub::SubParamsv1 {
189+
query: corrosion::api::Statement::Simple(pubsub::SERVER_QUERY.into()),
190+
from: None,
191+
skip_rows: false,
192+
max_buffer: 0,
193+
max_time: Duration::from_millis(10),
194+
process_interval: Duration::from_millis(10),
195+
change_threshold: 0,
196+
},
197+
)
198+
.await
199+
.unwrap();
200+
201+
let mut srx = sub.rx;
202+
203+
let mut mutate = async |sc: &[p::ServerChange]| {
204+
for sc in sc {
205+
match sc {
206+
p::ServerChange::Upsert(up) => {
207+
for u in up {
208+
expected.insert(u.endpoint.clone(), (u.icao, u.tokens.clone()));
209+
}
210+
}
211+
p::ServerChange::Update(up) => {
212+
for u in up {
213+
let s = expected
214+
.get_mut(&u.endpoint)
215+
.expect("failed to find expected endpoint");
216+
if let Some(ic) = u.icao {
217+
s.0 = ic;
218+
}
219+
if let Some(ts) = u.tokens.clone() {
220+
s.1 = ts;
221+
}
222+
}
223+
}
224+
p::ServerChange::Remove(r) => {
225+
for r in r {
226+
expected.remove(r);
227+
}
228+
}
229+
p::ServerChange::UpdateMutator(_) => unreachable!(),
230+
}
231+
}
232+
233+
mutator
234+
.transactions(sc)
235+
.await
236+
.expect("failed to apply transactions");
237+
};
238+
239+
let mut process_sub = async || -> bool {
240+
let change = tokio::time::timeout(Duration::from_millis(10000), srx.recv())
241+
.await
242+
.expect("timed out waiting for server change")
243+
.expect("expected change");
244+
245+
use corrosion::{
246+
api::{TypedQueryEvent as tqe, sqlite::ChangeType},
247+
db::read::{self, FromSqlValue},
248+
};
249+
250+
let (cty, row) = match change {
251+
tqe::Change(cty, _, row, _) => (cty, row),
252+
tqe::Row(_, row) => (ChangeType::Insert, row),
253+
_ => return false,
254+
};
255+
256+
let row = read::ServerRow::from_sql(&row).expect("failed to deserialize row");
257+
258+
match cty {
259+
ChangeType::Insert => {
260+
actual.insert(row.endpoint, (row.icao, row.tokens));
261+
}
262+
ChangeType::Update => {
263+
let r = actual
264+
.get_mut(&row.endpoint)
265+
.expect("expected endpoint not found");
266+
r.0 = row.icao;
267+
r.1 = row.tokens;
268+
}
269+
ChangeType::Delete => {
270+
actual.remove(&row.endpoint);
271+
}
272+
}
273+
274+
true
275+
};
276+
277+
mutate(&[p::ServerChange::Upsert(vec![
278+
p::ServerUpsert {
279+
endpoint: Endpoint {
280+
address: std::net::Ipv4Addr::new(1, 2, 3, 4).into(),
281+
port: 2002,
176282
},
177-
p::ServerUpsert {
178-
endpoint: Endpoint {
179-
address: std::net::Ipv4Addr::new(9, 9, 9, 9).into(),
180-
port: 2003,
181-
},
182-
icao,
183-
tokens: [[30; 3]].into(),
283+
icao,
284+
tokens: [[20; 2]].into(),
285+
},
286+
p::ServerUpsert {
287+
endpoint: Endpoint {
288+
address: std::net::Ipv4Addr::new(9, 9, 9, 9).into(),
289+
port: 2003,
184290
},
185-
p::ServerUpsert {
186-
endpoint: Endpoint {
187-
address: std::net::Ipv6Addr::from_bits(0xf0ccac1a).into(),
188-
port: 2004,
189-
},
190-
icao,
191-
tokens: [[40; 4]].into(),
291+
icao,
292+
tokens: [[30; 3]].into(),
293+
},
294+
p::ServerUpsert {
295+
endpoint: Endpoint {
296+
address: std::net::Ipv6Addr::from_bits(0xf0ccac1a).into(),
297+
port: 2004,
192298
},
193-
p::ServerUpsert {
194-
endpoint: Endpoint {
195-
address: quilkin_types::AddressKind::Name("game.boop.com".into()),
196-
port: 2005,
197-
},
198-
icao,
199-
tokens: [[50; 5]].into(),
299+
icao,
300+
tokens: [[40; 4]].into(),
301+
},
302+
p::ServerUpsert {
303+
endpoint: Endpoint {
304+
address: quilkin_types::AddressKind::Name("game.boop.com".into()),
305+
port: 2005,
200306
},
201-
])])
202-
.await
203-
.unwrap();
307+
icao,
308+
tokens: [[50; 5]].into(),
309+
},
310+
])])
311+
.await;
312+
313+
for _ in 0..4 {
314+
while !process_sub().await {}
315+
}
204316

205317
insta::assert_snapshot!("initial_insert", ip.print().await);
206318

207-
mutator
208-
.transactions(&[
209-
p::ServerChange::Remove(vec![Endpoint {
210-
address: std::net::Ipv4Addr::new(9, 9, 9, 9).into(),
211-
port: 2003,
212-
}]),
213-
p::ServerChange::Update(vec![p::ServerUpdate {
214-
endpoint: Endpoint {
215-
address: std::net::Ipv6Addr::from_bits(0xf0ccac1a).into(),
216-
port: 2004,
217-
},
218-
icao: Some(IcaoCode::new_testing([b'X'; 4])),
219-
tokens: None,
220-
}]),
221-
])
222-
.await
223-
.unwrap();
319+
mutate(&[
320+
p::ServerChange::Remove(vec![Endpoint {
321+
address: std::net::Ipv4Addr::new(9, 9, 9, 9).into(),
322+
port: 2003,
323+
}]),
324+
p::ServerChange::Update(vec![p::ServerUpdate {
325+
endpoint: Endpoint {
326+
address: std::net::Ipv6Addr::from_bits(0xf0ccac1a).into(),
327+
port: 2004,
328+
},
329+
icao: Some(IcaoCode::new_testing([b'X'; 4])),
330+
tokens: None,
331+
}]),
332+
])
333+
.await;
334+
335+
for _ in 0..2 {
336+
while !process_sub().await {}
337+
}
224338

225339
insta::assert_snapshot!("remove_and_update", ip.print().await);
226340

227341
mutator.shutdown().await;
228342
insta::assert_snapshot!("disconnect", ip.print().await);
343+
subscriber.shutdown(ErrorCode::Ok).await;
229344

230345
assert_eq!(expected, actual);
231346
}

0 commit comments

Comments
 (0)