Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions ownserver_server/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct Client<S> {
pub client_id: ClientId,
pub endpoints: Endpoints,

ws_tx: SplitSink<S, Message>,
ws_tx: Option<SplitSink<S, Message>>,
ct_self: CancellationToken,
ct_child: CancellationToken,
state: ClientState,
Expand Down Expand Up @@ -54,10 +54,10 @@ where
break
}
result = stream.next() => {
let message = match result {
let mut bytes = match result {
// handle protocol message
Some(Ok(msg)) if (msg.is_binary() || msg.is_text()) && !msg.as_bytes().is_empty() => {
msg.into_bytes()
BytesMut::from(msg.as_bytes())
}
// handle close with reason
Some(Ok(msg)) if msg.is_close() && !msg.as_bytes().is_empty() => {
Expand All @@ -70,7 +70,6 @@ where
}
};

let mut bytes = BytesMut::from(&message[..]);
counter!("ownserver_server.client.control_packet.received_bytes", "client_id" => client_id.to_string()).increment(bytes.len() as u64);

let packet = match ControlPacketV2Codec::new().decode(&mut bytes) {
Expand Down Expand Up @@ -143,7 +142,7 @@ where
Self {
client_id,
endpoints,
ws_tx: sink,
ws_tx: Some(sink),
ct_self: token,
ct_child: CancellationToken::new(),
state: ClientState::Connected,
Expand All @@ -152,29 +151,43 @@ where
}

pub async fn send_to_client(&mut self, packet: ControlPacketV2) -> Result<(), SendToClientError> {
if !matches!(self.state, ClientState::Connected) {
return Err(SendToClientError::ClientNotConnected(self.client_id))
}
let mut codec = ControlPacketV2Codec::new();
let mut bytes = BytesMut::new();
if let Err(e) = codec.encode(packet, &mut bytes) {
tracing::warn!(cid = %self.client_id, error = ?e, "failed to encode message");
return Err(SendToClientError::EncodeError(e))
}
match self.ws_tx {
None => {
// ClientState should be Connected
Err(SendToClientError::ClientNotConnected(self.client_id))
}
Some(ref mut ws_tx) => {
if !matches!(self.state, ClientState::Connected) {
return Err(SendToClientError::ClientNotConnected(self.client_id))
}

if let Err(e) = self.ws_tx.send(Message::binary(bytes.to_vec())).await {
tracing::warn!(cid = %self.client_id, error = ?e, "client disconnected: aborting");
self.set_wait_reconnect();
return Err(SendToClientError::WriteError(e))
let mut codec = ControlPacketV2Codec::new();
let mut bytes = BytesMut::new();
if let Err(e) = codec.encode(packet, &mut bytes) {
tracing::warn!(cid = %self.client_id, error = ?e, "failed to encode message");
return Err(SendToClientError::EncodeError(e))
}

let bytes_len = bytes.len();
if let Err(e) = ws_tx.send(Message::binary(bytes)).await {
tracing::warn!(cid = %self.client_id, error = ?e, "client disconnected: aborting");
self.set_wait_reconnect();
return Err(SendToClientError::WriteError(e))
}
counter!("ownserver_server.client.control_packet.sent_bytes", "client_id" => self.client_id.to_string()).increment(bytes_len as u64);
Ok(())
}
}
counter!("ownserver_server.client.control_packet.sent_bytes", "client_id" => self.client_id.to_string()).increment(bytes.len() as u64);
Ok(())
}

pub fn set_wait_reconnect(&mut self) {
if let ClientState::Connected = self.state {
self.state = ClientState::WaitReconnect { expires: Utc::now() + self.reconnect_window };
tracing::debug!(cid = %self.client_id, "set client state: {:?}", self.state);
// dropping read half of websocket stream
self.ct_self.cancel();
// dropping write half of websocket stream
self.ws_tx = None;
}
}

Expand Down