File tree 7 files changed +14
-2
lines changed
chain/ethereum/src/network_indexer 7 files changed +14
-2
lines changed Original file line number Diff line number Diff line change @@ -1174,7 +1174,8 @@ impl NetworkIndexer {
1174
1174
start_block,
1175
1175
} ) ;
1176
1176
1177
- // Launch state machine
1177
+ // Launch state machine.
1178
+ // Blocking due to store interactions. Won't be blocking after #905.
1178
1179
graph:: spawn_blocking (
1179
1180
state_machine
1180
1181
. map_err ( move |e| {
Original file line number Diff line number Diff line change @@ -236,6 +236,7 @@ impl SubgraphInstanceManager {
236
236
// Subgraph instance shutdown senders
237
237
let instances: SharedInstanceKeepAliveMap = Default :: default ( ) ;
238
238
239
+ // Blocking due to store interactions. Won't be blocking after #905.
239
240
graph:: spawn_blocking ( receiver. compat ( ) . try_for_each ( move |event| {
240
241
use self :: SubgraphAssignmentProviderEvent :: * ;
241
242
Original file line number Diff line number Diff line change @@ -111,7 +111,8 @@ where
111
111
112
112
// Deploy named subgraphs found in store
113
113
self . start_assigned_subgraphs ( ) . and_then ( move |( ) | {
114
- // Spawn a task to handle assignment events
114
+ // Spawn a task to handle assignment events.
115
+ // Blocking due to store interactions. Won't be blocking after #905.
115
116
graph:: spawn_blocking (
116
117
assignment_event_stream
117
118
. map_err ( SubgraphAssignmentProviderError :: Unknown )
@@ -255,6 +256,8 @@ where
255
256
let sender = sender. clone ( ) ;
256
257
let provider = provider. clone ( ) ;
257
258
let logger = logger. clone ( ) ;
259
+
260
+ // Blocking due to store interactions. Won't be blocking after #905.
258
261
graph:: spawn_blocking (
259
262
start_subgraph ( id, & * provider, logger)
260
263
. map ( move |( ) | drop ( sender) )
Original file line number Diff line number Diff line change @@ -14,16 +14,19 @@ fn abort_on_panic<T: Send + 'static>(
14
14
} )
15
15
}
16
16
17
+ /// Aborts on panic.
17
18
pub fn spawn < T : Send + ' static > ( f : impl Future03 < Output = T > + Send + ' static ) -> JoinHandle < T > {
18
19
tokio:: spawn ( abort_on_panic ( f) )
19
20
}
20
21
22
+ /// Aborts on panic.
21
23
pub fn spawn_blocking < T : Send + ' static > (
22
24
f : impl Future03 < Output = T > + Send + ' static ,
23
25
) -> JoinHandle < T > {
24
26
tokio:: task:: spawn_blocking ( move || block_on ( abort_on_panic ( f) ) )
25
27
}
26
28
29
+ /// Panics result in an `Err` in `JoinHandle`.
27
30
pub fn spawn_blocking_allow_panic < T : Send + ' static > (
28
31
f : impl Future03 < Output = T > + Send + ' static ,
29
32
) -> JoinHandle < T > {
Original file line number Diff line number Diff line change @@ -680,6 +680,7 @@ fn test_string_to_h160_with_0x() {
680
680
fn block_on < I : Send + ' static , ER : Send + ' static > (
681
681
future : impl Future < Item = I , Error = ER > + Send + ' static ,
682
682
) -> Result < I , ER > {
683
+ // We don't know if the task is blocking or not, but use `blocking` to be cautious.
683
684
graph:: spawn_blocking_allow_panic ( future. compat ( ) )
684
685
. compat ( )
685
686
. wait ( )
Original file line number Diff line number Diff line change @@ -214,6 +214,7 @@ where
214
214
mpsc:: channel :: < Box < dyn std:: future:: Future < Output = ( ) > + Send + Unpin > > ( 100 ) ;
215
215
graph:: spawn ( task_receiver. for_each ( |f| {
216
216
async {
217
+ // Blocking due to store interactions. Won't be blocking after #905.
217
218
graph:: spawn_blocking ( f) ;
218
219
}
219
220
} ) ) ;
Original file line number Diff line number Diff line change @@ -190,6 +190,8 @@ where
190
190
ws_stream,
191
191
graphql_runner. clone ( ) ,
192
192
) ;
193
+
194
+ // Blocking due to store interactions. Won't be blocking after #905.
193
195
graph:: spawn_blocking_allow_panic ( service. into_future ( ) . compat ( ) ) ;
194
196
}
195
197
Err ( e) => {
You can’t perform that action at this time.
0 commit comments