Skip to content

Commit

Permalink
Issue #12786: Create hook for dispatching messages out of order
Browse files Browse the repository at this point in the history
  • Loading branch information
sonthonaxrk authored and davidbrochart committed Jul 15, 2021
1 parent cc6c171 commit 953eb4b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
4 changes: 1 addition & 3 deletions ipykernel/inprocess/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,8 @@ def _dispatch_to_kernel(self, msg):
raise RuntimeError('Cannot send request. No kernel exists.')

stream = kernel.shell_stream
self.session.send(stream, msg)
msg_parts = stream.recv_multipart()
loop = asyncio.get_event_loop()
loop.run_until_complete(kernel.dispatch_shell(msg_parts))
loop.run_until_complete(kernel.dispatch_shell(msg))
idents, reply_msg = self.session.recv(stream, copy=False)
self.shell_channel.call_handlers_later(reply_msg)

Expand Down
37 changes: 28 additions & 9 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,8 @@ async def dispatch_shell(self, msg):
# flush control queue before handling shell requests
await self._flush_control_queue()

idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except Exception:
self.log.error("Invalid Message", exc_info=True)
return
if idents is None:
idents = []

# Set the parent message for side effects.
self.set_parent(idents, msg, channel='shell')
Expand Down Expand Up @@ -465,15 +461,38 @@ async def dispatch_queue(self):
def _message_counter_default(self):
return itertools.count()

def schedule_dispatch(self, dispatch, *args):
def should_dispatch_immediately(self, msg):
"""
This provides a hook for dispatching incoming messages
from the frontend immediately, and out of order.
It could be used to allow asynchronous messages from
GUIs to be processed.
"""
return False

def schedule_dispatch(self, msg, dispatch):
"""schedule a message for dispatch"""

idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid shell message", exc_info=True)
return

new_args = (msg, idents)

if self.should_dispatch_immediately(msg):
return self.io_loop.add_callback(dispatch, *new_args)

idx = next(self._message_counter)

self.msg_queue.put_nowait(
(
idx,
dispatch,
args,
new_args,
)
)
# ensure the eventloop wakes up
Expand All @@ -497,7 +516,7 @@ def start(self):
self.shell_stream.on_recv(
partial(
self.schedule_dispatch,
self.dispatch_shell,
dispatch=self.dispatch_shell,
),
copy=False,
)
Expand Down

0 comments on commit 953eb4b

Please sign in to comment.