Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,6 @@ impl OpenReplicas {
}
hash_map::Entry::Occupied(mut e) => {
let state = e.get_mut();
tracing::debug!("STATE {state:?}");
state.handles = state.handles.wrapping_sub(1);
if state.handles == 0 {
let _ = e.remove_entry();
Expand Down
143 changes: 143 additions & 0 deletions tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1373,3 +1373,146 @@ fn match_sync_finished(event: &LiveEvent, peer: PublicKey) -> bool {
};
e.peer == peer && e.result.is_ok()
}

/// This tests the simplest scenario: A node connects to another node, and performs sync.
#[tokio::test]
// #[traced_test]
async fn sync_rejoin() -> Result<()> {
tracing_subscriber::fmt::try_init().ok();
let mut rng = test_rng(b"sync_rejoin");
let nodes = spawn_nodes(2, &mut rng).await?;
let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();

// create doc on node0
let peer0 = nodes[0].node_id();
let author0 = clients[0].docs().author_create().await?;
let doc0 = clients[0].docs().create().await?;
let blobs0 = clients[0].blobs();
let hash0 = doc0
.set_bytes(author0, b"k1".to_vec(), b"v1".to_vec())
.await?;
assert_latest(blobs0, &doc0, b"k1", b"v1").await;
let ticket = doc0
.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
.await?;

let mut events0 = doc0.subscribe().await?;

info!("node1: join");
let peer1 = nodes[1].node_id();
let doc1 = clients[1].docs().import(ticket.clone()).await?;
let blobs1 = clients[1].blobs();
let mut events1 = doc1.subscribe().await?;
info!("node1: assert 5 events");
assert_next_unordered(
&mut events1,
TIMEOUT,
vec![
Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)),
Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, .. } if *from == peer0 )),
Box::new(move |e| match_sync_finished(e, peer0)),
Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)),
match_event!(LiveEvent::PendingContentReady),
],
)
.await;
assert_latest(blobs1, &doc1, b"k1", b"v1").await;

info!("node0: assert 3 events");
assert_next(
&mut events0,
TIMEOUT,
vec![
Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)),
Box::new(move |e| match_sync_finished(e, peer1)),
match_event!(LiveEvent::PendingContentReady),
],
)
.await;

info!("node0: add blob");
let hash2 = doc0.set_bytes(author0, "k2", "v2").await?;
assert_next(
&mut events0,
TIMEOUT,
vec![Box::new(move |e| {
matches!(e, LiveEvent::InsertLocal { .. })
})],
)
.await;
assert_next(
&mut events1,
TIMEOUT,
vec![
Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, entry, .. } if *from == peer0 && entry.key() == b"k2")),
Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash2)),
],
)
.await;

info!("node1: close doc");
doc1.leave().await?;
drop(events1);
doc1.close().await?;
tokio::time::sleep(Duration::from_secs(1)).await;

info!("node0: assert neighbor down");
assert_next(
&mut events0,
TIMEOUT,
vec![Box::new(
move |e| matches!(e, LiveEvent::NeighborDown(peer) if *peer == peer1),
)],
)
.await;

info!("node1: reopen doc");
let doc1 = clients[1].docs().import(ticket.clone()).await?;
let mut events1 = doc1.subscribe().await?;
assert_next(
&mut events1,
TIMEOUT,
vec![
Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)),
Box::new(move |e| match_sync_finished(e, peer0)),
match_event!(LiveEvent::PendingContentReady),
],
)
.await;
assert_next(
&mut events0,
TIMEOUT,
vec![
Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)),
Box::new(move |e| match_sync_finished(e, peer1)),
match_event!(LiveEvent::PendingContentReady),
],
)
.await;

info!("node0: add blob");
let hash3 = doc0.set_bytes(author0, "k3", "v3").await?;
assert_next(
&mut events0,
TIMEOUT,
vec![Box::new(move |e| {
matches!(e, LiveEvent::InsertLocal { .. })
})],
)
.await;
info!("node1: confirm live event");
assert_next(
&mut events1,
TIMEOUT,
vec![
Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, entry, .. } if *from == peer0 && entry.key() == b"k3")),
Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash3)),
],
)
.await;

for node in nodes {
node.shutdown().await?;
}
Ok(())
}
Loading