diff --git a/Cargo.toml b/Cargo.toml index ed5dec75c..720075733 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,11 @@ # Was necessary after switching to dev tree-sitter to fix this warning: # > some crates are on edition 2021 which defaults to `resolver = "2"`, but -# > virtual workspaces default to `resolver = "1"` +# > virtual workspaces default to `resolver = "1"`. +# +# Also necessary to enable the `testing` feature of harp only when testing +# (i.e. when building downstream packages like Ark with Harp's `testing` +# feature set in `dev-dependencies`). resolver = "2" members = [ diff --git a/crates/amalthea/Cargo.toml b/crates/amalthea/Cargo.toml index 3fa289f39..5613605e5 100644 --- a/crates/amalthea/Cargo.toml +++ b/crates/amalthea/Cargo.toml @@ -19,6 +19,8 @@ hex = "0.4.3" hmac = "0.12.1" log = "0.4.17" nix = "0.26.2" +portpicker = "0.1.1" +rand = "0.8.5" serde = { version = "1.0.154", features = ["derive"] } serde_json = { version = "1.0.94", features = ["preserve_order"]} sha2 = "0.10.6" @@ -34,6 +36,4 @@ serde_repr = "0.1.17" tracing = "0.1.40" [dev-dependencies] -rand = "0.8.5" -portpicker = "0.1.1" env_logger = "0.10.0" diff --git a/crates/amalthea/tests/frontend/mod.rs b/crates/amalthea/src/fixtures/dummy_frontend.rs similarity index 53% rename from crates/amalthea/tests/frontend/mod.rs rename to crates/amalthea/src/fixtures/dummy_frontend.rs index 976402118..ca8465f0e 100644 --- a/crates/amalthea/tests/frontend/mod.rs +++ b/crates/amalthea/src/fixtures/dummy_frontend.rs @@ -1,18 +1,26 @@ /* - * mod.rs + * dummy_frontend.rs * - * Copyright (C) 2022 Posit Software, PBC. All rights reserved. + * Copyright (C) 2022-2024 Posit Software, PBC. All rights reserved. * */ -use amalthea::connection_file::ConnectionFile; -use amalthea::session::Session; -use amalthea::socket::socket::Socket; -use amalthea::wire::jupyter_message::JupyterMessage; -use amalthea::wire::jupyter_message::Message; -use amalthea::wire::jupyter_message::ProtocolMessage; +use serde_json::Value; +use stdext::assert_match; -pub struct Frontend { +use crate::connection_file::ConnectionFile; +use crate::session::Session; +use crate::socket::socket::Socket; +use crate::wire::execute_input::ExecuteInput; +use crate::wire::execute_reply::ExecuteReply; +use crate::wire::execute_request::ExecuteRequest; +use crate::wire::jupyter_message::JupyterMessage; +use crate::wire::jupyter_message::Message; +use crate::wire::jupyter_message::ProtocolMessage; +use crate::wire::status::ExecutionState; +use crate::wire::wire_message::WireMessage; + +pub struct DummyFrontend { pub _control_socket: Socket, pub shell_socket: Socket, pub iopub_socket: Socket, @@ -27,7 +35,7 @@ pub struct Frontend { heartbeat_port: u16, } -impl Frontend { +impl DummyFrontend { pub fn new() -> Self { use rand::Rng; @@ -117,7 +125,7 @@ impl Frontend { /// Completes initialization of the frontend (usually done after the kernel /// is ready and connected) - pub fn complete_intialization(&self) { + pub fn complete_initialization(&self) { self.iopub_socket.subscribe().unwrap(); } @@ -130,6 +138,17 @@ impl Frontend { id } + pub fn send_execute_request(&self, code: &str) -> String { + self.send_shell(ExecuteRequest { + code: String::from(code), + silent: false, + store_history: true, + user_expressions: serde_json::Value::Null, + allow_stdin: false, + stop_on_error: false, + }) + } + /// Sends a Jupyter message on the Stdin socket pub fn send_stdin(&self, msg: T) { let message = JupyterMessage::create(msg, None, &self.session); @@ -137,22 +156,82 @@ impl Frontend { } /// Receives a Jupyter message from the Shell socket - pub fn receive_shell(&self) -> Message { + pub fn recv_shell(&self) -> Message { Message::read_from_socket(&self.shell_socket).unwrap() } + /// Receive from Shell and assert ExecuteReply message + pub fn recv_shell_execute_reply(&self) -> ExecuteReply { + let msg = self.recv_shell(); + + assert_match!(msg, Message::ExecuteReply(data) => { + data.content + }) + } + /// Receives a Jupyter message from the IOPub socket - pub fn receive_iopub(&self) -> Message { + pub fn recv_iopub(&self) -> Message { Message::read_from_socket(&self.iopub_socket).unwrap() } + /// Receive from IOPub and assert Busy message + pub fn recv_iopub_busy(&self) -> () { + let msg = self.recv_iopub(); + + assert_match!(msg, Message::Status(data) => { + assert_eq!(data.content.execution_state, ExecutionState::Busy); + }); + } + + /// Receive from IOPub and assert Idle message + pub fn recv_iopub_idle(&self) -> () { + let msg = self.recv_iopub(); + + assert_match!(msg, Message::Status(data) => { + assert_eq!(data.content.execution_state, ExecutionState::Idle); + }); + } + + /// Receive from IOPub and assert ExecuteInput message + pub fn recv_iopub_execute_input(&self) -> ExecuteInput { + let msg = self.recv_iopub(); + + assert_match!(msg, Message::ExecuteInput(data) => { + data.content + }) + } + + /// Receive from IOPub and assert ExecuteResult message. Returns compulsory + /// `plain/text` result. + pub fn recv_iopub_execute_result(&self) -> String { + let msg = self.recv_iopub(); + + assert_match!(msg, Message::ExecuteResult(data) => { + assert_match!(data.content.data, Value::Object(map) => { + assert_match!(map["text/plain"], Value::String(ref string) => { + string.clone() + }) + }) + }) + } + + /// Receive from IOPub and assert ExecuteResult message. Returns compulsory + /// `evalue` field. + pub fn recv_iopub_execute_error(&self) -> String { + let msg = self.recv_iopub(); + + assert_match!(msg, Message::ExecuteError(data) => { + data.content.exception.evalue + }) + } + /// Receives a Jupyter message from the Stdin socket - pub fn receive_stdin(&self) -> Message { + pub fn recv_stdin(&self) -> Message { Message::read_from_socket(&self.stdin_socket).unwrap() } /// Receives a (raw) message from the heartbeat socket - pub fn receive_heartbeat(&self) -> zmq::Message { + pub fn recv_heartbeat(&self) -> zmq::Message { let mut msg = zmq::Message::new(); self.heartbeat_socket.recv(&mut msg).unwrap(); msg @@ -178,4 +257,39 @@ impl Frontend { key: self.key.clone(), } } + + /// Asserts that no socket has incoming data + pub fn assert_no_incoming(&mut self) { + let mut has_incoming = false; + + if self.iopub_socket.has_incoming_data().unwrap() { + has_incoming = true; + Self::flush_incoming("IOPub", &self.iopub_socket); + } + if self.shell_socket.has_incoming_data().unwrap() { + has_incoming = true; + Self::flush_incoming("Shell", &self.shell_socket); + } + if self.stdin_socket.has_incoming_data().unwrap() { + has_incoming = true; + Self::flush_incoming("StdIn", &self.stdin_socket); + } + if self.heartbeat_socket.has_incoming_data().unwrap() { + has_incoming = true; + Self::flush_incoming("Heartbeat", &self.heartbeat_socket); + } + + if has_incoming { + panic!("Sockets must be empty on exit (see details above)"); + } + } + + fn flush_incoming(name: &str, socket: &Socket) { + println!("{name} has incoming data:"); + + while socket.has_incoming_data().unwrap() { + dbg!(WireMessage::read_from_socket(socket).unwrap()); + println!("---"); + } + } } diff --git a/crates/amalthea/src/fixtures/mod.rs b/crates/amalthea/src/fixtures/mod.rs new file mode 100644 index 000000000..4e9fa5f0e --- /dev/null +++ b/crates/amalthea/src/fixtures/mod.rs @@ -0,0 +1 @@ +pub mod dummy_frontend; diff --git a/crates/amalthea/src/lib.rs b/crates/amalthea/src/lib.rs index b056087a1..7fd85f73c 100644 --- a/crates/amalthea/src/lib.rs +++ b/crates/amalthea/src/lib.rs @@ -8,6 +8,7 @@ pub mod comm; pub mod connection_file; pub mod error; +pub mod fixtures; pub mod kernel; pub mod kernel_dirs; pub mod kernel_spec; diff --git a/crates/amalthea/src/socket/socket.rs b/crates/amalthea/src/socket/socket.rs index 90b34b93a..fb07ab51e 100644 --- a/crates/amalthea/src/socket/socket.rs +++ b/crates/amalthea/src/socket/socket.rs @@ -180,6 +180,10 @@ impl Socket { } } + pub fn has_incoming_data(&self) -> zmq::Result { + Ok(self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0) + } + /// Subscribes a SUB socket to all the published messages from a PUB socket. /// /// Note that this needs to be called *after* the socket connection is diff --git a/crates/amalthea/tests/client.rs b/crates/amalthea/tests/client.rs index 59aca3ce1..ad270c303 100644 --- a/crates/amalthea/tests/client.rs +++ b/crates/amalthea/tests/client.rs @@ -10,6 +10,7 @@ use std::sync::Mutex; use amalthea::comm::comm_channel::CommMsg; use amalthea::comm::event::CommManagerEvent; +use amalthea::fixtures::dummy_frontend::DummyFrontend; use amalthea::kernel::Kernel; use amalthea::kernel::StreamBehavior; use amalthea::socket::comm::CommInitiator; @@ -38,23 +39,22 @@ use log::info; use serde_json; mod control; -mod frontend; mod shell; #[test] fn test_kernel() { - let frontend = frontend::Frontend::new(); + let frontend = DummyFrontend::new(); let connection_file = frontend.get_connection_file(); let mut kernel = Kernel::new("amalthea", connection_file).unwrap(); - let shell_tx = kernel.create_iopub_tx(); + let iopub_tx = kernel.create_iopub_tx(); let comm_manager_tx = kernel.create_comm_manager_tx(); let (stdin_request_tx, stdin_request_rx) = bounded::(1); let (stdin_reply_tx, stdin_reply_rx) = unbounded(); let shell = Arc::new(Mutex::new(shell::Shell::new( - shell_tx, + iopub_tx, stdin_request_tx, stdin_reply_rx, ))); @@ -82,7 +82,7 @@ fn test_kernel() { // Complete client initialization info!("Completing frontend initialization"); - frontend.complete_intialization(); + frontend.complete_initialization(); // Ask the kernel for the kernel info. This should return an object with the // language "Test" defined in our shell handler. @@ -90,7 +90,7 @@ fn test_kernel() { frontend.send_shell(KernelInfoRequest {}); info!("Waiting for kernel info reply"); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::KernelInfoReply(reply) => { info!("Kernel info received: {:?}", reply); @@ -114,7 +114,7 @@ fn test_kernel() { // The kernel should send an execute reply message indicating that the execute succeeded info!("Waiting for execute reply"); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::ExecuteReply(reply) => { info!("Received execute reply: {:?}", reply); @@ -139,7 +139,7 @@ fn test_kernel() { // (for the execute_request) info!("Waiting for IOPub execution information messsage 1 of 6: Status"); - let iopub_1 = frontend.receive_iopub(); + let iopub_1 = frontend.recv_iopub(); match iopub_1 { Message::Status(status) => { info!("Got kernel status: {:?}", status); @@ -155,7 +155,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 2 of 6: Status"); - let iopub_2 = frontend.receive_iopub(); + let iopub_2 = frontend.recv_iopub(); match iopub_2 { Message::Status(status) => { info!("Got kernel status: {:?}", status); @@ -171,7 +171,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 3 of 6: Status"); - let iopub_3 = frontend.receive_iopub(); + let iopub_3 = frontend.recv_iopub(); match iopub_3 { Message::Status(status) => { info!("Got kernel status: {:?}", status); @@ -186,7 +186,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 4 of 6: Input Broadcast"); - let iopub_4 = frontend.receive_iopub(); + let iopub_4 = frontend.recv_iopub(); match iopub_4 { Message::ExecuteInput(input) => { info!("Got input rebroadcast: {:?}", input); @@ -201,7 +201,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 5 of 6: Execution Result"); - let iopub_5 = frontend.receive_iopub(); + let iopub_5 = frontend.recv_iopub(); match iopub_5 { Message::ExecuteResult(result) => { info!("Got execution result: {:?}", result); @@ -215,7 +215,7 @@ fn test_kernel() { } info!("Waiting for IOPub execution information messsage 6 of 6: Status"); - let iopub_6 = frontend.receive_iopub(); + let iopub_6 = frontend.recv_iopub(); match iopub_6 { Message::Status(status) => { info!("Got kernel status: {:?}", status); @@ -240,7 +240,7 @@ fn test_kernel() { }); info!("Waiting for kernel to send an input request"); - let request = frontend.receive_stdin(); + let request = frontend.recv_stdin(); match request { Message::InputRequest(request) => { info!("Got input request: {:?}", request); @@ -263,35 +263,35 @@ fn test_kernel() { // processing of the above `prompt` execution request assert_eq!( // Status: Busy - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), KernelStatus::message_type() ); assert_eq!( // ExecuteInput (re-broadcast of 'Prompt') - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), ExecuteInput::message_type() ); assert_eq!( // StreamOutput (echoed input) - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), StreamOutput::message_type() ); assert_eq!( // ExecuteResult - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), ExecuteResult::message_type() ); assert_eq!( // Status: Idle - WireMessage::try_from(&frontend.receive_iopub()) + WireMessage::try_from(&frontend.recv_iopub()) .unwrap() .message_type(), KernelStatus::message_type() @@ -300,7 +300,7 @@ fn test_kernel() { // The kernel should send an execute reply message indicating that the execute // of the 'prompt' command succeeded info!("Waiting for execute reply"); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::ExecuteReply(reply) => { info!("Received execute reply: {:?}", reply); @@ -318,7 +318,7 @@ fn test_kernel() { frontend.send_heartbeat(msg); info!("Waiting for heartbeat reply"); - let reply = frontend.receive_heartbeat(); + let reply = frontend.recv_heartbeat(); assert_eq!(reply, zmq::Message::from("Heartbeat")); // Test the comms @@ -332,14 +332,14 @@ fn test_kernel() { // Absorb the IOPub messages that the kernel sends back during the // processing of the above `CommOpen` request - frontend.receive_iopub(); // Busy - frontend.receive_iopub(); // Idle + frontend.recv_iopub(); // Busy + frontend.recv_iopub(); // Idle info!("Requesting comm info from the kernel (to test opening from the frontend)"); frontend.send_shell(CommInfoRequest { target_name: "".to_string(), }); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::CommInfoReply(request) => { info!("Got comm info: {:?}", request); @@ -362,7 +362,7 @@ fn test_kernel() { frontend.send_shell(CommInfoRequest { target_name: "i-think-not".to_string(), }); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::CommInfoReply(request) => { info!("Got comm info: {:?}", request); @@ -383,7 +383,7 @@ fn test_kernel() { data: serde_json::Value::Null, }); loop { - let msg = frontend.receive_iopub(); + let msg = frontend.recv_iopub(); match msg { Message::CommMsg(msg) => { // This is the message we were looking for; break out of the @@ -419,8 +419,8 @@ fn test_kernel() { // Absorb the IOPub messages that the kernel sends back during the // processing of the above `CommClose` request info!("Receiving comm close IOPub messages from the kernel"); - frontend.receive_iopub(); // Busy - frontend.receive_iopub(); // Idle + frontend.recv_iopub(); // Busy + frontend.recv_iopub(); // Idle // Test to see if the comm is still in the list of comms after closing it // (it should not be) @@ -428,7 +428,7 @@ fn test_kernel() { frontend.send_shell(CommInfoRequest { target_name: "variables".to_string(), }); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::CommInfoReply(request) => { info!("Got comm info: {:?}", request); @@ -466,7 +466,7 @@ fn test_kernel() { // // We do this in a loop because we expect a number of other messages, e.g. busy/idle loop { - let msg = frontend.receive_iopub(); + let msg = frontend.recv_iopub(); match msg { Message::CommOpen(msg) => { assert_eq!(msg.content.comm_id, test_comm_id); @@ -487,7 +487,7 @@ fn test_kernel() { frontend.send_shell(CommInfoRequest { target_name: test_comm_name.clone(), }); - let reply = frontend.receive_shell(); + let reply = frontend.recv_shell(); match reply { Message::CommInfoReply(request) => { info!("Got comm info: {:?}", request); @@ -517,7 +517,7 @@ fn test_kernel() { // Wait for the comm data message to be received by the frontend. loop { - let msg = frontend.receive_iopub(); + let msg = frontend.recv_iopub(); match msg { Message::CommMsg(msg) => { assert_eq!(msg.content.comm_id, test_comm_id); @@ -534,7 +534,7 @@ fn test_kernel() { // Ensure that the frontend is notified loop { - let msg = frontend.receive_iopub(); + let msg = frontend.recv_iopub(); match msg { Message::CommClose(msg) => { assert_eq!(msg.content.comm_id, test_comm_id); diff --git a/crates/ark/Cargo.toml b/crates/ark/Cargo.toml index 6d512c7f0..1a80c1b35 100644 --- a/crates/ark/Cargo.toml +++ b/crates/ark/Cargo.toml @@ -64,6 +64,7 @@ tracing-error = "0.2.0" [dev-dependencies] insta = { version = "1.39.0" } +harp = { path = "../harp", features = ["testing"]} [build-dependencies] chrono = "0.4.23" diff --git a/crates/ark/src/analysis/input_boundaries.rs b/crates/ark/src/analysis/input_boundaries.rs index d07f60a81..43c584513 100644 --- a/crates/ark/src/analysis/input_boundaries.rs +++ b/crates/ark/src/analysis/input_boundaries.rs @@ -258,7 +258,7 @@ fn fill_gaps( #[cfg(test)] mod tests { use crate::analysis::input_boundaries::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn p(text: &str) -> Vec { let mut boundaries = input_boundaries(text).unwrap(); diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 3d89a40d9..b859811dc 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -275,32 +275,32 @@ pub unsafe extern "C" fn ps_connection_opened( code: SEXP, ) -> Result { let id = Uuid::new_v4().to_string(); + let id_r: RObject = id.clone().into(); - // If RMain is not initialized, we are probably in testing mode, so we just don't start the connection - // and let the testing code manually do it - if RMain::initialized() { - let main = RMain::get(); - - let metadata = Metadata { - name: RObject::view(name).to::()?, - language_id: String::from("r"), - host: RObject::view(host).to::>().unwrap_or(None), - r#type: RObject::view(r#type).to::>().unwrap_or(None), - code: RObject::view(code).to::>().unwrap_or(None), - }; - - unwrap! ( - RConnection::start(metadata, main.get_comm_manager_tx().clone(), id.clone()), - Err(err) => { - log::error!("Connection Pane: Failed to start connection: {err:?}"); - return Err(err); - } - ); - } else { + if !RMain::is_initialized() { + // If RMain is not initialized, we are probably in unit tests, so we + // just don't start the connection and let the testing code manually do + // it. Note that RMain could be initialized in integration tests. log::warn!("Connection Pane: RMain is not initialized. Connection will not be started."); + return Ok(id_r.sexp); + } + + let main = RMain::get(); + + let metadata = Metadata { + name: RObject::view(name).to::()?, + language_id: String::from("r"), + host: RObject::view(host).to::>().unwrap_or(None), + r#type: RObject::view(r#type).to::>().unwrap_or(None), + code: RObject::view(code).to::>().unwrap_or(None), + }; + + if let Err(err) = RConnection::start(metadata, main.get_comm_manager_tx().clone(), id) { + log::error!("Connection Pane: Failed to start connection: {err:?}"); + return Err(err); } - Ok(RObject::from(id).into()) + return Ok(id_r.sexp); } #[harp::register] diff --git a/crates/ark/src/dap/dap_variables.rs b/crates/ark/src/dap/dap_variables.rs index 4d240996a..a23f4de25 100644 --- a/crates/ark/src/dap/dap_variables.rs +++ b/crates/ark/src/dap/dap_variables.rs @@ -411,7 +411,7 @@ mod tests { use libr::*; use crate::dap::dap_variables::env_binding_variable; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_env_binding_variable_base() { diff --git a/crates/ark/src/data_explorer/export_selection.rs b/crates/ark/src/data_explorer/export_selection.rs index 8f29de701..4b44e9235 100644 --- a/crates/ark/src/data_explorer/export_selection.rs +++ b/crates/ark/src/data_explorer/export_selection.rs @@ -117,7 +117,7 @@ mod tests { use harp::object::RObject; use super::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn export_selection_helper(data: RObject, selection: TableSelection) -> String { export_selection_helper_with_format(data, selection, ExportFormat::Csv) diff --git a/crates/ark/src/data_explorer/format.rs b/crates/ark/src/data_explorer/format.rs index af8067471..04855c9f1 100644 --- a/crates/ark/src/data_explorer/format.rs +++ b/crates/ark/src/data_explorer/format.rs @@ -440,7 +440,7 @@ impl Into for FormattedValue { #[cfg(test)] mod tests { use super::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn default_options() -> FormatOptions { FormatOptions { diff --git a/crates/ark/src/data_explorer/histogram.rs b/crates/ark/src/data_explorer/histogram.rs index b77d542af..a933ee908 100644 --- a/crates/ark/src/data_explorer/histogram.rs +++ b/crates/ark/src/data_explorer/histogram.rs @@ -161,7 +161,7 @@ mod tests { use stdext::assert_match; use super::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn default_options() -> FormatOptions { FormatOptions { diff --git a/crates/ark/src/data_explorer/summary_stats.rs b/crates/ark/src/data_explorer/summary_stats.rs index 256716de9..9197f048f 100644 --- a/crates/ark/src/data_explorer/summary_stats.rs +++ b/crates/ark/src/data_explorer/summary_stats.rs @@ -167,7 +167,7 @@ fn get_stat(stats: &HashMap, name: &str) -> anyhow::Result< #[cfg(test)] mod tests { use super::*; - use crate::test::r_test; + use crate::fixtures::r_test; fn default_options() -> FormatOptions { FormatOptions { diff --git a/crates/ark/src/fixtures/dummy_frontend.rs b/crates/ark/src/fixtures/dummy_frontend.rs new file mode 100644 index 000000000..3c8b820ed --- /dev/null +++ b/crates/ark/src/fixtures/dummy_frontend.rs @@ -0,0 +1,83 @@ +use std::ops::Deref; +use std::ops::DerefMut; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::MutexGuard; +use std::sync::OnceLock; + +use amalthea::fixtures::dummy_frontend::DummyFrontend; + +use crate::interface::RMain; +use crate::interface::SessionMode; + +// There can be only one frontend per process. Needs to be in a mutex because +// the frontend wraps zmq sockets which are unsafe to send across threads. +// +// This is using `OnceLock` because it provides a way of checking whether the +// value has been initialized already. Also we'll need to parameterize +// initialization in the future. +static FRONTEND: OnceLock>> = OnceLock::new(); + +/// Wrapper around `DummyFrontend` that checks sockets are empty on drop +pub struct DummyArkFrontend { + guard: MutexGuard<'static, DummyFrontend>, +} + +impl DummyArkFrontend { + pub fn lock() -> Self { + Self { + guard: Self::get_frontend().lock().unwrap(), + } + } + + fn get_frontend() -> &'static Arc> { + FRONTEND.get_or_init(|| Arc::new(Mutex::new(DummyArkFrontend::init()))) + } + + fn init() -> DummyFrontend { + if FRONTEND.get().is_some() { + panic!("Can't spawn Ark more than once"); + } + + let frontend = DummyFrontend::new(); + let connection_file = frontend.get_connection_file(); + + stdext::spawn!("dummy_kernel", || { + crate::start::start_kernel( + connection_file, + vec![String::from("--no-save"), String::from("--no-restore")], + None, + SessionMode::Console, + false, + ); + }); + + // Wait for startup to complete + RMain::wait_r_initialized(); + + frontend.complete_initialization(); + frontend + } +} + +// Check that we haven't left crumbs behind +impl Drop for DummyArkFrontend { + fn drop(&mut self) { + self.assert_no_incoming() + } +} + +// Allow method calls to be forwarded to inner type +impl Deref for DummyArkFrontend { + type Target = DummyFrontend; + + fn deref(&self) -> &Self::Target { + Deref::deref(&self.guard) + } +} + +impl DerefMut for DummyArkFrontend { + fn deref_mut(&mut self) -> &mut Self::Target { + DerefMut::deref_mut(&mut self.guard) + } +} diff --git a/crates/ark/src/fixtures/mod.rs b/crates/ark/src/fixtures/mod.rs new file mode 100644 index 000000000..4cbec761b --- /dev/null +++ b/crates/ark/src/fixtures/mod.rs @@ -0,0 +1,5 @@ +pub mod dummy_frontend; +pub mod utils; + +pub use dummy_frontend::*; +pub use utils::*; diff --git a/crates/ark/src/test.rs b/crates/ark/src/fixtures/utils.rs similarity index 97% rename from crates/ark/src/test.rs rename to crates/ark/src/fixtures/utils.rs index 14676269a..ea5f7601e 100644 --- a/crates/ark/src/test.rs +++ b/crates/ark/src/fixtures/utils.rs @@ -32,7 +32,7 @@ fn initialize_ark() { INIT.call_once(|| { // Initialize the positron module so tests can use them. // Routines are already registered by `harp::test::r_test()`. - modules::initialize(true).unwrap(); + modules::initialize().unwrap(); }); } @@ -97,7 +97,7 @@ where mod tests { use tree_sitter::Point; - use crate::test::point_from_cursor; + use crate::fixtures::point_from_cursor; #[test] #[rustfmt::skip] diff --git a/crates/ark/src/interface.rs b/crates/ark/src/interface.rs index 61629cef1..804f69dfd 100644 --- a/crates/ark/src/interface.rs +++ b/crates/ark/src/interface.rs @@ -17,7 +17,6 @@ use std::path::PathBuf; use std::result::Result::Ok; use std::sync::Arc; use std::sync::Mutex; -use std::sync::Once; use std::task::Poll; use std::time::Duration; @@ -138,8 +137,15 @@ pub enum SessionMode { // These values must be global in order for them to be accessible from R // callbacks, which do not have a facility for passing or returning context. -/// Ensures that the kernel is only ever initialized once -static INIT: Once = Once::new(); +// We use the `once_cell` crate for init synchronisation because the stdlib +// equivalent `std::sync::Once` does not have a `wait()` method. + +/// Ensures that the kernel is only ever initialized once. Used to wait for the +/// `RMain` singleton initialization in `RMain::wait_initialized()`. +static R_MAIN_INIT: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new(); + +/// Used to wait for complete R startup in `RMain::wait_r_initialized()`. +static R_INIT: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new(); // The global state used by R callbacks. // @@ -149,143 +155,7 @@ static INIT: Once = Once::new(); // `RMain::get_mut()`). static mut R_MAIN: Option = None; -/// Starts the main R thread. Doesn't return. -pub fn start_r( - r_args: Vec, - startup_file: Option, - kernel_mutex: Arc>, - comm_manager_tx: Sender, - r_request_rx: Receiver, - stdin_request_tx: Sender, - stdin_reply_rx: Receiver>, - iopub_tx: Sender, - kernel_init_tx: Bus, - dap: Arc>, - session_mode: SessionMode, -) { - // Initialize global state (ensure we only do this once!) - INIT.call_once(|| unsafe { - R_MAIN_THREAD_ID = Some(std::thread::current().id()); - - // Channels to send/receive tasks from auxiliary threads via `RTask`s - let (tasks_interrupt_tx, tasks_interrupt_rx) = unbounded::(); - let (tasks_idle_tx, tasks_idle_rx) = unbounded::(); - - r_task::initialize(tasks_interrupt_tx.clone(), tasks_idle_tx.clone()); - - R_MAIN = Some(RMain::new( - kernel_mutex, - tasks_interrupt_rx, - tasks_idle_rx, - comm_manager_tx, - r_request_rx, - stdin_request_tx, - stdin_reply_rx, - iopub_tx, - kernel_init_tx, - dap, - session_mode, - )); - }); - - let mut r_args = r_args.clone(); - - // Record if the user has requested that we don't load the site/user level R profiles - let ignore_site_r_profile = startup::should_ignore_site_r_profile(&r_args); - let ignore_user_r_profile = startup::should_ignore_user_r_profile(&r_args); - - // We always manually load site/user level R profiles rather than letting R do it - // to ensure that ark is fully set up before running code that could potentially call - // back into ark internals. - if !ignore_site_r_profile { - startup::push_ignore_site_r_profile(&mut r_args); - } - if !ignore_user_r_profile { - startup::push_ignore_user_r_profile(&mut r_args); - } - - // Build the argument list from the command line arguments. The default - // list is `--interactive` unless altered with the `--` passthrough - // argument. - let mut args = cargs!["ark"]; - for arg in r_args { - args.push(CString::new(arg).unwrap().into_raw()); - } - - // Get `R_HOME`, set by Positron / CI / kernel specification - let r_home = match std::env::var("R_HOME") { - Ok(home) => PathBuf::from(home), - Err(err) => panic!("Can't find `R_HOME`: {err:?}"), - }; - - let libraries = RLibraries::from_r_home_path(&r_home); - libraries.initialize_pre_setup_r(); - - crate::sys::interface::setup_r(args); - - libraries.initialize_post_setup_r(); - - unsafe { - // Register embedded routines - r_register_routines(); - - // Initialize harp (after routine registration) - harp::initialize(); - - // Optionally run a frontend specified R startup script (after harp init) - if let Some(file) = &startup_file { - harp::source(file) - .or_log_error(&format!("Failed to source startup file '{file}' due to")); - } - - // Initialize support functions (after routine registration) - if let Err(err) = modules::initialize(false) { - log::error!("Can't load R modules: {err:?}"); - } - - // Register all hooks once all modules have been imported - let hook_result = RFunction::from(".ps.register_all_hooks").call(); - if let Err(err) = hook_result { - log::error!("Error registering some hooks: {err:?}"); - } - - // Populate srcrefs for namespaces already loaded in the session. - // Namespaces of future loaded packages will be populated on load. - if do_resource_namespaces() { - if let Err(err) = resource_loaded_namespaces() { - log::error!("Can't populate srcrefs for loaded packages: {err:?}"); - } - } - - // Set up the global error handler (after support function initialization) - errors::initialize(); - } - - // Now that R has started (emitting any startup messages), and now that we have set - // up all hooks and handlers, officially finish the R initialization process to - // unblock the kernel-info request and also allow the LSP to start. - RMain::with_mut(|main| { - log::info!( - "R has started and ark handlers have been registered, completing initialization." - ); - main.complete_initialization(); - }); - - // Now that R has started and libr and ark have fully initialized, run site and user - // level R profiles, in that order - if !ignore_site_r_profile { - startup::source_site_r_profile(&r_home); - } - if !ignore_user_r_profile { - startup::source_user_r_profile(); - } - - // Does not return! - crate::sys::interface::run_r(); -} - pub struct RMain { - initializing: bool, kernel_init_tx: Bus, /// Whether we are running in Console, Notebook, or Background mode. @@ -412,6 +282,180 @@ pub enum ConsoleResult { } impl RMain { + /// Starts the main R thread and initializes the `R_MAIN` singleton. + /// Doesn't return. Must be called only once. + pub fn start( + r_args: Vec, + startup_file: Option, + kernel_mutex: Arc>, + comm_manager_tx: Sender, + r_request_rx: Receiver, + stdin_request_tx: Sender, + stdin_reply_rx: Receiver>, + iopub_tx: Sender, + kernel_init_tx: Bus, + dap: Arc>, + session_mode: SessionMode, + ) { + unsafe { R_MAIN_THREAD_ID = Some(std::thread::current().id()) }; + + // Channels to send/receive tasks from auxiliary threads via `RTask`s + let (tasks_interrupt_tx, tasks_interrupt_rx) = unbounded::(); + let (tasks_idle_tx, tasks_idle_rx) = unbounded::(); + + r_task::initialize(tasks_interrupt_tx.clone(), tasks_idle_tx.clone()); + + unsafe { + R_MAIN = Some(RMain::new( + kernel_mutex, + tasks_interrupt_rx, + tasks_idle_rx, + comm_manager_tx, + r_request_rx, + stdin_request_tx, + stdin_reply_rx, + iopub_tx, + kernel_init_tx, + dap, + session_mode, + )) + }; + + // Let other threads know that `R_MAIN` is initialized. Deliberately + // panic if already set as `start()` must be called only once. + R_MAIN_INIT.set(()).expect("R can only be initialized once"); + + let mut r_args = r_args.clone(); + + // Record if the user has requested that we don't load the site/user level R profiles + let ignore_site_r_profile = startup::should_ignore_site_r_profile(&r_args); + let ignore_user_r_profile = startup::should_ignore_user_r_profile(&r_args); + + // We always manually load site/user level R profiles rather than letting R do it + // to ensure that ark is fully set up before running code that could potentially call + // back into ark internals. + if !ignore_site_r_profile { + startup::push_ignore_site_r_profile(&mut r_args); + } + if !ignore_user_r_profile { + startup::push_ignore_user_r_profile(&mut r_args); + } + + // Build the argument list from the command line arguments. The default + // list is `--interactive` unless altered with the `--` passthrough + // argument. + let mut args = cargs!["ark"]; + for arg in r_args { + args.push(CString::new(arg).unwrap().into_raw()); + } + + // Get `R_HOME`, typically set by Positron / CI / kernel specification + let r_home = match std::env::var("R_HOME") { + Ok(home) => PathBuf::from(home), + Err(_) => { + // Get `R_HOME` from `PATH`, via R + let Ok(result) = std::process::Command::new("R").arg("RHOME").output() else { + panic!("Can't find R or `R_HOME`"); + }; + let r_home = String::from_utf8(result.stdout).unwrap(); + let r_home = r_home.trim(); + std::env::set_var("R_HOME", r_home); + PathBuf::from(r_home) + }, + }; + + let libraries = RLibraries::from_r_home_path(&r_home); + libraries.initialize_pre_setup_r(); + + crate::sys::interface::setup_r(args); + + libraries.initialize_post_setup_r(); + + unsafe { + // Register embedded routines + r_register_routines(); + + // Initialize harp (after routine registration) + harp::initialize(); + + // Optionally run a frontend specified R startup script (after harp init) + if let Some(file) = &startup_file { + harp::source(file) + .or_log_error(&format!("Failed to source startup file '{file}' due to")); + } + + // Initialize support functions (after routine registration) + if let Err(err) = modules::initialize() { + log::error!("Can't load R modules: {err:?}"); + } + + // Register all hooks once all modules have been imported + let hook_result = RFunction::from(".ps.register_all_hooks").call(); + if let Err(err) = hook_result { + log::error!("Error registering some hooks: {err:?}"); + } + + // Populate srcrefs for namespaces already loaded in the session. + // Namespaces of future loaded packages will be populated on load. + if do_resource_namespaces() { + if let Err(err) = resource_loaded_namespaces() { + log::error!("Can't populate srcrefs for loaded packages: {err:?}"); + } + } + + // Set up the global error handler (after support function initialization) + errors::initialize(); + + // Now that R has started (emitting any startup messages), and now that we have set + // up all hooks and handlers, officially finish the R initialization process to + // unblock the kernel-info request and also allow the LSP to start. + RMain::with_mut(|main| { + log::info!("R has started and ark handlers have been registered, completing initialization."); + main.complete_initialization(); + }); + } + + // Now that R has started and libr and ark have fully initialized, run site and user + // level R profiles, in that order + if !ignore_site_r_profile { + startup::source_site_r_profile(&r_home); + } + if !ignore_user_r_profile { + startup::source_user_r_profile(); + } + + // Does not return! + crate::sys::interface::run_r(); + } + + /// Completes the kernel's initialization. + /// Unlike `RMain::start()`, this has access to `R_MAIN`'s state, such as + /// the kernel-info banner. + /// SAFETY: Can only be called from the R thread, and only once. + pub unsafe fn complete_initialization(&mut self) { + let version = unsafe { + let version = Rf_findVarInFrame(R_BaseNamespace, r_symbol!("R.version.string")); + RObject::new(version).to::().unwrap() + }; + + // Initial input and continuation prompts + let input_prompt: String = harp::get_option("prompt").try_into().unwrap(); + let continuation_prompt: String = harp::get_option("continue").try_into().unwrap(); + + let kernel_info = KernelInfo { + version: version.clone(), + banner: self.banner_output.clone(), + input_prompt: Some(input_prompt), + continuation_prompt: Some(continuation_prompt), + }; + + log::info!("Sending kernel info: {version}"); + self.kernel_init_tx.broadcast(kernel_info); + + // Thread-safe initialisation flag for R + R_INIT.set(()).expect("`R_INIT` can only be set once"); + } + pub fn new( kernel: Arc>, tasks_interrupt_rx: Receiver, @@ -426,7 +470,6 @@ impl RMain { session_mode: SessionMode, ) -> Self { Self { - initializing: true, r_request_rx, comm_manager_tx, stdin_request_tx, @@ -453,6 +496,36 @@ impl RMain { } } + /// Wait for complete R initialization + /// + /// Wait for R being ready to evaluate R code. Resolves as the same time as + /// the `Bus` init channel does. + /// + /// Thread-safe. + pub fn wait_r_initialized() { + R_INIT.wait(); + } + + /// Has R completed initialization + /// + /// I.e. is it ready to evaluate R code. + /// + /// Thread-safe. + pub fn is_r_initialized() -> bool { + R_INIT.get().is_some() + } + + /// Has the `RMain` singleton completed initialization. + /// + /// This can return true when R might still not have finished starting up. + /// See `wait_r_initialized()`. + /// + /// Thread-safe. But note you can only get access to the singleton on the R + /// thread. + pub fn is_initialized() -> bool { + R_MAIN_INIT.get().is_some() + } + /// Access a reference to the singleton instance of this struct /// /// SAFETY: Accesses must occur after `start_r()` initializes it, and must @@ -461,16 +534,6 @@ impl RMain { RMain::get_mut() } - /// Indicate whether RMain has been created and is initialized. - pub fn initialized() -> bool { - unsafe { - match R_MAIN { - Some(ref main) => !main.initializing, - None => false, - } - } - } - /// Access a mutable reference to the singleton instance of this struct /// /// SAFETY: Accesses must occur after `start_r()` initializes it, and must @@ -515,33 +578,6 @@ impl RMain { thread.id() == unsafe { R_MAIN_THREAD_ID.unwrap() } } - /// Completes the kernel's initialization - pub fn complete_initialization(&mut self) { - if self.initializing { - let version = unsafe { - let version = Rf_findVarInFrame(R_BaseNamespace, r_symbol!("R.version.string")); - RObject::new(version).to::().unwrap() - }; - - // Initial input and continuation prompts - let input_prompt: String = harp::get_option("prompt").try_into().unwrap(); - let continuation_prompt: String = harp::get_option("continue").try_into().unwrap(); - - let kernel_info = KernelInfo { - version: version.clone(), - banner: self.banner_output.clone(), - input_prompt: Some(input_prompt), - continuation_prompt: Some(continuation_prompt), - }; - - log::info!("Sending kernel info: {version}"); - self.kernel_init_tx.broadcast(kernel_info); - self.initializing = false; - } else { - log::warn!("Initialization already complete!"); - } - } - /// Provides read-only access to `iopub_tx` pub fn get_iopub_tx(&self) -> &Sender { &self.iopub_tx @@ -1288,7 +1324,7 @@ This is a Positron limitation we plan to fix. In the meantime, you can: Stream::Stderr }; - if self.initializing { + if !RMain::is_r_initialized() { // During init, consider all output to be part of the startup banner self.banner_output.push_str(&content); return; @@ -1661,6 +1697,12 @@ pub extern "C" fn r_busy(which: i32) { main.busy(which); } +#[no_mangle] +pub extern "C" fn r_suicide(buf: *const c_char) { + let msg = unsafe { CStr::from_ptr(buf) }; + panic!("Suicide: {}", msg.to_str().unwrap()); +} + #[no_mangle] pub unsafe extern "C" fn r_polled_events() { let main = RMain::get_mut(); diff --git a/crates/ark/src/kernel.rs b/crates/ark/src/kernel.rs index 6aa2f360b..d610dc114 100644 --- a/crates/ark/src/kernel.rs +++ b/crates/ark/src/kernel.rs @@ -75,7 +75,7 @@ impl Kernel { r_task::spawn_interrupt(|| async move { // Get the current busy status - let busy = if RMain::initialized() { + let busy = if RMain::is_initialized() { RMain::get().is_busy } else { false diff --git a/crates/ark/src/lib.rs b/crates/ark/src/lib.rs index 4654244e4..8dcbaf379 100644 --- a/crates/ark/src/lib.rs +++ b/crates/ark/src/lib.rs @@ -29,9 +29,10 @@ pub mod request; pub mod shell; pub mod signals; pub mod srcref; +pub mod start; pub mod startup; pub mod sys; -pub mod test; +pub mod fixtures; pub mod thread; pub mod traps; pub mod treesitter; diff --git a/crates/ark/src/lsp/completions/sources/composite.rs b/crates/ark/src/lsp/completions/sources/composite.rs index 953df4f07..62de6f5f5 100644 --- a/crates/ark/src/lsp/completions/sources/composite.rs +++ b/crates/ark/src/lsp/completions/sources/composite.rs @@ -153,7 +153,7 @@ mod tests { use crate::lsp::completions::sources::composite::is_identifier_like; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; use crate::treesitter::NodeType; use crate::treesitter::NodeTypeExt; diff --git a/crates/ark/src/lsp/completions/sources/composite/call.rs b/crates/ark/src/lsp/completions/sources/composite/call.rs index 3a78b8ec7..7528d0c13 100644 --- a/crates/ark/src/lsp/completions/sources/composite/call.rs +++ b/crates/ark/src/lsp/completions/sources/composite/call.rs @@ -289,7 +289,7 @@ mod tests { use crate::lsp::completions::sources::composite::call::completions_from_call; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_completions_after_user_types_part_of_an_argument_name() { diff --git a/crates/ark/src/lsp/completions/sources/composite/pipe.rs b/crates/ark/src/lsp/completions/sources/composite/pipe.rs index b9fc5ec3d..4951742bd 100644 --- a/crates/ark/src/lsp/completions/sources/composite/pipe.rs +++ b/crates/ark/src/lsp/completions/sources/composite/pipe.rs @@ -174,7 +174,7 @@ mod tests { use crate::lsp::completions::sources::composite::pipe::find_pipe_root; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_find_pipe_root_works_with_native_and_magrittr() { diff --git a/crates/ark/src/lsp/completions/sources/composite/subset.rs b/crates/ark/src/lsp/completions/sources/composite/subset.rs index 7bba1e302..0097a674a 100644 --- a/crates/ark/src/lsp/completions/sources/composite/subset.rs +++ b/crates/ark/src/lsp/completions/sources/composite/subset.rs @@ -79,7 +79,7 @@ mod tests { use crate::lsp::completions::sources::composite::subset::completions_from_subset; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_subset_completions() { diff --git a/crates/ark/src/lsp/completions/sources/unique/comment.rs b/crates/ark/src/lsp/completions/sources/unique/comment.rs index acff7aa7f..6c7750fdd 100644 --- a/crates/ark/src/lsp/completions/sources/unique/comment.rs +++ b/crates/ark/src/lsp/completions/sources/unique/comment.rs @@ -129,7 +129,7 @@ fn test_comment() { use tree_sitter::Point; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; r_test(|| { // If not in a comment, return `None` @@ -154,7 +154,7 @@ fn test_roxygen_comment() { use tree_sitter::Point; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; r_test(|| unsafe { let installed = RFunction::new("", ".ps.is_installed") diff --git a/crates/ark/src/lsp/completions/sources/unique/custom.rs b/crates/ark/src/lsp/completions/sources/unique/custom.rs index 9b7ac7911..4f0396bb6 100644 --- a/crates/ark/src/lsp/completions/sources/unique/custom.rs +++ b/crates/ark/src/lsp/completions/sources/unique/custom.rs @@ -228,8 +228,8 @@ mod tests { use crate::lsp::completions::sources::unique::custom::completions_from_custom_source; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::point_from_cursor; - use crate::test::r_test; + use crate::fixtures::point_from_cursor; + use crate::fixtures::r_test; #[test] fn test_completion_custom_library() { diff --git a/crates/ark/src/lsp/completions/sources/unique/extractor.rs b/crates/ark/src/lsp/completions/sources/unique/extractor.rs index e5f4443de..2bb7df52c 100644 --- a/crates/ark/src/lsp/completions/sources/unique/extractor.rs +++ b/crates/ark/src/lsp/completions/sources/unique/extractor.rs @@ -168,8 +168,8 @@ mod tests { use crate::lsp::completions::sources::unique::extractor::completions_from_dollar; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::point_from_cursor; - use crate::test::r_test; + use crate::fixtures::point_from_cursor; + use crate::fixtures::r_test; #[test] fn test_dollar_completions() { diff --git a/crates/ark/src/lsp/completions/sources/unique/namespace.rs b/crates/ark/src/lsp/completions/sources/unique/namespace.rs index ce31a0ab8..5a00836c2 100644 --- a/crates/ark/src/lsp/completions/sources/unique/namespace.rs +++ b/crates/ark/src/lsp/completions/sources/unique/namespace.rs @@ -225,7 +225,7 @@ mod tests { use crate::lsp::completions::sources::unique::namespace::completions_from_namespace; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; #[test] fn test_completions_after_colons() { diff --git a/crates/ark/src/lsp/completions/sources/unique/string.rs b/crates/ark/src/lsp/completions/sources/unique/string.rs index 7577bdaf1..bf01954b7 100644 --- a/crates/ark/src/lsp/completions/sources/unique/string.rs +++ b/crates/ark/src/lsp/completions/sources/unique/string.rs @@ -61,8 +61,8 @@ mod tests { use crate::lsp::completions::sources::unique::string::completions_from_string; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::point_from_cursor; - use crate::test::r_test; + use crate::fixtures::point_from_cursor; + use crate::fixtures::r_test; use crate::treesitter::node_find_string; use crate::treesitter::NodeTypeExt; diff --git a/crates/ark/src/lsp/completions/sources/unique/subset.rs b/crates/ark/src/lsp/completions/sources/unique/subset.rs index 4cb61c353..7c61b774e 100644 --- a/crates/ark/src/lsp/completions/sources/unique/subset.rs +++ b/crates/ark/src/lsp/completions/sources/unique/subset.rs @@ -162,8 +162,8 @@ mod tests { use crate::lsp::completions::sources::unique::subset::completions_from_string_subset; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::point_from_cursor; - use crate::test::r_test; + use crate::fixtures::point_from_cursor; + use crate::fixtures::r_test; use crate::treesitter::node_find_string; #[test] diff --git a/crates/ark/src/lsp/completions/sources/utils.rs b/crates/ark/src/lsp/completions/sources/utils.rs index 562af6acd..231f4d81e 100644 --- a/crates/ark/src/lsp/completions/sources/utils.rs +++ b/crates/ark/src/lsp/completions/sources/utils.rs @@ -264,7 +264,7 @@ mod tests { use crate::lsp::completions::sources::utils::CallNodePositionType; use crate::lsp::document_context::DocumentContext; use crate::lsp::documents::Document; - use crate::test::r_test; + use crate::fixtures::r_test; use crate::treesitter::NodeType; use crate::treesitter::NodeTypeExt; diff --git a/crates/ark/src/lsp/diagnostics.rs b/crates/ark/src/lsp/diagnostics.rs index c98e7daa8..ad093aad2 100644 --- a/crates/ark/src/lsp/diagnostics.rs +++ b/crates/ark/src/lsp/diagnostics.rs @@ -1034,7 +1034,7 @@ mod tests { use crate::lsp::diagnostics::generate_diagnostics; use crate::lsp::documents::Document; use crate::lsp::state::WorldState; - use crate::test::r_test; + use crate::fixtures::r_test; // Default state that includes installed packages and default scopes. static DEFAULT_STATE: Lazy = Lazy::new(|| current_state()); diff --git a/crates/ark/src/lsp/help.rs b/crates/ark/src/lsp/help.rs index d2522bd61..80e3606b5 100644 --- a/crates/ark/src/lsp/help.rs +++ b/crates/ark/src/lsp/help.rs @@ -373,7 +373,7 @@ fn for_each_section(doc: &Html, mut callback: impl FnMut(ElementRef, Vec = Cell::new(false); } -fn start_kernel( - connection_file: ConnectionFile, - r_args: Vec, - startup_file: Option, - session_mode: SessionMode, - capture_streams: bool, -) { - // Create a new kernel from the connection file - let mut kernel = match Kernel::new("ark", connection_file) { - Ok(k) => k, - Err(err) => { - log::error!("Failed to create kernel: {err}"); - return; - }, - }; - - // Create the channels used for communication. These are created here - // as they need to be shared across different components / threads. - let iopub_tx = kernel.create_iopub_tx(); - - // A broadcast channel (bus) used to notify clients when the kernel - // has finished initialization. - let mut kernel_init_tx = Bus::new(1); - - // A channel pair used for shell requests. - // These events are used to manage the runtime state, and also to - // handle message delivery, among other things. - let (r_request_tx, r_request_rx) = bounded::(1); - let (kernel_request_tx, kernel_request_rx) = bounded::(1); - - // Create the LSP and DAP clients. - // Not all Amalthea kernels provide these, but ark does. - // They must be able to deliver messages to the shell channel directly. - let lsp = Arc::new(Mutex::new(lsp::handler::Lsp::new(kernel_init_tx.add_rx()))); - - // DAP needs the `RRequest` channel to communicate with - // `read_console()` and send commands to the debug interpreter - let dap = dap::Dap::new_shared(r_request_tx.clone()); - - // Communication channel between the R main thread and the Amalthea - // StdIn socket thread - let (stdin_request_tx, stdin_request_rx) = bounded::(1); - - // Communication channel for `CommEvent` - let comm_manager_tx = kernel.create_comm_manager_tx(); - - // Create the shell. - let kernel_init_rx = kernel_init_tx.add_rx(); - let shell = Shell::new( - comm_manager_tx.clone(), - iopub_tx.clone(), - r_request_tx.clone(), - stdin_request_tx.clone(), - kernel_init_rx, - kernel_request_tx, - kernel_request_rx, - session_mode.clone(), - ); - - // Create the control handler; this is used to handle shutdown/interrupt and - // related requests - let control = Arc::new(Mutex::new(Control::new(r_request_tx.clone()))); - - // Create the stream behavior; this determines whether the kernel should - // capture stdout/stderr and send them to the frontend as IOPub messages - let stream_behavior = match capture_streams { - true => amalthea::kernel::StreamBehavior::Capture, - false => amalthea::kernel::StreamBehavior::None, - }; - - // Create the kernel - let kernel_clone = shell.kernel.clone(); - let shell = Arc::new(Mutex::new(shell)); - - let (stdin_reply_tx, stdin_reply_rx) = unbounded(); - - let res = kernel.connect( - shell, - control, - Some(lsp), - Some(dap.clone()), - stream_behavior, - stdin_request_rx, - stdin_reply_tx, - ); - if let Err(err) = res { - panic!("Couldn't connect to frontend: {err:?}"); - } - - // Start the R REPL (does not return for the duration of the session) - ark::interface::start_r( - r_args, - startup_file, - kernel_clone, - comm_manager_tx, - r_request_rx, - stdin_request_tx, - stdin_reply_rx, - iopub_tx, - kernel_init_tx, - dap, - session_mode, - ) -} - -// Installs the kernelspec JSON file into one of Jupyter's search paths. -fn install_kernel_spec() { - // Create the environment set for the kernel spec - let mut env = serde_json::Map::new(); - - // Detect the active version of R and set the R_HOME environment variable - // accordingly - let r_version = detect_r().unwrap(); - env.insert( - "R_HOME".to_string(), - serde_json::Value::String(r_version.r_home.clone()), - ); - - // Point `LD_LIBRARY_PATH` to a folder with some `libR.so`. It doesn't - // matter which one, but the linker needs to be able to find a file of that - // name, even though we won't use it for symbol resolution. - // https://github.com/posit-dev/positron/issues/1619#issuecomment-1971552522 - if cfg!(target_os = "linux") { - let lib = format!("{}/lib", r_version.r_home.clone()); - env.insert("LD_LIBRARY_PATH".into(), serde_json::Value::String(lib)); - } - - // Create the kernelspec - let exe_path = unwrap!(env::current_exe(), Err(error) => { - eprintln!("Failed to determine path to Ark. {}", error); - return; - }); - - let spec = KernelSpec { - argv: vec![ - String::from(exe_path.to_string_lossy()), - String::from("--connection_file"), - String::from("{connection_file}"), - String::from("--session-mode"), - String::from("notebook"), - ], - language: String::from("R"), - display_name: String::from("Ark R Kernel"), - env, - }; - - let dest = unwrap!(spec.install(String::from("ark")), Err(error) => { - eprintln!("Failed to install Ark's Jupyter kernelspec. {}", error); - return; - }); - - println!( - "Successfully installed Ark Jupyter kernelspec. - - R ({}.{}.{}): {} - Kernel: {} - ", - r_version.major, - r_version.minor, - r_version.patch, - r_version.r_home, - dest.to_string_lossy() - ); -} - -fn parse_file( - connection_file: &String, - r_args: Vec, - startup_file: Option, - session_mode: SessionMode, - capture_streams: bool, -) { - match ConnectionFile::from_file(connection_file) { - Ok(connection) => { - log::info!("Loaded connection information from frontend in {connection_file}"); - log::info!("Connection data: {:?}", connection); - start_kernel( - connection, - r_args, - startup_file, - session_mode, - capture_streams, - ); - }, - Err(error) => { - log::error!("Couldn't read connection file {connection_file}: {error:?}"); - }, - } -} - fn print_usage() { println!("Ark {}, an R Kernel.", env!("CARGO_PKG_VERSION")); println!( @@ -504,3 +303,81 @@ fn main() { ); } } + +fn parse_file( + connection_file: &String, + r_args: Vec, + startup_file: Option, + session_mode: SessionMode, + capture_streams: bool, +) { + match ConnectionFile::from_file(connection_file) { + Ok(connection) => { + log::info!("Loaded connection information from frontend in {connection_file}"); + log::info!("Connection data: {:?}", connection); + start_kernel( + connection, + r_args, + startup_file, + session_mode, + capture_streams, + ); + }, + Err(error) => { + log::error!("Couldn't read connection file {connection_file}: {error:?}"); + }, + } +} + +// Install the kernelspec JSON file into one of Jupyter's search paths. +fn install_kernel_spec() { + // Create the environment set for the kernel spec + let mut env = serde_json::Map::new(); + + // Workaround for https://github.com/posit-dev/positron/issues/2098 + env.insert("RUST_LOG".into(), serde_json::Value::String("error".into())); + + // Point `LD_LIBRARY_PATH` to a folder with some `libR.so`. It doesn't + // matter which one, but the linker needs to be able to find a file of that + // name, even though we won't use it for symbol resolution. + // https://github.com/posit-dev/positron/issues/1619#issuecomment-1971552522 + if cfg!(target_os = "linux") { + // Detect the active version of R + let r_version = detect_r().unwrap(); + + let lib = format!("{}/lib", r_version.r_home.clone()); + env.insert("LD_LIBRARY_PATH".into(), serde_json::Value::String(lib)); + } + + // Create the kernelspec + let exe_path = unwrap!(env::current_exe(), Err(error) => { + eprintln!("Failed to determine path to Ark. {}", error); + return; + }); + + let spec = KernelSpec { + argv: vec![ + String::from(exe_path.to_string_lossy()), + String::from("--connection_file"), + String::from("{connection_file}"), + String::from("--session-mode"), + String::from("notebook"), + ], + language: String::from("R"), + display_name: String::from("Ark R Kernel"), + env, + }; + + let dest = unwrap!(spec.install(String::from("ark")), Err(err) => { + eprintln!("Failed to install Ark's Jupyter kernelspec. {err}"); + return; + }); + + println!( + "Successfully installed Ark Jupyter kernelspec. + + Kernel: {} + ", + dest.to_string_lossy() + ); +} diff --git a/crates/ark/src/modules.rs b/crates/ark/src/modules.rs index 4d9f70095..7ed403024 100644 --- a/crates/ark/src/modules.rs +++ b/crates/ark/src/modules.rs @@ -11,6 +11,7 @@ use harp::environment::R_ENVS; use harp::exec::RFunction; use harp::exec::RFunctionExt; use harp::r_symbol; +use harp::test::IS_TESTING; use harp::utils::r_poke_option; use libr::Rf_ScalarLogical; use libr::SEXP; @@ -74,9 +75,9 @@ pub struct ArkEnvs { pub rstudio_ns: SEXP, } -pub fn initialize(testing: bool) -> anyhow::Result<()> { +pub fn initialize() -> anyhow::Result<()> { // If we are `testing`, set the corresponding R level global option - if testing { + if IS_TESTING { r_poke_option_ark_testing() } @@ -291,7 +292,7 @@ mod tests { use harp::environment::Environment; use libr::CLOENV; - use crate::test::r_test; + use crate::fixtures::r_test; fn get_namespace(exports: Environment, fun: &str) -> Environment { let fun = exports.find(fun).unwrap(); diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs new file mode 100644 index 000000000..bce4a707b --- /dev/null +++ b/crates/ark/src/start.rs @@ -0,0 +1,129 @@ +// +// start.rs +// +// Copyright (C) 2023-2024 Posit Software, PBC. All rights reserved. +// +// + +use std::sync::Arc; +use std::sync::Mutex; + +use amalthea::connection_file::ConnectionFile; +use amalthea::socket::stdin::StdInRequest; +use bus::Bus; +use crossbeam::channel::bounded; +use crossbeam::channel::unbounded; + +use crate::control::Control; +use crate::dap; +use crate::interface::SessionMode; +use crate::lsp; +use crate::request::KernelRequest; +use crate::request::RRequest; +use crate::shell::Shell; + +/// Exported for unit tests. Does not return. +pub fn start_kernel( + connection_file: ConnectionFile, + r_args: Vec, + startup_file: Option, + session_mode: SessionMode, + capture_streams: bool, +) { + // Create a new kernel from the connection file + let mut kernel = match amalthea::kernel::Kernel::new("ark", connection_file) { + Ok(k) => k, + Err(err) => { + log::error!("Failed to create kernel: {err}"); + return; + }, + }; + + // Create the channels used for communication. These are created here + // as they need to be shared across different components / threads. + let iopub_tx = kernel.create_iopub_tx(); + + // A broadcast channel (bus) used to notify clients when the kernel + // has finished initialization. + let mut kernel_init_tx = Bus::new(1); + + // A channel pair used for shell requests. + // These events are used to manage the runtime state, and also to + // handle message delivery, among other things. + let (r_request_tx, r_request_rx) = bounded::(1); + let (kernel_request_tx, kernel_request_rx) = bounded::(1); + + // Create the LSP and DAP clients. + // Not all Amalthea kernels provide these, but ark does. + // They must be able to deliver messages to the shell channel directly. + let lsp = Arc::new(Mutex::new(lsp::handler::Lsp::new(kernel_init_tx.add_rx()))); + + // DAP needs the `RRequest` channel to communicate with + // `read_console()` and send commands to the debug interpreter + let dap = dap::Dap::new_shared(r_request_tx.clone()); + + // Communication channel between the R main thread and the Amalthea + // StdIn socket thread + let (stdin_request_tx, stdin_request_rx) = bounded::(1); + + // Communication channel for `CommEvent` + let comm_manager_tx = kernel.create_comm_manager_tx(); + + // Create the shell. + let kernel_init_rx = kernel_init_tx.add_rx(); + let shell = Shell::new( + comm_manager_tx.clone(), + iopub_tx.clone(), + r_request_tx.clone(), + stdin_request_tx.clone(), + kernel_init_rx, + kernel_request_tx, + kernel_request_rx, + session_mode.clone(), + ); + + // Create the control handler; this is used to handle shutdown/interrupt and + // related requests + let control = Arc::new(Mutex::new(Control::new(r_request_tx.clone()))); + + // Create the stream behavior; this determines whether the kernel should + // capture stdout/stderr and send them to the frontend as IOPub messages + let stream_behavior = match capture_streams { + true => amalthea::kernel::StreamBehavior::Capture, + false => amalthea::kernel::StreamBehavior::None, + }; + + // Create the kernel + let kernel_clone = shell.kernel.clone(); + let shell = Arc::new(Mutex::new(shell)); + + let (stdin_reply_tx, stdin_reply_rx) = unbounded(); + + let res = kernel.connect( + shell, + control, + Some(lsp), + Some(dap.clone()), + stream_behavior, + stdin_request_rx, + stdin_reply_tx, + ); + if let Err(err) = res { + panic!("Couldn't connect to frontend: {err:?}"); + } + + // Start the R REPL (does not return for the duration of the session) + crate::interface::RMain::start( + r_args, + startup_file, + kernel_clone, + comm_manager_tx, + r_request_rx, + stdin_request_tx, + stdin_reply_rx, + iopub_tx, + kernel_init_tx, + dap, + session_mode, + ) +} diff --git a/crates/ark/src/sys/unix/interface.rs b/crates/ark/src/sys/unix/interface.rs index 5ab6d8980..56856f6da 100644 --- a/crates/ark/src/sys/unix/interface.rs +++ b/crates/ark/src/sys/unix/interface.rs @@ -11,6 +11,7 @@ use std::os::raw::c_char; use libr::ptr_R_Busy; use libr::ptr_R_ReadConsole; use libr::ptr_R_ShowMessage; +use libr::ptr_R_Suicide; use libr::ptr_R_WriteConsole; use libr::ptr_R_WriteConsoleEx; use libr::run_Rmainloop; @@ -32,6 +33,7 @@ use crate::interface::r_busy; use crate::interface::r_polled_events; use crate::interface::r_read_console; use crate::interface::r_show_message; +use crate::interface::r_suicide; use crate::interface::r_write_console; use crate::signals::initialize_signal_handlers; @@ -64,6 +66,18 @@ pub fn setup_r(mut args: Vec<*mut c_char>) { libr::set(ptr_R_ReadConsole, Some(r_read_console)); libr::set(ptr_R_ShowMessage, Some(r_show_message)); libr::set(ptr_R_Busy, Some(r_busy)); + libr::set(ptr_R_Suicide, Some(r_suicide)); + + // In tests R may be run from various threads. This confuses R's stack + // overflow checks so we disable those. This should not make it in + // production builds as it causes stack overflows to crash R instead of + // throwing an R error. + // + // This must be called _after_ `Rf_initialize_R()`, since that's where R + // detects the stack size and sets the default limit. + if harp::test::IS_TESTING { + libr::set(libr::R_CStackLimit, usize::MAX); + } // Set up main loop setup_Rmainloop(); diff --git a/crates/ark/src/sys/windows/interface.rs b/crates/ark/src/sys/windows/interface.rs index 94698b305..6a5a9e2e9 100644 --- a/crates/ark/src/sys/windows/interface.rs +++ b/crates/ark/src/sys/windows/interface.rs @@ -74,6 +74,7 @@ pub fn setup_r(mut _args: Vec<*mut c_char>) { (*params).ShowMessage = Some(r_show_message); (*params).YesNoCancel = Some(r_yes_no_cancel); (*params).Busy = Some(r_busy); + (*params).Suicide = Some(r_suicide); // This is assigned to `ptr_ProcessEvents` (which we don't set on Unix), // in `R_SetParams()` by `R_SetWin32()` and gets called by `R_ProcessEvents()`. diff --git a/crates/ark/tests/connections.rs b/crates/ark/tests/connections.rs index cc8875832..a1af21b72 100644 --- a/crates/ark/tests/connections.rs +++ b/crates/ark/tests/connections.rs @@ -14,8 +14,8 @@ use ark::connections::r_connection::Metadata; use ark::connections::r_connection::RConnection; use ark::modules::ARK_ENVS; use ark::r_task::r_task; -use ark::test::r_test; -use ark::test::socket_rpc_request; +use ark::fixtures::r_test; +use ark::fixtures::socket_rpc_request; use crossbeam::channel::bounded; use harp::exec::RFunction; use harp::object::RObject; diff --git a/crates/ark/tests/data_explorer.rs b/crates/ark/tests/data_explorer.rs index c5285aefa..a924195aa 100644 --- a/crates/ark/tests/data_explorer.rs +++ b/crates/ark/tests/data_explorer.rs @@ -57,8 +57,8 @@ use ark::data_explorer::r_data_explorer::DataObjectEnvInfo; use ark::data_explorer::r_data_explorer::RDataExplorer; use ark::lsp::events::EVENTS; use ark::r_task::r_task; -use ark::test::r_test; -use ark::test::socket_rpc_request; +use ark::fixtures::r_test; +use ark::fixtures::socket_rpc_request; use ark::thread::RThreadSafe; use crossbeam::channel::bounded; use harp::environment::R_ENVS; diff --git a/crates/ark/tests/help.rs b/crates/ark/tests/help.rs index 6e2880b06..0ccc85016 100644 --- a/crates/ark/tests/help.rs +++ b/crates/ark/tests/help.rs @@ -16,7 +16,7 @@ use amalthea::socket::comm::CommSocket; use ark::help::r_help::RHelp; use ark::help_proxy; use ark::r_task::r_task; -use ark::test::r_test; +use ark::fixtures::r_test; use harp::exec::RFunction; /** diff --git a/crates/ark/tests/kernel.rs b/crates/ark/tests/kernel.rs new file mode 100644 index 000000000..7e641e515 --- /dev/null +++ b/crates/ark/tests/kernel.rs @@ -0,0 +1,35 @@ +use amalthea::wire::jupyter_message::Message; +use amalthea::wire::jupyter_message::Status; +use amalthea::wire::kernel_info_request::KernelInfoRequest; +use ark::fixtures::DummyArkFrontend; +use stdext::assert_match; + +#[test] +fn test_kernel_info() { + let frontend = DummyArkFrontend::lock(); + + frontend.send_shell(KernelInfoRequest {}); + + assert_match!(frontend.recv_shell(), Message::KernelInfoReply(reply) => { + assert_eq!(reply.content.language_info.name, "R"); + }); + + frontend.recv_iopub_busy(); + frontend.recv_iopub_idle(); +} + +#[test] +fn test_execute_request() { + let frontend = DummyArkFrontend::lock(); + + frontend.send_execute_request("42"); + frontend.recv_iopub_busy(); + + assert_eq!(frontend.recv_iopub_execute_input().code, "42"); + assert_eq!(frontend.recv_iopub_execute_result(), "[1] 42"); + + frontend.recv_iopub_idle(); + + let reply = frontend.recv_shell_execute_reply(); + assert_eq!(reply.status, Status::Ok); +} diff --git a/crates/ark/tests/ui.rs b/crates/ark/tests/ui.rs index 8ffec3ae8..19075744f 100644 --- a/crates/ark/tests/ui.rs +++ b/crates/ark/tests/ui.rs @@ -16,7 +16,7 @@ use amalthea::socket::comm::CommInitiator; use amalthea::socket::comm::CommSocket; use amalthea::socket::stdin::StdInRequest; use ark::r_task::r_task; -use ark::test::r_test; +use ark::fixtures::r_test; use ark::ui::UiComm; use ark::ui::UiCommMessage; use crossbeam::channel::bounded; diff --git a/crates/harp/Cargo.toml b/crates/harp/Cargo.toml index 56b7680db..0a10d3d2f 100644 --- a/crates/harp/Cargo.toml +++ b/crates/harp/Cargo.toml @@ -28,3 +28,6 @@ serde = { version = "1.0.183", features = ["derive"] } serde_json = { version = "1.0.94", features = ["preserve_order"]} rust-embed = "8.2.0" tracing-error = "0.2.0" + +[features] +testing = [] diff --git a/crates/harp/src/test.rs b/crates/harp/src/test.rs index 194f3507a..a327286e3 100644 --- a/crates/harp/src/test.rs +++ b/crates/harp/src/test.rs @@ -35,6 +35,30 @@ use crate::R_MAIN_THREAD_ID; pub static mut R_TASK_BYPASS: bool = false; static mut R_RUNTIME_LOCK: Mutex<()> = Mutex::new(()); +// This global variable is a workaround to enable test-only features or +// behaviour in integration tests (i.e. tests that live in `crate/tests/` as +// opposed to tests living in `crate/src/`). +// +// - Unfortunately we can't use `cfg(test)` in integration tests because they +// are treated as an external crate. +// +// - Unfortunately we cannot move some of our integration tests to `src/` +// because they must be run in their own process (e.g. because they are +// running R). +// +// - Unfortunately we can't use the workaround described in +// https://github.com/rust-lang/cargo/issues/2911#issuecomment-749580481 +// to enable a test-only feature in a self dependency in the dev-deps section +// of the manifest file because Rust-Analyzer doesn't support such +// circular dependencies: https://github.com/rust-lang/rust-analyzer/issues/14167. +// So instead we use the same trick with harp rather than ark, so that there +// is no circular dependency, which fixes the issue with Rust-Analyzer. +// +// - Unfortunately we can't query the features enabled in a dependency with `cfg`. +// So instead we define a global variable here that can then be checked at +// runtime in Ark. +pub static IS_TESTING: bool = cfg!(feature = "testing"); + static INIT: Once = Once::new(); pub fn r_test(f: F) { diff --git a/crates/libr/src/r.rs b/crates/libr/src/r.rs index 5e841ecaa..5c5923dbd 100644 --- a/crates/libr/src/r.rs +++ b/crates/libr/src/r.rs @@ -726,6 +726,9 @@ mutable_globals::generate! { #[cfg(target_family = "unix")] pub static mut ptr_R_Busy: Option; + #[cfg(target_family = "unix")] + pub static mut ptr_R_Suicide: Option; + // ----------------------------------------------------------------------------------- // Windows diff --git a/crates/stdext/src/lib.rs b/crates/stdext/src/lib.rs index 59feab9ce..34cff1db1 100644 --- a/crates/stdext/src/lib.rs +++ b/crates/stdext/src/lib.rs @@ -60,13 +60,10 @@ macro_rules! cstr { #[macro_export] macro_rules! assert_match { ($expression:expr, $pattern:pat_param => $code:block) => { - assert!(match $expression { - $pattern => { - $code - true - }, - _ => false - }) + match $expression { + $pattern => $code, + _ => panic!("Expected {}", stringify!($pattern)), + } }; ($expression:expr, $pattern:pat_param) => {