Skip to content

Commit c5ca3ab

Browse files
authored
Merge pull request #448 from tox-rs/refactor
refactor(dht): split dht to dht and udp structs
2 parents 96c9d11 + 869eeec commit c5ca3ab

File tree

8 files changed

+353
-486
lines changed

8 files changed

+353
-486
lines changed

examples/dht_server.rs

+10-7
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ use std::net::SocketAddr;
1313

1414
use tox_crypto::*;
1515
use tox_packet::dht::packed_node::PackedNode;
16-
use tox_core::dht::server::*;
16+
use tox_core::dht::server::Server as DhtServer;
1717
use tox_core::dht::server_ext::dht_run_socket;
1818
use tox_core::dht::lan_discovery::*;
19+
use tox_core::udp::Server as UdpServer;
1920
use tox_core::stats::Stats;
2021

2122
mod common;
@@ -49,24 +50,26 @@ async fn main() -> Result<(), Error> {
4950
let mut lan_discovery_sender =
5051
LanDiscoverySender::new(tx.clone(), server_pk, local_addr.is_ipv6());
5152

52-
let mut server = Server::new(tx, server_pk, server_sk);
53-
server.set_bootstrap_info(3_000_000_000, Box::new(|_| b"This is tox-rs".to_vec()));
54-
server.enable_lan_discovery(true);
55-
server.enable_ipv6_mode(local_addr.is_ipv6());
53+
let mut dht_server = DhtServer::new(tx, server_pk, server_sk);
54+
dht_server.set_bootstrap_info(3_000_000_000, Box::new(|_| b"This is tox-rs".to_vec()));
55+
dht_server.enable_lan_discovery(true);
56+
dht_server.enable_ipv6_mode(local_addr.is_ipv6());
5657

5758
// Bootstrap from nodes
5859
for &(pk, saddr) in &common::BOOTSTRAP_NODES {
5960
let bootstrap_pn = as_packed_node(pk, saddr);
6061

61-
server.add_initial_bootstrap(bootstrap_pn);
62+
dht_server.add_initial_bootstrap(bootstrap_pn);
6263
}
6364

65+
let udp_server = UdpServer::new(dht_server);
66+
6467
let socket = common::bind_socket(local_addr).await;
6568

6669
info!("Running DHT server on {}", local_addr);
6770

6871
futures::select! {
69-
res = dht_run_socket(&server, socket, rx, stats).fuse() => res.map_err(Error::from),
72+
res = dht_run_socket(&udp_server, socket, rx, stats).fuse() => res.map_err(Error::from),
7073
res = lan_discovery_sender.run().fuse() => res.map_err(Error::from),
7174
}
7275
}

