-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add discv4 terminate #4879
Changes from 1 commit
aef57f6
d77482a
0bc07a3
ce93096
40eb946
d39b4cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -389,6 +389,11 @@ impl Discv4 { | |||
self.to_service.send(cmd)?; | ||||
Ok(rx.await?) | ||||
} | ||||
// Terminate discv4 | ||||
pub fn terminate(&self) { | ||||
let cmd = Discv4Command::Terminated; | ||||
self.send_to_service(cmd); | ||||
} | ||||
} | ||||
|
||||
/// Manages discv4 peer discovery over UDP. | ||||
|
@@ -711,8 +716,8 @@ impl Discv4Service { | |||
self.kbuckets | ||||
.closest_values(&target_key) | ||||
.filter(|node| { | ||||
node.value.has_endpoint_proof && | ||||
!self.pending_find_nodes.contains_key(&node.key.preimage().0) | ||||
node.value.has_endpoint_proof | ||||
&& !self.pending_find_nodes.contains_key(&node.key.preimage().0) | ||||
}) | ||||
.take(MAX_NODES_PER_BUCKET) | ||||
.map(|n| (target_key.distance(&n.key), n.value.record)), | ||||
|
@@ -728,7 +733,7 @@ impl Discv4Service { | |||
// (e.g. connectivity problems over a long period of time, or issues during initial | ||||
// bootstrapping) so we attempt to bootstrap again | ||||
self.bootstrap(); | ||||
return | ||||
return; | ||||
} | ||||
|
||||
trace!(target : "discv4", ?target, num = closest.len(), "Start lookup closest nodes"); | ||||
|
@@ -806,7 +811,7 @@ impl Discv4Service { | |||
fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool { | ||||
if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) { | ||||
if timestamp.elapsed() < self.config.bond_expiration { | ||||
return true | ||||
return true; | ||||
} | ||||
} | ||||
false | ||||
|
@@ -818,7 +823,7 @@ impl Discv4Service { | |||
/// followup request to retrieve the updated ENR | ||||
fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) { | ||||
if record.id == self.local_node_record.id { | ||||
return | ||||
return; | ||||
} | ||||
|
||||
// If EIP868 extension is disabled then we want to ignore this | ||||
|
@@ -852,7 +857,7 @@ impl Discv4Service { | |||
/// Callback invoked when we receive a pong from the peer. | ||||
fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) { | ||||
if record.id == *self.local_peer_id() { | ||||
return | ||||
return; | ||||
} | ||||
|
||||
// If EIP868 extension is disabled then we want to ignore this | ||||
|
@@ -959,7 +964,7 @@ impl Discv4Service { | |||
fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) { | ||||
if self.is_expired(ping.expire) { | ||||
// ping's expiration timestamp is in the past | ||||
return | ||||
return; | ||||
} | ||||
|
||||
// create the record | ||||
|
@@ -1057,17 +1062,17 @@ impl Discv4Service { | |||
fn try_ping(&mut self, node: NodeRecord, reason: PingReason) { | ||||
if node.id == *self.local_peer_id() { | ||||
// don't ping ourselves | ||||
return | ||||
return; | ||||
} | ||||
|
||||
if self.pending_pings.contains_key(&node.id) || | ||||
self.pending_find_nodes.contains_key(&node.id) | ||||
if self.pending_pings.contains_key(&node.id) | ||||
|| self.pending_find_nodes.contains_key(&node.id) | ||||
{ | ||||
return | ||||
return; | ||||
} | ||||
|
||||
if self.queued_pings.iter().any(|(n, _)| n.id == node.id) { | ||||
return | ||||
return; | ||||
} | ||||
|
||||
if self.pending_pings.len() < MAX_NODES_PING { | ||||
|
@@ -1102,7 +1107,7 @@ impl Discv4Service { | |||
/// Returns the echo hash of the ping message. | ||||
pub(crate) fn send_enr_request(&mut self, node: NodeRecord) { | ||||
if !self.config.enable_eip868 { | ||||
return | ||||
return; | ||||
} | ||||
let remote_addr = node.udp_addr(); | ||||
let enr_request = EnrRequest { expire: self.enr_request_expiration() }; | ||||
|
@@ -1117,7 +1122,7 @@ impl Discv4Service { | |||
/// Message handler for an incoming `Pong`. | ||||
fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) { | ||||
if self.is_expired(pong.expire) { | ||||
return | ||||
return; | ||||
} | ||||
|
||||
let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) { | ||||
|
@@ -1126,7 +1131,7 @@ impl Discv4Service { | |||
let request = entry.get(); | ||||
if request.echo_hash != pong.echo { | ||||
debug!( target : "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong"); | ||||
return | ||||
return; | ||||
} | ||||
} | ||||
entry.remove() | ||||
|
@@ -1164,11 +1169,11 @@ impl Discv4Service { | |||
fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) { | ||||
if self.is_expired(msg.expire) { | ||||
// ping's expiration timestamp is in the past | ||||
return | ||||
return; | ||||
} | ||||
if node_id == *self.local_peer_id() { | ||||
// ignore find node requests to ourselves | ||||
return | ||||
return; | ||||
} | ||||
|
||||
if self.has_bond(node_id, remote_addr.ip()) { | ||||
|
@@ -1216,7 +1221,7 @@ impl Discv4Service { | |||
request_hash: B256, | ||||
) { | ||||
if !self.config.enable_eip868 || self.is_expired(msg.expire) { | ||||
return | ||||
return; | ||||
} | ||||
|
||||
if self.has_bond(id, remote_addr.ip()) { | ||||
|
@@ -1235,7 +1240,7 @@ impl Discv4Service { | |||
fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) { | ||||
if self.is_expired(msg.expire) { | ||||
// response is expired | ||||
return | ||||
return; | ||||
} | ||||
// check if this request was expected | ||||
let ctx = match self.pending_find_nodes.entry(node_id) { | ||||
|
@@ -1251,7 +1256,7 @@ impl Discv4Service { | |||
request.response_count = total; | ||||
} else { | ||||
debug!(target : "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket"); | ||||
return | ||||
return; | ||||
} | ||||
}; | ||||
|
||||
|
@@ -1267,7 +1272,7 @@ impl Discv4Service { | |||
Entry::Vacant(_) => { | ||||
// received neighbours response without requesting it | ||||
debug!( target : "discv4", from=?remote_addr, "Received unsolicited Neighbours"); | ||||
return | ||||
return; | ||||
} | ||||
}; | ||||
|
||||
|
@@ -1277,7 +1282,7 @@ impl Discv4Service { | |||
// prevent banned peers from being added to the context | ||||
if self.config.ban_list.is_banned(&node.id, &node.address) { | ||||
trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record"); | ||||
continue | ||||
continue; | ||||
} | ||||
|
||||
ctx.add_node(node); | ||||
|
@@ -1346,7 +1351,7 @@ impl Discv4Service { | |||
self.pending_pings.retain(|node_id, ping_request| { | ||||
if now.duration_since(ping_request.sent_at) > self.config.ping_expiration { | ||||
failed_pings.push(*node_id); | ||||
return false | ||||
return false; | ||||
} | ||||
true | ||||
}); | ||||
|
@@ -1371,7 +1376,7 @@ impl Discv4Service { | |||
// treat this as an hard error since it responded. | ||||
failed_neighbours.push(*node_id); | ||||
} | ||||
return false | ||||
return false; | ||||
} | ||||
true | ||||
}); | ||||
|
@@ -1399,7 +1404,7 @@ impl Discv4Service { | |||
if let Some(bucket) = self.kbuckets.get_bucket(&key) { | ||||
if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 { | ||||
// skip half empty bucket | ||||
continue | ||||
continue; | ||||
} | ||||
} | ||||
self.remove_node(node_id); | ||||
|
@@ -1446,7 +1451,7 @@ impl Discv4Service { | |||
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); | ||||
if self.config.enforce_expiration_timestamps && timestamp < now { | ||||
debug!(target: "discv4", "Expired packet"); | ||||
return Err(()) | ||||
return Err(()); | ||||
} | ||||
Ok(()) | ||||
} | ||||
|
@@ -1490,7 +1495,7 @@ impl Discv4Service { | |||
loop { | ||||
// drain buffered events first | ||||
if let Some(event) = self.queued_events.pop_front() { | ||||
return Poll::Ready(event) | ||||
return Poll::Ready(event); | ||||
} | ||||
|
||||
// trigger self lookup | ||||
|
@@ -1554,6 +1559,11 @@ impl Discv4Service { | |||
let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key); | ||||
} | ||||
} | ||||
|
||||
Discv4Command::Terminated => { | ||||
//self.terminate(); | ||||
todo!() | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this needs to emit the event and here we need to handle the terminate by returning None if event is terminate reth/crates/net/discv4/src/lib.rs Line 1618 in 858ea41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'm really sorry but i don't understand what do you means by it need to emit the events, could you explain me this please? |
||||
} | ||||
} | ||||
} | ||||
|
||||
|
@@ -1612,7 +1622,7 @@ impl Discv4Service { | |||
} | ||||
|
||||
if self.queued_events.is_empty() { | ||||
return Poll::Pending | ||||
return Poll::Pending; | ||||
} | ||||
} | ||||
} | ||||
|
@@ -1688,7 +1698,7 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i | |||
if packet.node_id == local_id { | ||||
// received our own message | ||||
debug!(target : "discv4", ?remote_addr, "Received own packet."); | ||||
continue | ||||
continue; | ||||
} | ||||
send(IngressEvent::Packet(remote_addr, packet)).await; | ||||
} | ||||
|
@@ -1714,6 +1724,7 @@ enum Discv4Command { | |||
Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> }, | ||||
SetLookupInterval(Duration), | ||||
Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>), | ||||
Terminated, | ||||
} | ||||
|
||||
/// Event type receiver produces | ||||
|
@@ -1772,7 +1783,7 @@ impl LookupTargetRotator { | |||
self.counter += 1; | ||||
self.counter %= self.interval; | ||||
if self.counter == 0 { | ||||
return *local | ||||
return *local; | ||||
} | ||||
PeerId::random() | ||||
} | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please use nightly formatting
cargo +nightly fmt