diff --git a/mimo/__init__.py b/mimo/__init__.py index 283777b..b99b8ef 100644 --- a/mimo/__init__.py +++ b/mimo/__init__.py @@ -1,2 +1,4 @@ from .stream import Stream from .workflow import Workflow + +from .asynctools import * diff --git a/mimo/asynctools/__init__.py b/mimo/asynctools/__init__.py new file mode 100644 index 0000000..36dc84f --- /dev/null +++ b/mimo/asynctools/__init__.py @@ -0,0 +1,10 @@ +from .range import AsynchronousRange +from .zip import AsynchronousZip + + +def azip(*iterables): + return AsynchronousZip(*iterables) + + +def arange(fr, to=None, step=1): + return AsynchronousRange(fr, to, step) diff --git a/mimo/asynctools/range.py b/mimo/asynctools/range.py new file mode 100644 index 0000000..816b774 --- /dev/null +++ b/mimo/asynctools/range.py @@ -0,0 +1,15 @@ +class AsynchronousRange: + def __init__(self, fr, to=None, step=1): + if to is None: + self._iterator = iter(range(fr)) + else: + self._iterator = iter(range(fr, to, step)) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self._iterator) + except StopIteration: + raise StopAsyncIteration diff --git a/mimo/asynctools/zip.py b/mimo/asynctools/zip.py new file mode 100644 index 0000000..9708fef --- /dev/null +++ b/mimo/asynctools/zip.py @@ -0,0 +1,12 @@ +class AsynchronousZip: + def __init__(self, *iterables): + self._iterables = iterables + + def __aiter__(self): + return self + + async def __anext__(self): + res = [] + for iterator in self._iterables: + res.append(await iterator.__anext__()) + return tuple(res) diff --git a/tests/test_asynctools.py b/tests/test_asynctools.py new file mode 100644 index 0000000..6bd3a23 --- /dev/null +++ b/tests/test_asynctools.py @@ -0,0 +1,85 @@ +import asyncio +import unittest + +from mimo import arange, azip + + +class TestRange(unittest.TestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + + def test_range(self): + iterator = arange(10) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(0, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(1, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(2, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(3, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(4, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(5, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(6, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(7, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(8, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual(9, future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.assertRaises(StopAsyncIteration, self.loop.run_until_complete, future) + + def test_zip(self): + iterator = azip(arange(5), arange(5, 10), arange(10, 15)) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual((0, 5, 10), future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual((1, 6, 11), future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual((2, 7, 12), future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual((3, 8, 13), future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.loop.run_until_complete(future) + self.assertEqual((4, 9, 14), future.result()) + + future = self.loop.create_task(iterator.__anext__()) + self.assertRaises(StopAsyncIteration, self.loop.run_until_complete, future) + + +if __name__ == '__main__': + unittest.main()