Skip to content

Commit

Permalink
updated examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Liam Childs committed Sep 26, 2016
1 parent a9eb91e commit cde808b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 66 deletions.
30 changes: 12 additions & 18 deletions examples/join.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from mimo import Workflow, Stream
from mimo import Workflow, Stream, azip


def main():
Expand All @@ -14,36 +14,30 @@ def main():
workflow.run()


def stream1(ins, outs, state):
async def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True
for item in iter(range(100)):
await outs.a.push(item)
outs.a.close()


def stream2(ins, outs, state):
async def stream2(ins, outs, state):
"""
Generates integers from 99 to 0.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(99, -1, -1))
iterator = state['iterator']
for item in iterator:
if not outs.b.push(item):
return True
for item in iter(range(99, -1, -1)):
await outs.b.push(item)
outs.b.close()


def stream3(ins, outs, state):
async def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0 and len(ins.d) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() + ins.d.pop()))
async for c, d in azip(ins.c, ins.d):
sys.stdout.write('{}\n'.format(c + d))

if __name__ == '__main__':
import sys
Expand Down
27 changes: 11 additions & 16 deletions examples/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,30 @@ def main():
workflow.run()


def stream1(ins, outs, state):
async def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True
for item in iter(range(100)):
await outs.a.push(item)
outs.a.close()


def stream2(ins, outs, state):
async def stream2(ins, outs, state):
"""
Multiplies the integers by 2.
"""
while len(ins.b) > 0:
item = ins.b.pop()
if not outs.c.push(item * 2):
break
return len(ins.b) > 0
async for item in ins.b:
await outs.c.push(item * 2)
outs.c.close()


def stream3(ins, outs, state):
async def stream3(ins, outs, state):
"""
Print incoming entities to stdout
"""
while len(ins.d) > 0:
print(ins.d.pop())
async for item in ins.d:
print(item)

if __name__ == '__main__':
import sys
Expand Down
37 changes: 18 additions & 19 deletions examples/multi_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,42 @@

def main():
workflow = Workflow(10)
step1 = workflow.add_stream(Stream(outs=['a'], fn=stream1))
step2 = workflow.add_stream(Stream(['b'], fn=stream2))
step3 = workflow.add_stream(Stream(['c'], fn=stream3))
step1 = workflow.add_stream(Stream(outs=['a', 'b'], fn=stream1))
step2 = workflow.add_stream(Stream(['c'], fn=stream2))
step3 = workflow.add_stream(Stream(['d'], fn=stream3))

step1.pipe(step2)
step1.pipe(step3)
step1.pipe(step2, 'a')
step1.pipe(step3, 'b')

print(str(workflow))
workflow.run()


def stream1(ins, outs, state):
async def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
Generates one stream of integers from 0 to 99 and another from 100 to 1
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True
for item in iter(range(100)):
await outs.a.push(item)
await outs.b.push(100 - item)
outs.a.close()
outs.b.close()


def stream2(ins, outs, state):
async def stream2(ins, outs, state):
"""
Multiply incoming entities by 2 and print to stdout
"""
while len(ins.b) > 0:
sys.stdout.write('{}\n'.format(2 * ins.b.pop()))
async for item in ins.c:
sys.stdout.write('{}\n'.format(2 * item))


def stream3(ins, outs, state):
async def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() / 10))
async for item in ins.d:
sys.stdout.write('{}\n'.format(item / 10))

if __name__ == '__main__':
import sys
Expand Down
23 changes: 10 additions & 13 deletions examples/split_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,29 @@ def main():
workflow.run()


def stream1(ins, outs, state):
async def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True
for item in iter(range(100)):
await outs.a.push(item)
outs.a.close()


def stream2(ins, outs, state):
async def stream2(ins, outs, state):
"""
Multiply incoming entities by 2 and print to stdout
"""
while len(ins.b) > 0:
sys.stdout.write('{}\n'.format(2 * ins.b.pop()))
async for item in ins.b:
sys.stdout.write('{}\n'.format(2 * item))


def stream3(ins, outs, state):
async def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() / 10))
async for item in ins.c:
sys.stdout.write('{}\n'.format(item / 10))

if __name__ == '__main__':
import sys
Expand Down

0 comments on commit cde808b

Please sign in to comment.