Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ log/
weight-dumps/
gsdk/vara_runtime_prod.scale
*.meta.txt
.vscode

# cargo
target/
Expand Down
3 changes: 1 addition & 2 deletions ethexe/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ impl Ethereum {
sender_address: Address,
) -> Result<Ethereum> {
let provider = create_provider(ethereum_rpc_url, signer.clone(), sender_address).await?;
let router_query =
RouterQuery::from_provider(router_address.into(), provider.root().clone());
let router_query = RouterQuery::from_provider(router_address, provider.root().clone());
let router = router_address.into();
let wvara = router_query.wvara_address().await?.into();
let middleware = router_query.middleware_address().await?.into();
Expand Down
4 changes: 2 additions & 2 deletions ethexe/ethereum/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ impl RouterQuery {
})
}

pub fn from_provider(router_address: AlloyAddress, provider: RootProvider) -> Self {
pub fn from_provider(router_address: impl Into<AlloyAddress>, provider: RootProvider) -> Self {
Self {
instance: QueryInstance::new(router_address, provider),
instance: QueryInstance::new(router_address.into(), provider),
}
}

Expand Down
7 changes: 6 additions & 1 deletion ethexe/network/src/db_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};

const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/ethexe/db-sync/1.0.0");
const STREAM_PROTOCOL: StreamProtocol =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporary - in order to be able to connect to current Vara.eth testnet

