Skip to content

Commit 85dd7a0

Browse files
authored
fix(MemoryStore): undeadlock timeline saving
Merge pull request #509 from matrix-org/gnunicorn/issue508
2 parents 7049027 + cf674b6 commit 85dd7a0

File tree

5 files changed

+206
-99
lines changed

5 files changed

+206
-99
lines changed

crates/matrix-sdk-base/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ indexeddb_cryptostore = ["matrix-sdk-crypto/indexeddb_cryptostore"]
3636
[dependencies]
3737
async-stream = "0.3.2"
3838
chacha20poly1305 = { version = "0.9.0", optional = true }
39-
dashmap = "4.0.2"
39+
dashmap = "5.1.0"
4040
futures-core = "0.3.15"
4141
futures-util = { version = "0.3.15", default-features = false }
4242
futures-channel = "0.3.15"

crates/matrix-sdk-base/src/rooms/normal.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,11 @@ impl Room {
497497
{
498498
TimelineStreamBackward::new(event_ids.clone(), end_token, Some(stored_events))
499499
} else {
500-
TimelineStreamBackward::new(event_ids.clone(), Some(sync_token.clone().unwrap()), None)
500+
TimelineStreamBackward::new(
501+
event_ids.clone(),
502+
Some(sync_token.clone().expect("Sync token exists")),
503+
None,
504+
)
501505
};
502506

503507
backward_timeline_streams.push(backward_sender);

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 55 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,10 @@ impl MemoryStore {
284284
info!("Save new timeline batch from messages response for {}", room);
285285
}
286286

287-
let data = if timeline.limited {
287+
let mut delete_timeline = false;
288+
if timeline.limited {
288289
info!("Delete stored timeline for {} because the sync response was limited", room);
289-
self.room_timeline.remove(room);
290-
None
290+
delete_timeline = true;
291291
} else if let Some(mut data) = self.room_timeline.get_mut(room) {
292292
if !timeline.sync && Some(&timeline.start) != data.end.as_ref() {
293293
// This should only happen when a developer adds a wrong timeline
@@ -298,7 +298,6 @@ impl MemoryStore {
298298
}
299299

300300
// Check if the event already exists in the store
301-
let mut delete_timeline = false;
302301
for event in &timeline.events {
303302
if let Some(event_id) = event.event_id() {
304303
if data.event_id_to_position.contains_key(&event_id) {
@@ -308,37 +307,39 @@ impl MemoryStore {
308307
}
309308
}
310309

311-
if delete_timeline {
312-
info!("Delete stored timeline for {} because of duplicated events", room);
313-
self.room_timeline.remove(room);
314-
None
315-
} else if timeline.sync {
316-
data.start = timeline.start.clone();
317-
Some(data)
318-
} else {
319-
data.end = timeline.end.clone();
320-
Some(data)
310+
if !delete_timeline {
311+
if timeline.sync {
312+
data.start = timeline.start.clone();
313+
} else {
314+
data.end = timeline.end.clone();
315+
}
321316
}
322-
} else {
323-
None
324-
};
317+
}
325318

326-
let mut data = &mut *if let Some(data) = data {
327-
data
328-
} else {
329-
let data = TimelineData {
319+
if delete_timeline {
320+
info!("Delete stored timeline for {} because of duplicated events", room);
321+
self.room_timeline.remove(room);
322+
}
323+
324+
let mut data =
325+
self.room_timeline.entry(room.to_owned()).or_insert_with(|| TimelineData {
330326
start: timeline.start.clone(),
331327
end: timeline.end.clone(),
332328
..Default::default()
333-
};
334-
self.room_timeline.insert(room.to_owned(), data);
335-
self.room_timeline.get_mut(room).unwrap()
329+
});
330+
331+
let make_room_version = || {
332+
self.room_info
333+
.get(room)
334+
.and_then(|info| {
335+
info.base_info.create.as_ref().map(|event| event.room_version.clone())
336+
})
337+
.unwrap_or_else(|| {
338+
warn!("Unable to find the room version for {}, assume version 9", room);
339+
RoomVersionId::V9
340+
})
336341
};
337342

338-
// Create a copy of the events if the stream created via `room_timeline()` isn't
339-
// fully consumed
340-
let data_events = Arc::make_mut(&mut data.events);
341-
342343
if timeline.sync {
343344
let mut room_version = None;
344345
for event in timeline.events.iter().rev() {
@@ -347,21 +348,13 @@ impl MemoryStore {
347348
redaction,
348349
))) = event.event.deserialize()
349350
{
350-
if let Some(position) = data.event_id_to_position.get(&redaction.redacts) {
351-
if let Some(mut full_event) = data_events.get_mut(position) {
351+
let pos = data.event_id_to_position.get(&redaction.redacts).copied();
352+
353+
if let Some(position) = pos {
354+
if let Some(mut full_event) = data.events.get_mut(&position.clone()) {
352355
let inner_event = full_event.event.deserialize()?;
353356
if room_version.is_none() {
354-
room_version = Some(self.room_info
355-
.get(room)
356-
.and_then(|info| {
357-
info.base_info
358-
.create
359-
.as_ref()
360-
.map(|event| event.room_version.clone())
361-
}).unwrap_or_else(|| {
362-
warn!("Unable to find the room version for {}, assume version 9", room);
363-
RoomVersionId::V9
364-
}));
357+
room_version = Some(make_room_version());
365358
}
366359

367360
full_event.event = Raw::new(&AnySyncRoomEvent::from(
@@ -372,20 +365,22 @@ impl MemoryStore {
372365
}
373366

374367
data.start_position -= 1;
368+
let start_position = data.start_position;
375369
// Only add event with id to the position map
376370
if let Some(event_id) = event.event_id() {
377-
data.event_id_to_position.insert(event_id, data.start_position);
371+
data.event_id_to_position.insert(event_id, start_position);
378372
}
379-
data_events.insert(data.start_position, event.to_owned());
373+
data.events.insert(start_position, event.clone());
380374
}
381375
} else {
382376
for event in timeline.events.iter() {
383377
data.end_position += 1;
378+
let end_position = data.end_position;
384379
// Only add event with id to the position map
385380
if let Some(event_id) = event.event_id() {
386-
data.event_id_to_position.insert(event_id, data.end_position);
381+
data.event_id_to_position.insert(event_id, end_position);
387382
}
388-
data_events.insert(data.end_position, event.to_owned());
383+
data.events.insert(end_position, event.clone());
389384
}
390385
}
391386
}
@@ -580,22 +575,22 @@ impl MemoryStore {
580575
&self,
581576
room_id: &RoomId,
582577
) -> Result<Option<(BoxStream<Result<SyncRoomEvent>>, Option<String>)>> {
583-
if let Some(data) = self.room_timeline.get(room_id) {
584-
let events = data.events.clone();
585-
let stream = stream! {
586-
for item in events.values() {
587-
yield Ok(item.to_owned());
588-
}
589-
};
590-
info!(
591-
"Found previously stored timeline for {}, with end token {:?}",
592-
room_id, data.end
593-
);
594-
Ok(Some((Box::pin(stream), data.end.to_owned())))
578+
let (events, end_token) = if let Some(data) = self.room_timeline.get(room_id) {
579+
(data.events.clone(), data.end.clone())
595580
} else {
596581
info!("No timeline for {} was previously stored", room_id);
597-
Ok(None)
598-
}
582+
return Ok(None);
583+
};
584+
585+
let stream = stream! {
586+
for (_, item) in events {
587+
yield Ok(item);
588+
}
589+
};
590+
591+
info!("Found previously stored timeline for {}, with end token {:?}", room_id, end_token);
592+
593+
Ok(Some((Box::pin(stream), end_token)))
599594
}
600595
}
601596

@@ -762,7 +757,7 @@ struct TimelineData {
762757
pub start_position: isize,
763758
pub end: Option<String>,
764759
pub end_position: isize,
765-
pub events: Arc<BTreeMap<isize, SyncRoomEvent>>,
760+
pub events: BTreeMap<isize, SyncRoomEvent>,
766761
pub event_id_to_position: HashMap<Box<EventId>, isize>,
767762
}
768763

crates/matrix-sdk-base/src/store/sled_store.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::{
2020
time::Instant,
2121
};
2222

23+
use async_stream::stream;
2324
use futures_core::stream::Stream;
2425
use futures_util::stream::{self, TryStreamExt};
2526
use matrix_sdk_common::async_trait;
@@ -1012,25 +1013,33 @@ impl SledStore {
10121013
) -> Result<Option<(BoxStream<Result<SyncRoomEvent>>, Option<String>)>> {
10131014
let db = self.clone();
10141015
let key = room_id.encode();
1016+
let r_id = room_id.to_owned();
10151017
let metadata: Option<TimelineMetadata> = db
10161018
.room_timeline_metadata
10171019
.get(key.as_slice())?
10181020
.map(|v| serde_json::from_slice(&v).map_err(StoreError::Json))
10191021
.transpose()?;
1020-
if metadata.is_none() {
1021-
info!("No timeline for {} was previously stored", room_id);
1022-
return Ok(None);
1023-
}
1024-
let end_token = metadata.and_then(|m| m.end);
1025-
let stream = Box::pin(stream::iter(
1026-
db.room_timeline
1027-
.scan_prefix(key)
1028-
.map(move |v| db.deserialize_event(&v?.1).map_err(|e| e.into())),
1029-
));
1022+
let metadata = match metadata {
1023+
Some(m) => m,
1024+
None => {
1025+
info!("No timeline for {} was previously stored", r_id);
1026+
return Ok(None);
1027+
}
1028+
};
10301029

1031-
info!("Found previously stored timeline for {}, with end token {:?}", room_id, end_token);
1030+
let mut position = metadata.start_position;
1031+
let end_token = metadata.end;
1032+
1033+
info!("Found previously stored timeline for {}, with end token {:?}", r_id, end_token);
1034+
1035+
let stream = stream! {
1036+
while let Ok(Some(item)) = db.room_timeline.get(&(r_id.as_ref(), position).encode()) {
1037+
position += 1;
1038+
yield db.deserialize_event(&item).map_err(|e| e.into());
1039+
}
1040+
};
10321041

1033-
Ok(Some((stream, end_token)))
1042+
Ok(Some((Box::pin(stream), end_token)))
10341043
}
10351044

10361045
async fn remove_room_timeline(&self, room_id: &RoomId) -> Result<()> {

0 commit comments

Comments
 (0)