diff --git a/matrix_sdk_base/src/store/mod.rs b/matrix_sdk_base/src/store/mod.rs
index a9b20dd0a12..35902b799ee 100644
--- a/matrix_sdk_base/src/store/mod.rs
+++ b/matrix_sdk_base/src/store/mod.rs
@@ -22,13 +22,19 @@ use std::{
 use dashmap::DashMap;
 use matrix_sdk_common::{
-    api::r0::push::get_notifications::Notification,
+    api::r0::{
+        message::get_message_events::{
+            Direction, Request as MessagesRequest, Response as MessagesResponse,
+        },
+        push::get_notifications::Notification,
+    },
     async_trait,
     events::{
         presence::PresenceEvent, room::member::MemberEventContent, AnyGlobalAccountDataEvent,
-        AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType,
+        AnyRoomAccountDataEvent, AnyRoomEvent, AnyStrippedStateEvent, AnySyncRoomEvent,
+        AnySyncStateEvent, EventContent, EventType,
     },
-    identifiers::{RoomId, UserId},
+    identifiers::{EventId, RoomId, UserId},
     locks::RwLock,
     AsyncTraitDeps, Raw,
 };

@@ -463,3 +469,425 @@ impl StateChanges {
         self.notifications.entry(room_id.to_owned()).or_insert_with(Vec::new).push(notification);
     }
 }
+
+/// A token that represents the last known event before incoming events
+/// are added via /sync or /messages.
+pub type PrevBatchToken = String;
+
+/// The ending of a chunk of events from /messages. This will match the
+/// next `prev_batch` token from a /sync.
+pub type NextBatchToken = String;
+
+/// The position of a chunk of events in the event stream. This is
+/// based on the ordering of `PrevBatchToken` and `NextBatchToken`.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct SliceIdx(u128);
+
+impl SliceIdx {
+    pub fn first() -> Self {
+        Self(u128::MAX / 2)
+    }
+
+    pub fn from_prev(prev: u128) -> Self {
+        Self(prev + 1)
+    }
+
+    pub fn from_post(post: u128) -> Self {
+        Self(post - 1)
+    }
+}
+
+/// The position of an event within a slice or chunk.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct EventIdx(u128);
+
+impl EventIdx {
+    pub fn first() -> Self {
+        Self(u128::MAX / 2)
+    }
+
+    pub fn from_prev(prev: u128) -> Self {
+        Self(prev + 1)
+    }
+
+    pub fn from_post(post: u128) -> Self {
+        Self(post - 1)
+    }
+}
+
+/// Represents a chunk of events from either /sync or /messages.
+#[derive(Clone, Debug)]
+pub struct TimelineSlice {
+    start: PrevBatchToken,
+    end: NextBatchToken,
+}
+
+impl TimelineSlice {
+    pub fn new(start: PrevBatchToken, end: NextBatchToken) -> Self {
+        Self { start, end }
+    }
+}
+
+/// `EventOwnerMap` keeps track of the ordering of all events and allows getting
+/// events from a `PrevBatchToken`, `EventId` or `SliceIdx`.
+#[derive(Clone, Debug)]
+pub struct EventOwnerMap {
+    slice_map: BTreeMap<SliceIdx, BTreeMap<EventIdx, EventId>>,
+    event_map: BTreeMap<EventId, SliceIdx>,
+    // Now we can go from an `EventId` to a sub-slice of its parent `SliceIdx`, I don't
+    // think we could before?
+    event_index: BTreeMap<EventId, EventIdx>,
+}
+
+/// Store the new events from /sync or /messages.
+#[derive(Clone, Debug)]
+pub struct Timeline {
+    slices: BTreeMap<SliceIdx, TimelineSlice>,
+    /// Maps a `prev_batch` token to a `SliceIdx` "forwards".
+    prev_slice_map: BTreeMap<PrevBatchToken, SliceIdx>,
+    /// Maps a `next_batch` token to a `SliceIdx` "backwards".
+    ///
+    /// This points to the same `SliceIdx` `prev_slice_map` does but with the
+    /// `next_batch` token.
+    next_slice_map: BTreeMap<NextBatchToken, SliceIdx>,
+    events: EventOwnerMap,
+}
+
+impl Timeline {
+    /// Create an empty `Timeline`.
+    pub fn new() -> Self {
+        Self {
+            slices: BTreeMap::new(),
+            prev_slice_map: BTreeMap::new(),
+            next_slice_map: BTreeMap::new(),
+            events: EventOwnerMap {
+                slice_map: BTreeMap::new(),
+                event_map: BTreeMap::new(),
+                event_index: BTreeMap::new(),
+            },
+        }
+    }
+
+    /// Is this `Timeline` empty, i.e. has it received no /sync or /messages responses?
+    pub fn is_empty(&self) -> bool {
+        self.slices.is_empty() && self.prev_slice_map.is_empty()
+    }
+
+    /// Add the events from the timeline of a /sync response to this `Timeline`.
+    pub fn handle_sync_timeline(
+        &mut self,
+        room_id: &RoomId,
+        prev_batch: &str,
+        next_batch: &str,
+        limited: bool,
+        events: &[AnySyncRoomEvent],
+    ) {
+        let timeline_slice = TimelineSlice::new(prev_batch.to_owned(), next_batch.to_owned());
+
+        // The new `prev_batch` token is the `next_batch` token of a previous /sync
+        if let Some(SliceIdx(idx)) = self.next_slice_map.get(prev_batch) {
+            let next_idx = SliceIdx(idx + 1);
+            let last_event_index = self
+                .events
+                .slice_map
+                .values()
+                .last()
+                .and_then(|map| map.keys().last())
+                .map(|e| {
+                    let EventIdx(idx) = e;
+                    *idx
+                })
+                .expect("event map is in sync with timeline");
+
+            self.prev_slice_map.insert(prev_batch.to_owned(), next_idx);
+            self.next_slice_map.insert(next_batch.to_owned(), next_idx);
+            self.slices.insert(next_idx, timeline_slice);
+
+            self.update_events_map_forward(events, last_event_index, next_idx);
+        } else if self.is_empty() {
+            let next_idx = SliceIdx::first();
+            self.prev_slice_map.insert(prev_batch.to_owned(), next_idx);
+            self.next_slice_map.insert(next_batch.to_owned(), next_idx);
+            self.slices.insert(next_idx, timeline_slice);
+
+            self.update_events_map_forward(events, u128::MAX / 2, next_idx);
+        } else {
+            todo!("hmmm we got a problem or a gap")
+        }
+    }
+
+    /// Add the events from a /messages response to this `Timeline`.
+    pub fn handle_messages_response(
+        &mut self,
+        room_id: &RoomId,
+        resp: &MessagesResponse,
+        dir: Direction,
+    ) {
+        match dir {
+            // the end token is how to request older events
+            // events are in reverse-chronological order
+            Direction::Backward => {
+                match (&resp.end, &resp.start) {
+                    (Some(prev_batch), Some(end)) => {
+                        let timeline_slice =
+                            TimelineSlice::new(prev_batch.to_owned(), end.to_owned());
+
+                        let old = self.prev_slice_map.get(prev_batch);
+                        let recent = self.prev_slice_map.get(end);
+                        match (old, recent) {
+                            // We have the full chunk already
+                            (Some(_), Some(_)) => {}
+                            (Some(_), None) => {
+                                // We have a gap
+                                // A -> B -> gap -> F
+                                // we know B but we must have gotten an
+                                // incomplete chunk or something
+                            }
+                            // we have the recent token but not the older so fill
+                            // backwards
+                            (None, Some(SliceIdx(idx))) => {
+                                let prev_idx = SliceIdx(idx - 1);
+                                let prev_event_index = self
+                                    .events
+                                    .slice_map
+                                    .get(&SliceIdx(*idx))
+                                    .and_then(|map| map.keys().next())
+                                    .map(|e| {
+                                        let EventIdx(idx) = e;
+                                        *idx
+                                    })
+                                    .expect("event map is in sync with timeline");
+
+                                self.prev_slice_map.insert(prev_batch.to_owned(), prev_idx);
+                                self.next_slice_map.insert(end.to_owned(), prev_idx);
+                                self.slices.insert(prev_idx, timeline_slice);
+
+                                // We reverse so our slice is oldest -> most recent
+                                self.update_events_map_backward(
+                                    &resp.chunk,
+                                    prev_event_index,
+                                    prev_idx,
+                                )
+                            }
+                            (None, None) => {}
+                        }
+                    }
+                    (Some(prev), None) => {
+                        // TODO: is this an incomplete chunk? Do these have
+                        // meanings, like when there is no prev/next_batch token?
+                    }
+                    (None, Some(end)) => {}
+                    (None, None) => todo!("problems"),
+                }
+            }
+            // the start token is the oldest events
+            // events are in chronological order
+            Direction::Forward => {
+                match (&resp.start, &resp.end) {
+                    (Some(prev_batch), Some(end)) => {
+                        let timeline_slice =
+                            TimelineSlice::new(prev_batch.to_owned(), end.to_owned());
+
+                        let old = self.next_slice_map.get(prev_batch);
+                        let recent = self.next_slice_map.get(end);
+                        match (old, recent) {
+                            // We have the full chunk already
+                            (Some(_), Some(_)) => {}
+                            (Some(SliceIdx(idx)), None) => {
+                                let next_idx = SliceIdx(idx + 1);
+                                let last_event_index = self
+                                    .events
+                                    .slice_map
+                                    .get(&SliceIdx(*idx))
+                                    .and_then(|map| map.keys().next())
+                                    .map(|e| {
+                                        let EventIdx(idx) = e;
+                                        *idx
+                                    })
+                                    .expect("event map is in sync with timeline");
+
+                                self.prev_slice_map.insert(prev_batch.to_owned(), next_idx);
+                                self.next_slice_map.insert(end.to_owned(), next_idx);
+                                self.slices.insert(next_idx, timeline_slice);
+
+                                self.update_events_map_forward(
+                                    // TODO: make the methods more general or specific or
+                                    // is this hacky conversion ok?
+                                    &resp
+                                        .chunk
+                                        .iter()
+                                        .filter_map(|e| {
+                                            // Not ideal but all we need is the eventId
+                                            serde_json::from_str(e.json().get()).ok()
+                                        })
+                                        .collect::<Vec<AnySyncRoomEvent>>(),
+                                    last_event_index,
+                                    next_idx,
+                                );
+                            }
+                            // We have the recent token but not the older so fill
+                            // backwards
+                            (None, Some(SliceIdx(idx))) => {
+                                let prev_idx = SliceIdx(idx - 1);
+                                let prev_event_index = self
+                                    .events
+                                    .slice_map
+                                    .get(&SliceIdx(*idx))
+                                    .and_then(|map| map.keys().next())
+                                    .map(|e| {
+                                        let EventIdx(idx) = e;
+                                        *idx
+                                    })
+                                    .expect("event map is in sync with timeline");
+
+                                self.prev_slice_map.insert(prev_batch.to_owned(), prev_idx);
+                                self.next_slice_map.insert(end.to_owned(), prev_idx);
+                                self.slices.insert(prev_idx, timeline_slice);
+                                // We reverse so our slice is oldest -> most recent
+                                self.update_events_map_backward(
+                                    &resp.chunk,
+                                    prev_event_index,
+                                    prev_idx,
+                                )
+                            }
+                            (None, None) => {}
+                        }
+                    }
+                    (Some(prev), None) => {}
+                    (None, Some(end)) => {}
+                    (None, None) => todo!("can't move forward nor backward, timeline full"),
+                }
+            }
+        }
+    }
+
+    fn update_events_map_forward(
+        &mut self,
+        events: &[AnySyncRoomEvent],
+        last_event_index: u128,
+        slice_idx: SliceIdx,
+    ) {
+        let mut index_event = BTreeMap::new();
+        for (i, event) in events.iter().enumerate() {
+            let e_idx = EventIdx(last_event_index + ((i + 1) as u128));
+            let e_id = event.event_id();
+
+            index_event.insert(e_idx, e_id.clone());
+            self.events.event_index.insert(e_id.clone(), e_idx);
+            self.events.event_map.insert(e_id.clone(), slice_idx);
+        }
+        self.events.slice_map.insert(slice_idx, index_event);
+    }
+
+    fn update_events_map_backward(
+        &mut self,
+        events: &[Raw<AnyRoomEvent>],
+        last_event_index: u128,
+        slice_idx: SliceIdx,
+    ) {
+        let mut index_event = BTreeMap::new();
+        // Reverse so that newer events have a smaller index from enumerate
+        for (i, event) in events
+            .iter()
+            // TODO: don't eat events or is this ok?
+            .filter_map(|e| e.deserialize().ok())
+            .rev()
+            .enumerate()
+        {
+            let e_idx = EventIdx(last_event_index - ((i + 1) as u128));
+            let e_id = event.event_id();
+
+            index_event.insert(e_idx, e_id.clone());
+            self.events.event_index.insert(e_id.clone(), e_idx);
+            self.events.event_map.insert(e_id.clone(), slice_idx);
+        }
+        self.events.slice_map.insert(slice_idx, index_event);
+    }
+}
+
+// TODO: do this in ruma?
+trait EventIdExt {
+    fn event_id(&self) -> &EventId;
+}
+
+impl EventIdExt for AnySyncRoomEvent {
+    fn event_id(&self) -> &EventId {
+        match self {
+            AnySyncRoomEvent::Message(ev) => ev.event_id(),
+            AnySyncRoomEvent::State(ev) => ev.event_id(),
+            AnySyncRoomEvent::RedactedMessage(ev) => ev.event_id(),
+            AnySyncRoomEvent::RedactedState(ev) => ev.event_id(),
+        }
+    }
+}
+
+impl EventIdExt for AnyRoomEvent {
+    fn event_id(&self) -> &EventId {
+        match self {
+            AnyRoomEvent::Message(ev) => ev.event_id(),
+            AnyRoomEvent::State(ev) => ev.event_id(),
+            AnyRoomEvent::RedactedMessage(ev) => ev.event_id(),
+            AnyRoomEvent::RedactedState(ev) => ev.event_id(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use matrix_sdk_common::{
+        api::r0::message::get_message_events::{Direction, Response as MessagesResponse},
+        events::{AnyRoomEvent, AnySyncRoomEvent},
+        identifiers::room_id,
+        Raw,
+    };
+    use matrix_sdk_test::test_json;
+
+    use super::Timeline;
+
+    #[test]
+    fn messages() {
+        let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
+
+        let messages = serde_json::from_value::<Vec<Raw<AnyRoomEvent>>>(
+            test_json::ROOM_MESSAGES["chunk"].clone(),
+        )
+        .unwrap();
+        let sync =
+            serde_json::from_value::<matrix_sdk_common::api::r0::sync::sync_events::Timeline>(
+                test_json::SYNC["rooms"]["join"][room_id.as_str()]["timeline"].clone(),
+            )
+            .unwrap();
+
+        let mut timeline = Timeline::new();
+
+        timeline.handle_sync_timeline(
+            &room_id,
+            "t392-516_47314_0_7_1_1_1_11444_1",
+            "s526_47314_0_7_1_1_1_11444_1",
+            sync.limited,
+            &sync
+                .events
+                .into_iter()
+                .map(|e| e.deserialize())
+                .collect::<Result<Vec<AnySyncRoomEvent>, _>>()
+                .unwrap(),
+        );
+
+        let mut resp = MessagesResponse::new();
+        resp.chunk = messages;
+        resp.start = Some("s526_47314_0_7_1_1_1_11444_1".to_owned());
+        resp.end = Some("s_end__end".to_owned());
+
+        timeline.handle_messages_response(&room_id, &resp, Direction::Forward);
+
+        println!("{:#?}", timeline);
+        // end: t3336-1714379051_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049
+        // start: t3356-1714663804_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049
+
+        // end: t3316-1714212736_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049
+        // start: t3336-1714379051_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049
+    }
+}
diff --git a/matrix_sdk_crypto/Cargo.toml b/matrix_sdk_crypto/Cargo.toml
index 5767be0956a..4077fe7da5f 100644
--- a/matrix_sdk_crypto/Cargo.toml
+++ b/matrix_sdk_crypto/Cargo.toml
@@ -22,7 +22,7 @@ docs = ["sled_cryptostore"]

 [dependencies]
 matrix-sdk-common = { version = "0.2.0", path = "../matrix_sdk_common" }
-olm-rs = { version = "1.0.0", features = ["serde"] }
+olm-rs = { version = "1.0.1", features = ["serde"] }
 getrandom = "0.2.2"
 serde = { version = "1.0.122", features = ["derive", "rc"] }
 serde_json = "1.0.61"
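
A minimal sketch of how the new `Timeline` might be driven for Backward pagination, which the test above only exercises in the Forward direction. The `paginate_back_once` helper, the older "t123-..." token, and the `matrix_sdk_base::store::Timeline` path are illustrative assumptions; the starting token is the one used in the test, and the timeline is assumed to have already handled that /sync.

// Sketch only: feed one Backward /messages response into an existing Timeline.
use matrix_sdk_base::store::Timeline;
use matrix_sdk_common::{
    api::r0::message::get_message_events::{Direction, Response as MessagesResponse},
    identifiers::room_id,
};

fn paginate_back_once(timeline: &mut Timeline) {
    let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");

    let mut resp = MessagesResponse::new();
    // A /messages request with `from = "t392-..."` and `dir = b` echoes that token
    // back as `start` and returns an older continuation token as `end`.
    resp.start = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
    // Hypothetical older token; a real one comes from the server.
    resp.end = Some("t123-000_assumed_older_token".to_owned());
    // resp.chunk would hold the `Raw<AnyRoomEvent>`s from the response body.

    // Backward: `resp.start` matches a `prev_batch` the timeline already knows and
    // `resp.end` is new, so the chunk is stored at `SliceIdx(idx - 1)`, i.e. just
    // before the slice that the earlier /sync created.
    timeline.handle_messages_response(&room_id, &resp, Direction::Backward);
}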