From e27fe288323c80fd29d2fb2da8a81357161d4dbc Mon Sep 17 00:00:00 2001 From: dank_meme01 <42031238+dankmeme01@users.noreply.github.com> Date: Fri, 24 Nov 2023 14:34:25 +0100 Subject: [PATCH] so much optimization i can't take it anymore --- README.md | 2 +- server/central/src/web/routes/auth.rs | 1 + server/game/Cargo.toml | 10 +- server/game/src/bytebufferext.rs | 54 ++- server/game/src/data/packets/mod.rs | 7 +- .../src/data/packets/server/connection.rs | 14 +- server/game/src/data/types/crypto.rs | 2 + server/game/src/data/types/gd.rs | 14 +- server/game/src/managers/player.rs | 15 + server/game/src/server.rs | 10 +- server/game/src/server_thread.rs | 319 ++++++++++++------ src/data/packets/client/connection.hpp | 1 + src/managers/account_manager.hpp | 4 +- 13 files changed, 313 insertions(+), 140 deletions(-) diff --git a/README.md b/README.md index f35f3c75..9a6fb6c0 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ This repository contains the complete rewrite of Globed, for Geometry Dash 2.2 a * Real-time multiplayer * Voice chat and text messages with full encryption and zero logging -* blazing fast server written in pure Rust 🚀 (feauring only **one** unsafe block) +* blazing fast server written in pure Rust 🚀 (feauring only **two** unsafe blocks so far) ## Installation diff --git a/server/central/src/web/routes/auth.rs b/server/central/src/web/routes/auth.rs index 3369dee7..81ab344d 100644 --- a/server/central/src/web/routes/auth.rs +++ b/server/central/src/web/routes/auth.rs @@ -189,6 +189,7 @@ pub async fn challenge_start(context: &mut Context) -> roa::Result Ok(()) } +// rollercoaster of a function i'd say #[allow(clippy::too_many_lines)] pub async fn challenge_finish(context: &mut Context) -> roa::Result { check_user_agent!(context, _ua); diff --git a/server/game/Cargo.toml b/server/game/Cargo.toml index 339f68c4..4716575b 100644 --- a/server/game/Cargo.toml +++ b/server/game/Cargo.toml @@ -8,19 +8,19 @@ edition = "2021" [dependencies] globed-shared = { path = "../shared" } +alloca = "0.4.0" anyhow = "1.0.75" +array-init = "2.1.0" bytebuffer = "2.2.0" colored = "2.0.4" crypto_box = { version = "0.9.1", features = ["std", "chacha20"] } +lazy_static = "1.4.0" log = { version = "0.4.20" } +num_enum = "0.7.1" +parking_lot = "0.12.1" reqwest = "0.11.22" rustc-hash = "1.1.0" serde = { version = "1.0.192", features = ["serde_derive"] } serde_json = "1.0.108" time = { version = "0.3.30", features = ["formatting"] } tokio = { version = "1.34.0", features = ["full"] } -parking_lot = "0.12.1" -array-init = "2.1.0" -num_enum = "0.7.1" -alloca = "0.4.0" -lazy_static = "1.4.0" diff --git a/server/game/src/bytebufferext.rs b/server/game/src/bytebufferext.rs index e332beb3..39735045 100644 --- a/server/game/src/bytebufferext.rs +++ b/server/game/src/bytebufferext.rs @@ -16,9 +16,17 @@ pub trait Decodable { Self: Sized; } +// For dynamically sized types, this must be the maximum permitted size in the encoded form. +// If encode() tries to write more bytes than this value, FastByteBuffer will panic. +pub trait EncodableWithKnownSize: Encodable { + const ENCODED_SIZE: usize; +} + +pub const MAX_ENCODED_STRING_SIZE: usize = 512 + size_of_types!(u32); // 512 chars + macro_rules! encode_impl { - ($packet_type:ty, $buf:ident, $self:ident, $encode:expr) => { - impl crate::bytebufferext::Encodable for $packet_type { + ($typ:ty, $buf:ident, $self:ident, $encode:expr) => { + impl crate::bytebufferext::Encodable for $typ { fn encode(&$self, $buf: &mut bytebuffer::ByteBuffer) { $encode } @@ -31,8 +39,8 @@ macro_rules! encode_impl { } macro_rules! decode_impl { - ($packet_type:ty, $buf:ident, $decode:expr) => { - impl crate::bytebufferext::Decodable for $packet_type { + ($typ:ty, $buf:ident, $decode:expr) => { + impl crate::bytebufferext::Decodable for $typ { fn decode($buf: &mut bytebuffer::ByteBuffer) -> anyhow::Result { $decode } @@ -45,19 +53,19 @@ macro_rules! decode_impl { } macro_rules! encode_unimpl { - ($packet_type:ty) => { - impl crate::bytebufferext::Encodable for $packet_type { + ($typ:ty) => { + impl crate::bytebufferext::Encodable for $typ { fn encode(&self, _: &mut bytebuffer::ByteBuffer) { panic!( "Tried to call {}::encode when Encodable was not implemented for this type", - stringify!($packet_type) + stringify!($typ) ); } fn encode_fast(&self, _: &mut crate::bytebufferext::FastByteBuffer) { panic!( "Tried to call {}::encode_fast when Encodable was not implemented for this type", - stringify!($packet_type) + stringify!($typ) ); } } @@ -65,29 +73,39 @@ macro_rules! encode_unimpl { } macro_rules! decode_unimpl { - ($packet_type:ty) => { - impl crate::bytebufferext::Decodable for $packet_type { + ($typ:ty) => { + impl crate::bytebufferext::Decodable for $typ { fn decode(_: &mut bytebuffer::ByteBuffer) -> anyhow::Result { - Err(anyhow::anyhow!( - "decoding unimplemented for {}", - stringify!($packet_type) - )) + Err(anyhow::anyhow!("decoding unimplemented for {}", stringify!($typ))) } fn decode_from_reader(_: &mut bytebuffer::ByteReader) -> anyhow::Result { - Err(anyhow::anyhow!( - "decoding unimplemented for {}", - stringify!($packet_type) - )) + Err(anyhow::anyhow!("decoding unimplemented for {}", stringify!($typ))) } } }; } +macro_rules! size_calc_impl { + ($typ:ty, $calc:expr) => { + impl crate::bytebufferext::EncodableWithKnownSize for $typ { + const ENCODED_SIZE: usize = $calc; + } + }; +} + +macro_rules! size_of_types { + ($($t:ty),+ $(,)?) => {{ + 0 $(+ std::mem::size_of::<$t>())* + }}; +} + pub(crate) use decode_impl; pub(crate) use decode_unimpl; pub(crate) use encode_impl; pub(crate) use encode_unimpl; +pub(crate) use size_calc_impl; +pub(crate) use size_of_types; /* ByteBuffer extensions */ pub trait ByteBufferExt { diff --git a/server/game/src/data/packets/mod.rs b/server/game/src/data/packets/mod.rs index d7452858..c7f24d17 100644 --- a/server/game/src/data/packets/mod.rs +++ b/server/game/src/data/packets/mod.rs @@ -26,8 +26,9 @@ macro_rules! packet { } } - impl crate::data::packets::PacketWithId for $packet_type { + impl crate::data::packets::PacketMetadata for $packet_type { const PACKET_ID: crate::data::packets::PacketId = $packet_id; + const ENCRYPTED: bool = $encrypted; } }; ($packet_type:ident, $packet_id:expr, $encrypted:expr, { $($field:ident: $field_type:ty),* $(,)? }) => { @@ -49,6 +50,7 @@ macro_rules! packet { impl crate::data::packets::PacketMetadata for $packet_type { const PACKET_ID: crate::data::packets::PacketId = $packet_id; const ENCRYPTED: bool = $encrypted; + const NAME: &'static str = stringify!($packet_type); } }; } @@ -60,6 +62,8 @@ macro_rules! empty_server_packet { encode_impl!($packet_type, _buf, self, {}); decode_unimpl!($packet_type); + + size_calc_impl!($packet_type, 0); }; } @@ -86,6 +90,7 @@ pub trait Packet: Encodable + Decodable + Send + Sync + PacketMetadata { pub trait PacketMetadata { const PACKET_ID: PacketId; const ENCRYPTED: bool; + const NAME: &'static str; } pub const PACKET_HEADER_LEN: usize = std::mem::size_of::() + std::mem::size_of::(); diff --git a/server/game/src/data/packets/server/connection.rs b/server/game/src/data/packets/server/connection.rs index 39324bd3..73d73c81 100644 --- a/server/game/src/data/packets/server/connection.rs +++ b/server/game/src/data/packets/server/connection.rs @@ -1,5 +1,5 @@ use crate::{ - bytebufferext::{decode_unimpl, encode_impl, ByteBufferExtWrite}, + bytebufferext::*, data::{ packets::{empty_server_packet, packet}, types::CryptoPublicKey, @@ -20,6 +20,8 @@ encode_impl!(PingResponsePacket, buf, self, { decode_unimpl!(PingResponsePacket); +size_calc_impl!(PingResponsePacket, size_of_types!(u32, u32)); + /* CryptoHandshakeResponsePacket - 20001 */ packet!(CryptoHandshakeResponsePacket, 20001, false, { @@ -32,6 +34,8 @@ encode_impl!(CryptoHandshakeResponsePacket, buf, self, { decode_unimpl!(CryptoHandshakeResponsePacket); +size_calc_impl!(CryptoHandshakeResponsePacket, CryptoPublicKey::ENCODED_SIZE); + /* KeepaliveResponsePacket - 20002 */ packet!(KeepaliveResponsePacket, 20002, false, { @@ -44,6 +48,8 @@ encode_impl!(KeepaliveResponsePacket, buf, self, { decode_unimpl!(KeepaliveResponsePacket); +size_calc_impl!(KeepaliveResponsePacket, size_of_types!(u32)); + /* ServerDisconnectPacket - 20003 */ packet!(ServerDisconnectPacket, 20003, false, { @@ -56,6 +62,8 @@ encode_impl!(ServerDisconnectPacket, buf, self, { decode_unimpl!(ServerDisconnectPacket); +size_calc_impl!(ServerDisconnectPacket, MAX_ENCODED_STRING_SIZE); + /* LoggedInPacket - 20004 */ empty_server_packet!(LoggedInPacket, 20004); @@ -72,6 +80,8 @@ encode_impl!(LoginFailedPacket, buf, self, { decode_unimpl!(LoginFailedPacket); +size_calc_impl!(LoginFailedPacket, MAX_ENCODED_STRING_SIZE); + /* ServerNoticePacket - 20006 */ // used to communicate a simple message to the user @@ -84,3 +94,5 @@ encode_impl!(ServerNoticePacket, buf, self, { }); decode_unimpl!(ServerNoticePacket); + +size_calc_impl!(ServerNoticePacket, MAX_ENCODED_STRING_SIZE); diff --git a/server/game/src/data/types/crypto.rs b/server/game/src/data/types/crypto.rs index 21918f6d..fb572689 100644 --- a/server/game/src/data/types/crypto.rs +++ b/server/game/src/data/types/crypto.rs @@ -19,3 +19,5 @@ decode_impl!(CryptoPublicKey, buf, { pubkey: PublicKey::from_bytes(key), }) }); + +size_calc_impl!(CryptoPublicKey, KEY_SIZE); diff --git a/server/game/src/data/types/gd.rs b/server/game/src/data/types/gd.rs index 6bac35a7..04ccad4d 100644 --- a/server/game/src/data/types/gd.rs +++ b/server/game/src/data/types/gd.rs @@ -1,6 +1,6 @@ use globed_shared::SpecialUser; -use crate::bytebufferext::{decode_impl, decode_unimpl, encode_impl, ByteBufferExtWrite}; +use crate::bytebufferext::*; use super::Color3B; @@ -148,11 +148,7 @@ encode_impl!(PlayerData, _buf, self, {}); decode_impl!(PlayerData, _buf, Ok(Self {})); -impl PlayerData { - const fn encoded_size() -> usize { - 0 - } -} +size_calc_impl!(PlayerData, 0); /* AssociatedPlayerData */ @@ -167,8 +163,4 @@ encode_impl!(AssociatedPlayerData, buf, self, { buf.write_value(&self.data); }); -impl AssociatedPlayerData { - pub const fn encoded_size() -> usize { - std::mem::size_of::() + PlayerData::encoded_size() - } -} +size_calc_impl!(AssociatedPlayerData, size_of_types!(i32) + PlayerData::ENCODED_SIZE); diff --git a/server/game/src/managers/player.rs b/server/game/src/managers/player.rs index dafdbd8c..4cf85549 100644 --- a/server/game/src/managers/player.rs +++ b/server/game/src/managers/player.rs @@ -37,11 +37,26 @@ impl PlayerManager { self.levels.get(&level_id) } + pub fn get_player_count_on_level(&self, level_id: i32) -> Option { + self.levels.get(&level_id).map(FxHashSet::len) + } + pub fn get_players_on_level(&self, level_id: i32) -> Option> { let ids = self.levels.get(&level_id)?; Some(ids.iter().filter_map(|&key| self.players.get(&key)).collect()) } + pub fn for_each_player_on_level(&self, level_id: i32, f: F, additional: &mut A) + where + F: Fn(&AssociatedPlayerData, &mut A), + { + if let Some(ids) = self.levels.get(&level_id) { + ids.iter().filter_map(|&key| self.players.get(&key)).for_each(|data| { + f(data, additional); + }); + } + } + pub fn add_to_level(&mut self, level_id: i32, account_id: i32) { let players = self.levels.entry(level_id).or_default(); players.insert(account_id); diff --git a/server/game/src/server.rs b/server/game/src/server.rs index 76fbf6af..3d4dee33 100644 --- a/server/game/src/server.rs +++ b/server/game/src/server.rs @@ -82,8 +82,11 @@ impl GameServer { }); } + // we preallocate a buffer to avoid zeroing out MAX_PACKET_SIZE bytes on each packet + let mut buf = [0u8; MAX_PACKET_SIZE]; + loop { - match self.recv_and_handle().await { + match self.recv_and_handle(&mut buf).await { Ok(()) => {} Err(err) => { warn!("Failed to handle a packet: {err}"); @@ -145,9 +148,8 @@ impl GameServer { /* private handling stuff */ - async fn recv_and_handle(&'static self) -> anyhow::Result<()> { - let mut buf = [0u8; MAX_PACKET_SIZE]; - let (len, peer) = self.socket.recv_from(&mut buf).await?; + async fn recv_and_handle(&'static self, buf: &mut [u8]) -> anyhow::Result<()> { + let (len, peer) = self.socket.recv_from(buf).await?; let peer = match peer { SocketAddr::V6(_) => return Err(anyhow!("rejecting request from ipv6 host")), diff --git a/server/game/src/server_thread.rs b/server/game/src/server_thread.rs index 65f6a8a2..b4d1936a 100644 --- a/server/game/src/server_thread.rs +++ b/server/game/src/server_thread.rs @@ -1,15 +1,16 @@ use std::{ + fmt::Display, + io, net::SocketAddrV4, sync::{ atomic::{AtomicBool, AtomicI32, Ordering}, Arc, }, - time::{Duration, SystemTime}, + time::{Duration, SystemTime, SystemTimeError}, }; use parking_lot::Mutex as SyncMutex; -use anyhow::{anyhow, bail}; use bytebuffer::{ByteBuffer, ByteReader}; use crypto_box::{ aead::{Aead, AeadCore, OsRng}, @@ -30,7 +31,7 @@ use crate::{ // TODO adjust this to PlayerData size in the future plus some headroom pub const SMALL_PACKET_LIMIT: usize = 128; -const CHANNEL_BUFFER_SIZE: usize = 4; +const CHANNEL_BUFFER_SIZE: usize = 8; pub enum ServerThreadMessage { Packet(Vec), @@ -56,18 +57,12 @@ pub struct GameServerThread { last_voice_packet: SyncMutex, } -macro_rules! gs_require { - ($cond:expr,$msg:literal) => { - if !($cond) { - bail!($msg); - } - }; -} - macro_rules! gs_handler { ($self:ident,$name:ident,$pktty:ty,$pkt:ident,$code:expr) => { - async fn $name(&$self, buf: &mut ByteReader<'_>) -> anyhow::Result<()> { + async fn $name(&$self, buf: &mut ByteReader<'_>) -> Result<()> { let $pkt = <$pktty>::decode_from_reader(buf)?; + #[cfg(debug_assertions)] + log::debug!("Handling packet {}", <$pktty>::NAME); $code } }; @@ -75,8 +70,10 @@ macro_rules! gs_handler { macro_rules! gs_handler_sync { ($self:ident,$name:ident,$pktty:ty,$pkt:ident,$code:expr) => { - fn $name(&$self, buf: &mut ByteReader<'_>) -> anyhow::Result<()> { + fn $name(&$self, buf: &mut ByteReader<'_>) -> Result<()> { let $pkt = <$pktty>::decode_from_reader(buf)?; + #[cfg(debug_assertions)] + log::debug!("Handling packet {}", <$pktty>::NAME); $code } }; @@ -85,7 +82,9 @@ macro_rules! gs_handler_sync { macro_rules! gs_disconnect { ($self:ident, $msg:expr) => { $self.terminate(); - $self.send_packet(&ServerDisconnectPacket { message: $msg }).await?; + $self + .send_packet_fast(&ServerDisconnectPacket { message: $msg }) + .await?; return Ok(()); }; } @@ -105,6 +104,26 @@ macro_rules! gs_needauth { }; } +enum PacketHandlingError { + Other(String), + WrongCryptoBoxState, + EncryptionError(String), + DecryptionError(String), + IOError(io::Error), + MalformedMessage, + MalformedLoginAttempt, + MalformedCiphertext, + NoHandler(u16), + WebRequestError(reqwest::Error), + UnexpectedPlayerData, + SystemTimeError(SystemTimeError), + SocketSendFailed(io::Error), + SocketWouldBlock, + UnexpectedCentralResponse, +} + +type Result = core::result::Result; + impl GameServerThread { /* public api for the main server */ @@ -154,41 +173,95 @@ impl GameServerThread { /* private utilities */ - async fn send_packet(&self, packet: &impl Packet) -> anyhow::Result<()> { + async fn send_packet(&self, packet: &P) -> Result<()> { + #[cfg(debug_assertions)] + log::debug!("Sending {}", P::NAME); + let serialized = self.serialize_packet(packet)?; self.send_buffer(serialized.as_bytes()).await } - async fn send_buffer(&self, buffer: &[u8]) -> anyhow::Result<()> { - self.game_server.socket.send_to(buffer, self.peer).await?; + // fast packet sending with zero heap allocation + // packet must implement EncodableWithKnownSize to be fast sendable + // it also must **NOT** be encrypted. + // on average 2-3x faster than send_packet, even worst case should be faster by a bit + async fn send_packet_fast(&self, packet: &P) -> Result<()> { + assert!( + !packet.get_encrypted(), + "Attempting to fast encode an encrypted packet ({})", + P::NAME + ); + + let to_send: Result>> = alloca::with_alloca(PACKET_HEADER_LEN + P::ENCODED_SIZE, move |data| { + // safety: 'data' will have garbage data but that is considered safe for all our intents and purposes + // as `FastByteBuffer::as_bytes()` will only return what was already written. + let data = unsafe { + let ptr = data.as_mut_ptr().cast::(); + let len = std::mem::size_of_val(data); + std::slice::from_raw_parts_mut(ptr, len) + }; + + let mut buf = FastByteBuffer::new(data); + buf.write_u16(packet.get_packet_id()); + buf.write_bool(false); + buf.write_value(packet); + + let send_data = buf.as_bytes(); + match self.send_buffer_immediate(send_data) { + Err(PacketHandlingError::SocketWouldBlock) => Ok(Some(send_data.to_vec())), + Err(e) => Err(e), + Ok(()) => Ok(None), + } + }); + + if let Some(to_send) = to_send? { + self.send_buffer(&to_send).await?; + } + Ok(()) } + async fn send_buffer(&self, buffer: &[u8]) -> Result<()> { + self.game_server + .socket + .send_to(buffer, self.peer) + .await + .map(|_size| ()) + .map_err(PacketHandlingError::SocketSendFailed) + } + // attempt to send a buffer immediately to the socket, but if it requires blocking then nuh uh - fn send_buffer_immediate(&self, buffer: &[u8]) -> std::io::Result<()> { + fn send_buffer_immediate(&self, buffer: &[u8]) -> Result<()> { self.game_server .socket .try_send_to(buffer, std::net::SocketAddr::V4(self.peer)) .map(|_| ()) + .map_err(|e| { + if e.kind() == std::io::ErrorKind::WouldBlock { + PacketHandlingError::SocketWouldBlock + } else { + PacketHandlingError::SocketSendFailed(e) + } + }) } - fn serialize_packet(&self, packet: &impl Packet) -> anyhow::Result { + fn serialize_packet(&self, packet: &P) -> Result { let mut buf = ByteBuffer::new(); - buf.write_u16(packet.get_packet_id()); - buf.write_bool(packet.get_encrypted()); + buf.write_u16(P::PACKET_ID); + buf.write_bool(P::ENCRYPTED); - if !packet.get_encrypted() { + if !P::ENCRYPTED { packet.encode(&mut buf); return Ok(buf); } let cbox = self.crypto_box.lock(); + // should never happen #[cfg(debug_assertions)] - gs_require!( - cbox.is_some(), - "trying to send an encrypted packet when no cryptobox was initialized" - ); + if !cbox.is_some() { + return Err(PacketHandlingError::WrongCryptoBoxState); + } let mut cltxtbuf = ByteBuffer::new(); packet.encode(&mut cltxtbuf); @@ -196,7 +269,9 @@ impl GameServerThread { let cbox = cbox.as_ref().unwrap(); let nonce = ChaChaBox::generate_nonce(&mut OsRng); - let encrypted = cbox.encrypt(&nonce, cltxtbuf.as_bytes())?; + let encrypted = cbox + .encrypt(&nonce, cltxtbuf.as_bytes()) + .map_err(|e| PacketHandlingError::EncryptionError(e.to_string()))?; #[cfg(not(rust_analyzer))] // i am so sorry buf.write_bytes(&nonce); @@ -205,23 +280,11 @@ impl GameServerThread { Ok(buf) } - async fn handle_message(&self, message: ServerThreadMessage) -> anyhow::Result<()> { + async fn handle_message(&self, message: ServerThreadMessage) -> Result<()> { match message { - ServerThreadMessage::Packet(data) => match self.handle_packet(&data).await { - Ok(()) => {} - Err(err) => bail!("failed to handle packet: {err}"), - }, - - ServerThreadMessage::SmallPacket(data) => match self.handle_packet(&data).await { - Ok(()) => {} - Err(err) => bail!("failed to handle packet: {err}"), - }, - - ServerThreadMessage::BroadcastVoice(voice_packet) => match self.send_packet(&*voice_packet).await { - Ok(()) => {} - Err(err) => bail!("failed to broadcast voice packet: {err}"), - }, - + ServerThreadMessage::Packet(data) => self.handle_packet(&data).await?, + ServerThreadMessage::SmallPacket(data) => self.handle_packet(&data).await?, + ServerThreadMessage::BroadcastVoice(voice_packet) => self.send_packet(&*voice_packet).await?, ServerThreadMessage::TerminationNotice(message) => { self.terminate(); self.send_packet(&ServerDisconnectPacket { message }).await?; @@ -233,16 +296,23 @@ impl GameServerThread { /* packet handlers */ - async fn handle_packet(&self, message: &[u8]) -> anyhow::Result<()> { + async fn handle_packet(&self, message: &[u8]) -> Result<()> { #[cfg(debug_assertions)] - gs_require!(message.len() >= PACKET_HEADER_LEN, "packet is missing a header"); + if message.len() < PACKET_HEADER_LEN { + return Err(PacketHandlingError::MalformedMessage); + } let mut data = ByteReader::from_bytes(message); let packet_id = data.read_u16()?; let encrypted = data.read_bool()?; - // for optimization, reject the voice packet immediately if the player is blocked from vc + // minor optimization + if packet_id == PlayerDataPacket::PACKET_ID { + return self.handle_player_data(&mut data).await; + } + + // also for optimization, reject the voice packet immediately if the player is blocked from vc if packet_id == VoicePacket::PACKET_ID { let accid = self.account_id.load(Ordering::Relaxed); if self.game_server.chat_blocked(accid) { @@ -251,32 +321,31 @@ impl GameServerThread { } } - let cleartext_vec; - if encrypted { - let cbox = self.crypto_box.lock(); + // reject cleartext credentials + if packet_id == LoginPacket::PACKET_ID && !encrypted { + return Err(PacketHandlingError::MalformedLoginAttempt); + } - gs_require!( - cbox.is_some(), - "attempting to decode an encrypted packet when no cryptobox was initialized" - ); + let cleartext_vec: Vec; + if encrypted { + if message.len() < 24 + PACKET_HEADER_LEN { + return Err(PacketHandlingError::MalformedCiphertext); + } - let encrypted_data = data.read_bytes(data.len() - data.get_rpos())?; - let nonce = &encrypted_data[..24]; - let rest = &encrypted_data[24..]; + let cbox = self.crypto_box.lock(); + if cbox.is_none() { + return Err(PacketHandlingError::WrongCryptoBoxState); + } let cbox = cbox.as_ref().unwrap(); - cleartext_vec = cbox.decrypt(nonce.into(), rest)?; - data = ByteReader::from_bytes(&cleartext_vec); - } + let nonce = &message[PACKET_HEADER_LEN..PACKET_HEADER_LEN + 24]; + let ciphertext = &message[PACKET_HEADER_LEN + 24..]; - // minor optimization - if packet_id == PlayerDataPacket::PACKET_ID { - return self.handle_player_data(&mut data).await; - } - - if packet_id == LoginPacket::PACKET_ID && !encrypted { - bail!("trying to login with cleartext credentials"); + cleartext_vec = cbox + .decrypt(nonce.into(), ciphertext) + .map_err(|e| PacketHandlingError::DecryptionError(e.to_string()))?; + data = ByteReader::from_bytes(&cleartext_vec); } match packet_id { @@ -296,12 +365,14 @@ impl GameServerThread { RequestPlayerListPacket::PACKET_ID => self.handle_request_player_list(&mut data).await, VoicePacket::PACKET_ID => self.handle_voice(&mut data).await, - x => Err(anyhow!("No handler for packet id {x}")), + x => Err(PacketHandlingError::NoHandler(x)), } } + /* connection related */ + gs_handler!(self, handle_ping, PingPacket, packet, { - self.send_packet(&PingResponsePacket { + self.send_packet_fast(&PingResponsePacket { id: packet.id, player_count: self.game_server.state.player_count.load(Ordering::Relaxed), }) @@ -335,13 +406,15 @@ impl GameServerThread { // as ServerThread is now tied to the SocketAddrV4 and not account id like in globed v0 // erroring here is not a concern, even if the user's game crashes without a disconnect packet, // they would have a new randomized port when they restart and this would never fail. - gs_require!(cbox.is_none(), "attempting to initialize a cryptobox twice"); + if cbox.is_some() { + return Err(PacketHandlingError::WrongCryptoBoxState); + } let new_box = ChaChaBox::new(&packet.key.pubkey, &self.game_server.secret_key); *cbox = Some(new_box); } - self.send_packet(&CryptoHandshakeResponsePacket { + self.send_packet_fast(&CryptoHandshakeResponsePacket { key: CryptoPublicKey { pubkey: self.game_server.secret_key.public_key().clone(), }, @@ -352,7 +425,7 @@ impl GameServerThread { gs_handler!(self, handle_keepalive, KeepalivePacket, _packet, { gs_needauth!(self); - self.send_packet(&KeepaliveResponsePacket { + self.send_packet_fast(&KeepaliveResponsePacket { player_count: self.game_server.state.player_count.load(Ordering::Relaxed), }) .await @@ -370,7 +443,7 @@ impl GameServerThread { account_data.account_id = packet.account_id; account_data.name = format!("Player{}", packet.account_id); } - self.send_packet(&LoggedInPacket {}).await?; + self.send_packet_fast(&LoggedInPacket {}).await?; return Ok(()); } @@ -404,7 +477,10 @@ impl GameServerThread { return Ok(()); } - let player_name = response.split_once(':').ok_or(anyhow!("central server is drunk"))?.1; + let player_name = response + .split_once(':') + .ok_or(PacketHandlingError::UnexpectedCentralResponse)? + .1; self.game_server.check_already_logged_in(packet.account_id)?; self.authenticated.store(true, Ordering::Relaxed); @@ -431,7 +507,7 @@ impl GameServerThread { debug!("Login successful from {player_name} ({})", packet.account_id); - self.send_packet(&LoggedInPacket {}).await?; + self.send_packet_fast(&LoggedInPacket {}).await?; Ok(()) }); @@ -497,19 +573,20 @@ impl GameServerThread { let level_id = self.level_id.load(Ordering::Relaxed); if level_id == 0 { - bail!("player sending PlayerDataPacket when not on a level"); + return Err(PacketHandlingError::UnexpectedPlayerData); } let account_id = self.account_id.load(Ordering::Relaxed); - let retval = { + let retval: Result>> = { let mut pm = self.game_server.state.player_manager.lock(); pm.set_player_data(account_id, &packet.data); // this unwrap should be safe - let players = pm.get_players_on_level(level_id).unwrap(); + let written_players = pm.get_player_count_on_level(level_id).unwrap() - 1; + drop(pm); - let calc_size = PACKET_HEADER_LEN + 4 + ((players.len() - 1) * AssociatedPlayerData::encoded_size()); + let calc_size = PACKET_HEADER_LEN + 4 + (written_players * AssociatedPlayerData::ENCODED_SIZE); alloca::with_alloca(calc_size, move |data| { // safety: 'data' will have garbage data but that is considered safe for all our intents and purposes @@ -525,33 +602,35 @@ impl GameServerThread { // dont actually do this anywhere else please buf.write_u16(LevelDataPacket::PACKET_ID); buf.write_bool(false); - buf.write_u32(players.len() as u32 - 1); // minus ourselves - - for player in &players { - if player.account_id == account_id { - continue; - } - - buf.write_value(*player); - } + buf.write_u32(written_players as u32); + + // this is scary + self.game_server.state.player_manager.lock().for_each_player_on_level( + level_id, + |player, buf| { + // we do additional length check because player count may have changed since 1st lock + // NOTE: this assumes encoded size of AssociatedPlayerData is a constant + // change this to something else if that won't be true in the future + if buf.len() != buf.capacity() && player.account_id != account_id { + buf.write_value(player); + } + }, + &mut buf, + ); let data = buf.as_bytes(); - - // see if we can send it right there and then - let retval: Result>, anyhow::Error> = match self.send_buffer_immediate(data) { + match self.send_buffer_immediate(data) { // if we cant send without blocking, accept our defeat and clone the data to a vec - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(Some(data.to_vec())), + Err(PacketHandlingError::SocketWouldBlock) => Ok(Some(data.to_vec())), // if another error occured, propagate it up - Err(e) => Err(e.into()), + Err(e) => Err(e), // if all good, do nothing Ok(()) => Ok(None), - }; - - retval + } }) - }?; + }; - if let Some(data) = retval { + if let Some(data) = retval? { debug!("fast PlayerData response failed, issuing a blocking call"); self.send_buffer(&data).await?; } @@ -601,3 +680,49 @@ impl GameServerThread { Ok(()) }); } + +impl From for PacketHandlingError { + fn from(value: anyhow::Error) -> Self { + PacketHandlingError::Other(value.to_string()) + } +} + +impl From for PacketHandlingError { + fn from(value: reqwest::Error) -> Self { + PacketHandlingError::WebRequestError(value) + } +} + +impl From for PacketHandlingError { + fn from(value: SystemTimeError) -> Self { + PacketHandlingError::SystemTimeError(value) + } +} + +impl From for PacketHandlingError { + fn from(value: std::io::Error) -> Self { + PacketHandlingError::IOError(value) + } +} + +impl Display for PacketHandlingError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Other(msg) => f.write_str(msg), + Self::IOError(msg) => f.write_fmt(format_args!("IO Error: {msg}")), + Self::WrongCryptoBoxState => f.write_str("wrong crypto box state for the given operation"), + Self::EncryptionError(msg) => f.write_fmt(format_args!("Encryption failed: {msg}")), + Self::DecryptionError(msg) => f.write_fmt(format_args!("Decryption failed: {msg}")), + Self::MalformedCiphertext => f.write_str("malformed ciphertext in an encrypted packet"), + Self::MalformedMessage => f.write_str("malformed message structure"), + Self::MalformedLoginAttempt => f.write_str("malformed login attempt"), + Self::NoHandler(id) => f.write_fmt(format_args!("no packet handler for packet ID {id}")), + Self::WebRequestError(msg) => f.write_fmt(format_args!("web request error: {msg}")), + Self::UnexpectedPlayerData => f.write_str("received PlayerDataPacket when not on a level"), + Self::SystemTimeError(msg) => f.write_fmt(format_args!("system time error: {msg}")), + Self::SocketSendFailed(err) => f.write_fmt(format_args!("socket send failed: {err}")), + Self::SocketWouldBlock => f.write_str("could not do a non-blocking operation on the socket as it would block"), + Self::UnexpectedCentralResponse => f.write_str("got unexpected response from the central server"), + } + } +} diff --git a/src/data/packets/client/connection.hpp b/src/data/packets/client/connection.hpp index df42e765..6da5e21a 100644 --- a/src/data/packets/client/connection.hpp +++ b/src/data/packets/client/connection.hpp @@ -24,6 +24,7 @@ class CryptoHandshakeStartPacket : public Packet { buf.writeU16(protocol); buf.writeValue(key); } + GLOBED_PACKET_DECODE_UNIMPL CryptoHandshakeStartPacket(uint16_t _protocol, CryptoPublicKey _key) : protocol(_protocol), key(_key) {} diff --git a/src/managers/account_manager.hpp b/src/managers/account_manager.hpp index f7773be1..4ab40d83 100644 --- a/src/managers/account_manager.hpp +++ b/src/managers/account_manager.hpp @@ -24,7 +24,7 @@ class GlobedAccountManager { GlobedAccountManager(); - // This method can be called multiple times, and in fact it is even advised that you do so. + // This method can be called multiple times, and in fact it is even advised that you do so often. // It must be called at least once before calling any other method or they will throw an exception. void initialize(const std::string& name, int accountId, const std::string& gjp, const std::string& central); @@ -41,7 +41,7 @@ class GlobedAccountManager { std::string computeGDDataHash(const std::string& name, int accountId, const std::string& gjp, const std::string& central); - // uses the precomputed hash from GDData and prepends it to the given 'key' + // uses the precomputed hash from GDData and appends it to the given 'key' // i.e. getKeyFor("auth-totp-key") => "auth-totp-key-ab12cd34ef" std::string getKeyFor(const std::string& key); }; \ No newline at end of file