1
+ use tokio:: sync:: { Mutex , Semaphore } ;
2
+ use util:: sync:: RwLock ;
3
+
1
4
use std:: {
2
5
collections:: VecDeque ,
3
6
sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ,
4
7
} ;
5
8
6
- use tokio:: sync:: { Mutex , Semaphore } ;
7
- use util:: sync:: RwLock ;
8
-
9
9
use crate :: chunk:: chunk_payload_data:: ChunkPayloadData ;
10
10
11
+ // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>
12
+
13
// Some tests push a lot of data before starting to process any data...
/// Maximum size of the pending queue, in bytes (raised for tests, which
/// enqueue large amounts of data before draining the queue).
#[cfg(test)]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024;
/// Maximum size of the pending queue, in bytes.
#[cfg(not(test))]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024;
/// Total user data size beyond which the packet will be split into chunks.
/// The chunks will be added to the pending queue one by one.
const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3;
22
+
11
23
/// Basic queue for either ordered or unordered chunks.
12
24
pub ( crate ) type PendingBaseQueue = VecDeque < ChunkPayloadData > ;
13
25
14
- // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>
15
-
16
26
/// A queue for both ordered and unordered chunks.
17
27
#[ derive( Debug ) ]
18
28
pub ( crate ) struct PendingQueue {
@@ -39,14 +49,6 @@ impl Default for PendingQueue {
39
49
}
40
50
}
41
51
42
- // Some tests push a lot of data before starting to process any data...
43
- #[ cfg( test) ]
44
- const QUEUE_BYTES_LIMIT : usize = 128 * 1024 * 1024 ;
45
- #[ cfg( not( test) ) ]
46
- const QUEUE_BYTES_LIMIT : usize = 128 * 1024 ;
47
-
48
- const QUEUE_APPEND_LARGE : usize = ( QUEUE_BYTES_LIMIT * 2 ) / 3 ;
49
-
50
52
impl PendingQueue {
51
53
pub ( crate ) fn new ( ) -> Self {
52
54
Self {
@@ -66,7 +68,7 @@ impl PendingQueue {
66
68
let user_data_len = c. user_data . len ( ) ;
67
69
68
70
{
69
- let sem_lock = self . semaphore_lock . lock ( ) . await ;
71
+ let _sem_lock = self . semaphore_lock . lock ( ) . await ;
70
72
let permits = self . semaphore . acquire_many ( user_data_len as u32 ) . await ;
71
73
// unwrap ok because we never close the semaphore unless we have dropped self
72
74
permits. unwrap ( ) . forget ( ) ;
@@ -78,7 +80,6 @@ impl PendingQueue {
78
80
let mut ordered_queue = self . ordered_queue . write ( ) ;
79
81
ordered_queue. push_back ( c) ;
80
82
}
81
- drop ( sem_lock) ;
82
83
}
83
84
84
85
self . n_bytes . fetch_add ( user_data_len, Ordering :: SeqCst ) ;
@@ -100,22 +101,21 @@ impl PendingQueue {
100
101
if total_user_data_len >= QUEUE_APPEND_LARGE {
101
102
self . append_large ( chunks) . await
102
103
} else {
103
- let sem_lock = self . semaphore_lock . lock ( ) . await ;
104
+ let _sem_lock = self . semaphore_lock . lock ( ) . await ;
104
105
let permits = self
105
106
. semaphore
106
107
. acquire_many ( total_user_data_len as u32 )
107
108
. await ;
108
109
// unwrap ok because we never close the semaphore unless we have dropped self
109
110
permits. unwrap ( ) . forget ( ) ;
110
111
self . append_unlimited ( chunks, total_user_data_len) ;
111
- drop ( sem_lock) ;
112
112
}
113
113
}
114
114
115
115
// If this is a very large message we append chunks one by one to allow progress while we are appending
116
116
async fn append_large ( & self , chunks : Vec < ChunkPayloadData > ) {
117
117
// lock this for the whole duration
118
- let sem_lock = self . semaphore_lock . lock ( ) . await ;
118
+ let _sem_lock = self . semaphore_lock . lock ( ) . await ;
119
119
120
120
for chunk in chunks. into_iter ( ) {
121
121
let user_data_len = chunk. user_data . len ( ) ;
@@ -133,8 +133,6 @@ impl PendingQueue {
133
133
self . n_bytes . fetch_add ( user_data_len, Ordering :: SeqCst ) ;
134
134
self . queue_len . fetch_add ( 1 , Ordering :: SeqCst ) ;
135
135
}
136
-
137
- drop ( sem_lock) ;
138
136
}
139
137
140
138
/// Assumes that A) enough permits have been acquired and forget from the semaphore and that the semaphore_lock is held
0 commit comments