Skip to content

Commit

Permalink
Fix server outgoing topic
Browse files Browse the repository at this point in the history
  • Loading branch information
akiroz committed Feb 5, 2024
1 parent 1691966 commit 304e942
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 128 deletions.
122 changes: 1 addition & 121 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "zika"
version = "3.4.3"
version = "3.4.4"
license = "MIT"
description = "IP Tunneling over MQTT"
repository = "https://github.com/akiroz/zika"
Expand Down
19 changes: 13 additions & 6 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ impl Server {

let loop_ip_pool = server.ip_pool.clone();
let loop_remote = server.remote.clone();
let loop_topic = server.topic.clone();
task::spawn(async move {
while let Some(packet) = tun_stream.next().await {
match packet {
Ok(pkt) => {
let mut ip_pool = loop_ip_pool.lock().await;
let mut remote = loop_remote.lock().await;
let result = Self::handle_packet(&mut remote, &mut ip_pool, &pkt).await;
let result = Self::handle_packet(&mut remote, &mut ip_pool, loop_topic.as_str(), &pkt).await;
if let Err(err) = result {
log::error!("handle_packet error {:?}", err);
}
Expand Down Expand Up @@ -108,12 +109,18 @@ impl Server {
}

// tun -> mqtt
async fn handle_packet(remote: &mut remote::Remote, ip_pool: &mut IpPool, pkt: &TunPacket) -> Result<(), rumqttc::v5::ClientError> {
async fn handle_packet(
remote: &mut remote::Remote,
ip_pool: &mut IpPool,
topic_base: &str,
pkt: &TunPacket
) -> Result<(), rumqttc::v5::ClientError> {
let dest = Ipv4Header::from_slice(&pkt.get_bytes())
.ok()
.map(|(ipv4_header, _)| Ipv4Addr::from(ipv4_header.destination));
if let Some(d) = dest {
if let Some(topic) = ip_pool.get_reverse(&d.into()) {
if let Some(tunnel_id) = ip_pool.get_reverse(&d.into()) {
let topic = format!("{}/{}", &topic_base, tunnel_id);
remote.publish(&topic, pkt.get_bytes().to_vec()).await?;
} else {
log::debug!("drop packet: no tunnel for {:?}", &d);
Expand All @@ -124,13 +131,13 @@ impl Server {

// mqtt -> tun
async fn handle_remote_message(&self, tun_sink: &mut TunSink, id: &[u8], pkt: &[u8]) -> Result<(), Box<dyn StdError>> {
let base64_id = general_purpose::URL_SAFE_NO_PAD.encode(id);
let tunnel_id = general_purpose::URL_SAFE_NO_PAD.encode(id);
let (existing_tunnel, ip) = {
let mut ip_pool = self.ip_pool.lock().await;
ip_pool.get_forward(&base64_id)
ip_pool.get_forward(&tunnel_id)
};
if !existing_tunnel {
log::info!("alloc tunnel {} (IP {})", base64_id, ip);
log::info!("alloc tunnel {} (IP {})", tunnel_id, ip);
}
let nat_pkt = nat::do_nat(pkt, ip, self.local_addr)?;
tun_sink.send(TunPacket::new(nat_pkt)).await?;
Expand Down

0 comments on commit 304e942

Please sign in to comment.