Skip to content

Commit

Permalink
Stripped wires and added insulation and grounding.
Browse files Browse the repository at this point in the history
Simplified return type for wires from a function plus
its arguments to `Wire(function, arguments)` or None.

Error handling got a major upgrade, since now raise
inside a wire gets sent to a handler.  Calling
`EventLoop.start(wire, handler)` will pass handler
all errors raised -- allowing wire-s to return.
Errors raised from the handler (or with no handler installed)
will also show the point where the wire was first
started into the event loop.

Also, added and exported types using mypy.
  • Loading branch information
frobnitzem committed Oct 2, 2023
1 parent ff3a8f9 commit fdc1d4c
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 290 deletions.
58 changes: 44 additions & 14 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,24 @@ It is based on the principles of functional
reactive programming and draws inspiration
from Haskell's `Control.Wire <https://hackage.haskell.org/package/netwire-4.0.7/docs/Control-Wire.html>`_ library.

In particular, every co-routine started by the
event loop is a ``Wire``.
In particular, every co-routine started by the event loop is a ``Wire``.

``Wire``-s either return ``None``, indicating they're done,
or another ``Wire``.
``Wire``-s either return ``None``, indicating they're done, or another
``Wire``.

An example helps explain the idea::

from aiowire import EventLoop

event = 0
async def show_event(ev) \
-> Optional[Callable[[EventLoop],Awaitable]]:
async def show_event(ev) -> Optional[Wire]:
print("Running...")
event += 1
await asyncio.sleep(event*0.15)
print(f"Event {event}")
if event < 5:
return show_event
return Wire(show_event)
return None

async with EventLoop(timeout=1) as event:
event.start(show_event)
Expand All @@ -37,7 +36,7 @@ We start up an event loop and drop in two wires.
Each runs, then returns the ``show_event`` function.
The event loop runs those functions next... and so on.

But this isn't functional programming. The wires
But since this isn't functional programming. The wires
have access to the event loop, and can start more
tasks. Easy, right?

Expand All @@ -50,16 +49,46 @@ working with sockets, and managing timeouts? Drop
in one wire for each program, one polling on socket I/O,
and another acting as a timer (as above).

The canonical task types are thus::
Some canonical task types that do these include::

asyncio.create_subprocess_exec # run a process
asyncio.create_subprocess_exec # run a program

asyncio.sleep # awake the loop after a given time lapse

zmq.asyncio.Poller.poll # awake the loop after I/O on socket/file
# Note: see aiowire.Poller for a Wire-y interface.

Now your sockets can launch programs, and your program
aiowire.Poller # Wire-y interface to zmq.asyncio.Poller


Think about each wire as a finite state machine.
For example,

.. mermaid::

flowchart LR
R[Ready] --> N{New Task?};
N -- Yes --> W[Working];
W --> C{Complete?};
C -- Yes --> R;

can be implemented like so::

async def ready(ev : EventLoop, info : X) -> Optional[Wire]:
if info.new_task():
do_working_action()
return Wire(working, info) # move to working state

# Return a sequence of 2 wires:
return Call(asyncio.sleep, 1.0) >> Wire(ready, info)

async def working(ev : EventLoop, info : X) -> Wire:
if info.complete():
do_complete_action()
return Wire(ready, info)
await asyncio.sleep(0.5) # directly sleep a bit
return Wire(working, info)

Note how your sockets can launch programs, and your program
results can start/stop sockets, and everyone can start
background tasks.

Expand All @@ -82,7 +111,7 @@ You add it to your event loop as usual::
await sock.send( await sock.recv() )

todo = { 0: Call(print, "received input on sys.stdin"),
sock: echo
sock: Wire(echo)
}
async with EventLoop() as ev:
ev.start( Poller(todo) )
Expand All @@ -103,7 +132,8 @@ programming idioms:
* `Wire(w)`: acts like an identity over "async func(ev):" functions
* `Repeat(w, n)`: repeat wire ``w`` n times in a row
* `Forever(w)`: repeat forever -- like `Repeat(w) * infinity`
* `Call(fn, *args)`: call fn (normal or async), ignore the return, and exit
* `Call(fn, *args, **kargs)`: call fn (normal or async),
ignore the return, and exit

Consider, for example, printing 4 alarms separated by some time interval::

Expand Down
11 changes: 8 additions & 3 deletions aiowire/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

__version__ = importlib.metadata.version("aiowire")

from .event_loop import EventLoop
from .event_loop import EventLoop, UnhandledException
from .poller import Poller
from .wire import Wire, Sequence, Call, Repeat, Forever
from .wire import ApplyM, RepeatM, ForeverM
from .wire import (
Wire,
Sequence,
Call,
Repeat,
Forever,
)
110 changes: 77 additions & 33 deletions aiowire/event_loop.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,34 @@
from typing import Optional, Callable, Union, Awaitable, List, Any, Tuple
from typing import Optional, Callable, List, Dict
from inspect import isawaitable
import asyncio

from .wire import Wire

Handler = Callable[['EventLoop', Exception], None]

class UnhandledException(Exception):
pass

def default_handler(handler : Optional[Handler] = None) -> Handler:
""" This default handler captures the traceback that lead to its creation.
Any exception raised by the provided handler will
be re-raised, with an extra note on where this
default_handler was created (i.e. the original `EventLoop.start()`)..
If handler is None, then this works as if handler was just
`raise UnhandledException`.
"""
def insulated_handler(ev : 'EventLoop', e : Exception):
if handler is None:
raise UnhandledException("Wire with no exception handler") from e
else:
try:
handler(ev, e)
except Exception as e2:
raise e2 from e
return insulated_handler

