Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/interval-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn accept_connection(peer: SocketAddr, stream: TcpStream) {
async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {
let ws_stream = accept_async(stream).await.expect("Failed to accept");
info!("New WebSocket connection: {}", peer);
let (ws_sender, mut ws_receiver) = ws_stream.split();
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
let mut interval = async_std::stream::interval(Duration::from_millis(1000));

// Echo incoming WebSocket messages and send a message periodically every second.
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ pub struct WebSocketSender<S> {

impl<S> WebSocketSender<S> {
/// Send a message via [websocket](WebSocketStream).
pub async fn send(&self, msg: Message) -> Result<(), WsError>
pub async fn send(&mut self, msg: Message) -> Result<(), WsError>
where
S: AsyncRead + AsyncWrite + Unpin,
{
Expand All @@ -621,7 +621,7 @@ impl<S> WebSocketSender<S> {
}

/// Close the underlying [websocket](WebSocketStream).
pub async fn close(&self, msg: Option<CloseFrame>) -> Result<(), WsError>
pub async fn close(&mut self, msg: Option<CloseFrame>) -> Result<(), WsError>
where
S: AsyncRead + AsyncWrite + Unpin,
{
Expand Down
52 changes: 1 addition & 51 deletions tests/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn split_communication() {
.await
.expect("Client failed to connect");

let (tx, rx) = stream.split();
let (mut tx, rx) = stream.split();

for i in 1..10 {
info!("Sending message");
Expand All @@ -120,53 +120,3 @@ async fn split_communication() {
assert!(rx.is_pair_of(&tx));
WebSocketStream::reunite(tx, rx).expect("Failed to reunite the stream");
}

#[async_std::test]
async fn concurrent_send() {
let _ = env_logger::try_init();

let (con_tx, con_rx) = futures::channel::oneshot::channel();
let (msg_tx, msg_rx) = futures::channel::oneshot::channel();

let f = async move {
let listener = TcpListener::bind("127.0.0.1:12347").await.unwrap();
info!("Server ready");
con_tx.send(()).unwrap();
info!("Waiting on next connection");
let (connection, _) = listener.accept().await.expect("No connections to accept");
let stream = accept_async(connection).await;
let stream = stream.expect("Failed to handshake with connection");
run_connection(stream, msg_tx).await;
};

task::spawn(f);

info!("Waiting for server to be ready");

con_rx.await.expect("Server not ready");
let tcp = TcpStream::connect("127.0.0.1:12347")
.await
.expect("Failed to connect");
let url = url::Url::parse("ws://localhost:12347/").unwrap();
let (stream, _) = client_async(url, tcp)
.await
.expect("Client failed to connect");

let (tx, _rx) = stream.split();

// the `WebSocketSender::send` takes a shared `&self`, so you can call it concurrently.
// this test case checks that it works
let results = futures::future::join_all((1..10).map(|i| {
info!("Sending message");
tx.send(Message::text(format!("{}", i)))
}))
.await;

assert!(results.iter().all(Result::is_ok));

tx.close(None).await.expect("Failed to close");

info!("Waiting for response messages");
let messages = msg_rx.await.expect("Failed to receive messages");
assert_eq!(messages.len(), 10);
}