Skip to content

Commit

Permalink
optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
dankmeme01 committed Nov 21, 2023
1 parent 197edc1 commit d40b168
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 121 deletions.
30 changes: 0 additions & 30 deletions server/game/src/data/packets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use std::any::Any;

use crate::bytebufferext::{Decodable, Encodable};

use self::client::*;

type PacketId = u16;

/*
Expand Down Expand Up @@ -85,8 +83,6 @@ macro_rules! empty_client_packet {
};
}

use anyhow::anyhow;
use bytebuffer::ByteReader;
pub(crate) use empty_client_packet;
pub(crate) use empty_server_packet;
pub(crate) use packet;
Expand All @@ -103,29 +99,3 @@ pub trait PacketWithId {
}

pub const PACKET_HEADER_LEN: usize = std::mem::size_of::<PacketId>() + std::mem::size_of::<bool>();

macro_rules! mpacket {
($typ:ty,$br:expr) => {{
Ok(Box::new(<$typ>::decode_from_reader($br)?))
}};
}

pub fn match_packet(packet_id: PacketId, data: &mut ByteReader<'_>) -> anyhow::Result<Box<dyn Packet>> {
match packet_id {
PingPacket::PACKET_ID => mpacket!(PingPacket, data),
CryptoHandshakeStartPacket::PACKET_ID => mpacket!(CryptoHandshakeStartPacket, data),
KeepalivePacket::PACKET_ID => mpacket!(KeepalivePacket, data),
LoginPacket::PACKET_ID => mpacket!(LoginPacket, data),
DisconnectPacket::PACKET_ID => mpacket!(DisconnectPacket, data),

// game related
SyncIconsPacket::PACKET_ID => mpacket!(SyncIconsPacket, data),
RequestProfilesPacket::PACKET_ID => mpacket!(RequestProfilesPacket, data),
LevelJoinPacket::PACKET_ID => mpacket!(LevelJoinPacket, data),
LevelLeavePacket::PACKET_ID => mpacket!(LevelLeavePacket, data),
PlayerDataPacket::PACKET_ID => mpacket!(PlayerDataPacket, data),

VoicePacket::PACKET_ID => mpacket!(VoicePacket, data),
_ => Err(anyhow!("no matching packet in 'match_packet' with id {packet_id}")),
}
}
14 changes: 5 additions & 9 deletions server/game/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ impl GameServer {
}

pub async fn run(&'static self) -> anyhow::Result<()> {
let mut buf = [0u8; 65536];

info!("Server launched on {}", self.address);

tokio::spawn(async move {
Expand All @@ -65,7 +63,7 @@ impl GameServer {
});

loop {
match self.recv_and_handle(&mut buf).await {
match self.recv_and_handle().await {
Ok(_) => {}
Err(err) => {
warn!("Failed to handle a packet: {err}");
Expand Down Expand Up @@ -106,8 +104,9 @@ impl GameServer {

/* private handling stuff */

