Skip to content

Commit

Permalink
Support raw mqtt config from new ctor
Browse files Browse the repository at this point in the history
  • Loading branch information
akiroz committed Jan 15, 2024
1 parent 85f60aa commit 302261d
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "zika"
version = "3.1.0"
version = "3.2.0"
license = "MIT"
description = "IP Tunneling over MQTT"
repository = "https://github.com/akiroz/zika"
Expand Down
2 changes: 1 addition & 1 deletion src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ async fn main() {
env_logger::init();
let config = read_from_default_location().expect("A proper config file");
log::debug!("Config = {:?}", config);
let mut client = Client::new(&config).await;
let mut client = Client::from_config(config).await;
client.run().await;
}
2 changes: 1 addition & 1 deletion src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ async fn main() {
env_logger::init();
let config = read_from_default_location().expect("A proper config file");
log::debug!("Config = {:?}", config);
let mut server = Server::new(config);
let mut server = Server::from_config(config);
server.run().await;
}
41 changes: 17 additions & 24 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use base64::{engine::general_purpose, Engine as _};
use futures::{SinkExt, stream::{SplitSink, StreamExt}};
use rand::{thread_rng, Rng, distributions::Standard};

use rumqttc;
use etherparse::Ipv4Header;
use ipnetwork::Ipv4Network;
use tokio::{task, sync::{mpsc, broadcast, Mutex}};
Expand Down Expand Up @@ -37,19 +38,18 @@ struct Tunnel {
}

impl Client {
pub async fn new(config: &config::Config) -> Self {
let client_config = config
.client
.as_ref()
.expect("non-null config");

let ip_network: Ipv4Network = client_config
.bind_cidr
.parse()
.expect("CIDR notation");
let local_addr = SizedIpv4NetworkIterator::new(ip_network)
.next()
.expect("subnet size > 1");
pub async fn from_config(config: config::Config) -> Self {
let mqtt_options = config.broker_mqtt_options();
let client_config = config.client.expect("non-null client config");
Self::new(mqtt_options, client_config).await
}

pub async fn new(
mqtt_options: Vec<rumqttc::v5::MqttOptions>,
client_config: config::ClientConfig
) -> Self {
let ip_network: Ipv4Network = client_config.bind_cidr.parse().expect("CIDR notation");
let local_addr = SizedIpv4NetworkIterator::new(ip_network).next().expect("subnet size > 1");

log::info!("tun {:?}/{}", local_addr, ip_network.prefix());
let mut tun_config = tun::Configuration::default();
Expand All @@ -63,17 +63,12 @@ impl Client {
});

tun_config.up();

let tun_dev = tun::create_as_async(&tun_config).expect("tunnel");
let (tun_sink, mut tun_stream) = tun_dev.into_framed().split();

let mqtt_options = config.broker_mqtt_options();

let (remote, remote_recv) = remote::Remote::new(&mqtt_options, Vec::new());

let (remote, remote_recv) = remote::Remote::new(&mqtt_options, vec![]);
let mut tunnels = Vec::with_capacity(client_config.tunnels.len());
let mut rng = thread_rng();

for client_tunnel_config in &client_config.tunnels {
let random_id: Vec<u8> = (&mut rng)
.sample_iter(Standard)
Expand All @@ -95,18 +90,16 @@ impl Client {
log::error!("subscribe failed: {:?}", err);
}

let tunnel = Tunnel {
tunnels.push(Tunnel {
id: random_id,
topic,
topic_base: topic_base.to_string(),
bind_addr,
};

tunnels.push(tunnel);
});
}

let (remote_passthru_send, remote_passthru_recv) = broadcast::channel(1);
let client = Client {
let client = Self {
local_addr,
tunnels: Arc::new(tunnels),
remote: Arc::new(Mutex::new(remote)),
Expand Down
25 changes: 13 additions & 12 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,21 @@ pub struct Server {
}

impl Server {
pub fn new(config: config::Config) -> Self {
pub fn from_config(config: config::Config) -> Self {
let mqtt_options = config.broker_mqtt_options();
let server_config = config.server.expect("non-null config");
let (mut remote, remote_recv) =
remote::Remote::new(&mqtt_options, vec![server_config.topic.clone()]);

let ip_network: Ipv4Network = server_config
.bind_cidr
.parse()
.expect("CIDR notation");
let server_config = config.server.expect("non-null server config");
Self::new(mqtt_options, server_config)
}

pub fn new(
mqtt_options: Vec<rumqttc::v5::MqttOptions>,
server_config: config::ServerConfig
) -> Self {
let ip_network: Ipv4Network = server_config.bind_cidr.parse().expect("CIDR notation");
let mut ip_iter = SizedIpv4NetworkIterator::new(ip_network);
let local_addr = ip_iter
.next()
.expect("subnet size > 1");
let local_addr = ip_iter.next().expect("subnet size > 1");

let (mut remote, remote_recv) = remote::Remote::new(&mqtt_options, vec![server_config.topic.clone()]);

log::info!("bind {:?}/{}", local_addr, ip_network.prefix());

Expand Down

0 comments on commit 302261d

Please sign in to comment.