diff --git a/Cargo.lock b/Cargo.lock index a9afe45af1f..136130f1b66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,10 +172,10 @@ dependencies = [ ] [[package]] -name = "anymap" -version = "0.12.1" +name = "anymap3" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33954243bd79057c2de7338850b85983a44588021f8a5fee574a8888c6de4344" +checksum = "170433209e817da6aae2c51aa0dd443009a613425dd041ebfb2492d1c4c11a25" [[package]] name = "approx" @@ -2058,6 +2058,68 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "gloo-console" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a17868f56b4a24f677b17c8cb69958385102fa879418052d60b50bc1727e261" +dependencies = [ + "gloo-utils", + "js-sys", + "serde", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "gloo-net" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06f627b1a58ca3d42b45d6104bf1e1a03799df472df00988b6ba21accc10580" +dependencies = [ + "futures-channel", + "futures-core", + "futures-sink", + "gloo-utils", + "http 1.3.1", + "js-sys", + "pin-project", + "serde", + "serde_json", + "thiserror 1.0.69", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "gloo-storage" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc8031e8c92758af912f9bc08fbbadd3c6f3cfcbf6b64cdf3d6a81f0139277a" +dependencies = [ + "gloo-utils", + "js-sys", + "serde", + "serde_json", + "thiserror 1.0.69", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "gloo-utils" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5555354113b18c547c1d3a98fbf7fb32a9ff4f6fa112ce823a21641a0ba3aa" +dependencies = [ + "js-sys", + "serde", + "serde_json", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "h2" version = "0.3.26" @@ -5184,7 +5246,7 @@ name = "spacetimedb-bench" version = "1.1.1" dependencies = [ "anyhow", - "anymap", + "anymap3", "byte-unit", "clap 4.5.37", "criterion", @@ -5802,16 +5864,21 @@ dependencies = [ name = "spacetimedb-sdk" version = "1.1.1" dependencies = [ - "anymap", + "anymap3", "base64 0.21.7", "brotli", "bytes", "cursive", "futures", "futures-channel", + "getrandom 0.3.2", + "gloo-console", + "gloo-net", + "gloo-storage", "hex", "home", "http 1.3.1", + "js-sys", "log", "once_cell", "prometheus", @@ -5825,6 +5892,10 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-tungstenite", + "tokio-tungstenite-wasm", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", ] [[package]] @@ -6727,6 +6798,25 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-tungstenite-wasm" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02567f5f341725fb3e452c1f55dd4e5b0f2a685355c3b10babf0fe8e137d176e" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.3.1", + "httparse", + "js-sys", + "thiserror 2.0.12", + "tokio", + "tokio-tungstenite", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "tokio-util" version = "0.7.15" diff --git a/Cargo.toml b/Cargo.toml index cf35ad4f169..38f8e239d50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,7 +128,7 @@ spacetimedb-subscription = { path = "crates/subscription", version = "1.1.1" } # from appearing in module dependency graphs. ahash = { version = "0.8", default-features = false, features = ["std"] } anyhow = "1.0.68" -anymap = "0.12" +anymap = { package = "anymap3", version = "1.0.1" } arrayvec = "0.7.2" async-stream = "0.3.6" async-trait = "0.1.68" diff --git a/crates/codegen/src/rust.rs b/crates/codegen/src/rust.rs index af5b7ccbaab..890cf4ce697 100644 --- a/crates/codegen/src/rust.rs +++ b/crates/codegen/src/rust.rs @@ -1210,6 +1210,7 @@ impl __sdk::InModule for RemoteTables {{ /// /// - [`DbConnection::frame_tick`]. /// - [`DbConnection::run_threaded`]. +/// - [`DbConnection::run_background`]. /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. /// - [`DbConnection::advance_one_message_blocking`]. @@ -1309,8 +1310,19 @@ impl DbConnection {{ /// This is a low-level primitive exposed for power users who need significant control over scheduling. /// Most applications should call [`Self::run_threaded`] to spawn a thread /// which advances the connection automatically. + /// + /// # Panics + /// At runtime if called on any `wasm32` target. pub fn advance_one_message_blocking(&self) -> __sdk::Result<()> {{ - self.imp.advance_one_message_blocking() + #[cfg(target_arch = \"wasm32\")] + {{ + panic!(\"`DbConnection::advance_one_message_blocking` is not supported on WebAssembly (wasm32); \\ + prefer using `advance_one_message` or `advance_one_message_async` instead\"); + }} + #[cfg(not(target_arch = \"wasm32\"))] + {{ + self.imp.advance_one_message_blocking() + }} }} /// Process one WebSocket message, `await`ing until one is received. @@ -1334,8 +1346,35 @@ impl DbConnection {{ }} /// Spawn a thread which processes WebSocket messages as they are received. + /// + /// # Panics + /// At runtime if called on any `wasm32` target. pub fn run_threaded(&self) -> std::thread::JoinHandle<()> {{ - self.imp.run_threaded() + #[cfg(target_arch = \"wasm32\")] + {{ + panic!(\"`DbConnection::run_threaded` is not supported on WebAssembly (wasm32); \\ + prefer using `DbConnection::run_background` instead\"); + }} + #[cfg(not(target_arch = \"wasm32\"))] + {{ + self.imp.run_threaded() + }} + }} + + /// Spawn a task which processes WebSocket messages as they are received. + /// + /// # Panics + /// At runtime if called on any non-`wasm32` target. + pub fn run_background(&self) {{ + #[cfg(not(target_arch = \"wasm32\"))] + {{ + panic!(\"`DbConnection::run_background` is only supported on WebAssembly (wasm32); \\ + prefer using `DbConnection::run_threaded` instead\"); + }} + #[cfg(target_arch = \"wasm32\")] + {{ + self.imp.run_background() + }} }} /// Run an `async` loop which processes WebSocket messages when polled. diff --git a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap index eb902cf4210..2d107f01c8c 100644 --- a/crates/codegen/tests/snapshots/codegen__codegen_rust.snap +++ b/crates/codegen/tests/snapshots/codegen__codegen_rust.snap @@ -1753,6 +1753,7 @@ impl __sdk::InModule for RemoteTables { /// /// - [`DbConnection::frame_tick`]. /// - [`DbConnection::run_threaded`]. +/// - [`DbConnection::run_background`]. /// - [`DbConnection::run_async`]. /// - [`DbConnection::advance_one_message`]. /// - [`DbConnection::advance_one_message_blocking`]. @@ -1852,8 +1853,19 @@ impl DbConnection { /// This is a low-level primitive exposed for power users who need significant control over scheduling. /// Most applications should call [`Self::run_threaded`] to spawn a thread /// which advances the connection automatically. + /// + /// # Panics + /// At runtime if called on any `wasm32` target. pub fn advance_one_message_blocking(&self) -> __sdk::Result<()> { - self.imp.advance_one_message_blocking() + #[cfg(target_arch = "wasm32")] + { + panic!("`DbConnection::advance_one_message_blocking` is not supported on WebAssembly (wasm32); \ + prefer using `advance_one_message` or `advance_one_message_async` instead"); + } + #[cfg(not(target_arch = "wasm32"))] + { + self.imp.advance_one_message_blocking() + } } /// Process one WebSocket message, `await`ing until one is received. @@ -1877,8 +1889,35 @@ impl DbConnection { } /// Spawn a thread which processes WebSocket messages as they are received. + /// + /// # Panics + /// At runtime if called on any `wasm32` target. pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { - self.imp.run_threaded() + #[cfg(target_arch = "wasm32")] + { + panic!("`DbConnection::run_threaded` is not supported on WebAssembly (wasm32); \ + prefer using `DbConnection::run_background` instead"); + } + #[cfg(not(target_arch = "wasm32"))] + { + self.imp.run_threaded() + } + } + + /// Spawn a task which processes WebSocket messages as they are received. + /// + /// # Panics + /// At runtime if called on any non-`wasm32` target. + pub fn run_background(&self) { + #[cfg(not(target_arch = "wasm32"))] + { + panic!("`DbConnection::run_background` is only supported on WebAssembly (wasm32); \ + prefer using `DbConnection::run_threaded` instead"); + } + #[cfg(target_arch = "wasm32")] + { + self.imp.run_background() + } } /// Run an `async` loop which processes WebSocket messages when polled. diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml index 9daa2a983eb..2471fe09358 100644 --- a/crates/sdk/Cargo.toml +++ b/crates/sdk/Cargo.toml @@ -7,6 +7,20 @@ description = "A Rust SDK for clients to interface with SpacetimeDB" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = [] +web = [ + "dep:getrandom", + "dep:gloo-console", + "dep:gloo-net", + "dep:gloo-storage", + "dep:js-sys", + "dep:tokio-tungstenite-wasm", + "dep:wasm-bindgen", + "dep:wasm-bindgen-futures", + "dep:web-sys", +] + [dependencies] spacetimedb-data-structures.workspace = true spacetimedb-sats.workspace = true @@ -21,12 +35,26 @@ brotli.workspace = true bytes.workspace = true futures.workspace = true futures-channel.workspace = true -home.workspace = true http.workspace = true log.workspace = true once_cell.workspace = true prometheus.workspace = true rand.workspace = true + +getrandom = { version = "0.3.2", features = ["wasm_js"], optional = true } +gloo-console = { version = "0.3.0", optional = true } +gloo-net = { version = "0.6.0", optional = true } +gloo-storage = { version = "0.3.0", optional = true } +js-sys = { version = "0.3", optional = true } +tokio-tungstenite-wasm = { version = "0.6.0", optional = true } +wasm-bindgen = { version = "0.2.100", optional = true } +wasm-bindgen-futures = { version = "0.4.45", optional = true } +web-sys = { version = "0.3.77", features = [ + "Document", "HtmlDocument", "Window" +], optional = true} + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +home.workspace = true tokio.workspace = true tokio-tungstenite.workspace = true diff --git a/crates/sdk/src/client_cache.rs b/crates/sdk/src/client_cache.rs index 46fc31ce013..f62aca56b2c 100644 --- a/crates/sdk/src/client_cache.rs +++ b/crates/sdk/src/client_cache.rs @@ -5,7 +5,7 @@ use crate::callbacks::CallbackId; use crate::db_connection::{PendingMutation, SharedCell}; use crate::spacetime_module::{InModule, SpacetimeModule, TableUpdate, WithBsatn}; -use anymap::{any::Any, Map}; +use anymap::Map; use bytes::Bytes; use core::any::type_name; use core::hash::Hash; @@ -282,7 +282,7 @@ impl TableCache { /// Called by the codegen when initializing the client cache during [`crate::DbConnectionBuilder::build`]. pub fn add_unique_constraint(&mut self, unique_index_name: &'static str, get_unique_col: fn(&Row) -> &Col) where - Col: Any + Clone + std::hash::Hash + Eq + Send + Sync + std::fmt::Debug + 'static, + Col: std::any::Any + Clone + std::hash::Hash + Eq + Send + Sync + std::fmt::Debug + 'static, { assert!(self.entries.is_empty(), "Cannot add a unique constraint to a populated table; constraints should only be added during initialization, before subscribing to any rows."); if self @@ -306,7 +306,7 @@ pub struct ClientCache { /// "keyed" on the type `HashMap<&'static str, TableCache`. /// /// The strings are table names, since we may have multiple tables with the same row type. - tables: Map, + tables: Map, _module: PhantomData, } @@ -572,7 +572,7 @@ pub struct UniqueIndexImpl { impl UniqueIndexDyn for UniqueIndexImpl where Row: Clone + Send + Sync + 'static, - Col: Any + Clone + std::hash::Hash + Eq + Send + Sync + std::fmt::Debug + 'static, + Col: std::any::Any + Clone + std::hash::Hash + Eq + Send + Sync + std::fmt::Debug + 'static, { type Row = Row; fn add_row(&mut self, row: Self::Row) { diff --git a/crates/sdk/src/credentials.rs b/crates/sdk/src/credentials.rs index bdef761048c..4f5c774b591 100644 --- a/crates/sdk/src/credentials.rs +++ b/crates/sdk/src/credentials.rs @@ -8,144 +8,319 @@ //! } //! ``` -use home::home_dir; -use spacetimedb_lib::{bsatn, de::Deserialize, ser::Serialize}; -use std::path::PathBuf; -use thiserror::Error; - -const CREDENTIALS_DIR: &str = ".spacetimedb_client_credentials"; - -#[derive(Error, Debug)] -pub enum CredentialFileError { - #[error("Failed to determine user home directory as root for credentials storage")] - DetermineHomeDir, - #[error("Error creating credential storage directory {path}")] - CreateDir { - path: PathBuf, - #[source] - source: std::io::Error, - }, - #[error("Error serializing credentials for storage in file")] - Serialize { - #[source] - source: bsatn::EncodeError, - }, - #[error("Error writing BSATN-serialized credentials to file {path}")] - Write { - path: PathBuf, - #[source] - source: std::io::Error, - }, - #[error("Error reading BSATN-serialized credentials from file {path}")] - Read { - path: PathBuf, - #[source] - source: std::io::Error, - }, - #[error("Error deserializing credentials from bytes stored in file {path}")] - Deserialize { - path: PathBuf, - #[source] - source: bsatn::DecodeError, - }, -} +#[cfg(not(feature = "web"))] +mod native_mod { + use home::home_dir; + use spacetimedb_lib::{bsatn, de::Deserialize, ser::Serialize}; + use std::path::PathBuf; + use thiserror::Error; -/// A file on disk which stores, or can store, a JWT for authenticating with SpacetimeDB. -/// -/// The file does not necessarily exist or store credentials. -/// If the credentials have been stored previously, they can be accessed with [`File::load`]. -/// New credentials can be saved to disk with [`File::save`]. -pub struct File { - filename: String, -} + const CREDENTIALS_DIR: &str = ".spacetimedb_client_credentials"; -#[derive(Serialize, Deserialize)] -struct Credentials { - token: String, -} + #[derive(Error, Debug)] + pub enum CredentialFileError { + #[error("Failed to determine user home directory as root for credentials storage")] + DetermineHomeDir, + #[error("Error creating credential storage directory {path}")] + CreateDir { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("Error serializing credentials for storage in file")] + Serialize { + #[source] + source: bsatn::EncodeError, + }, + #[error("Error writing BSATN-serialized credentials to file {path}")] + Write { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("Error reading BSATN-serialized credentials from file {path}")] + Read { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("Error deserializing credentials from bytes stored in file {path}")] + Deserialize { + path: PathBuf, + #[source] + source: bsatn::DecodeError, + }, + } -impl File { - /// Get a handle on a file which stores a SpacetimeDB [`Identity`] and its private access token. - /// - /// This method does not create the file or check that it exists. - /// - /// Distinct applications running as the same user on the same machine - /// may share [`Identity`]/token pairs by supplying the same `key`. - /// Users who desire distinct credentials for their application - /// should supply a unique `key` per application. + /// A file on disk which stores, or can store, a JWT for authenticating with SpacetimeDB. /// - /// No additional namespacing is provided to tie the stored token - /// to a particular SpacetimeDB instance or cluster. - /// Users who intend to connect to multiple instances or clusters - /// should use a distinct `key` per cluster. - pub fn new(key: impl Into) -> Self { - Self { filename: key.into() } + /// The file does not necessarily exist or store credentials. + /// If the credentials have been stored previously, they can be accessed with [`File::load`]. + /// New credentials can be saved to disk with [`File::save`]. + pub struct File { + filename: String, } - fn determine_home_dir() -> Result { - home_dir().ok_or(CredentialFileError::DetermineHomeDir) + #[derive(Serialize, Deserialize)] + struct Credentials { + token: String, } - fn ensure_credentials_dir() -> Result<(), CredentialFileError> { - let mut path = Self::determine_home_dir()?; - path.push(CREDENTIALS_DIR); + impl File { + /// Get a handle on a file which stores a SpacetimeDB [`Identity`] and its private access token. + /// + /// This method does not create the file or check that it exists. + /// + /// Distinct applications running as the same user on the same machine + /// may share [`Identity`]/token pairs by supplying the same `key`. + /// Users who desire distinct credentials for their application + /// should supply a unique `key` per application. + /// + /// No additional namespacing is provided to tie the stored token + /// to a particular SpacetimeDB instance or cluster. + /// Users who intend to connect to multiple instances or clusters + /// should use a distinct `key` per cluster. + pub fn new(key: impl Into) -> Self { + Self { filename: key.into() } + } - std::fs::create_dir_all(&path).map_err(|source| CredentialFileError::CreateDir { path, source }) - } + fn determine_home_dir() -> Result { + home_dir().ok_or(CredentialFileError::DetermineHomeDir) + } - fn path(&self) -> Result { - let mut path = Self::determine_home_dir()?; - path.push(CREDENTIALS_DIR); - path.push(&self.filename); - Ok(path) - } + fn ensure_credentials_dir() -> Result<(), CredentialFileError> { + let mut path = Self::determine_home_dir()?; + path.push(CREDENTIALS_DIR); - /// Store the provided `token` to disk in the file referred to by `self`. - /// - /// Future calls to [`Self::load`] on a `File` with the same key can retrieve the token. - /// - /// Expected usage is to call this from a [`super::DbConnectionBuilder::on_connect`] callback. - /// - /// ```ignore - /// DbConnection::builder() - /// .on_connect(|_ctx, _identity, token| { - /// credentials::File::new("my_app").save(token).unwrap(); - /// }) - /// ``` - pub fn save(self, token: impl Into) -> Result<(), CredentialFileError> { - Self::ensure_credentials_dir()?; - - let creds = bsatn::to_vec(&Credentials { token: token.into() }) - .map_err(|source| CredentialFileError::Serialize { source })?; - let path = self.path()?; - std::fs::write(&path, creds).map_err(|source| CredentialFileError::Write { path, source })?; - Ok(()) + std::fs::create_dir_all(&path).map_err(|source| CredentialFileError::CreateDir { path, source }) + } + + fn path(&self) -> Result { + let mut path = Self::determine_home_dir()?; + path.push(CREDENTIALS_DIR); + path.push(&self.filename); + Ok(path) + } + + /// Store the provided `token` to disk in the file referred to by `self`. + /// + /// Future calls to [`Self::load`] on a `File` with the same key can retrieve the token. + /// + /// Expected usage is to call this from a [`super::DbConnectionBuilder::on_connect`] callback. + /// + /// ```ignore + /// DbConnection::builder() + /// .on_connect(|_ctx, _identity, token| { + /// credentials::File::new("my_app").save(token).unwrap(); + /// }) + /// ``` + pub fn save(self, token: impl Into) -> Result<(), CredentialFileError> { + Self::ensure_credentials_dir()?; + + let creds = bsatn::to_vec(&Credentials { token: token.into() }) + .map_err(|source| CredentialFileError::Serialize { source })?; + let path = self.path()?; + std::fs::write(&path, creds).map_err(|source| CredentialFileError::Write { path, source })?; + Ok(()) + } + + /// Load a saved token from disk in the file referred to by `self`, + /// if they have previously been stored by [`Self::save`]. + /// + /// Returns `Err` if I/O fails, + /// `None` if credentials have not previously been stored, + /// or `Some` if credentials are successfully loaded from disk. + /// After unwrapping the `Result`, the returned `Option` can be passed to + /// [`super::DbConnectionBuilder::with_token`]. + /// + /// ```ignore + /// DbConnection::builder() + /// .with_token(credentials::File::new("my_app").load().unwrap()) + /// ``` + pub fn load(self) -> Result, CredentialFileError> { + let path = self.path()?; + + let bytes = match std::fs::read(&path) { + Ok(bytes) => bytes, + Err(e) if matches!(e.kind(), std::io::ErrorKind::NotFound) => return Ok(None), + Err(source) => return Err(CredentialFileError::Read { path, source }), + }; + + let creds = bsatn::from_slice::(&bytes) + .map_err(|source| CredentialFileError::Deserialize { path, source })?; + Ok(Some(creds.token)) + } } +} - /// Load a saved token from disk in the file referred to by `self`, - /// if they have previously been stored by [`Self::save`]. - /// - /// Returns `Err` if I/O fails, - /// `None` if credentials have not previously been stored, - /// or `Some` if credentials are successfully loaded from disk. - /// After unwrapping the `Result`, the returned `Option` can be passed to - /// [`super::DbConnectionBuilder::with_token`]. - /// - /// ```ignore - /// DbConnection::builder() - /// .with_token(credentials::File::new("my_app").load().unwrap()) - /// ``` - pub fn load(self) -> Result, CredentialFileError> { - let path = self.path()?; - - let bytes = match std::fs::read(&path) { - Ok(bytes) => bytes, - Err(e) if matches!(e.kind(), std::io::ErrorKind::NotFound) => return Ok(None), - Err(source) => return Err(CredentialFileError::Read { path, source }), - }; - - let creds = bsatn::from_slice::(&bytes) - .map_err(|source| CredentialFileError::Deserialize { path, source })?; - Ok(Some(creds.token)) +#[cfg(feature = "web")] +mod web_mod { + pub use gloo_storage::{LocalStorage, SessionStorage, Storage}; + + pub mod cookies { + use thiserror::Error; + use wasm_bindgen::{JsCast, JsValue}; + use web_sys::{window, Document, HtmlDocument}; + + #[derive(Error, Debug)] + pub enum CookieError { + #[error("Window Object not valid in this context")] + NoWindow, + #[error("No `document` available on `window` object")] + NoDocument, + #[error("`document` is not an HtmlDocument")] + NoHtmlDocument, + #[error("web_sys error: {0:?}")] + WebSys(JsValue), + } + + impl From for CookieError { + fn from(err: JsValue) -> Self { + CookieError::WebSys(err) + } + } + + /// A builder for contructing and setting cookies. + pub struct Cookie { + name: String, + value: String, + path: Option, + domain: Option, + max_age: Option, + secure: bool, + same_site: Option, + } + + impl Cookie { + /// Creates a new cookie builder with the specified name and value. + pub fn new(name: impl Into, value: impl Into) -> Self { + Self { + name: name.into(), + value: value.into(), + path: None, + domain: None, + max_age: None, + secure: false, + same_site: None, + } + } + + /// Gets the value of a cookie by name. + pub fn get(name: &str) -> Result, CookieError> { + let doc = get_html_document()?; + let all = doc.cookie().map_err(CookieError::from)?; + for cookie in all.split(';') { + let cookie = cookie.trim(); + if let Some((k, v)) = cookie.split_once('=') { + if k == name { + return Ok(Some(v.to_string())); + } + } + } + + Ok(None) + } + + /// Sets the `Path` attribute (e.g., "/"). + pub fn path(mut self, path: impl Into) -> Self { + self.path = Some(path.into()); + self + } + + /// Sets the `Domain` attribute (e.g., "example.com"). + pub fn domain(mut self, domain: impl Into) -> Self { + self.domain = Some(domain.into()); + self + } + + /// Sets the `Max-Age` attribute in seconds. + pub fn max_age(mut self, seconds: i32) -> Self { + self.max_age = Some(seconds); + self + } + + /// Toggles the `Secure` flag. + /// The default is `false`. + pub fn secure(mut self, enabled: bool) -> Self { + self.secure = enabled; + self + } + + /// Sets the `SameSite` attribute (`Strict`, `Lax`, or `None`). + pub fn same_site(mut self, same_site: SameSite) -> Self { + self.same_site = Some(same_site); + self + } + + pub fn set(self) -> Result<(), CookieError> { + let doc = get_html_document()?; + let mut parts = vec![format!("{}={}", self.name, self.value)]; + + if let Some(path) = self.path { + parts.push(format!("Path={}", path)); + } + if let Some(domain) = self.domain { + parts.push(format!("Domain={}", domain)); + } + if let Some(age) = self.max_age { + parts.push(format!("Max-Age={}", age)); + } + if self.secure { + parts.push("Secure".into()); + } + if let Some(same) = self.same_site { + parts.push(format!("SameSite={}", same.to_string())); + } + + let cookie_str = parts.join("; "); + doc.set_cookie(&cookie_str).map_err(CookieError::from) + } + + /// Deletes the cookie by setting its value to empty and `Max-Age=0`. + pub fn delete(self) -> Result<(), CookieError> { + self.value("").max_age(0).set() + } + + /// Helper to override value for delete + fn value(mut self, value: impl Into) -> Self { + self.value = value.into(); + self + } + } + + /// Controls the `SameSite` attribute for cookies. + pub enum SameSite { + Strict, + Lax, + None, + } + + impl ToString for SameSite { + fn to_string(&self) -> String { + match self { + SameSite::Strict => "Strict".into(), + SameSite::Lax => "Lax".into(), + SameSite::None => "None".into(), + } + } + } + + fn get_document() -> Result { + window() + .ok_or(CookieError::NoWindow)? + .document() + .ok_or(CookieError::NoDocument) + } + + fn get_html_document() -> Result { + let doc = get_document()?; + doc.dyn_into::().map_err(|_| CookieError::NoHtmlDocument) + } } } + +#[cfg(not(feature = "web"))] +pub use native_mod::*; + +#[cfg(feature = "web")] +pub use web_mod::*; diff --git a/crates/sdk/src/db_connection.rs b/crates/sdk/src/db_connection.rs index 9c8031698af..fd62c857b66 100644 --- a/crates/sdk/src/db_connection.rs +++ b/crates/sdk/src/db_connection.rs @@ -31,6 +31,8 @@ use crate::{ }; use bytes::Bytes; use futures::StreamExt; +#[cfg(feature = "web")] +use futures::{pin_mut, FutureExt}; use futures_channel::mpsc; use http::Uri; use spacetimedb_client_api_messages::websocket as ws; @@ -40,6 +42,7 @@ use std::{ collections::HashMap, sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock}, }; +#[cfg(not(feature = "web"))] use tokio::{ runtime::{self, Runtime}, sync::Mutex as TokioMutex, @@ -53,6 +56,7 @@ pub(crate) type SharedCell = Arc>; /// This must be relatively cheaply `Clone`-able, and have internal sharing, /// as numerous operations will clone it to get new handles on the connection. pub struct DbContextImpl { + #[cfg(not(feature = "web"))] runtime: runtime::Handle, /// All the state which is safe to hold a lock on while running callbacks. @@ -66,7 +70,10 @@ pub struct DbContextImpl { /// Receiver channel for WebSocket messages, /// which are pre-parsed in the background by [`parse_loop`]. + #[cfg(not(feature = "web"))] recv: Arc>>>, + #[cfg(feature = "web")] + recv: SharedCell>>, /// Channel into which operations which apparently mutate SDK state, /// e.g. registering callbacks, push [`PendingMutation`] messages, @@ -76,7 +83,10 @@ pub struct DbContextImpl { /// Receive end of `pending_mutations_send`, /// from which [Self::apply_pending_mutations] and friends read mutations. + #[cfg(not(feature = "web"))] pending_mutations_recv: Arc>>>, + #[cfg(feature = "web")] + pending_mutations_recv: SharedCell>>, /// This connection's `Identity`. /// @@ -88,6 +98,7 @@ pub struct DbContextImpl { impl Clone for DbContextImpl { fn clone(&self) -> Self { Self { + #[cfg(not(feature = "web"))] runtime: self.runtime.clone(), // Being very explicit with `Arc::clone` here, // since we'll be doing `DbContextImpl::clone` very frequently, @@ -259,9 +270,10 @@ impl DbContextImpl { /// Apply all queued [`PendingMutation`]s. fn apply_pending_mutations(&self) -> crate::Result<()> { - while let Ok(Some(pending_mutation)) = self.pending_mutations_recv.blocking_lock().try_next() { + while let Ok(Some(pending_mutation)) = get_lock_sync(&self.pending_mutations_recv).try_next() { self.apply_mutation(pending_mutation)?; } + Ok(()) } @@ -475,7 +487,7 @@ impl DbContextImpl { // returns `Err(_)`. Similar behavior as `Iterator::next` and // `Stream::poll_next`. No comment on whether this is a good mental // model or not. - let res = match self.recv.blocking_lock().try_next() { + let res = match get_lock_sync(&self.recv).try_next() { Ok(None) => { let disconnect_ctx = self.make_event_ctx(None); self.invoke_disconnected(&disconnect_ctx); @@ -498,8 +510,8 @@ impl DbContextImpl { // We call this out as an incorrect and unsupported thing to do. #![allow(clippy::await_holding_lock)] - let mut pending_mutations = self.pending_mutations_recv.lock().await; - let mut recv = self.recv.lock().await; + let mut pending_mutations = get_lock_async(&self.pending_mutations_recv).await; + let mut recv = get_lock_async(&self.recv).await; // Always process pending mutations before WS messages, if they're available, // so that newly registered callbacks run on messages. @@ -510,15 +522,28 @@ impl DbContextImpl { return Message::Local(pending_mutation.unwrap()); } + #[cfg(not(feature = "web"))] tokio::select! { pending_mutation = pending_mutations.next() => Message::Local(pending_mutation.unwrap()), incoming_message = recv.next() => Message::Ws(incoming_message), } + + #[cfg(feature = "web")] + { + let (pending_fut, recv_fut) = (pending_mutations.next().fuse(), recv.next().fuse()); + pin_mut!(pending_fut, recv_fut); + + futures::select! { + pending_mutation = pending_fut => Message::Local(pending_mutation.unwrap()), + incoming_message = recv_fut => Message::Ws(incoming_message), + } + } } /// Like [`Self::advance_one_message`], but sleeps the thread until a message is available. /// /// Called by the autogenerated `DbConnection` method of the same name. + #[cfg(not(feature = "web"))] pub fn advance_one_message_blocking(&self) -> crate::Result<()> { match self.runtime.block_on(self.get_message()) { Message::Local(pending) => self.apply_mutation(pending), @@ -557,6 +582,7 @@ impl DbContextImpl { /// Spawn a thread which does [`Self::advance_one_message_blocking`] in a loop. /// /// Called by the autogenerated `DbConnection` method of the same name. + #[cfg(not(feature = "web"))] pub fn run_threaded(&self) -> std::thread::JoinHandle<()> { let this = self.clone(); std::thread::spawn(move || loop { @@ -568,6 +594,23 @@ impl DbContextImpl { }) } + /// Spawn a background task which does [`Self::advance_one_message_async`] in a loop. + /// + /// Called by the autogenerated `DbConnection` method of the same name. + #[cfg(feature = "web")] + pub fn run_background(&self) { + let this = self.clone(); + wasm_bindgen_futures::spawn_local(async move { + loop { + match this.advance_one_message_async().await { + Ok(()) => (), + Err(e) if error_is_normal_disconnect(&e) => return, + Err(e) => panic!("{e:?}"), + } + } + }) + } + /// An async task which does [`Self::advance_one_message_async`] in a loop. /// /// Called by the autogenerated `DbConnection` method of the same name. @@ -691,6 +734,7 @@ pub(crate) struct DbContextImplInner { /// `Some` if not within the context of an outer runtime. The `Runtime` must /// then live as long as `Self`. #[allow(unused)] + #[cfg(not(feature = "web"))] runtime: Option, db_callbacks: DbCallbacks, @@ -814,6 +858,7 @@ You must explicitly advance the connection by calling any one of: - `DbConnection::frame_tick`. - `DbConnection::run_threaded`. +- `DbConnection::run_background`. - `DbConnection::run_async`. - `DbConnection::advance_one_message`. - `DbConnection::advance_one_message_blocking`. @@ -822,13 +867,21 @@ You must explicitly advance the connection by calling any one of: Which of these methods you should call depends on the specific needs of your application, but you must call one of them, or else the connection will never progress. "] + #[cfg(not(feature = "web"))] pub fn build(self) -> crate::Result { let imp = self.build_impl()?; Ok(::new(imp)) } + #[cfg(feature = "web")] + pub async fn build(self) -> crate::Result { + let imp = self.build_impl().await?; + Ok(::new(imp)) + } + /// Open a WebSocket connection, build an empty client cache, &c, /// to construct a [`DbContextImpl`]. + #[cfg(not(feature = "web"))] fn build_impl(self) -> crate::Result> { let (runtime, handle) = enter_or_create_runtime()?; let db_callbacks = DbCallbacks::default(); @@ -883,6 +936,56 @@ but you must call one of them, or else the connection will never progress. Ok(ctx_imp) } + #[cfg(feature = "web")] + pub async fn build_impl(self) -> crate::Result> { + let db_callbacks = DbCallbacks::default(); + let reducer_callbacks = ReducerCallbacks::default(); + + let ws_connection = WsConnection::connect( + self.uri.unwrap(), + self.module_name.as_ref().unwrap(), + self.token.as_deref(), + get_connection_id(), + self.params, + ) + .await + .map_err(|source| crate::Error::FailedToConnect { + source: InternalError::new("Failed to initiate WebSocket connection").with_cause(source), + })?; + + let (raw_msg_recv, raw_msg_send) = ws_connection.spawn_message_loop(); + let parsed_recv_chan = spawn_parse_loop::(raw_msg_recv); + + let inner = Arc::new(StdMutex::new(DbContextImplInner { + db_callbacks, + reducer_callbacks, + subscriptions: SubscriptionManager::default(), + + on_connect: self.on_connect, + on_connect_error: self.on_connect_error, + on_disconnect: self.on_disconnect, + call_reducer_flags: <_>::default(), + })); + + let mut cache = ClientCache::default(); + M::register_tables(&mut cache); + let cache = Arc::new(StdMutex::new(cache)); + let send_chan = Arc::new(StdMutex::new(Some(raw_msg_send))); + + let (pending_mutations_send, pending_mutations_recv) = mpsc::unbounded(); + let ctx_imp = DbContextImpl { + inner, + send_chan, + cache, + recv: Arc::new(StdMutex::new(parsed_recv_chan)), + pending_mutations_send, + pending_mutations_recv: Arc::new(StdMutex::new(pending_mutations_recv)), + identity: Arc::new(StdMutex::new(None)), + }; + + Ok(ctx_imp) + } + /// Set the URI of the SpacetimeDB host which is running the remote module. /// /// The URI must have either no scheme or one of the schemes `http`, `https`, `ws` or `wss`. @@ -995,6 +1098,7 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal // When called from within an async context, return a handle to it (and no // `Runtime`), otherwise create a fresh `Runtime` and return it along with a // handle to it. +#[cfg(not(feature = "web"))] fn enter_or_create_runtime() -> crate::Result<(Option, runtime::Handle)> { match runtime::Handle::try_current() { Err(e) if e.is_missing_context() => { @@ -1017,6 +1121,31 @@ fn enter_or_create_runtime() -> crate::Result<(Option, runtime::Handle) } } +/// Synchronous lock helper: native = blocking_lock, web = lock().unwrap() +#[cfg(not(feature = "web"))] +fn get_lock_sync(mutex: &TokioMutex) -> tokio::sync::MutexGuard<'_, T> { + mutex.blocking_lock() +} + +/// Synchronous lock helper: native = blocking_lock, web = lock().unwrap() +#[cfg(feature = "web")] +fn get_lock_sync(mutex: &StdMutex) -> std::sync::MutexGuard<'_, T> { + mutex.lock().unwrap() +} + +// Async‐lock helper: native = .lock().await, web = lock().unwrap() inside async fn +#[cfg(not(feature = "web"))] +async fn get_lock_async(mutex: &TokioMutex) -> tokio::sync::MutexGuard<'_, T> { + mutex.lock().await +} + +// Async‐lock helper: native = .lock().await, web = lock().unwrap() inside async fn +#[cfg(feature = "web")] +pub async fn get_lock_async(mutex: &StdMutex) -> std::sync::MutexGuard<'_, T> { + // still async, but does the sync lock immediately + mutex.lock().unwrap() +} + enum ParsedMessage { InitialSubscription { db_update: M::DbUpdate, sub_id: u32 }, TransactionUpdate(Event, Option), @@ -1027,6 +1156,7 @@ enum ParsedMessage { Error(crate::Error), } +#[cfg(not(feature = "web"))] fn spawn_parse_loop( raw_message_recv: mpsc::UnboundedReceiver>, handle: &runtime::Handle, @@ -1036,6 +1166,15 @@ fn spawn_parse_loop( (handle, parsed_message_recv) } +#[cfg(feature = "web")] +fn spawn_parse_loop( + raw_message_recv: mpsc::UnboundedReceiver>, +) -> mpsc::UnboundedReceiver> { + let (parsed_message_send, parsed_message_recv) = mpsc::unbounded(); + wasm_bindgen_futures::spawn_local(parse_loop(raw_message_recv, parsed_message_send)); + parsed_message_recv +} + /// A loop which reads raw WS messages from `recv`, parses them into domain types, /// and pushes the [`ParsedMessage`]s into `send`. async fn parse_loop( diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs index 66459845a37..f821dd7521e 100644 --- a/crates/sdk/src/lib.rs +++ b/crates/sdk/src/lib.rs @@ -72,3 +72,9 @@ pub mod unstable { pub use crate::metrics::{ClientMetrics, CLIENT_METRICS}; pub use spacetimedb_client_api_messages::websocket::CallReducerFlags; } + +#[cfg(all(not(target_arch = "wasm32"), feature = "web"))] +compile_error!( + "The `web` feature can only be enabled when targeting `wasm32`. \ + Remove `--features web` or switch your target to a wasm32 architecture." +); diff --git a/crates/sdk/src/websocket.rs b/crates/sdk/src/websocket.rs index 436520d50d3..9c0d657480e 100644 --- a/crates/sdk/src/websocket.rs +++ b/crates/sdk/src/websocket.rs @@ -2,12 +2,16 @@ //! //! This module is internal, and may incompatibly change without warning. +#[cfg(not(feature = "web"))] use std::mem; use std::sync::Arc; +#[cfg(not(feature = "web"))] use std::time::Duration; use bytes::Bytes; -use futures::{SinkExt, StreamExt as _, TryStreamExt}; +#[cfg(not(feature = "web"))] +use futures::TryStreamExt; +use futures::{SinkExt, StreamExt as _}; use futures_channel::mpsc; use http::uri::{InvalidUri, Scheme, Uri}; use spacetimedb_client_api_messages::websocket::{ @@ -17,15 +21,17 @@ use spacetimedb_client_api_messages::websocket::{ use spacetimedb_client_api_messages::websocket::{ClientMessage, ServerMessage}; use spacetimedb_lib::{bsatn, ConnectionId}; use thiserror::Error; -use tokio::task::JoinHandle; -use tokio::time::Instant; -use tokio::{net::TcpStream, runtime}; +#[cfg(not(feature = "web"))] +use tokio::{net::TcpStream, runtime, task::JoinHandle, time::Instant}; +#[cfg(not(feature = "web"))] use tokio_tungstenite::{ connect_async_with_config, tungstenite::client::IntoClientRequest, tungstenite::protocol::{Message as WebSocketMessage, WebSocketConfig}, MaybeTlsStream, WebSocketStream, }; +#[cfg(feature = "web")] +use tokio_tungstenite_wasm::{Message as WebSocketMessage, WebSocketStream}; use crate::metrics::CLIENT_METRICS; @@ -55,6 +61,7 @@ pub enum WsError { #[error(transparent)] UriError(#[from] UriError), + #[cfg(not(feature = "web"))] #[error("Error in WebSocket connection with {uri}: {source}")] Tungstenite { uri: Uri, @@ -63,6 +70,15 @@ pub enum WsError { source: Arc, }, + #[cfg(feature = "web")] + #[error("Error in WebSocket connection with {uri}: {source}")] + Tungstenite { + uri: Uri, + #[source] + // `Arc` is required for `Self: Clone`, as `tungstenite::Error: !Clone`. + source: Arc, + }, + #[error("Received empty raw message, but valid messages always start with a one-byte compression flag")] EmptyMessage, @@ -82,12 +98,19 @@ pub enum WsError { #[error("Unrecognized compression scheme: {scheme:#x}")] UnknownCompressionScheme { scheme: u8 }, + + #[cfg(feature = "web")] + #[error("Token verification error: {0}")] + TokenVerification(String), } pub(crate) struct WsConnection { db_name: Box, connection_id: ConnectionId, + #[cfg(not(feature = "web"))] sock: WebSocketStream>, + #[cfg(feature = "web")] + sock: WebSocketStream, } fn parse_scheme(scheme: Option) -> Result { @@ -112,7 +135,29 @@ pub(crate) struct WsParams { pub light: bool, } +#[cfg(not(feature = "web"))] fn make_uri(host: Uri, db_name: &str, connection_id: ConnectionId, params: WsParams) -> Result { + make_uri_impl(host, db_name, connection_id, params, None) +} + +#[cfg(feature = "web")] +fn make_uri( + host: Uri, + db_name: &str, + connection_id: ConnectionId, + params: WsParams, + token: Option<&str>, +) -> Result { + make_uri_impl(host, db_name, connection_id, params, token) +} + +fn make_uri_impl( + host: Uri, + db_name: &str, + connection_id: ConnectionId, + params: WsParams, + token: Option<&str>, +) -> Result { let mut parts = host.into_parts(); let scheme = parse_scheme(parts.scheme.take())?; parts.scheme = Some(scheme); @@ -152,6 +197,11 @@ fn make_uri(host: Uri, db_name: &str, connection_id: ConnectionId, params: WsPar path.push_str("&light=true"); } + // Specify the `token` param if needed + if let Some(token) = token { + path.push_str(&format!("&token={token}")); + } + parts.path_and_query = Some(path.parse().map_err(|source: InvalidUri| UriError::InvalidUri { source: Arc::new(source), })?); @@ -169,6 +219,7 @@ fn make_uri(host: Uri, db_name: &str, connection_id: ConnectionId, params: WsPar // rather than having Tungstenite manage its own connections. Should this library do // the same? +#[cfg(not(feature = "web"))] fn make_request( host: Uri, db_name: &str, @@ -186,6 +237,7 @@ fn make_request( Ok(req) } +#[cfg(not(feature = "web"))] fn request_insert_protocol_header(req: &mut http::Request<()>) { req.headers_mut().insert( http::header::SEC_WEBSOCKET_PROTOCOL, @@ -193,6 +245,7 @@ fn request_insert_protocol_header(req: &mut http::Request<()>) { ); } +#[cfg(not(feature = "web"))] fn request_insert_auth_header(req: &mut http::Request<()>, token: Option<&str>) { if let Some(token) = token { let auth = ["Bearer ", token].concat().try_into().unwrap(); @@ -200,7 +253,55 @@ fn request_insert_auth_header(req: &mut http::Request<()>, token: Option<&str>) } } +#[cfg(feature = "web")] +async fn fetch_ws_token(host: &Uri, auth_token: &str) -> Result { + use gloo_net::http::{Method, RequestBuilder}; + use js_sys::{Reflect, JSON}; + use wasm_bindgen::{JsCast, JsValue}; + + let url = format!("{}v1/identity/websocket-token", host); + + // helpers to convert gloo_net::Error or JsValue into WsError::TokenVerification + let gloo_to_ws_err = |e: gloo_net::Error| match e { + gloo_net::Error::JsError(js_err) => WsError::TokenVerification(js_err.message.into()), + gloo_net::Error::SerdeError(e) => WsError::TokenVerification(e.to_string()), + gloo_net::Error::GlooError(msg) => WsError::TokenVerification(msg), + }; + let js_to_ws_err = |e: JsValue| { + if let Some(err) = e.dyn_ref::() { + WsError::TokenVerification(err.message().into()) + } else if let Some(s) = e.as_string() { + WsError::TokenVerification(s) + } else { + WsError::TokenVerification(format!("{:?}", e)) + } + }; + + let res = RequestBuilder::new(&url) + .method(Method::POST) + .header("Authorization", &format!("Bearer {auth_token}")) + .send() + .await + .map_err(gloo_to_ws_err)?; + + if !res.ok() { + return Err(WsError::TokenVerification(format!( + "HTTP error: {} {}", + res.status(), + res.status_text() + ))); + } + + let body = res.text().await.map_err(gloo_to_ws_err)?; + let json = JSON::parse(&body).map_err(js_to_ws_err)?; + let token_js = Reflect::get(&json, &JsValue::from_str("token")).map_err(js_to_ws_err)?; + token_js + .as_string() + .ok_or_else(|| WsError::TokenVerification("`token` parsing failed".into())) +} + impl WsConnection { + #[cfg(not(feature = "web"))] pub(crate) async fn connect( host: Uri, db_name: &str, @@ -233,6 +334,35 @@ impl WsConnection { }) } + #[cfg(feature = "web")] + pub(crate) async fn connect( + host: Uri, + db_name: &str, + token: Option<&str>, + connection_id: ConnectionId, + params: WsParams, + ) -> Result { + let token = if let Some(auth_token) = token { + Some(fetch_ws_token(&host, auth_token).await?) + } else { + None + }; + + let uri = make_uri(host, db_name, connection_id, params, token.as_deref())?; + let sock = tokio_tungstenite_wasm::connect_with_protocols(&uri.to_string(), &[BIN_PROTOCOL]) + .await + .map_err(|source| WsError::Tungstenite { + uri, + source: Arc::new(source), + })?; + + Ok(WsConnection { + db_name: db_name.into(), + connection_id, + sock, + }) + } + pub(crate) fn parse_response(bytes: &[u8]) -> Result, WsError> { let (compression, bytes) = bytes.split_first().ok_or(WsError::EmptyMessage)?; @@ -264,12 +394,14 @@ impl WsConnection { WebSocketMessage::Binary(bsatn::to_vec(&msg).unwrap().into()) } + #[cfg(not(feature = "web"))] fn maybe_log_error(cause: &str, res: std::result::Result) { if let Err(e) = res { log::warn!("{}: {:?}", cause, e); } } + #[cfg(not(feature = "web"))] async fn message_loop( mut self, incoming_messages: mpsc::UnboundedSender>, @@ -409,6 +541,7 @@ impl WsConnection { } } + #[cfg(not(feature = "web"))] pub(crate) fn spawn_message_loop( self, runtime: &runtime::Handle, @@ -422,4 +555,101 @@ impl WsConnection { let handle = runtime.spawn(self.message_loop(incoming_send, outgoing_recv)); (handle, incoming_recv, outgoing_send) } + + #[cfg(feature = "web")] + pub(crate) fn spawn_message_loop( + self, + ) -> ( + mpsc::UnboundedReceiver>, + mpsc::UnboundedSender>, + ) { + let record_metrics = move |msg_size: usize| { + CLIENT_METRICS + .websocket_received + .with_label_values(&self.db_name, &self.connection_id) + .inc(); + CLIENT_METRICS + .websocket_received_msg_size + .with_label_values(&self.db_name, &self.connection_id) + .observe(msg_size as f64); + }; + + let (outgoing_tx, outgoing_rx) = mpsc::unbounded::>(); + let (incoming_tx, incoming_rx) = mpsc::unbounded::>(); + + let (mut ws_writer, ws_reader) = self.sock.split(); + + wasm_bindgen_futures::spawn_local(async move { + let mut incoming = ws_reader.fuse(); + let mut outgoing = outgoing_rx.fuse(); + + loop { + futures::select! { + // 1) inbound WS frames + inbound = incoming.next() => match inbound { + Some(Err(tokio_tungstenite_wasm::Error::ConnectionClosed)) | None => { + gloo_console::log!("Connection closed"); + break; + }, + + Some(Ok(WebSocketMessage::Binary(bytes))) => { + record_metrics(bytes.len()); + // parse + forward into `incoming_tx` + match Self::parse_response(&bytes) { + Ok(msg) => if let Err(_e) = incoming_tx.unbounded_send(msg) { + gloo_console::warn!("Incoming receiver dropped."); + break; + }, + Err(e) => { + gloo_console::warn!( + "Error decoding WebSocketMessage::Binay payload: ", + format!("{:?}", e) + ); + }, + } + }, + + Some(Ok(WebSocketMessage::Close(r))) => { + let reason: String = if let Some(r) = r { + format!("{}:{:?}", r, r.code) + } else {String::default()}; + gloo_console::warn!("Connection Closed.", reason); + let _ = ws_writer.close().await; + break; + }, + + Some(Err(e)) => { + gloo_console::warn!( + "Error reading message from read WebSocket stream: ", + format!("{:?}",e) + ); + break; + }, + + Some(Ok(other)) => { + record_metrics(other.len()); + gloo_console::warn!("Unexpected WebSocket message: ", format!("{:?}",other)); + } + }, + + // 2) outbound messages + outbound = outgoing.next() => if let Some(client_msg) = outbound { + let raw = Self::encode_message(client_msg); + if let Err(e) = ws_writer.send(raw).await { + gloo_console::warn!("Error sending outgoing message:", format!("{:?}",e)); + break; + } + } else { + // channel closed, so we're done sending + if let Err(e) = ws_writer.close().await { + gloo_console::warn!("Error sending close frame:", format!("{:?}", e)); + } + break; + }, + } + } + }); + + (incoming_rx, outgoing_tx) + } }