diff --git a/ipykernel/tests/test_kernel.py b/ipykernel/tests/test_kernel.py index fda8a1f4d..7560bcddb 100644 --- a/ipykernel/tests/test_kernel.py +++ b/ipykernel/tests/test_kernel.py @@ -17,10 +17,11 @@ from IPython.paths import locate_profile from ipython_genutils.tempdir import TemporaryDirectory +from ipykernel.tests.test_message_spec import validate_message from .utils import ( new_kernel, kernel, TIMEOUT, assemble_output, execute, flush_channels, wait_for_idle, -) + connect_to_kernel) def _check_master(kc, expected=True, stream="stdout"): @@ -326,3 +327,43 @@ def test_shutdown(): else: break assert not km.is_alive() + +def test_fork_metadata(): + with kernel() as kc: + km = kc.parent + fork_msg_id = kc.fork() + fork_reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) + # validate_message(fork_reply, "execute_reply", fork_msg_id) # TODO: Make it work (need the `fork_reply`) + assert fork_msg_id == fork_reply['parent_header']['msg_id'] == fork_msg_id + assert fork_reply['content']['conn']['key'] != kc.session.key.decode() + fork_pid = fork_reply['content']['pid'] + _check_status(fork_reply['content']) + wait_for_idle(kc) + + assert fork_pid != km.kernel.pid + #TODO: Inspect if `fork_pid` is running? Might need to use `psutil` for this in order to be cross platform + + with connect_to_kernel(fork_reply['content']['conn'], TIMEOUT) as kc_fork: + assert fork_reply['content']['conn']['key'] == kc_fork.session.key.decode() + kc_fork.shutdown() + +def test_fork(): + def execute_with_user_expression(kc, code, user_expression): + _, reply = execute(code, kc=kc, user_expressions={"my-user-expression": user_expression}) + content = reply["user_expressions"]["my-user-expression"]["data"]["text/plain"] + wait_for_idle(kc) + return content + + """Kernel forks after fork_request""" + with kernel() as kc: + assert execute_with_user_expression(kc, u'a = 1', "a") == "1" + assert execute_with_user_expression(kc, u'b = 2', "b") == "2" + kc.fork() + fork_reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) + wait_for_idle(kc) + + with connect_to_kernel(fork_reply['content']['conn'], TIMEOUT) as kc_fork: + assert execute_with_user_expression(kc_fork, 'a = 11', "a, b") == str((11, 2)) + assert execute_with_user_expression(kc_fork, 'b = 12', "a, b") == str((11, 12)) + assert execute_with_user_expression(kc, 'z = 20', "a, b") == str((1, 2)) + kc_fork.shutdown() diff --git a/ipykernel/tests/utils.py b/ipykernel/tests/utils.py index 47eab54df..0e9af0758 100644 --- a/ipykernel/tests/utils.py +++ b/ipykernel/tests/utils.py @@ -170,3 +170,14 @@ def wait_for_idle(kc): content = msg['content'] if msg_type == 'status' and content['execution_state'] == 'idle': break + +@contextmanager +def connect_to_kernel(connection_info, timeout): + from jupyter_client import BlockingKernelClient + kc = BlockingKernelClient() + kc.log.setLevel('DEBUG') + kc.load_connection_info(connection_info) + kc.start_channels() + kc.wait_for_ready(timeout) + yield kc + kc.stop_channels()