Skip to content

Commit

Permalink
Merge pull request #3 from childsish/asyncio
Browse files Browse the repository at this point in the history
Asyncio
  • Loading branch information
childsish authored Sep 26, 2016
2 parents dd96cdf + b4403da commit e9e8616
Show file tree
Hide file tree
Showing 25 changed files with 357 additions and 314 deletions.
32 changes: 11 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ There are two core components in MiMo; the `Stream` and the `Workflow`. Streams

### Streams

Implementing a stream can be done through inheriting a sub-class from the `Stream` class or creating a `Stream`class with a custom function as the `fn` parameter. The following code shows the same implementation of a stream that will produce the numbers from 0 to 99.
Implementing a stream can be done through inheriting a sub-class from the `Stream` class or creating a `Stream` class with a custom function as the `fn` parameter. The following code shows two implementations of a stream that will produce the numbers from 0 to 99.


```python
Expand All @@ -28,36 +28,26 @@ class MyStream(Stream):

IN = []
OUT = ['entity']

def __init__(self):
super().__init__()
self.iterator = None

def run(self, ins, outs):
if self.iterator is None:
self.iterator = iter(range(100))
for item in self.iterator:
if not outs.entity.pueh(item):
return True
async def run(self, ins, outs):
for item in iter(range(100)):
await outs.entity.push(item)


# Method 2 (constructor)

my_stream = Stream(outs=['entity], fn=my_stream_fn)

def my_stream_fn(ins, outs, state):
if 'iterator' not in state:
state['iterator'] = iter(range(100))
for item in state['iterator']:
if not outs.entity.push(item):
return True
async def my_stream_fn(ins, outs, state):
for item in iter(range(100)):
await outs.entity.push(item)
```

There are a few things to note about the `run` function.
1. It takes two parameters, `ins` and `outs`, that contain the input streams and the output streams. The names of the input and output streams are defined by the `IN` and `OUT` member variables and accessing the input and output streams can be done through the attributes. From the example above, accessing the `entity` output stream can be done with `outs.entity`.
2. Input streams can be popped and peeked. Input streams haven't been used in the above example, but the entities in the stream can be accessed one at a time with the functions `pop` and `peek`. Popping an entity will remove it from the input stream, and peeking will look at the top-most entity without removing it from the stream.
2. Output streams can be pushed. Pushing an entity to an output stream will make it available to any connected downstream streams. The `push` function return a boolean to indicate whether the stream is full or not (`True` if still pushable). A full stream ca still be pushed to, but users can make their custom streams back-pressure aware by testing this value.
3. The return value is a boolean. If a stream did not fully complete it's task (possibly due to back-pressure), then it should return `True` to indicate that it can be run again after downstream streams have completed. Otherwise a `False` (or equivalent like `None`) will prevent further execution of the stream until new input is available.
1. It must be asynchronous, ie. it must be defined wth the `async def` keywords.
2. It takes two parameters, `ins` and `outs`, that contain the input streams and the output streams. The names of the input and output streams are defined by the `IN` and `OUT` member variables or overridden using the `ins` and `outs` of the initialisation function. Accessing the input and output streams can be done through the attributes. From the example above, accessing the `entity` output stream can be done with `outs.entity`.
3. Input streams can be popped and peeked and this must be done using the `await` keyword. Input streams haven't been used in the above example, but the entities in the stream can be accessed one at a time with the functions `pop` and `peek`. Popping an entity will remove it from the input stream, and peeking will look at the top-most entity without removing it from the stream. Input streams can also be iterated using the `async for` looping construct.
4. Output streams can be pushed and must also use the `await` keyword. Pushing an entity to an output stream will make it available to any connected downstream streams.

### Workflows

Expand Down
30 changes: 12 additions & 18 deletions examples/join.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from mimo import Workflow, Stream
from mimo import Workflow, Stream, azip


def main():
Expand All @@ -14,36 +14,30 @@ def main():
workflow.run()


def stream1(ins, outs, state):
async def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True
for item in iter(range(100)):
await outs.a.push(item)
outs.a.close()


