Skip to content

Commit ba2aeef

Browse files
authored
Merge pull request #2571 from blockstack/feat/events-for-microblocks
Feat: Added event for new microblocks
2 parents f6697f3 + 5b70def commit ba2aeef

File tree

6 files changed

+427
-73
lines changed

6 files changed

+427
-73
lines changed

src/chainstate/stacks/db/unconfirmed.rs

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,24 @@ use crate::types::chainstate::{StacksBlockHeader, StacksBlockId, StacksMicrobloc
4141

4242
pub type UnconfirmedTxMap = HashMap<Txid, (StacksTransaction, BlockHeaderHash, u16)>;
4343

44+
pub struct ProcessedUnconfirmedState {
45+
pub total_burns: u128,
46+
pub total_fees: u128,
47+
// each element of this vector is a tuple, where each tuple contains a microblock
48+
// sequence number, and a vector of transaction receipts for that microblock
49+
pub receipts: Vec<(u16, Vec<StacksTransactionReceipt>)>,
50+
}
51+
52+
impl Default for ProcessedUnconfirmedState {
53+
fn default() -> Self {
54+
ProcessedUnconfirmedState {
55+
total_burns: 0,
56+
total_fees: 0,
57+
receipts: vec![],
58+
}
59+
}
60+
}
61+
4462
pub struct UnconfirmedState {
4563
pub confirmed_chain_tip: StacksBlockId,
4664
pub unconfirmed_chain_tip: StacksBlockId,
@@ -136,10 +154,10 @@ impl UnconfirmedState {
136154
chainstate: &StacksChainState,
137155
burn_dbconn: &dyn BurnStateDB,
138156
mblocks: Vec<StacksMicroblock>,
139-
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
157+
) -> Result<ProcessedUnconfirmedState, Error> {
140158
if self.last_mblock_seq == u16::max_value() {
141159
// drop them -- nothing to do
142-
return Ok((0, 0, vec![]));
160+
return Ok(Default::default());
143161
}
144162

145163
debug!(
@@ -193,7 +211,7 @@ impl UnconfirmedState {
193211
&mblock_hash, mblock.header.sequence
194212
);
195213

196-
let (stx_fees, stx_burns, mut receipts) =
214+
let (stx_fees, stx_burns, receipts) =
197215
match StacksChainState::process_microblocks_transactions(
198216
&mut clarity_tx,
199217
&vec![mblock.clone()],
@@ -211,7 +229,7 @@ impl UnconfirmedState {
211229
total_fees += stx_fees;
212230
total_burns += stx_burns;
213231
num_new_mblocks += 1;
214-
all_receipts.append(&mut receipts);
232+
all_receipts.push((seq, receipts));
215233

216234
last_mblock = Some(mblock_header);
217235
last_mblock_seq = seq;
@@ -255,7 +273,11 @@ impl UnconfirmedState {
255273
self.bytes_so_far = 0;
256274
}
257275

258-
Ok((total_fees, total_burns, all_receipts))
276+
Ok(ProcessedUnconfirmedState {
277+
total_fees,
278+
total_burns,
279+
receipts: all_receipts,
280+
})
259281
}
260282

261283
/// Load up the Stacks microblock stream to process, composed of only the new microblocks
@@ -280,24 +302,25 @@ impl UnconfirmedState {
280302
}
281303

282304
/// Update the view of the current confiremd chain tip's unconfirmed microblock state
305+
/// Returns ProcessedUnconfirmedState for the microblocks newly added to the unconfirmed state
283306
pub fn refresh(
284307
&mut self,
285308
chainstate: &StacksChainState,
286309
burn_dbconn: &dyn BurnStateDB,
287-
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
310+
) -> Result<ProcessedUnconfirmedState, Error> {
288311
assert!(
289312
!self.readonly,
290313
"BUG: code tried to write unconfirmed state to a read-only instance"
291314
);
292315

293316
if self.last_mblock_seq == u16::max_value() {
294317
// no-op
295-
return Ok((0, 0, vec![]));
318+
return Ok(Default::default());
296319
}
297320

298321
match self.load_child_microblocks(chainstate)? {
299322
Some(microblocks) => self.append_microblocks(chainstate, burn_dbconn, microblocks),
300-
None => Ok((0, 0, vec![])),
323+
None => Ok(Default::default()),
301324
}
302325
}
303326

@@ -375,15 +398,15 @@ impl StacksChainState {
375398
&self,
376399
burn_dbconn: &dyn BurnStateDB,
377400
anchored_block_id: StacksBlockId,
378-
) -> Result<(UnconfirmedState, u128, u128, Vec<StacksTransactionReceipt>), Error> {
401+
) -> Result<(UnconfirmedState, ProcessedUnconfirmedState), Error> {
379402
debug!("Make new unconfirmed state off of {}", &anchored_block_id);
380403
let mut unconfirmed_state = UnconfirmedState::new(self, anchored_block_id)?;
381-
let (fees, burns, receipts) = unconfirmed_state.refresh(self, burn_dbconn)?;
404+
let processed_unconfirmed_state = unconfirmed_state.refresh(self, burn_dbconn)?;
382405
debug!(
383406
"Made new unconfirmed state off of {} (at {})",
384407
&anchored_block_id, &unconfirmed_state.unconfirmed_chain_tip
385408
);
386-
Ok((unconfirmed_state, fees, burns, receipts))
409+
Ok((unconfirmed_state, processed_unconfirmed_state))
387410
}
388411

389412
/// Reload the unconfirmed view from a new chain tip.
@@ -395,7 +418,7 @@ impl StacksChainState {
395418
&mut self,
396419
burn_dbconn: &dyn BurnStateDB,
397420
canonical_tip: StacksBlockId,
398-
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
421+
) -> Result<ProcessedUnconfirmedState, Error> {
399422
debug!("Reload unconfirmed state off of {}", &canonical_tip);
400423

401424
let unconfirmed_state = self.unconfirmed_state.take();
@@ -427,7 +450,7 @@ impl StacksChainState {
427450
self.drop_unconfirmed_state(unconfirmed_state);
428451
}
429452

430-
let (new_unconfirmed_state, fees, burns, receipts) =
453+
let (new_unconfirmed_state, processed_unconfirmed_state) =
431454
self.make_unconfirmed_state(burn_dbconn, canonical_tip)?;
432455

433456
debug!(
@@ -436,19 +459,19 @@ impl StacksChainState {
436459
);
437460

438461
self.unconfirmed_state = Some(new_unconfirmed_state);
439-
Ok((fees, burns, receipts))
462+
Ok(processed_unconfirmed_state)
440463
}
441464

442465
/// Refresh the current unconfirmed chain state
443466
pub fn refresh_unconfirmed_state(
444467
&mut self,
445468
burn_dbconn: &dyn BurnStateDB,
446-
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
469+
) -> Result<ProcessedUnconfirmedState, Error> {
447470
let mut unconfirmed_state = self.unconfirmed_state.take();
448471
let res = if let Some(ref mut unconfirmed_state) = unconfirmed_state {
449472
if !unconfirmed_state.is_readable() {
450473
warn!("Unconfirmed state is not readable; it will soon be refreshed");
451-
return Ok((0, 0, vec![]));
474+
return Ok(Default::default());
452475
}
453476

454477
debug!(
@@ -465,7 +488,7 @@ impl StacksChainState {
465488
res
466489
} else {
467490
warn!("No unconfirmed state instantiated");
468-
Ok((0, 0, vec![]))
491+
Ok(Default::default())
469492
};
470493
self.unconfirmed_state = unconfirmed_state;
471494
res

src/net/relay.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use vm::costs::ExecutionCost;
5353

5454
use crate::chainstate::coordinator::BlockEventDispatcher;
5555
use crate::types::chainstate::{PoxId, SortitionId};
56+
use chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState;
5657
use types::chainstate::BurnchainHeaderHash;
5758

5859
pub type BlocksAvailableMap = HashMap<BurnchainHeaderHash, (u64, ConsensusHash)>;
@@ -85,6 +86,7 @@ pub struct RelayerStats {
8586

8687
pub struct ProcessedNetReceipts {
8788
pub mempool_txs_added: Vec<StacksTransaction>,
89+
pub processed_unconfirmed_state: ProcessedUnconfirmedState,
8890
}
8991

9092
/// Private trait for keeping track of messages that can be relayed, so we can identify the peers
@@ -750,7 +752,7 @@ impl Relayer {
750752
Ok((new_blocks, bad_neighbors))
751753
}
752754

753-
/// Prerocess all downloaded, confirmed microblock streams.
755+
/// Preprocess all downloaded, confirmed microblock streams.
754756
/// Does not fail on invalid blocks; just logs a warning.
755757
/// Returns the consensus hashes for the sortitions that elected the stacks anchored blocks that produced these streams.
756758
fn preprocess_downloaded_microblocks(
@@ -1105,7 +1107,7 @@ impl Relayer {
11051107
pub fn setup_unconfirmed_state(
11061108
chainstate: &mut StacksChainState,
11071109
sortdb: &SortitionDB,
1108-
) -> Result<(), Error> {
1110+
) -> Result<ProcessedUnconfirmedState, Error> {
11091111
let (canonical_consensus_hash, canonical_block_hash) =
11101112
SortitionDB::get_canonical_stacks_chain_tip_hash(sortdb.conn())?;
11111113
let canonical_tip = StacksBlockHeader::make_index_block_hash(
@@ -1117,8 +1119,10 @@ impl Relayer {
11171119
"Reload unconfirmed state off of {}/{}",
11181120
&canonical_consensus_hash, &canonical_block_hash
11191121
);
1120-
chainstate.reload_unconfirmed_state(&sortdb.index_conn(), canonical_tip)?;
1121-
Ok(())
1122+
let processed_unconfirmed_state =
1123+
chainstate.reload_unconfirmed_state(&sortdb.index_conn(), canonical_tip)?;
1124+
1125+
Ok(processed_unconfirmed_state)
11221126
}
11231127

11241128
/// Set up unconfirmed chain state in a read-only fashion
@@ -1142,16 +1146,23 @@ impl Relayer {
11421146
Ok(())
11431147
}
11441148

1145-
pub fn refresh_unconfirmed(chainstate: &mut StacksChainState, sortdb: &mut SortitionDB) {
1146-
if let Err(e) = Relayer::setup_unconfirmed_state(chainstate, sortdb) {
1147-
if let net_error::ChainstateError(ref err_msg) = e {
1148-
if err_msg == "Stacks chainstate error: NoSuchBlockError" {
1149-
trace!("Failed to instantiate unconfirmed state: {:?}", &e);
1149+
pub fn refresh_unconfirmed(
1150+
chainstate: &mut StacksChainState,
1151+
sortdb: &mut SortitionDB,
1152+
) -> ProcessedUnconfirmedState {
1153+
match Relayer::setup_unconfirmed_state(chainstate, sortdb) {
1154+
Ok(processed_unconfirmed_state) => processed_unconfirmed_state,
1155+
Err(e) => {
1156+
if let net_error::ChainstateError(ref err_msg) = e {
1157+
if err_msg == "Stacks chainstate error: NoSuchBlockError" {
1158+
trace!("Failed to instantiate unconfirmed state: {:?}", &e);
1159+
} else {
1160+
warn!("Failed to instantiate unconfirmed state: {:?}", &e);
1161+
}
11501162
} else {
11511163
warn!("Failed to instantiate unconfirmed state: {:?}", &e);
11521164
}
1153-
} else {
1154-
warn!("Failed to instantiate unconfirmed state: {:?}", &e);
1165+
Default::default()
11551166
}
11561167
}
11571168
}
@@ -1272,13 +1283,18 @@ impl Relayer {
12721283
}
12731284
}
12741285

1275-
let receipts = ProcessedNetReceipts { mempool_txs_added };
1286+
let mut processed_unconfirmed_state = Default::default();
12761287

12771288
// finally, refresh the unconfirmed chainstate, if need be
12781289
if network_result.has_microblocks() {
1279-
Relayer::refresh_unconfirmed(chainstate, sortdb);
1290+
processed_unconfirmed_state = Relayer::refresh_unconfirmed(chainstate, sortdb);
12801291
}
12811292

1293+
let receipts = ProcessedNetReceipts {
1294+
mempool_txs_added,
1295+
processed_unconfirmed_state,
1296+
};
1297+
12821298
Ok(receipts)
12831299
}
12841300
}

testnet/stacks-node/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,7 @@ pub enum EventKeyType {
13081308
AssetEvent(AssetIdentifier),
13091309
STXEvent,
13101310
MemPoolTransactions,
1311+
Microblocks,
13111312
AnyEvent,
13121313
BurnchainBlocks,
13131314
}
@@ -1330,6 +1331,10 @@ impl EventKeyType {
13301331
return Some(EventKeyType::BurnchainBlocks);
13311332
}
13321333

1334+
if raw_key == "microblocks" {
1335+
return Some(EventKeyType::Microblocks);
1336+
}
1337+
13331338
let comps: Vec<_> = raw_key.split("::").collect();
13341339
if comps.len() == 1 {
13351340
let split: Vec<_> = comps[0].split(".").collect();

0 commit comments

Comments
 (0)