diff --git a/examples/join.py b/examples/join.py index 2e005b0..ac96e32 100644 --- a/examples/join.py +++ b/examples/join.py @@ -1,4 +1,4 @@ -from mimo import Workflow, Stream +from mimo import Workflow, Stream, azip def main(): @@ -14,36 +14,30 @@ def main(): workflow.run() -def stream1(ins, outs, state): +async def stream1(ins, outs, state): """ Generates integers from 0 to 99. """ - if 'iterator' not in state: - state['iterator'] = iter(range(100)) - iterator = state['iterator'] - for item in iterator: - if not outs.a.push(item): - return True + for item in iter(range(100)): + await outs.a.push(item) + outs.a.close() -def stream2(ins, outs, state): +async def stream2(ins, outs, state): """ Generates integers from 99 to 0. """ - if 'iterator' not in state: - state['iterator'] = iter(range(99, -1, -1)) - iterator = state['iterator'] - for item in iterator: - if not outs.b.push(item): - return True + for item in iter(range(99, -1, -1)): + await outs.b.push(item) + outs.b.close() -def stream3(ins, outs, state): +async def stream3(ins, outs, state): """ Divide incoming entities by 10 and print to stdout """ - while len(ins.c) > 0 and len(ins.d) > 0: - sys.stdout.write('{}\n'.format(ins.c.pop() + ins.d.pop())) + async for c, d in azip(ins.c, ins.d): + sys.stdout.write('{}\n'.format(c + d)) if __name__ == '__main__': import sys diff --git a/examples/linear.py b/examples/linear.py index a8733ac..d43e7a0 100644 --- a/examples/linear.py +++ b/examples/linear.py @@ -13,35 +13,30 @@ def main(): workflow.run() -def stream1(ins, outs, state): +async def stream1(ins, outs, state): """ Generates integers from 0 to 99. """ - if 'iterator' not in state: - state['iterator'] = iter(range(100)) - iterator = state['iterator'] - for item in iterator: - if not outs.a.push(item): - return True + for item in iter(range(100)): + await outs.a.push(item) + outs.a.close() -def stream2(ins, outs, state): +async def stream2(ins, outs, state): """ Multiplies the integers by 2. """ - while len(ins.b) > 0: - item = ins.b.pop() - if not outs.c.push(item * 2): - break - return len(ins.b) > 0 + async for item in ins.b: + await outs.c.push(item * 2) + outs.c.close() -def stream3(ins, outs, state): +async def stream3(ins, outs, state): """ Print incoming entities to stdout """ - while len(ins.d) > 0: - print(ins.d.pop()) + async for item in ins.d: + print(item) if __name__ == '__main__': import sys diff --git a/examples/multi_output.py b/examples/multi_output.py index bdc2edc..4cdb3ed 100644 --- a/examples/multi_output.py +++ b/examples/multi_output.py @@ -3,43 +3,42 @@ def main(): workflow = Workflow(10) - step1 = workflow.add_stream(Stream(outs=['a'], fn=stream1)) - step2 = workflow.add_stream(Stream(['b'], fn=stream2)) - step3 = workflow.add_stream(Stream(['c'], fn=stream3)) + step1 = workflow.add_stream(Stream(outs=['a', 'b'], fn=stream1)) + step2 = workflow.add_stream(Stream(['c'], fn=stream2)) + step3 = workflow.add_stream(Stream(['d'], fn=stream3)) - step1.pipe(step2) - step1.pipe(step3) + step1.pipe(step2, 'a') + step1.pipe(step3, 'b') print(str(workflow)) workflow.run() -def stream1(ins, outs, state): +async def stream1(ins, outs, state): """ - Generates integers from 0 to 99. + Generates one stream of integers from 0 to 99 and another from 100 to 1 """ - if 'iterator' not in state: - state['iterator'] = iter(range(100)) - iterator = state['iterator'] - for item in iterator: - if not outs.a.push(item): - return True + for item in iter(range(100)): + await outs.a.push(item) + await outs.b.push(100 - item) + outs.a.close() + outs.b.close() -def stream2(ins, outs, state): +async def stream2(ins, outs, state): """ Multiply incoming entities by 2 and print to stdout """ - while len(ins.b) > 0: - sys.stdout.write('{}\n'.format(2 * ins.b.pop())) + async for item in ins.c: + sys.stdout.write('{}\n'.format(2 * item)) -def stream3(ins, outs, state): +async def stream3(ins, outs, state): """ Divide incoming entities by 10 and print to stdout """ - while len(ins.c) > 0: - sys.stdout.write('{}\n'.format(ins.c.pop() / 10)) + async for item in ins.d: + sys.stdout.write('{}\n'.format(item / 10)) if __name__ == '__main__': import sys diff --git a/examples/split_output.py b/examples/split_output.py index bdc2edc..ea0638a 100644 --- a/examples/split_output.py +++ b/examples/split_output.py @@ -14,32 +14,29 @@ def main(): workflow.run() -def stream1(ins, outs, state): +async def stream1(ins, outs, state): """ Generates integers from 0 to 99. """ - if 'iterator' not in state: - state['iterator'] = iter(range(100)) - iterator = state['iterator'] - for item in iterator: - if not outs.a.push(item): - return True + for item in iter(range(100)): + await outs.a.push(item) + outs.a.close() -def stream2(ins, outs, state): +async def stream2(ins, outs, state): """ Multiply incoming entities by 2 and print to stdout """ - while len(ins.b) > 0: - sys.stdout.write('{}\n'.format(2 * ins.b.pop())) + async for item in ins.b: + sys.stdout.write('{}\n'.format(2 * item)) -def stream3(ins, outs, state): +async def stream3(ins, outs, state): """ Divide incoming entities by 10 and print to stdout """ - while len(ins.c) > 0: - sys.stdout.write('{}\n'.format(ins.c.pop() / 10)) + async for item in ins.c: + sys.stdout.write('{}\n'.format(item / 10)) if __name__ == '__main__': import sys