Skip to content

Commit

Permalink
chore: subscribe connect state
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy committed Jul 5, 2024
1 parent 234eab9 commit 3b2831d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 22 deletions.
33 changes: 30 additions & 3 deletions appflowy-plugin/src/core/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::SidecarError;
use crate::manager::WeakSidecarState;
use crate::manager::WeakPluginState;
use std::fmt::{Display, Formatter};

use crate::core::parser::ResponseParser;
Expand Down Expand Up @@ -60,13 +60,30 @@ pub struct RpcCtx {
pub peer: RpcPeer,
}

#[derive(Debug, Clone)]
pub enum RunningState {
/// The plugin is in the process of establishing a connection
Connecting,
/// The plugin has successfully established a connection
Connected,
/// The plugin is currently running
Running,
/// The plugin has been stopped intentionally
Stopped,
/// The plugin stopped unexpectedly
UnexpectedStop,
}

pub type RunningStateSender = tokio::sync::broadcast::Sender<RunningState>;

#[derive(Clone)]
pub struct Plugin {
peer: RpcPeer,
pub(crate) id: PluginId,
pub(crate) name: String,
#[allow(dead_code)]
pub(crate) process: Arc<Child>,
pub(crate) running_state: RunningStateSender,
}

impl Display for Plugin {
Expand Down Expand Up @@ -141,6 +158,10 @@ impl Plugin {
},
}
}

pub fn subscribe_running_state(&self) -> tokio::sync::broadcast::Receiver<RunningState> {
self.running_state.subscribe()
}
}