async fn recv_and_handle(&'static self, buf: &mut [u8]) -> anyhow::Result<()> {
let (len, peer) = self.socket.recv_from(buf).await?;
async fn recv_and_handle(&'static self) -> anyhow::Result<()> {
let mut buf = [0u8; 65536];
let (len, peer) = self.socket.recv_from(&mut buf).await?;

let peer = match peer {
SocketAddr::V6(_) => return Err(anyhow!("rejecting request from ipv6 host")),
Expand Down Expand Up @@ -141,10 +140,7 @@ impl GameServer {
thread_cl
};

let packet = thread.parse_packet(&buf[..len]).map_err(|e| anyhow!("parsing failed: {e}"))?;
if let Some(packet) = packet {
thread.send_message(ServerThreadMessage::Packet(packet)).await?;
}
thread.send_message(ServerThreadMessage::Packet(buf[..len].to_vec())).await?;

Ok(())
}
Expand Down
153 changes: 71 additions & 82 deletions server/game/src/server_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ use tokio::sync::{
};

use crate::{
bytebufferext::{ByteBufferExt, ByteBufferExtRead, ByteBufferExtWrite},
bytebufferext::{ByteBufferExt, ByteBufferExtRead, ByteBufferExtWrite, Decodable},
data::{
packets::{client::*, match_packet, server::*, Packet, PacketWithId, PACKET_HEADER_LEN},
packets::{client::*, server::*, Packet, PacketWithId, PACKET_HEADER_LEN},
types::{AssociatedPlayerData, CryptoPublicKey, PlayerAccountData},
},
server::GameServer,
};

pub enum ServerThreadMessage {
Packet(Box<dyn Packet>),
Packet(Vec<u8>),
BroadcastVoice(VoiceBroadcastPacket),
}

Expand All @@ -53,20 +53,16 @@ pub struct GameServerThread {
macro_rules! gs_require {
($cond:expr,$msg:literal) => {
if !($cond) {
return Err(anyhow!($msg));
bail!($msg);
}
};
}

macro_rules! gs_handler {
($self:ident,$name:ident,$pktty:ty,$pkt:ident,$code:expr) => {
// Insanity if you ask me
async fn $name(&$self, packet: &dyn Packet) -> anyhow::Result<()> {
let _tmp = packet.as_any().downcast_ref::<$pktty>();
if _tmp.is_none() {
return Err(anyhow!("failed to downcast packet"));
}
let $pkt = _tmp.unwrap();
async fn $name(&$self, buf: &mut ByteReader<'_>) -> anyhow::Result<()> {
let $pkt = <$pktty>::decode_from_reader(buf)?;
$code
}
};
Expand Down Expand Up @@ -126,7 +122,7 @@ impl GameServerThread {
match tokio::time::timeout(Duration::from_secs(60), rx.recv()).await {
Ok(Some(message)) => match self.handle_message(message).await {
Ok(_) => {}
Err(err) => warn!("{}", err.to_string()),
Err(err) => warn!("[@{}]: {}", self.peer, err.to_string()),
},
Ok(None) => break, // sender closed
Err(_) => break, // timeout
Expand All @@ -145,51 +141,6 @@ impl GameServerThread {
self.awaiting_termination.store(true, Ordering::Relaxed);
}

pub fn parse_packet(&self, message: &[u8]) -> anyhow::Result<Option<Box<dyn Packet>>> {
gs_require!(message.len() >= PACKET_HEADER_LEN, "packet is missing a header");

let mut data = ByteReader::from_bytes(message);

let packet_id = data.read_u16()?;
let encrypted = data.read_bool()?;

// for optimization, reject the voice packet immediately if the player is blocked from vc
if packet_id == VoicePacket::PACKET_ID {
let accid = self.account_id.load(Ordering::Relaxed);
if self.game_server.chat_blocked(accid) {
debug!("blocking voice packet from {accid}");
return Ok(None);
}
}

let cleartext_vec;
if encrypted {
let cbox = self.crypto_box.lock();

gs_require!(
cbox.is_some(),
"attempting to decode an encrypted packet when no cryptobox was initialized"
);

let encrypted_data = data.read_bytes(data.len() - data.get_rpos())?;
let nonce = &encrypted_data[..24];
let rest = &encrypted_data[24..];

let cbox = cbox.as_ref().unwrap();
cleartext_vec = cbox.decrypt(nonce.into(), rest)?;

data = ByteReader::from_bytes(&cleartext_vec);
}

let packet = match_packet(packet_id, &mut data)?;

if packet.get_encrypted() && !encrypted {
gs_require!(false, "client sent a cleartext packet when expected an encrypted one");
}

Ok(Some(packet))
}

/* private utilities */

async fn send_packet(&self, packet: &impl Packet) -> anyhow::Result<()> {
Expand All @@ -214,6 +165,7 @@ impl GameServerThread {

let cbox = self.crypto_box.lock();

#[cfg(debug_assertions)]
gs_require!(
cbox.is_some(),
"trying to send an encrypted packet when no cryptobox was initialized"
Expand All @@ -236,16 +188,14 @@ impl GameServerThread {

async fn handle_message(&self, message: ServerThreadMessage) -> anyhow::Result<()> {
match message {
ServerThreadMessage::Packet(packet) => match self.handle_packet(&*packet).await {
ServerThreadMessage::Packet(data) => match self.handle_packet(data).await {
Ok(_) => {}
Err(err) => return Err(anyhow!("failed to handle packet: {}", err.to_string())),
Err(err) => bail!("failed to handle packet: {err}"),
},

ServerThreadMessage::BroadcastVoice(voice_packet) => match self.send_packet(&voice_packet).await {
Ok(_) => {}
Err(err) => {
warn!("failed to broadcast voice packet: {}", err.to_string())
}
Err(err) => bail!("failed to broadcast voice packet: {err}"),
},
}

Expand All @@ -254,23 +204,68 @@ impl GameServerThread {

/* packet handlers */

async fn handle_packet(&self, packet: &dyn Packet) -> anyhow::Result<()> {
match packet.get_packet_id() {
async fn handle_packet(&self, message: Vec<u8>) -> anyhow::Result<()> {
#[cfg(debug_assertions)]
gs_require!(message.len() >= PACKET_HEADER_LEN, "packet is missing a header");

let mut data = ByteReader::from_bytes(&message);

let packet_id = data.read_u16()?;
let encrypted = data.read_bool()?;

// for optimization, reject the voice packet immediately if the player is blocked from vc
if packet_id == VoicePacket::PACKET_ID {
let accid = self.account_id.load(Ordering::Relaxed);
if self.game_server.chat_blocked(accid) {
debug!("blocking voice packet from {accid}");
return Ok(());
}
}

let cleartext_vec;
if encrypted {
let cbox = self.crypto_box.lock();

gs_require!(
cbox.is_some(),
"attempting to decode an encrypted packet when no cryptobox was initialized"
);

let encrypted_data = data.read_bytes(data.len() - data.get_rpos())?;
let nonce = &encrypted_data[..24];
let rest = &encrypted_data[24..];

let cbox = cbox.as_ref().unwrap();
cleartext_vec = cbox.decrypt(nonce.into(), rest)?;

data = ByteReader::from_bytes(&cleartext_vec);
}

// minor optimization
if packet_id == PlayerDataPacket::PACKET_ID {
return self.handle_player_data(&mut data).await;
}

if packet_id == LoginPacket::PACKET_ID && !encrypted {
bail!("trying to login with cleartext credentials");
}

match packet_id {
/* connection related */
PingPacket::PACKET_ID => self.handle_ping(packet).await,
CryptoHandshakeStartPacket::PACKET_ID => self.handle_crypto_handshake(packet).await,
KeepalivePacket::PACKET_ID => self.handle_keepalive(packet).await,
LoginPacket::PACKET_ID => self.handle_login(packet).await,
DisconnectPacket::PACKET_ID => self.handle_disconnect(packet).await,
PingPacket::PACKET_ID => self.handle_ping(&mut data).await,
CryptoHandshakeStartPacket::PACKET_ID => self.handle_crypto_handshake(&mut data).await,
KeepalivePacket::PACKET_ID => self.handle_keepalive(&mut data).await,
LoginPacket::PACKET_ID => self.handle_login(&mut data).await,
DisconnectPacket::PACKET_ID => self.handle_disconnect(&mut data).await,

/* game related */
SyncIconsPacket::PACKET_ID => self.handle_sync_icons(packet).await,
RequestProfilesPacket::PACKET_ID => self.handle_request_profiles(packet).await,
LevelJoinPacket::PACKET_ID => self.handle_level_join(packet).await,
LevelLeavePacket::PACKET_ID => self.handle_level_leave(packet).await,
PlayerDataPacket::PACKET_ID => self.handle_player_data(packet).await,
SyncIconsPacket::PACKET_ID => self.handle_sync_icons(&mut data).await,
RequestProfilesPacket::PACKET_ID => self.handle_request_profiles(&mut data).await,
LevelJoinPacket::PACKET_ID => self.handle_level_join(&mut data).await,
LevelLeavePacket::PACKET_ID => self.handle_level_leave(&mut data).await,
PlayerDataPacket::PACKET_ID => self.handle_player_data(&mut data).await,

VoicePacket::PACKET_ID => self.handle_voice(packet).await,
VoicePacket::PACKET_ID => self.handle_voice(&mut data).await,
x => Err(anyhow!("No handler for packet id {x}")),
}
}
Expand Down Expand Up @@ -378,7 +373,6 @@ impl GameServerThread {
self.account_id.store(packet.account_id, Ordering::Relaxed);
self.game_server.state.player_count.fetch_add(1u32, Ordering::Relaxed); // increment player count

// i love std::sync::Mutex :DDDD
{
let mut account_data = self.account_data.lock();
account_data.account_id = packet.account_id;
Expand Down Expand Up @@ -477,13 +471,8 @@ impl GameServerThread {
let mut pm = self.game_server.state.player_manager.lock();
pm.set_player_data(account_id, &packet.data);

let players = pm.get_players_on_level(level_id);

if players.is_none() {
return Ok(());
}

let players = players.unwrap();
// this unwrap should be safe
let players = pm.get_players_on_level(level_id).unwrap();

let calc_size = PACKET_HEADER_LEN + 4 + ((players.len() - 1) * AssociatedPlayerData::encoded_size());
debug!("alloc with capacity: {calc_size}");
Expand Down

0 comments on commit d40b168

Please sign in to comment.