Skip to content

Commit 5350cf1

Browse files
committed
Take async channel in process_notifications()
So that we only process notifications when a message is waiting
1 parent 27f13a2 commit 5350cf1

File tree

5 files changed

+21
-24
lines changed

5 files changed

+21
-24
lines changed

crates/ark/src/interface.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ use regex::Regex;
8888
use serde_json::json;
8989
use stdext::result::ResultOrLog;
9090
use stdext::*;
91+
use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver;
9192
use uuid::Uuid;
9293

9394
use crate::dap::dap::DapBackendEvent;
@@ -323,7 +324,7 @@ impl RMain {
323324
dap: Arc<Mutex<Dap>>,
324325
session_mode: SessionMode,
325326
default_repos: DefaultRepos,
326-
graphics_device_rx: Receiver<GraphicsDeviceNotification>,
327+
graphics_device_rx: AsyncUnboundedReceiver<GraphicsDeviceNotification>,
327328
) {
328329
// Set the main thread ID.
329330
// Must happen before doing anything that checks `RMain::on_main_thread()`,

crates/ark/src/plots/graphics_device.rs

+10-16
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use amalthea::wire::update_display_data::UpdateDisplayData;
3232
use anyhow::anyhow;
3333
use base64::engine::general_purpose;
3434
use base64::Engine;
35-
use crossbeam::channel::Receiver;
3635
use crossbeam::channel::Select;
3736
use crossbeam::channel::Sender;
3837
use harp::exec::RFunction;
@@ -45,7 +44,7 @@ use libr::SEXP;
4544
use serde_json::json;
4645
use stdext::result::ResultOrLog;
4746
use stdext::unwrap;
48-
use tokio::task::yield_now;
47+
use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver;
4948
use uuid::Uuid;
5049

5150
use crate::interface::RMain;
@@ -69,40 +68,35 @@ const POSITRON_PLOT_CHANNEL_ID: &str = "positron.plot";
6968
pub(crate) fn init_graphics_device(
7069
comm_manager_tx: Sender<CommManagerEvent>,
7170
iopub_tx: Sender<IOPubMessage>,
72-
graphics_device_rx: Receiver<GraphicsDeviceNotification>,
71+
graphics_device_rx: AsyncUnboundedReceiver<GraphicsDeviceNotification>,
7372
) {
7473
DEVICE_CONTEXT.set(DeviceContext::new(comm_manager_tx, iopub_tx));
7574

7675
// Launch an R thread task to process messages from the frontend
7776
r_task::spawn_interrupt(|| async move { process_notifications(graphics_device_rx).await });
7877
}
7978

80-
async fn process_notifications(graphics_device_rx: Receiver<GraphicsDeviceNotification>) {
79+
async fn process_notifications(
80+
mut graphics_device_rx: AsyncUnboundedReceiver<GraphicsDeviceNotification>,
81+
) {
8182
log::trace!("Now listening for graphics device notifications");
8283

8384
loop {
84-
let mut i = 0;
85-
86-
while let Ok(notification) = graphics_device_rx.try_recv() {
85+
while let Some(notification) = graphics_device_rx.recv().await {
8786
log::trace!("Got graphics device notification: {notification:#?}");
88-
i = i + 1;
8987

9088
match notification {
9189
GraphicsDeviceNotification::DidChangePlotRenderSettings(plot_render_settings) => {
90+
// Safety: Note that `DEVICE_CONTEXT` is accessed at
91+
// interrupt time. Other methods in this file should be
92+
// written in accordance and avoid causing R interrupt
93+
// checks while they themselves access the device.
9294
DEVICE_CONTEXT.with_borrow(|ctx| {
9395
ctx.current_render_settings.replace(plot_render_settings)
9496
});
9597
},
9698
}
97-
98-
// Yield regularly to the R thread when something went wrong on the
99-
// frontend side and is spamming messages
100-
if i >= 5 {
101-
break;
102-
}
10399
}
104-
105-
yield_now().await;
106100
}
107101
}
108102

crates/ark/src/shell.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use harp::ParseResult;
3737
use log::*;
3838
use serde_json::json;
3939
use stdext::unwrap;
40+
use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender;
4041

4142
use crate::help::r_help::RHelp;
4243
use crate::help_proxy;
@@ -56,7 +57,7 @@ pub struct Shell {
5657
kernel_request_tx: Sender<KernelRequest>,
5758
kernel_init_rx: BusReader<KernelInfo>,
5859
kernel_info: Option<KernelInfo>,
59-
graphics_device_tx: Sender<GraphicsDeviceNotification>,
60+
graphics_device_tx: AsyncUnboundedSender<GraphicsDeviceNotification>,
6061
}
6162

6263
#[derive(Debug)]
@@ -72,7 +73,7 @@ impl Shell {
7273
stdin_request_tx: Sender<StdInRequest>,
7374
kernel_init_rx: BusReader<KernelInfo>,
7475
kernel_request_tx: Sender<KernelRequest>,
75-
graphics_device_tx: Sender<GraphicsDeviceNotification>,
76+
graphics_device_tx: AsyncUnboundedSender<GraphicsDeviceNotification>,
7677
) -> Self {
7778
Self {
7879
comm_manager_tx,
@@ -254,7 +255,7 @@ fn handle_comm_open_ui(
254255
comm: CommSocket,
255256
stdin_request_tx: Sender<StdInRequest>,
256257
kernel_request_tx: Sender<KernelRequest>,
257-
graphics_device_tx: Sender<GraphicsDeviceNotification>,
258+
graphics_device_tx: AsyncUnboundedSender<GraphicsDeviceNotification>,
258259
) -> amalthea::Result<bool> {
259260
// Create a frontend to wrap the comm channel we were just given. This starts
260261
// a thread that proxies messages to the frontend.

crates/ark/src/start.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ pub fn start_kernel(
7272
// Communication channel between the graphics device (running on the R
7373
// thread) and the shell thread
7474
let (graphics_device_tx, graphics_device_rx) =
75-
crossbeam::channel::unbounded::<GraphicsDeviceNotification>();
75+
tokio::sync::mpsc::unbounded_channel::<GraphicsDeviceNotification>();
7676

7777
// Create the shell.
7878
let kernel_init_rx = kernel_init_tx.add_rx();

crates/ark/src/ui/ui.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use harp::object::RObject;
2323
use serde_json::Value;
2424
use stdext::spawn;
2525
use stdext::unwrap;
26+
use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender;
2627

2728
use crate::plots::graphics_device::GraphicsDeviceNotification;
2829
use crate::r_task;
@@ -40,7 +41,7 @@ pub struct UiComm {
4041
comm: CommSocket,
4142
ui_comm_rx: Receiver<UiCommMessage>,
4243
stdin_request_tx: Sender<StdInRequest>,
43-
graphics_device_tx: Sender<GraphicsDeviceNotification>,
44+
graphics_device_tx: AsyncUnboundedSender<GraphicsDeviceNotification>,
4445
}
4546

4647
// Pass a channel to the
@@ -49,7 +50,7 @@ impl UiComm {
4950
pub(crate) fn start(
5051
comm: CommSocket,
5152
stdin_request_tx: Sender<StdInRequest>,
52-
graphics_device_tx: Sender<GraphicsDeviceNotification>,
53+
graphics_device_tx: AsyncUnboundedSender<GraphicsDeviceNotification>,
5354
) -> Sender<UiCommMessage> {
5455
// Create a sender-receiver pair for Positron global events
5556
let (ui_comm_tx, ui_comm_rx) = crossbeam::channel::unbounded::<UiCommMessage>();
@@ -266,7 +267,7 @@ mod tests {
266267
let (stdin_request_tx, _stdin_request_rx) = bounded::<StdInRequest>(1);
267268

268269
let (graphics_device_tx, _graphics_device_rx) =
269-
crossbeam::channel::unbounded::<GraphicsDeviceNotification>();
270+
tokio::sync::mpsc::unbounded_channel::<GraphicsDeviceNotification>();
270271

271272
// Create a frontend instance, get access to the sender channel
272273
let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx);

0 commit comments

Comments
 (0)