 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::{fmt::Formatter, num::NonZeroUsize, sync::Arc};
+use std::{fmt::Formatter, sync::Arc};

-use futures_util::{future::join_all, FutureExt as _};
+use futures_util::{stream, FutureExt as _, StreamExt};
 use matrix_sdk::{
     config::RequestConfig, event_cache::paginator::PaginatorError, BoxFuture, Room,
     SendOutsideWasm, SyncOutsideWasm,
@@ -24,8 +24,6 @@ use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
 use thiserror::Error;
 use tracing::{debug, warn};

-const MAX_CONCURRENT_REQUESTS: usize = 10;
-
 /// Utility to load the pinned events in a room.
 pub struct PinnedEventsLoader {
     /// Backend to load pinned events.
@@ -34,12 +32,21 @@ pub struct PinnedEventsLoader {
     /// Maximum number of pinned events to load (either from network or the
     /// cache).
     max_events_to_load: usize,
+
+    /// Number of requests to load pinned events that can run concurrently. This
+    /// is used to avoid overwhelming a home server with dozens or hundreds
+    /// of concurrent requests.
+    max_concurrent_requests: usize,
 }

 impl PinnedEventsLoader {
     /// Creates a new `PinnedEventsLoader` instance.
-    pub fn new(room: Arc<dyn PinnedEventsRoom>, max_events_to_load: usize) -> Self {
-        Self { room, max_events_to_load }
+    pub fn new(
+        room: Arc<dyn PinnedEventsRoom>,
+        max_events_to_load: usize,
+        max_concurrent_requests: usize,
+    ) -> Self {
+        Self { room, max_events_to_load, max_concurrent_requests }
     }

     /// Loads the pinned events in this room, using the cache first and then
@@ -64,37 +71,33 @@ impl PinnedEventsLoader {
             return Ok(Vec::new());
         }

-        let request_config = Some(
-            RequestConfig::default()
-                .retry_limit(3)
-                .max_concurrent_requests(NonZeroUsize::new(MAX_CONCURRENT_REQUESTS)),
-        );
-
-        let new_events = join_all(pinned_event_ids.into_iter().map(|event_id| {
-            let provider = self.room.clone();
-            async move {
-                match provider.load_event_with_relations(&event_id, request_config).await {
-                    Ok((event, related_events)) => {
-                        let mut events = vec![event];
-                        events.extend(related_events);
-                        Some(events)
-                    }
-                    Err(err) => {
-                        warn!("error when loading pinned event: {err}");
-                        None
+        let request_config = Some(RequestConfig::default().retry_limit(3));
+
+        let mut loaded_events: Vec<SyncTimelineEvent> =
+            stream::iter(pinned_event_ids.into_iter().map(|event_id| {
+                let provider = self.room.clone();
+                async move {
+                    match provider.load_event_with_relations(&event_id, request_config).await {
+                        Ok((event, related_events)) => {
+                            let mut events = vec![event];
+                            events.extend(related_events);
+                            Some(events)
+                        }
+                        Err(err) => {
+                            warn!("error when loading pinned event: {err}");
+                            None
+                        }
                     }
                 }
-            }
-        }))
-        .await;
-
-        let mut loaded_events = new_events
-            .into_iter()
+            }))
+            .buffer_unordered(self.max_concurrent_requests)
             // Get only the `Some<Vec<_>>` results
-            .flatten()
+            .flat_map(stream::iter)
             // Flatten the `Vec`s into a single one containing all their items
-            .flatten()
-            .collect::<Vec<SyncTimelineEvent>>();
+            .flat_map(stream::iter)
+            .collect()
+            .await;
+
         if loaded_events.is_empty() {
             return Err(PinnedEventsLoaderError::TimelineReloadFailed);
         }
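
The heart of this change is replacing `join_all`, which drives every request at once, with `stream::iter(..).buffer_unordered(n)`, which polls at most `n` of the wrapped futures at a time, where `n` is the new `max_concurrent_requests` passed to `PinnedEventsLoader::new`. Below is a minimal, self-contained sketch of that pattern outside the SDK; the `fetch` function, the event IDs, and the use of tokio are illustrative stand-ins for `PinnedEventsRoom::load_event_with_relations`, not part of the change itself.

```rust
use futures_util::{stream, StreamExt};

/// Hypothetical stand-in for a per-event request such as
/// `load_event_with_relations`: returns the event plus its related events,
/// or `None` if loading failed.
async fn fetch(event_id: &str) -> Option<Vec<String>> {
    Some(vec![format!("event {event_id}"), format!("related to {event_id}")])
}

#[tokio::main]
async fn main() {
    let pinned_event_ids = ["$a", "$b", "$c", "$d"];
    let max_concurrent_requests = 2;

    let loaded: Vec<String> = stream::iter(pinned_event_ids.into_iter().map(|id| async move {
        // Each stream item is an unpolled future; nothing has started yet here.
        fetch(id).await
    }))
    // Run at most `max_concurrent_requests` of those futures at a time,
    // yielding results in completion order (bounded, unlike `join_all`).
    .buffer_unordered(max_concurrent_requests)
    // Keep only the `Some(Vec<_>)` results...
    .flat_map(stream::iter)
    // ...and flatten each `Vec` into individual items.
    .flat_map(stream::iter)
    .collect()
    .await;

    println!("{loaded:?}");
}
```

With this in place, the concurrency cap is supplied per loader through `PinnedEventsLoader::new(room, max_events_to_load, max_concurrent_requests)`, rather than through the removed `MAX_CONCURRENT_REQUESTS` constant and the `RequestConfig::max_concurrent_requests` setting.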