Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions crates/hyperion/src/egress/channel.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use bevy::{ecs::world::OnDespawn, prelude::*};
use hyperion_proto::{ServerToProxyMessage, UpdateChannelPosition, UpdateChannelPositions};
use hyperion_proto::UpdateChannelPosition;
use hyperion_utils::EntityExt;
use tracing::error;
use valence_bytes::CowBytes;
use valence_protocol::{ByteAngle, RawBytes, VarInt, packets::play};

use crate::{
egress::metadata::show_all,
net::{Channel, ChannelId, Compose, ConnectionId},
net::{
Channel, ChannelId, Compose, ConnectionId,
intermediate::{IntermediateServerToProxyMessage, UpdateChannelPositions},
},
simulation::{
Pitch, Position, RequestSubscribeChannelPackets, Uuid, Velocity, Yaw,
entity_kind::EntityKind,
Expand Down Expand Up @@ -47,7 +50,7 @@ fn update_channel_positions(

compose
.io_buf()
.add_proxy_message(&ServerToProxyMessage::UpdateChannelPositions(
.add_proxy_message(&IntermediateServerToProxyMessage::UpdateChannelPositions(
UpdateChannelPositions { updates: &updates },
));
}
Expand Down
12 changes: 7 additions & 5 deletions crates/hyperion/src/egress/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use bevy::prelude::*;
use hyperion_proto::{ServerToProxyMessage, UpdatePlayerPositions};
use tracing::error;
use valence_protocol::{VarInt, packets::play::PlayerActionResponseS2c};

use crate::{
Blocks,
net::{Compose, ConnectionId},
net::{
Compose, ConnectionId,
intermediate::{IntermediateServerToProxyMessage, UpdatePlayerPositions},
},
simulation::Position,
};
mod channel;
Expand All @@ -29,14 +31,14 @@ fn send_chunk_positions(
let mut stream = Vec::with_capacity(count);
let mut positions = Vec::with_capacity(count);

for (io, pos) in query.iter() {
stream.push(io.inner());
for (&io, pos) in query.iter() {
stream.push(io);
positions.push(hyperion_proto::ChunkPosition::from(pos.to_chunk()));
}

let packet = UpdatePlayerPositions { stream, positions };

let chunk_positions = ServerToProxyMessage::UpdatePlayerPositions(packet);
let chunk_positions = IntermediateServerToProxyMessage::UpdatePlayerPositions(packet);

compose.io_buf().add_proxy_message(&chunk_positions);
}
Expand Down
12 changes: 6 additions & 6 deletions crates/hyperion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,21 +247,21 @@ impl Plugin for HyperionCore {

let global = Global::new(shared.clone());

let mut compose = Compose::new(shared.compression_level, global, IoBuf::default());

app.add_plugins(CommandChannelPlugin);

if let Some(address) = app.world().get_resource::<Endpoint>() {
let crypto = app.world().resource::<Crypto>();
let command_channel = app.world().resource::<CommandChannel>();
let egress_comm =
init_proxy_comms(&runtime, command_channel.clone(), address.0, crypto.clone());
compose.io_buf_mut().add_egress_comm(egress_comm);
init_proxy_comms(&runtime, command_channel.clone(), address.0, crypto.clone());
} else {
warn!("Endpoint was not set while loading HyperionCore");
}

app.insert_resource(compose);
app.insert_resource(Compose::new(
shared.compression_level,
global,
IoBuf::default(),
));
app.insert_resource(runtime);
app.insert_resource(CraftingRegistry::default());
app.insert_resource(StreamLookup::default());
Expand Down
206 changes: 206 additions & 0 deletions crates/hyperion/src/net/intermediate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use hyperion_proto::{ChunkPosition, ServerToProxyMessage, UpdateChannelPosition};

use crate::net::{ConnectionId, ProxyId};

#[derive(Clone, PartialEq)]
pub struct UpdatePlayerPositions {
pub stream: Vec<ConnectionId>,
pub positions: Vec<ChunkPosition>,
}

#[derive(Clone, Copy, PartialEq, Eq)]
pub struct AddChannel<'a> {
pub channel_id: u32,

pub unsubscribe_packets: &'a [u8],
}

#[derive(Clone, PartialEq)]
pub struct UpdateChannelPositions<'a> {
pub updates: &'a [UpdateChannelPosition],
}

#[derive(Clone, Copy, PartialEq, Eq)]
pub struct RemoveChannel {
pub channel_id: u32,
}

#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SubscribeChannelPackets<'a> {
pub channel_id: u32,
pub exclude: Option<ConnectionId>,

pub data: &'a [u8],
}

#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SetReceiveBroadcasts {
pub stream: ConnectionId,
}

#[derive(Clone, PartialEq, Eq)]
pub struct BroadcastGlobal<'a> {
pub exclude: Option<ConnectionId>,

pub data: &'a [u8],
}

#[derive(Clone, PartialEq)]
pub struct BroadcastLocal<'a> {
pub center: ChunkPosition,
pub exclude: Option<ConnectionId>,

pub data: &'a [u8],
}

