From 49b3a6507a5523fc0d02e1be7af556ccd05bff58 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 1 May 2025 16:52:39 +0200 Subject: [PATCH 1/9] Update comms --- crates/amalthea/src/comm/plot_comm.rs | 44 ++++++++++++------------- crates/amalthea/src/comm/ui_comm.rs | 20 +++++++++++ crates/ark/src/plots/graphics_device.rs | 44 ++++++++++++------------- crates/ark/src/ui/ui.rs | 1 + 4 files changed, 65 insertions(+), 44 deletions(-) diff --git a/crates/amalthea/src/comm/plot_comm.rs b/crates/amalthea/src/comm/plot_comm.rs index 6f324abac..7387ebc69 100644 --- a/crates/amalthea/src/comm/plot_comm.rs +++ b/crates/amalthea/src/comm/plot_comm.rs @@ -36,8 +36,8 @@ pub struct PlotResult { /// The MIME type of the plot data pub mime_type: String, - /// The policy used to render the plot - pub policy: Option + /// The settings used to render the plot + pub settings: Option } /// The size of a plot @@ -50,22 +50,34 @@ pub struct PlotSize { pub width: i64 } -/// The policy used to render the plot +/// The settings used to render the plot #[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct RenderPolicy { - /// Plot size of the render policy +pub struct PlotRenderSettings { + /// Plot size to render the plot to pub size: PlotSize, /// The pixel ratio of the display device pub pixel_ratio: f64, - /// Format of the render policy - pub format: RenderFormat + /// Format in which to render the plot + pub format: PlotRenderFormat } -/// Possible values for RenderFormat +/// Possible values for PlotUnit +#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, strum_macros::Display)] +pub enum PlotUnit { + #[serde(rename = "pixels")] + #[strum(to_string = "pixels")] + Pixels, + + #[serde(rename = "inches")] + #[strum(to_string = "inches")] + Inches +} + +/// Possible values for PlotRenderFormat #[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, strum_macros::Display)] -pub enum RenderFormat { +pub enum PlotRenderFormat { #[serde(rename = "png")] #[strum(to_string = "png")] Png, @@ -87,18 +99,6 @@ pub enum RenderFormat { Tiff } -/// Possible values for PlotUnit -#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, strum_macros::Display)] -pub enum PlotUnit { - #[serde(rename = "pixels")] - #[strum(to_string = "pixels")] - Pixels, - - #[serde(rename = "inches")] - #[strum(to_string = "inches")] - Inches -} - /// Parameters for the Render method. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct RenderParams { @@ -110,7 +110,7 @@ pub struct RenderParams { pub pixel_ratio: f64, /// The requested plot format - pub format: RenderFormat, + pub format: PlotRenderFormat, } /** diff --git a/crates/amalthea/src/comm/ui_comm.rs b/crates/amalthea/src/comm/ui_comm.rs index 9057608fc..0aa195569 100644 --- a/crates/amalthea/src/comm/ui_comm.rs +++ b/crates/amalthea/src/comm/ui_comm.rs @@ -10,6 +10,7 @@ use serde::Deserialize; use serde::Serialize; +use super::plot_comm::PlotRenderSettings; /// Items in Params pub type Param = serde_json::Value; @@ -97,6 +98,13 @@ pub struct Range { pub end: Position } +/// Parameters for the DidChangePlotsRenderSettings method. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct DidChangePlotsRenderSettingsParams { + /// Plot rendering settings. + pub settings: PlotRenderSettings, +} + /// Parameters for the CallMethod method. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct CallMethodParams { @@ -290,6 +298,15 @@ pub struct ShowHtmlFileParams { #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(tag = "method", content = "params")] pub enum UiBackendRequest { + /// Notification that the settings to render a plot (i.e. the plot size) + /// have changed. + /// + /// Typically fired when the plot component has been resized by the user. + /// This notification is useful to produce accurate pre-renderings of + /// plots. + #[serde(rename = "did_change_plots_render_settings")] + DidChangePlotsRenderSettings(DidChangePlotsRenderSettingsParams), + /// Run a method in the interpreter and return the result to the frontend /// /// Unlike other RPC methods, `call_method` calls into methods implemented @@ -306,6 +323,9 @@ pub enum UiBackendRequest { #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(tag = "method", content = "result")] pub enum UiBackendReply { + /// Unused response to notification + DidChangePlotsRenderSettingsReply(), + /// The method result CallMethodReply(CallMethodResult), diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index fccbc4422..25ff6f1f3 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -21,8 +21,8 @@ use amalthea::comm::plot_comm::PlotBackendRequest; use amalthea::comm::plot_comm::PlotFrontendEvent; use amalthea::comm::plot_comm::PlotResult; use amalthea::comm::plot_comm::PlotSize; -use amalthea::comm::plot_comm::RenderFormat; -use amalthea::comm::plot_comm::RenderPolicy; +use amalthea::comm::plot_comm::PlotRenderFormat; +use amalthea::comm::plot_comm::PlotRenderSettings; use amalthea::socket::comm::CommInitiator; use amalthea::socket::comm::CommSocket; use amalthea::socket::iopub::IOPubMessage; @@ -134,7 +134,7 @@ struct DeviceContext { wrapped_callbacks: WrappedDeviceCallbacks, /// The settings used for pre-renderings of new plots. - current_render_policy: Cell, + current_render_settings: Cell, } impl DeviceContext { @@ -149,13 +149,13 @@ impl DeviceContext { id: RefCell::new(Self::new_id()), sockets: RefCell::new(HashMap::new()), wrapped_callbacks: WrappedDeviceCallbacks::default(), - current_render_policy: Cell::new(RenderPolicy { + current_render_settings: Cell::new(PlotRenderSettings { size: PlotSize { width: 640, height: 400, }, pixel_ratio: 1., - format: RenderFormat::Png, + format: PlotRenderFormat::Png, }), } } @@ -365,7 +365,7 @@ impl DeviceContext { return Err(anyhow!("Intrinsically sized plots are not yet supported.")); }); - let policy = RenderPolicy { + let settings = PlotRenderSettings { size: PlotSize { width: size.width, height: size.height, @@ -380,15 +380,15 @@ impl DeviceContext { // This way a one-off render request with special settings won't cause // the next pre-render to be invalid and force the frontend to request // a proper render. - self.current_render_policy.replace(policy); + self.current_render_settings.replace(settings); - let data = self.render_plot(&id, &policy)?; + let data = self.render_plot(&id, &settings)?; let mime_type = Self::get_mime_type(&plot_meta.format); Ok(PlotBackendReply::RenderReply(PlotResult { data: data.to_string(), mime_type: mime_type.to_string(), - policy: Some(policy), + settings: Some(settings), })) }, } @@ -416,13 +416,13 @@ impl DeviceContext { } } - fn get_mime_type(format: &RenderFormat) -> String { + fn get_mime_type(format: &PlotRenderFormat) -> String { match format { - RenderFormat::Png => "image/png".to_string(), - RenderFormat::Svg => "image/svg+xml".to_string(), - RenderFormat::Pdf => "application/pdf".to_string(), - RenderFormat::Jpeg => "image/jpeg".to_string(), - RenderFormat::Tiff => "image/tiff".to_string(), + PlotRenderFormat::Png => "image/png".to_string(), + PlotRenderFormat::Svg => "image/svg+xml".to_string(), + PlotRenderFormat::Pdf => "application/pdf".to_string(), + PlotRenderFormat::Jpeg => "image/jpeg".to_string(), + PlotRenderFormat::Tiff => "image/tiff".to_string(), } } @@ -475,17 +475,17 @@ impl DeviceContext { POSITRON_PLOT_CHANNEL_ID.to_string(), ); - let policy = self.current_render_policy.get(); + let settings = self.current_render_settings.get(); // Prepare a pre-rendering of the plot so Positron has something to display immediately - let data = match self.render_plot(id, &policy) { + let data = match self.render_plot(id, &settings) { Ok(pre_render) => { - let mime_type = Self::get_mime_type(&RenderFormat::Png); + let mime_type = Self::get_mime_type(&PlotRenderFormat::Png); let pre_render = PlotResult { data: pre_render.to_string(), mime_type: mime_type.to_string(), - policy: Some(policy), + settings: Some(settings), }; serde_json::json!({ "pre_render": pre_render }) @@ -598,13 +598,13 @@ impl DeviceContext { fn create_display_data_plot(&self, id: &PlotId) -> Result { // TODO: Take these from R global options? Like `ark.plot.width`? - let policy = RenderPolicy { + let policy = PlotRenderSettings { size: PlotSize { width: 800, height: 600, }, pixel_ratio: 1.0, - format: RenderFormat::Png, + format: PlotRenderFormat::Png, }; let data = unwrap!(self.render_plot(id, &policy), Err(error) => { @@ -618,7 +618,7 @@ impl DeviceContext { } #[tracing::instrument(level = "trace", skip(self))] - fn render_plot(&self, id: &PlotId, policy: &RenderPolicy) -> anyhow::Result { + fn render_plot(&self, id: &PlotId, policy: &PlotRenderSettings) -> anyhow::Result { log::trace!("Rendering plot"); let image_path = r_task(|| unsafe { diff --git a/crates/ark/src/ui/ui.rs b/crates/ark/src/ui/ui.rs index f14b2b09d..56d5990ab 100644 --- a/crates/ark/src/ui/ui.rs +++ b/crates/ark/src/ui/ui.rs @@ -140,6 +140,7 @@ impl UiComm { ) -> anyhow::Result { let request = match request { UiBackendRequest::CallMethod(request) => request, + UiBackendRequest::DidChangePlotsRenderSettings(_params) => todo!(), }; log::trace!("Handling '{}' frontend RPC method", request.method); From 34c0f3a9719cf3eada914ff9df1e2f6ff66d158f Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 5 May 2025 03:32:33 -0400 Subject: [PATCH 2/9] Process graphics device notifications with async task --- crates/ark/src/interface.rs | 5 +- crates/ark/src/plots/graphics_device.rs | 44 +++++- crates/ark/src/shell.rs | 15 +- crates/ark/src/start.rs | 8 ++ crates/ark/src/ui/ui.rs | 182 ++++++++++++++++++++++-- crates/ark/tests/ui.rs | 141 ------------------ 6 files changed, 239 insertions(+), 156 deletions(-) delete mode 100644 crates/ark/tests/ui.rs diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 180b6c755..f97487c65 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -106,6 +106,7 @@ use crate::lsp::state_handlers::ConsoleInputs; use crate::modules; use crate::modules::ARK_ENVS; use crate::plots::graphics_device; +use crate::plots::graphics_device::GraphicsDeviceNotification; use crate::r_task; use crate::r_task::BoxFuture; use crate::r_task::RTask; @@ -310,7 +311,7 @@ impl RMain { /// Sets up the main R thread, initializes the `R_MAIN` singleton, /// and starts R. Does not return! /// SAFETY: Must be called only once. Enforced with a panic. - pub fn start( + pub(crate) fn start( r_args: Vec, startup_file: Option, comm_manager_tx: Sender, @@ -323,6 +324,7 @@ impl RMain { dap: Arc>, session_mode: SessionMode, default_repos: DefaultRepos, + graphics_device_rx: Receiver, ) { // Set the main thread ID. // Must happen before doing anything that checks `RMain::on_main_thread()`, @@ -357,6 +359,7 @@ impl RMain { graphics_device::init_graphics_device( main.get_comm_manager_tx().clone(), main.get_iopub_tx().clone(), + graphics_device_rx, ); let mut r_args = r_args.clone(); diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 25ff6f1f3..13980ce47 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -19,10 +19,10 @@ use amalthea::comm::event::CommManagerEvent; use amalthea::comm::plot_comm::PlotBackendReply; use amalthea::comm::plot_comm::PlotBackendRequest; use amalthea::comm::plot_comm::PlotFrontendEvent; -use amalthea::comm::plot_comm::PlotResult; -use amalthea::comm::plot_comm::PlotSize; use amalthea::comm::plot_comm::PlotRenderFormat; use amalthea::comm::plot_comm::PlotRenderSettings; +use amalthea::comm::plot_comm::PlotResult; +use amalthea::comm::plot_comm::PlotSize; use amalthea::socket::comm::CommInitiator; use amalthea::socket::comm::CommSocket; use amalthea::socket::iopub::IOPubMessage; @@ -32,6 +32,7 @@ use amalthea::wire::update_display_data::UpdateDisplayData; use anyhow::anyhow; use base64::engine::general_purpose; use base64::Engine; +use crossbeam::channel::Receiver; use crossbeam::channel::Select; use crossbeam::channel::Sender; use harp::exec::RFunction; @@ -44,6 +45,7 @@ use libr::SEXP; use serde_json::json; use stdext::result::ResultOrLog; use stdext::unwrap; +use tokio::task::yield_now; use uuid::Uuid; use crate::interface::RMain; @@ -51,6 +53,11 @@ use crate::interface::SessionMode; use crate::modules::ARK_ENVS; use crate::r_task; +#[derive(Debug)] +pub(crate) enum GraphicsDeviceNotification { + DidChangePlotRenderSettings(PlotRenderSettings), +} + thread_local! { // Safety: Set once by `RMain` on initialization static DEVICE_CONTEXT: RefCell = panic!("Must access `DEVICE_CONTEXT` from the R thread"); @@ -62,8 +69,39 @@ const POSITRON_PLOT_CHANNEL_ID: &str = "positron.plot"; pub(crate) fn init_graphics_device( comm_manager_tx: Sender, iopub_tx: Sender, + graphics_device_rx: Receiver, ) { - DEVICE_CONTEXT.set(DeviceContext::new(comm_manager_tx, iopub_tx)) + DEVICE_CONTEXT.set(DeviceContext::new(comm_manager_tx, iopub_tx)); + + // Launch an R thread task to process messages from the frontend + r_task::spawn_interrupt(|| async move { process_notifications(graphics_device_rx).await }); +} + +async fn process_notifications(graphics_device_rx: Receiver) { + loop { + let mut i = 0; + + while let Ok(notification) = graphics_device_rx.try_recv() { + log::trace!("Got graphics device notification: {notification:#?}"); + i = i + 1; + + match notification { + GraphicsDeviceNotification::DidChangePlotRenderSettings(plot_render_settings) => { + DEVICE_CONTEXT.with_borrow(|ctx| { + ctx.current_render_settings.replace(plot_render_settings) + }); + }, + } + + // Yield regularly to the R thread when something went wrong on the + // frontend side and is spamming messages + if i >= 5 { + break; + } + } + + yield_now().await; + } } /// Wrapped callbacks of the original graphics device we shadow diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index 356728660..0a092c7b2 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -42,6 +42,7 @@ use crate::help::r_help::RHelp; use crate::help_proxy; use crate::interface::KernelInfo; use crate::interface::RMain; +use crate::plots::graphics_device::GraphicsDeviceNotification; use crate::r_task; use crate::request::KernelRequest; use crate::request::RRequest; @@ -55,6 +56,7 @@ pub struct Shell { kernel_request_tx: Sender, kernel_init_rx: BusReader, kernel_info: Option, + graphics_device_tx: Sender, } #[derive(Debug)] @@ -64,12 +66,13 @@ pub enum REvent { impl Shell { /// Creates a new instance of the shell message handler. - pub fn new( + pub(crate) fn new( comm_manager_tx: Sender, r_request_tx: Sender, stdin_request_tx: Sender, kernel_init_rx: BusReader, kernel_request_tx: Sender, + graphics_device_tx: Sender, ) -> Self { Self { comm_manager_tx, @@ -78,6 +81,7 @@ impl Shell { kernel_request_tx, kernel_init_rx, kernel_info: None, + graphics_device_tx, } } @@ -216,7 +220,10 @@ impl ShellHandler for Shell { }) } - /// Handles a request to open a new comm channel + /// Handle a request to open a new comm channel + /// + /// Note that there might be multiple requests during a single session if + /// the UI has been disconnected and reconnected. async fn handle_comm_open(&self, target: Comm, comm: CommSocket) -> amalthea::Result { match target { Comm::Variables => handle_comm_open_variables(comm, self.comm_manager_tx.clone()), @@ -224,6 +231,7 @@ impl ShellHandler for Shell { comm, self.stdin_request_tx.clone(), self.kernel_request_tx.clone(), + self.graphics_device_tx.clone(), ), Comm::Help => handle_comm_open_help(comm), _ => Ok(false), @@ -246,10 +254,11 @@ fn handle_comm_open_ui( comm: CommSocket, stdin_request_tx: Sender, kernel_request_tx: Sender, + graphics_device_tx: Sender, ) -> amalthea::Result { // Create a frontend to wrap the comm channel we were just given. This starts // a thread that proxies messages to the frontend. - let ui_comm_tx = UiComm::start(comm, stdin_request_tx); + let ui_comm_tx = UiComm::start(comm, stdin_request_tx, graphics_device_tx); // Send the frontend event channel to the execution thread so it can emit // events to the frontend. diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs index b798ec447..79111ba11 100644 --- a/crates/ark/src/start.rs +++ b/crates/ark/src/start.rs @@ -22,6 +22,7 @@ use crate::control::Control; use crate::dap; use crate::interface::SessionMode; use crate::lsp; +use crate::plots::graphics_device::GraphicsDeviceNotification; use crate::repos::DefaultRepos; use crate::request::KernelRequest; use crate::request::RRequest; @@ -68,6 +69,11 @@ pub fn start_kernel( // StdIn socket thread let (stdin_request_tx, stdin_request_rx) = bounded::(1); + // Communication channel between the graphics device (running on the R + // thread) and the shell thread + let (graphics_device_tx, graphics_device_rx) = + crossbeam::channel::unbounded::(); + // Create the shell. let kernel_init_rx = kernel_init_tx.add_rx(); let shell = Box::new(Shell::new( @@ -76,6 +82,7 @@ pub fn start_kernel( stdin_request_tx.clone(), kernel_init_rx, kernel_request_tx, + graphics_device_tx, )); // Create the control handler; this is used to handle shutdown/interrupt and @@ -125,5 +132,6 @@ pub fn start_kernel( dap, session_mode, default_repos, + graphics_device_rx, ) } diff --git a/crates/ark/src/ui/ui.rs b/crates/ark/src/ui/ui.rs index 56d5990ab..b2945b8ac 100644 --- a/crates/ark/src/ui/ui.rs +++ b/crates/ark/src/ui/ui.rs @@ -6,6 +6,8 @@ // use amalthea::comm::comm_channel::CommMsg; +use amalthea::comm::ui_comm::CallMethodParams; +use amalthea::comm::ui_comm::DidChangePlotsRenderSettingsParams; use amalthea::comm::ui_comm::UiBackendReply; use amalthea::comm::ui_comm::UiBackendRequest; use amalthea::comm::ui_comm::UiFrontendEvent; @@ -22,6 +24,7 @@ use serde_json::Value; use stdext::spawn; use stdext::unwrap; +use crate::plots::graphics_device::GraphicsDeviceNotification; use crate::r_task; #[derive(Debug)] @@ -37,21 +40,24 @@ pub struct UiComm { comm: CommSocket, ui_comm_rx: Receiver, stdin_request_tx: Sender, + graphics_device_tx: Sender, } impl UiComm { - pub fn start( + pub(crate) fn start( comm: CommSocket, stdin_request_tx: Sender, + graphics_device_tx: Sender, ) -> Sender { // Create a sender-receiver pair for Positron global events let (ui_comm_tx, ui_comm_rx) = crossbeam::channel::unbounded::(); spawn!("ark-comm-ui", move || { let frontend = Self { - comm: comm.clone(), - ui_comm_rx: ui_comm_rx.clone(), - stdin_request_tx: stdin_request_tx.clone(), + comm, + ui_comm_rx, + stdin_request_tx, + graphics_device_tx, }; frontend.execution_thread(); }); @@ -138,11 +144,18 @@ impl UiComm { &self, request: UiBackendRequest, ) -> anyhow::Result { - let request = match request { - UiBackendRequest::CallMethod(request) => request, - UiBackendRequest::DidChangePlotsRenderSettings(_params) => todo!(), - }; + match request { + UiBackendRequest::CallMethod(request) => self.handle_call_method(request), + UiBackendRequest::DidChangePlotsRenderSettings(params) => { + self.handle_did_change_plot_render_settings(params) + }, + } + } + fn handle_call_method( + &self, + request: CallMethodParams, + ) -> anyhow::Result { log::trace!("Handling '{}' frontend RPC method", request.method); // Today, all RPCs are fulfilled by R directly. Check to see if an R @@ -181,6 +194,19 @@ impl UiComm { Ok(UiBackendReply::CallMethodReply(result)) } + fn handle_did_change_plot_render_settings( + &self, + params: DidChangePlotsRenderSettingsParams, + ) -> anyhow::Result { + self.graphics_device_tx + .send(GraphicsDeviceNotification::DidChangePlotRenderSettings( + params.settings, + )) + .unwrap(); + + Ok(UiBackendReply::DidChangePlotsRenderSettingsReply()) + } + /** * Send an RPC request to the frontend. */ @@ -191,3 +217,143 @@ impl UiComm { Ok(()) } } + +#[cfg(test)] +mod tests { + use amalthea::comm::base_comm::JsonRpcError; + use amalthea::comm::comm_channel::CommMsg; + use amalthea::comm::ui_comm::BusyParams; + use amalthea::comm::ui_comm::CallMethodParams; + use amalthea::comm::ui_comm::UiBackendReply; + use amalthea::comm::ui_comm::UiBackendRequest; + use amalthea::comm::ui_comm::UiFrontendEvent; + use amalthea::socket::comm::CommInitiator; + use amalthea::socket::comm::CommSocket; + use amalthea::socket::stdin::StdInRequest; + use crossbeam::channel::bounded; + use harp::exec::RFunction; + use harp::exec::RFunctionExt; + use harp::object::RObject; + use serde_json::Value; + + use crate::plots::graphics_device::GraphicsDeviceNotification; + use crate::r_task::r_task; + use crate::ui::UiComm; + use crate::ui::UiCommMessage; + + #[test] + fn test_ui_comm() { + // Create a sender/receiver pair for the comm channel. + let comm_socket = CommSocket::new( + CommInitiator::FrontEnd, + String::from("test-ui-comm-id"), + String::from("positron.UI"), + ); + + // Communication channel between the main thread and the Amalthea + // StdIn socket thread + let (stdin_request_tx, _stdin_request_rx) = bounded::(1); + + let (graphics_device_tx, _graphics_device_rx) = + crossbeam::channel::unbounded::(); + + // Create a frontend instance, get access to the sender channel + let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx); + + // Get the current console width + let old_width = r_task(|| unsafe { + let width = RFunction::from("getOption") + .param("x", "width") + .call() + .unwrap(); + RObject::to::(width).unwrap() + }); + + // Send a message to the frontend + let id = String::from("test-id-1"); + let request = UiBackendRequest::CallMethod(CallMethodParams { + method: String::from("setConsoleWidth"), + params: vec![Value::from(123)], + }); + comm_socket + .incoming_tx + .send(CommMsg::Rpc(id, serde_json::to_value(request).unwrap())) + .unwrap(); + + // Wait for the reply; this should be a FrontendRpcResult. We don't wait + // more than a second since this should be quite fast and we don't want to + // hang the test suite if it doesn't return. + let response = comm_socket + .outgoing_rx + .recv_timeout(std::time::Duration::from_secs(1)) + .unwrap(); + match response { + CommMsg::Rpc(id, result) => { + println!("Got RPC result: {:?}", result); + let result = serde_json::from_value::(result).unwrap(); + assert_eq!(id, "test-id-1"); + // This RPC should return the old width + assert_eq!( + result, + UiBackendReply::CallMethodReply(Value::from(old_width)) + ); + }, + _ => panic!("Unexpected response: {:?}", response), + } + + // Get the new console width + let new_width = r_task(|| unsafe { + let width = RFunction::from("getOption") + .param("x", "width") + .call() + .unwrap(); + RObject::to::(width).unwrap() + }); + + // Assert that the console width changed + assert_eq!(new_width, 123); + + // Now try to invoke an RPC that doesn't exist + let id = String::from("test-id-2"); + let request = UiBackendRequest::CallMethod(CallMethodParams { + method: String::from("thisRpcDoesNotExist"), + params: vec![], + }); + comm_socket + .incoming_tx + .send(CommMsg::Rpc(id, serde_json::to_value(request).unwrap())) + .unwrap(); + + // Wait for the reply + let response = comm_socket + .outgoing_rx + .recv_timeout(std::time::Duration::from_secs(1)) + .unwrap(); + match response { + CommMsg::Rpc(id, result) => { + println!("Got RPC result: {:?}", result); + let _reply = serde_json::from_value::(result).unwrap(); + // Ensure that the error code is -32601 (method not found) + assert_eq!(id, "test-id-2"); + + // TODO: This should normally throw a `MethodNotFound` but + // that's currently a bit hard because of the nested method + // call. One way to solve this would be for RPC handler + // functions to return a typed JSON-RPC error instead of a + // `anyhow::Result`. Then we could return a `MethodNotFound` from + // `callMethod()`. + // + // assert_eq!(reply.error.code, JsonRpcErrorCode::MethodNotFound); + }, + _ => panic!("Unexpected response: {:?}", response), + } + + // Mark not busy (this prevents the frontend comm from being closed due to + // the Sender being dropped) + ui_comm_tx + .send(UiCommMessage::Event(UiFrontendEvent::Busy(BusyParams { + busy: false, + }))) + .unwrap(); + } +} diff --git a/crates/ark/tests/ui.rs b/crates/ark/tests/ui.rs deleted file mode 100644 index 8106dbf4a..000000000 --- a/crates/ark/tests/ui.rs +++ /dev/null @@ -1,141 +0,0 @@ -// -// ui.rs -// -// Copyright (C) 2023-2024 Posit Software, PBC. All rights reserved. -// -// - -use amalthea::comm::base_comm::JsonRpcError; -use amalthea::comm::comm_channel::CommMsg; -use amalthea::comm::ui_comm::BusyParams; -use amalthea::comm::ui_comm::CallMethodParams; -use amalthea::comm::ui_comm::UiBackendReply; -use amalthea::comm::ui_comm::UiBackendRequest; -use amalthea::comm::ui_comm::UiFrontendEvent; -use amalthea::socket::comm::CommInitiator; -use amalthea::socket::comm::CommSocket; -use amalthea::socket::stdin::StdInRequest; -use ark::r_task::r_task; -use ark::ui::UiComm; -use ark::ui::UiCommMessage; -use crossbeam::channel::bounded; -use harp::exec::RFunction; -use harp::exec::RFunctionExt; -use harp::object::RObject; -use serde_json::Value; - -/** - * Basic tests for the UI comm. - */ -#[test] -fn test_ui_comm() { - // Create a sender/receiver pair for the comm channel. - let comm_socket = CommSocket::new( - CommInitiator::FrontEnd, - String::from("test-ui-comm-id"), - String::from("positron.UI"), - ); - - // Communication channel between the main thread and the Amalthea - // StdIn socket thread - let (stdin_request_tx, _stdin_request_rx) = bounded::(1); - - // Create a frontend instance, get access to the sender channel - let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx); - - // Get the current console width - let old_width = r_task(|| unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }); - - // Send a message to the frontend - let id = String::from("test-id-1"); - let request = UiBackendRequest::CallMethod(CallMethodParams { - method: String::from("setConsoleWidth"), - params: vec![Value::from(123)], - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc(id, serde_json::to_value(request).unwrap())) - .unwrap(); - - // Wait for the reply; this should be a FrontendRpcResult. We don't wait - // more than a second since this should be quite fast and we don't want to - // hang the test suite if it doesn't return. - let response = comm_socket - .outgoing_rx - .recv_timeout(std::time::Duration::from_secs(1)) - .unwrap(); - match response { - CommMsg::Rpc(id, result) => { - println!("Got RPC result: {:?}", result); - let result = serde_json::from_value::(result).unwrap(); - assert_eq!(id, "test-id-1"); - // This RPC should return the old width - assert_eq!( - result, - UiBackendReply::CallMethodReply(Value::from(old_width)) - ); - }, - _ => panic!("Unexpected response: {:?}", response), - } - - // Get the new console width - let new_width = r_task(|| unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }); - - // Assert that the console width changed - assert_eq!(new_width, 123); - - // Now try to invoke an RPC that doesn't exist - let id = String::from("test-id-2"); - let request = UiBackendRequest::CallMethod(CallMethodParams { - method: String::from("thisRpcDoesNotExist"), - params: vec![], - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc(id, serde_json::to_value(request).unwrap())) - .unwrap(); - - // Wait for the reply - let response = comm_socket - .outgoing_rx - .recv_timeout(std::time::Duration::from_secs(1)) - .unwrap(); - match response { - CommMsg::Rpc(id, result) => { - println!("Got RPC result: {:?}", result); - let _reply = serde_json::from_value::(result).unwrap(); - // Ensure that the error code is -32601 (method not found) - assert_eq!(id, "test-id-2"); - - // TODO: This should normally throw a `MethodNotFound` but - // that's currently a bit hard because of the nested method - // call. One way to solve this would be for RPC handler - // functions to return a typed JSON-RPC error instead of a - // `anyhow::Result`. Then we could return a `MethodNotFound` from - // `callMethod()`. - // - // assert_eq!(reply.error.code, JsonRpcErrorCode::MethodNotFound); - }, - _ => panic!("Unexpected response: {:?}", response), - } - - // Mark not busy (this prevents the frontend comm from being closed due to - // the Sender being dropped) - ui_comm_tx - .send(UiCommMessage::Event(UiFrontendEvent::Busy(BusyParams { - busy: false, - }))) - .unwrap(); -} From 1b3a1e9185a69191fc61241feafde4dae1eb4ad6 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 5 May 2025 09:24:01 -0400 Subject: [PATCH 3/9] Rework how Ark tasks and R events are polled --- crates/ark/src/interface.rs | 48 +++++++------- crates/ark/src/plots/graphics_device.rs | 2 + crates/ark/src/r_task.rs | 88 +++++++++++++------------ 3 files changed, 72 insertions(+), 66 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index f97487c65..d5f08cc02 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -52,7 +52,6 @@ use amalthea::Error; use anyhow::*; use bus::Bus; use crossbeam::channel::bounded; -use crossbeam::channel::unbounded; use crossbeam::channel::Receiver; use crossbeam::channel::Sender; use harp::command::r_command; @@ -336,9 +335,7 @@ impl RMain { }; } - // Channels to send/receive tasks from auxiliary threads via `RTask`s - let (tasks_interrupt_tx, tasks_interrupt_rx) = unbounded::(); - let (tasks_idle_tx, tasks_idle_rx) = unbounded::(); + let (tasks_interrupt_rx, tasks_idle_rx) = r_task::take_receivers(); R_MAIN.set(UnsafeCell::new(RMain::new( tasks_interrupt_rx, @@ -437,12 +434,6 @@ impl RMain { .or_log_error(&format!("Failed to source startup file '{file}' due to")); } - // R and ark are now set up enough to allow interrupt-time and idle-time tasks - // to be sent through. Idle-time tasks will be run once we enter - // `read_console()` for the first time. Interrupt-time tasks could be run - // sooner if we hit a check-interrupt before then. - r_task::initialize(tasks_interrupt_tx, tasks_idle_tx); - // Initialize support functions (after routine registration, after r_task initialization) match modules::initialize() { Err(err) => { @@ -784,10 +775,14 @@ impl RMain { let tasks_interrupt_rx = self.tasks_interrupt_rx.clone(); let tasks_idle_rx = self.tasks_idle_rx.clone(); + // Process R's polled events regularly while waiting for console input + let polled_events_rx = crossbeam::channel::tick(Duration::from_millis(50)); + let r_request_index = select.recv(&r_request_rx); let stdin_reply_index = select.recv(&stdin_reply_rx); let kernel_request_index = select.recv(&kernel_request_rx); let tasks_interrupt_index = select.recv(&tasks_interrupt_rx); + let polled_events_index = select.recv(&polled_events_rx); // Don't process idle tasks in browser prompts. We currently don't want // idle tasks (e.g. for srcref generation) to run when the call stack is @@ -833,18 +828,7 @@ impl RMain { } } - let oper = select.select_timeout(Duration::from_millis(200)); - - let Ok(oper) = oper else { - // We hit a timeout. Process idle events because we need to - // pump the event loop while waiting for console input. - // - // Alternatively, we could try to figure out the file - // descriptors that R has open and select() on those for - // available data? - unsafe { Self::process_idle_events() }; - continue; - }; + let oper = select.select(); match oper.index() { // We've got an execute request from the frontend @@ -884,6 +868,12 @@ impl RMain { self.handle_task(task); }, + // It's time to run R's polled events + i if i == polled_events_index => { + let _ = oper.recv(&polled_events_rx).unwrap(); + Self::process_idle_events(); + }, + i => log::error!("Unexpected index in Select: {i}"), } } @@ -1808,6 +1798,14 @@ impl RMain { /// Invoked by the R event loop fn polled_events(&mut self) { + // Don't process tasks until R is fully initialized + if !Self::is_initialized() { + if !self.tasks_interrupt_rx.is_empty() { + log::trace!("Delaying execution of interrupt task as R isn't initialized yet"); + } + return; + } + // Skip running tasks if we don't have 128KB of stack space available. // This is 1/8th of the typical Windows stack space (1MB, whereas macOS // and Linux have 8MB). @@ -1826,21 +1824,21 @@ impl RMain { } } - unsafe fn process_idle_events() { + fn process_idle_events() { // Process regular R events. We're normally running with polled // events disabled so that won't run here. We also run with // interrupts disabled, so on Windows those won't get run here // either (i.e. if `UserBreak` is set), but it will reset `UserBreak` // so we need to ensure we handle interrupts right before calling // this. - R_ProcessEvents(); + unsafe { R_ProcessEvents() }; crate::sys::interface::run_activity_handlers(); // Run pending finalizers. We need to do this eagerly as otherwise finalizers // might end up being executed on the LSP thread. // https://github.com/rstudio/positron/issues/431 - R_RunPendingFinalizers(); + unsafe { R_RunPendingFinalizers() }; // Check for Positron render requests graphics_device::on_process_idle_events(); diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 13980ce47..5c4c977ac 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -78,6 +78,8 @@ pub(crate) fn init_graphics_device( } async fn process_notifications(graphics_device_rx: Receiver) { + log::trace!("Now listening for graphics device notifications"); + loop { let mut i = 0; diff --git a/crates/ark/src/r_task.rs b/crates/ark/src/r_task.rs index b7d45a05b..7951daa38 100644 --- a/crates/ark/src/r_task.rs +++ b/crates/ark/src/r_task.rs @@ -8,23 +8,66 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::sync::LazyLock; use std::sync::Mutex; -use std::sync::OnceLock; use std::time::Duration; use crossbeam::channel::bounded; +use crossbeam::channel::unbounded; +use crossbeam::channel::Receiver; use crossbeam::channel::Sender; use uuid::Uuid; use crate::fixtures::r_test_init; use crate::interface::RMain; +/// Task channels for interrupt-time tasks +static INTERRUPT_TASKS: LazyLock = LazyLock::new(|| TaskChannels::new()); + +/// Task channels for idle-time tasks +static IDLE_TASKS: LazyLock = LazyLock::new(|| TaskChannels::new()); + // Compared to `futures::BoxFuture`, this doesn't require the future to be Send. // We don't need this bound since the executor runs on only on the R thread pub(crate) type BoxFuture<'a, T> = Pin + 'a>>; type SharedOption = Arc>>; +/// Manages task channels for sending tasks to `R_MAIN`. +struct TaskChannels { + tx: Sender, + rx: Mutex>>, +} + +impl TaskChannels { + fn new() -> Self { + let (tx, rx) = unbounded::(); + Self { + tx, + rx: Mutex::new(Some(rx)), + } + } + + fn tx(&self) -> Sender { + self.tx.clone() + } + + fn take_rx(&self) -> Option> { + let mut rx = self.rx.lock().unwrap(); + rx.take() + } +} + +/// Returns receivers for both interrupt and idle tasks. +/// Initializes the task channels if they haven't been initialized yet. +/// Can only be called once (intended for `RMain` during init). +pub(crate) fn take_receivers() -> (Receiver, Receiver) { + ( + INTERRUPT_TASKS.take_rx().unwrap(), + IDLE_TASKS.take_rx().unwrap(), + ) +} + pub enum RTask { Sync(RTaskSync), Async(RTaskAsync), @@ -191,7 +234,7 @@ where status_tx: Some(status_tx), start_info: RTaskStartInfo::new(false), }); - get_tasks_interrupt_tx().send(task).unwrap(); + INTERRUPT_TASKS.tx().send(task).unwrap(); // Block until we get the signal that the task has started let status = status_rx.recv().unwrap(); @@ -265,9 +308,9 @@ where // Note that this blocks until the channels are initialized, // even though these are async tasks! let tasks_tx = if only_idle { - get_tasks_idle_tx() + IDLE_TASKS.tx() } else { - get_tasks_interrupt_tx() + INTERRUPT_TASKS.tx() }; // Send the async task to the R thread @@ -280,42 +323,5 @@ where tasks_tx.send(task).unwrap(); } -/// Channel for sending tasks to `R_MAIN`. Initialized by `initialize()`, but -/// is otherwise only accessed to create `RTask`s. -static R_MAIN_TASKS_INTERRUPT_TX: OnceLock> = OnceLock::new(); -static R_MAIN_TASKS_IDLE_TX: OnceLock> = OnceLock::new(); - -pub fn initialize(tasks_tx: Sender, tasks_idle_tx: Sender) { - R_MAIN_TASKS_INTERRUPT_TX.set(tasks_tx).unwrap(); - R_MAIN_TASKS_IDLE_TX.set(tasks_idle_tx).unwrap(); -} - -// Be defensive for the case an auxiliary thread runs a task before R is initialized -// by `RMain::start()` which calls `r_task::initialize()` -fn get_tasks_interrupt_tx() -> &'static Sender { - get_tx(&R_MAIN_TASKS_INTERRUPT_TX) -} -fn get_tasks_idle_tx() -> &'static Sender { - get_tx(&R_MAIN_TASKS_IDLE_TX) -} - -fn get_tx(once_tx: &'static OnceLock>) -> &'static Sender { - let now = std::time::Instant::now(); - - loop { - if let Some(tx) = once_tx.get() { - return tx; - } - - // Wait for `initialize()` - log::info!("`tasks_tx` not yet initialized, going to sleep for 50ms."); - std::thread::sleep(Duration::from_millis(50)); - - if now.elapsed().as_secs() > 50 { - panic!("Can't acquire `tasks_tx` after 50 seconds."); - } - } -} - // Tests are tricky because `harp::fixtures::r_test_init()` is very bare bones and // doesn't have an `R_MAIN` or `R_MAIN_TASKS_TX`. From 1e4782940d1670e175646bdf6f7c2bf087cc2447 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 5 May 2025 10:54:44 -0400 Subject: [PATCH 4/9] Finish renaming "policy" to "settings" --- crates/ark/src/plots/graphics_device.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 5c4c977ac..4133711bf 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -638,7 +638,7 @@ impl DeviceContext { fn create_display_data_plot(&self, id: &PlotId) -> Result { // TODO: Take these from R global options? Like `ark.plot.width`? - let policy = PlotRenderSettings { + let settings = PlotRenderSettings { size: PlotSize { width: 800, height: 600, @@ -647,7 +647,7 @@ impl DeviceContext { format: PlotRenderFormat::Png, }; - let data = unwrap!(self.render_plot(id, &policy), Err(error) => { + let data = unwrap!(self.render_plot(id, &settings), Err(error) => { return Err(anyhow!("Failed to render plot with id {id} due to: {error}.")); }); @@ -658,16 +658,16 @@ impl DeviceContext { } #[tracing::instrument(level = "trace", skip(self))] - fn render_plot(&self, id: &PlotId, policy: &PlotRenderSettings) -> anyhow::Result { + fn render_plot(&self, id: &PlotId, settings: &PlotRenderSettings) -> anyhow::Result { log::trace!("Rendering plot"); let image_path = r_task(|| unsafe { RFunction::from(".ps.graphics.render_plot_from_recording") .param("id", id) - .param("width", RObject::try_from(policy.size.width)?) - .param("height", RObject::try_from(policy.size.height)?) - .param("pixel_ratio", policy.pixel_ratio) - .param("format", policy.format.to_string()) + .param("width", RObject::try_from(settings.size.width)?) + .param("height", RObject::try_from(settings.size.height)?) + .param("pixel_ratio", settings.pixel_ratio) + .param("format", settings.format.to_string()) .call()? .to::() }); From 53bb9c476549463b07ed9556a9fc473fee6122bc Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 5 May 2025 17:57:29 -0400 Subject: [PATCH 5/9] Be defensive against invalid sizes --- crates/ark/src/ui/ui.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/ark/src/ui/ui.rs b/crates/ark/src/ui/ui.rs index b2945b8ac..ca93bcff4 100644 --- a/crates/ark/src/ui/ui.rs +++ b/crates/ark/src/ui/ui.rs @@ -198,6 +198,15 @@ impl UiComm { &self, params: DidChangePlotsRenderSettingsParams, ) -> anyhow::Result { + // The frontend shoudn't send invalid sizes but be defensive. Sometimes + // the plot container is in a strange state when it's hidden. + if params.settings.size.height <= 0 || params.settings.size.width <= 0 { + return Err(anyhow::anyhow!( + "Got invalid plot render size: {size:?}", + size = params.settings.size, + )); + } + self.graphics_device_tx .send(GraphicsDeviceNotification::DidChangePlotRenderSettings( params.settings, From ca061ad922d93b0d3e745d265cc4f0f1b0415e54 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Mon, 5 May 2025 13:13:20 -0400 Subject: [PATCH 6/9] Don't update render settings from render requests These might be one-off requests e.g. for saving a file --- crates/ark/src/plots/graphics_device.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 4133711bf..a1baf75a4 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -414,14 +414,6 @@ impl DeviceContext { format: plot_meta.format, }; - // Update the current rendering policy so that pre-rendering is - // as accurate as possible. - // TODO: Once we get render policy events we should use that instead. - // This way a one-off render request with special settings won't cause - // the next pre-render to be invalid and force the frontend to request - // a proper render. - self.current_render_settings.replace(settings); - let data = self.render_plot(&id, &settings)?; let mime_type = Self::get_mime_type(&plot_meta.format); From a0b90a021b75e30214b7b88b811a08d646035c40 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 6 May 2025 03:18:13 -0400 Subject: [PATCH 7/9] Take async channel in `process_notifications()` So that we only process notifications when a message is waiting --- crates/ark/src/interface.rs | 3 ++- crates/ark/src/plots/graphics_device.rs | 26 ++++++++++--------------- crates/ark/src/shell.rs | 7 ++++--- crates/ark/src/start.rs | 2 +- crates/ark/src/ui/ui.rs | 7 ++++--- 5 files changed, 21 insertions(+), 24 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index d5f08cc02..733838a06 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -88,6 +88,7 @@ use regex::Regex; use serde_json::json; use stdext::result::ResultOrLog; use stdext::*; +use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver; use uuid::Uuid; use crate::dap::dap::DapBackendEvent; @@ -323,7 +324,7 @@ impl RMain { dap: Arc>, session_mode: SessionMode, default_repos: DefaultRepos, - graphics_device_rx: Receiver, + graphics_device_rx: AsyncUnboundedReceiver, ) { // Set the main thread ID. // Must happen before doing anything that checks `RMain::on_main_thread()`, diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index a1baf75a4..ae5f98723 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -32,7 +32,6 @@ use amalthea::wire::update_display_data::UpdateDisplayData; use anyhow::anyhow; use base64::engine::general_purpose; use base64::Engine; -use crossbeam::channel::Receiver; use crossbeam::channel::Select; use crossbeam::channel::Sender; use harp::exec::RFunction; @@ -45,7 +44,7 @@ use libr::SEXP; use serde_json::json; use stdext::result::ResultOrLog; use stdext::unwrap; -use tokio::task::yield_now; +use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver; use uuid::Uuid; use crate::interface::RMain; @@ -69,7 +68,7 @@ const POSITRON_PLOT_CHANNEL_ID: &str = "positron.plot"; pub(crate) fn init_graphics_device( comm_manager_tx: Sender, iopub_tx: Sender, - graphics_device_rx: Receiver, + graphics_device_rx: AsyncUnboundedReceiver, ) { DEVICE_CONTEXT.set(DeviceContext::new(comm_manager_tx, iopub_tx)); @@ -77,32 +76,27 @@ pub(crate) fn init_graphics_device( r_task::spawn_interrupt(|| async move { process_notifications(graphics_device_rx).await }); } -async fn process_notifications(graphics_device_rx: Receiver) { +async fn process_notifications( + mut graphics_device_rx: AsyncUnboundedReceiver, +) { log::trace!("Now listening for graphics device notifications"); loop { - let mut i = 0; - - while let Ok(notification) = graphics_device_rx.try_recv() { + while let Some(notification) = graphics_device_rx.recv().await { log::trace!("Got graphics device notification: {notification:#?}"); - i = i + 1; match notification { GraphicsDeviceNotification::DidChangePlotRenderSettings(plot_render_settings) => { + // Safety: Note that `DEVICE_CONTEXT` is accessed at + // interrupt time. Other methods in this file should be + // written in accordance and avoid causing R interrupt + // checks while they themselves access the device. DEVICE_CONTEXT.with_borrow(|ctx| { ctx.current_render_settings.replace(plot_render_settings) }); }, } - - // Yield regularly to the R thread when something went wrong on the - // frontend side and is spamming messages - if i >= 5 { - break; - } } - - yield_now().await; } } diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index 0a092c7b2..af95a13df 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -37,6 +37,7 @@ use harp::ParseResult; use log::*; use serde_json::json; use stdext::unwrap; +use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; use crate::help::r_help::RHelp; use crate::help_proxy; @@ -56,7 +57,7 @@ pub struct Shell { kernel_request_tx: Sender, kernel_init_rx: BusReader, kernel_info: Option, - graphics_device_tx: Sender, + graphics_device_tx: AsyncUnboundedSender, } #[derive(Debug)] @@ -72,7 +73,7 @@ impl Shell { stdin_request_tx: Sender, kernel_init_rx: BusReader, kernel_request_tx: Sender, - graphics_device_tx: Sender, + graphics_device_tx: AsyncUnboundedSender, ) -> Self { Self { comm_manager_tx, @@ -254,7 +255,7 @@ fn handle_comm_open_ui( comm: CommSocket, stdin_request_tx: Sender, kernel_request_tx: Sender, - graphics_device_tx: Sender, + graphics_device_tx: AsyncUnboundedSender, ) -> amalthea::Result { // Create a frontend to wrap the comm channel we were just given. This starts // a thread that proxies messages to the frontend. diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs index 79111ba11..4d0ce344f 100644 --- a/crates/ark/src/start.rs +++ b/crates/ark/src/start.rs @@ -72,7 +72,7 @@ pub fn start_kernel( // Communication channel between the graphics device (running on the R // thread) and the shell thread let (graphics_device_tx, graphics_device_rx) = - crossbeam::channel::unbounded::(); + tokio::sync::mpsc::unbounded_channel::(); // Create the shell. let kernel_init_rx = kernel_init_tx.add_rx(); diff --git a/crates/ark/src/ui/ui.rs b/crates/ark/src/ui/ui.rs index ca93bcff4..93255c27e 100644 --- a/crates/ark/src/ui/ui.rs +++ b/crates/ark/src/ui/ui.rs @@ -23,6 +23,7 @@ use harp::object::RObject; use serde_json::Value; use stdext::spawn; use stdext::unwrap; +use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; use crate::plots::graphics_device::GraphicsDeviceNotification; use crate::r_task; @@ -40,14 +41,14 @@ pub struct UiComm { comm: CommSocket, ui_comm_rx: Receiver, stdin_request_tx: Sender, - graphics_device_tx: Sender, + graphics_device_tx: AsyncUnboundedSender, } impl UiComm { pub(crate) fn start( comm: CommSocket, stdin_request_tx: Sender, - graphics_device_tx: Sender, + graphics_device_tx: AsyncUnboundedSender, ) -> Sender { // Create a sender-receiver pair for Positron global events let (ui_comm_tx, ui_comm_rx) = crossbeam::channel::unbounded::(); @@ -264,7 +265,7 @@ mod tests { let (stdin_request_tx, _stdin_request_rx) = bounded::(1); let (graphics_device_tx, _graphics_device_rx) = - crossbeam::channel::unbounded::(); + tokio::sync::mpsc::unbounded_channel::(); // Create a frontend instance, get access to the sender channel let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx); From b1a72e0a02019b456be16feaaeb565859d0d2327 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 6 May 2025 07:31:49 -0400 Subject: [PATCH 8/9] More documentation --- crates/ark/src/interface.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 733838a06..4b5d4250d 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -776,7 +776,9 @@ impl RMain { let tasks_interrupt_rx = self.tasks_interrupt_rx.clone(); let tasks_idle_rx = self.tasks_idle_rx.clone(); - // Process R's polled events regularly while waiting for console input + // Process R's polled events regularly while waiting for console input. + // We used to poll every 200ms but that lead to visible delays for the + // processing of plot events. let polled_events_rx = crossbeam::channel::tick(Duration::from_millis(50)); let r_request_index = select.recv(&r_request_rx); @@ -1841,7 +1843,11 @@ impl RMain { // https://github.com/rstudio/positron/issues/431 unsafe { R_RunPendingFinalizers() }; - // Check for Positron render requests + // Check for Positron render requests. + // + // TODO: This should move to a spawned task that'd be woken up by + // incoming messages on plot comms. This way we'll prevent the delays + // introduced by timeout-based event polling. graphics_device::on_process_idle_events(); } From 6b775bd8c1311dc7cacf5a3635fccc7f24f0c814 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 6 May 2025 16:38:53 -0400 Subject: [PATCH 9/9] Delay initialisation of graphics device --- crates/ark/src/interface.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 4b5d4250d..8596f4657 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -353,13 +353,6 @@ impl RMain { let main = RMain::get_mut(); - // Initialize the GD context on this thread - graphics_device::init_graphics_device( - main.get_comm_manager_tx().clone(), - main.get_iopub_tx().clone(), - graphics_device_rx, - ); - let mut r_args = r_args.clone(); // Record if the user has requested that we don't load the site/user level R profiles @@ -477,6 +470,18 @@ impl RMain { ); Self::complete_initialization(main.banner.take(), kernel_init_tx); + // Initialize the GD context on this thread. + // Note that we do it after init is complete to avoid deadlocking + // integration tests by spawning an async task. The deadlock is caused + // by https://github.com/posit-dev/ark/blob/bd827e735970ca17102aeddfbe2c3ccf26950a36/crates/ark/src/r_task.rs#L261. + // We should be able to remove this escape hatch in `r_task()` by + // instantiating an `RMain` in unit tests as well. + graphics_device::init_graphics_device( + main.get_comm_manager_tx().clone(), + main.get_iopub_tx().clone(), + graphics_device_rx, + ); + // Now that R has started and libr and ark have fully initialized, run site and user // level R profiles, in that order if !ignore_site_r_profile {