StreamProtocol::new(concat!("/ethexe/db-sync/", env!("CARGO_PKG_VERSION")));

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum NewRequestRoundReason {
Expand Down Expand Up @@ -374,6 +375,7 @@ pub(crate) struct Behaviour {
inner: InnerBehaviour,
handle: Handle,
rx: mpsc::UnboundedReceiver<(HandleAction, oneshot::Sender<HandleResult>)>,
peer_score_handle: peer_score::Handle,
ongoing_requests: OngoingRequests,
ongoing_responses: OngoingResponses,
}
Expand All @@ -395,6 +397,7 @@ impl Behaviour {
),
handle,
rx,
peer_score_handle: peer_score_handle.clone(),
ongoing_requests: OngoingRequests::new(
&config,
peer_score_handle,
Expand Down Expand Up @@ -461,6 +464,7 @@ impl Behaviour {
log::debug!(
"request to {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol"
);
self.peer_score_handle.unsupported_protocol(peer);
}

self.ongoing_requests.on_peer_failure(request_id);
Expand All @@ -474,6 +478,7 @@ impl Behaviour {
log::debug!(
"request from {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol"
);
self.peer_score_handle.unsupported_protocol(peer);
}
request_response::Event::InboundFailure { .. } => {}
request_response::Event::ResponseSent { .. } => {}
Expand Down
84 changes: 66 additions & 18 deletions ethexe/network/src/db_sync/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,26 @@ impl OngoingRequests {
request: OngoingRequest,
channel: oneshot::Sender<HandleResult>,
) {
let multiplier = if let InnerRequest::Hashes(HashesRequest(hashes)) = request
.response_handler
.as_ref()
.expect("response handler must be set at `inner_request` calling")
.inner_request()
{
1 + (hashes.len() / 100)
} else {
1
};

self.requests.insert(
request_id,
(
request
.request(
self.peer_score_handle.clone(),
self.external_data_provider.clone_boxed(),
self.request_timeout,
self.max_rounds_per_request,
self.request_timeout * multiplier as u32,
self.max_rounds_per_request * multiplier as u32,
)
.boxed(),
Some(channel),
Expand Down Expand Up @@ -713,18 +724,29 @@ impl OngoingRequest {
.difference(&self.tried_peers)
.choose_stable(&mut rand::thread_rng())
.copied();
self.tried_peers.extend(peer);

if let Some(peer) = peer {
self.tried_peers.insert(peer);
Poll::Ready(peer)
} else {
event_sent.get_or_insert_with(|| {
ctx.state
.set(OngoingRequestState::PendingState)
.expect("set only once");
});

Poll::Pending
if let Some(peer) = ctx
.peers
.iter()
.choose_stable(&mut rand::thread_rng())
.copied()
{
log::trace!("all peers have been tried, clear `tried_peers` and retry");
self.tried_peers.clear();
self.tried_peers.insert(peer);
Poll::Ready(peer)
} else {
event_sent.get_or_insert_with(|| {
ctx.state
.set(OngoingRequestState::PendingState)
.expect("set only once");
});
Poll::Pending
}
}
})
.await;
Expand All @@ -738,16 +760,33 @@ impl OngoingRequest {
peer: PeerId,
reason: NewRequestRoundReason,
) -> Result<InnerResponse, ()> {
let request = self
.response_handler
.as_ref()
.expect("always Some")
.inner_request();

log::trace!("sending request to {peer} {request:?}, round reason: {reason:?}");

let request = match self
.response_handler
.as_ref()
.expect("always Some")
.inner_request()
{
InnerRequest::Hashes(request) if request.0.len() > 100 => {
let r = InnerRequest::Hashes(HashesRequest(
request.0.iter().take(100).copied().collect(),
));
log::trace!("request is too big, send partial request: {r:?}");
r
}
other => other,
};

CONTEXT.with_mut(|ctx| {
ctx.state
.set(OngoingRequestState::SendRequest(
peer,
self.response_handler
.as_ref()
.expect("always Some")
.inner_request(),
reason,
))
.set(OngoingRequestState::SendRequest(peer, request, reason))
.expect("set only once");
});

Expand Down Expand Up @@ -807,6 +846,15 @@ impl OngoingRequest {
request_timeout: Duration,
max_rounds_per_request: u32,
) -> Result<Response, (RequestFailure, Self)> {
log::trace!(
"starting request {:?} with timeout {} secs and max rounds {max_rounds_per_request}",
self.response_handler
.as_ref()
.expect("response handler must be set at request moment")
.inner_request(),
request_timeout.as_secs(),
);

let request_loop = async {
let mut rounds = 0;
let mut reason = NewRequestRoundReason::FromQueue;
Expand Down
1 change: 1 addition & 0 deletions ethexe/network/src/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl Behaviour {
} => Poll::Pending,
gossipsub::Event::GossipsubNotSupported { peer_id } => {
log::trace!("peer doesn't support gossipsub: {peer_id}");
self.peer_score.unsupported_protocol(peer_id);
Poll::Pending
}
gossipsub::Event::SlowPeer {
Expand Down
18 changes: 12 additions & 6 deletions ethexe/network/src/injected.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use crate::{
db_sync::{Multiaddr, PeerId},
peer_score,
utils::ParityScaleCodec,
validator::discovery::ValidatorIdentities,
};
Expand Down Expand Up @@ -51,7 +52,8 @@ use std::{
};
use tokio::sync::oneshot;

const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/ethexe/injected-tx/1.0.0");
const STREAM_PROTOCOL: StreamProtocol =
StreamProtocol::new(concat!("/ethexe/injected-tx/", env!("CARGO_PKG_VERSION")));

/// The maximum number of concurrent requests is allowed to be handled
const MAX_PENDING_REQUESTS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
Expand Down Expand Up @@ -127,19 +129,21 @@ type PendingResponseFuture = BoxFuture<'static, (ResponseChannel<InnerResponse>,

pub(crate) struct Behaviour {
inner: InnerBehaviour,
peer_score: peer_score::Handle,
pending_requests: HashMap<OutboundRequestId, HashOf<InjectedTransaction>>,
pending_responses: FuturesUnordered<PendingResponseFuture>,
transaction_cache: LruCache<HashOf<InjectedTransaction>, LruCache<Address, ()>>,
}

impl Behaviour {
pub fn new() -> Self {
pub fn new(peer_score: peer_score::Handle) -> Self {
let inner = request_response::Behaviour::new(
[(STREAM_PROTOCOL, ProtocolSupport::Full)],
request_response::Config::default(),
);
Self {
inner,
peer_score,
pending_requests: HashMap::new(),
pending_responses: FuturesUnordered::new(),
transaction_cache: LruCache::new(MAX_TRANSACTIONS),
Expand Down Expand Up @@ -246,6 +250,7 @@ impl Behaviour {
log::debug!(
"request to {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol"
);
self.peer_score.unsupported_protocol(peer);
}

let acceptance = Err(error.to_string()).into();
Expand All @@ -263,6 +268,7 @@ impl Behaviour {
log::debug!(
"request from {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol"
);
self.peer_score.unsupported_protocol(peer);
}
request_response::Event::InboundFailure { .. } => {}
request_response::Event::ResponseSent { .. } => {}
Expand Down Expand Up @@ -409,7 +415,7 @@ mod tests {

let mut swarm = Swarm::new(
transport,
Behaviour::new(),
Behaviour::new(peer_score::Handle::new_test()),
peer_id,
libp2p::swarm::Config::with_tokio_executor(),
);
Expand Down Expand Up @@ -542,7 +548,7 @@ mod tests {
async fn too_many_pending_requests() {
init_logger();

let mut alice = Behaviour::new();
let mut alice = Behaviour::new(peer_score::Handle::new_test());
let (_bob, bob_identity) = new_swarm().await;
let bob_address = bob_identity.address();

Expand All @@ -564,7 +570,7 @@ mod tests {
async fn transaction_already_sent() {
init_logger();

let mut alice = Behaviour::new();
let mut alice = Behaviour::new(peer_score::Handle::new_test());
let (_bob, bob_identity) = new_swarm().await;

let transaction = addressed_injected_tx(bob_identity.address());
Expand All @@ -582,7 +588,7 @@ mod tests {

#[tokio::test]
async fn validator_not_found() {
let mut alice = Behaviour::new();
let mut alice = Behaviour::new(peer_score::Handle::new_test());

let transaction = addressed_injected_tx(Address::default());

Expand Down
Loading
Loading