class EventLoop:
"""
Create a wire-driven event loop.
Expand All @@ -26,25 +53,51 @@ class EventLoop:
several network connections and subprocesses, and
wants to perform task/connection management as
part of some of the callbacks.
Exception Handling:
The ev.start() method can take a second parameter
that is a callback where all exceptions are sent.
When a task (or any of its subtasks) raise an exception,
the callback is called with the exception as its argument.
Note that if a task is cancelled, the
exception type will be asyncio.CancelledError.
Because each task either raises an exception,
returns another wire, or returns None,
the handler will only be called once by each start()-ed
task (or not at all).
"""
def __init__(self, timeout=None):
self.tasks = set()
def __init__(self, timeout : Optional[float] = None):
self.tasks : Dict[asyncio.Task, Handler] = {}
self.timeout = timeout

def start(self, ret) -> List[Any]:
if ret is None:
return []
fn, args = get_args(ret)
if fn is None:
return args
coro = fn(self, *args)
def start(self, w : Optional[Wire],
handler : Optional[Handler] = None,
capture_context : bool = True) -> None:
""" Schedule the wire, `w`, for execution with
the given exception handler.
If capture_context is True, then exceptions raised
during invocation of the handler will be wrapped with
a traceback showing where start() was originally called.
`capture_context` is set to False when running Wire-s returned
by Wire-s so we don't trace the entire state history, only
the original `start()` location.
"""
if w is None:
return None
coro = w(self)
if isawaitable(coro):
#t = asyncio.ensure_future(coro)
t = asyncio.create_task(coro)
self.tasks.add(t)
return args
t : asyncio.Task = asyncio.create_task(coro) # type: ignore[arg-type]
if handler is None or capture_context:
self.tasks[t] = default_handler(handler)
else:
self.tasks[t] = handler
return None

async def run(self, timeout = None) -> None:
async def run(self, timeout : Optional[float] = None) -> None:
"""
Run the event loop. Usually this is called
automatically when the ``async with EventLoop ...``
Expand All @@ -67,10 +120,17 @@ async def run(self, timeout = None) -> None:
timeout = dt,
return_when = asyncio.FIRST_COMPLETED)
for t in done:
self.tasks.remove(t)
handler = self.tasks.pop(t)
# Need to await t to get its return value,
# then pass it to start again.
self.start(await t)
try:
ret = await t
if isinstance(ret, Wire):
self.start(ret, handler, False)
elif ret is not None:
handler(self, TypeError(f"Wire returned {ret}"))
except Exception as e:
handler(self, e)
t1 = loop.time()

async def __aenter__(self):
Expand All @@ -85,21 +145,5 @@ async def __aexit__(self, exc_type, exc, traceback):
for t in self.tasks: #[max(self.cur-1,0):]:
if not t.done():
t.cancel()
self.tasks = set()
self.tasks.clear()
return False # continue to raise any exception

def get_args( ret : Union[Callable, Tuple[Optional[Callable], List]] ) \
-> (Optional[Callable], List):
"""
Interpret mutiple different potential return types from a Wire.
- Callable ~> call ret(ev)
- List/Tuple ~> call ret[0](ev, *ret[1])
"""
if isinstance(ret, Callable):
return ret, []
if len(ret) != 2 or not (ret[0] is None \
or isinstance(ret[0], Callable)) \
or not isinstance(ret[1], (list,tuple)):
raise ValueError("Invalid Wire return type: should be either `None`, `Wire`, or `(Wire, List)`")
return ret[0], ret[1]
30 changes: 19 additions & 11 deletions aiowire/poller.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
from typing import Optional, Dict, Any
from typing import Optional, Dict, Union

import zmq
import zmq.asyncio
try:
import zmq
import zmq.asyncio
Socket = Union[zmq.Socket, int]
zmqPOLLIN = zmq.POLLIN
except ImportError:
zmq = None # type: ignore[assignment]
Socket = int # type: ignore[misc]
zmqPOLLIN = 101

from .wire import Wire
from .event_loop import EventLoop

class Poller(Wire):
"""
Expand All @@ -20,10 +28,10 @@ class Poller(Wire):
See `the pyzmq docs <https://pyzmq.readthedocs.io/en/latest/api/zmq.html#polling>`_
for more info.
"""
def __init__(self, socks,
default_flags = zmq.POLLIN,
def __init__(self, socks : Dict[Socket, Wire],
default_flags = zmqPOLLIN,
interval : Optional[int] = 1000):
self.socks = {}
self.socks : Dict[Socket, Wire] = {}
self.default_flags = default_flags
self.interval = interval
self.done = False
Expand All @@ -32,9 +40,9 @@ def __init__(self, socks,
for sock, cb in socks.items():
self.register(sock, cb)

def register(self, sock, cb, flags = None):
def register(self, sock : Socket, cb : Wire, flags = None) -> None:
"""
Add a listener on sock, invoking ``Wire`` cb on activity.
Add a listener on sock, invoking cb on activity.
See pyzmq.Poller.register for more info on sock.
If flags is None, self.default_flags is used.
Expand All @@ -46,18 +54,18 @@ def register(self, sock, cb, flags = None):
self.poller.register(sock, flags)
self.socks[sock] = cb

def unregister(self, sock):
def unregister(self, sock : Socket) -> None:
self.poller.unregister(sock)
del self.socks[sock]

def shutdown(self):
def shutdown(self) -> None:
"""
Shutdown only needs to be called if the EventLoop
is running in non-stop mode.
"""
self.done = True

async def __call__(self, ev):
async def __call__(self, ev : EventLoop) -> Optional[Wire]:
# TODO: we could further capture the coroutine
# here within self, then cancel it
# during shutdown -- then await would probably throw something.
Expand Down
Empty file added aiowire/py.typed
Empty file.
Loading

0 comments on commit fdc1d4c

Please sign in to comment.