Skip to content

Commit 2e22282

Browse files
committed
sdk-ui: fix max concurrent requests for pinned events timeline.
`RequestConfig::max_concurrent_requests` won't work for requests to a same endpoint, since it's a global value that can only be passed to `HttpClient` when created. To replace it I added a buffered stream that can run at most `MAX_PINNED_EVENTS_CONCURRENT_REQUESTS` (10) requests concurrently. There is also a test to make sure this max concurrency mechanism works.
1 parent c86c842 commit 2e22282

File tree

3 files changed

+107
-32
lines changed

3 files changed

+107
-32
lines changed

crates/matrix-sdk-ui/src/timeline/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub use self::{
101101
inner::default_event_filter,
102102
item::{TimelineItem, TimelineItemKind},
103103
pagination::LiveBackPaginationStatus,
104+
pinned_events_loader::MAX_PINNED_EVENTS_CONCURRENT_REQUESTS,
104105
polls::PollResult,
105106
traits::RoomExt,
106107
virtual_item::VirtualTimelineItem,

crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::{fmt::Formatter, num::NonZeroUsize, sync::Arc};
15+
use std::{fmt::Formatter, sync::Arc};
1616

17-
use futures_util::future::join_all;
17+
use futures_util::{stream, StreamExt};
1818
use matrix_sdk::{
1919
config::RequestConfig, event_cache::paginator::PaginatorError, Room, SendOutsideWasm,
2020
SyncOutsideWasm,
@@ -26,7 +26,7 @@ use tracing::{debug, warn};
2626

2727
use crate::timeline::event_handler::TimelineEventKind;
2828

29-
const MAX_CONCURRENT_REQUESTS: usize = 10;
29+
pub const MAX_PINNED_EVENTS_CONCURRENT_REQUESTS: usize = 10;
3030

3131
/// Utility to load the pinned events in a room.
3232
pub struct PinnedEventsLoader {
@@ -66,37 +66,33 @@ impl PinnedEventsLoader {
6666
return Ok(Vec::new());
6767
}
6868

69-
let request_config = Some(
70-
RequestConfig::default()
71-
.retry_limit(3)
72-
.max_concurrent_requests(NonZeroUsize::new(MAX_CONCURRENT_REQUESTS)),
73-
);
74-
75-
let new_events = join_all(pinned_event_ids.into_iter().map(|event_id| {
76-
let provider = self.room.clone();
77-
async move {
78-
match provider.load_event_with_relations(&event_id, request_config).await {
79-
Ok((event, related_events)) => {
80-
let mut events = vec![event];
81-
events.extend(related_events);
82-
Some(events)
83-
}
84-
Err(err) => {
85-
warn!("error when loading pinned event: {err}");
86-
None
69+
let request_config = Some(RequestConfig::default().retry_limit(3));
70+
71+
let mut loaded_events: Vec<SyncTimelineEvent> =
72+
stream::iter(pinned_event_ids.into_iter().map(|event_id| {
73+
let provider = self.room.clone();
74+
async move {
75+
match provider.load_event_with_relations(&event_id, request_config).await {
76+
Ok((event, related_events)) => {
77+
let mut events = vec![event];
78+
events.extend(related_events);
79+
Some(events)
80+
}
81+
Err(err) => {
82+
warn!("error when loading pinned event: {err}");
83+
None
84+
}
8785
}
8886
}
89-
}
90-
}))
91-
.await;
92-
93-
let mut loaded_events = new_events
94-
.into_iter()
87+
}))
88+
.buffer_unordered(MAX_PINNED_EVENTS_CONCURRENT_REQUESTS)
9589
// Get only the `Some<Vec<_>>` results
96-
.flatten()
90+
.flat_map(stream::iter)
9791
// Flatten the `Vec`s into a single one containing all their items
98-
.flatten()
99-
.collect::<Vec<SyncTimelineEvent>>();
92+
.flat_map(stream::iter)
93+
.collect()
94+
.await;
95+
10096
if loaded_events.is_empty() {
10197
return Err(PinnedEventsLoaderError::TimelineReloadFailed);
10298
}

crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,19 @@ use matrix_sdk::{
1212
use matrix_sdk_base::deserialized_responses::TimelineEvent;
1313
use matrix_sdk_test::{async_test, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, BOB};
1414
use matrix_sdk_ui::{
15-
timeline::{TimelineFocus, TimelineItemContent},
15+
timeline::{
16+
RoomExt, TimelineFocus, TimelineItemContent, MAX_PINNED_EVENTS_CONCURRENT_REQUESTS,
17+
},
1618
Timeline,
1719
};
1820
use ruma::{event_id, owned_room_id, MilliSecondsSinceUnixEpoch, OwnedRoomId};
1921
use serde_json::json;
2022
use stream_assert::assert_pending;
21-
use wiremock::MockServer;
23+
use tokio::time::sleep;
24+
use wiremock::{
25+
matchers::{header, method, path_regex},
26+
Mock, MockServer, ResponseTemplate,
27+
};
2228

2329
use crate::{mock_event, mock_sync};
2430

@@ -624,6 +630,78 @@ async fn test_edited_events_survive_pinned_event_ids_change() {
624630
assert_pending!(timeline_stream);
625631
}
626632

633+
#[async_test]
634+
async fn test_ensure_max_concurrency_is_observed() {
635+
let (client, server) = logged_in_client_with_server().await;
636+
let room_id = owned_room_id!("!a_room:example.org");
637+
638+
let pinned_event_ids: Vec<String> = (0..100).map(|idx| format!("${idx}")).collect();
639+
640+
let joined_room_builder = JoinedRoomBuilder::new(&room_id)
641+
// Set up encryption
642+
.add_state_event(StateTestEvent::Encryption)
643+
// Add 100 pinned events
644+
.add_state_event(StateTestEvent::Custom(json!(
645+
{
646+
"content": {
647+
"pinned": pinned_event_ids
648+
},
649+
"event_id": "$15139375513VdeRF:localhost",
650+
"origin_server_ts": 151393755,
651+
"sender": "@example:localhost",
652+
"state_key": "",
653+
"type": "m.room.pinned_events",
654+
"unsigned": {
655+
"age": 703422
656+
}
657+
}
658+
)));
659+
660+
// Amount of time to delay the response of an /event mock request, in ms.
661+
let request_delay = 50;
662+
let pinned_event =
663+
EventFactory::new().room(&room_id).sender(*BOB).text_msg("A message").into_raw_timeline();
664+
Mock::given(method("GET"))
665+
.and(path_regex(r"/_matrix/client/r0/rooms/.*/event/.*"))
666+
.and(header("authorization", "Bearer 1234"))
667+
.respond_with(
668+
ResponseTemplate::new(200)
669+
.set_delay(Duration::from_millis(request_delay))
670+
.set_body_json(pinned_event.json()),
671+
)
672+
// Verify this endpoint is only called the max concurrent amount of times.
673+
.expect(MAX_PINNED_EVENTS_CONCURRENT_REQUESTS as u64)
674+
.mount(&server)
675+
.await;
676+
677+
let mut sync_response_builder = SyncResponseBuilder::new();
678+
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
679+
let json_response =
680+
sync_response_builder.add_joined_room(joined_room_builder).build_json_sync_response();
681+
mock_sync(&server, json_response, None).await;
682+
let _ = client.sync_once(sync_settings.clone()).await;
683+
684+
let room = client.get_room(&room_id).unwrap();
685+
686+
// Start loading the pinned event timeline asynchronously.
687+
let handle = tokio::spawn({
688+
let timeline_builder = room
689+
.timeline_builder()
690+
.with_focus(TimelineFocus::PinnedEvents { max_events_to_load: 100 });
691+
async {
692+
let _ = timeline_builder.build().await;
693+
}
694+
});
695+
696+
// Give it time to load events. As each request takes `request_delay`, we should
697+
// have exactly `MAX_PINNED_EVENTS_CONCURRENT_REQUESTS` if the max
698+
// concurrency setting is honoured.
699+
sleep(Duration::from_millis(request_delay)).await;
700+
701+
// Abort handle to stop requests from being processed.
702+
handle.abort();
703+
}
704+
627705
struct TestHelper {
628706
pub client: Client,
629707
pub server: MockServer,

0 commit comments

Comments
 (0)