#[derive(Clone, PartialEq, Eq)]
pub struct BroadcastChannel<'a> {
pub channel_id: u32,
pub exclude: Option<ConnectionId>,

pub data: &'a [u8],
}

#[derive(Clone, PartialEq, Eq)]
pub struct Unicast<'a> {
pub stream: ConnectionId,

pub data: &'a [u8],
}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Shutdown {
pub stream: ConnectionId,
}

#[derive(Clone, PartialEq)]
pub enum IntermediateServerToProxyMessage<'a> {
UpdatePlayerPositions(UpdatePlayerPositions),
AddChannel(AddChannel<'a>),
UpdateChannelPositions(UpdateChannelPositions<'a>),
RemoveChannel(RemoveChannel),
SubscribeChannelPackets(SubscribeChannelPackets<'a>),
BroadcastGlobal(BroadcastGlobal<'a>),
BroadcastLocal(BroadcastLocal<'a>),
BroadcastChannel(BroadcastChannel<'a>),
Unicast(Unicast<'a>),
SetReceiveBroadcasts(SetReceiveBroadcasts),
Shutdown(Shutdown),
}

impl IntermediateServerToProxyMessage<'_> {
/// Whether the result of [`IntermediateServerToProxyMessage::transform_for_proxy`] will be
/// affected by the proxy id provided
#[must_use]
pub const fn affected_by_proxy(&self) -> bool {
match self {
Self::UpdatePlayerPositions(_)
| Self::SubscribeChannelPackets(_)
| Self::BroadcastGlobal(_)
| Self::BroadcastLocal(_)
| Self::BroadcastChannel(_)
| Self::Unicast(_)
| Self::SetReceiveBroadcasts(_)
| Self::Shutdown(_) => true,
Self::AddChannel(_) | Self::UpdateChannelPositions(_) | Self::RemoveChannel(_) => false,
}
}

/// Transforms an intermediate message to a message suitable for sending to a particular proxy.
/// Returns `None` if this message should not be sent to the proxy.
#[must_use]
pub fn transform_for_proxy(&self, proxy_id: ProxyId) -> Option<ServerToProxyMessage<'_>> {
let filter_map_connection_id =
|id: ConnectionId| (id.proxy_id() == proxy_id).then(|| id.inner());
match self {
Self::UpdatePlayerPositions(message) => {
Some(ServerToProxyMessage::UpdatePlayerPositions(
hyperion_proto::UpdatePlayerPositions {
stream: message
.stream
.iter()
.copied()
.filter_map(filter_map_connection_id)
.collect::<Vec<_>>(),
positions: message.positions.clone(),
},
))
}
Self::AddChannel(message) => Some(ServerToProxyMessage::AddChannel(
hyperion_proto::AddChannel {
channel_id: message.channel_id,
unsubscribe_packets: message.unsubscribe_packets,
},
)),
Self::UpdateChannelPositions(message) => {
Some(ServerToProxyMessage::UpdateChannelPositions(
hyperion_proto::UpdateChannelPositions {
updates: message.updates,
},
))
}
Self::RemoveChannel(message) => Some(ServerToProxyMessage::RemoveChannel(
hyperion_proto::RemoveChannel {
channel_id: message.channel_id,
},
)),
Self::SubscribeChannelPackets(message) => {
Some(ServerToProxyMessage::SubscribeChannelPackets(
hyperion_proto::SubscribeChannelPackets {
channel_id: message.channel_id,
exclude: message
.exclude
.and_then(filter_map_connection_id)
.unwrap_or_default(),
data: message.data,
},
))
}
Self::BroadcastGlobal(message) => Some(ServerToProxyMessage::BroadcastGlobal(
hyperion_proto::BroadcastGlobal {
exclude: message
.exclude
.and_then(filter_map_connection_id)
.unwrap_or_default(),
data: message.data,
},
)),
Self::BroadcastLocal(message) => Some(ServerToProxyMessage::BroadcastLocal(
hyperion_proto::BroadcastLocal {
center: message.center,
exclude: message
.exclude
.and_then(filter_map_connection_id)
.unwrap_or_default(),
data: message.data,
},
)),
Self::BroadcastChannel(message) => Some(ServerToProxyMessage::BroadcastChannel(
hyperion_proto::BroadcastChannel {
channel_id: message.channel_id,
exclude: message
.exclude
.and_then(filter_map_connection_id)
.unwrap_or_default(),
data: message.data,
},
)),
Self::Unicast(message) => {
Some(ServerToProxyMessage::Unicast(hyperion_proto::Unicast {
stream: filter_map_connection_id(message.stream)?,
data: message.data,
}))
}
Self::SetReceiveBroadcasts(message) => Some(
ServerToProxyMessage::SetReceiveBroadcasts(hyperion_proto::SetReceiveBroadcasts {
stream: filter_map_connection_id(message.stream)?,
}),
),
Self::Shutdown(message) => Some(ServerToProxyMessage::SetReceiveBroadcasts(
hyperion_proto::SetReceiveBroadcasts {
stream: filter_map_connection_id(message.stream)?,
},
)),
}
}
}
Loading
Loading