1
1
/*
2
- * mod .rs
2
+ * dummy_frontend .rs
3
3
*
4
- * Copyright (C) 2022 Posit Software, PBC. All rights reserved.
4
+ * Copyright (C) 2022-2024 Posit Software, PBC. All rights reserved.
5
5
*
6
6
*/
7
7
8
- use amalthea:: connection_file:: ConnectionFile ;
9
- use amalthea:: session:: Session ;
10
- use amalthea:: socket:: socket:: Socket ;
11
- use amalthea:: wire:: jupyter_message:: JupyterMessage ;
12
- use amalthea:: wire:: jupyter_message:: Message ;
13
- use amalthea:: wire:: jupyter_message:: ProtocolMessage ;
8
+ use serde_json:: Value ;
9
+ use stdext:: assert_match;
14
10
15
- pub struct Frontend {
11
+ use crate :: connection_file:: ConnectionFile ;
12
+ use crate :: session:: Session ;
13
+ use crate :: socket:: socket:: Socket ;
14
+ use crate :: wire:: execute_input:: ExecuteInput ;
15
+ use crate :: wire:: execute_reply:: ExecuteReply ;
16
+ use crate :: wire:: execute_request:: ExecuteRequest ;
17
+ use crate :: wire:: jupyter_message:: JupyterMessage ;
18
+ use crate :: wire:: jupyter_message:: Message ;
19
+ use crate :: wire:: jupyter_message:: ProtocolMessage ;
20
+ use crate :: wire:: status:: ExecutionState ;
21
+ use crate :: wire:: wire_message:: WireMessage ;
22
+
23
+ pub struct DummyFrontend {
16
24
pub _control_socket : Socket ,
17
25
pub shell_socket : Socket ,
18
26
pub iopub_socket : Socket ,
@@ -27,7 +35,7 @@ pub struct Frontend {
27
35
heartbeat_port : u16 ,
28
36
}
29
37
30
- impl Frontend {
38
+ impl DummyFrontend {
31
39
pub fn new ( ) -> Self {
32
40
use rand:: Rng ;
33
41
@@ -117,7 +125,7 @@ impl Frontend {
117
125
118
126
/// Completes initialization of the frontend (usually done after the kernel
119
127
/// is ready and connected)
120
- pub fn complete_intialization ( & self ) {
128
+ pub fn complete_initialization ( & self ) {
121
129
self . iopub_socket . subscribe ( ) . unwrap ( ) ;
122
130
}
123
131
@@ -130,29 +138,100 @@ impl Frontend {
130
138
id
131
139
}
132
140
141
+ pub fn send_execute_request ( & self , code : & str ) -> String {
142
+ self . send_shell ( ExecuteRequest {
143
+ code : String :: from ( code) ,
144
+ silent : false ,
145
+ store_history : true ,
146
+ user_expressions : serde_json:: Value :: Null ,
147
+ allow_stdin : false ,
148
+ stop_on_error : false ,
149
+ } )
150
+ }
151
+
133
152
/// Sends a Jupyter message on the Stdin socket
134
153
pub fn send_stdin < T : ProtocolMessage > ( & self , msg : T ) {
135
154
let message = JupyterMessage :: create ( msg, None , & self . session ) ;
136
155
message. send ( & self . stdin_socket ) . unwrap ( ) ;
137
156
}
138
157
139
158
/// Receives a Jupyter message from the Shell socket
140
- pub fn receive_shell ( & self ) -> Message {
159
+ pub fn recv_shell ( & self ) -> Message {
141
160
Message :: read_from_socket ( & self . shell_socket ) . unwrap ( )
142
161
}
143
162
163
+ /// Receive from Shell and assert ExecuteReply message
164
+ pub fn recv_shell_execute_reply ( & self ) -> ExecuteReply {
165
+ let msg = self . recv_shell ( ) ;
166
+
167
+ assert_match ! ( msg, Message :: ExecuteReply ( data) => {
168
+ data. content
169
+ } )
170
+ }
171
+
144
172
/// Receives a Jupyter message from the IOPub socket
145
- pub fn receive_iopub ( & self ) -> Message {
173
+ pub fn recv_iopub ( & self ) -> Message {
146
174
Message :: read_from_socket ( & self . iopub_socket ) . unwrap ( )
147
175
}
148
176
177
+ /// Receive from IOPub and assert Busy message
178
+ pub fn recv_iopub_busy ( & self ) -> ( ) {
179
+ let msg = self . recv_iopub ( ) ;
180
+
181
+ assert_match ! ( msg, Message :: Status ( data) => {
182
+ assert_eq!( data. content. execution_state, ExecutionState :: Busy ) ;
183
+ } ) ;
184
+ }
185
+
186
+ /// Receive from IOPub and assert Idle message
187
+ pub fn recv_iopub_idle ( & self ) -> ( ) {
188
+ let msg = self . recv_iopub ( ) ;
189
+
190
+ assert_match ! ( msg, Message :: Status ( data) => {
191
+ assert_eq!( data. content. execution_state, ExecutionState :: Idle ) ;
192
+ } ) ;
193
+ }
194
+
195
+ /// Receive from IOPub and assert ExecuteInput message
196
+ pub fn recv_iopub_execute_input ( & self ) -> ExecuteInput {
197
+ let msg = self . recv_iopub ( ) ;
198
+
199
+ assert_match ! ( msg, Message :: ExecuteInput ( data) => {
200
+ data. content
201
+ } )
202
+ }
203
+
204
+ /// Receive from IOPub and assert ExecuteResult message. Returns compulsory
205
+ /// `plain/text` result.
206
+ pub fn recv_iopub_execute_result ( & self ) -> String {
207
+ let msg = self . recv_iopub ( ) ;
208
+
209
+ assert_match ! ( msg, Message :: ExecuteResult ( data) => {
210
+ assert_match!( data. content. data, Value :: Object ( map) => {
211
+ assert_match!( map[ "text/plain" ] , Value :: String ( ref string) => {
212
+ string. clone( )
213
+ } )
214
+ } )
215
+ } )
216
+ }
217
+
218
+ /// Receive from IOPub and assert ExecuteResult message. Returns compulsory
219
+ /// `evalue` field.
220
+ pub fn recv_iopub_execute_error ( & self ) -> String {
221
+ let msg = self . recv_iopub ( ) ;
222
+
223
+ assert_match ! ( msg, Message :: ExecuteError ( data) => {
224
+ data. content. exception. evalue
225
+ } )
226
+ }
227
+
149
228
/// Receives a Jupyter message from the Stdin socket
150
- pub fn receive_stdin ( & self ) -> Message {
229
+ pub fn recv_stdin ( & self ) -> Message {
151
230
Message :: read_from_socket ( & self . stdin_socket ) . unwrap ( )
152
231
}
153
232
154
233
/// Receives a (raw) message from the heartbeat socket
155
- pub fn receive_heartbeat ( & self ) -> zmq:: Message {
234
+ pub fn recv_heartbeat ( & self ) -> zmq:: Message {
156
235
let mut msg = zmq:: Message :: new ( ) ;
157
236
self . heartbeat_socket . recv ( & mut msg) . unwrap ( ) ;
158
237
msg
@@ -178,4 +257,39 @@ impl Frontend {
178
257
key : self . key . clone ( ) ,
179
258
}
180
259
}
260
+
261
+ /// Asserts that no socket has incoming data
262
+ pub fn assert_no_incoming ( & mut self ) {
263
+ let mut has_incoming = false ;
264
+
265
+ if self . iopub_socket . has_incoming_data ( ) . unwrap ( ) {
266
+ has_incoming = true ;
267
+ Self :: flush_incoming ( "IOPub" , & self . iopub_socket ) ;
268
+ }
269
+ if self . shell_socket . has_incoming_data ( ) . unwrap ( ) {
270
+ has_incoming = true ;
271
+ Self :: flush_incoming ( "Shell" , & self . shell_socket ) ;
272
+ }
273
+ if self . stdin_socket . has_incoming_data ( ) . unwrap ( ) {
274
+ has_incoming = true ;
275
+ Self :: flush_incoming ( "StdIn" , & self . stdin_socket ) ;
276
+ }
277
+ if self . heartbeat_socket . has_incoming_data ( ) . unwrap ( ) {
278
+ has_incoming = true ;
279
+ Self :: flush_incoming ( "Heartbeat" , & self . heartbeat_socket ) ;
280
+ }
281
+
282
+ if has_incoming {
283
+ panic ! ( "Sockets must be empty on exit (see details above)" ) ;
284
+ }
285
+ }
286
+
287
+ fn flush_incoming ( name : & str , socket : & Socket ) {
288
+ println ! ( "{name} has incoming data:" ) ;
289
+
290
+ while socket. has_incoming_data ( ) . unwrap ( ) {
291
+ dbg ! ( WireMessage :: read_from_socket( socket) . unwrap( ) ) ;
292
+ println ! ( "---" ) ;
293
+ }
294
+ }
181
295
}
0 commit comments