Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit b4af110

Browse files
committed
Implement eth/65 (EIP-2464)
1 parent 63e2781 commit b4af110

File tree

13 files changed

+229
-48
lines changed

13 files changed

+229
-48
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ethcore/client-traits/src/lib.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,10 @@ pub trait BlockChainClient:
285285
fn list_storage(&self, id: BlockId, account: &Address, after: Option<&H256>, count: Option<u64>) -> Option<Vec<H256>>;
286286

287287
/// Get transaction with given hash.
288-
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>;
288+
fn block_transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>;
289+
290+
/// Get pool transaction with a given hash.
291+
fn queued_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>>;
289292

290293
/// Get uncle with given id.
291294
fn uncle(&self, id: UncleId) -> Option<encoded::Header>;

ethcore/src/client/client.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1898,10 +1898,14 @@ impl BlockChainClient for Client {
18981898
Some(keys)
18991899
}
19001900

1901-
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> {
1901+
fn block_transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> {
19021902
self.transaction_address(id).and_then(|address| self.chain.read().transaction(&address))
19031903
}
19041904

1905+
fn queued_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>> {
1906+
self.importer.miner.transaction(&hash)
1907+
}
1908+
19051909
fn uncle(&self, id: UncleId) -> Option<encoded::Header> {
19061910
let index = id.position;
19071911
self.block_body(id.block).and_then(|body| body.view().uncle_rlp_at(index))

ethcore/src/test_helpers/test_client.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -737,9 +737,12 @@ impl BlockChainClient for TestBlockChainClient {
737737
fn list_storage(&self, _id: BlockId, _account: &Address, _after: Option<&H256>, _count: Option<u64>) -> Option<Vec<H256>> {
738738
None
739739
}
740-
fn transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> {
740+
fn block_transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> {
741741
None // Simple default.
742742
}
743+
fn queued_transaction(&self, _hash: H256) -> Option<Arc<VerifiedTransaction>> {
744+
None
745+
}
743746

744747
fn uncle(&self, _id: UncleId) -> Option<encoded::Header> {
745748
None // Simple default.

ethcore/sync/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ rlp = "0.4.5"
3636
snapshot = { path = "../snapshot" }
3737
trace-time = "0.1"
3838
triehash-ethereum = { version = "0.2", path = "../../util/triehash-ethereum" }
39+
transaction-pool = "2"
3940

4041
[dev-dependencies]
4142
env_logger = "0.5"

ethcore/sync/src/chain/handler.rs

+64-6
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@ use crate::{
2626
sync_packet::{
2727
PacketInfo,
2828
SyncPacket::{
29-
self, BlockBodiesPacket, BlockHeadersPacket, NewBlockHashesPacket, NewBlockPacket,
30-
PrivateStatePacket, PrivateTransactionPacket, ReceiptsPacket, SignedPrivateTransactionPacket,
31-
SnapshotDataPacket, SnapshotManifestPacket, StatusPacket,
29+
self, *,
3230
}
3331
},
3432
BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester,
35-
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
36-
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4,
33+
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65,
34+
MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4,
3735
}
3836
};
3937

@@ -53,6 +51,7 @@ use common_types::{
5351
verification::Unverified,
5452
snapshot::{ManifestData, RestorationStatus},
5553
};
54+
use transaction_pool::VerifiedTransaction;
5655

5756

5857
/// The Chain Sync Handler: handles responses from peers
@@ -70,6 +69,8 @@ impl SyncHandler {
7069
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
7170
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
7271
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
72+
NewPooledTransactionHashesPacket => SyncHandler::on_peer_new_pooled_transactions(sync, io, peer, &rlp),
73+
PooledTransactionsPacket => SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp),
7374
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
7475
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
7576
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
@@ -595,9 +596,11 @@ impl SyncHandler {
595596
difficulty,
596597
latest_hash,
597598
genesis,
599+
unsent_pooled_hashes: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(io.chain().transactions_to_propagate().into_iter().map(|tx| *tx.hash()).collect()) } else { None },
598600
asking: PeerAsking::Nothing,
599601
asking_blocks: Vec::new(),
600602
asking_hash: None,
603+
asking_pooled_transactions: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(Vec::new()) } else { None },
601604
asking_private_state: None,
602605
ask_time: Instant::now(),
603606
last_sent_transactions: Default::default(),
@@ -656,7 +659,7 @@ impl SyncHandler {
656659

657660
if false
658661
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_4.0))
659-
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_64.0))
662+
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0))
660663
{
661664
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
662665
return Err(DownloaderImportError::Invalid);
@@ -703,6 +706,61 @@ impl SyncHandler {
703706
Ok(())
704707
}
705708

709+
/// Called when peer sends us a set of new pooled transactions
710+
pub fn on_peer_new_pooled_transactions(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> {
711+
for item in tx_rlp {
712+
let hash = item.as_val::<H256>().map_err(|_| DownloaderImportError::Invalid)?;
713+
714+
if io.chain().queued_transaction(hash).is_none() {
715+
let unfetched = sync.unfetched_pooled_transactions.entry(hash).or_insert_with(|| super::UnfetchedTransaction {
716+
announcer: peer_id,
717+
next_fetch: Instant::now(),
718+
tries: 0,
719+
});
720+
721+
// Only reset the budget if we hear from multiple sources
722+
if unfetched.announcer != peer_id {
723+
unfetched.next_fetch = Instant::now();
724+
unfetched.tries = 0;
725+
}
726+
}
727+
}
728+
729+
Ok(())
730+
}
731+
732+
/// Called when peer sends us a list of pooled transactions
733+
pub fn on_peer_pooled_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> {
734+
let peer = match sync.peers.get(&peer_id).filter(|p| p.can_sync()) {
735+
Some(peer) => peer,
736+
None => {
737+
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
738+
return Ok(());
739+
}
740+
};
741+
742+
// TODO: actually check against asked hashes
743+
let item_count = tx_rlp.item_count()?;
744+
if let Some(p) = &peer.asking_pooled_transactions {
745+
if item_count > p.len() {
746+
trace!(target: "sync", "{} Peer sent us more transactions than was supposed to", peer_id);
747+
return Err(DownloaderImportError::Invalid);
748+
}
749+
} else {
750+
trace!(target: "sync", "{} Peer sent us pooled transactions but does not declare support for them", peer_id);
751+
return Err(DownloaderImportError::Invalid);
752+
}
753+
trace!(target: "sync", "{:02} -> PooledTransactions ({} entries)", peer_id, item_count);
754+
let mut transactions = Vec::with_capacity(item_count);
755+
for i in 0 .. item_count {
756+
let rlp = tx_rlp.at(i)?;
757+
let tx = rlp.as_raw().to_vec();
758+
transactions.push(tx);
759+
}
760+
io.chain().queue_transactions(transactions, peer_id);
761+
Ok(())
762+
}
763+
706764
/// Called when peer sends us signed private transaction packet
707765
fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
708766
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {

ethcore/sync/src/chain/mod.rs

+75-3
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ impl From<DecoderError> for PacketProcessError {
166166
}
167167
}
168168

169+
/// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
170+
pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11);
169171
/// Version 64 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
170172
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11);
171173
/// Version 63 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
@@ -217,6 +219,7 @@ const STATUS_TIMEOUT: Duration = Duration::from_secs(10);
217219
const HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
218220
const BODIES_TIMEOUT: Duration = Duration::from_secs(20);
219221
const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10);
222+
const POOLED_TRANSACTIONS_TIMEOUT: Duration = Duration::from_secs(10);
220223
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
221224
/// Max time to wait for the Snapshot Manifest packet to arrive from a peer after it's being asked.
222225
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
@@ -318,6 +321,7 @@ pub enum PeerAsking {
318321
BlockHeaders,
319322
BlockBodies,
320323
BlockReceipts,
324+
PooledTransactions,
321325
SnapshotManifest,
322326
SnapshotData,
323327
PrivateState,
@@ -352,6 +356,8 @@ pub struct PeerInfo {
352356
network_id: u64,
353357
/// Peer best block hash
354358
latest_hash: H256,
359+
/// Unpropagated tx pool hashes
360+
unsent_pooled_hashes: Option<H256FastSet>,
355361
/// Peer total difficulty if known
356362
difficulty: Option<U256>,
357363
/// Type of data currently being requested by us from a peer.
@@ -360,6 +366,8 @@ pub struct PeerInfo {
360366
asking_blocks: Vec<H256>,
361367
/// Holds requested header hash if currently requesting block header by hash
362368
asking_hash: Option<H256>,
369+
/// Holds requested transaction IDs
370+
asking_pooled_transactions: Option<Vec<H256>>,
363371
/// Holds requested private state hash
364372
asking_private_state: Option<H256>,
365373
/// Holds requested snapshot chunk hash if any.
@@ -669,6 +677,13 @@ enum PeerState {
669677
SameBlock
670678
}
671679

680+
#[derive(Clone, MallocSizeOf)]
681+
struct UnfetchedTransaction {
682+
announcer: PeerId,
683+
next_fetch: Instant,
684+
tries: usize,
685+
}
686+
672687
/// Blockchain sync handler.
673688
/// See module documentation for more details.
674689
#[derive(MallocSizeOf)]
@@ -708,6 +723,8 @@ pub struct ChainSync {
708723
sync_start_time: Option<Instant>,
709724
/// Transactions propagation statistics
710725
transactions_stats: TransactionsStats,
726+
/// Transactions whose hash has been announced, but that we have not fetched
727+
unfetched_pooled_transactions: H256FastMap<UnfetchedTransaction>,
711728
/// Enable ancient block downloading
712729
download_old_blocks: bool,
713730
/// Shared private tx service.
@@ -751,6 +768,7 @@ impl ChainSync {
751768
snapshot: Snapshot::new(),
752769
sync_start_time: None,
753770
transactions_stats: TransactionsStats::default(),
771+
unfetched_pooled_transactions: Default::default(),
754772
private_tx_handler,
755773
warp_sync: config.warp_sync,
756774
status_sinks: Vec::new()
@@ -764,7 +782,7 @@ impl ChainSync {
764782
let last_imported_number = self.new_blocks.last_imported_block_number();
765783
SyncStatus {
766784
state: self.state.clone(),
767-
protocol_version: ETH_PROTOCOL_VERSION_64.0,
785+
protocol_version: ETH_PROTOCOL_VERSION_65.0,
768786
network_id: self.network_id,
769787
start_block_number: self.starting_block,
770788
last_imported_block_number: Some(last_imported_number),
@@ -798,8 +816,17 @@ impl ChainSync {
798816

799817
/// Updates the set of transactions recently sent to this peer to avoid spamming.
800818
pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) {
801-
if let Some(peer_info) = self.peers.get_mut(&peer_id) {
802-
peer_info.last_sent_transactions.extend(txs.iter().map(|tx| tx.hash()));
819+
for (id, peer) in &mut self.peers {
820+
let hashes = txs.iter().map(|tx| tx.hash());
821+
if *id == peer_id {
822+
peer.last_sent_transactions.extend(hashes);
823+
} else if let Some(s) = &mut peer.unsent_pooled_hashes {
824+
s.extend(hashes);
825+
}
826+
}
827+
828+
for tx in txs {
829+
self.unfetched_pooled_transactions.remove(&tx.hash());
803830
}
804831
}
805832

@@ -1149,6 +1176,48 @@ impl ChainSync {
11491176
}
11501177
}
11511178

1179+
// get the peer to give us at least some of announced but unfetched transactions
1180+
if !self.unfetched_pooled_transactions.is_empty() {
1181+
if let Some(s) = &mut self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions {
1182+
let now = Instant::now();
1183+
1184+
let mut new_asking_pooled_transactions = s.iter().copied().collect::<HashSet<_>>();
1185+
let mut remaining_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
1186+
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
1187+
if new_asking_pooled_transactions.len() >= 256 {
1188+
// can't request any more transactions
1189+
break;
1190+
}
1191+
1192+
// if enough time has passed since last attempt...
1193+
if item.next_fetch < now {
1194+
// ...queue this hash for requesting
1195+
new_asking_pooled_transactions.insert(hash);
1196+
item.tries += 1;
1197+
1198+
// if we just started asking for it, queue it to be asked later on again
1199+
if item.tries < 5 {
1200+
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
1201+
remaining_unfetched_pooled_transactions.insert(hash, item);
1202+
} else {
1203+
// ...otherwise we assume this transaction does not exist and remove its hash from request queue
1204+
remaining_unfetched_pooled_transactions.remove(&hash);
1205+
}
1206+
}
1207+
}
1208+
1209+
let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::<Vec<_>>();
1210+
SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions);
1211+
1212+
self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions = Some(new_asking_pooled_transactions);
1213+
self.unfetched_pooled_transactions = remaining_unfetched_pooled_transactions;
1214+
1215+
return;
1216+
} else {
1217+
trace!(target: "sync", "Skipping transaction fetch for peer {} as they don't support eth/65", peer_id);
1218+
}
1219+
}
1220+
11521221
// Only ask for old blocks if the peer has an equal or higher difficulty
11531222
let equal_or_higher_difficulty = peer_difficulty.map_or(true, |pd| pd >= syncing_difficulty);
11541223

@@ -1340,6 +1409,7 @@ impl ChainSync {
13401409
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT,
13411410
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT,
13421411
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT,
1412+
PeerAsking::PooledTransactions => elapsed > POOLED_TRANSACTIONS_TIMEOUT,
13431413
PeerAsking::Nothing => false,
13441414
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT,
13451415
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT,
@@ -1668,10 +1738,12 @@ pub mod tests {
16681738
genesis: H256::zero(),
16691739
network_id: 0,
16701740
latest_hash: peer_latest_hash,
1741+
unsent_pooled_hashes: Some(Default::default()),
16711742
difficulty: None,
16721743
asking: PeerAsking::Nothing,
16731744
asking_blocks: Vec::new(),
16741745
asking_hash: None,
1746+
asking_pooled_transactions: Some(Vec::new()),
16751747
asking_private_state: None,
16761748
ask_time: Instant::now(),
16771749
last_sent_transactions: Default::default(),

0 commit comments

Comments
 (0)