Skip to content

Commit 54e2fd0

Browse files
committed
base: store timeline to SledStore
Note: This doesn't implement the timeline store for the MemoryStore
1 parent 93dc4e5 commit 54e2fd0

File tree

7 files changed

+1779
-51
lines changed

7 files changed

+1779
-51
lines changed

crates/matrix-sdk-base/src/client.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::{
2727
use matrix_sdk_common::{
2828
deserialized_responses::{
2929
AmbiguityChanges, JoinedRoom, LeftRoom, MemberEvent, MembersResponse, Rooms,
30-
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline,
30+
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, TimelineSlice,
3131
},
3232
instant::Instant,
3333
locks::RwLock,
@@ -50,7 +50,11 @@ use ruma::{
5050
DeviceId,
5151
};
5252
use ruma::{
53-
api::client::r0::{self as api, push::get_notifications::Notification},
53+
api::client::r0::{
54+
self as api,
55+
message::get_message_events::{Direction, Response as GetMessageEventsResponse},
56+
push::get_notifications::Notification,
57+
},
5458
events::{
5559
room::member::MembershipState, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent,
5660
AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent,
@@ -773,6 +777,15 @@ impl BaseClient {
773777
let notification_count = new_info.unread_notifications.into();
774778
room_info.update_notification_count(notification_count);
775779

780+
changes.add_timeline(
781+
&room_id,
782+
TimelineSlice::new(
783+
timeline.events.iter().cloned().rev().collect(),
784+
next_batch.clone(),
785+
timeline.prev_batch.clone(),
786+
),
787+
);
788+
776789
new_rooms.join.insert(
777790
room_id,
778791
JoinedRoom::new(
@@ -816,6 +829,14 @@ impl BaseClient {
816829
self.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes)
817830
.await;
818831

832+
changes.add_timeline(
833+
&room_id,
834+
TimelineSlice::new(
835+
timeline.events.iter().cloned().rev().collect(),
836+
next_batch.clone(),
837+
timeline.prev_batch.clone(),
838+
),
839+
);
819840
changes.add_room(room_info);
820841
new_rooms
821842
.leave
@@ -892,6 +913,59 @@ impl BaseClient {
892913
}
893914
}
894915

916+
/// Receive a successful /messages response.
917+
///
918+
/// * `response` - The successful response from /messages.
919+
pub async fn receive_messages(
920+
&self,
921+
room_id: &RoomId,
922+
direction: &Direction,
923+
response: &GetMessageEventsResponse,
924+
) -> Result<Vec<SyncRoomEvent>> {
925+
let mut changes = StateChanges::default();
926+
927+
let mut events: Vec<SyncRoomEvent> = vec![];
928+
for event in &response.chunk {
929+
#[allow(unused_mut)]
930+
let mut event: SyncRoomEvent = event.clone().into();
931+
932+
#[cfg(feature = "encryption")]
933+
match event.event.deserialize() {
934+
Ok(AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomEncrypted(encrypted))) => {
935+
if let Some(olm) = self.olm_machine().await {
936+
if let Ok(decrypted) = olm.decrypt_room_event(&encrypted, room_id).await {
937+
event = decrypted.into();
938+
}
939+
}
940+
}
941+
Ok(_) => {}
942+
Err(error) => {
943+
warn!("Error deserializing event {:?}", error);
944+
}
945+
}
946+
947+
events.push(event);
948+
}
949+
950+
let (chunk, start, end) = match direction {
951+
Direction::Backward => {
952+
(events.clone(), response.start.clone().unwrap(), response.end.clone())
953+
}
954+
Direction::Forward => (
955+
events.iter().rev().cloned().collect(),
956+
response.end.clone().unwrap(),
957+
response.start.clone(),
958+
),
959+
};
960+
961+
let timeline = TimelineSlice::new(chunk, start, end);
962+
changes.add_timeline(room_id, timeline);
963+
964+
self.store().save_changes(&changes).await?;
965+
966+
Ok(events)
967+
}
968+
895969
/// Receive a get member events response and convert it to a deserialized
896970
/// `MembersResponse`
897971
///

crates/matrix-sdk-base/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,4 @@ pub use client::{BaseClient, BaseClientConfig};
4343
#[cfg(feature = "encryption")]
4444
pub use matrix_sdk_crypto as crypto;
4545
pub use rooms::{Room, RoomInfo, RoomMember, RoomType};
46-
pub use store::{StateChanges, StateStore, Store, StoreError};
46+
pub use store::{StateChanges, StateStore, Store, StoreError, StoredTimelineSlice};

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use dashmap::{DashMap, DashSet};
2121
use lru::LruCache;
2222
use matrix_sdk_common::{async_trait, instant::Instant, locks::Mutex};
2323
use ruma::{
24+
api::client::r0::message::get_message_events::Direction,
2425
events::{
2526
presence::PresenceEvent,
2627
receipt::Receipt,
@@ -34,7 +35,7 @@ use ruma::{
3435
};
3536
use tracing::info;
3637

37-
use super::{Result, RoomInfo, StateChanges, StateStore};
38+
use super::{Result, RoomInfo, StateChanges, StateStore, StoredTimelineSlice};
3839
use crate::{
3940
deserialized_responses::{MemberEvent, StrippedMemberEvent},
4041
media::{MediaRequest, UniqueKey},
@@ -272,6 +273,8 @@ impl MemoryStore {
272273
}
273274
}
274275

276+
// TODO: implement writing timeline to the store.
277+
275278
info!("Saved changes in {:?}", now.elapsed());
276279

277280
Ok(())
@@ -438,6 +441,23 @@ impl MemoryStore {
438441

439442
Ok(())
440443
}
444+
445+
async fn get_timeline(
446+
&self,
447+
_room_id: &RoomId,
448+
_start: Option<&EventId>,
449+
_end: Option<&EventId>,
450+
_limit: Option<usize>,
451+
_direction: Direction,
452+
) -> Result<Option<StoredTimelineSlice>> {
453+
// TODO: implement reading from the store.
454+
Ok(None)
455+
}
456+
457+
async fn remove_timeline(&self, _room_id: Option<&RoomId>) -> Result<()> {
458+
// TODO: implement once writing the timeline to the store is implemented.
459+
Ok(())
460+
}
441461
}
442462

443463
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -584,6 +604,21 @@ impl StateStore for MemoryStore {
584604
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
585605
self.remove_media_content_for_uri(uri).await
586606
}
607+
608+
async fn get_timeline(
609+
&self,
610+
room_id: &RoomId,
611+
start: Option<&EventId>,
612+
end: Option<&EventId>,
613+
limit: Option<usize>,
614+
direction: Direction,
615+
) -> Result<Option<StoredTimelineSlice>> {
616+
self.get_timeline(room_id, start, end, limit, direction).await
617+
}
618+
619+
async fn remove_timeline(&self, room_id: Option<&RoomId>) -> Result<()> {
620+
self.remove_timeline(room_id).await
621+
}
587622
}
588623

589624
#[cfg(test)]

crates/matrix-sdk-base/src/store/mod.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use std::{
2323
use dashmap::DashMap;
2424
use matrix_sdk_common::{async_trait, locks::RwLock, AsyncTraitDeps};
2525
use ruma::{
26-
api::client::r0::push::get_notifications::Notification,
26+
api::client::r0::{
27+
message::get_message_events::Direction, push::get_notifications::Notification,
28+
},
2729
events::{
2830
presence::PresenceEvent,
2931
receipt::{Receipt, ReceiptEventContent},
@@ -39,7 +41,7 @@ use ruma::{
3941
use sled::Db;
4042

4143
use crate::{
42-
deserialized_responses::{MemberEvent, StrippedMemberEvent},
44+
deserialized_responses::{MemberEvent, StrippedMemberEvent, SyncRoomEvent, TimelineSlice},
4345
media::MediaRequest,
4446
rooms::{RoomInfo, RoomType},
4547
Room, Session,
@@ -313,6 +315,37 @@ pub trait StateStore: AsyncTraitDeps {
313315
///
314316
/// * `uri` - The `MxcUri` of the media files.
315317
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()>;
318+
319+
/// Get a slice of the timeline of a room.
320+
///
321+
/// # Arguments
322+
///
323+
/// * `room_id` - The id of the room for which the timeline should be
324+
/// fetched.
325+
///
326+
/// * `start` - The start point from which events should be returned.
327+
///
328+
/// * `end` - The end point to which events should be returned.
329+
///
330+
/// * `limit` - The maximum number of events to return.
331+
///
332+
/// * `direction` - The direction events should be returned.
333+
async fn get_timeline(
334+
&self,
335+
room_id: &RoomId,
336+
start: Option<&EventId>,
337+
end: Option<&EventId>,
338+
limit: Option<usize>,
339+
direction: Direction,
340+
) -> Result<Option<StoredTimelineSlice>>;
341+
342+
/// Remove the stored timeline.
343+
///
344+
/// # Arguments
345+
///
346+
/// * `room_id` - The id of the room for which the timeline should be
347+
/// removed. If `None` the timeline for every stored room is removed.
348+
async fn remove_timeline(&self, room_id: Option<&RoomId>) -> Result<()>;
316349
}
317350

318351
/// A state store wrapper for the SDK.
@@ -486,6 +519,8 @@ pub struct StateChanges {
486519
pub ambiguity_maps: BTreeMap<Box<RoomId>, BTreeMap<String, BTreeSet<Box<UserId>>>>,
487520
/// A map of `RoomId` to a vector of `Notification`s
488521
pub notifications: BTreeMap<Box<RoomId>, Vec<Notification>>,
522+
/// A mapping of `RoomId` to a `TimelineSlice`
523+
pub timeline: BTreeMap<Box<RoomId>, TimelineSlice>,
489524
}
490525

491526
impl StateChanges {
@@ -570,4 +605,28 @@ impl StateChanges {
570605
pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
571606
self.receipts.insert(room_id.to_owned(), event);
572607
}
608+
609+
/// Update the `StateChanges` struct with the given room with a new
610+
/// `TimelineSlice`.
611+
pub fn add_timeline(&mut self, room_id: &RoomId, timeline: TimelineSlice) {
612+
self.timeline.insert(room_id.to_owned(), timeline);
613+
}
614+
}
615+
616+
/// A slice of the timeline obtained from the store.
617+
#[derive(Debug, Default)]
618+
pub struct StoredTimelineSlice {
619+
/// A start token to fetch more events if the requested slice isn't fully
620+
/// known.
621+
pub token: Option<String>,
622+
623+
/// The requested events
624+
pub events: Vec<SyncRoomEvent>,
625+
}
626+
627+
#[cfg(feature = "sled_state_store")]
628+
impl StoredTimelineSlice {
629+
pub(crate) fn new(events: Vec<SyncRoomEvent>, token: Option<String>) -> Self {
630+
Self { token, events }
631+
}
573632
}

0 commit comments

Comments
 (0)