From 3b2831da60289473b399d2db657df75c1760c899 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 5 Jul 2024 11:45:33 +0800 Subject: [PATCH] chore: subscribe connect state --- appflowy-plugin/src/core/plugin.rs | 33 +++++++++++++++++++++++++--- appflowy-plugin/src/core/rpc_loop.rs | 14 ++++++------ appflowy-plugin/src/core/rpc_peer.rs | 10 ++++++--- appflowy-plugin/src/manager.rs | 18 +++++++-------- 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/appflowy-plugin/src/core/plugin.rs b/appflowy-plugin/src/core/plugin.rs index 67b12e1..831ce82 100644 --- a/appflowy-plugin/src/core/plugin.rs +++ b/appflowy-plugin/src/core/plugin.rs @@ -1,5 +1,5 @@ use crate::error::SidecarError; -use crate::manager::WeakSidecarState; +use crate::manager::WeakPluginState; use std::fmt::{Display, Formatter}; use crate::core::parser::ResponseParser; @@ -60,6 +60,22 @@ pub struct RpcCtx { pub peer: RpcPeer, } +#[derive(Debug, Clone)] +pub enum RunningState { + /// The plugin is in the process of establishing a connection + Connecting, + /// The plugin has successfully established a connection + Connected, + /// The plugin is currently running + Running, + /// The plugin has been stopped intentionally + Stopped, + /// The plugin stopped unexpectedly + UnexpectedStop, +} + +pub type RunningStateSender = tokio::sync::broadcast::Sender; + #[derive(Clone)] pub struct Plugin { peer: RpcPeer, @@ -67,6 +83,7 @@ pub struct Plugin { pub(crate) name: String, #[allow(dead_code)] pub(crate) process: Arc, + pub(crate) running_state: RunningStateSender, } impl Display for Plugin { @@ -141,6 +158,10 @@ impl Plugin { }, } } + + pub fn subscribe_running_state(&self) -> tokio::sync::broadcast::Receiver { + self.running_state.subscribe() + } } #[derive(Debug)] @@ -152,7 +173,7 @@ pub struct PluginInfo { pub(crate) async fn start_plugin_process( plugin_info: PluginInfo, id: PluginId, - state: WeakSidecarState, + state: WeakPluginState, ) -> Result<(), anyhow::Error> { trace!("start plugin process: {:?}, {:?}", id, plugin_info); let (tx, rx) = tokio::sync::oneshot::channel(); @@ -167,9 +188,12 @@ pub(crate) async fn start_plugin_process( match child { Ok(mut child) => { + let (running_state, _) = tokio::sync::broadcast::channel(1); let child_stdin = child.stdin.take().unwrap(); let child_stdout = child.stdout.take().unwrap(); - let mut looper = RpcLoop::new(child_stdin); + let mut looper = RpcLoop::new(child_stdin, running_state.clone()); + let _ = running_state.send(RunningState::Connecting); + let peer: RpcPeer = Arc::new(looper.get_raw_peer()); let name = plugin_info.name.clone(); peer.send_rpc_notification("ping", &JsonValue::Array(Vec::new())); @@ -179,9 +203,11 @@ pub(crate) async fn start_plugin_process( process: Arc::new(child), name, id, + running_state: running_state.clone(), }; state.plugin_connect(Ok(plugin)); + let _ = running_state.send(RunningState::Connected); let _ = tx.send(()); let mut state = state; let err = looper.mainloop( @@ -189,6 +215,7 @@ pub(crate) async fn start_plugin_process( || BufReader::new(child_stdout), &mut state, ); + let _ = running_state.send(RunningState::Stopped); state.plugin_exit(id, err); }, Err(err) => { diff --git a/appflowy-plugin/src/core/rpc_loop.rs b/appflowy-plugin/src/core/rpc_loop.rs index c3d3428..bd39ffe 100644 --- a/appflowy-plugin/src/core/rpc_loop.rs +++ b/appflowy-plugin/src/core/rpc_loop.rs @@ -1,5 +1,5 @@ use crate::core::parser::{Call, MessageReader}; -use crate::core::plugin::RpcCtx; +use crate::core::plugin::{RpcCtx, RunningStateSender}; use crate::core::rpc_object::RpcObject; use crate::core::rpc_peer::{RawPeer, ResponsePayload, RpcState}; use crate::error::{ReadError, RemoteError, SidecarError}; @@ -41,7 +41,7 @@ impl<'a, W: Write + 'static> Drop for PanicGuard<'a, W> { // 2. The `disconnect()` method is called on the peer. if thread::panicking() { error!("[RPC] panic guard hit, closing run loop"); - self.0.disconnect(); + self.0.unexpected_disconnect(); } } } @@ -55,8 +55,8 @@ pub struct RpcLoop { impl RpcLoop { /// Creates a new `RpcLoop` with the given output stream (which is used for /// sending requests and notifications, as well as responses). - pub fn new(writer: W) -> Self { - let rpc_peer = RawPeer(Arc::new(RpcState::new(writer))); + pub fn new(writer: W, running_state: RunningStateSender) -> Self { + let rpc_peer = RawPeer(Arc::new(RpcState::new(writer, running_state))); RpcLoop { reader: MessageReader::default(), peer: rpc_peer, @@ -165,7 +165,7 @@ impl RpcLoop { Err(err) => { if self.peer.0.is_blocking() { error!("[RPC] {:?}, disconnecting peer", err); - self.peer.disconnect(); + self.peer.unexpected_disconnect(); } self.peer.put_rpc_object(Err(err)); break; @@ -203,7 +203,7 @@ impl RpcLoop { Ok(json) => json, Err(err) => { error!("[RPC] error reading message: {:?}, disconnecting peer", err); - peer.disconnect(); + peer.unexpected_disconnect(); return err; }, }; @@ -221,7 +221,7 @@ impl RpcLoop { }, Err(err) => { error!("[RPC] error parsing message: {:?}", err); - peer.disconnect(); + peer.unexpected_disconnect(); return ReadError::UnknownRequest(err); }, Ok(Call::Message(_msg)) => { diff --git a/appflowy-plugin/src/core/rpc_peer.rs b/appflowy-plugin/src/core/rpc_peer.rs index db53362..cafe20a 100644 --- a/appflowy-plugin/src/core/rpc_peer.rs +++ b/appflowy-plugin/src/core/rpc_peer.rs @@ -1,4 +1,4 @@ -use crate::core::plugin::{Peer, PluginId}; +use crate::core::plugin::{Peer, PluginId, RunningState, RunningStateSender}; use crate::core::rpc_object::RpcObject; use crate::error::{ReadError, RemoteError, SidecarError}; use parking_lot::{Condvar, Mutex}; @@ -58,6 +58,7 @@ pub struct RpcState { timers: Mutex>, needs_exit: AtomicBool, is_blocking: AtomicBool, + running_state: RunningStateSender, } impl RpcState { @@ -70,7 +71,7 @@ impl RpcState { /// # Returns /// /// A new `RawPeer` instance wrapped in an `Arc`. - pub fn new(writer: W) -> Self { + pub fn new(writer: W, running_state: RunningStateSender) -> Self { RpcState { rx_queue: Mutex::new(VecDeque::new()), rx_cvar: Condvar::new(), @@ -80,6 +81,7 @@ impl RpcState { timers: Mutex::new(BinaryHeap::new()), needs_exit: AtomicBool::new(false), is_blocking: Default::default(), + running_state, } } @@ -342,8 +344,10 @@ impl RawPeer { } /// send disconnect error to pending requests. - pub(crate) fn disconnect(&self) { + pub(crate) fn unexpected_disconnect(&self) { trace!("[RPC] disconnecting peer"); + let _ = self.0.running_state.send(RunningState::UnexpectedStop); + let mut pending = self.0.pending.lock(); let ids = pending.keys().cloned().collect::>(); for id in &ids { diff --git a/appflowy-plugin/src/manager.rs b/appflowy-plugin/src/manager.rs index cc3f90a..75e6e3a 100644 --- a/appflowy-plugin/src/manager.rs +++ b/appflowy-plugin/src/manager.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, Weak}; use tracing::{error, info, instrument, trace, warn}; pub struct SidecarManager { - state: Arc>, + state: Arc>, plugin_id_counter: Arc, operating_system: OperatingSystem, } @@ -28,7 +28,7 @@ impl Default for SidecarManager { impl SidecarManager { pub fn new() -> Self { SidecarManager { - state: Arc::new(Mutex::new(SidecarState { + state: Arc::new(Mutex::new(PluginState { plugins: Vec::new(), })), plugin_id_counter: Arc::new(Default::default()), @@ -43,7 +43,7 @@ impl SidecarManager { ))); } let plugin_id = PluginId::from(self.plugin_id_counter.fetch_add(1, Ordering::SeqCst)); - let weak_state = WeakSidecarState(Arc::downgrade(&self.state)); + let weak_state = WeakPluginState(Arc::downgrade(&self.state)); start_plugin_process(plugin_info, plugin_id, weak_state).await?; Ok(plugin_id) } @@ -126,11 +126,11 @@ impl SidecarManager { } } -pub struct SidecarState { +pub struct PluginState { plugins: Vec>, } -impl SidecarState { +impl PluginState { pub fn plugin_connect(&mut self, plugin: Result) { match plugin { Ok(plugin) => { @@ -168,10 +168,10 @@ impl SidecarState { } #[derive(Clone)] -pub struct WeakSidecarState(Weak>); +pub struct WeakPluginState(Weak>); -impl WeakSidecarState { - pub fn upgrade(&self) -> Option>> { +impl WeakPluginState { + pub fn upgrade(&self) -> Option>> { self.0.upgrade() } @@ -188,7 +188,7 @@ impl WeakSidecarState { } } -impl Handler for WeakSidecarState { +impl Handler for WeakPluginState { type Request = PluginCommand; fn handle_request(