Skip to content

Commit

Permalink
Issue #12786: Create hook for dispatching messages out of order
Browse files Browse the repository at this point in the history
  • Loading branch information
Rollo Konig Brock committed Feb 1, 2021
1 parent aba2179 commit b0b3613
Showing 1 changed file with 36 additions and 23 deletions.
59 changes: 36 additions & 23 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,8 @@ def __init__(self, **kwargs):
self.control_handlers[msg_type] = getattr(self, msg_type)

@gen.coroutine
def dispatch_control(self, msg):
def dispatch_control(self, msg, ident, stream=None):
"""dispatch control requests"""
idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Control Message", exc_info=True)
return

self.log.debug("Control received: %s", msg)

# Set the parent message for side effects.
Expand Down Expand Up @@ -215,15 +208,8 @@ def should_handle(self, stream, msg, idents):
return True

@gen.coroutine
def dispatch_shell(self, stream, msg):
def dispatch_shell(self, msg, ident, stream):
"""dispatch shell requests"""
idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return

# Set the parent message for side effects.
self.set_parent(idents, msg)
self._publish_status('busy')
Expand Down Expand Up @@ -385,16 +371,43 @@ def dispatch_queue(self):
def _message_counter_default(self):
return itertools.count()

def schedule_dispatch(self, priority, dispatch, *args):
def should_dispatch_immediately(
self, msg, ident, stream, priority, dispatch
):
"""
This provides a hook for dispatching incoming messages
from the frontend immediately, and out of order.
It could be used to allow asynchronous messages from
GUIs to be processed.
"""
return False

def schedule_dispatch(self, msg, priority, dispatch, stream=None):
"""schedule a message for dispatch"""

idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return

new_args = (msg, idents, stream)

if self.should_dispatch_immediately(
msg, ident, stream, priority, dispatch, stream
):
return self.io_loop.add_callback(dispatch, *new_args)

idx = next(self._message_counter)

self.msg_queue.put_nowait(
(
priority,
idx,
dispatch,
args,
new_args,
)
)
# ensure the eventloop wakes up
Expand All @@ -411,8 +424,8 @@ def start(self):
self.control_stream.on_recv(
partial(
self.schedule_dispatch,
CONTROL_PRIORITY,
self.dispatch_control,
priority=CONTROL_PRIORITY,
dispatch=self.dispatch_control,
),
copy=False,
)
Expand All @@ -423,9 +436,9 @@ def start(self):
s.on_recv(
partial(
self.schedule_dispatch,
SHELL_PRIORITY,
self.dispatch_shell,
s,
priority=SHELL_PRIORITY,
dispatch=self.dispatch_shell,
stream=s,
),
copy=False,
)
Expand Down

0 comments on commit b0b3613

Please sign in to comment.