1
1
use std:: {
2
2
sync:: {
3
3
atomic:: { AtomicBool , Ordering } ,
4
- Arc , Mutex ,
4
+ Arc ,
5
5
} ,
6
6
thread,
7
7
time:: { Duration , Instant } ,
8
8
} ;
9
9
10
10
use avalanche_types:: rpcchainvm;
11
11
use chan:: chan_select;
12
- use tokio:: sync:: { RwLock } ;
13
12
use crossbeam_channel:: TryRecvError ;
14
-
13
+ use tokio :: sync :: RwLock ;
15
14
16
15
use crate :: vm;
17
16
17
+ // TODO: make configurable
18
+ const GOSSIP_INTERVAL : Duration = Duration :: from_secs ( 1 ) ;
19
+ const REGOSSIP_INTERVAL : Duration = Duration :: from_secs ( 30 ) ;
20
+
18
21
pub trait Builder {
19
22
fn build ( & self ) ;
20
23
fn gossip ( & self ) ;
@@ -28,7 +31,7 @@ pub struct Timed {
28
31
/// [DontBuild] indicates there's no need to build a block.
29
32
/// [MayBuild] indicates the Vm should proceed to build a block.
30
33
/// [Building] indicates the Vm has sent a request to the engine to build a block.
31
- pub status : Arc < Mutex < Status > > ,
34
+ pub status : Arc < RwLock < Status > > ,
32
35
33
36
pub build_block_timer : Timer ,
34
37
@@ -54,10 +57,10 @@ pub enum Status {
54
57
}
55
58
56
59
pub struct Timer {
57
- // Timeout Tx channel is used to reset ticker threads.
60
+ /// Timeout Tx channel is used to reset ticker threads.
58
61
timeout_tx : crossbeam_channel:: Sender < ( ) > ,
59
62
60
- // Timeout Rx channel listens.
63
+ /// Timeout Rx channel listens.
61
64
timeout_rx : crossbeam_channel:: Receiver < ( ) > ,
62
65
63
66
/// New timer creation stops when true.
@@ -70,14 +73,14 @@ pub struct Timer {
70
73
duration : Arc < RwLock < Duration > > ,
71
74
}
72
75
73
- // Directs the engine when to build blocks and gossip transactions.
76
+ /// Directs the engine when to build blocks and gossip transactions.
74
77
impl Timed {
75
78
/// Sets the initial timeout on the two stage timer if the process
76
79
/// has not already begun from an earlier notification. If [buildStatus] is anything
77
80
/// other than [DontBuild], then the attempt has already begun and this notification
78
81
/// can be safely skipped.
79
82
async fn signal_txs_ready ( & mut self ) {
80
- if * self . status . lock ( ) . unwrap ( ) == Status :: DontBuild {
83
+ if * self . status . read ( ) . await == Status :: DontBuild {
81
84
return ;
82
85
}
83
86
@@ -99,7 +102,7 @@ impl Timed {
99
102
// release lock
100
103
drop ( vm) ;
101
104
102
- let mut status = self . status . lock ( ) . unwrap ( ) ;
105
+ let mut status = self . status . write ( ) . await ;
103
106
* status = Status :: Building ;
104
107
return ;
105
108
}
@@ -108,7 +111,7 @@ impl Timed {
108
111
// [HandleGenerateBlock] invocation could lead to quiescence, building a block with
109
112
// some delay, or attempting to build another block immediately
110
113
pub async fn handle_generate_block ( & mut self ) {
111
- let mut status = self . status . lock ( ) . unwrap ( ) ;
114
+ let mut status = self . status . write ( ) . await ;
112
115
113
116
if self . need_to_build ( ) . await {
114
117
* status = Status :: MayBuild ;
@@ -118,7 +121,7 @@ impl Timed {
118
121
}
119
122
}
120
123
121
- // needToBuild returns true if there are outstanding transactions to be issued
124
+ // Returns true if there are outstanding transactions to be issued
122
125
// into a block.
123
126
async fn need_to_build ( & self ) -> bool {
124
127
let mempool = self . vm . mempool . read ( ) . await ;
@@ -128,7 +131,7 @@ impl Timed {
128
131
/// Parses the block current status and
129
132
pub async fn build_block_parse_status ( & mut self ) {
130
133
let mut mark_building = false ;
131
- match & * self . status . lock ( ) . unwrap ( ) {
134
+ match & * self . status . read ( ) . await {
132
135
Status :: DontBuild => {
133
136
// no op
134
137
}
@@ -223,14 +226,14 @@ impl Timed {
223
226
ticker_duration = * duration;
224
227
}
225
228
reset. store( true , Ordering :: Relaxed ) ;
226
- log:: debug!( "timeout\n " ) ;
229
+ log:: debug!( "timeout" ) ;
227
230
break
228
231
}
229
232
230
233
// ticker
231
234
recv( ticker_rx) -> _ => {
232
235
cleared. store( true , Ordering :: Relaxed ) ;
233
- log:: debug!( "tick\n " ) ;
236
+ log:: debug!( "tick" ) ;
234
237
break
235
238
}
236
239
}
@@ -242,10 +245,12 @@ impl Timed {
242
245
/// considered for the next block.
243
246
pub async fn build ( & mut self ) {
244
247
log:: debug!( "starting build loops" ) ;
248
+
245
249
self . signal_txs_ready ( ) . await ;
246
250
let mempool = self . vm . mempool . read ( ) . await ;
247
251
let mempool_pending_ch = mempool. subscribe_pending ( ) ;
248
252
drop ( mempool) ;
253
+
249
254
let stop_ch = self . stop . clone ( ) ;
250
255
let builder_stop_ch = self . builder_stop . clone ( ) ;
251
256
@@ -274,8 +279,8 @@ impl Timed {
274
279
pub async fn gossip ( & self ) {
275
280
log:: debug!( "starting gossip loops" ) ;
276
281
277
- let gossip = chan:: tick ( Duration :: from_millis ( 100 ) ) ;
278
- let regossip = chan:: tick ( Duration :: from_millis ( 100 ) ) ;
282
+ let gossip = chan:: tick ( GOSSIP_INTERVAL ) ;
283
+ let regossip = chan:: tick ( REGOSSIP_INTERVAL ) ;
279
284
let stop_ch = self . stop . clone ( ) ;
280
285
281
286
while stop_ch. try_recv ( ) == Err ( TryRecvError :: Empty ) {
0 commit comments