From 3e2c3e73c08caf537f63df793b4ee13b351b6feb Mon Sep 17 00:00:00 2001 From: dank_meme01 <42031238+dankmeme01@users.noreply.github.com> Date: Thu, 23 Nov 2023 19:45:01 +0100 Subject: [PATCH] hell yeah --- README.md | 2 +- server/central/src/main.rs | 9 +- server/central/src/state.rs | 2 +- server/central/src/web/routes/auth.rs | 1 + server/game/Cargo.toml | 2 + server/game/src/bytebufferext.rs | 211 +++++++++++++++++++++----- server/game/src/data/packets/mod.rs | 8 +- server/game/src/logger.rs | 14 +- server/game/src/main.rs | 14 +- server/game/src/server.rs | 39 +++-- server/game/src/server_thread.rs | 99 ++++++++---- server/game/src/state.rs | 2 - 12 files changed, 313 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index 2fe3cb62..f35f3c75 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ This repository contains the complete rewrite of Globed, for Geometry Dash 2.2 a * Real-time multiplayer * Voice chat and text messages with full encryption and zero logging -* blazing fast server written in pure Rust 🚀 +* blazing fast server written in pure Rust 🚀 (feauring only **one** unsafe block) ## Installation diff --git a/server/central/src/main.rs b/server/central/src/main.rs index 3e147c38..d7c7358b 100644 --- a/server/central/src/main.rs +++ b/server/central/src/main.rs @@ -1,3 +1,10 @@ +#![allow( + clippy::must_use_candidate, + clippy::module_name_repetitions, + clippy::missing_errors_doc, + clippy::missing_panics_doc +)] + use std::{error::Error, path::PathBuf, sync::Arc, time::Duration}; use async_watcher::{notify::RecursiveMode, AsyncDebouncer}; @@ -63,7 +70,7 @@ async fn main() -> Result<(), Box> { let state_skey = config.secret_key.clone(); let state = ServerState { - inner: Arc::new(RwLock::new(ServerStateData::new(config_path.clone(), config, state_skey))), + inner: Arc::new(RwLock::new(ServerStateData::new(config_path.clone(), config, &state_skey))), }; // config file watcher diff --git a/server/central/src/state.rs b/server/central/src/state.rs index 2e7a9a60..fb4fac34 100644 --- a/server/central/src/state.rs +++ b/server/central/src/state.rs @@ -36,7 +36,7 @@ pub struct ServerStateData { } impl ServerStateData { - pub fn new(config_path: PathBuf, config: ServerConfig, secret_key: String) -> Self { + pub fn new(config_path: PathBuf, config: ServerConfig, secret_key: &str) -> Self { let skey_bytes = secret_key.as_bytes(); let hmac_obj = Hmac::::new_from_slice(skey_bytes).unwrap(); diff --git a/server/central/src/web/routes/auth.rs b/server/central/src/web/routes/auth.rs index 3d3c2f52..3369dee7 100644 --- a/server/central/src/web/routes/auth.rs +++ b/server/central/src/web/routes/auth.rs @@ -189,6 +189,7 @@ pub async fn challenge_start(context: &mut Context) -> roa::Result Ok(()) } +#[allow(clippy::too_many_lines)] pub async fn challenge_finish(context: &mut Context) -> roa::Result { check_user_agent!(context, _ua); diff --git a/server/game/Cargo.toml b/server/game/Cargo.toml index a1d52ad0..339f68c4 100644 --- a/server/game/Cargo.toml +++ b/server/game/Cargo.toml @@ -22,3 +22,5 @@ tokio = { version = "1.34.0", features = ["full"] } parking_lot = "0.12.1" array-init = "2.1.0" num_enum = "0.7.1" +alloca = "0.4.0" +lazy_static = "1.4.0" diff --git a/server/game/src/bytebufferext.rs b/server/game/src/bytebufferext.rs index 013be6fb..e332beb3 100644 --- a/server/game/src/bytebufferext.rs +++ b/server/game/src/bytebufferext.rs @@ -4,6 +4,7 @@ use bytebuffer::{ByteBuffer, ByteReader}; pub trait Encodable { fn encode(&self, buf: &mut ByteBuffer); + fn encode_fast(&self, buf: &mut FastByteBuffer); } pub trait Decodable { @@ -21,6 +22,10 @@ macro_rules! encode_impl { fn encode(&$self, $buf: &mut bytebuffer::ByteBuffer) { $encode } + + fn encode_fast(&$self, $buf: &mut crate::bytebufferext::FastByteBuffer) { + $encode + } } }; } @@ -48,6 +53,13 @@ macro_rules! encode_unimpl { stringify!($packet_type) ); } + + fn encode_fast(&self, _: &mut crate::bytebufferext::FastByteBuffer) { + panic!( + "Tried to call {}::encode_fast when Encodable was not implemented for this type", + stringify!($packet_type) + ); + } } }; } @@ -77,6 +89,7 @@ pub(crate) use decode_unimpl; pub(crate) use encode_impl; pub(crate) use encode_unimpl; +/* ByteBuffer extensions */ pub trait ByteBufferExt { fn with_capacity(capacity: usize) -> Self; } @@ -117,63 +130,177 @@ pub trait ByteBufferExtRead { fn read_point(&mut self) -> Result; } -impl ByteBufferExt for ByteBuffer { - fn with_capacity(capacity: usize) -> Self { - let mut ret = Self::from_vec(Vec::with_capacity(capacity)); - ret.set_wpos(0); - ret - } +/* FastByteBuffer - zero heap allocation buffer for encoding but also limited functionality */ +// will panic on writes if there isn't enough space. + +pub struct FastByteBuffer<'a> { + pos: usize, + data: &'a mut [u8], } -impl ByteBufferExtWrite for ByteBuffer { - fn write_bool(&mut self, val: bool) { - self.write_u8(u8::from(val)); +impl<'a> FastByteBuffer<'a> { + // Create a new FastByteBuffer given this mutable slice + pub fn new(src: &'a mut [u8]) -> Self { + Self { pos: 0, data: src } } - fn write_byte_array(&mut self, val: &[u8]) { - self.write_u32(val.len() as u32); - self.write_bytes(val); + #[inline] + pub fn write_u8(&mut self, val: u8) { + self.internal_write(&val.to_be_bytes()); } - fn write_value(&mut self, val: &T) { - val.encode(self); + #[inline] + pub fn write_u16(&mut self, val: u16) { + self.internal_write(&val.to_be_bytes()); } - fn write_optional_value(&mut self, val: Option<&T>) { - self.write_bool(val.is_some()); - if let Some(val) = val { - self.write_value(val); - } + #[inline] + pub fn write_u32(&mut self, val: u32) { + self.internal_write(&val.to_be_bytes()); + } + + #[inline] + pub fn write_u64(&mut self, val: u64) { + self.internal_write(&val.to_be_bytes()); } - fn write_value_array(&mut self, val: &[T; N]) { - val.iter().for_each(|v| self.write_value(v)); + #[inline] + pub fn write_i8(&mut self, val: i8) { + self.internal_write(&val.to_be_bytes()); } - fn write_value_vec(&mut self, val: &[T]) { + #[inline] + pub fn write_i16(&mut self, val: i16) { + self.internal_write(&val.to_be_bytes()); + } + + #[inline] + pub fn write_i32(&mut self, val: i32) { + self.internal_write(&val.to_be_bytes()); + } + + #[inline] + pub fn write_i64(&mut self, val: i64) { + self.internal_write(&val.to_be_bytes()); + } + + #[inline] + pub fn write_f32(&mut self, val: f32) { + self.internal_write(&val.to_be_bytes()); + } + + #[inline] + pub fn write_f64(&mut self, val: f64) { + self.internal_write(&val.to_be_bytes()); + } + + #[inline] + pub fn write_bytes(&mut self, data: &[u8]) { + self.internal_write(data); + } + + #[inline] + pub fn write_string(&mut self, val: &str) { self.write_u32(val.len() as u32); - for elem in val { - elem.encode(self); - } + self.write_bytes(val.as_bytes()); + } + + #[inline] + pub fn as_bytes(&'a mut self) -> &'a [u8] { + &self.data[..self.pos] } - fn write_enum, B: Encodable>(&mut self, val: E) { - self.write_value(&val.into()); + #[inline] + pub fn len(&self) -> usize { + self.pos } - fn write_color3(&mut self, val: cocos::Color3B) { - self.write_value(&val); + #[inline] + pub fn is_empty(&self) -> bool { + self.pos == 0 } - fn write_color4(&mut self, val: cocos::Color4B) { - self.write_value(&val); + #[inline] + pub fn capacity(&self) -> usize { + self.data.len() } - fn write_point(&mut self, val: cocos::Point) { - self.write_value(&val); + // Panics if there is not enough capacity left to write the data. + fn internal_write(&mut self, data: &[u8]) { + debug_assert!( + self.pos + data.len() <= self.capacity(), + "not enough space to write data into FastByteBuffer, capacity: {}, pos: {}, attempted write size: {}", + self.capacity(), + self.pos, + data.len() + ); + + self.data[self.pos..self.pos + data.len()].copy_from_slice(data); + self.pos += data.len(); } } +/* ByteBuffer extension implementation for ByteBuffer, ByteReader and FastByteBuffer */ + +impl ByteBufferExt for ByteBuffer { + fn with_capacity(capacity: usize) -> Self { + let mut ret = Self::from_vec(Vec::with_capacity(capacity)); + ret.set_wpos(0); + ret + } +} + +macro_rules! impl_extwrite { + ($encode_fn:ident) => { + fn write_bool(&mut self, val: bool) { + self.write_u8(u8::from(val)); + } + + fn write_byte_array(&mut self, val: &[u8]) { + self.write_u32(val.len() as u32); + self.write_bytes(val); + } + + fn write_value(&mut self, val: &T) { + val.$encode_fn(self); + } + + fn write_optional_value(&mut self, val: Option<&T>) { + self.write_bool(val.is_some()); + if let Some(val) = val { + self.write_value(val); + } + } + + fn write_value_array(&mut self, val: &[T; N]) { + val.iter().for_each(|v| self.write_value(v)); + } + + fn write_value_vec(&mut self, val: &[T]) { + self.write_u32(val.len() as u32); + for elem in val { + elem.$encode_fn(self); + } + } + + fn write_enum, B: Encodable>(&mut self, val: E) { + self.write_value(&val.into()); + } + + fn write_color3(&mut self, val: cocos::Color3B) { + self.write_value(&val); + } + + fn write_color4(&mut self, val: cocos::Color4B) { + self.write_value(&val); + } + + fn write_point(&mut self, val: cocos::Point) { + self.write_value(&val); + } + }; +} + macro_rules! impl_extread { ($decode_fn:ident) => { fn read_bool(&mut self) -> Result { @@ -231,6 +358,14 @@ macro_rules! impl_extread { }; } +impl ByteBufferExtWrite for ByteBuffer { + impl_extwrite!(encode); +} + +impl<'a> ByteBufferExtWrite for FastByteBuffer<'a> { + impl_extwrite!(encode_fast); +} + impl ByteBufferExtRead for ByteBuffer { impl_extread!(decode); } @@ -239,7 +374,7 @@ impl<'a> ByteBufferExtRead for ByteReader<'a> { impl_extread!(decode_from_reader); } -/* Implementations for common types */ +/* Encodable/Decodable implementations for common types */ macro_rules! impl_primitive { ($typ:ty,$read:ident,$write:ident) => { @@ -247,6 +382,10 @@ macro_rules! impl_primitive { fn encode(&self, buf: &mut ByteBuffer) { buf.$write(*self); } + + fn encode_fast(&self, buf: &mut FastByteBuffer) { + buf.$write(*self); + } } impl Decodable for $typ { @@ -279,6 +418,10 @@ impl Encodable for Vec { fn encode(&self, buf: &mut ByteBuffer) { buf.write_byte_array(self); } + + fn encode_fast(&self, buf: &mut FastByteBuffer) { + buf.write_byte_array(self); + } } impl Decodable for Vec { diff --git a/server/game/src/data/packets/mod.rs b/server/game/src/data/packets/mod.rs index 7e428a8a..d7452858 100644 --- a/server/game/src/data/packets/mod.rs +++ b/server/game/src/data/packets/mod.rs @@ -46,8 +46,9 @@ macro_rules! packet { } } - impl crate::data::packets::PacketWithId for $packet_type { + impl crate::data::packets::PacketMetadata for $packet_type { const PACKET_ID: crate::data::packets::PacketId = $packet_id; + const ENCRYPTED: bool = $encrypted; } }; } @@ -76,14 +77,15 @@ pub(crate) use empty_client_packet; pub(crate) use empty_server_packet; pub(crate) use packet; -pub trait Packet: Encodable + Decodable + Send + Sync { +pub trait Packet: Encodable + Decodable + Send + Sync + PacketMetadata { fn get_packet_id(&self) -> PacketId; fn get_encrypted(&self) -> bool; } // god i hate this -pub trait PacketWithId { +pub trait PacketMetadata { const PACKET_ID: PacketId; + const ENCRYPTED: bool; } pub const PACKET_HEADER_LEN: usize = std::mem::size_of::() + std::mem::size_of::(); diff --git a/server/game/src/logger.rs b/server/game/src/logger.rs index 17aa0286..c6b4b1a2 100644 --- a/server/game/src/logger.rs +++ b/server/game/src/logger.rs @@ -1,13 +1,22 @@ use std::time::SystemTime; use colored::Colorize; +use lazy_static::lazy_static; use log::Level; use time::{format_description, OffsetDateTime}; -pub struct Logger; +pub struct Logger { + format_desc: Vec>, +} const TIME_FORMAT: &str = "[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:3]"; +lazy_static! { + pub static ref LOGGER_INSTANCE: Logger = Logger { + format_desc: format_description::parse(TIME_FORMAT).unwrap(), + }; +} + impl log::Log for Logger { fn enabled(&self, metadata: &log::Metadata) -> bool { if metadata.target().starts_with("globed_game_server") { @@ -20,8 +29,7 @@ impl log::Log for Logger { fn log(&self, record: &log::Record) { if self.enabled(record.metadata()) { let now: OffsetDateTime = SystemTime::now().into(); - let format_desc = format_description::parse(TIME_FORMAT).unwrap(); - let formatted_time = now.format(&format_desc).unwrap(); + let formatted_time = now.format(&self.format_desc).unwrap(); let (level, args) = match record.level() { Level::Error => ( diff --git a/server/game/src/main.rs b/server/game/src/main.rs index a5de1c80..20f07287 100644 --- a/server/game/src/main.rs +++ b/server/game/src/main.rs @@ -1,9 +1,17 @@ +#![allow( + clippy::must_use_candidate, + clippy::module_name_repetitions, + clippy::cast_possible_truncation, + clippy::missing_errors_doc, + clippy::missing_panics_doc, + clippy::wildcard_imports +)] + use std::{collections::HashMap, error::Error}; use anyhow::anyhow; use globed_shared::{GameServerBootData, PROTOCOL_VERSION}; use log::{error, info, warn, LevelFilter}; -use logger::Logger; use server::GameServerConfiguration; use state::ServerState; @@ -17,11 +25,9 @@ pub mod server; pub mod server_thread; pub mod state; -static LOGGER: Logger = Logger; - #[tokio::main] async fn main() -> Result<(), Box> { - log::set_logger(&LOGGER).unwrap(); + log::set_logger(&*logger::LOGGER_INSTANCE).unwrap(); if std::env::var("GLOBED_GS_LESS_LOG").unwrap_or("0".to_string()) == "1" { log::set_max_level(LevelFilter::Warn); diff --git a/server/game/src/server.rs b/server/game/src/server.rs index a50e7474..a637337b 100644 --- a/server/game/src/server.rs +++ b/server/game/src/server.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use parking_lot::{Mutex as SyncMutex, RwLock as SyncRwLock}; +use parking_lot::Mutex as SyncMutex; use anyhow::anyhow; use crypto_box::{aead::OsRng, SecretKey}; @@ -35,7 +35,7 @@ pub struct GameServer { pub address: String, pub state: ServerState, pub socket: UdpSocket, - pub threads: SyncRwLock>>, + pub threads: SyncMutex>>, pub secret_key: SecretKey, pub central_conf: SyncMutex, pub config: GameServerConfiguration, @@ -56,7 +56,7 @@ impl GameServer { address: address.clone(), state, socket: UdpSocket::bind(&address).await.unwrap(), - threads: SyncRwLock::new(FxHashMap::default()), + threads: SyncMutex::new(FxHashMap::default()), secret_key, central_conf: SyncMutex::new(central_conf), config, @@ -94,9 +94,9 @@ impl GameServer { /* various calls for other threads */ - pub async fn broadcast_voice_packet(&'static self, vpkt: Arc) -> anyhow::Result<()> { + pub fn broadcast_voice_packet(&'static self, vpkt: &Arc) -> anyhow::Result<()> { // TODO dont send it to every single thread in existence - let threads: Vec<_> = self.threads.read().values().cloned().collect(); + let threads: Vec<_> = self.threads.lock().values().cloned().collect(); for thread in threads { thread.push_new_message(ServerThreadMessage::BroadcastVoice(vpkt.clone()))?; } @@ -105,7 +105,7 @@ impl GameServer { } pub fn gather_profiles(&'static self, ids: &[i32]) -> Vec { - let threads = self.threads.read(); + let threads = self.threads.lock(); ids.iter() .filter_map(|id| { @@ -118,7 +118,7 @@ impl GameServer { } pub fn gather_all_profiles(&'static self) -> Vec { - let threads = self.threads.read(); + let threads = self.threads.lock(); threads .values() .filter(|thr| thr.authenticated.load(Ordering::Relaxed)) @@ -130,6 +130,19 @@ impl GameServer { self.central_conf.lock().no_chat.contains(&user_id) } + pub fn check_already_logged_in(&'static self, user_id: i32) -> anyhow::Result<()> { + let threads = self.threads.lock(); + let thread = threads.values().find(|thr| thr.account_id.load(Ordering::Relaxed) == user_id); + + if let Some(thread) = thread { + thread.push_new_message(ServerThreadMessage::TerminationNotice( + "Someone logged into the same account from a different place.".to_string(), + ))?; + } + + Ok(()) + } + /* private handling stuff */ async fn recv_and_handle(&'static self) -> anyhow::Result<()> { @@ -141,7 +154,7 @@ impl GameServer { SocketAddr::V4(x) => x, }; - let thread = self.threads.read().get(&peer).cloned(); + let thread = self.threads.lock().get(&peer).cloned(); let thread = if let Some(thread) = thread { thread @@ -164,25 +177,27 @@ impl GameServer { self.post_disconnect_cleanup(&thread); }); - self.threads.write().insert(peer, thread_cl.clone()); + self.threads.lock().insert(peer, thread_cl.clone()); thread_cl }; // don't heap allocate for small packets - thread.push_new_message(if len <= SMALL_PACKET_LIMIT { + let message = if len <= SMALL_PACKET_LIMIT { let mut smallbuf = [0u8; SMALL_PACKET_LIMIT]; smallbuf[..len].copy_from_slice(&buf[..len]); ServerThreadMessage::SmallPacket(smallbuf) } else { ServerThreadMessage::Packet(buf[..len].to_vec()) - })?; + }; + + thread.push_new_message(message)?; Ok(()) } fn remove_client(&'static self, key: SocketAddrV4) { - self.threads.write().remove(&key); + self.threads.lock().remove(&key); } fn post_disconnect_cleanup(&'static self, thread: &GameServerThread) { diff --git a/server/game/src/server_thread.rs b/server/game/src/server_thread.rs index 92f78573..2daa8945 100644 --- a/server/game/src/server_thread.rs +++ b/server/game/src/server_thread.rs @@ -17,15 +17,12 @@ use crypto_box::{ }; use globed_shared::PROTOCOL_VERSION; use log::{debug, warn}; -use tokio::sync::{ - mpsc::{self, Receiver, Sender}, - Mutex, -}; +use tokio::sync::{mpsc, Mutex}; use crate::{ bytebufferext::*, data::{ - packets::{client::*, server::*, Packet, PacketWithId, PACKET_HEADER_LEN}, + packets::{client::*, server::*, Packet, PacketMetadata, PACKET_HEADER_LEN}, types::*, }, server::GameServer, @@ -33,19 +30,20 @@ use crate::{ // TODO adjust this to PlayerData size in the future pub const SMALL_PACKET_LIMIT: usize = 128; -const CHANNEL_BUFFER: usize = 4; +const CHANNEL_BUFFER_SIZE: usize = 4; pub enum ServerThreadMessage { Packet(Vec), SmallPacket([u8; SMALL_PACKET_LIMIT]), BroadcastVoice(Arc), + TerminationNotice(String), } pub struct GameServerThread { game_server: &'static GameServer, - rx: Mutex>, - tx: Sender, + rx: Mutex>, + tx: mpsc::Sender, awaiting_termination: AtomicBool, pub authenticated: AtomicBool, crypto_box: SyncMutex>, @@ -111,7 +109,7 @@ impl GameServerThread { /* public api for the main server */ pub fn new(peer: SocketAddrV4, game_server: &'static GameServer) -> Self { - let (tx, rx) = mpsc::channel::(CHANNEL_BUFFER); + let (tx, rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); Self { tx, rx: Mutex::new(rx), @@ -168,6 +166,14 @@ impl GameServerThread { Ok(()) } + // attempt to send a buffer immediately to the socket, but if it requires blocking then nuh uh + fn send_buffer_immediate(&self, buffer: &[u8]) -> std::io::Result<()> { + self.game_server + .socket + .try_send_to(buffer, std::net::SocketAddr::V4(self.peer)) + .map(|_| ()) + } + fn serialize_packet(&self, packet: &impl Packet) -> anyhow::Result { let mut buf = ByteBuffer::new(); buf.write_u16(packet.get_packet_id()); @@ -217,6 +223,11 @@ impl GameServerThread { Ok(()) => {} Err(err) => bail!("failed to broadcast voice packet: {err}"), }, + + ServerThreadMessage::TerminationNotice(message) => { + self.terminate(); + self.send_packet(&ServerDisconnectPacket { message }).await?; + } } Ok(()) @@ -352,6 +363,7 @@ impl GameServerThread { gs_handler!(self, handle_login, LoginPacket, packet, { if self.game_server.standalone { debug!("Bypassing login for {}", packet.account_id); + self.game_server.check_already_logged_in(packet.account_id)?; self.authenticated.store(true, Ordering::Relaxed); self.account_id.store(packet.account_id, Ordering::Relaxed); self.game_server.state.player_count.fetch_add(1u32, Ordering::Relaxed); @@ -396,6 +408,7 @@ impl GameServerThread { let player_name = response.split_once(':').ok_or(anyhow!("central server is drunk"))?.1; + self.game_server.check_already_logged_in(packet.account_id)?; self.authenticated.store(true, Ordering::Relaxed); self.account_id.store(packet.account_id, Ordering::Relaxed); self.game_server.state.player_count.fetch_add(1u32, Ordering::Relaxed); // increment player count @@ -491,9 +504,7 @@ impl GameServerThread { let account_id = self.account_id.load(Ordering::Relaxed); - let mut buf: ByteBuffer; - - { + let retval = { let mut pm = self.game_server.state.player_manager.lock(); pm.set_player_data(account_id, &packet.data); @@ -501,25 +512,55 @@ impl GameServerThread { let players = pm.get_players_on_level(level_id).unwrap(); let calc_size = PACKET_HEADER_LEN + 4 + ((players.len() - 1) * AssociatedPlayerData::encoded_size()); - debug!("alloc with capacity: {calc_size}"); - buf = ByteBuffer::with_capacity(calc_size); + debug!("alloca with capacity: {calc_size}"); + + alloca::with_alloca(calc_size, move |data| { + // safety: 'data' will have garbage data but that is considered safe for all our intents and purposes + // as `FastByteBuffer::as_bytes()` will only return what was already written. + let data = unsafe { + let ptr = data.as_mut_ptr().cast::(); + let len = std::mem::size_of_val(data); + std::slice::from_raw_parts_mut(ptr, len) + }; + + let mut buf = FastByteBuffer::new(data); + + // dont actually do this anywhere else please + buf.write_u16(LevelDataPacket::PACKET_ID); + buf.write_bool(false); + buf.write_u32(players.len() as u32 - 1); // minus ourselves + + for player in &players { + if player.account_id == account_id { + continue; + } + + buf.write_value(*player); + } - // dont actually do this anywhere else please - buf.write_u16(LevelDataPacket::PACKET_ID); - buf.write_bool(false); - buf.write_u32(players.len() as u32 - 1); // minus ourselves + // see if we can send it right there and then + let data = buf.as_bytes(); + debug!("size at the end: {}", data.len()); - for player in &players { - if player.account_id == account_id { - continue; - } + let retval: Result>, anyhow::Error> = match self.send_buffer_immediate(data) { + // if we cant send without blocking, accept our defeat and clone the data to a vec + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(Some(data.to_vec())), + // if another error occured, propagate it up + Err(e) => Err(e.into()), + // if all good, do nothing + Ok(()) => Ok(None), + }; - buf.write_value(*player); - } - debug!("size at the end: {}", buf.len()); + retval + }) + }?; + + if let Some(data) = retval { + debug!("fast PlayerData response failed, issuing a blocking call"); + self.send_buffer(&data).await?; } - self.send_buffer(buf.as_bytes()).await + Ok(()) }); gs_handler!(self, handle_request_player_list, RequestPlayerListPacket, _packet, { @@ -545,10 +586,10 @@ impl GameServerThread { let total_size = packet.data.opus_frames.iter().map(Vec::len).sum::(); - let throughput = (total_size as f32) / (passed_time as f32); // in kb/s + let throughput = total_size / passed_time as usize; // in kb/s debug!("voice packet throughput: {}kb/s", throughput); - if throughput > 8f32 { + if throughput > 8 { warn!("rejecting a voice packet, throughput above the limit: {}kb/s", throughput); return Ok(()); } @@ -559,7 +600,7 @@ impl GameServerThread { data: packet.data.clone(), }); - self.game_server.broadcast_voice_packet(vpkt).await?; + self.game_server.broadcast_voice_packet(&vpkt)?; Ok(()) }); diff --git a/server/game/src/state.rs b/server/game/src/state.rs index b8dfb577..98994eb2 100644 --- a/server/game/src/state.rs +++ b/server/game/src/state.rs @@ -4,8 +4,6 @@ use std::sync::atomic::AtomicU32; pub struct ServerState { pub player_count: AtomicU32, - // make player_manager SyncRwLock if there will be read-only operations - // for now everything requires write access so it's a waste pub player_manager: SyncMutex, }