#[derive(Debug)]
Expand All @@ -152,7 +173,7 @@ pub struct PluginInfo {
pub(crate) async fn start_plugin_process(
plugin_info: PluginInfo,
id: PluginId,
state: WeakSidecarState,
state: WeakPluginState,
) -> Result<(), anyhow::Error> {
trace!("start plugin process: {:?}, {:?}", id, plugin_info);
let (tx, rx) = tokio::sync::oneshot::channel();
Expand All @@ -167,9 +188,12 @@ pub(crate) async fn start_plugin_process(

match child {
Ok(mut child) => {
let (running_state, _) = tokio::sync::broadcast::channel(1);
let child_stdin = child.stdin.take().unwrap();
let child_stdout = child.stdout.take().unwrap();
let mut looper = RpcLoop::new(child_stdin);
let mut looper = RpcLoop::new(child_stdin, running_state.clone());
let _ = running_state.send(RunningState::Connecting);

let peer: RpcPeer = Arc::new(looper.get_raw_peer());
let name = plugin_info.name.clone();
peer.send_rpc_notification("ping", &JsonValue::Array(Vec::new()));
Expand All @@ -179,16 +203,19 @@ pub(crate) async fn start_plugin_process(
process: Arc::new(child),
name,
id,
running_state: running_state.clone(),
};

state.plugin_connect(Ok(plugin));
let _ = running_state.send(RunningState::Connected);
let _ = tx.send(());
let mut state = state;
let err = looper.mainloop(
&plugin_info.name,
|| BufReader::new(child_stdout),
&mut state,
);
let _ = running_state.send(RunningState::Stopped);
state.plugin_exit(id, err);
},
Err(err) => {
Expand Down
14 changes: 7 additions & 7 deletions appflowy-plugin/src/core/rpc_loop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::core::parser::{Call, MessageReader};
use crate::core::plugin::RpcCtx;
use crate::core::plugin::{RpcCtx, RunningStateSender};
use crate::core::rpc_object::RpcObject;
use crate::core::rpc_peer::{RawPeer, ResponsePayload, RpcState};
use crate::error::{ReadError, RemoteError, SidecarError};
Expand Down Expand Up @@ -41,7 +41,7 @@ impl<'a, W: Write + 'static> Drop for PanicGuard<'a, W> {
// 2. The `disconnect()` method is called on the peer.
if thread::panicking() {
error!("[RPC] panic guard hit, closing run loop");
self.0.disconnect();
self.0.unexpected_disconnect();
}
}
}
Expand All @@ -55,8 +55,8 @@ pub struct RpcLoop<W: Write + 'static> {
impl<W: Write + Send> RpcLoop<W> {
/// Creates a new `RpcLoop` with the given output stream (which is used for
/// sending requests and notifications, as well as responses).
pub fn new(writer: W) -> Self {
let rpc_peer = RawPeer(Arc::new(RpcState::new(writer)));
pub fn new(writer: W, running_state: RunningStateSender) -> Self {
let rpc_peer = RawPeer(Arc::new(RpcState::new(writer, running_state)));
RpcLoop {
reader: MessageReader::default(),
peer: rpc_peer,
Expand Down Expand Up @@ -165,7 +165,7 @@ impl<W: Write + Send> RpcLoop<W> {
Err(err) => {
if self.peer.0.is_blocking() {
error!("[RPC] {:?}, disconnecting peer", err);
self.peer.disconnect();
self.peer.unexpected_disconnect();
}
self.peer.put_rpc_object(Err(err));
break;
Expand Down Expand Up @@ -203,7 +203,7 @@ impl<W: Write + Send> RpcLoop<W> {
Ok(json) => json,
Err(err) => {
error!("[RPC] error reading message: {:?}, disconnecting peer", err);
peer.disconnect();
peer.unexpected_disconnect();
return err;
},
};
Expand All @@ -221,7 +221,7 @@ impl<W: Write + Send> RpcLoop<W> {
},
Err(err) => {
error!("[RPC] error parsing message: {:?}", err);
peer.disconnect();
peer.unexpected_disconnect();
return ReadError::UnknownRequest(err);
},
Ok(Call::Message(_msg)) => {
Expand Down
10 changes: 7 additions & 3 deletions appflowy-plugin/src/core/rpc_peer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::core::plugin::{Peer, PluginId};
use crate::core::plugin::{Peer, PluginId, RunningState, RunningStateSender};
use crate::core::rpc_object::RpcObject;
use crate::error::{ReadError, RemoteError, SidecarError};
use parking_lot::{Condvar, Mutex};
Expand Down Expand Up @@ -58,6 +58,7 @@ pub struct RpcState<W: Write> {
timers: Mutex<BinaryHeap<Timer>>,
needs_exit: AtomicBool,
is_blocking: AtomicBool,
running_state: RunningStateSender,
}

impl<W: Write> RpcState<W> {
Expand All @@ -70,7 +71,7 @@ impl<W: Write> RpcState<W> {
/// # Returns
///
/// A new `RawPeer` instance wrapped in an `Arc`.
pub fn new(writer: W) -> Self {
pub fn new(writer: W, running_state: RunningStateSender) -> Self {
RpcState {
rx_queue: Mutex::new(VecDeque::new()),
rx_cvar: Condvar::new(),
Expand All @@ -80,6 +81,7 @@ impl<W: Write> RpcState<W> {
timers: Mutex::new(BinaryHeap::new()),
needs_exit: AtomicBool::new(false),
is_blocking: Default::default(),
running_state,
}
}

Expand Down Expand Up @@ -342,8 +344,10 @@ impl<W: Write> RawPeer<W> {
}

/// send disconnect error to pending requests.
pub(crate) fn disconnect(&self) {
pub(crate) fn unexpected_disconnect(&self) {
trace!("[RPC] disconnecting peer");
let _ = self.0.running_state.send(RunningState::UnexpectedStop);

let mut pending = self.0.pending.lock();
let ids = pending.keys().cloned().collect::<Vec<_>>();
for id in &ids {
Expand Down
18 changes: 9 additions & 9 deletions appflowy-plugin/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::sync::{Arc, Weak};
use tracing::{error, info, instrument, trace, warn};

pub struct SidecarManager {
state: Arc<Mutex<SidecarState>>,
state: Arc<Mutex<PluginState>>,
plugin_id_counter: Arc<AtomicI64>,
operating_system: OperatingSystem,
}
Expand All @@ -28,7 +28,7 @@ impl Default for SidecarManager {
impl SidecarManager {
pub fn new() -> Self {
SidecarManager {
state: Arc::new(Mutex::new(SidecarState {
state: Arc::new(Mutex::new(PluginState {
plugins: Vec::new(),
})),
plugin_id_counter: Arc::new(Default::default()),
Expand All @@ -43,7 +43,7 @@ impl SidecarManager {
)));
}
let plugin_id = PluginId::from(self.plugin_id_counter.fetch_add(1, Ordering::SeqCst));
let weak_state = WeakSidecarState(Arc::downgrade(&self.state));
let weak_state = WeakPluginState(Arc::downgrade(&self.state));
start_plugin_process(plugin_info, plugin_id, weak_state).await?;
Ok(plugin_id)
}
Expand Down Expand Up @@ -126,11 +126,11 @@ impl SidecarManager {
}
}

pub struct SidecarState {
pub struct PluginState {
plugins: Vec<Arc<Plugin>>,
}

impl SidecarState {
impl PluginState {
pub fn plugin_connect(&mut self, plugin: Result<Plugin, io::Error>) {
match plugin {
Ok(plugin) => {
Expand Down Expand Up @@ -168,10 +168,10 @@ impl SidecarState {
}

#[derive(Clone)]
pub struct WeakSidecarState(Weak<Mutex<SidecarState>>);
pub struct WeakPluginState(Weak<Mutex<PluginState>>);

impl WeakSidecarState {
pub fn upgrade(&self) -> Option<Arc<Mutex<SidecarState>>> {
impl WeakPluginState {
pub fn upgrade(&self) -> Option<Arc<Mutex<PluginState>>> {
self.0.upgrade()
}

Expand All @@ -188,7 +188,7 @@ impl WeakSidecarState {
}
}

impl Handler for WeakSidecarState {
impl Handler for WeakPluginState {
type Request = PluginCommand<String>;

fn handle_request(
Expand Down

0 comments on commit 3b2831d

Please sign in to comment.