diff --git a/.gitignore b/.gitignore index 72364f9..9719cdf 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,5 @@ ENV/ # Rope project settings .ropeproject + +.idea \ No newline at end of file diff --git a/mimo/__init__.py b/mimo/__init__.py new file mode 100644 index 0000000..1fde943 --- /dev/null +++ b/mimo/__init__.py @@ -0,0 +1 @@ +from .stream import Stream diff --git a/mimo/connection/__init__.py b/mimo/connection/__init__.py new file mode 100644 index 0000000..acaec5a --- /dev/null +++ b/mimo/connection/__init__.py @@ -0,0 +1,2 @@ +from .connection import Connection +from .connection_set import ConnectionSet diff --git a/mimo/connection/connection.py b/mimo/connection/connection.py new file mode 100644 index 0000000..3496c0f --- /dev/null +++ b/mimo/connection/connection.py @@ -0,0 +1,44 @@ +from collections import deque + + +class Connection: + def __init__(self, name, threshold=10): + self.entities = deque() + self.connections = set() + + self.name = name + self.threshold = threshold + + def peek(self): + return self.entities[0] + + def pop(self): + return self.entities.popleft() + + def push(self, entity): + """ + Add an entity to the end of the connection. Return if connection can still be pushed to. + :param entity: + :return: + """ + self.entities.append(entity) + return len(self.entities) < self.threshold + + def extend(self, entities): + self.entities.extend(entities) + return len(self.entities) < self.threshold + + def is_full(self): + return len(self.entities) >= self.threshold + + def join(self, connection): + self.connections.add(connection) + + def drain(self): + entities = self.entities + if len(entities) == 0 or any(connection.is_full() for connection in self.connections): + return False + for connection in self.connections: + connection.extend(entities) + entities.clear() + return True diff --git a/mimo/connection/connection_set.py b/mimo/connection/connection_set.py new file mode 100644 index 0000000..85ec5cc --- /dev/null +++ b/mimo/connection/connection_set.py @@ -0,0 +1,29 @@ +class ConnectionSet: + def __init__(self, connections): + self.connections = {connection.name: connection for connection in connections} + self.streams = {connection.name: set() for connection in connections} + + def __iter__(self): + return iter(self.connections.values()) + + def __len__(self): + return len(self.connections) + + def __getattr__(self, key): + if key in self.connections: + return self.connections[key] + return self.__getattribute__(key) + + def __getitem__(self, key): + return self.connections[key] + + def drain(self): + """ + Drain all connections and return the streams that received updates. + :return: Set of updated streams + """ + drained = set() + for connection in self.connections.values(): + if connection.drain(): + drained.add(connection.name) + return drained diff --git a/mimo/stream/__init__.py b/mimo/stream/__init__.py new file mode 100644 index 0000000..1fde943 --- /dev/null +++ b/mimo/stream/__init__.py @@ -0,0 +1 @@ +from .stream import Stream diff --git a/mimo/stream/apply_stream.py b/mimo/stream/apply_stream.py new file mode 100644 index 0000000..5e38795 --- /dev/null +++ b/mimo/stream/apply_stream.py @@ -0,0 +1,17 @@ +from mimo.stream import Stream + + +class ApplyStream(Stream): + + IN = ['entity'] + OUT = ['entity'] + + def __init__(self, fn): + super().__init__() + self.fn = fn + + def run(self, ins, outs): + fn = self.fn + while len(ins.entity) > 0 and outs.entity.push(fn(ins.entity.pop())): + continue + return len(ins.entity) > 0 diff --git a/mimo/stream/converter_stream.py b/mimo/stream/converter_stream.py new file mode 100644 index 0000000..068dbde --- /dev/null +++ b/mimo/stream/converter_stream.py @@ -0,0 +1,9 @@ +from mimo.stream import Stream + + +class ConverterStream(Stream): + + IN = ['entity'] + OUT = ['entity'] + + def __init__(self, ): \ No newline at end of file diff --git a/mimo/stream/extractor_stream.py b/mimo/stream/extractor_stream.py new file mode 100644 index 0000000..deb18dd --- /dev/null +++ b/mimo/stream/extractor_stream.py @@ -0,0 +1,21 @@ +from mimo.stream import Stream + + +class ExtractorStream(Stream): + + IN = ['entity'] + OUT = ['entity'] + + def __init__(self, path, ins, outs): + super().__init__(ins, outs) + self.path = path + + def run(self, ins, outs): + path = self.path + while len(ins.entity) > 0: + entity = ins.entity.pop() + for step in path: + entity = step(entity) + if not outs.entity.push(entity): + break + return len(ins.entity) > 0 diff --git a/mimo/stream/iterator_stream.py b/mimo/stream/iterator_stream.py new file mode 100644 index 0000000..f1ff068 --- /dev/null +++ b/mimo/stream/iterator_stream.py @@ -0,0 +1,17 @@ +from mimo.stream import Stream + + +class IteratorStream(Stream): + + IN = [] + OUT = ['line'] + + def __init__(self, iterator): + super().__init__() + self.iterator = iterator + + def run(self, ins, outs): + for line in self.iterator: + if not outs.line.push(line): + return True + return False diff --git a/mimo/stream/stream.py b/mimo/stream/stream.py new file mode 100644 index 0000000..2770a29 --- /dev/null +++ b/mimo/stream/stream.py @@ -0,0 +1,84 @@ +from mimo.connection import Connection, ConnectionSet + + +class Stream: + + IN = [] + OUT = [] + + def __init__(self, ins=None, outs=None, name=None): + self.paused = False + + ins = self.IN if ins is None else ins + outs = self.OUT if outs is None else outs + self.ins = ConnectionSet(Connection(name) for name in ins) + self.outs = ConnectionSet(Connection(name) for name in outs) + self.children = {out: set() for out in outs} + self.name = type(self).__name__ if name is None else name + + def run(self, ins, outs): + """ + The main method to over-ride when implementing custom streams. + :param ins: connection set of input connections + :type ins: ConnectionSet + :param outs: connection set of output connections + :type outs: ConnectionSet + :return: True if stream is paused + """ + raise NotImplementedError + + def activate(self): + """ + Run a step and propogate the output entities to any connected child streams. + :return: + """ + run = self.run + ins = self.ins + outs = self.outs + children = self.children + + paused = True + while paused: + paused = run(ins, outs) + updated_streams = set() + for streams in children.values(): + updated_streams.update(stream for stream in streams if stream.paused) + for out in outs.drain(): + updated_streams.update(children[out]) + for updated_stream in updated_streams: + updated_stream.activate() + self.paused = paused + + def pipe(self, stream, output=None, input=None): + """ + Pipe the output of one stream to the input of another. If there are more than one outputs or inputs, the + specific output/input must be specified. + :param stream: stream to connect to + :param output: name of the output connection (default: None) + :param input: name of the input connection (default: None) + :return: stream connected to + """ + if len(self.outs) == 0: + raise ValueError('{} has no output to pipe from'.format(self.name)) + elif len(stream.ins) == 0: + raise ValueError('{} has no input to pipe to'.format(stream.name)) + + if output is None: + if len(self.outs) == 1: + from_connection = next(iter(self.outs)) + else: + raise ValueError('{} has multiple output and none chosen to pipe from'.format(self.name)) + else: + from_connection = self.outs[output] + + if input is None: + if len(stream.ins) == 1: + to_connection = next(iter(stream.ins)) + else: + raise ValueError('{} has multiple output and none chosen to pipe from'.format(stream.name)) + else: + to_connection = stream.ins[input] + + from_connection.join(to_connection) + self.children[from_connection.name].add(stream) + return stream diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_connection/__init__.py b/tests/test_connection/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_connection/test_connection.py b/tests/test_connection/test_connection.py new file mode 100644 index 0000000..441d266 --- /dev/null +++ b/tests/test_connection/test_connection.py @@ -0,0 +1,60 @@ +import unittest + +from mimo.connection import Connection +from mimo.stream import Stream + + +class TestConnection(unittest.TestCase): + def test_peek(self): + connection = Connection('a') + + self.assertRaises(IndexError, connection.peek) + connection.entities.append(1) + self.assertEqual(1, connection.peek()) + + def test_pop(self): + connection = Connection('a') + + self.assertRaises(IndexError, connection.pop) + connection.entities.extend((1, 2, 3)) + self.assertEqual(1, connection.pop()) + self.assertEqual([2, 3], list(connection.entities)) + + def test_push(self): + connection = Connection('a', 3) + + self.assertTrue(connection.push(1)) + self.assertEqual([1], list(connection.entities)) + self.assertTrue(connection.push(2)) + self.assertEqual([1, 2], list(connection.entities)) + self.assertFalse(connection.push(3)) + self.assertEqual([1, 2, 3], list(connection.entities)) + + def test_extend(self): + connection = Connection('a', 3) + + self.assertFalse(connection.extend((1, 2, 3, 4))) + self.assertEqual([1, 2, 3, 4], list(connection.entities)) + + def test_connect_to_input(self): + connection = Connection('a') + stream = Stream(['input'], ['output']) + + connection.join(stream, 'input') + + self.assertEqual(stream, next(iter(connection.streams))) + self.assertEqual(stream.ins['input'], next(iter(connection.connections))) + + def test_drain(self): + connection = Connection('a') + stream = Stream(['input', 'output']) + + connection.join(stream, 'input') + connection.entities.extend([1, 2, 3, 4, 5]) + connection.drain() + + self.assertEqual([1, 2, 3, 4, 5], list(stream.ins.input.entities)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_connection/test_connection_set.py b/tests/test_connection/test_connection_set.py new file mode 100644 index 0000000..e4dfcd2 --- /dev/null +++ b/tests/test_connection/test_connection_set.py @@ -0,0 +1,30 @@ +import unittest + +from mimo.connection import Connection, ConnectionSet + + +class TestConnectionSet(unittest.TestCase): + def test_items_and_attributes(self): + a = Connection('a') + b = Connection('b') + connections = ConnectionSet([a, b]) + + self.assertEqual(a, connections.a) + self.assertEqual(b, connections.b) + self.assertEqual(a, connections['a']) + self.assertEqual(b, connections['b']) + + def test_drain(self): + connection = Connection('a') + connection.entities.extend((1, 2, 3, 4)) + + self.assertTrue(connection.drain()) + + def test_drain_empty(self): + connection = Connection('a') + + self.assertFalse(connection.drain()) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_stream/__init__.py b/tests/test_stream/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_stream/test_stream.py b/tests/test_stream/test_stream.py new file mode 100644 index 0000000..c0737f1 --- /dev/null +++ b/tests/test_stream/test_stream.py @@ -0,0 +1,18 @@ +import unittest + +from mimo import Stream + + +class TestStream(unittest.TestCase): + def test_pipe(self): + stream1 = Stream(['a'], ['b']) + stream2 = Stream(['c'], ['d']) + + stream1.pipe(stream2) + + self.assertEqual({stream2}, stream1.children['b']) + self.assertEqual({stream2.ins['c']}, stream1.outs['b'].connections) + + +if __name__ == '__main__': + unittest.main()