def stream2(ins, outs, state):
async def stream2(ins, outs, state):
"""
Generates integers from 99 to 0.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(99, -1, -1))
iterator = state['iterator']
for item in iterator:
if not outs.b.push(item):
return True
for item in iter(range(99, -1, -1)):
await outs.b.push(item)
outs.b.close()


def stream3(ins, outs, state):
async def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0 and len(ins.d) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() + ins.d.pop()))
async for c, d in azip(ins.c, ins.d):
sys.stdout.write('{}\n'.format(c + d))

if __name__ == '__main__':
import sys
Expand Down
27 changes: 11 additions & 16 deletions examples/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,30 @@ def main():
workflow.run()


def stream1(ins, outs, state):
async def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True
for item in iter(range(100)):
await outs.a.push(item)
outs.a.close()


def stream2(ins, outs, state):
async def stream2(ins, outs, state):
"""
Multiplies the integers by 2.
"""
while len(ins.b) > 0:
item = ins.b.pop()
if not outs.c.push(item * 2):
break
return len(ins.b) > 0
async for item in ins.b:
await outs.c.push(item * 2)
outs.c.close()


def stream3(ins, outs, state):
async def stream3(ins, outs, state):
"""
Print incoming entities to stdout
"""
while len(ins.d) > 0:
print(ins.d.pop())
async for item in ins.d:
print(item)

if __name__ == '__main__':
import sys
Expand Down
37 changes: 18 additions & 19 deletions examples/multi_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,42 @@

def main():
workflow = Workflow(10)
step1 = workflow.add_stream(Stream(outs=['a'], fn=stream1))
step2 = workflow.add_stream(Stream(['b'], fn=stream2))
step3 = workflow.add_stream(Stream(['c'], fn=stream3))
step1 = workflow.add_stream(Stream(outs=['a', 'b'], fn=stream1))
step2 = workflow.add_stream(Stream(['c'], fn=stream2))
step3 = workflow.add_stream(Stream(['d'], fn=stream3))

step1.pipe(step2)
step1.pipe(step3)
step1.pipe(step2, 'a')
step1.pipe(step3, 'b')

print(str(workflow))
workflow.run()


def stream1(ins, outs, state):
async def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
Generates one stream of integers from 0 to 99 and another from 100 to 1
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True
for item in iter(range(100)):
await outs.a.push(item)
await outs.b.push(100 - item)
outs.a.close()
outs.b.close()


def stream2(ins, outs, state):
async def stream2(ins, outs, state):
"""
Multiply incoming entities by 2 and print to stdout
"""
while len(ins.b) > 0:
sys.stdout.write('{}\n'.format(2 * ins.b.pop()))
async for item in ins.c:
sys.stdout.write('{}\n'.format(2 * item))


def stream3(ins, outs, state):
async def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() / 10))
async for item in ins.d:
sys.stdout.write('{}\n'.format(item / 10))

if __name__ == '__main__':
import sys
Expand Down
23 changes: 10 additions & 13 deletions examples/split_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,29 @@ def main():
workflow.run()


def stream1(ins, outs, state):
async def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True
for item in iter(range(100)):
await outs.a.push(item)
outs.a.close()


def stream2(ins, outs, state):
async def stream2(ins, outs, state):
"""
Multiply incoming entities by 2 and print to stdout
"""
while len(ins.b) > 0:
sys.stdout.write('{}\n'.format(2 * ins.b.pop()))
async for item in ins.b:
sys.stdout.write('{}\n'.format(2 * item))


def stream3(ins, outs, state):
async def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() / 10))
async for item in ins.c:
sys.stdout.write('{}\n'.format(item / 10))

if __name__ == '__main__':
import sys
Expand Down
2 changes: 2 additions & 0 deletions mimo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
from .stream import Stream
from .workflow import Workflow

from .asynctools import *
10 changes: 10 additions & 0 deletions mimo/asynctools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .range import AsynchronousRange
from .zip import AsynchronousZip


def azip(*iterables):
return AsynchronousZip(*iterables)


def arange(fr, to=None, step=1):
return AsynchronousRange(fr, to, step)
15 changes: 15 additions & 0 deletions mimo/asynctools/range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class AsynchronousRange:
def __init__(self, fr, to=None, step=1):
if to is None:
self._iterator = iter(range(fr))
else:
self._iterator = iter(range(fr, to, step))

def __aiter__(self):
return self

async def __anext__(self):
try:
return next(self._iterator)
except StopIteration:
raise StopAsyncIteration
12 changes: 12 additions & 0 deletions mimo/asynctools/zip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class AsynchronousZip:
def __init__(self, *iterables):
self._iterables = iterables

def __aiter__(self):
return self

async def __anext__(self):
res = []
for iterator in self._iterables:
res.append(await iterator.__anext__())
return tuple(res)
18 changes: 0 additions & 18 deletions mimo/connection/connection.py

This file was deleted.

Loading

0 comments on commit e9e8616

Please sign in to comment.