Skip to content

Commit fdb0b01

Browse files
committed
sdk-ui: fix max concurrent requests for pinned events timeline.
`RequestConfig::max_concurrent_requests` won't work for requests to a same endpoint, since it's a global value that can only be passed to `HttpClient` when created. To replace it I added a buffered stream that can run at most `MAX_PINNED_EVENTS_CONCURRENT_REQUESTS` (10) requests concurrently. There is also a test to make sure this max concurrency mechanism works.
1 parent b8f24bd commit fdb0b01

File tree

3 files changed

+107
-32
lines changed

3 files changed

+107
-32
lines changed

crates/matrix-sdk-ui/src/timeline/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub use self::{
101101
inner::default_event_filter,
102102
item::{TimelineItem, TimelineItemKind},
103103
pagination::LiveBackPaginationStatus,
104+
pinned_events_loader::MAX_PINNED_EVENTS_CONCURRENT_REQUESTS,
104105
polls::PollResult,
105106
traits::RoomExt,
106107
virtual_item::VirtualTimelineItem,

crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::{fmt::Formatter, num::NonZeroUsize, sync::Arc};
15+
use std::{fmt::Formatter, sync::Arc};
1616

17-
use futures_util::future::join_all;
17+
use futures_util::{stream, StreamExt};
1818
use matrix_sdk::{
1919
config::RequestConfig, event_cache::paginator::PaginatorError, Room, SendOutsideWasm,
2020
SyncOutsideWasm,
@@ -26,7 +26,7 @@ use tracing::{debug, warn};
2626

2727
use crate::timeline::event_handler::TimelineEventKind;
2828

29-
const MAX_CONCURRENT_REQUESTS: usize = 10;
29+
pub const MAX_PINNED_EVENTS_CONCURRENT_REQUESTS: usize = 10;
3030

3131
/// Utility to load the pinned events in a room.
3232
pub struct PinnedEventsLoader {
@@ -66,37 +66,33 @@ impl PinnedEventsLoader {
6666
return Ok(Vec::new());
6767
}
6868

69-
let request_config = Some(
70-
RequestConfig::default()
71-
.retry_limit(3)
72-
.max_concurrent_requests(NonZeroUsize::new(MAX_CONCURRENT_REQUESTS)),
73-
);
74-
75-
let new_events = join_all(pinned_event_ids.into_iter().map(|event_id| {
76-
let provider = self.room.clone();
77-
async move {
78-
match provider.load_event_with_relations(&event_id, request_config).await {
79-
Ok((event, related_events)) => {
80-
let mut events = vec![event];
81-
events.extend(related_events);
82-
Some(events)
83-
}
84-
Err(err) => {
85-
warn!("error when loading pinned event: {err}");
86-
None
69+
let request_config = Some(RequestConfig::default().retry_limit(3));
70+
71+
let mut loaded_events: Vec<SyncTimelineEvent> =
72+
stream::iter(pinned_event_ids.into_iter().map(|event_id| {
73+
let provider = self.room.clone();
74+
async move {
75+
match provider.load_event_with_relations(&event_id, request_config).await {
76+
Ok((event, related_events)) => {
77+
let mut events = vec![event];
78+
events.extend(related_events);
79+
Some(events)
80+
}
81+
Err(err) => {
82+
warn!("error when loading pinned event: {err}");
83+
None
84+
}
8785
}
8886
}
89-
}
90-
}))
91-
.await;
92-
93-
let mut loaded_events = new_events
94-
.into_iter()
87+
}))
88+
.buffer_unordered(MAX_PINNED_EVENTS_CONCURRENT_REQUESTS)
9589
// Get only the `Some<Vec<_>>` results
96-
.flatten()
90+
.flat_map(stream::iter)
9791
// Flatten the `Vec`s into a single one containing all their items
98-
.flatten()
99-
.collect::<Vec<SyncTimelineEvent>>();
92+
.flat_map(stream::iter)
93+
.collect()
94+
.await;
95+
10096
if loaded_events.is_empty() {
10197
return Err(PinnedEventsLoaderError::TimelineReloadFailed);
10298
}

crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use matrix_sdk::{
1212
use matrix_sdk_base::deserialized_responses::TimelineEvent;
1313
use matrix_sdk_test::{async_test, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, BOB};
1414
use matrix_sdk_ui::{
15-
timeline::{TimelineFocus, TimelineItemContent},
15+
timeline::{
16+
RoomExt, TimelineFocus, TimelineItemContent, MAX_PINNED_EVENTS_CONCURRENT_REQUESTS,
17+
},
1618
Timeline,
1719
};
1820
use ruma::{
@@ -21,7 +23,11 @@ use ruma::{
2123
};
2224
use serde_json::json;
2325
use stream_assert::assert_pending;
24-
use wiremock::MockServer;
26+
use tokio::time::sleep;
27+
use wiremock::{
28+
matchers::{header, method, path_regex},
29+
Mock, MockServer, ResponseTemplate,
30+
};
2531

2632
use crate::{mock_event, mock_sync};
2733

@@ -635,6 +641,78 @@ async fn test_edited_events_survive_pinned_event_ids_change() {
635641
assert_pending!(timeline_stream);
636642
}
637643

644+
#[async_test]
645+
async fn test_ensure_max_concurrency_is_observed() {
646+
let (client, server) = logged_in_client_with_server().await;
647+
let room_id = owned_room_id!("!a_room:example.org");
648+
649+
let pinned_event_ids: Vec<String> = (0..100).map(|idx| format!("${idx}")).collect();
650+
651+
let joined_room_builder = JoinedRoomBuilder::new(&room_id)
652+
// Set up encryption
653+
.add_state_event(StateTestEvent::Encryption)
654+
// Add 100 pinned events
655+
.add_state_event(StateTestEvent::Custom(json!(
656+
{
657+
"content": {
658+
"pinned": pinned_event_ids
659+
},
660+
"event_id": "$15139375513VdeRF:localhost",
661+
"origin_server_ts": 151393755,
662+
"sender": "@example:localhost",
663+
"state_key": "",
664+
"type": "m.room.pinned_events",
665+
"unsigned": {
666+
"age": 703422
667+
}
668+
}
669+
)));
670+
671+
// Amount of time to delay the response of an /event mock request, in ms.
672+
let request_delay = 50;
673+
let pinned_event =
674+
EventFactory::new().room(&room_id).sender(*BOB).text_msg("A message").into_raw_timeline();
675+
Mock::given(method("GET"))
676+
.and(path_regex(r"/_matrix/client/r0/rooms/.*/event/.*"))
677+
.and(header("authorization", "Bearer 1234"))
678+
.respond_with(
679+
ResponseTemplate::new(200)
680+
.set_delay(Duration::from_millis(request_delay))
681+
.set_body_json(pinned_event.json()),
682+
)
683+
// Verify this endpoint is only called the max concurrent amount of times.
684+
.expect(MAX_PINNED_EVENTS_CONCURRENT_REQUESTS as u64)
685+
.mount(&server)
686+
.await;
687+
688+
let mut sync_response_builder = SyncResponseBuilder::new();
689+
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
690+
let json_response =
691+
sync_response_builder.add_joined_room(joined_room_builder).build_json_sync_response();
692+
mock_sync(&server, json_response, None).await;
693+
let _ = client.sync_once(sync_settings.clone()).await;
694+
695+
let room = client.get_room(&room_id).unwrap();
696+
697+
// Start loading the pinned event timeline asynchronously.
698+
let handle = tokio::spawn({
699+
let timeline_builder = room
700+
.timeline_builder()
701+
.with_focus(TimelineFocus::PinnedEvents { max_events_to_load: 100 });
702+
async {
703+
let _ = timeline_builder.build().await;
704+
}
705+
});
706+
707+
// Give it time to load events. As each request takes `request_delay`, we should
708+
// have exactly `MAX_PINNED_EVENTS_CONCURRENT_REQUESTS` if the max
709+
// concurrency setting is honoured.
710+
sleep(Duration::from_millis(request_delay / 2)).await;
711+
712+
// Abort handle to stop requests from being processed.
713+
handle.abort();
714+
}
715+
638716
struct TestHelper {
639717
pub client: Client,
640718
pub server: MockServer,

0 commit comments

Comments
 (0)