examples/echo.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ use tox_packet::friend_connection::*;
2020
use tox_packet::onion::InnerOnionResponse;
2121
use tox_packet::relay::DataPayload;
2222
use tox_packet::toxid::ToxId;
23-
use tox_core::dht::server::Server;
23+
use tox_core::dht::server::Server as DhtServer;
2424
use tox_core::dht::server_ext::dht_run_socket;
2525
use tox_core::dht::lan_discovery::LanDiscoverySender;
26+
use tox_core::udp::Server as UdpServer;
2627
use tox_core::friend_connection::FriendConnections;
2728
use tox_core::net_crypto::{NetCrypto, NetCryptoNewArgs};
2829
use tox_core::onion::client::OnionClient;
@@ -70,7 +71,7 @@ async fn main() -> Result<(), Error> {
7071

7172
let (tcp_incoming_tx, mut tcp_incoming_rx) = mpsc::unbounded();
7273

73-
let mut dht_server = Server::new(tx.clone(), dht_pk, dht_sk.clone());
74+
let mut dht_server = DhtServer::new(tx.clone(), dht_pk, dht_sk.clone());
7475
dht_server.enable_lan_discovery(true);
7576
dht_server.enable_ipv6_mode(local_addr.is_ipv6());
7677

@@ -97,9 +98,6 @@ async fn main() -> Result<(), Error> {
9798
let (net_crypto_tcp_tx, mut net_crypto_tcp_rx) = mpsc::channel(32);
9899
net_crypto.set_tcp_sink(net_crypto_tcp_tx).await;
99100

100-
dht_server.set_net_crypto(net_crypto.clone());
101-
dht_server.set_onion_client(onion_client.clone());
102-
103101
let friend_connections = FriendConnections::new(
104102
real_sk,
105103
real_pk,
@@ -122,6 +120,10 @@ async fn main() -> Result<(), Error> {
122120
onion_client.add_path_node(node).await;
123121
}
124122

123+
let mut udp_server = UdpServer::new(dht_server);
124+
udp_server.set_net_crypto(net_crypto.clone());
125+
udp_server.set_onion_client(onion_client.clone());
126+
125127
let net_crypto_tcp_future = async {
126128
while let Some((packet, pk)) = net_crypto_tcp_rx.next().await {
127129
tcp_connections.send_data(pk, packet).await?;
@@ -211,7 +213,7 @@ async fn main() -> Result<(), Error> {
211213
}
212214

213215
futures::select!(
214-
res = dht_run_socket(&dht_server, socket, rx, stats).fuse() => res.map_err(Error::from),
216+
res = dht_run_socket(&udp_server, socket, rx, stats).fuse() => res.map_err(Error::from),
215217
res = lan_discovery_sender.run().fuse() => res.map_err(Error::from),
216218
res = tcp_connections.run().fuse() => res.map_err(Error::from),
217219
res = onion_client.run().fuse() => res.map_err(Error::from),

tox_core/src/dht/server/mod.rs

+161-461
Large diffs are not rendered by default.

tox_core/src/dht/server_ext.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ use failure::Fail;
1010

1111
use crate::dht::codec::*;
1212
use tox_packet::dht::Packet;
13-
use crate::dht::server::Server;
13+
use crate::udp::Server;
1414
use crate::stats::Stats;
1515

1616
/// Run DHT server on `UdpSocket`.
1717
pub async fn dht_run_socket(
18-
dht: &Server,
18+
udp: &Server,
1919
socket: UdpSocket,
2020
mut rx: Receiver<(Packet, SocketAddr)>,
2121
stats: Stats
@@ -32,7 +32,7 @@ pub async fn dht_run_socket(
3232
match event {
3333
Ok((packet, addr)) => {
3434
trace!("Received packet {:?}", packet);
35-
let res = dht.handle_packet(packet, addr).await;
35+
let res = udp.handle_packet(packet, addr).await;
3636

3737
if let Err(ref err) = res {
3838
error!("Failed to handle packet: {:?}", err);
@@ -74,7 +74,7 @@ pub async fn dht_run_socket(
7474
futures::select! {
7575
read = network_reader.fuse() => read,
7676
write = network_writer.fuse() => write,
77-
run = dht.run().fuse() => {
77+
run = udp.dht.run().fuse() => { // TODO: should we run it here?
7878
let res: Result<_, _> = run;
7979
res.map_err(|e| Error::new(ErrorKind::Other, e.compat()))
8080
}
@@ -90,6 +90,7 @@ mod tests {
9090

9191
use tox_crypto::*;
9292
use tox_packet::dht::*;
93+
use crate::dht::server::Server as DhtServer;
9394

9495
#[tokio::test]
9596
async fn run_socket() {
@@ -100,7 +101,7 @@ mod tests {
100101

101102
let (tx, rx) = mpsc::channel(32);
102103

103-
let server = Server::new(tx, server_pk, server_sk);
104+
let server = Server::new(DhtServer::new(tx, server_pk, server_sk));
104105

105106
// Bind server socket
106107
let server_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

tox_core/src/friend_connection/mod.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ mod tests {
436436
let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
437437
let (lossless_tx, lossless_rx) = mpsc::unbounded();
438438
let (lossy_tx, _lossy_rx) = mpsc::unbounded();
439-
let mut dht = DhtServer::new(udp_tx.clone(), dht_pk, dht_sk.clone());
439+
let dht = DhtServer::new(udp_tx.clone(), dht_pk, dht_sk.clone());
440440
let tcp_connections = TcpConnections::new(dht_pk, dht_sk.clone(), tcp_incoming_tx);
441441
let onion_client = OnionClient::new(dht.clone(), tcp_connections.clone(), real_sk.clone(), real_pk);
442442
let precomputed_keys = PrecomputedCache::new(dht_sk.clone(), 1);
@@ -450,8 +450,6 @@ mod tests {
450450
real_sk: real_sk.clone(),
451451
precomputed_keys,
452452
});
453-
dht.set_onion_client(onion_client.clone());
454-
dht.set_net_crypto(net_crypto.clone());
455453
let friend_connections = FriendConnections::new(
456454
real_sk,
457455
real_pk,
@@ -962,9 +960,8 @@ mod tests {
962960
let handshake = CryptoHandshake::new(&precomputed_key, &handshake_payload, cookie);
963961

964962
let net_crypto = friend_connections.net_crypto.clone();
965-
let dht = friend_connections.dht.clone();
966963
let packets_future = async {
967-
dht.handle_packet(DhtPacket::CryptoHandshake(handshake), friend_saddr).await.unwrap();
964+
net_crypto.handle_udp_crypto_handshake(&handshake, friend_saddr).await.unwrap();
968965

969966
let session_pk = net_crypto.get_session_pk(&friend_pk).await.unwrap();
970967
let session_precomputed_key = precompute(&session_pk, &friend_session_sk);
@@ -976,7 +973,7 @@ mod tests {
976973
};
977974
let crypto_data = CryptoData::new(&session_precomputed_key, sent_nonce, &crypto_data_payload);
978975

979-
dht.handle_packet(DhtPacket::CryptoData(crypto_data), friend_saddr).await.unwrap();
976+
net_crypto.handle_udp_crypto_data(&crypto_data, friend_saddr).await.unwrap();
980977

981978
let (received, _lossless_rx) = lossless_rx.into_future().await;
982979
let (received_pk, received_data) = received.unwrap();

tox_core/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ pub mod net_crypto;
3030
pub mod utils;
3131
pub mod friend_connection;
3232
pub mod stats;
33+
pub mod udp;

tox_core/src/onion/client/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2323,7 +2323,7 @@ mod tests {
23232323
id: request_payload.id,
23242324
};
23252325
let response_packet = NodesResponse::new(&shared_secret, &node_pk, &response_payload);
2326-
onion_client.dht.handle_packet(Packet::NodesResponse(response_packet), node.saddr).await.unwrap();
2326+
onion_client.dht.handle_nodes_resp(response_packet, node.saddr).await.unwrap();
23272327

23282328
let mut state = onion_client.state.lock().await;
23292329

tox_core/src/udp.rs

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
use crate::dht::ip_port::IsGlobal;
2+
use crate::dht::server::Server as DhtServer;
3+
use crate::dht::server::errors::*;
4+
use crate::net_crypto::NetCrypto;
5+
use crate::onion::client::OnionClient;
6+
use failure::Fail;
7+
use tox_packet::dht::*;
8+
use tox_packet::onion::*;
9+
use std::net::SocketAddr;
10+
11+
/// UDP server that handles DHT, onion and net_crypto packets. Onion and
12+
/// net_crypro handlers are optional since appropriate packets are not handled
13+
/// when operating in DHT server mode.
14+
pub struct Server {
15+
/// DHT server.
16+
pub dht: DhtServer,
17+
/// Onion client that handles `OnionDataResponse` and
18+
/// `OnionAnnounceResponse` packets. It can be `None` in case of pure
19+
/// bootstrap server.
20+
onion_client: Option<OnionClient>,
21+
/// Net crypto module that handles `CookieRequest`, `CookieResponse`,
22+
/// `CryptoHandshake` and `CryptoData` packets. It can be `None` in case of
23+
/// pure bootstrap server when we don't have friends and therefore don't
24+
/// have to handle related packets.
25+
net_crypto: Option<NetCrypto>,
26+
}
27+
28+
impl Server {
29+
/// Create new `Server` instance.
30+
pub fn new(dht: DhtServer) -> Self {
31+
Self {
32+
dht,
33+
onion_client: None,
34+
net_crypto: None,
35+
}
36+
}
37+
38+
/// Function to handle incoming packets and send responses if necessary.
39+
pub async fn handle_packet(&self, packet: Packet, addr: SocketAddr) -> Result<(), HandlePacketError> {
40+
match packet {
41+
Packet::PingRequest(packet) =>
42+
self.dht.handle_ping_req(packet, addr).await,
43+
Packet::PingResponse(packet) =>
44+
self.dht.handle_ping_resp(packet, addr).await,
45+
Packet::NodesRequest(packet) =>
46+
self.dht.handle_nodes_req(packet, addr).await,
47+
Packet::NodesResponse(packet) =>
48+
self.dht.handle_nodes_resp(packet, addr).await,
49+
Packet::CookieRequest(packet) =>
50+
self.handle_cookie_request(&packet, addr).await,
51+
Packet::CookieResponse(packet) =>
52+
self.handle_cookie_response(&packet, addr).await,
53+
Packet::CryptoHandshake(packet) =>
54+
self.handle_crypto_handshake(&packet, addr).await,
55+
Packet::DhtRequest(packet) =>
56+
self.dht.handle_dht_req(packet, addr).await,
57+
Packet::LanDiscovery(packet) =>
58+
self.dht.handle_lan_discovery(&packet, addr).await,
59+
Packet::OnionRequest0(packet) =>
60+
self.dht.handle_onion_request_0(packet, addr).await,
61+
Packet::OnionRequest1(packet) =>
62+
self.dht.handle_onion_request_1(packet, addr).await,
63+
Packet::OnionRequest2(packet) =>
64+
self.dht.handle_onion_request_2(packet, addr).await,
65+
Packet::OnionAnnounceRequest(packet) =>
66+
self.dht.handle_onion_announce_request(packet, addr).await,
67+
Packet::OnionDataRequest(packet) =>
68+
self.dht.handle_onion_data_request(packet).await,
69+
Packet::OnionResponse3(packet) =>
70+
self.dht.handle_onion_response_3(packet).await,
71+
Packet::OnionResponse2(packet) =>
72+
self.dht.handle_onion_response_2(packet).await,
73+
Packet::OnionResponse1(packet) =>
74+
self.dht.handle_onion_response_1(packet).await,
75+
Packet::BootstrapInfo(packet) =>
76+
self.dht.handle_bootstrap_info(&packet, addr).await,
77+
Packet::CryptoData(packet) =>
78+
self.handle_crypto_data(&packet, addr).await,
79+
Packet::OnionDataResponse(packet) =>
80+
self.handle_onion_data_response(&packet).await,
81+
Packet::OnionAnnounceResponse(packet) =>
82+
self.handle_onion_announce_response(&packet, addr).await,
83+
}
84+
}
85+
86+
/// Handle received `OnionDataResponse` packet and pass it to `onion_client` module.
87+
async fn handle_onion_data_response(&self, packet: &OnionDataResponse) -> Result<(), HandlePacketError> {
88+
if let Some(ref onion_client) = self.onion_client {
89+
onion_client.handle_data_response(packet).await
90+
.map_err(|e| e.context(HandlePacketErrorKind::HandleOnionClient).into())
91+
} else {
92+
Err(HandlePacketError::from(HandlePacketErrorKind::OnionClient))
93+
}
94+
}
95+
96+
/// Handle received `OnionAnnounceResponse` packet and pass it to `onion_client` module.
97+
async fn handle_onion_announce_response(&self, packet: &OnionAnnounceResponse, addr: SocketAddr) -> Result<(), HandlePacketError> {
98+
if let Some(ref onion_client) = self.onion_client {
99+
onion_client.handle_announce_response(packet, IsGlobal::is_global(&addr.ip())).await
100+
.map_err(|e| e.context(HandlePacketErrorKind::HandleOnionClient).into())
101+
} else {
102+
Err(HandlePacketError::from(HandlePacketErrorKind::OnionClient))
103+
}
104+
}
105+
106+
/// Set `onion_client` module.
107+
pub fn set_onion_client(&mut self, onion_client: OnionClient) {
108+
self.onion_client = Some(onion_client);
109+
}
110+
111+
/// Handle received `CookieRequest` packet and pass it to `net_crypto`
112+
/// module.
113+
pub async fn handle_cookie_request(&self, packet: &CookieRequest, addr: SocketAddr)
114+
-> Result<(), HandlePacketError> {
115+
if let Some(ref net_crypto) = self.net_crypto {
116+
net_crypto.handle_udp_cookie_request(packet, addr).await
117+
.map_err(|e| e.context(HandlePacketErrorKind::HandleNetCrypto).into())
118+
} else {
119+
Err(
120+
HandlePacketError::from(HandlePacketErrorKind::NetCrypto)
121+
)
122+
}
123+
}
124+
125+
/// Handle received `CookieResponse` packet and pass it to `net_crypto`
126+
/// module.
127+
pub async fn handle_cookie_response(&self, packet: &CookieResponse, addr: SocketAddr)
128+
-> Result<(), HandlePacketError> {
129+
if let Some(ref net_crypto) = self.net_crypto {
130+
net_crypto.handle_udp_cookie_response(packet, addr).await
131+
.map_err(|e| e.context(HandlePacketErrorKind::HandleNetCrypto).into())
132+
} else {
133+
Err(HandlePacketError::from(HandlePacketErrorKind::NetCrypto))
134+
}
135+
}
136+
137+
/// Handle received `CryptoHandshake` packet and pass it to `net_crypto`
138+
/// module.
139+
pub async fn handle_crypto_handshake(&self, packet: &CryptoHandshake, addr: SocketAddr)
140+
-> Result<(), HandlePacketError> {
141+
if let Some(ref net_crypto) = self.net_crypto {
142+
net_crypto.handle_udp_crypto_handshake(packet, addr).await
143+
.map_err(|e| e.context(HandlePacketErrorKind::HandleNetCrypto).into())
144+
} else {
145+
Err(HandlePacketError::from(HandlePacketErrorKind::NetCrypto))
146+
}
147+
}
148+
149+
/// Handle received `CryptoData` packet and pass it to `net_crypto` module.
150+
pub async fn handle_crypto_data(&self, packet: &CryptoData, addr: SocketAddr) -> Result<(), HandlePacketError> {
151+
if let Some(ref net_crypto) = self.net_crypto {
152+
net_crypto.handle_udp_crypto_data(packet, addr).await
153+
.map_err(|e| e.context(HandlePacketErrorKind::HandleNetCrypto).into())
154+
} else {
155+
Err(HandlePacketError::from(HandlePacketErrorKind::NetCrypto))
156+
}
157+
}
158+
159+
/// Set `net_crypto` module.
160+
pub fn set_net_crypto(&mut self, net_crypto: NetCrypto) {
161+
self.net_crypto = Some(net_crypto);
162+
}
163+
}

0 commit comments

Comments
 (0)